From 95b54c963ff28ecc2df9b5d68d6e2696cc5b43e2 Mon Sep 17 00:00:00 2001 From: Andrei Paplauski Date: May 05 2020 14:18:16 +0000 Subject: Resolve exception when DB is disconnected Now when database is disconnected(restarted) during ongoing transaction, that transaction will be rolled back and connection must be established again as expected. RESOLVE: CLOUDWF-515 --- diff --git a/freshmaker/producer.py b/freshmaker/producer.py index 4cb6088..6e18cc4 100644 --- a/freshmaker/producer.py +++ b/freshmaker/producer.py @@ -31,6 +31,8 @@ from freshmaker.kojiservice import koji_service from freshmaker.events import BrewContainerTaskStateChangeEvent from freshmaker.consumer import work_queue_put +from sqlalchemy.exc import StatementError + class FreshmakerProducer(PollingProducer): frequency = timedelta(seconds=conf.polling_interval) @@ -38,6 +40,9 @@ class FreshmakerProducer(PollingProducer): def poll(self): try: self.check_unfinished_koji_tasks(db.session) + except StatementError as ex: + db.session.rollback() + log.error("Invalid request, session is rolled back: %s", ex.orig) except Exception: msg = 'Error in poller execution:' log.exception(msg) diff --git a/tests/test_producer.py b/tests/test_producer.py index 7d98bc0..cdc8cef 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -57,7 +57,7 @@ class TestCheckUnfinishedKojiTasks(helpers.ModelsTestCase): self.koji_read_config_patcher.stop() @patch('freshmaker.kojiservice.KojiService.get_task_info') - @patch("freshmaker.consumer.get_global_consumer") + @patch('freshmaker.consumer.get_global_consumer') def test_koji_task_failed(self, global_consumer, get_task_info): consumer = self.create_consumer() global_consumer.return_value = consumer @@ -72,7 +72,7 @@ class TestCheckUnfinishedKojiTasks(helpers.ModelsTestCase): self.assertEqual(event.new_state, "FAILED") @patch('freshmaker.kojiservice.KojiService.get_task_info') - @patch("freshmaker.consumer.get_global_consumer") + @patch('freshmaker.consumer.get_global_consumer') def test_koji_task_closed(self, global_consumer, get_task_info): consumer = self.create_consumer() global_consumer.return_value = consumer @@ -87,7 +87,7 @@ class TestCheckUnfinishedKojiTasks(helpers.ModelsTestCase): self.assertEqual(event.new_state, "CLOSED") @patch('freshmaker.kojiservice.KojiService.get_task_info') - @patch("freshmaker.consumer.get_global_consumer") + @patch('freshmaker.consumer.get_global_consumer') def test_koji_task_dry_run(self, global_consumer, get_task_info): self.build.build_id = -10 consumer = self.create_consumer() @@ -101,7 +101,7 @@ class TestCheckUnfinishedKojiTasks(helpers.ModelsTestCase): self.assertRaises(queue.Empty, consumer.incoming.get, block=False) @patch('freshmaker.kojiservice.KojiService.get_task_info') - @patch("freshmaker.consumer.get_global_consumer") + @patch('freshmaker.consumer.get_global_consumer') def test_koji_task_open(self, global_consumer, get_task_info): self.build.build_id = -10 consumer = self.create_consumer() @@ -113,3 +113,39 @@ class TestCheckUnfinishedKojiTasks(helpers.ModelsTestCase): producer = FreshmakerProducer(hub) producer.check_unfinished_koji_tasks(db.session) self.assertRaises(queue.Empty, consumer.incoming.get, block=False) + + @patch('freshmaker.kojiservice.KojiService.get_task_info') + @patch('freshmaker.consumer.get_global_consumer') + def test_koji_invalid_request(self, global_consumer, get_task_info): + from sqlalchemy.exc import StatementError, InvalidRequestError + from sqlalchemy import select + self.build.build_id = -10 + consumer = self.create_consumer() + global_consumer.return_value = consumer + + get_task_info.return_value = {'state': koji.TASK_STATES['OPEN']} + + hub = MagicMock() + producer = FreshmakerProducer(hub) + # make new session to hold new connection and transaction + my_session = db.session() + # create connection and begin new transaction + my_connection = my_session.connection() + my_connection.begin() + # invalidate one connection to simulate disconnect from DB + my_connection.invalidate() + + # check if it will raise Statement Error + with self.assertRaises(StatementError) as cm: + producer.check_unfinished_koji_tasks(my_session) + # check if Statement Error is caused by InvalidRequestError + self.assertIsInstance(cm.exception.orig, InvalidRequestError) + + # rollback session, and rollback invalid transaction inside it + my_session.rollback() + + # check that new connection is created in our session + self.assertIsNot(my_connection, my_session.connection()) + # Check if connection to db is established again + my_session.connection().scalar(select([1])) + self.assertFalse(my_session.connection().invalidated)