From 99d090ec453377f87a03d8032a2e3c930f8cda8b Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Aug 13 2018 12:41:38 +0000 Subject: Pick up composes stuck in 'wait' states. --- diff --git a/server/odcs/server/backend.py b/server/odcs/server/backend.py index 126fbb9..4a2cca4 100644 --- a/server/odcs/server/backend.py +++ b/server/odcs/server/backend.py @@ -28,7 +28,7 @@ import shutil import six import productmd.compose import productmd.common -from datetime import datetime +from datetime import datetime, timedelta from odcs.server import log, conf, app, db from odcs.server.models import Compose, COMPOSE_STATES, COMPOSE_FLAGS from odcs.server.pungi import Pungi, PungiConfig, PungiSourceType, PungiLogs @@ -689,6 +689,32 @@ class ComposerThread(BackendThread): log.info("%r: Going to start compose generation.", compose) self.generate_new_compose(compose) + def pickup_waiting_composes(self): + """ + Gets all the composes in "wait" state and starts generating them. + + This method exists to pro-actively generate "wait" composes in case + the UMB message from frontend to backend is lost for whatever reason. + """ + # Composes transition from 'wait' to 'generating' quite fast. + # The frontend changes the state of compose to 'wait', sends a message + # to the bus and once some backend receives it, it moves it to + # 'generating'. This should not take more than 3 minutes, so that's + # the limit we will use to find out the stuck composes. + limit = datetime.utcnow() - timedelta(minutes=3) + # We don't want to be too greedy here, because there are other backends + # which can handle the lost composes too later, so just take few of + # them in each run in each backend to balance the load. 
+ composes = Compose.query.filter( + Compose.state == COMPOSE_STATES["wait"], + Compose.time_submitted < limit).order_by( + Compose.id).limit(4).all() + + for compose in composes: + log.info("%r: Going to regenerate compose stuck in 'wait' " + "state.", compose) + self.generate_new_compose(compose) + def generate_lost_composes(self): """ Gets all the composes in "generating" state and continues with diff --git a/server/odcs/server/consumer.py b/server/odcs/server/consumer.py index 7bfac49..1c79a28 100644 --- a/server/odcs/server/consumer.py +++ b/server/odcs/server/consumer.py @@ -114,3 +114,4 @@ class ODCSConsumer(fedmsg.consumers.FedmsgConsumer): self.composer.generate_new_compose(compose) elif topic.endswith(conf.internal_messaging_topic): self.remove_expired_compose_thread.do_work() + self.composer.pickup_waiting_composes() diff --git a/server/tests/test_composerthread.py b/server/tests/test_composerthread.py index d33f95b..9ffe55d 100644 --- a/server/tests/test_composerthread.py +++ b/server/tests/test_composerthread.py @@ -22,8 +22,9 @@ import os import time +from datetime import datetime, timedelta -from mock import patch, MagicMock +from mock import patch, MagicMock, call import odcs.server from odcs.server import db, app @@ -354,3 +355,60 @@ class TestComposerThreadLostComposes(ModelsBaseTest): self.composer.currently_generating += [done.id, generating.id] self.composer.do_work() self.assertEqual(self.composer.currently_generating, [generating.id]) + + +class TestComposerThreadStuckWaitComposes(ModelsBaseTest): + maxDiff = None + + def setUp(self): + self.client = app.test_client() + super(TestComposerThreadStuckWaitComposes, self).setUp() + self.composer = ComposerThread() + + self.patch_generate_new_compose = patch( + "odcs.server.backend.ComposerThread.generate_new_compose") + self.generate_new_compose = self.patch_generate_new_compose.start() + + def tearDown(self): + super(TestComposerThreadStuckWaitComposes, self).tearDown() + 
self.patch_generate_new_compose.stop() + + def _add_test_compose(self, state, time_submitted=None): + compose = Compose.create( + db.session, "unknown", PungiSourceType.KOJI_TAG, "f26", + COMPOSE_RESULTS["repository"], 60, "", 0) + compose.state = state + if time_submitted: + compose.time_submitted = time_submitted + db.session.add(compose) + db.session.commit() + return compose + + def test_pickup_waiting_composes_generating_state(self): + time_submitted = datetime.utcnow() - timedelta(minutes=5) + composes = [] + for i in range(10): + composes.append(self._add_test_compose( + COMPOSE_STATES["wait"], time_submitted=time_submitted)) + composes = sorted(composes, key=lambda c: c.id) + self.composer.pickup_waiting_composes() + self.generate_new_compose.assert_has_calls([ + call(composes[0]), call(composes[1]), call(composes[2]), + call(composes[3])]) + + def test_pickup_waiting_composes_generating_state_not_old_enough(self): + composes = [] + for i in range(10): + composes.append(self._add_test_compose( + COMPOSE_STATES["wait"])) + composes = sorted(composes, key=lambda c: c.id) + self.composer.pickup_waiting_composes() + self.generate_new_compose.assert_not_called() + + def test_generate_lost_composes_generating_state(self): + composes = [] + for i in range(10): + composes.append(self._add_test_compose(COMPOSE_STATES["generating"])) + composes = sorted(composes, key=lambda c: c.id) + self.composer.pickup_waiting_composes() + self.generate_new_compose.assert_not_called() diff --git a/server/tests/test_consumer.py b/server/tests/test_consumer.py index c84c61c..3e6a4ab 100644 --- a/server/tests/test_consumer.py +++ b/server/tests/test_consumer.py @@ -106,7 +106,10 @@ class ConsumerTest(ConsumerBaseTest): generate_new_compose.assert_not_called() @mock.patch("odcs.server.backend.RemoveExpiredComposesThread.do_work") - def test_consumer_processing_internal_cleaup(self, remove_expired): + @mock.patch("odcs.server.backend.ComposerThread.pickup_waiting_composes") + def 
test_consumer_processing_internal_cleaup( + self, pickup_waiting_composes, remove_expired): msg = self._internal_clean_composes_msg() self.consumer.consume(msg) remove_expired.assert_called_once() + pickup_waiting_composes.assert_called_once()