From 8047690a7e42426449e3ef6fb0e7ae0caf01fd4c Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Jan 06 2017 14:03:27 +0000 Subject: [backend] support for STOMP msg buses Also make the STOMP msg bus more configurable, so particular copr instances can negotiate and configure the message formats with message consumers. Related: rhbz#1332438 --- diff --git a/backend/backend/constants.py b/backend/backend/constants.py index 1e73dd0..05a4c9c 100644 --- a/backend/backend/constants.py +++ b/backend/backend/constants.py @@ -25,6 +25,14 @@ class BuildStatus(object): PENDING = 4 SKIPPED = 5 + @classmethod + def string(cls, number): + """ convert number to string """ + for key, val in cls.__dict__.iteritems(): + if isinstance(val, int) and number == val: + return key + raise AttributeError("no such status id: {0} ".format(number)) + LOG_PUB_SUB = "copr:backend:log:pubsub::" diff --git a/backend/backend/daemons/worker.py b/backend/backend/daemons/worker.py index a3f4310..aa1770e 100644 --- a/backend/backend/daemons/worker.py +++ b/backend/backend/daemons/worker.py @@ -14,17 +14,14 @@ from ..constants import BuildStatus, build_log_format from ..helpers import register_build_result, get_redis_connection, get_redis_logger, \ local_file_logger +from ..msgbus import MsgBusStomp, MsgBusFedmsg -# ansible_playbook = "ansible-playbook" - -try: - import fedmsg -except ImportError: - # fedmsg is optional - fedmsg = None +# ansible_playbook = "ansible-playbook" class Worker(multiprocessing.Process): + msg_buses = [] + def __init__(self, opts, frontend_client, vm_manager, worker_id, vm, job): multiprocessing.Process.__init__(self, name="worker-{}".format(worker_id)) @@ -50,23 +47,15 @@ class Worker(multiprocessing.Process): "Original error: {}".format(error)) return str(self.vm.group) - def fedmsg_notify(self, topic, template, content=None): - """ - Publish message to fedmsg bus when it is available - :param topic: - :param template: - :param content: - """ - if self.opts.fedmsg_enabled and fedmsg: - content = content or {} - content["who"] = self.name - content["what"] = template.format(**content) - try: - fedmsg.publish(modname="copr", topic=topic, msg=content) - # pylint: disable=W0703 - except Exception as e: - self.log.exception("failed to publish message: {0}".format(e)) + def _announce(self, topic, job): + for bus in self.msg_buses: + bus.announce_job(topic, job, { + 'who': self.name, + 'ip': self.vm.vm_ip, + 'pid': self.pid, + }) + def _announce_start(self, job): """ @@ -75,41 +64,19 @@ class Worker(multiprocessing.Process): job.started_on = time.time() self.mark_started(job) - template = "build start: user:{user} copr:{copr}" \ - "pkg: {pkg} build:{build} ip:{ip} pid:{pid}" - - content = dict(user=job.submitter, copr=job.project_name, - owner=job.project_owner, pkg=job.package_name, - build=job.build_id, ip=self.vm.vm_ip, pid=self.pid) - self.fedmsg_notify("build.start", template, content) + for bus in self.msg_buses: + for topic in ['build.start', 'chroot.start']: + self._announce(topic, job) - template = "chroot start: chroot:{chroot} user:{user}" \ - "copr:{copr} pkg: {pkg} build:{build} ip:{ip} pid:{pid}" - - content = dict(chroot=job.chroot, user=job.submitter, - owner=job.project_owner, pkg=job.package_name, - copr=job.project_name, build=job.build_id, - ip=self.vm.vm_ip, pid=self.pid) - - self.fedmsg_notify("chroot.start", template, content) def _announce_end(self, job): """ Announce everywhere that a build process ended now. """ job.ended_on = time.time() - self.return_results(job) self.log.info("worker finished build: {0}".format(self.vm.vm_ip)) - template = "build end: user:{user} copr:{copr} build:{build}" \ - " pkg: {pkg} version: {version} ip:{ip} pid:{pid} status:{status}" - - content = dict(user=job.submitter, copr=job.project_name, - owner=job.project_owner, - pkg=job.package_name, version=job.package_version, - build=job.build_id, ip=self.vm.vm_ip, pid=self.pid, - status=job.status, chroot=job.chroot) - self.fedmsg_notify("build.end", template, content) + self._announce('build.end', job) def mark_started(self, job): """ @@ -155,17 +122,14 @@ class Worker(multiprocessing.Process): return True return False - def init_fedmsg(self): - """ - Initialize Fedmsg (this assumes there are certs and a fedmsg config on disk). - """ - if not (self.opts.fedmsg_enabled and fedmsg): - return + def init_buses(self): - try: - fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True) - except Exception as e: - self.log.exception("Failed to initialize fedmsg: {}".format(e)) + self.log.info(self.opts.msg_buses) + for bus_config in self.opts.msg_buses: + self.msg_buses.append(MsgBusStomp(bus_config, self.log)) + + if self.opts.fedmsg_enabled: + self.msg_buses.append(MsgBusFedmsg(self.log)) # TODO: doing skip logic on fronted during @start_build query # def on_pkg_skip(self, job): @@ -310,7 +274,7 @@ class Worker(multiprocessing.Process): def run(self): self.log.info("Starting worker") - self.init_fedmsg() + self.init_buses() try: self.do_job(self.job) diff --git a/backend/backend/helpers.py b/backend/backend/helpers.py index e6d4a40..10fb09f 100644 --- a/backend/backend/helpers.py +++ b/backend/backend/helpers.py @@ -13,6 +13,8 @@ import sys import errno import time from contextlib import contextmanager +import types +import glob import traceback @@ -33,6 +35,22 @@ import subprocess import logging import munch +def pyconffile(filename): + """ + Load python file as configuration file, inspired by python-flask + "from_pyfile() + """ + d = types.ModuleType(str('config')) + d.__file__ = filename + try: + with open(filename) as config_file: + exec(compile(config_file.read(), filename, 'exec'), d.__dict__) + except IOError as e: + e.strerror = 'Unable to load configuration file (%s)' % e.strerror + raise + return d + + def run_cmd(cmd): """Runs given command in a subprocess. @@ -287,6 +305,10 @@ class BackendConfigReader(object): opts.ssh.transport = _get_conf( cp, "ssh", "transport", "paramiko") + opts.msg_buses = [] + for bus_config in glob.glob('/etc/copr/msgbuses/*.conf'): + opts.msg_buses.append(pyconffile(bus_config)) + # thoughts for later # ssh key for connecting to builders? # cloud key stuff? diff --git a/backend/backend/msgbus.py b/backend/backend/msgbus.py new file mode 100644 index 0000000..e9c493e --- /dev/null +++ b/backend/backend/msgbus.py @@ -0,0 +1,172 @@ +""" +Message buses abstraction. +""" + +import logging +import copy +import json + +from .constants import BuildStatus + +try: + import fedmsg +except ImportError: + # fedmsg is optional + fedmsg = None + +try: + import stomp +except ImportError: + # stomp is also optional + stomp = None + +class MsgBus(object): + """ + An "abstract" message bus class, don't instantiate! + """ + messages = {} + + def __init__(self, opts, log=None): + if not log: + log = logging + logging.basicConfig(level=logging.DEBUG) + + self.log = log + self.opts = opts + + if hasattr(self.opts, 'messages'): + self.messages.update(self.opts.messages) + + + def _send(self, topic, body, headers): + raise NotImplementedError + + + def send(self, topic, body, headers=None): + """ + Send (dict) message over _send() method. + """ + out_headers = copy.deepcopy(self.opts.headers) + if headers: + out_headers.update(copy.deepcopy(headers)) + try: + self._send(topic, body, out_headers) + # pylint: disable=W0703 + except Exception as _: + self.log.exception("Failed to publish message.") + + + def announce_job(self, topic, job, subst): + """ + Announce everywhere that a build process started now. + """ + if not topic in self.messages: + return + + + und = '(undefined)' + content = { + 'user': getattr(job, 'submitter', und), + 'copr': getattr(job, 'project_name', und), + 'owner': getattr(job, 'project_owner', und), + 'pkg': getattr(job, 'package_name', und), + 'build': getattr(job, 'build_id', und), + 'chroot': getattr(job, 'chroot', und), + 'version': getattr(job, 'package_version', und), + 'status': getattr(job, 'status', und), + } + + content['str_status'] = BuildStatus.string(content['status']) + + # Additional replacements? + if subst: + content.update(subst) + + msg = {} + try: + for key in self.messages[topic]: + msg[key] = self.messages[topic][key].format(**content) + # pylint: disable=W0703 + except Exception as _: + self.log.exception("Failed to format '{0}' announcement." + .format(topic)) + return + + self.send(topic, msg) + + +class MsgBusStomp(MsgBus): + """ + Connect to STOMP bus and send messages. Make sure you have correctly + configured 'messages' attribute in every message bus configuration, no + default messages here! + """ + + def __init__(self, opts, log=None): + super(MsgBusStomp, self).__init__(opts, log) + + # shortcuts + host = self.opts.host + port = int(self.opts.port) + username = None + password = None + + self.log.info("connecting to (stomp) message bus '{0}:{1}" + .format(host, port)) + self.conn = stomp.Connection([(host, int(port))]) + self.conn.start() + + if getattr(self.opts, 'auth', None): + username = self.opts.auth['username'] + password = self.opts.auth['password'] + self.log.info("authenticating with username '{0}'".format(username)) + + self.conn.connect( + username=username, + passcode=password, + ) + + if not getattr(self.opts, 'destination', None): + setattr(self.opts, 'destination', '/default') + + + def _send(self, topic, body, headers): + send_headers = copy.deepcopy(headers) + send_headers['topic'] = topic + self.conn.send(body=json.dumps(body), headers=send_headers, + destination=self.opts.destination) + + +class MsgBusFedmsg(MsgBus): + """ + Connect to fedmsg and send messages over it. + """ + messages = { + 'build.start': { + 'who': '{who}', + 'what': "build start: user:{user} copr:{copr}" \ + "pkg: {pkg} build:{build} ip:{ip} pid:{pid}" + }, + 'chroot.start': { + 'who': '{who}', + 'what': "chroot start: chroot:{chroot} user:{user}" \ + "copr:{copr} pkg: {pkg} build:{build} ip:{ip} pid:{pid}" + }, + 'build.end': { + 'who': '{who}', + 'what': "build end: user:{user} copr:{copr} build:{build}" \ + " pkg: {pkg} version: {version} ip:{ip} pid:{pid} status:{status}" + }, + } + + def __init__(self, log=None): + # Hack to not require opts argument for now. + opts = type('', (), {}) + opts.headers = {} + + super(MsgBusFedmsg, self).__init__(opts, log) + + fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True) + + def _send(self, topic, body, headers): + fedmsg.publish(modname="copr", topic=topic, content=body) diff --git a/backend/conf/msgbus.conf.example b/backend/conf/msgbus.conf.example new file mode 100644 index 0000000..a2806cc --- /dev/null +++ b/backend/conf/msgbus.conf.example @@ -0,0 +1,50 @@ +""" +Example configuration file for message bus. +""" + +bus_id = 'ci_message_bus' + +host = 'bus.example.com' +port = '61613' + +auth = { + 'username': 'jenkins', + 'password': 'johnHolmes', +} + +# Put arbitrary message headers here. E.g. you can identify your +# staging/production copr instance. + +headers = { + 'CI_TYPE': 'copr-service', + 'copr_instance': 'development', +} + +destination = "/topic/CI" + +# Define message templates. Each message is identified by "key" (e.g. +# 'build.start') and contains "key/value" pairs, while "values" are subject of +# string substitution at the time message is generated. Available substitutions +# are: +# 'user' (submitter), 'copr', 'owner', 'pkg', 'build' (id), 'chroot', +# 'version' (pkg), 'ip' (of builder), 'who' (process name), 'pid' (of builder +# process), 'status' (status ID, see class BuildStatus), 'status_str' +# (user readable status) + +messages = { + 'build.start': { + 'package': '{pkg}-{version}', + 'chroot': '{chroot}', + 'user': '{user}', + }, + 'build.end': { + 'package': '{pkg}-{version}', + 'chroot': '{chroot}', + 'user': '{user}', + }, + 'chroot.start': { + 'chroot': "{chroot}", + }, +} + +# vi: ft=python