From aff74c4b3a8064d58c952d102747836582908fa0 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Sep 30 2020 08:14:49 +0000 Subject: proton: handling of lock failure --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index b34decf..23f4dd3 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -9,6 +9,7 @@ import json import logging import random +import psycopg2 from proton import Message, SSLDomain from proton.handlers import MessagingHandler from proton.reactor import Container @@ -313,6 +314,9 @@ def _send_msgs(urls, msgs, CONFIG): def store_to_db(msgs): + c = context.cnx.cursor() + # we're running in postCommit, so we need to handle new transaction + c.execute('BEGIN') for msg in msgs: if isinstance(msg, tuple): address = msg[0] @@ -328,24 +332,29 @@ def store_to_db(msgs): # if we've something from db, we should store it in correct order insert.set(id=msg['db_id']) insert.execute() - context.cnx.commit() + c.execute('COMMIT') def query_from_db(): limit = CONFIG.getint('queue', 'batch_size', fallback=100) - c = context.cnx.cursor() - c.execute('BEGIN') - c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT') - c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" % - CONFIG.getint('queue', 'age', fallback=24)) - query = QueryProcessor(tables=('proton_queue',), - columns=('id', 'address', 'props', 'body'), - opts={'order': 'id', 'limit': limit}) - msgs = list(query.execute()) - if msgs: - c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s', - {'ids': [msg['id'] for msg in msgs]}) - c.execute('COMMIT') + try: + c = context.cnx.cursor() + # we're running in postCommit, so we need to handle new transaction + c.execute('BEGIN') + c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT') + c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" % + CONFIG.getint('queue', 'age', fallback=24)) + query = QueryProcessor(tables=('proton_queue',), + columns=('id', 'address', 'props', 'body'), + opts={'order': 'id', 'limit': limit}) + msgs = list(query.execute()) + if msgs: + c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s', + {'ids': [msg['id'] for msg in msgs]}) + c.execute('COMMIT') + return msgs + except psycopg2.errors.LockNotAvailable: + return [] @ignore_error @@ -364,7 +373,7 @@ def send_queued_msgs(cbtype, *args, **kws): test_mode = CONFIG.getboolean('broker', 'test_mode') db_enabled = False if CONFIG.has_option('queue', 'enabled'): - db_enabled = CONFIG.getboolean('queue', 'test_mode') + db_enabled = CONFIG.getboolean('queue', 'enabled') if test_mode: LOG.debug('test mode: skipping send to urls: %r', urls) for msg in msgs: