From f434fb732a7e834b8f05245ca1d3631a305d6c27 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Aug 29 2018 06:12:39 +0000 Subject: Remove limit of number of Pulp composes picked up in the pickup_waiting_composes(). --- diff --git a/server/odcs/server/backend.py b/server/odcs/server/backend.py index 6beaed3..0c1d86e 100644 --- a/server/odcs/server/backend.py +++ b/server/odcs/server/backend.py @@ -711,23 +711,37 @@ class ComposerThread(BackendThread): now = datetime.utcnow() from_time = now - timedelta(days=3) to_time = now - timedelta(minutes=3) - # We don't want to be to greedy here, because there are other backends - # which can handle the lost composes too later, so just take few of - # them in each run in each backend to balance the load. + + # Get composes which are in 'wait' state for too long. composes = Compose.query.filter( Compose.state == COMPOSE_STATES["wait"], Compose.time_submitted < to_time).order_by( - Compose.id).limit(4).all() + Compose.id).all() + + # We don't want to be to greedy here, because there are other backends + # which can handle the lost composes too later, so just take few non-Pulp + # composes each time. + # Pulp composes are much cheaper to generate - usually just single call to + # Pulp, so get all of them. + non_pulp_composes_count = 0 for compose in composes: if compose.time_submitted < from_time: compose.state = COMPOSE_STATES["failed"] compose.state_reason = "Compose stuck in 'wait' state for longer than 3 days." db.session.add(compose) - else: - log.info("%r: Going to regenerate compose stuck in 'wait' " - "state.", compose) - self.generate_new_compose(compose) + continue + + # Take only num_concurrent_pungi * 2 non-Pulp composes to keep some queue + # but left something for other backends. + if compose.source_type != PungiSourceType.PULP: + non_pulp_composes_count += 1 + if non_pulp_composes_count > conf.num_concurrent_pungi * 2: + continue + + log.info("%r: Going to regenerate compose stuck in 'wait' " + "state.", compose) + self.generate_new_compose(compose) db.session.commit() def generate_lost_composes(self): diff --git a/server/tests/test_composerthread.py b/server/tests/test_composerthread.py index e3ab45a..f042a85 100644 --- a/server/tests/test_composerthread.py +++ b/server/tests/test_composerthread.py @@ -373,9 +373,10 @@ class TestComposerThreadStuckWaitComposes(ModelsBaseTest): super(TestComposerThreadStuckWaitComposes, self).tearDown() self.patch_generate_new_compose.stop() - def _add_test_compose(self, state, time_submitted=None): + def _add_test_compose(self, state, time_submitted=None, + source_type=PungiSourceType.KOJI_TAG): compose = Compose.create( - db.session, "unknown", PungiSourceType.KOJI_TAG, "f26", + db.session, "unknown", source_type, "f26", COMPOSE_RESULTS["repository"], 60, "", 0) compose.state = state if time_submitted: @@ -422,3 +423,22 @@ class TestComposerThreadStuckWaitComposes(ModelsBaseTest): composes = sorted(composes, key=lambda c: c.id) self.composer.pickup_waiting_composes() self.generate_new_compose.assert_not_called() + + def test_pickup_waiting_composes_no_limit_for_pulp(self): + time_submitted = datetime.utcnow() - timedelta(minutes=5) + composes = [] + for i in range(10): + composes.append(self._add_test_compose( + COMPOSE_STATES["wait"], time_submitted=time_submitted)) + for i in range(10): + composes.append(self._add_test_compose( + COMPOSE_STATES["wait"], time_submitted=time_submitted, + source_type=PungiSourceType.PULP)) + composes = sorted(composes, key=lambda c: c.id) + self.composer.pickup_waiting_composes() + self.generate_new_compose.assert_has_calls([ + call(composes[0]), call(composes[1]), call(composes[2]), + call(composes[3]), call(composes[10]), call(composes[11]), + call(composes[12]), call(composes[13]), call(composes[14]), + call(composes[15]), call(composes[16]), call(composes[17]), + call(composes[18]), call(composes[19])])