#2441 proton: persistent message queue
Merged 9 months ago by tkopecek. Opened 10 months ago by tkopecek.
tkopecek/koji issue2230  into  master

@@ -6,4 +6,15 @@ 

  

  CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method) WHERE parent IS NULL;

  

+ 

+ -- Message queue for the protonmsg plugin

+ CREATE TABLE proton_queue (

+         id SERIAL PRIMARY KEY,

+         created_ts TIMESTAMPTZ,

+         address TEXT NOT NULL,

+         props JSON NOT NULL,

+         body JSON NOT NULL

+ ) WITHOUT OIDS;

+ 

+ 

  COMMIT;

file modified
+11
@@ -937,4 +937,15 @@ 

          flags TEXT

  ) WITHOUT OIDS;

  

+ 

+ -- Message queue for the protonmsg plugin

+ CREATE TABLE proton_queue (

+         id SERIAL PRIMARY KEY,

+         created_ts TIMESTAMPTZ,

+         address TEXT NOT NULL,

+         props JSON NOT NULL,

+         body JSON NOT NULL

+ ) WITHOUT OIDS;

+ 

+ 

  COMMIT WORK;

file modified
+55
@@ -158,3 +158,58 @@ 

      ks = koji.ClientSession('https://koji.fedoraproject.org/kojihub')

      ks.gssapi_login()

      ks.createSideTag('f30-build')

+ 

+ Proton messaging

+ ================

+ 

+ The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of

+ messages about Koji activity to the configured amqps message brokers.

+ Most callback events on the hub are translated into messages.

+ 

+ In order to enable this plugin, you must:

+ 

+ * add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf``

+ 

+ * provide a configuration file for the plugin at

+   ``/etc/koji-hub/plugins/protonmsg.conf``

+ 

+ The configuration file is ini-style format with three sections: broker,

+ queue and message.

+ The ``[broker]`` section defines how the plugin connects to the message bus.

+ The following fields are understood:

+ 

+ * ``urls`` -- a space separated list of amqps urls. Additional urls are

+   treated as fallbacks. The plugin will send to the first one that accepts

+   the message

+ * ``cert`` -- the client cert file for authentication

+ * ``cacert`` -- the ca cert to validate the server

+ * ``topic_prefix`` -- this string will be used as a prefix for all message topics

+ * ``connect_timeout`` -- the number of seconds to wait for a connection before

+   timing out

+ * ``send_timeout`` -- the number of seconds to wait while sending a message

+   before timing out

+ 

+ The ``[message]`` section sets parameters for how messages are formed.

+ Currently only one field is understood:

+ 

+ * ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that

+   appear in messages. If the ``build.extra`` field is longer (in terms of 

+   json-encoded length), then it will be omitted. The default value is ``0``

+   which means no limit.

+ 

+ The ``[queue]`` section controls how (or if) the plugin will use the database

+ to queue messages when they cannot be immediately sent.

+ The following fields are understood:

+ 

+ * ``enabled`` -- if true, then the feature is enabled

+ * ``batch_size`` -- the maximum number of queued messages to send at one time

+ * ``max_age`` -- the age (in hours) at which old messages in the queue are discarded

+ 

+ It is important to note that the database queue is only a fallback mechanism.

+ The plugin will always attempt to send messages as they are issued.

+ Messages are only placed in the database queue when they cannot be immediately

+ sent on the bus (e.g. if the amqps server is offline).

+ 

+ Admins should consider the balance between the ``batch_size`` and

+ ``extra_limit`` options, as both can affect the total amount of data that the

+ plugin could attempt to send during a single call.

@@ -11,3 +11,13 @@ 

  # if field is longer (json.dumps), ignore it

  # default value is 0 - unlimited size

  extra_limit = 0

+ 

+ [queue]

+ # enable persistent database queue

+ enabled = true

+ # how many messages are picked from db in one call

+ # note, that big number can slow requests if there

+ # is a huge message backlog (typically after broker outage)

+ batch_size = 100

+ # how old messages should be stored (hours)

+ max_age = 24

file modified
+107 -14
@@ -9,6 +9,7 @@ 

  import logging

  import random

  

+ import psycopg2

  from proton import Message, SSLDomain

  from proton.handlers import MessagingHandler

  from proton.reactor import Container
@@ -16,6 +17,7 @@ 

  import koji

  from koji.context import context

  from koji.plugin import callback, convert_datetime, ignore_error

+ from kojihub import QueryProcessor, InsertProcessor

  

  CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'

  CONFIG = None
@@ -295,34 +297,125 @@ 

      queue_msg(address, props, kws)

  

  

+ def _send_msgs(urls, msgs, CONFIG):

+     random.shuffle(urls)

+     for url in urls:

+         container = Container(TimeoutHandler(url, msgs, CONFIG))

+         container.run()

+         if msgs:

+             LOG.debug('could not send to %s, %s messages remaining',

+                       url, len(msgs))

+         else:

+             LOG.debug('all messages sent to %s successfully', url)

+             break

+     else:

+         LOG.error('could not send messages to any destinations')

+     return msgs

+ 

+ 

+ def store_to_db(msgs):

+     c = context.cnx.cursor()

+     # we're running in postCommit, so we need to handle new transaction

+     c.execute('BEGIN')

+     for msg in msgs:

+         if isinstance(msg, tuple):

+             address = msg[0]

+             props = json.dumps(msg[1])

+             body = msg[2]

+         else:

+             address = msg['address']

+             body = msg['body']  # already serialized

+             props = json.dumps(msg['props'])

+         insert = InsertProcessor(table='proton_queue')

+         insert.set(address=address, props=props, body=body)

+         if 'id' in msg:

+             # if we've something from db, we should store it in correct order

+             insert.set(id=msg['id'])

+         insert.execute()

+     c.execute('COMMIT')

+ 

+ 

+ def handle_db_msgs(urls, CONFIG):

+     limit = CONFIG.getint('queue', 'batch_size', fallback=100)

+     c = context.cnx.cursor()

+     # we're running in postCommit, so we need to handle new transaction

+     c.execute('BEGIN')

+     try:

+         c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')

+     except psycopg2.OperationalError:

+         LOG.debug('skipping db queue due to lock')

+         return

+     try:

+         c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %

+                   CONFIG.getint('queue', 'max_age', fallback=24))

+         query = QueryProcessor(tables=('proton_queue',),

+                                columns=('id', 'address', 'props', 'body'),

+                                opts={'order': 'id', 'limit': limit})

+         msgs = list(query.execute())

+         if CONFIG.getboolean('broker', 'test_mode', fallback=False):

+             if msgs:

+                 LOG.debug('test mode: skipping send for %i messages from db', len(msgs))

+             unsent = []

+         else:

+             unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}

+         sent = [m for m in msgs if m['id'] not in unsent]

+         if sent:

+             c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',

+                       {'ids': tuple(msg['id'] for msg in sent)})

+     finally:

+         # make sure we free the lock

+         try:

+             c.execute('COMMIT')

+         except Exception:

+             c.execute('ROLLBACK')

+ 

+ 

  @ignore_error

  @convert_datetime

  @callback('postCommit')

  def send_queued_msgs(cbtype, *args, **kws):

+     global CONFIG

      msgs = getattr(context, 'protonmsg_msgs', None)

      if not msgs:

          return

-     global CONFIG

      if not CONFIG:

          CONFIG = koji.read_config_files([(CONFIG_FILE, True)])

      urls = CONFIG.get('broker', 'urls').split()

      test_mode = False

      if CONFIG.has_option('broker', 'test_mode'):

          test_mode = CONFIG.getboolean('broker', 'test_mode')

+     db_enabled = False

+     if CONFIG.has_option('queue', 'enabled'):

+         db_enabled = CONFIG.getboolean('queue', 'enabled')

+ 

      if test_mode:

          LOG.debug('test mode: skipping send to urls: %r', urls)

-         for msg in msgs:

-             LOG.debug('test mode: skipped msg: %r', msg)

-         return

-     random.shuffle(urls)

-     for url in urls:

-         container = Container(TimeoutHandler(url, msgs, CONFIG))

-         container.run()

-         if msgs:

-             LOG.debug('could not send to %s, %s messages remaining',

-                       url, len(msgs))

+         fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)

+         if fail_chance:

+             # simulate unsent messages in test mode

+             sent = []

+             unsent = []

+             for m in msgs:

+                 if random.randint(1, 100) <= fail_chance:

+                     unsent.append(m)

+                 else:

+                     sent.append(m)

+             if unsent:

+                 LOG.info('simulating %i unsent messages' % len(unsent))

          else:

-             LOG.debug('all messages sent to %s successfully', url)

-             break

+             sent = msgs

+             unsent = []

+         for msg in sent:

+             LOG.debug('test mode: skipped msg: %r', msg)

      else:

-         LOG.error('could not send messages to any destinations')

+         unsent = _send_msgs(urls, msgs, CONFIG)

+ 

+     if db_enabled:

+         if unsent:

+             # if we still have some messages, store them and leave for another call to pick them up

+             store_to_db(unsent)

+         else:

+             # otherwise we are another call - look to db if there remains something to send

+             handle_db_msgs(urls, CONFIG)

+     elif unsent:

+         LOG.error('could not send %i messages. db queue disabled' % len(unsent))

@@ -215,7 +215,7 @@ 

          self.assertEqual(log.debug.call_count, 2)

          for args in log.debug.call_args_list:

              self.assertTrue(args[0][0].startswith('could not send'))

-         self.assertEqual(log.error.call_count, 1)

+         self.assertEqual(log.error.call_count, 2)

          self.assertTrue(log.error.call_args[0][0].startswith('could not send'))

  

      @patch('protonmsg.Container')

rebased onto 617a1f1ce7abe83bafd6e4291c4cc04996c19c33

10 months ago
CONFIG = None

if not CONFIG:
    CONFIG = koji.read_config_files([(CONFIG_FILE, True)])

Previously, the config was only loaded as needed, and the if not CONFIG check was there to check if it had been loaded yet. Now that we're reading the config at load time, this check seems superfluous -- we've just set the value to None in the line before. That is unless this is about thread-safety, but if that's the case I think we'd want to do something a little different.

        if extra_limit == 0:
            return buildinfo

This appears to be an unrelated behavior change. It certainly seems reasonable, but I think it deserves to be tracked and documented. Perhaps a separate issue?

    if CONFIG.getboolean('broker', 'test_mode', fallback=False):
        LOG.debug("test mode: Would queue msg: %r/%r/%r" % (address, props, data))
        return

This significantly reduces the code coverage of test mode. Previously test mode would take things all the way up to the point of sending the message.

 Currently you need to create table manually by running the following SQL:

I don't really like requiring this and it also seems a little strange for the table to be configurable.

Since this is a supported plugin bundled with the code, perhaps we can just include this in the schema.

We should probably explain the batch_size config in the doc.

the removed message from self.msgs: debug log message should be issued regardless of the queue setting.

in queue_msg we're dumping data to json twice in the non-db queue case.

Ok, after thinking further, I have a fairly fundamental criticism.

The two different queue pathways, context and db, are written as if they are two ways of doing the same thing, but the db pathway is substantially different.

In the db pathway, messages can be buffered across calls. This is a very significant difference. We can have a call handler emitting messages that are completely unrelated to that call. E.g. a tagBuildBypass call could end up emitting messages from an earlier repoInit call.

I'm worried we could run into some very surprising situations where an earlier call could affect or even break a later one.

I'm also worried about shifting from a thread-local queue to one that is not only not thread-local, but shared across all call handlers.

Two calls running in parallel will both insert their message data into the table, and then they will both query all messages in that table, up to the batch_size limit. It looks like we could easily generate duplicate messages.
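
A minimal sketch of the interleaving described above, assuming a plain Python list as a stand-in for the shared table. The helper names `insert` and `fetch_batch` are hypothetical and do not appear in the plugin.

```python
# A plain list stands in for the shared proton_queue table (assumption).
def insert(table, msgs):
    """Append one call's messages to the shared queue."""
    table.extend(msgs)


def fetch_batch(table, batch_size):
    """Read up to batch_size queued messages, oldest first, without removing them."""
    return list(table[:batch_size])


table = []
# Both calls insert before either one has deleted what it sent.
insert(table, ["repoInit-1"])        # call A
insert(table, ["tagBuildBypass-1"])  # call B

batch_a = fetch_batch(table, 100)
batch_b = fetch_batch(table, 100)

# Every message that appears in both batches would be sent twice.
duplicates = sorted(set(batch_a) & set(batch_b))
print(duplicates)
```

With this ordering both calls pick up both messages, so each message would go out twice unless something serializes access to the table.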

I think that we should treat the "buffer-across-calls" case as a clear exception, and we need to think very carefully about how and when we handle emitting messages that we were unable to emit during the call that generated them.

The send_queued_msgs handler is hooked into the postCommit callback which happens after the commit for the call. The DELETE statements in on_settled will end up executing after the commit and will be rolled back when we call context.cnx.close().
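
The effect can be reproduced in miniature. This is only an analogy: it uses the standard-library `sqlite3` module rather than PostgreSQL and psycopg2, and the function name is hypothetical. It shows a DELETE issued after the last commit being discarded when the connection is closed.

```python
import os
import sqlite3
import tempfile


def remaining_after_uncommitted_delete():
    """Delete after the last commit, close, and count the rows that survive."""
    path = os.path.join(tempfile.mkdtemp(), "queue.db")

    cnx = sqlite3.connect(path)
    cnx.execute("CREATE TABLE proton_queue (id INTEGER PRIMARY KEY, body TEXT)")
    cnx.execute("INSERT INTO proton_queue (body) VALUES ('msg')")
    cnx.commit()  # stands in for the commit that ends the call

    # "postCommit" work: this DELETE implicitly opens a new transaction
    cnx.execute("DELETE FROM proton_queue")
    cnx.close()  # closed without a commit, so the DELETE is rolled back

    check = sqlite3.connect(path)
    count = check.execute("SELECT COUNT(*) FROM proton_queue").fetchone()[0]
    check.close()
    return count


print(remaining_after_uncommitted_delete())
```

The row is still present afterwards, which is why any db write performed in postCommit needs its own explicit commit.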

1 new commit added

  • wip
10 months ago

1 new commit added

  • wip
10 months ago

rebased onto 8d6f8184db41a6a2d9f0ffcd4ef538baac66b6b7

9 months ago

rebased onto 0e3a294c247acef3ec9737b215ea61cb7f892d11

9 months ago

rebased onto 616237558ba958eba5cc516085ac3e68808ea0a2

9 months ago

I've rewritten it. Now it saves only the messages that remain unsent. If all messages were sent, it checks the db queue and locks the table while it queries and deletes messages.
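
The resulting control flow can be summarized as a small decision function. This is a sketch of the branching only, with a hypothetical function name and string labels standing in for the real calls to `store_to_db` and `handle_db_msgs`.

```python
def after_send(unsent, db_enabled):
    """Return the action taken once the send attempt for this call is over."""
    if not db_enabled:
        # without the db queue, unsent messages can only be reported
        return "log_error" if unsent else "nothing"
    if unsent:
        # the broker is unavailable: keep what is left for a later call
        return "store_unsent"
    # everything went out, so try to drain older messages from the db
    return "drain_db_queue"


print(after_send(unsent=["msg"], db_enabled=True))
print(after_send(unsent=[], db_enabled=True))
```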

rebased onto c058274e7fec4e61e626e2194daddbd4413e7e51

9 months ago

1 new commit added

  • delete too old messages
9 months ago

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

9 months ago
db_enabled = CONFIG.getboolean('queue', 'test_mode')

I think you want CONFIG.getboolean('queue', 'enabled')

Do we need a BEGIN in store_to_db?

When we have db queue entries and are unable to send, it looks like we're going to query, delete, and re-insert each entry each time. This seems like unnecessary churn, and a case I think we'll encounter regularly with message bus outages.
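
One way to avoid the churn, sketched under the assumption that queued rows are dicts with an `id` key: leave unsent rows where they are and compute only the ids that were actually sent. The helper name `ids_to_delete` is hypothetical.

```python
def ids_to_delete(batch, unsent):
    """Return the ids of rows that were sent; unsent rows stay in the table."""
    unsent_ids = {m["id"] for m in unsent}
    return [m["id"] for m in batch if m["id"] not in unsent_ids]


batch = [{"id": 1}, {"id": 2}, {"id": 3}]
unsent = [batch[2]]  # pretend the third message could not be sent
print(ids_to_delete(batch, unsent))
```

During a full broker outage the result is an empty list, so no DELETE or re-insert is needed at all.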

c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')

It doesn't look like we're handling the case where we don't get the lock.

In places where we're performing db writes, it's probably worth including a comment to remind the reader that we're running in postCommit.

Metadata Update from @tkopecek:
- Pull-request untagged with: testing-ready

9 months ago

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

9 months ago

rebased onto baddd112ee664de9ad8762b00f4d850a22b11ce2

9 months ago

pretty please pagure-ci rebuild

9 months ago

rebased onto 1423346944547a0370a69b2065f445261ce35f1d

9 months ago

pretty please pagure-ci rebuild

9 months ago

I started going through this and ended up wanting to make a number of changes. Here is what I came up with.

https://pagure.io/fork/mikem/koji/commits/pr2441updates

There's a lot there. I definitely prefer to avoid the re-insertion churn, which this does. I'm not 100% sure about the test_mode_fail setting, but it enabled me to test out the plugin locally and debug some issues. I think the docs changes are helpful.

WDYT?

I'm also tempted to add a simple query to check to see if there are any messages in the queue before we bother locking. It seems like that would be a common case. But perhaps I'm over-optimizing? It's not like we wait for the lock.

there is no column: address in proton_queue table.


nor is there a created_ts in the current version of the patch. My branch above adds both.

Another issue I addressed on my branch -- psycopg2.errors is new in version 2.8. While the project released this version about 1.5 yrs ago, it's not readily available on all our platforms. I changed the except clause to be more compatible with older versions of the lib.
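
For reference, a version-independent check is also possible without `psycopg2.errors`. The sketch below assumes only that the caught exception carries the `pgcode` attribute that psycopg2 sets on its errors; the helper name and the stand-in exception class are hypothetical. The diff in this PR simply catches `psycopg2.OperationalError`.

```python
LOCK_NOT_AVAILABLE = "55P03"  # PostgreSQL SQLSTATE raised when LOCK ... NOWAIT fails


def is_lock_not_available(exc):
    """True if the exception reports that the lock could not be acquired."""
    return getattr(exc, "pgcode", None) == LOCK_NOT_AVAILABLE


class FakeOperationalError(Exception):
    """Stand-in for psycopg2.OperationalError, for illustration only."""

    def __init__(self, pgcode):
        super().__init__(pgcode)
        self.pgcode = pgcode


print(is_lock_not_available(FakeOperationalError("55P03")))
print(is_lock_not_available(ValueError("unrelated")))
```

Checking the SQLSTATE would let the handler skip the queue only when the lock is busy and re-raise other operational errors.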

I was hesitant about running this inside the transaction, but it is probably not a big problem. My concern is that we effectively serialize the sending of these messages, since only one process at a time can send db-queued messages. The former solution could have sent more in parallel, but it could still have mixed up the order if the broker was not working well (or if one process reached a working broker while another did not). So in the end I'm OK with this approach. (BTW, PG 9.5+ has a nice SKIP LOCKED feature, which we can use later if we raise the required PG version: https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/)

rebased onto 7448a6e

9 months ago


LOCK NOWAIT is almost free, so I would leave it as it is. An extra select before locking would be more work, since we would still need the select after acquiring the lock.

Commit 729f847 fixes this pull-request

Pull-Request has been merged by tkopecek

9 months ago

Metadata Update from @jcupova:
- Pull-request tagged with: testing-done

8 months ago