From 7448a6e8ccc62c416dc4c1f34ec348cf08e78dd4 Mon Sep 17 00:00:00 2001
From: Tomas Kopecek
Date: Sep 28 2020 07:17:51 +0000
Subject: [PATCH 1/4] proton: persistent message queue

Fixes: https://pagure.io/koji/issue/2230

---

diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index 5fdbf75..2151e87 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -158,3 +158,39 @@ And in scripts, you can use following calls:
     ks = koji.ClientSession('https://koji.fedoraproject.org/kojihub')
     ks.gssapi_login()
     ks.createSideTag('f30-build')
+
+Proton messaging
+================
+
+This is a hub-only plugin which sends all messages produced by Koji to amqps
+message brokers.
+
+To enable it, add ``Plugins = protonmsg`` to ``/etc/koji-hub/hub.conf``. The
+configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``.
+The config file has three sections: broker, queue and message.
+
+The broker section lets the admin set connection options such as urls,
+certificates, timeouts and the topic prefix.
+
+Normally, messages are held only in apache process memory, so they can be lost
+for various reasons if the broker is unavailable for a longer time. For more
+reliability, the admin can enable a persistent database message queue via the
+``enabled`` boolean in the ``queue`` section. Currently you need to create the
+table manually by running the following SQL:
+
+.. code-block:: plpgsql
+
+    CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+    )
+
+The last related option is ``batch_size`` - it sets how many queued messages
+are sent during one request. It should be a balanced number: with a large
+backlog, draining the queue must not block the request itself, since a user
+is waiting for it. On the other hand, it is not hardcoded because it interacts
+with ``extra_limit`` - e.g. a batch can hold more small messages when
+``extra_limit`` is set to a small number, or fewer bigger messages with
+unlimited size.
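For reference, the options described in the docs above combine into a config
file like the following. This is only an illustrative sketch: the broker
values are placeholders, not shipped defaults, and the ``[queue]`` section is
the part introduced by this patch.

.. code-block:: ini

    [broker]
    # space separated list of amqps urls; extras act as fallbacks
    urls = amqps://broker01.example.com:5671 amqps://broker02.example.com:5671
    cert = /etc/koji-hub/plugins/client.pem
    cacert = /etc/koji-hub/plugins/ca.pem
    topic_prefix = koji
    connect_timeout = 10
    send_timeout = 60

    [message]
    # 0 = unlimited size of serialized build.extra fields
    extra_limit = 0

    [queue]
    # fall back to the persistent db queue when no broker is reachable
    enabled = true
    # messages (re)sent from the db per request
    batch_size = 100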
diff --git a/plugins/hub/protonmsg.conf b/plugins/hub/protonmsg.conf
index 97c2679..8f912de 100644
--- a/plugins/hub/protonmsg.conf
+++ b/plugins/hub/protonmsg.conf
@@ -11,3 +11,11 @@ send_timeout = 60
 # if field is longer (json.dumps), ignore it
 # default value is 0 - unlimited size
 extra_limit = 0
+
+[queue]
+# enable persistent database queue
+enabled = true
+# how many messages are picked from the db in one call
+# note that a big number can slow down requests if there
+# is a huge message backlog (typically after a broker outage)
+batch_size = 100

diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index 219401a..f542589 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -16,6 +16,7 @@ from proton.reactor import Container
 import koji
 from koji.context import context
 from koji.plugin import callback, convert_datetime, ignore_error
+from kojihub import QueryProcessor, InsertProcessor

 CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
 CONFIG = None
@@ -295,34 +296,92 @@ def prep_repo_done(cbtype, *args, **kws):
     queue_msg(address, props, kws)


+def _send_msgs(urls, msgs, CONFIG):
+    random.shuffle(urls)
+    for url in urls:
+        container = Container(TimeoutHandler(url, msgs, CONFIG))
+        container.run()
+        if msgs:
+            LOG.debug('could not send to %s, %s messages remaining',
+                      url, len(msgs))
+        else:
+            LOG.debug('all messages sent to %s successfully', url)
+            break
+    else:
+        LOG.error('could not send messages to any destinations')
+    return msgs
+
+
+def store_to_db(msgs):
+    for msg in msgs:
+        if isinstance(msg, tuple):
+            address = msg[0]
+            props = json.dumps(msg[1])
+            body = msg[2]
+        else:
+            address = msg['address']
+            body = msg['body']  # already serialized
+            props = json.dumps(msg['props'])
+        insert = InsertProcessor(table='proton_queue')
+        insert.set(address=address, props=props, body=body)
+        if 'id' in msg:
+            # if we got something from the db, we should store it in its original order
+            insert.set(id=msg['id'])
+        insert.execute()
+    context.cnx.commit()
+
+
+def query_from_db():
+    limit = CONFIG.getint('queue', 'batch_size', fallback=100)
+    c = context.cnx.cursor()
+    c.execute('BEGIN')
+    c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    query = QueryProcessor(tables=('proton_queue',),
+                           columns=('id', 'address', 'props', 'body'),
+                           opts={'order': 'id', 'limit': limit})
+    msgs = list(query.execute())
+    if msgs:
+        c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
+                  {'ids': tuple([msg['id'] for msg in msgs])})
+    c.execute('COMMIT')
+    return msgs
+
+
 @ignore_error
 @convert_datetime
 @callback('postCommit')
 def send_queued_msgs(cbtype, *args, **kws):
+    global CONFIG
     msgs = getattr(context, 'protonmsg_msgs', None)
     if not msgs:
         return
-    global CONFIG
     if not CONFIG:
         CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
     urls = CONFIG.get('broker', 'urls').split()
     test_mode = False
     if CONFIG.has_option('broker', 'test_mode'):
         test_mode = CONFIG.getboolean('broker', 'test_mode')
+    db_enabled = False
+    if CONFIG.has_option('queue', 'enabled'):
+        db_enabled = CONFIG.getboolean('queue', 'test_mode')
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
         for msg in msgs:
             LOG.debug('test mode: skipped msg: %r', msg)
         return
-    random.shuffle(urls)
-    for url in urls:
-        container = Container(TimeoutHandler(url, msgs, CONFIG))
-        container.run()
+
+    msgs = _send_msgs(urls, msgs, CONFIG)
+
+    if db_enabled and not test_mode:
         if msgs:
-            LOG.debug('could not send to %s, %s messages remaining',
-                      url, len(msgs))
+            # if we still have messages left, store them for a later call to pick up
+            store_to_db(msgs)
         else:
-            LOG.debug('all messages sent to %s successfully', url)
-            break
-    else:
-        LOG.error('could not send messages to any destinations')
+            # otherwise, check the db for messages left over from earlier calls
+            msgs = query_from_db()
+            msgs = _send_msgs(urls, msgs, CONFIG)
+            # return unsuccessful data to db
+            store_to_db(msgs)
+
+    if msgs:
+        LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs))

diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py
index 4bcad7e..9a38bbd 100644
--- a/tests/test_plugins/test_protonmsg.py
+++ b/tests/test_plugins/test_protonmsg.py
@@ -215,7 +215,7 @@ extra_limit = 2048
         self.assertEqual(log.debug.call_count, 2)
         for args in log.debug.call_args_list:
             self.assertTrue(args[0][0].startswith('could not send'))
-        self.assertEqual(log.error.call_count, 1)
+        self.assertEqual(log.error.call_count, 2)
         self.assertTrue(log.error.call_args[0][0].startswith('could not send'))

 @patch('protonmsg.Container')

From 49a2725f6c8679bde291df9c7161dac3bd50f89e Mon Sep 17 00:00:00 2001
From: Tomas Kopecek
Date: Sep 28 2020 07:17:51 +0000
Subject: [PATCH 2/4] delete too old messages

---

diff --git a/plugins/hub/protonmsg.conf b/plugins/hub/protonmsg.conf
index 8f912de..33e98b7 100644
--- a/plugins/hub/protonmsg.conf
+++ b/plugins/hub/protonmsg.conf
@@ -19,3 +19,5 @@ enabled = true
 # note that a big number can slow down requests if there
 # is a huge message backlog (typically after a broker outage)
 batch_size = 100
+# how long messages are kept in the queue (hours)
+max_age = 24

diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index f542589..b34decf 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -336,6 +336,8 @@ def query_from_db():
     c = context.cnx.cursor()
     c.execute('BEGIN')
     c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
+              CONFIG.getint('queue', 'max_age', fallback=24))
     query = QueryProcessor(tables=('proton_queue',),
                            columns=('id', 'address', 'props', 'body'),
                            opts={'order': 'id', 'limit': limit})

From 0d71aa653aedb2dddad182f4fb23874fe34d5bab Mon Sep 17 00:00:00 2001
From: Tomas Kopecek
Date: Sep 28 2020 07:17:51 +0000
Subject: [PATCH 3/4] proton: handling of lock failure

---

diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index b34decf..23f4dd3 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -9,6 +9,7 @@ import json
 import logging
 import random

+import psycopg2
 from proton import Message, SSLDomain
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
@@ -313,6 +314,9 @@
 def store_to_db(msgs):
+    c = context.cnx.cursor()
+    # we're running in postCommit, so we need to handle a new transaction
+    c.execute('BEGIN')
     for msg in msgs:
         if isinstance(msg, tuple):
             address = msg[0]
@@ -328,25 +332,29 @@
             # if we got something from the db, we should store it in its original order
             insert.set(id=msg['id'])
         insert.execute()
-    context.cnx.commit()
+    c.execute('COMMIT')


 def query_from_db():
     limit = CONFIG.getint('queue', 'batch_size', fallback=100)
-    c = context.cnx.cursor()
-    c.execute('BEGIN')
-    c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
-    c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
-              CONFIG.getint('queue', 'max_age', fallback=24))
-    query = QueryProcessor(tables=('proton_queue',),
-                           columns=('id', 'address', 'props', 'body'),
-                           opts={'order': 'id', 'limit': limit})
-    msgs = list(query.execute())
-    if msgs:
-        c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
-                  {'ids': tuple([msg['id'] for msg in msgs])})
-    c.execute('COMMIT')
-    return msgs
+    try:
+        c = context.cnx.cursor()
+        # we're running in postCommit, so we need to handle a new transaction
+        c.execute('BEGIN')
+        c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+        c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
+                  CONFIG.getint('queue', 'max_age', fallback=24))
+        query = QueryProcessor(tables=('proton_queue',),
+                               columns=('id', 'address', 'props', 'body'),
+                               opts={'order': 'id', 'limit': limit})
+        msgs = list(query.execute())
+        if msgs:
+            c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
+                      {'ids': tuple([msg['id'] for msg in msgs])})
+        c.execute('COMMIT')
+        return msgs
+    except psycopg2.errors.LockNotAvailable:
+        return []


@@ -365,7 +373,7 @@ def send_queued_msgs(cbtype, *args, **kws):
         test_mode = CONFIG.getboolean('broker', 'test_mode')
     db_enabled = False
     if CONFIG.has_option('queue', 'enabled'):
-        db_enabled = CONFIG.getboolean('queue', 'test_mode')
+        db_enabled = CONFIG.getboolean('queue', 'enabled')
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
         for msg in msgs:

From 209a235e1cc1ad2e00e00c8ebaaacf74e86824aa Mon Sep 17 00:00:00 2001
From: Mike McLean
Date: Sep 28 2020 07:17:51 +0000
Subject: [PATCH 4/4] avoid message re-insertion, extend test mode, schema and
 docs updates

include the new table in the main schema, since this plugin is part of Koji
itself

clean up and expand the docs for this plugin

refactor query_from_db() into handle_db_msgs()
* fix lock error cases
* only delete messages from the queue if we successfully send them
* handle test_mode

allow test_mode to exercise the db queue via the test_mode_fail setting

---

diff --git a/docs/schema-upgrade-1.22-1.23.sql b/docs/schema-upgrade-1.22-1.23.sql
index e5fa4dc..841344d 100644
--- a/docs/schema-upgrade-1.22-1.23.sql
+++ b/docs/schema-upgrade-1.22-1.23.sql
@@ -6,4 +6,15 @@ BEGIN;
 CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method)
     WHERE parent IS NULL;

+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ DEFAULT NOW(),
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
+
 COMMIT;

diff --git a/docs/schema.sql b/docs/schema.sql
index 74d38d9..eb38d24 100644
--- a/docs/schema.sql
+++ b/docs/schema.sql
@@ -937,4 +937,15 @@ CREATE TABLE win_archives (
        flags TEXT
 ) WITHOUT OIDS;

+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ DEFAULT NOW(),
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
+
 COMMIT WORK;

diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index 2151e87..2299af8 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -162,35 +162,54 @@
 Proton messaging
 ================

-This is a hub-only plugin which sends all messages produced by Koji to amqps
-message brokers.
-
-To enable it, add ``Plugins = protonmsg`` to ``/etc/koji-hub/hub.conf``. The
-configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``.
-The config file has three sections: broker, queue and message.
-
-The broker section lets the admin set connection options such as urls,
-certificates, timeouts and the topic prefix.
-
-Normally, messages are held only in apache process memory, so they can be lost
-for various reasons if the broker is unavailable for a longer time. For more
-reliability, the admin can enable a persistent database message queue via the
-``enabled`` boolean in the ``queue`` section. Currently you need to create the
-table manually by running the following SQL:
-
-.. code-block:: plpgsql
-
-    CREATE TABLE proton_queue (
-        id SERIAL PRIMARY KEY,
-        address TEXT NOT NULL,
-        props JSON NOT NULL,
-        body JSON NOT NULL
-    )
-
-The last related option is ``batch_size`` - it sets how many queued messages
-are sent during one request. It should be a balanced number: with a large
-backlog, draining the queue must not block the request itself, since a user
-is waiting for it. On the other hand, it is not hardcoded because it interacts
-with ``extra_limit`` - e.g. a batch can hold more small messages when
-``extra_limit`` is set to a small number, or fewer bigger messages with
-unlimited size.
+The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of
+messages about Koji activity to the configured amqps message brokers.
+Most callback events on the hub are translated into messages.
+
+In order to enable this plugin, you must:
+
+* add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf``
+
+* provide a configuration file for the plugin at
+  ``/etc/koji-hub/plugins/protonmsg.conf``
+
+The configuration file is in ini-style format with three sections: broker,
+queue and message.
+The ``[broker]`` section defines how the plugin connects to the message bus.
+The following fields are understood:
+
+* ``urls`` -- a space-separated list of amqps urls. Additional urls are
+  treated as fallbacks. The plugin will send to the first one that accepts
+  the message.
+* ``cert`` -- the client cert file for authentication
+* ``cacert`` -- the ca cert to validate the server
+* ``topic_prefix`` -- this string will be used as a prefix for all message topics
+* ``connect_timeout`` -- the number of seconds to wait for a connection before
+  timing out
+* ``send_timeout`` -- the number of seconds to wait while sending a message
+  before timing out
+
+The ``[message]`` section sets parameters for how messages are formed.
+Currently only one field is understood:
+
+* ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that
+  appear in messages. If the ``build.extra`` field is longer (in terms of
+  json-encoded length), then it will be omitted. The default value is ``0``,
+  which means no limit.
+
+The ``[queue]`` section controls how (or if) the plugin will use the database
+to queue messages when they cannot be immediately sent.
+The following fields are understood:
+
+* ``enabled`` -- if true, then the feature is enabled
+* ``batch_size`` -- the maximum number of queued messages to send at one time
+* ``max_age`` -- the age (in hours) at which old messages in the queue are discarded
+
+It is important to note that the database queue is only a fallback mechanism.
+The plugin will always attempt to send messages as they are issued.
+Messages are only placed in the database queue when they cannot be immediately
+sent on the bus (e.g. if the amqps server is offline).
+
+Admins should consider the balance between the ``batch_size`` and
+``extra_limit`` options, as both can affect the total amount of data that the
+plugin could attempt to send during a single call.
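Since the fallback queue is an ordinary table, a backlog is easy to inspect
directly in postgres. A hypothetical check, using only the columns defined in
the schema above:

.. code-block:: sql

    -- how many messages are waiting to be re-sent
    SELECT COUNT(*) FROM proton_queue;

    -- the oldest queued message (the next one a batch would pick up)
    SELECT id, created_ts, address FROM proton_queue ORDER BY id LIMIT 1;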
diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index 23f4dd3..db57198 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -335,26 +335,39 @@ def store_to_db(msgs):
     c.execute('COMMIT')


-def query_from_db():
+def handle_db_msgs(urls, CONFIG):
     limit = CONFIG.getint('queue', 'batch_size', fallback=100)
+    c = context.cnx.cursor()
+    # we're running in postCommit, so we need to handle a new transaction
+    c.execute('BEGIN')
     try:
-        c = context.cnx.cursor()
-        # we're running in postCommit, so we need to handle a new transaction
-        c.execute('BEGIN')
         c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    except psycopg2.OperationalError:
+        LOG.debug('skipping db queue due to lock')
+        return
+    try:
         c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
                   CONFIG.getint('queue', 'max_age', fallback=24))
         query = QueryProcessor(tables=('proton_queue',),
                                columns=('id', 'address', 'props', 'body'),
                                opts={'order': 'id', 'limit': limit})
         msgs = list(query.execute())
+        if CONFIG.getboolean('broker', 'test_mode', fallback=False):
+            if msgs:
+                LOG.debug('test mode: skipping send for %i messages from db', len(msgs))
+            unsent = []
+        else:
+            unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}
+        sent = [m for m in msgs if m['id'] not in unsent]
-        if msgs:
+        if sent:
             c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
-                      {'ids': tuple([msg['id'] for msg in msgs])})
-        c.execute('COMMIT')
-        return msgs
-    except psycopg2.errors.LockNotAvailable:
-        return []
+                      {'ids': tuple([msg['id'] for msg in sent])})
+    finally:
+        # make sure we free the lock
+        try:
+            c.execute('COMMIT')
+        except Exception:
+            c.execute('ROLLBACK')


 @ignore_error
@@ -374,24 +387,35 @@ def send_queued_msgs(cbtype, *args, **kws):
     db_enabled = False
     if CONFIG.has_option('queue', 'enabled'):
         db_enabled = CONFIG.getboolean('queue', 'enabled')
+
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
-        for msg in msgs:
+        fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)
+        if fail_chance:
+            # simulate unsent messages in test mode
+            sent = []
+            unsent = []
+            for m in msgs:
+                if random.randint(1, 100) <= fail_chance:
+                    unsent.append(m)
+                else:
+                    sent.append(m)
+            if unsent:
+                LOG.info('simulating %i unsent messages' % len(unsent))
+        else:
+            sent = msgs
+            unsent = []
+        for msg in sent:
             LOG.debug('test mode: skipped msg: %r', msg)
-        return
-
-    msgs = _send_msgs(urls, msgs, CONFIG)
+    else:
+        unsent = _send_msgs(urls, msgs, CONFIG)

-    if db_enabled and not test_mode:
-        if msgs:
+    if db_enabled:
+        if unsent:
             # if we still have messages left, store them for a later call to pick up
-            store_to_db(msgs)
+            store_to_db(unsent)
         else:
             # otherwise, check the db for messages left over from earlier calls
-            msgs = query_from_db()
-            msgs = _send_msgs(urls, msgs, CONFIG)
-            # return unsuccessful data to db
-            store_to_db(msgs)
-
-    if msgs:
-        LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs))
+            handle_db_msgs(urls, CONFIG)
+    elif unsent:
+        LOG.error('could not send %i messages. db queue disabled' % len(unsent))
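The ``test_mode_fail`` setting added above makes it possible to exercise the
db queue without a real broker outage. A sketch of the relevant configuration;
the percentage is arbitrary, and per the code above each message is marked
unsent when ``random.randint(1, 100) <= test_mode_fail``:

.. code-block:: ini

    [broker]
    # don't actually contact any broker
    test_mode = true
    # treat roughly 30% of messages as unsent so they land in proton_queue
    test_mode_fail = 30

    [queue]
    enabled = true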