#411 Fix #405 - Fix num_consecutive_builds.
Merged 7 years ago by jkaluza. Opened 7 years ago by jkaluza.
jkaluza/fm-orchestrator num_consecutive_builds  into  master

@@ -172,7 +172,7 @@ 

      db.create_all()

  

      username = getpass.getuser()

-     submit_module_build_from_scm(username, url, allow_local_url=True)

+     submit_module_build_from_scm(username, url, "master", allow_local_url=True)

  

      stop = module_build_service.scheduler.make_simple_stop_condition(db.session)

      initial_messages = [MBSModule("local module build", 1, 1)]

file modified
+73 -64
@@ -91,6 +91,25 @@ 

  

      return False

  

+ def start_build_component(builder, c):

+     """

+     Submits single component build to builder. Called in thread

+     by QueueBasedThreadPool in start_build_batch.

+     """

+     try:

+         c.task_id, c.state, c.state_reason, c.nvr = builder.build(

+             artifact_name=c.package, source=c.scmurl)

+     except Exception as e:

+         c.state = koji.BUILD_STATES['FAILED']

+         c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))

+         return

+ 

+     if not c.task_id and c.state == koji.BUILD_STATES['BUILDING']:

+         c.state = koji.BUILD_STATES['FAILED']

+         c.state_reason = ("Failed to build artifact %s: "

+             "Builder did not return task ID" % (c.package))

+         return

+ 

  def start_build_batch(config, module, session, builder, components=None):

      """

      Starts a round of the build cycle for a module.
@@ -142,76 +161,66 @@ 

      log.info("Starting build of next batch %d, %s" % (module.batch,

          unbuilt_components))

  

+     # Get the list of components to be built in this batch. We are not

+     # building all `unbuilt_components`, because we can a) meet

+     # the num_consecutive_builds threshold or b) reuse previous build.

      further_work = []

- 

-     def start_build_component(c):

-         """

-         Submits single component build to builder. Called in thread

-         by QueueBasedThreadPool later.

-         """

-         try:

-             with models.make_session(conf) as s:

-                 previous_component_build = None

-                 # Check to see if we can reuse a previous component build

-                 # instead of rebuilding it if the builder is Koji

-                 if conf.system == 'koji':

-                     previous_component_build = get_reusable_component(

-                         s, module, c.package)

-                 # If a component build can't be reused, we need to check

-                 # the concurrent threshold

-                 if not previous_component_build and \

-                         at_concurrent_component_threshold(config, s):

-                     log.info('Concurrent build threshold met')

-                     return

- 

-             if previous_component_build:

-                 log.info(

-                     'Reusing component "{0}" from a previous module '

-                     'build with the nvr "{1}"'.format(

-                         c.package, previous_component_build.nvr))

-                 c.reused_component_id = previous_component_build.id

-                 c.task_id = previous_component_build.task_id

-                 c.state = previous_component_build.state

-                 c.reused_component_id = previous_component_build.id

-                 c.task_id = previous_component_build.task_id

-                 c.state = previous_component_build.state

-                 c.state_reason = \

-                     'Reused component from previous module build'

-                 c.nvr = previous_component_build.nvr

-                 nvr_dict = kobo.rpmlib.parse_nvr(c.nvr)

-                 # Add this message to further_work so that the reused

-                 # component will be tagged properly

-                 further_work.append(

-                     module_build_service.messaging.KojiBuildChange(

-                         msg_id='start_build_batch: fake msg',

-                         build_id=None,

-                         task_id=c.task_id,

-                         build_new_state=c.state,

-                         build_name=c.package,

-                         build_version=nvr_dict['version'],

-                         build_release=nvr_dict['release'],

-                         module_build_id=c.module_id,

-                         state_reason=c.state_reason

-                     )

+     components_to_build = []

+     for c in unbuilt_components:

+         previous_component_build = None

+         # Check to see if we can reuse a previous component build

+         # instead of rebuilding it if the builder is Koji

+         if conf.system == 'koji':

+             previous_component_build = get_reusable_component(

+                 session, module, c.package)

+         # If a component build can't be reused, we need to check

+         # the concurrent threshold

+         if not previous_component_build and \

+                 at_concurrent_component_threshold(config, session):

+             log.info('Concurrent build threshold met')

+             break

+ 

+         if previous_component_build:

+             log.info(

+                 'Reusing component "{0}" from a previous module '

+                 'build with the nvr "{1}"'.format(

+                     c.package, previous_component_build.nvr))

+             c.reused_component_id = previous_component_build.id

+             c.task_id = previous_component_build.task_id

+             c.state = previous_component_build.state

+             c.reused_component_id = previous_component_build.id

+             c.task_id = previous_component_build.task_id

+             c.state = previous_component_build.state

+             c.state_reason = \

+                 'Reused component from previous module build'

+             c.nvr = previous_component_build.nvr

+             nvr_dict = kobo.rpmlib.parse_nvr(c.nvr)

+             # Add this message to further_work so that the reused

+             # component will be tagged properly

+             further_work.append(

+                 module_build_service.messaging.KojiBuildChange(

+                     msg_id='start_build_batch: fake msg',

+                     build_id=None,

+                     task_id=c.task_id,

+                     build_new_state=c.state,

+                     build_name=c.package,

+                     build_version=nvr_dict['version'],

+                     build_release=nvr_dict['release'],

+                     module_build_id=c.module_id,

+                     state_reason=c.state_reason

                  )

-                 return

- 

-             c.task_id, c.state, c.state_reason, c.nvr = builder.build(

-                 artifact_name=c.package, source=c.scmurl)

-         except Exception as e:

-             c.state = koji.BUILD_STATES['FAILED']

-             c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))

-             return

+             )

+             continue

  

-         if not c.task_id and c.state == koji.BUILD_STATES['BUILDING']:

-             c.state = koji.BUILD_STATES['FAILED']

-             c.state_reason = ("Failed to build artifact %s: "

-                 "Builder did not return task ID" % (c.package))

-             return

+         # We set state to BUILDING here, because we are going to build the

+         # component anyway and at_concurrent_component_threshold() works

+         # by counting components in BUILDING state.

+         c.state = koji.BUILD_STATES['BUILDING']

+         components_to_build.append(c)

  

      # Start build of components in this batch.

      with concurrent.futures.ThreadPoolExecutor(max_workers=config.num_consecutive_builds) as executor:

-         futures = {executor.submit(start_build_component, c): c for c in unbuilt_components}

+         futures = {executor.submit(start_build_component, builder, c): c for c in components_to_build}

          concurrent.futures.wait(futures)

  

      # If all components in this batch are already done, it can mean that they

@@ -31,6 +31,7 @@ 

  

  import module_build_service.messaging

  import module_build_service.scheduler.handlers.repos

+ import module_build_service.utils

  from module_build_service import db, models, conf

  

  from mock import patch
@@ -208,7 +209,7 @@ 

          GenericBuilder.register_backend_class(TestModuleBuilder)

          self.client = app.test_client()

          self._prev_system = conf.system

-         self._prev_num_consencutive_builds = conf.num_consecutive_builds

+         self._prev_num_consecutive_builds = conf.num_consecutive_builds

  

          conf.set_item("system", "mock")

  
@@ -222,7 +223,7 @@ 

  

      def tearDown(self):

          conf.set_item("system", self._prev_system)

-         conf.set_item("num_consecutive_builds", self._prev_num_consencutive_builds)

+         conf.set_item("num_consecutive_builds", self._prev_num_consecutive_builds)

          TestModuleBuilder.reset()

  

          # Necessary to restart the twisted reactor for the next test.
@@ -434,12 +435,24 @@ 

          data = json.loads(rv.data)

          module_build_id = data['id']

  

+         def stop(message):

+             """

+             Stop the scheduler when the module is built or when we try to build

+             more components than the num_consecutive_builds.

+             """

+             main_stop = module_build_service.scheduler.make_simple_stop_condition(db.session)

+             over_threshold = conf.num_consecutive_builds < \

+                 db.session.query(models.ComponentBuild).filter_by(

+                 state=koji.BUILD_STATES['BUILDING']).count()

+             return main_stop(message) or over_threshold

+ 

          msgs = []

-         stop = module_build_service.scheduler.make_simple_stop_condition(db.session)

          module_build_service.scheduler.main(msgs, stop)

  

          # All components should be built and module itself should be in "done"

          # or "ready" state.

          for build in models.ComponentBuild.query.filter_by(module_id=module_build_id).all():

              self.assertEqual(build.state, koji.BUILD_STATES['COMPLETE'])

+             # When this fails, it can mean that num_consecutive_builds

+             # threshold has been met.

              self.assertTrue(build.module_build.state in [models.BUILD_STATES["done"], models.BUILD_STATES["ready"]] )

The root of the problem is the following: we used to create a per-thread SQLAlchemy session when submitting builds in threads, but because of the SQLAlchemy session cache, the changes to ComponentBuild.state were not propagated to the database or to the other threads. This meant that at_concurrent_component_threshold (which works by querying the database) was returning invalid data, because the database was not in a consistent state.

The fix is to move the at_concurrent_component_threshold logic, as well as the code for "reuse previous component build", out of the thread and keep just builder.build() there.

This PR also extends test_submit_build_concurrent_threshold to actually fail if more than num_consecutive_builds component builds are submitted at once - previously it just checked that the build succeeds with num_consecutive_builds set to a low number.

Pull-Request has been merged by jkaluza

7 years ago