#21 [backend] support for STOMP msg buses
Merged 7 years ago by praiskup. Opened 7 years ago by praiskup.
Unknown source generic-msg-bus  into  master

@@ -25,6 +25,14 @@

      PENDING = 4

      SKIPPED = 5

  

+     @classmethod
clime commented 7 years ago

What about placing this into helpers.py?

+     def string(cls, number):

+         """ convert number to string """

+         for key, val in cls.__dict__.iteritems():

+             if isinstance(val, int) and number == val:

+                 return key

+         raise AttributeError("no such status id: {0} ".format(number))

+ 

  

  LOG_PUB_SUB = "copr:backend:log:pubsub::"

  

@@ -14,17 +14,14 @@

  from ..helpers import register_build_result, get_redis_connection, get_redis_logger, \

      local_file_logger

  

+ from ..msgbus import MsgBusStomp, MsgBusFedmsg

  

- # ansible_playbook = "ansible-playbook"

- 

- try:

-     import fedmsg

- except ImportError:

-     # fedmsg is optional

-     fedmsg = None

  

+ # ansible_playbook = "ansible-playbook"

  

  class Worker(multiprocessing.Process):

+     msg_buses = []

+ 

      def __init__(self, opts, frontend_client, vm_manager, worker_id, vm, job):

          multiprocessing.Process.__init__(self, name="worker-{}".format(worker_id))

  
@@ -50,23 +47,15 @@

                                 "Original error: {}".format(error))

              return str(self.vm.group)

  

-     def fedmsg_notify(self, topic, template, content=None):

-         """

-         Publish message to fedmsg bus when it is available

-         :param topic:

-         :param template:

-         :param content:

-         """

-         if self.opts.fedmsg_enabled and fedmsg:

-             content = content or {}

-             content["who"] = self.name

-             content["what"] = template.format(**content)

  

-             try:

-                 fedmsg.publish(modname="copr", topic=topic, msg=content)

-             # pylint: disable=W0703

-             except Exception as e:

-                 self.log.exception("failed to publish message: {0}".format(e))

+     def _announce(self, topic, job):

+         for bus in self.msg_buses:

+             bus.announce_job(topic, job, {

+                 'who': self.name,

+                 'ip': self.vm.vm_ip,

+                 'pid': self.pid,

+             })

+ 

  

      def _announce_start(self, job):

          """
@@ -75,41 +64,19 @@

          job.started_on = time.time()

          self.mark_started(job)

  

-         template = "build start: user:{user} copr:{copr}" \

-             "pkg: {pkg} build:{build} ip:{ip}  pid:{pid}"

- 

-         content = dict(user=job.submitter, copr=job.project_name,

-                        owner=job.project_owner, pkg=job.package_name,

-                        build=job.build_id, ip=self.vm.vm_ip, pid=self.pid)

-         self.fedmsg_notify("build.start", template, content)

+         for bus in self.msg_buses:

+             for topic in ['build.start', 'chroot.start']:

+                 self._announce(topic, job)

  

-         template = "chroot start: chroot:{chroot} user:{user}" \

-             "copr:{copr} pkg: {pkg} build:{build} ip:{ip}  pid:{pid}"

- 

-         content = dict(chroot=job.chroot, user=job.submitter,

-                        owner=job.project_owner, pkg=job.package_name,

-                        copr=job.project_name, build=job.build_id,

-                        ip=self.vm.vm_ip, pid=self.pid)

- 

-         self.fedmsg_notify("chroot.start", template, content)

  

      def _announce_end(self, job):

          """

          Announce everywhere that a build process ended now.

          """

          job.ended_on = time.time()

- 

          self.return_results(job)

          self.log.info("worker finished build: {0}".format(self.vm.vm_ip))

-         template = "build end: user:{user} copr:{copr} build:{build}" \

-             "  pkg: {pkg}  version: {version} ip:{ip}  pid:{pid} status:{status}"

- 

-         content = dict(user=job.submitter, copr=job.project_name,

-                        owner=job.project_owner,

-                        pkg=job.package_name, version=job.package_version,

-                        build=job.build_id, ip=self.vm.vm_ip, pid=self.pid,

-                        status=job.status, chroot=job.chroot)

-         self.fedmsg_notify("build.end", template, content)

+         self._announce('build.end', job)

  

      def mark_started(self, job):

          """
@@ -155,17 +122,14 @@

              return True

          return False

  

-     def init_fedmsg(self):

-         """

-         Initialize Fedmsg (this assumes there are certs and a fedmsg config on disk).

-         """

-         if not (self.opts.fedmsg_enabled and fedmsg):

-             return

+     def init_buses(self):

  

-         try:

-             fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)

-         except Exception as e:

-             self.log.exception("Failed to initialize fedmsg: {}".format(e))

+         self.log.info(self.opts.msg_buses)

+         for bus_config in self.opts.msg_buses:

+             self.msg_buses.append(MsgBusStomp(bus_config, self.log))

+ 

+         if self.opts.fedmsg_enabled:

+             self.msg_buses.append(MsgBusFedmsg(self.log))

  

      # TODO: doing skip logic on fronted during @start_build query

      # def on_pkg_skip(self, job):
@@ -310,7 +274,7 @@

  

      def run(self):

          self.log.info("Starting worker")

-         self.init_fedmsg()

+         self.init_buses()

  

          try:

              self.do_job(self.job)

@@ -13,6 +13,8 @@

  import errno

  import time

  from contextlib import contextmanager

+ import types

+ import glob

  

  import traceback

  
@@ -33,6 +35,22 @@

  import logging

  import munch

  

+ def pyconffile(filename):

+     """

+     Load python file as configuration file, inspired by python-flask

+     "from_pyfile()

+     """

+     d = types.ModuleType(str('config'))

+     d.__file__ = filename

+     try:

+         with open(filename) as config_file:

+             exec(compile(config_file.read(), filename, 'exec'), d.__dict__)

+     except IOError as e:

+         e.strerror = 'Unable to load configuration file (%s)' % e.strerror

+         raise

+     return d

+ 

+ 

  def run_cmd(cmd):

      """Runs given command in a subprocess.

  
@@ -287,6 +305,10 @@

          opts.ssh.transport = _get_conf(

              cp, "ssh", "transport", "paramiko")

  

+         opts.msg_buses = []

+         for bus_config in glob.glob('/etc/copr/msgbuses/*.conf'):

+             opts.msg_buses.append(pyconffile(bus_config))

+ 

          # thoughts for later

          # ssh key for connecting to builders?

          # cloud key stuff?

@@ -0,0 +1,172 @@

+ """

+ Message buses abstraction.

+ """

+ 

+ import logging

+ import copy

+ import json

+ 

+ from .constants import BuildStatus

+ 

+ try:

+     import fedmsg

+ except ImportError:

+     # fedmsg is optional

+     fedmsg = None

+ 

+ try:

+     import stomp

+ except ImportError:

+     # stomp is also optional

+     stomp = None

+ 

+ class MsgBus(object):

+     """

+     An "abstract" message bus class, don't instantiate!

+     """

+     messages = {}

+ 

+     def __init__(self, opts, log=None):

+         if not log:

+             log = logging

+             logging.basicConfig(level=logging.DEBUG)

+ 

+         self.log = log

+         self.opts = opts

+ 

+         if hasattr(self.opts, 'messages'):

+             self.messages.update(self.opts.messages)

+ 

+ 

+     def _send(self, topic, body, headers):

+         raise NotImplementedError

+ 

+ 

+     def send(self, topic, body, headers=None):

+         """

+         Send (dict) message over _send() method.

+         """

+         out_headers = copy.deepcopy(self.opts.headers)

+         if headers:

+             out_headers.update(copy.deepcopy(headers))

+         try:

+             self._send(topic, body, out_headers)

+         # pylint: disable=W0703

+         except Exception as _:

+             self.log.exception("Failed to publish message.")

+ 

+ 

+     def announce_job(self, topic, job, subst):

+         """

+         Announce everywhere that a build process started now.

+         """

+         if not topic in self.messages:

+             return

+ 

+ 

+         und = '(undefined)'

+         content = {

+             'user':        getattr(job, 'submitter', und),

+             'copr':        getattr(job, 'project_name', und),

+             'owner':       getattr(job, 'project_owner', und),

+             'pkg':         getattr(job, 'package_name', und),

+             'build':       getattr(job, 'build_id', und),

+             'chroot':      getattr(job, 'chroot', und),

+             'version':     getattr(job, 'package_version', und),

+             'status':      getattr(job, 'status', und),

+         }

+ 

+         content['str_status'] = BuildStatus.string(content['status'])

+ 

+         # Additional replacements?

+         if subst:

+             content.update(subst)

+ 

+         msg = {}

+         try:

+             for key in self.messages[topic]:

+                 msg[key] = self.messages[topic][key].format(**content)

+         # pylint: disable=W0703

+         except Exception as _:

+             self.log.exception("Failed to format '{0}' announcement."

+                                .format(topic))

+             return

+ 

+         self.send(topic, msg)

+ 

+ 

+ class MsgBusStomp(MsgBus):

+     """

+     Connect to STOMP bus and send messages.  Make sure you have correctly

+     configured 'messages' attribute in every message bus configuration, no

+     default messages here!

+     """

+ 

+     def __init__(self, opts, log=None):

+         super(MsgBusStomp, self).__init__(opts, log)

+ 

+         # shortcuts

+         host = self.opts.host

+         port = int(self.opts.port)

+         username = None

+         password = None

+ 

+         self.log.info("connecting to (stomp) message bus '{0}:{1}"

+                       .format(host, port))

+         self.conn = stomp.Connection([(host, int(port))])

+         self.conn.start()

+ 

+         if getattr(self.opts, 'auth', None):

+             username = self.opts.auth['username']

+             password = self.opts.auth['password']

+             self.log.info("authenticating with username '{0}'".format(username))

+ 

+         self.conn.connect(

+             username=username,

+             passcode=password,

+         )

+ 

+         if not getattr(self.opts, 'destination', None):

+             setattr(self.opts, 'destination', '/default')

+ 

+ 

+     def _send(self, topic, body, headers):

+         send_headers = copy.deepcopy(headers)

+         send_headers['topic'] = topic

+         self.conn.send(body=json.dumps(body), headers=send_headers,

+                        destination=self.opts.destination)

+ 

+ 

+ class MsgBusFedmsg(MsgBus):

+     """

+     Connect to fedmsg and send messages over it.

+     """

+     messages = {

+         'build.start': {

+             'who': '{who}',

+             'what': "build start: user:{user} copr:{copr}" \

+                     "pkg: {pkg} build:{build} ip:{ip}  pid:{pid}"

+         },

+         'chroot.start': {

+             'who': '{who}',

+             'what': "chroot start: chroot:{chroot} user:{user}" \

+                      "copr:{copr} pkg: {pkg} build:{build} ip:{ip}  pid:{pid}"

+         },

+         'build.end': {

+             'who': '{who}',

+             'what': "build end: user:{user} copr:{copr} build:{build}" \

+                     "  pkg: {pkg}  version: {version} ip:{ip}  pid:{pid} status:{status}"

+         },

+     }

+ 

+     def __init__(self, log=None):

+         # Hack to not require opts argument for now.

+         opts = type('', (), {})

+         opts.headers = {}

+ 

+         super(MsgBusFedmsg, self).__init__(opts, log)

+ 

+         fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)

+ 

+     def _send(self, topic, body, headers):

+         fedmsg.publish(modname="copr", topic=topic, content=body)

@@ -0,0 +1,50 @@

+ """

+ Example configuration file for message bus.

+ """

+ 

+ bus_id = 'ci_message_bus'

+ 

+ host = 'bus.example.com'

+ port = '61613'

+ 

+ auth = {

+     'username': 'jenkins',

+     'password': 'johnHolmes',

+ }

+ 

+ # Put arbitrary message headers here.  E.g. you can identify your

+ # staging/production copr instance.

+ 

+ headers = {

+     'CI_TYPE': 'copr-service',

+     'copr_instance': 'development',

+ }

+ 

+ destination = "/topic/CI"

+ 

+ # Define message templates.  Each message is identified by "key" (e.g.

+ # 'build.start') and contains "key/value" pairs, while "values" are subject of

+ # string substitution at the time message is generated.  Available substitutions

+ # are:

+ #   'user' (submitter), 'copr', 'owner', 'pkg', 'build' (id), 'chroot',

+ #   'version' (pkg), 'ip' (of builder), 'who' (process name), 'pid' (of builder

+ #   process), 'status' (status ID, see class BuildStatus), 'str_status'

+ #   (user readable status)

+ 

+ messages = {

+     'build.start': {

+         'package': '{pkg}-{version}',

+         'chroot': '{chroot}',

+         'user': '{user}',

+     },

+     'build.end': {

+         'package': '{pkg}-{version}',

+         'chroot': '{chroot}',

+         'user': '{user}',

+     },

+     'chroot.start': {

+         'chroot': "{chroot}",

+     },

+ }

+ 

+ # vi: ft=python

Please have a look. This is related to
https://bugzilla.redhat.com/show_bug.cgi?id=1332438

I made the contribution fairly transparent with respect to fedmsg, so both the fedmsg and STOMP message buses are used at the same time, providing similar content (via the same API).

I would test this PR against fedmsg too (to check that I don't break anything), but I am probably not allowed to do that without credentials?

rebased

7 years ago

rebased

7 years ago

rebased

7 years ago

rebased

7 years ago

rebased

7 years ago

rebased

7 years ago

I now added the possibility to expand user-readable job status into message ('status_str'). Please have a look.

What about placing this into helpers.py?

I think it is quite nice for the time being. Have you managed to test out the fedmsg part?

Regarding moving the ID => TEXT_ID converter to helpers.py: I don't like the idea of retyping this in two places (IIRC I have already debugged an ugly issue caused by a desync here). But if you insist on that, please ping me.

Regarding testing the fedmsg part: unfortunately no. AFAIK I don't have permission to write there, right? If you help me get it, I can give it a try.

Regarding moving the ID => TEXT_ID converter to helpers.py: I don't like the idea of retyping this in two places (IIRC I have already debugged an ugly issue caused by a desync here). But if you insist on that, please ping me.

OK, now I get it -> string copying is probably not needed. I'll try to move that...

rebased

7 years ago

Could you please have a look? I'm not proud of it... but I don't know how to avoid circular deps between constants.py and helpers.py. So I added the fix as a separate commit, at least for now.

2 new commits added

  • [backend] define class Enum in helpers.py
  • [backend] support for STOMP msg buses
7 years ago

Yeah, the circular dep is a bit unfortunate. I was thinking about making it a non-class method. I am happy with any working state for now.

Making it a non-class method doesn't help with the circular-dep problem, IMO. So if a working state is OK for now, could I push to master without the second patch, or is there anything else to fix?

I bet the fedmsg testing will be done semi-automatically afterwards, so I'll be able to help with fixing bugs.

rebased

7 years ago

Thanks! I dropped the second commit and now I'm merging.

Pull-Request has been merged by praiskup

7 years ago