From 8c253396e8bf44dbae3e2d1ffb93ef3c9675d7db Mon Sep 17 00:00:00 2001
From: Tomas Kopecek
Date: Sep 30 2020 08:14:49 +0000
Subject: proton: persistent message queue


Fixes: https://pagure.io/koji/issue/2230

---

diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index 5fdbf75..2151e87 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -158,3 +158,39 @@ And in scripts, you can use following calls:
     ks = koji.ClientSession('https://koji.fedoraproject.org/kojihub')
     ks.gssapi_login()
     ks.createSideTag('f30-build')
+
+Proton messaging
+================
+
+This is a hub-only plugin which can send all the messages produced by koji to
+amqps message brokers.
+
+``Plugins = protonmsg`` needs to be added to ``/etc/koji-hub/hub.conf``.
+The configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``.
+There are three sections in the config file - broker, queue and message.
+
+The broker section allows the admin to set up connection options like urls,
+certificates, timeouts and topic prefix.
+
+Normally, only messages in apache process memory are remembered. There are
+various reasons why these messages can be lost if the broker is unavailable
+for a longer time. For more reliability the admin can enable a persistent
+database message queue. For this there is the ``queue`` section, where the
+``enabled`` boolean enables this behaviour. Currently you need to create the
+table manually by running the following SQL:
+
+.. code-block:: plpgsql
+
+    CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+    )
+
+The last related option is ``batch_size`` - it says how many messages are sent
+during one request. It should be a balanced number. If there is a large queue
+it shouldn't block the request itself as the user is waiting for it. On the
+other hand it is not hardcoded as it interplays with ``extra_limit`` - e.g.
+there could be more small messages if ``extra_limit`` is set to a small
+number, or fewer larger messages with unlimited size.
diff --git a/plugins/hub/protonmsg.conf b/plugins/hub/protonmsg.conf
index 97c2679..8f912de 100644
--- a/plugins/hub/protonmsg.conf
+++ b/plugins/hub/protonmsg.conf
@@ -11,3 +11,11 @@ send_timeout = 60
 # if field is longer (json.dumps), ignore it
 # default value is 0 - unlimited size
 extra_limit = 0
+
+[queue]
+# enable persistent database queue
+enabled = true
+# how many messages are picked from db in one call
+# note, that big number can slow requests if there
+# is a huge message backlog (typically after broker outage)
+batch_size = 100
diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index 219401a..f542589 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -16,6 +16,7 @@ from proton.reactor import Container
 import koji
 from koji.context import context
 from koji.plugin import callback, convert_datetime, ignore_error
+from kojihub import QueryProcessor, InsertProcessor
 
 CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
 CONFIG = None
@@ -295,34 +296,91 @@ def prep_repo_done(cbtype, *args, **kws):
     queue_msg(address, props, kws)
 
 
+def _send_msgs(urls, msgs, CONFIG):
+    random.shuffle(urls)
+    for url in urls:
+        container = Container(TimeoutHandler(url, msgs, CONFIG))
+        container.run()
+        if msgs:
+            LOG.debug('could not send to %s, %s messages remaining',
+                      url, len(msgs))
+        else:
+            LOG.debug('all messages sent to %s successfully', url)
+            break
+    else:
+        LOG.error('could not send messages to any destinations')
+    return msgs
+
+
+def store_to_db(msgs):
+    for msg in msgs:
+        if isinstance(msg, tuple):
+            address = msg[0]
+            props = json.dumps(msg[1])
+            body = msg[2]
+        else:
+            address = msg['address']
+            body = msg['body']  # already serialized
+            props = json.dumps(msg['props'])
+        insert = InsertProcessor(table='proton_queue')
+        insert.set(address=address, props=props, body=body)
+        if isinstance(msg, dict) and 'id' in msg:
+            # if we've something from db, we should store it in correct order
+            insert.set(id=msg['id'])
+        insert.execute()
+    context.cnx.commit()
+
+
+def query_from_db():
+    limit = CONFIG.getint('queue', 'batch_size', fallback=100)
+    c = context.cnx.cursor()
+    c.execute('BEGIN')
+    c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    query = QueryProcessor(tables=('proton_queue',),
+                           columns=('id', 'address', 'props', 'body'),
+                           opts={'order': 'id', 'limit': limit})
+    msgs = list(query.execute())
+    if msgs:
+        c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
+                  {'ids': tuple(msg['id'] for msg in msgs)})
+    c.execute('COMMIT')
+    return msgs
+
 
 @ignore_error
 @convert_datetime
 @callback('postCommit')
 def send_queued_msgs(cbtype, *args, **kws):
+    global CONFIG
     msgs = getattr(context, 'protonmsg_msgs', None)
     if not msgs:
         return
-    global CONFIG
     if not CONFIG:
         CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
     urls = CONFIG.get('broker', 'urls').split()
     test_mode = False
     if CONFIG.has_option('broker', 'test_mode'):
         test_mode = CONFIG.getboolean('broker', 'test_mode')
+    db_enabled = False
+    if CONFIG.has_option('queue', 'enabled'):
+        db_enabled = CONFIG.getboolean('queue', 'enabled')
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
         for msg in msgs:
             LOG.debug('test mode: skipped msg: %r', msg)
         return
-    random.shuffle(urls)
-    for url in urls:
-        container = Container(TimeoutHandler(url, msgs, CONFIG))
-        container.run()
+
+    msgs = _send_msgs(urls, msgs, CONFIG)
+
+    if db_enabled and not test_mode:
         if msgs:
-            LOG.debug('could not send to %s, %s messages remaining',
-                      url, len(msgs))
+            # if we still have some messages, store them and leave for another call to pick them up
+            store_to_db(msgs)
         else:
-            LOG.debug('all messages sent to %s successfully', url)
-            break
-    else:
-        LOG.error('could not send messages to any destinations')
+            # otherwise we are another call - look to db if there remains something to send
+            msgs = query_from_db()
+            msgs = _send_msgs(urls, msgs, CONFIG)
+            # return unsuccessful data to db
+            store_to_db(msgs)
+
+    if msgs:
+        LOG.error('could not send messages to any destinations, %s stored to db', len(msgs))
diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py
index 4bcad7e..9a38bbd 100644
--- a/tests/test_plugins/test_protonmsg.py
+++ b/tests/test_plugins/test_protonmsg.py
@@ -215,7 +215,7 @@ extra_limit = 2048
         self.assertEqual(log.debug.call_count, 2)
         for args in log.debug.call_args_list:
             self.assertTrue(args[0][0].startswith('could not send'))
-        self.assertEqual(log.error.call_count, 1)
+        self.assertEqual(log.error.call_count, 2)
         self.assertTrue(log.error.call_args[0][0].startswith('could not send'))
 
     @patch('protonmsg.Container')