From bea9a869165622117f19b76bf06d379ae38b65f1 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Oct 05 2017 20:16:26 +0000 Subject: PR#537: messagebus plugin: deferred sending and test mode Merges #537 https://pagure.io/koji/pull-request/537 --- diff --git a/plugins/hub/messagebus.py b/plugins/hub/messagebus.py index b09cccb..1e208c4 100644 --- a/plugins/hub/messagebus.py +++ b/plugins/hub/messagebus.py @@ -5,6 +5,7 @@ # Mike Bonnet from koji import PluginError +from koji.context import context from koji.plugin import callbacks, callback, ignore_error, convert_datetime import ConfigParser import logging @@ -68,8 +69,23 @@ class Connection(qpid.messaging.Connection): timeout = self._timeout return qpid.messaging.Connection._wait(self, predicate, timeout) + +def get_config(): + global config + if config: + return config + + config = ConfigParser.SafeConfigParser() + config.read(CONFIG_FILE) + if not config.has_option('broker', 'timeout'): + config.set('broker', 'timeout', '60') + if not config.has_option('broker', 'heartbeat'): + config.set('broker', 'heartbeat', '60') + return config + + def get_sender(): - global config, session, target + global session, target if session and target: try: return session.sender(target) @@ -78,12 +94,7 @@ def get_sender(): session = None target = None - config = ConfigParser.SafeConfigParser() - config.read(CONFIG_FILE) - if not config.has_option('broker', 'timeout'): - config.set('broker', 'timeout', '60') - if not config.has_option('broker', 'heartbeat'): - config.set('broker', 'heartbeat', '60') + config = get_config() if config.getboolean('broker', 'ssl'): url = 'amqps://' @@ -204,9 +215,7 @@ def get_message_headers(msgtype, *args, **kws): and c != 'postCommit']) @ignore_error @convert_datetime -def send_message(cbtype, *args, **kws): - global config - sender = get_sender() +def prep_message(cbtype, *args, **kws): if cbtype.startswith('post'): msgtype = cbtype[4:] else: @@ -216,6 +225,7 @@ def send_message(cbtype, *args, **kws): if args: data['args'] = list(args) + config = get_config() exchange_type = config.get('exchange', 'type') if exchange_type == 'topic': subject = get_message_subject(msgtype, *args, **kws) @@ -226,5 +236,36 @@ def send_message(cbtype, *args, **kws): else: raise PluginError('unsupported exchange type: %s' % exchange_type) - sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout')) - sender.close(timeout=config.getfloat('broker', 'timeout')) + messages = getattr(context, 'messagebus_plugin_messages', None) + if messages is None: + messages = [] + context.messagebus_plugin_messages = messages + messages.append(message) + + +@callback('postCommit') +@ignore_error +def send_messages(cbtype, *args, **kws): + '''Send the messages cached by prep_message''' + + logger = logging.getLogger('koji.plugin.messagebus') + config = get_config() + messages = getattr(context, 'messagebus_plugin_messages', []) + if not messages: + return + test_mode = False + if config.has_option('broker', 'test_mode'): + test_mode = config.getboolean('broker', 'test_mode') + if test_mode: + logger.debug('test mode: skipping broker connection') + for message in messages: + logger.debug('test mode: skipping message: %r', message) + else: + sender = get_sender() + for message in messages: + sender.send(message, sync=False, + timeout=config.getfloat('broker', 'timeout')) + sender.close(timeout=config.getfloat('broker', 'timeout')) + + # koji should do this for us, but just in case... + del context.messagebus_plugin_messages