#170 new msg bus options
Merged 6 years ago by praiskup. Opened 6 years ago by praiskup.
Unknown source new-bus-options  into  master

file modified
+77 -21
@@ -20,6 +20,12 @@

      # stomp is also optional

      stomp = None

  

+ 

+ class _LogAdapter(logging.LoggerAdapter):

+     def process(self, msg, kwargs):

+         return "[BUS '{0}'] {1}".format(self.extra['bus_id'], msg), kwargs

+ 

+ 

  class MsgBus(object):

      """

      An "abstract" message bus class, don't instantiate!
@@ -27,13 +33,17 @@

      messages = {}

  

      def __init__(self, opts, log=None):

+         self.opts = opts

+         # Fix bus_id soon enough.

+         self.opts.bus_id = getattr(self.opts, 'bus_id', type(self).__name__)

+ 

          if not log:

              log = logging

              logging.basicConfig(level=logging.DEBUG)

  

-         self.log = log

-         self.opts = opts

+         self.log = _LogAdapter(log, {'bus_id': self.opts.bus_id})

  

+         self.log.info("initializing bus")

          if hasattr(self.opts, 'messages'):

              self.messages.update(self.opts.messages)

  
@@ -92,6 +102,18 @@

          self.send(topic, msg)

  

  

+ class StompListener(stomp.ConnectionListener):

+     def __init__(self, msgbus):

+         self.msgbus = msgbus

+ 

+     def on_error(self, headers, message):

+         self.msgbus.log.warning('received an error "%s"' % message)

+ 

+     def on_disconnected(self):

+         self.msgbus.log.warning('disconnected, trying to connect again..')

+         self.msgbus.connect()

+ 

+ 

  class MsgBusStomp(MsgBus):

      """

      Connect to STOMP bus and send messages.  Make sure you have correctly
@@ -99,32 +121,66 @@

      default messages here!

      """

  

+     def connect(self):

+         """

+         connect (even repeatedly) to STOMP message bus

+         """

+         self.conn.start()

+         self.log.debug("connecting")

+         self.conn.connect(

+             # username/passcode can be None if ssl_key is used

+             username=self.username,

+             passcode=self.password,

+             wait=True,

+         )

+         if not getattr(self.opts, 'destination', None):

+             setattr(self.opts, 'destination', '/default')

+ 

+ 

      def __init__(self, opts, log=None):

          super(MsgBusStomp, self).__init__(opts, log)

  

+         hosts = []

          # shortcuts

-         host = self.opts.host

-         port = int(self.opts.port)

-         username = None

-         password = None

- 

-         self.log.info("connecting to (stomp) message bus '{0}:{1}"

-                       .format(host, port))

-         self.conn = stomp.Connection([(host, int(port))])

-         self.conn.start()

+         if getattr(self.opts, 'hosts', None):

+             assert type(self.opts.hosts) == list

+             hosts += self.opts.hosts

  

-         if getattr(self.opts, 'auth', None):

-             username = self.opts.auth['username']

-             password = self.opts.auth['password']

-             self.log.info("authenticating with username '{0}'".format(username))

+         if getattr(self.opts, 'host', None):

+             # TODO: Compat to be removed.

+             self.log.warning("obsoleted 'host' parameter, use 'hosts' " \

+                              "array (failover capable)")

+             hosts.append((self.opts.host, self.opts.port))

  

-         self.conn.connect(

-             username=username,

-             passcode=password,

-         )

+         # Ensure integer ports.

+         hosts = [(pair[0], int(pair[1])) for pair in hosts]

  

-         if not getattr(self.opts, 'destination', None):

-             setattr(self.opts, 'destination', '/default')

+         self.conn = stomp.Connection(hosts)

+         self.conn.set_listener('', StompListener(self))

+ 

+         # allow dict.get() (with default None) method

+         auth = {}

+         if getattr(self.opts, 'auth', None):

+             auth = self.opts.auth

+ 

+         self.username = auth.get('username')

+         self.password = auth.get('password')

+         ssl_key  = auth.get('key_file')

+         ssl_crt  = auth.get('cert_file')

+         cacert = getattr(self.opts, 'cacert', None)

+ 

+         if (ssl_key, ssl_crt, cacert) != (None, None, None):

+             self.log.debug("ssl: key = {0}, crt = {1}, cacert = {2}".format(

+                 ssl_key, ssl_crt, cacert))

+ 

+             self.conn.set_ssl(

+                     for_hosts=hosts,

+                     key_file=ssl_key,

+                     cert_file=ssl_crt,

+                     ca_certs=cacert

+             )

+ 

+         self.connect()

  

  

      def _send(self, topic, body, headers):

@@ -4,23 +4,33 @@

  

  bus_id = 'ci_message_bus'

  

- host = 'bus.example.com'

- port = '61613'

+ # we use python-stomppy, see it's documentation for more info

+ hosts = [

+     ('bus1.example.com', '61613'),

+     ('bus2.example.com', '61613'),

+ ]

  

  auth = {

+     # optional if Client certificate is used

      'username': 'jenkins',

      'password': 'johnHolmes',

+ 

+     # client certificate items

+     'ssl_key': '/my/auth.key',

+     'ssl_crt': '/my/auth.crt',

  }

  

- # Put arbitrary message headers here.  E.g. you can identify your

- # staging/production copr instance.

+ # CA that signed our client key (optional)

+ cacert = '/etc/pki/ca-trust/source/anchors/my-company.crt'

  

+ # headers which should be present in each message

  headers = {

      'CI_TYPE': 'copr-service',

      'copr_instance': 'development',

  }

  

- destination = "/topic/CI"

+ # topic we want to write to on the bus (stomppy syntax)

+ destination = "/topic/copr"

  

  # Define message templates.  Each message is identified by "key" (e.g.

  # 'build.start') and contains "key/value" pairs, while "values" are subject of

[backend] msgbus: support for ssl + initial connection failover

The 'host' and 'port' configuration options are obsoleted now in
favor of 'hosts' array (reflects the stomppy implementation).  To
be dropped.

The 'auth' config option now allows user to specify client's
key/crt and cacert bundle.

Slightly modified the connection policy;  worker wait's till the
stomp connection is done before it continues with the assigned
task.

Thanks, rebased to avoid merge commit, and merging.

rebased onto c334096dba48f6e6aea471e22156929db3002cd0

6 years ago

rebased onto 4d0272f

6 years ago

Pull-Request has been merged by praiskup

6 years ago