| |
@@ -9,6 +9,7 @@
|
| |
import logging
|
| |
import random
|
| |
|
| |
+ import psycopg2
|
| |
from proton import Message, SSLDomain
|
| |
from proton.handlers import MessagingHandler
|
| |
from proton.reactor import Container
|
| |
@@ -16,6 +17,7 @@
|
| |
import koji
|
| |
from koji.context import context
|
| |
from koji.plugin import callback, convert_datetime, ignore_error
|
| |
+ from kojihub import QueryProcessor, InsertProcessor
|
| |
|
| |
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
|
| |
CONFIG = None
|
| |
@@ -295,34 +297,125 @@
|
| |
queue_msg(address, props, kws)
|
| |
|
| |
|
| |
def _send_msgs(urls, msgs, CONFIG):
    """Try to deliver msgs through the brokers in urls, one broker at a time.

    The urls list is shuffled in place and each url is tried in turn by
    running a Container around a TimeoutHandler. The emptiness check after
    each run relies on the handler removing delivered messages from msgs
    in place (TimeoutHandler is defined elsewhere -- confirm there).

    :param urls: list of broker urls; reordered in place
    :param msgs: list of messages to send
    :param CONFIG: plugin configuration, passed through to TimeoutHandler
    :returns: the same msgs object, holding whatever is still unsent
    """
    random.shuffle(urls)
    for broker in urls:
        Container(TimeoutHandler(broker, msgs, CONFIG)).run()
        if not msgs:
            # nothing left to deliver, no need to try further brokers
            LOG.debug('all messages sent to %s successfully', broker)
            return msgs
        LOG.debug('could not send to %s, %s messages remaining',
                  broker, len(msgs))
    # every url was tried (or there were none) without draining the list
    LOG.error('could not send messages to any destinations')
    return msgs
|
| |
+
|
| |
+
|
| |
def store_to_db(msgs):
    """Insert messages into the proton_queue table.

    Each message is either an (address, props, body) tuple or a mapping
    with 'address', 'props' and 'body' keys. props is JSON-encoded before
    it is stored; body is stored as given. A mapping that also carries a
    row id (under 'db_id' or 'id') is inserted with that id.

    All inserts run in one explicit transaction; if any of them fails the
    transaction is rolled back and the exception is re-raised.

    :param msgs: iterable of messages to store
    """
    c = context.cnx.cursor()
    # we're running in postCommit, so we need to handle new transaction
    c.execute('BEGIN')
    try:
        for msg in msgs:
            db_id = None
            if isinstance(msg, tuple):
                address = msg[0]
                props = json.dumps(msg[1])
                body = msg[2]
            else:
                address = msg['address']
                body = msg['body']  # already serialized
                props = json.dumps(msg['props'])
                # Look the id up under the same key that was tested; the
                # original tested 'id' but then read 'db_id'.
                for key in ('db_id', 'id'):
                    if key in msg:
                        db_id = msg[key]
                        break
            insert = InsertProcessor(table='proton_queue')
            insert.set(address=address, props=props, body=body)
            if db_id is not None:
                # if we've something from db, we should store it in correct order
                insert.set(id=db_id)
            insert.execute()
    except Exception:
        # do not leave the connection inside a half-finished transaction
        c.execute('ROLLBACK')
        raise
    c.execute('COMMIT')
|
| |
+
|
| |
+
|
| |
def handle_db_msgs(urls, CONFIG):
    """Send one batch of messages stored in proton_queue and delete the sent ones.

    Steps, all inside one explicit transaction:

    1. Take an ACCESS EXCLUSIVE lock on proton_queue with NOWAIT; if the
       lock cannot be taken, roll back and return without doing anything.
    2. Delete rows older than the [queue] age option (hours, default 24).
    3. Load up to [queue] batch_size rows (default 100) ordered by id.
    4. Unless [broker] test_mode is on, pass them to _send_msgs.
    5. Delete the rows that were sent (in test mode every loaded row counts
       as sent, as in the original code).

    :param urls: list of broker urls handed to _send_msgs
    :param CONFIG: plugin configuration object
    """
    limit = CONFIG.getint('queue', 'batch_size', fallback=100)
    c = context.cnx.cursor()
    # we're running in postCommit, so we need to handle new transaction
    c.execute('BEGIN')
    try:
        c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
    except psycopg2.OperationalError:
        LOG.debug('skipping db queue due to lock')
        # The failed LOCK aborted the transaction opened above; close it so
        # the connection is usable for whatever runs next.
        c.execute('ROLLBACK')
        return
    try:
        c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
                  CONFIG.getint('queue', 'age', fallback=24))
        query = QueryProcessor(tables=('proton_queue',),
                               columns=('id', 'address', 'props', 'body'),
                               opts={'order': 'id', 'limit': limit})
        msgs = list(query.execute())
        if CONFIG.getboolean('broker', 'test_mode', fallback=False):
            if msgs:
                LOG.debug('test mode: skipping send for %i messages from db', len(msgs))
            unsent = set()
        else:
            # _send_msgs returns the list object it was given, and its own
            # emptiness check implies that list is drained in place. Hand it
            # a copy so msgs still holds the full batch for the comparison
            # below.
            unsent = {m['id'] for m in _send_msgs(urls, list(msgs), CONFIG)}
        sent_ids = tuple(m['id'] for m in msgs if m['id'] not in unsent)
        if sent_ids:
            # A tuple is rendered as "(1, 2, ...)", which is what IN needs;
            # a list would be rendered as an ARRAY and rejected. The guard
            # also avoids an empty "IN ()".
            c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
                      {'ids': sent_ids})
    finally:
        # make sure we free the lock
        try:
            c.execute('COMMIT')
        except Exception:
            c.execute('ROLLBACK')
|
| |
+
|
| |
+
|
| |
@ignore_error
@convert_datetime
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
    """postCommit callback: deliver the messages queued on the request context.

    Reads context.protonmsg_msgs and returns immediately when it is missing
    or empty. The plugin configuration is loaded on first use and cached in
    the module-level CONFIG.

    With [broker] test_mode on, nothing is sent: messages are only logged,
    and when [broker] test_mode_fail (a percentage, default 0) is non-zero
    that share of messages is randomly treated as unsent. Otherwise the
    messages go through _send_msgs.

    With [queue] enabled on, messages that remain unsent are stored via
    store_to_db; when nothing remains, handle_db_msgs is called to work on
    previously stored messages. With the queue disabled, unsent messages
    are only reported in the log.
    """
    global CONFIG
    msgs = getattr(context, 'protonmsg_msgs', None)
    if not msgs:
        return
    if not CONFIG:
        CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
    urls = CONFIG.get('broker', 'urls').split()
    test_mode = False
    if CONFIG.has_option('broker', 'test_mode'):
        test_mode = CONFIG.getboolean('broker', 'test_mode')
    db_enabled = False
    if CONFIG.has_option('queue', 'enabled'):
        db_enabled = CONFIG.getboolean('queue', 'enabled')

    if test_mode:
        LOG.debug('test mode: skipping send to urls: %r', urls)
        fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)
        if fail_chance:
            # simulate unsent messages in test mode
            sent = []
            unsent = []
            for m in msgs:
                if random.randint(1, 100) <= fail_chance:
                    unsent.append(m)
                else:
                    sent.append(m)
            if unsent:
                LOG.info('simulating %i unsent messages', len(unsent))
        else:
            sent = msgs
            unsent = []
        for msg in sent:
            LOG.debug('test mode: skipped msg: %r', msg)
    else:
        unsent = _send_msgs(urls, msgs, CONFIG)

    if db_enabled:
        if unsent:
            # if we still have some messages, store them and leave for another call to pick them up
            # Store only what is unsent: in simulated-failure test mode msgs
            # also holds the messages counted as sent.
            store_to_db(unsent)
        else:
            # otherwise we are another call - look to db if there remains something to send
            handle_db_msgs(urls, CONFIG)
    elif unsent:
        LOG.error('could not send %i messages. db queue disabled', len(unsent))
|
| |
Fixes: https://pagure.io/koji/issue/2230