#214 Pick up composes stuck in 'wait' states.
Merged 11 months ago by jkaluza. Opened 11 months ago by jkaluza.
jkaluza/odcs poller  into  master

file modified
+27 -1

@@ -28,7 +28,7 @@ 

  import six

  import productmd.compose

  import productmd.common

- from datetime import datetime

+ from datetime import datetime, timedelta

  from odcs.server import log, conf, app, db

  from odcs.server.models import Compose, COMPOSE_STATES, COMPOSE_FLAGS

  from odcs.server.pungi import Pungi, PungiConfig, PungiSourceType, PungiLogs

@@ -689,6 +689,32 @@ 

              log.info("%r: Going to start compose generation.", compose)

              self.generate_new_compose(compose)

  

+     def pickup_waiting_composes(self):

+         """

+         Gets all the composes in "wait" state and starts generating them.

+ 

+         This method exists to pro-actively generate "wait" composes in case

+         the UMB message from frontend to backend is lost from whatever reason.

+         """

+         # Composes transition from 'wait' to 'generating' quite fast.

+         # The frontend changes the state of compose to 'wait', sends a message

+         # to the bus and once some backend receives it, it moves it to

+         # 'generating'. This should not take more than 3 minutes, so that's

+         # the limit we will use to find out the stuck composes.

+         limit = datetime.utcnow() - timedelta(minutes=3)

+         # We don't want to be to greedy here, because there are other backends

+         # which can handle the lost composes too later, so just take few of

+         # them in each run in each backend to balance the load.

+         composes = Compose.query.filter(

+             Compose.state == COMPOSE_STATES["wait"],

+             Compose.time_submitted < limit).order_by(

+                 Compose.id).limit(4).all()

+ 

+         for compose in composes:

+             log.info("%r: Going to regenerate compose stuck in 'wait' "

+                      "state.", compose)

+             self.generate_new_compose(compose)

+ 

      def generate_lost_composes(self):

          """

          Gets all the composes in "generating" state and continues with

@@ -114,3 +114,4 @@ 

              self.composer.generate_new_compose(compose)

          elif topic.endswith(conf.internal_messaging_topic):

              self.remove_expired_compose_thread.do_work()

+             self.composer.pickup_waiting_composes()

@@ -22,8 +22,9 @@ 

  

  import os

  import time

+ from datetime import datetime, timedelta

  

- from mock import patch, MagicMock

+ from mock import patch, MagicMock, call

  

  import odcs.server

  from odcs.server import db, app

@@ -354,3 +355,60 @@ 

          self.composer.currently_generating += [done.id, generating.id]

          self.composer.do_work()

          self.assertEqual(self.composer.currently_generating, [generating.id])

+ 

+ 

+ class TestComposerThreadStuckWaitComposes(ModelsBaseTest):

+     maxDiff = None

+ 

+     def setUp(self):

+         self.client = app.test_client()

+         super(TestComposerThreadStuckWaitComposes, self).setUp()

+         self.composer = ComposerThread()

+ 

+         self.patch_generate_new_compose = patch(

+             "odcs.server.backend.ComposerThread.generate_new_compose")

+         self.generate_new_compose = self.patch_generate_new_compose.start()

+ 

+     def tearDown(self):

+         super(TestComposerThreadStuckWaitComposes, self).tearDown()

+         self.patch_generate_new_compose.stop()

+ 

+     def _add_test_compose(self, state, time_submitted=None):

+         compose = Compose.create(

+             db.session, "unknown", PungiSourceType.KOJI_TAG, "f26",

+             COMPOSE_RESULTS["repository"], 60, "", 0)

+         compose.state = state

+         if time_submitted:

+             compose.time_submitted = time_submitted

+         db.session.add(compose)

+         db.session.commit()

+         return compose

+ 

+     def test_pickup_waiting_composes_generating_state(self):

+         time_submitted = datetime.utcnow() - timedelta(minutes=5)

+         composes = []

+         for i in range(10):

+             composes.append(self._add_test_compose(

+                 COMPOSE_STATES["wait"], time_submitted=time_submitted))

+         composes = sorted(composes, key=lambda c: c.id)

+         self.composer.pickup_waiting_composes()

+         self.generate_new_compose.assert_has_calls([

+             call(composes[0]), call(composes[1]), call(composes[2]),

+             call(composes[3])])

+ 

+     def test_pickup_waiting_composes_generating_state_not_old_enough(self):

+         composes = []

+         for i in range(10):

+             composes.append(self._add_test_compose(

+                 COMPOSE_STATES["wait"]))

+         composes = sorted(composes, key=lambda c: c.id)

+         self.composer.pickup_waiting_composes()

+         self.generate_new_compose.assert_not_called()

+ 

+     def test_generate_lost_composes_generating_state(self):

+         composes = []

+         for i in range(10):

+             composes.append(self._add_test_compose(COMPOSE_STATES["generating"]))

+         composes = sorted(composes, key=lambda c: c.id)

+         self.composer.pickup_waiting_composes()

+         self.generate_new_compose.assert_not_called()

@@ -106,7 +106,10 @@ 

              generate_new_compose.assert_not_called()

  

      @mock.patch("odcs.server.backend.RemoveExpiredComposesThread.do_work")

-     def test_consumer_processing_internal_cleaup(self, remove_expired):

+     @mock.patch("odcs.server.backend.ComposerThread.pickup_waiting_composes")

+     def test_consumer_processing_internal_cleaup(

+             self, pickup_waiting_composes, remove_expired):

          msg = self._internal_clean_composes_msg()

          self.consumer.consume(msg)

          remove_expired.assert_called_once()

+         pickup_waiting_composes.assert_called_once()

When a fedmsg or UMB message from the frontend to the backend is lost for whatever reason, the compose can stay in the "wait" state forever.

This PR addresses that by picking up composes which have been in the "wait" state for more than 3 minutes. This should be enough time, because the only thing that needs to happen in those 3 minutes is the transfer of the fedmsg or UMB message from the frontend to the backend.

ODCS does that on the "internal cleanup" message, which is sent to the backends regularly and is currently used to remove expired composes.

Nice! This isn't a poller. This lets jenkins essentially be the poller for us.

Jenkins failed here on 9fd7015c. Does that matter?

:+1: based on code review only.

We rely on jenkins for removing the expired composes too.

The reason it cannot be a simple poller is that we have multiple backends, and we need to ensure that two backends never take the same composes at the same time.

No other comments, just a quick question: why limit it to 4?

rebased onto 99d090e

11 months ago

Pull-Request has been merged by jkaluza

11 months ago