#2844 protonmsg: use consistent data format for messages
Merged a year ago by tkopecek. Opened a year ago by mikem.

file modified
+17 -23
@@ -77,7 +77,7 @@ 

      def send_msgs(self, event):

          prefix = self.conf.get('broker', 'topic_prefix')

          for msg in self.msgs:

-             address = 'topic://' + prefix + '.' + msg[0]

+             address = 'topic://' + prefix + '.' + msg['address']

              if address in self.senders:

                  sender = self.senders[address]

                  self.log.debug('retrieved cached sender for %s', address)
@@ -85,15 +85,15 @@ 

                  sender = event.container.create_sender(event.connection, target=address)

                  self.log.debug('created new sender for %s', address)

                  self.senders[address] = sender

-             pmsg = Message(properties=msg[1], body=msg[2])

+             pmsg = Message(properties=msg['props'], body=msg['body'])

              delivery = sender.send(pmsg)

-             self.log.debug('sent message: %s', msg[1])

+             self.log.debug('sent message: %s', msg['props'])

              self.pending[delivery] = msg

  

      def update_pending(self, event):

          msg = self.pending[event.delivery]

          del self.pending[event.delivery]

-         self.log.debug('removed message from self.pending: %s', msg[1])

+         self.log.debug('removed message from self.pending: %s', msg['props'])

          if not self.pending:

              if self.msgs:

                  self.log.error('%s messages unsent (rejected or released)', len(self.msgs))
@@ -112,17 +112,17 @@ 

      def on_settled(self, event):

          msg = self.pending[event.delivery]

          self.msgs.remove(msg)

-         self.log.debug('removed message from self.msgs: %s', msg[1])

+         self.log.debug('removed message from self.msgs: %s', msg['props'])

          self.update_pending(event)

  

      def on_rejected(self, event):

          msg = self.pending[event.delivery]

-         self.log.error('message was rejected: %s', msg[1])

+         self.log.error('message was rejected: %s', msg['props'])

          self.update_pending(event)

  

      def on_released(self, event):

          msg = self.pending[event.delivery]

-         self.log.error('message was released: %s', msg[1])

+         self.log.error('message was released: %s', msg['props'])

          self.update_pending(event)

  

      def on_transport_tail_closed(self, event):
@@ -169,7 +169,7 @@ 

          msgs = []

          context.protonmsg_msgs = msgs

      body = json.dumps(data, default=json_serialize)

-     msgs.append((address, props, body))

+     msgs.append({'address': address, 'props': props, 'body': body})

  

  

  @convert_datetime
@@ -322,19 +322,11 @@ 

      # we're running in postCommit, so we need to handle new transaction

      c.execute('BEGIN')

      for msg in msgs:

-         if isinstance(msg, tuple):

-             address = msg[0]

-             props = json.dumps(msg[1])

-             body = msg[2]

-         else:

-             address = msg['address']

-             body = msg['body']  # already serialized

-             props = json.dumps(msg['props'])

+         address = msg['address']

+         body = msg['body']

+         props = json.dumps(msg['props'])

          insert = InsertProcessor(table='proton_queue')

          insert.set(address=address, props=props, body=body)

-         if 'id' in msg:

-             # if we've something from db, we should store it in correct order

-             insert.set(id=msg['db_id'])

          insert.execute()

      c.execute('COMMIT')

  
@@ -356,14 +348,16 @@ 

                                 columns=('id', 'address', 'props', 'body'),

                                 opts={'order': 'id', 'limit': limit})

          msgs = list(query.execute())

+         if not msgs:

+             return

          if CONFIG.getboolean('broker', 'test_mode', fallback=False):

-             if msgs:

-                 LOG.debug('test mode: skipping send for %i messages from db', len(msgs))

+             LOG.debug('test mode: skipping send for %i messages from db', len(msgs))

              unsent = []

          else:

-             unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}

+             # we pass a copy of msgs because _send_msgs modifies it

+             unsent = {m['id'] for m in _send_msgs(urls, list(msgs), CONFIG)}

          sent = [m for m in msgs if m['id'] not in unsent]

-         if msgs:

+         if sent:

              c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',

                        {'ids': [msg['id'] for msg in sent]})

      finally:

@@ -36,13 +36,13 @@ 

          self.assertTrue(hasattr(context, 'protonmsg_msgs'))

          self.assertEqual(len(context.protonmsg_msgs), 1)

          msg = context.protonmsg_msgs[0]

-         self.assertEqual(msg[0], topic)

+         self.assertEqual(msg['address'], topic)

          for kw in kws:

-             self.assertTrue(kw in msg[1])

-             self.assertEqual(msg[1][kw], kws[kw])

-         self.assertEqual(len(msg[1]), len(kws))

+             self.assertTrue(kw in msg['props'])

+             self.assertEqual(msg['props'][kw], kws[kw])

+         self.assertEqual(len(msg['props']), len(kws))

          if body:

-             self.assertEqual(msg[2], body)

+             self.assertEqual(msg['body'], body)

  

      def test_queue_msg(self):

          protonmsg.queue_msg('test.msg', {'testheader': 1}, 'test body')
@@ -207,7 +207,8 @@ 

  

      @patch('protonmsg.Container')

      def test_send_queued_msgs_fail(self, Container):

-         context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')]

+         context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1},

+                                    'body': 'test body'}]

          protonmsg.send_queued_msgs('postCommit')

  

          log = protonmsg.LOG
@@ -219,7 +220,8 @@ 

  

      @patch('protonmsg.Container')

      def test_send_queued_msgs_success(self, Container):

-         context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')]

+         context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1},

+                                    'body': 'test body'}]

          def clear_msgs():

              del context.protonmsg_msgs[:]

          Container.return_value.run.side_effect = clear_msgs
@@ -231,7 +233,8 @@ 

  

      @patch('protonmsg.Container')

      def test_send_queued_msgs_test_mode(self, Container):

-         context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')]

+         context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1},

+                                    'body': 'test body'}]

          conf = tempfile.NamedTemporaryFile()

          conf.write(six.b("""[broker]

  urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671
@@ -336,7 +339,8 @@ 

      def test_send_msgs(self, SSLDomain, Message):

          event = MagicMock()

          self.handler.on_start(event)

-         self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')]

+         self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1},

+                               'body': '"test body"'}]

          self.handler.on_connection_opened(event)

          event.container.create_sender.assert_called_once_with(event.connection,

                                                                target='topic://koji.testtopic')
@@ -349,8 +353,10 @@ 

      def test_update_pending(self, SSLDomain, Message):

          event = MagicMock()

          self.handler.on_start(event)

-         self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"'),

-                              ('testtopic', {'testheader': 2}, '"test body"')]

+         self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1},

+                               'body': '"test body"'},

+                              {'address': 'testtopic', 'props': {'testheader': 2},

+                               'body': '"test body"'}]

          delivery0 = MagicMock()

          delivery1 = MagicMock()

          sender = event.container.create_sender.return_value
@@ -378,7 +384,8 @@ 

      def test_on_settled(self, SSLDomain, Message):

          event = MagicMock()

          self.handler.on_start(event)

-         self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')]

+         self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1},

+                               'body': '"test body"'}]

          self.handler.on_connection_opened(event)

          delivery = event.container.create_sender.return_value.send.return_value

          self.assertTrue(delivery in self.handler.pending)
@@ -392,7 +399,8 @@ 

      def test_on_rejected(self, SSLDomain, Message):

          event = MagicMock()

          self.handler.on_start(event)

-         self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')]

+         self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1},

+                               'body': '"test body"'}]

          self.handler.on_connection_opened(event)

          delivery = event.container.create_sender.return_value.send.return_value

          self.assertTrue(delivery in self.handler.pending)
@@ -406,7 +414,8 @@ 

      def test_on_released(self, SSLDomain, Message):

          event = MagicMock()

          self.handler.on_start(event)

-         self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')]

+         self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1},

+                               'body': '"test body"'}]

          self.handler.on_connection_opened(event)

          delivery = event.container.create_sender.return_value.send.return_value

          self.assertTrue(delivery in self.handler.pending)

This change makes the data representation for messages consistent. They are always dictionaries.

Dictionaries were chosen over tuples because we need the flexibility to include an id field for messages that come from the db. That field is needed so we can delete messages from the db after they have been sent.

Fixes: https://pagure.io/koji/issue/2841
Fixes: https://pagure.io/koji/issue/2846

Isn't this still needed (in a fixed form that uses msg['id'])? Otherwise we will insert older messages after newer ones that may have been inserted in the meantime, and the replay order will be wrong. (This can already happen, because another thread can be slower, e.g. when it times out on some URL.)

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

a year ago

Dropping the bit handling the db id field was explained in the commit message for that change.

This is a relic of an earlier version of the db work.
At this point in the code, there is never an id field.
We don't re-store messages from the db. We only delete them
after confirmed send.

At first I was looking at this part because it looked like a potential KeyError (there is never a db_id field), but then I realized the above.

@tkopecek and I discussed this earlier today and agreed that this part was fine.

Testing Notes

Issue #2841 was discovered while testing on a stage system with protonmsg enabled and configured. The plugin was connecting to our stage amqps broker.

To test the db queue code path, I used iptables to temporarily block connections to our broker hosts. For example:

# for ip in $BROKER_IPS; do  iptables -A INPUT -s "$ip" -j DROP ; done
# iptables -L INPUT

A short time later, the tracebacks from #2841 started to appear.

Once I'd explored the problem, I removed the IP block:

# for ip in $BROKER_IPS; do  iptables -D INPUT -s "$ip" -j DROP ; done
# iptables -L INPUT

Removing the block doesn't solve the issue. Once there are entries in the proton_queue table, the plugin will error on every call that manages to get the table lock, losing new messages in the process.

I've found a separate issue in testing. The messages are never deleted from the db.

The new issue is #2846.
I will append a fix shortly.

1 new commit added

  • protonmsg: actually remove messages from db queue
a year ago

I ran the current changes (including the last commit) on the same system where I discovered the bug. The behavior was as expected. With AMQP connections blocked, the db queue started to fill up with messages. When the connections were unblocked, the queued messages were sent (once) and removed from the db.

Commit 8b07fcf fixes this pull-request

Pull-Request has been merged by tkopecek

a year ago

Metadata Update from @mfilip:
- Pull-request tagged with: testing-done

a year ago