From ec53e01547309c27083d1986fa5289565eee211b Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Oct 05 2017 20:15:23 +0000 Subject: defer message sending until after commit --- diff --git a/plugins/hub/messagebus.py b/plugins/hub/messagebus.py index b09cccb..eb7e5fb 100644 --- a/plugins/hub/messagebus.py +++ b/plugins/hub/messagebus.py @@ -5,6 +5,7 @@ # Mike Bonnet from koji import PluginError +from koji.context import context from koji.plugin import callbacks, callback, ignore_error, convert_datetime import ConfigParser import logging @@ -204,9 +205,8 @@ def get_message_headers(msgtype, *args, **kws): and c != 'postCommit']) @ignore_error @convert_datetime -def send_message(cbtype, *args, **kws): +def prep_message(cbtype, *args, **kws): global config - sender = get_sender() if cbtype.startswith('post'): msgtype = cbtype[4:] else: @@ -226,5 +226,19 @@ def send_message(cbtype, *args, **kws): else: raise PluginError('unsupported exchange type: %s' % exchange_type) - sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout')) + messages = getattr(context, 'messagebus_plugin_messages', []) + messages.append(message) + context.messagebus_plugin_messages = messages + + +@callback('postCommit') +@ignore_error +def send_messages(cbtype, *args, **kws): + '''Send the cached message from the other callback''' + + global config + messages = getattr(context, 'messagebus_plugin_messages', []) + sender = get_sender() + for message in messages: + sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout')) sender.close(timeout=config.getfloat('broker', 'timeout'))