From 4ddc48e72a621ca66c04608d3ab125f47075d5ae Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Sep 30 2020 08:14:49 +0000 Subject: avoid message re-insertion, extend test mode, schema and docs updates include new table in main schema, since this plugin is part of Koji itself clean up and expand the docs for this plugin refactor query_from_db() into handle_db_msgs() * fix lock error cases * only delete messages from queue if we successfully send them * handle test_mode allow test_mode to exercise db queue via test_mode_fail setting --- diff --git a/docs/schema-upgrade-1.22-1.23.sql b/docs/schema-upgrade-1.22-1.23.sql index e5fa4dc..841344d 100644 --- a/docs/schema-upgrade-1.22-1.23.sql +++ b/docs/schema-upgrade-1.22-1.23.sql @@ -6,4 +6,15 @@ BEGIN; CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method) WHERE parent IS NULL; + +-- Message queue for the protonmsg plugin +CREATE TABLE proton_queue ( + id SERIAL PRIMARY KEY, + created_ts TIMESTAMPTZ, + address TEXT NOT NULL, + props JSON NOT NULL, + body JSON NOT NULL +) WITHOUT OIDS; + + COMMIT; diff --git a/docs/schema.sql b/docs/schema.sql index ac3b374..3fbcd27 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -937,4 +937,15 @@ CREATE TABLE win_archives ( flags TEXT ) WITHOUT OIDS; + +-- Message queue for the protonmsg plugin +CREATE TABLE proton_queue ( + id SERIAL PRIMARY KEY, + created_ts TIMESTAMPTZ, + address TEXT NOT NULL, + props JSON NOT NULL, + body JSON NOT NULL +) WITHOUT OIDS; + + COMMIT WORK; diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 2151e87..2299af8 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -162,34 +162,54 @@ And in scripts, you can use following calls: Proton messaging ================ -It is hub-only plugin which can send all the messages produced by koji to amqps -message brokers. - -``Plugins = protonmsg`` needs to be added to ``/etc/koji-hub/hub.conf``. -Configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``. -There are three sections in config file - broker, queue and message. - -Broker section allows admin to set up connection options like urls, -certificates, timeouts and topic prefix. - -Normally, only messages in apache process memory are remembered. There are -various reasons, why these messages can be lost if broker is unavailable for -longer time. For more reliability admin can enable persistent database message -queue. For this is section ``queue`` where ``enabled`` boolean enables this -behaviour. Currently you need to create table manually by running the following -SQL: - -.. code-block:: plpgsql - - CREATE TABLE proton_queue ( - id SERIAL PRIMARY KEY, - props JSON NOT NULL, - body JSON NOT NULL - ) - -Last related option is ``batch_size`` - it says how many messages are send -during one request. It should be balanced number. If there is a large queue it -shouldn't block the request itself as user is waiting for it. On the other hand -it is not hardcoded as it plays with ``extra_limit`` - e.g. there could be more small -messages if ``extra_limit`` is set to small number or less bigger messages with -unlimited size. +The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of +messages about Koji activity to the configured amqps message brokers. +Most callback events on the hub are translated into messages. + +In order to enable this plugin, you must: + +* add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf`` + +* provide a configuration file for the plugin at + ``/etc/koji-hub/plugins/protonmsg.conf`` + +The configuration file is ini-style format with three sections: broker, +queue and message. +The ``[broker]`` section defines how the plugin connects to the message bus. +The following fields are understood: + +* ``urls`` -- a space separated list of amqps urls. Additional urls are + treated as fallbacks. The plugin will send to the first one that accepts + the message +* ``cert`` -- the client cert file for authentication +* ``cacert`` -- the ca cert to validate the server +* ``topic_prefix`` -- this string will be used as a prefix for all message topics +* ``connect_timeout`` -- the number of seconds to wait for a connection before + timing out +* ``send_timeout`` -- the number of seconds to wait while sending a message + before timing out + +The ``[message]`` section sets parameters for how messages are formed. +Currently only one field is understood: + +* ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that + appear in messages. If the ``build.extra`` field is longer (in terms of + json-encoded length), then it will be omitted. The default value is ``0`` + which means no limit. + +The ``[queue]`` section controls how (or if) the plugin will use the database +to queue messages when they cannot be immediately sent. +The following fields are understood: + +* ``enabled`` -- if true, then the feature is enabled +* ``batch_size`` -- the maximum number of queued messages to send at one time +* ``max_age`` -- the age (in hours) at which old messages in the queue are discarded + +It is important to note that the database queue is only a fallback mechanism. +The plugin will always attempt to send messages as they are issued. +Messages are only placed in the database queue when they cannot be immediately +sent on the bus (e.g. if the amqps server is offline). + +Admins should consider the balance between the ``batch_size`` and +``extra_limit`` options, as both can affect the total amount of data that the +plugin could attempt to send during a single call. diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 23f4dd3..db57198 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -335,26 +335,39 @@ def store_to_db(msgs): c.execute('COMMIT') -def query_from_db(): +def handle_db_msgs(urls, CONFIG): limit = CONFIG.getint('queue', 'batch_size', fallback=100) + c = context.cnx.cursor() + # we're running in postCommit, so we need to handle new transaction + c.execute('BEGIN') try: - c = context.cnx.cursor() - # we're running in postCommit, so we need to handle new transaction - c.execute('BEGIN') c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT') + except psycopg2.OperationalError: + LOG.debug('skipping db queue due to lock') + return + try: c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" % CONFIG.getint('queue', 'age', fallback=24)) query = QueryProcessor(tables=('proton_queue',), columns=('id', 'address', 'props', 'body'), opts={'order': 'id', 'limit': limit}) msgs = list(query.execute()) + if CONFIG.getboolean('broker', 'test_mode', fallback=False): + if msgs: + LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) + unsent = [] + else: + unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)} + sent = [m for m in msgs if m['id'] not in unsent] if msgs: c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s', - {'ids': [msg['id'] for msg in msgs]}) - c.execute('COMMIT') - return msgs - except psycopg2.errors.LockNotAvailable: - return [] + {'ids': [msg['id'] for msg in sent]}) + finally: + # make sure we free the lock + try: + c.execute('COMMIT') + except Exception: + c.execute('ROLLBACK') @ignore_error @@ -374,24 +387,35 @@ def send_queued_msgs(cbtype, *args, **kws): db_enabled = False if CONFIG.has_option('queue', 'enabled'): db_enabled = CONFIG.getboolean('queue', 'enabled') + if test_mode: LOG.debug('test mode: skipping send to urls: %r', urls) - for msg in msgs: + fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0) + if fail_chance: + # simulate unsent messages in test mode + sent = [] + unsent = [] + for m in msgs: + if random.randint(1, 100) <= fail_chance: + unsent.append(m) + else: + sent.append(m) + if unsent: + LOG.info('simulating %i unsent messages' % len(unsent)) + else: + sent = msgs + unsent = [] + for msg in sent: LOG.debug('test mode: skipped msg: %r', msg) - return - - msgs = _send_msgs(urls, msgs, CONFIG) + else: + unsent = _send_msgs(urls, msgs, CONFIG) - if db_enabled and not test_mode: - if msgs: + if db_enabled: + if unsent: # if we still have some messages, store them and leave for another call to pick them up store_to_db(msgs) else: # otherwise we are another call - look to db if there remains something to send - msgs = query_from_db() - msgs = _send_msgs(urls, msgs, CONFIG) - # return unsuccesful data to db - store_to_db(msgs) - - if msgs: - LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs)) + handle_db_msgs(urls, CONFIG) + elif unsent: + LOG.error('could not send %i messages. db queue disabled' % len(msgs))