| |
@@ -20,6 +20,12 @@
|
| |
# stomp is also optional
|
| |
stomp = None
|
| |
|
| |
+
|
| |
+ class _LogAdapter(logging.LoggerAdapter):
|
| |
+ def process(self, msg, kwargs):
|
| |
+ return "[BUS '{0}'] {1}".format(self.extra['bus_id'], msg), kwargs
|
| |
+
|
| |
+
|
| |
class MsgBus(object):
|
| |
"""
|
| |
An "abstract" message bus class, don't instantiate!
|
| |
@@ -27,13 +33,17 @@
|
| |
messages = {}
|
| |
|
| |
def __init__(self, opts, log=None):
|
| |
+ self.opts = opts
|
| |
+ # Fix bus_id soon enough.
|
| |
+ self.opts.bus_id = getattr(self.opts, 'bus_id', type(self).__name__)
|
| |
+
|
| |
if not log:
|
| |
log = logging
|
| |
logging.basicConfig(level=logging.DEBUG)
|
| |
|
| |
- self.log = log
|
| |
- self.opts = opts
|
| |
+ self.log = _LogAdapter(log, {'bus_id': self.opts.bus_id})
|
| |
|
| |
+ self.log.info("initializing bus")
|
| |
if hasattr(self.opts, 'messages'):
|
| |
self.messages.update(self.opts.messages)
|
| |
|
| |
@@ -92,6 +102,18 @@
|
| |
self.send(topic, msg)
|
| |
|
| |
|
| |
+ class StompListener(stomp.ConnectionListener):
|
| |
+ def __init__(self, msgbus):
|
| |
+ self.msgbus = msgbus
|
| |
+
|
| |
+ def on_error(self, headers, message):
|
| |
+ self.msgbus.log.warning('received an error "%s"' % message)
|
| |
+
|
| |
+ def on_disconnected(self):
|
| |
+ self.msgbus.log.warning('disconnected, trying to connect again..')
|
| |
+ self.msgbus.connect()
|
| |
+
|
| |
+
|
| |
class MsgBusStomp(MsgBus):
|
| |
"""
|
| |
Connect to STOMP bus and send messages. Make sure you have correctly
|
| |
@@ -99,32 +121,66 @@
|
| |
default messages here!
|
| |
"""
|
| |
|
| |
+ def connect(self):
|
| |
+ """
|
| |
+ connect (even repeatedly) to STOMP message bus
|
| |
+ """
|
| |
+ self.conn.start()
|
| |
+ self.log.debug("connecting")
|
| |
+ self.conn.connect(
|
| |
+ # username/passcode can be None if ssl_key is used
|
| |
+ username=self.username,
|
| |
+ passcode=self.password,
|
| |
+ wait=True,
|
| |
+ )
|
| |
+ if not getattr(self.opts, 'destination', None):
|
| |
+ setattr(self.opts, 'destination', '/default')
|
| |
+
|
| |
+
|
| |
def __init__(self, opts, log=None):
|
| |
super(MsgBusStomp, self).__init__(opts, log)
|
| |
|
| |
+ hosts = []
|
| |
# shortcuts
|
| |
- host = self.opts.host
|
| |
- port = int(self.opts.port)
|
| |
- username = None
|
| |
- password = None
|
| |
-
|
| |
- self.log.info("connecting to (stomp) message bus '{0}:{1}"
|
| |
- .format(host, port))
|
| |
- self.conn = stomp.Connection([(host, int(port))])
|
| |
- self.conn.start()
|
| |
+ if getattr(self.opts, 'hosts', None):
|
| |
+ assert type(self.opts.hosts) == list
|
| |
+ hosts += self.opts.hosts
|
| |
|
| |
- if getattr(self.opts, 'auth', None):
|
| |
- username = self.opts.auth['username']
|
| |
- password = self.opts.auth['password']
|
| |
- self.log.info("authenticating with username '{0}'".format(username))
|
| |
+ if getattr(self.opts, 'host', None):
|
| |
+ # TODO: Compat to be removed.
|
| |
+ self.log.warning("obsoleted 'host' parameter, use 'hosts' " \
|
| |
+ "array (failover capable)")
|
| |
+ hosts.append((self.opts.host, self.opts.port))
|
| |
|
| |
- self.conn.connect(
|
| |
- username=username,
|
| |
- passcode=password,
|
| |
- )
|
| |
+ # Ensure integer ports.
|
| |
+ hosts = [(pair[0], int(pair[1])) for pair in hosts]
|
| |
|
| |
- if not getattr(self.opts, 'destination', None):
|
| |
- setattr(self.opts, 'destination', '/default')
|
| |
+ self.conn = stomp.Connection(hosts)
|
| |
+ self.conn.set_listener('', StompListener(self))
|
| |
+
|
| |
+ # allow dict.get() (with default None) method
|
| |
+ auth = {}
|
| |
+ if getattr(self.opts, 'auth', None):
|
| |
+ auth = self.opts.auth
|
| |
+
|
| |
+ self.username = auth.get('username')
|
| |
+ self.password = auth.get('password')
|
| |
+ ssl_key = auth.get('key_file')
|
| |
+ ssl_crt = auth.get('cert_file')
|
| |
+ cacert = getattr(self.opts, 'cacert', None)
|
| |
+
|
| |
+ if (ssl_key, ssl_crt, cacert) != (None, None, None):
|
| |
+ self.log.debug("ssl: key = {0}, crt = {1}, cacert = {2}".format(
|
| |
+ ssl_key, ssl_crt, cacert))
|
| |
+
|
| |
+ self.conn.set_ssl(
|
| |
+ for_hosts=hosts,
|
| |
+ key_file=ssl_key,
|
| |
+ cert_file=ssl_crt,
|
| |
+ ca_certs=cacert
|
| |
+ )
|
| |
+
|
| |
+ self.connect()
|
| |
|
| |
|
| |
def _send(self, topic, body, headers):
|
| |
[backend] msgbus: support for ssl + initial connection failover