#1043 remove old messagebus plugin
Merged 5 years ago by mikem. Opened 5 years ago by tkopecek.
tkopecek/koji issue878  into  master

@@ -1,24 +0,0 @@ 

- # config file for the Koji messagebus plugin

- 

- [broker]

- host = amqp.example.com

- port = 5671

- ssl = true

- timeout = 10

- heartbeat = 60

- # PLAIN options

- auth = PLAIN

- username = guest

- password = guest

- # GSSAPI options

- # auth = GSSAPI

- # keytab = /etc/koji-hub/plugins/koji-messagebus.keytab

- # principal = messagebus/koji.example.com@EXAMPLE.COM

- 

- [exchange]

- name = koji.events

- type = topic

- durable = true

- 

- [topic]

- prefix = koji.event

file removed
-278
@@ -1,278 +0,0 @@ 

- # Koji callback for sending notifications about events to a messagebus (amqp broker)

- # Copyright (c) 2009-2014 Red Hat, Inc.

- #

- # Authors:

- #     Mike Bonnet <mikeb@redhat.com>

- 

- from __future__ import absolute_import

- from koji import PluginError

- from koji.context import context

- from koji.plugin import callbacks, callback, ignore_error, convert_datetime

- import six.moves.configparser

- import logging

- import qpid.messaging

- import qpid.messaging.transports

- from ssl import wrap_socket

- import socket

- import os

- try:

-     import krbV

- except ImportError:  # pragma: no cover

-     krbV = None

- 

- MAX_KEY_LENGTH = 255

- CONFIG_FILE = '/etc/koji-hub/plugins/messagebus.conf'

- 

- config = None

- session = None

- target = None

- 

- def connect_timeout(host, port, timeout):

-     for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):

-         af, socktype, proto, canonname, sa = res

-         sock = socket.socket(af, socktype, proto)

-         sock.settimeout(timeout)

-         try:

-             sock.connect(sa)

-             break

-         except socket.error:

-             sock.close()

-     else:

-         # If we got here then we couldn't connect (yet)

-         raise

-     return sock

- 

- class tlstimeout(qpid.messaging.transports.tls):

-     def __init__(self, conn, host, port):

-         self.socket = connect_timeout(host, port, getattr(conn, '_timeout'))

-         if conn.tcp_nodelay:

-             self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

-         self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile, certfile=conn.ssl_certfile, ca_certs=conn.ssl_trustfile)

-         self.socket.setblocking(0)

-         self.state = None

-         self.write_retry = None

- 

- qpid.messaging.transports.TRANSPORTS['tls+timeout'] = tlstimeout

- 

- class Connection(qpid.messaging.Connection):

-     """

-     A connection class which supports a timeout option

-     to the establish() method.  Only necessary until

-     upstream Apache Qpid commit 1487578 is available in

-     a supported release.

-     """

-     @staticmethod

-     def establish(url=None, timeout=None, **options):

-         conn = Connection(url, **options)

-         conn._timeout = timeout

-         conn.open()

-         return conn

- 

-     def _wait(self, predicate, timeout=None):

-         if timeout is None and hasattr(self, '_timeout'):

-             timeout = self._timeout

-         return qpid.messaging.Connection._wait(self, predicate, timeout)

- 

- 

- def get_config():

-     global config

-     if config:

-         return config

- 

-     config = six.moves.configparser.SafeConfigParser()

-     config.read(CONFIG_FILE)

-     if not config.has_option('broker', 'timeout'):

-         config.set('broker', 'timeout', '60')

-     if not config.has_option('broker', 'heartbeat'):

-         config.set('broker', 'heartbeat', '60')

-     return config

- 

- 

- def get_sender():

-     global session, target

-     if session and target:

-         try:

-             return session.sender(target)

-         except:

-             logging.getLogger('koji.plugin.messagebus').warning('Error getting session, will retry', exc_info=True)

-             session = None

-             target = None

- 

-     config = get_config()

- 

-     if config.getboolean('broker', 'ssl'):

-         url = 'amqps://'

-     else:

-         url = 'amqp://'

-     auth = config.get('broker', 'auth')

-     if auth == 'PLAIN':

-         url += config.get('broker', 'username') + '/'

-         url += config.get('broker', 'password') + '@'

-     elif auth == 'GSSAPI':

-         if krbV is None:

-             # TODO: port this to python-gssapi

-             raise PluginError('krbV module not installed')

-         ccname = 'MEMORY:messagebus'

-         os.environ['KRB5CCNAME'] = ccname

-         ctx = krbV.default_context()

-         ccache = krbV.CCache(name=ccname, context=ctx)

-         cprinc = krbV.Principal(name=config.get('broker', 'principal'), context=ctx)

-         ccache.init(principal=cprinc)

-         keytab = krbV.Keytab(name='FILE:' + config.get('broker', 'keytab'), context=ctx)

-         ccache.init_creds_keytab(principal=cprinc, keytab=keytab)

-     else:

-         raise PluginError('unsupported auth type: %s' % auth)

- 

-     url += config.get('broker', 'host') + ':'

-     url += config.get('broker', 'port')

- 

-     conn = Connection.establish(url,

-                                 sasl_mechanisms=config.get('broker', 'auth'),

-                                 transport='tls+timeout',

-                                 timeout=config.getfloat('broker', 'timeout'),

-                                 heartbeat=config.getint('broker', 'heartbeat'))

-     sess = conn.session()

-     tgt = """%s;

-              { create: sender,

-                assert: always,

-                node: { type: topic,

-                        durable: %s,

-                        x-declare: { exchange: "%s",

-                                     type: %s } } }""" % \

-     (config.get('exchange', 'name'), config.getboolean('exchange', 'durable'),

-      config.get('exchange', 'name'), config.get('exchange', 'type'))

-     sender = sess.sender(tgt)

-     session = sess

-     target = tgt

- 

-     return sender

- 

- def _token_append(tokenlist, val):

-     # Replace any periods with underscores so we have a deterministic number of tokens

-     val = val.replace('.', '_')

-     tokenlist.append(val)

- 

- def get_message_subject(msgtype, *args, **kws):

-     key = [config.get('topic', 'prefix'), msgtype]

- 

-     if msgtype == 'PackageListChange':

-         _token_append(key, kws['tag']['name'])

-         _token_append(key, kws['package']['name'])

-     elif msgtype == 'TaskStateChange':

-         _token_append(key, kws['info']['method'])

-         _token_append(key, kws['attribute'])

-     elif msgtype == 'BuildStateChange':

-         info = kws['info']

-         _token_append(key, kws['attribute'])

-         _token_append(key, info['name'])

-     elif msgtype == 'Import':

-         _token_append(key, kws['type'])

-     elif msgtype in ('Tag', 'Untag'):

-         _token_append(key, kws['tag']['name'])

-         build = kws['build']

-         _token_append(key, build['name'])

-         _token_append(key, kws['user']['name'])

-     elif msgtype == 'RepoInit':

-         _token_append(key, kws['tag']['name'])

-     elif msgtype == 'RepoDone':

-         _token_append(key, kws['repo']['tag_name'])

- 

-     key = '.'.join(key)

-     key = key[:MAX_KEY_LENGTH]

-     return key

- 

- def get_message_headers(msgtype, *args, **kws):

-     headers = {'type': msgtype}

- 

-     if msgtype == 'PackageListChange':

-         headers['tag'] = kws['tag']['name']

-         headers['package'] = kws['package']['name']

-     elif msgtype == 'TaskStateChange':

-         headers['id'] = kws['info']['id']

-         headers['parent'] = kws['info']['parent']

-         headers['method'] = kws['info']['method']

-         headers['attribute'] = kws['attribute']

-         headers['old'] = kws['old']

-         headers['new'] = kws['new']

-     elif msgtype == 'BuildStateChange':

-         info = kws['info']

-         headers['name'] = info['name']

-         headers['version'] = info['version']

-         headers['release'] = info['release']

-         headers['attribute'] = kws['attribute']

-         headers['old'] = kws['old']

-         headers['new'] = kws['new']

-     elif msgtype == 'Import':

-         headers['importType'] = kws['type']

-     elif msgtype in ('Tag', 'Untag'):

-         headers['tag'] = kws['tag']['name']

-         build = kws['build']

-         headers['name'] = build['name']

-         headers['version'] = build['version']

-         headers['release'] = build['release']

-         headers['user'] = kws['user']['name']

-     elif msgtype == 'RepoInit':

-         headers['tag'] = kws['tag']['name']

-     elif msgtype == 'RepoDone':

-         headers['tag'] = kws['repo']['tag_name']

- 

-     return headers

- 

- @callback(*[c for c in callbacks.keys() if c.startswith('post')

-             and c != 'postCommit'])

- @ignore_error

- @convert_datetime

- def prep_message(cbtype, *args, **kws):

-     if cbtype.startswith('post'):

-         msgtype = cbtype[4:]

-     else:

-         msgtype = cbtype[3:]

- 

-     data = kws.copy()

-     if args:

-         data['args'] = list(args)

- 

-     config = get_config()

-     exchange_type = config.get('exchange', 'type')

-     if exchange_type == 'topic':

-         subject = get_message_subject(msgtype, *args, **kws)

-         message = qpid.messaging.Message(subject=subject, content=data)

-     elif exchange_type == 'headers':

-         headers = get_message_headers(msgtype, *args, **kws)

-         message = qpid.messaging.Message(properties=headers, content=data)

-     else:

-         raise PluginError('unsupported exchange type: %s' % exchange_type)

- 

-     messages = getattr(context, 'messagebus_plugin_messages', None)

-     if messages is None:

-         messages = []

-         context.messagebus_plugin_messages = messages

-     messages.append(message)

- 

- 

- @callback('postCommit')

- @ignore_error

- def send_messages(cbtype, *args, **kws):

-     '''Send the messages cached by prep_message'''

- 

-     logger = logging.getLogger('koji.plugin.messagebus')

-     config = get_config()

-     messages = getattr(context, 'messagebus_plugin_messages', [])

-     if not messages:

-         return

-     test_mode = False

-     if config.has_option('broker', 'test_mode'):

-         test_mode = config.getboolean('broker', 'test_mode')

-     if test_mode:

-         logger.debug('test mode: skipping broker connection')

-         for message in messages:

-             logger.debug('test mode: skipping message: %r', message)

-     else:

-         sender = get_sender()

-         for message in messages:

-             sender.send(message, sync=False,

-                     timeout=config.getfloat('broker', 'timeout'))

-         sender.close(timeout=config.getfloat('broker', 'timeout'))

- 

-     # koji should do this for us, but just in case...

-     del context.messagebus_plugin_messages

There are no known users of this plugin anymore. If you need messaging
functionality, you can migrate to the protonmsg plugin.

Fixes: https://pagure.io/koji/issue/878

Won't Fedora need an AMQP-based message plugin soon? I'm not saying keep this one (because I've had a ton of trouble with qpid and don't like it...), but we might want a new AMQP plugin using pika instead (since pika is well-maintained).

Isn't the proton plugin sufficient for that?

Huh, I didn't realize both of them were AMQP plugins. But no, Qpid Proton is AMQP 1.0, which is not what Fedora is deploying. And apparently this one won't work either, since it doesn't support the right AMQP version...

RabbitMQ uses AMQP 0-9-1, which, as far as I'm aware, is supported only by pika.

@kevin Is Fedora going to need this plugin?

No, we use a fedmsg plugin currently ( https://infrastructure.fedoraproject.org/cgit/ansible.git/tree/roles/koji_hub/templates/fedmsg-koji-plugin.py )

I imagine we will port that to our new fedora messaging setup, but we don't use this one as far as I know.

Adding @jcline and @abompard for more comment if I got something wrong...

I imagine we will port that to our new fedora messaging setup, but we don't use this one as far as I know.

Sounds right to me. In theory RabbitMQ has a plugin for AMQP 1.0 support, but porting the fedmsg plugin to the pika-based client should be trivial.

@jcline Could we get a generic pika-based AMQP plugin that happens to work with new fedora messaging that could be upstreamed?

That is certainly possible. fedora-messaging is just boilerplate around pika to enforce certain JSON schemas and handle the details of setting up the connection to the broker. What's the use case you have where taking the Fedora plugin won't work, but you also need AMQP 0-9-1 support?

@jcline Usage of Koji in Mageia, maybe also CentOS with CBS, and some private instances too.

Commit ecfcb5b fixes this pull-request

Pull-Request has been merged by mikem

5 years ago