From 36b81da5d49eaac3c3b8d2e1af9e573942405f80 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: May 01 2021 19:04:06 +0000 Subject: [PATCH 1/5] protonmsg: use consistent type for messages Previously messages were represented as either tuples or dictionaries. Now they are always dictionaries. Fixes: https://pagure.io/koji/issue/2841 --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 83b3b7c..7c6dad0 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -77,7 +77,7 @@ class TimeoutHandler(MessagingHandler): def send_msgs(self, event): prefix = self.conf.get('broker', 'topic_prefix') for msg in self.msgs: - address = 'topic://' + prefix + '.' + msg[0] + address = 'topic://' + prefix + '.' + msg['address'] if address in self.senders: sender = self.senders[address] self.log.debug('retrieved cached sender for %s', address) @@ -85,15 +85,15 @@ class TimeoutHandler(MessagingHandler): sender = event.container.create_sender(event.connection, target=address) self.log.debug('created new sender for %s', address) self.senders[address] = sender - pmsg = Message(properties=msg[1], body=msg[2]) + pmsg = Message(properties=msg['props'], body=msg['body']) delivery = sender.send(pmsg) - self.log.debug('sent message: %s', msg[1]) + self.log.debug('sent message: %s', msg['props']) self.pending[delivery] = msg def update_pending(self, event): msg = self.pending[event.delivery] del self.pending[event.delivery] - self.log.debug('removed message from self.pending: %s', msg[1]) + self.log.debug('removed message from self.pending: %s', msg['props']) if not self.pending: if self.msgs: self.log.error('%s messages unsent (rejected or released)', len(self.msgs)) @@ -112,17 +112,17 @@ class TimeoutHandler(MessagingHandler): def on_settled(self, event): msg = self.pending[event.delivery] self.msgs.remove(msg) - self.log.debug('removed message from self.msgs: %s', msg[1]) + self.log.debug('removed message from self.msgs: %s', msg['props']) self.update_pending(event) def on_rejected(self, event): msg = self.pending[event.delivery] - self.log.error('message was rejected: %s', msg[1]) + self.log.error('message was rejected: %s', msg['props']) self.update_pending(event) def on_released(self, event): msg = self.pending[event.delivery] - self.log.error('message was released: %s', msg[1]) + self.log.error('message was released: %s', msg['props']) self.update_pending(event) def on_transport_tail_closed(self, event): @@ -169,7 +169,7 @@ def queue_msg(address, props, data): msgs = [] context.protonmsg_msgs = msgs body = json.dumps(data, default=json_serialize) - msgs.append((address, props, body)) + msgs.append({'address': address, 'props': props, 'body': body}) @convert_datetime @@ -322,14 +322,9 @@ def store_to_db(msgs): # we're running in postCommit, so we need to handle new transaction c.execute('BEGIN') for msg in msgs: - if isinstance(msg, tuple): - address = msg[0] - props = json.dumps(msg[1]) - body = msg[2] - else: - address = msg['address'] - body = msg['body'] # already serialized - props = json.dumps(msg['props']) + address = msg['address'] + body = msg['body'] + props = json.dumps(msg['props']) insert = InsertProcessor(table='proton_queue') insert.set(address=address, props=props, body=body) if 'id' in msg: From d6308d8611fe6e7a251b47f9ac3665608a12651f Mon Sep 17 00:00:00 2001 From: Mike McLean Date: May 01 2021 19:21:03 +0000 Subject: [PATCH 2/5] protonmsg: avoid calling _send_msgs on empty list --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 7c6dad0..27df6d0 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -351,9 +351,10 @@ def handle_db_msgs(urls, CONFIG): columns=('id', 'address', 'props', 'body'), opts={'order': 'id', 'limit': limit}) msgs = list(query.execute()) + if not msgs: + return if CONFIG.getboolean('broker', 'test_mode', fallback=False): - if msgs: - LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) + LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) unsent = [] else: unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)} From 23fb8857a4c985db1ba4c156600540eb8758f254 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: May 01 2021 19:21:03 +0000 Subject: [PATCH 3/5] protonmsg: drop bad key reference This is a relic of an earlier version of the db work. At this point in the code, there is never an id field. We don't re-store messages from the db. We only delete them after confirmed send. --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 27df6d0..a7ed023 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -327,9 +327,6 @@ def store_to_db(msgs): props = json.dumps(msg['props']) insert = InsertProcessor(table='proton_queue') insert.set(address=address, props=props, body=body) - if 'id' in msg: - # if we've something from db, we should store it in correct order - insert.set(id=msg['db_id']) insert.execute() c.execute('COMMIT') From 943ebda151c44feff977ccf1a5181ba071b96264 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: May 03 2021 15:31:34 +0000 Subject: [PATCH 4/5] fix unit tests --- diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py index 25d59fe..7bbd6eb 100644 --- a/tests/test_plugins/test_protonmsg.py +++ b/tests/test_plugins/test_protonmsg.py @@ -36,13 +36,13 @@ extra_limit = 2048 self.assertTrue(hasattr(context, 'protonmsg_msgs')) self.assertEqual(len(context.protonmsg_msgs), 1) msg = context.protonmsg_msgs[0] - self.assertEqual(msg[0], topic) + self.assertEqual(msg['address'], topic) for kw in kws: - self.assertTrue(kw in msg[1]) - self.assertEqual(msg[1][kw], kws[kw]) - self.assertEqual(len(msg[1]), len(kws)) + self.assertTrue(kw in msg['props']) + self.assertEqual(msg['props'][kw], kws[kw]) + self.assertEqual(len(msg['props']), len(kws)) if body: - self.assertEqual(msg[2], body) + self.assertEqual(msg['body'], body) def test_queue_msg(self): protonmsg.queue_msg('test.msg', {'testheader': 1}, 'test body') @@ -207,7 +207,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_fail(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] protonmsg.send_queued_msgs('postCommit') log = protonmsg.LOG @@ -219,7 +220,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_success(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] def clear_msgs(): del context.protonmsg_msgs[:] Container.return_value.run.side_effect = clear_msgs @@ -231,7 +233,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_test_mode(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] conf = tempfile.NamedTemporaryFile() conf.write(six.b("""[broker] urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671 @@ -336,7 +339,8 @@ send_timeout = 60 def test_send_msgs(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) event.container.create_sender.assert_called_once_with(event.connection, target='topic://koji.testtopic') @@ -349,8 +353,10 @@ send_timeout = 60 def test_update_pending(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"'), - ('testtopic', {'testheader': 2}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}, + {'address': 'testtopic', 'props': {'testheader': 2}, + 'body': '"test body"'}] delivery0 = MagicMock() delivery1 = MagicMock() sender = event.container.create_sender.return_value @@ -378,7 +384,8 @@ send_timeout = 60 def test_on_settled(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending) @@ -392,7 +399,8 @@ send_timeout = 60 def test_on_rejected(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending) @@ -406,7 +414,8 @@ send_timeout = 60 def test_on_released(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending) From 8fe08a8e543b2ae09f1bf15186439ae503a908ea Mon Sep 17 00:00:00 2001 From: Mike McLean Date: May 06 2021 19:07:14 +0000 Subject: [PATCH 5/5] protonmsg: actually remove messages from db queue Fixes: https://pagure.io/koji/issue/2846 --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index a7ed023..3fc62b0 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -354,9 +354,10 @@ def handle_db_msgs(urls, CONFIG): LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) unsent = [] else: - unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)} + # we pass a copy of msgs because _send_msgs modifies it + unsent = {m['id'] for m in _send_msgs(urls, list(msgs), CONFIG)} sent = [m for m in msgs if m['id'] not in unsent] - if msgs: + if sent: c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s', {'ids': [msg['id'] for msg in sent]}) finally: