From b37bcce487f43fbe84dbcbef3596ddd65c43ac59 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Dec 13 2017 08:48:30 +0000 Subject: Regenerate composes lost during the ODCS restarts in case when restar happens in the middle of compose generation. --- diff --git a/server/odcs/server/backend.py b/server/odcs/server/backend.py index c7c77b9..0a53272 100644 --- a/server/odcs/server/backend.py +++ b/server/odcs/server/backend.py @@ -549,7 +549,7 @@ def validate_pungi_compose(compose): raise RuntimeError(msg) -def generate_compose(compose_id): +def generate_compose(compose_id, lost_compose=False): """ Generates the compose defined by its `compose_id`. It is run by ThreadPoolExecutor from the ComposerThread. @@ -603,7 +603,22 @@ class ComposerThread(BackendThread): super(ComposerThread, self).__init__(1) self.executor = ThreadPoolExecutor(conf.num_concurrent_pungi) - def do_work(self): + # List of composes which are being currently generated by this ODCS + # instance. + self.currently_generating = [] + + def _generate_new_compose(self, compose): + """ + Adds the compose to queue of composes to generate, so + the ThreadPoolExecutor can start working on it. + """ + compose.state = COMPOSE_STATES["generating"] + db.session.add(compose) + db.session.commit() + self.currently_generating.append(compose.id) + self.executor.submit(generate_compose, compose.id) + + def generate_new_composes(self): """ Gets all the composes in "wait" state. Generates them using Pungi by calling `generate_compose(...)` in ThreadPoolExecutor. @@ -613,10 +628,50 @@ class ComposerThread(BackendThread): for compose in composes: log.info("%r: Going to start compose generation.", compose) - compose.state = COMPOSE_STATES["generating"] - db.session.add(compose) - db.session.commit() - self.executor.submit(generate_compose, compose.id) + self._generate_new_compose(compose) + + def generate_lost_composes(self): + """ + Gets all the composes in "generating" state and continues with + the generation process. + + This method is here to handle situation where the ODCS is restarted + in the middle of compose generation. + """ + composes = Compose.query.filter( + Compose.state == COMPOSE_STATES["generating"]).all() + + for compose in composes: + if compose.id in self.currently_generating: + # We already have a thread working on this compose. + continue + + log.info("%r: Going to regenerate lost compose.", compose) + self._generate_new_compose(compose) + + def refresh_currently_generating(self): + """ + Checks the status of all composes in self.currently_generating + and removes those which have been already done from this list. + """ + + new_currently_generating_list = [] + for compose_id in self.currently_generating: + compose = Compose.query.filter(Compose.id == compose_id).one() + if compose.state != COMPOSE_STATES["generating"]: + continue + + new_currently_generating_list.append(compose_id) + self.currently_generating = new_currently_generating_list + + def do_work(self): + """ + Gets all the composes in "wait" state. Generates them using Pungi + by calling `generate_compose(...)` in ThreadPoolExecutor. + """ + self.generate_lost_composes() + self.generate_new_composes() + self.refresh_currently_generating() def run_backend(): diff --git a/server/tests/test_composerthread.py b/server/tests/test_composerthread.py index 7c92a0c..b3b583d 100644 --- a/server/tests/test_composerthread.py +++ b/server/tests/test_composerthread.py @@ -102,12 +102,15 @@ class TestComposerThread(ModelsBaseTest): c = db.session.query(Compose).filter(Compose.id == 1).one() self.assertEqual(c.state, COMPOSE_STATES["wait"]) + self.assertEqual(self.composer.currently_generating, []) + self.composer.do_work() c = self._wait_for_compose_state(1, COMPOSE_STATES["done"]) self.assertEqual(c.state, COMPOSE_STATES["done"]) self.assertEqual(c.result_repo_dir, os.path.join(odcs.server.conf.target_dir, "latest-odcs-1-1/compose/Temporary")) self.assertEqual(c.result_repo_url, "http://localhost/odcs/latest-odcs-1-1/compose/Temporary") + self.assertEqual(self.composer.currently_generating, [1]) @mock_pdc @patch("odcs.server.utils.execute_cmd") @@ -260,3 +263,55 @@ class TestComposerThread(ModelsBaseTest): self.assertEqual(c.result_repo_dir, os.path.join(odcs.server.conf.target_dir, "latest-odcs-2-1/compose/Temporary")) self.assertEqual(c.result_repo_url, "http://localhost/odcs/latest-odcs-2-1/compose/Temporary") + + +class TestComposerThreadLostComposes(ModelsBaseTest): + maxDiff = None + + def setUp(self): + self.client = app.test_client() + super(TestComposerThreadLostComposes, self).setUp() + self.composer = ComposerThread() + + self.patch_generate_new_compose = patch( + "odcs.server.backend.ComposerThread._generate_new_compose") + self.generate_new_compose = self.patch_generate_new_compose.start() + + def tearDown(self): + super(TestComposerThreadLostComposes, self).tearDown() + self.patch_generate_new_compose.stop() + + def _add_test_compose(self, state): + compose = Compose.create( + db.session, "unknown", PungiSourceType.KOJI_TAG, "f26", + COMPOSE_RESULTS["repository"], 60, "", 0) + compose.state = state + db.session.add(compose) + db.session.commit() + return compose + + def test_generate_lost_composes_generating_state(self): + compose = self._add_test_compose(COMPOSE_STATES["generating"]) + self.composer.do_work() + self.generate_new_compose.assert_called_once_with(compose) + + def test_generate_lost_composes_currently_generating(self): + compose = self._add_test_compose(COMPOSE_STATES["generating"]) + self.composer.currently_generating.append(compose.id) + self.composer.do_work() + self.generate_new_compose.assert_not_called() + + def test_generate_lost_composes_all_states(self): + for state in ["wait", "done", "removed", "failed"]: + self._add_test_compose(COMPOSE_STATES[state]) + + self.composer.generate_lost_composes() + self.generate_new_compose.assert_not_called() + + def test_refresh_currently_generating(self): + generating = self._add_test_compose(COMPOSE_STATES["generating"]) + done = self._add_test_compose(COMPOSE_STATES["done"]) + + self.composer.currently_generating += [done.id, generating.id] + self.composer.do_work() + self.assertEqual(self.composer.currently_generating, [generating.id])