#1513 Convert the poller to be Celery periodic tasks
Merged 4 years ago by cqi. Opened 4 years ago by cqi.
cqi/fm-orchestrator convert-producer-to-periodic-tasks into v3
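This PR replaces the fedmsg-hub MBSProducer polling loop with standalone Celery tasks that Celery beat schedules every `conf.polling_interval` seconds. The snippet below is a minimal, illustrative sketch of that registration pattern, not MBS code: the `Celery("mbs")` app, the interval constant, and the `log_state_summary` task are placeholders standing in for module_build_service's `celery_app`, `conf.polling_interval`, and the real tasks in the diff.

    # Sketch only: how a function becomes a periodic Celery task.
    from celery import Celery

    celery_app = Celery("mbs")   # stands in for module_build_service.celery_app
    POLLING_INTERVAL = 600       # stands in for conf.polling_interval (seconds)


    @celery_app.task
    def log_state_summary():
        # Placeholder body; the real tasks query the MBS database.
        print("module build summary goes here")


    @celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Each former poll() responsibility is registered as its own periodic task.
        sender.add_periodic_task(
            POLLING_INTERVAL, log_state_summary.s(), name="Log summary")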

@@ -20,11 +20,6 @@ 

  import sqlalchemy.exc

  

  import module_build_service.messaging

- import module_build_service.scheduler.handlers.repos

- import module_build_service.scheduler.handlers.components

- import module_build_service.scheduler.handlers.modules

- import module_build_service.scheduler.handlers.tags

- import module_build_service.scheduler.handlers.greenwave

  import module_build_service.monitor as monitor

  

  from module_build_service import models, log, conf
@@ -32,9 +27,42 @@ 

  from module_build_service.errors import IgnoreMessage

  from module_build_service.messaging import default_messaging_backend

  from module_build_service.scheduler import events

+ from module_build_service.scheduler.handlers import components

+ from module_build_service.scheduler.handlers import repos

+ from module_build_service.scheduler.handlers import modules

+ from module_build_service.scheduler.handlers import tags

  from module_build_service.scheduler.handlers import greenwave

  

  

+ def no_op_handler(*args, **kwargs):

+     return True

+ 

+ 

+ ON_BUILD_CHANGE_HANDLERS = {

+     koji.BUILD_STATES["BUILDING"]: no_op_handler,

+     koji.BUILD_STATES["COMPLETE"]: components.build_task_finalize,

+     koji.BUILD_STATES["FAILED"]: components.build_task_finalize,

+     koji.BUILD_STATES["CANCELED"]: components.build_task_finalize,

+     koji.BUILD_STATES["DELETED"]: no_op_handler,

+ }

+ 

+ ON_MODULE_CHANGE_HANDLERS = {

+     models.BUILD_STATES["init"]: modules.init,

+     models.BUILD_STATES["wait"]: modules.wait,

+     models.BUILD_STATES["build"]: no_op_handler,

+     models.BUILD_STATES["failed"]: modules.failed,

+     models.BUILD_STATES["done"]: modules.done,

+     # XXX: DIRECT TRANSITION TO READY

+     models.BUILD_STATES["ready"]: no_op_handler,

+     models.BUILD_STATES["garbage"]: no_op_handler,

+ }

+ 

+ # Only one kind of repo change event, though...

+ ON_REPO_CHANGE_HANDLER = repos.done

+ ON_TAG_CHANGE_HANDLER = tags.tagged

+ ON_DECISION_UPDATE_HANDLER = greenwave.decision_update

+ 

+ 

  class MBSConsumer(fedmsg.consumers.FedmsgConsumer):

      """ This is triggered by running fedmsg-hub. This class is responsible for

      ingesting and processing messages from the message bus.
@@ -83,32 +111,6 @@ 

              msg = module_build_service.messaging._initial_messages.pop(0)

              self.incoming.put(msg)

  

-         from module_build_service.scheduler import handlers

- 

-         # These are our main lookup tables for figuring out what to run in

-         # response to what messaging events.

-         self.NO_OP = NO_OP = lambda *args, **kwargs: True

-         self.on_build_change = {

-             koji.BUILD_STATES["BUILDING"]: NO_OP,

-             koji.BUILD_STATES["COMPLETE"]: handlers.components.build_task_finalize,

-             koji.BUILD_STATES["FAILED"]: handlers.components.build_task_finalize,

-             koji.BUILD_STATES["CANCELED"]: handlers.components.build_task_finalize,

-             koji.BUILD_STATES["DELETED"]: NO_OP,

-         }

-         self.on_module_change = {

-             models.BUILD_STATES["init"]: handlers.modules.init,

-             models.BUILD_STATES["wait"]: handlers.modules.wait,

-             models.BUILD_STATES["build"]: NO_OP,

-             models.BUILD_STATES["failed"]: handlers.modules.failed,

-             models.BUILD_STATES["done"]: handlers.modules.done,

-             # XXX: DIRECT TRANSITION TO READY

-             models.BUILD_STATES["ready"]: NO_OP,

-             models.BUILD_STATES["garbage"]: NO_OP,

-         }

-         # Only one kind of repo change event, though...

-         self.on_repo_change = handlers.repos.done

-         self.on_tag_change = handlers.tags.tagged

-         self.on_decision_update = handlers.greenwave.decision_update

          self.sanity_check()

  

      def shutdown(self):
@@ -180,10 +182,10 @@ 

          """ On startup, make sure our implementation is sane. """

          # Ensure we have every state covered

          for state in models.BUILD_STATES:

-             if models.BUILD_STATES[state] not in self.on_module_change:

+             if models.BUILD_STATES[state] not in ON_MODULE_CHANGE_HANDLERS:

                  raise KeyError("Module build states %r not handled." % state)

          for state in koji.BUILD_STATES:

-             if koji.BUILD_STATES[state] not in self.on_build_change:

+             if koji.BUILD_STATES[state] not in ON_BUILD_CHANGE_HANDLERS:

                  raise KeyError("Koji build states %r not handled." % state)

  

      def _map_message(self, db_session, event_info):
@@ -192,7 +194,7 @@ 

          event = event_info["event"]

  

          if event == events.KOJI_BUILD_CHANGE:

-             handler = self.on_build_change[event_info["build_new_state"]]

+             handler = ON_BUILD_CHANGE_HANDLERS[event_info["build_new_state"]]

              build = models.ComponentBuild.from_component_event(

                  db_session, event_info["task_id"], event_info["module_build_id"])

              if build:
@@ -201,13 +203,13 @@ 

  

          if event == events.KOJI_REPO_CHANGE:

              return (

-                 self.on_repo_change,

+                 ON_REPO_CHANGE_HANDLER,

                  models.ModuleBuild.get_by_tag(db_session, event_info["repo_tag"])

              )

  

          if event == events.KOJI_TAG_CHANGE:

              return (

-                 self.on_tag_change,

+                 ON_TAG_CHANGE_HANDLER,

                  models.ModuleBuild.get_by_tag(db_session, event_info["tag_name"])

              )

  
@@ -219,14 +221,13 @@ 

                      state, type(state), valid_module_build_states

                  ))

              return (

-                 self.on_module_change[state],

-                 models.ModuleBuild.get_by_id(

-                     db_session, event_info["module_build_id"])

+                 ON_MODULE_CHANGE_HANDLERS[state],

+                 models.ModuleBuild.get_by_id(db_session, event_info["module_build_id"])

              )

  

          if event == events.GREENWAVE_DECISION_UPDATE:

              return (

-                 self.on_decision_update,

+                 ON_DECISION_UPDATE_HANDLER,

                  greenwave.get_corresponding_module_build(event_info["subject_identifier"])

              )

  
@@ -243,7 +244,7 @@ 

          idx = "%s: %s, %s" % (

              handler.__name__, event_info["event"], event_info["msg_id"])

  

-         if handler is self.NO_OP:

+         if handler is no_op_handler:

              log.debug("Handler is NO_OP: %s", idx)

              return

  

@@ -1,501 +1,470 @@ 

  # -*- coding: utf-8 -*-

  # SPDX-License-Identifier: MIT

- """ The PollingProducer class that acts as a producer entry point for

- fedmsg-hub. This class polls the database for tasks to do.

- """

  

  import koji

  import operator

  from datetime import timedelta, datetime

- from sqlalchemy.orm import lazyload

- from moksha.hub.api.producer import PollingProducer

+ from sqlalchemy.orm import lazyload, load_only

  

  import module_build_service.messaging

  import module_build_service.scheduler

  import module_build_service.scheduler.consumer

- from module_build_service import conf, models, log

+ from module_build_service import celery_app, conf, models, log

  from module_build_service.builder import GenericBuilder

  from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder

  from module_build_service.utils.greenwave import greenwave

  from module_build_service.db_session import db_session

- from module_build_service.scheduler import events

- 

- 

- class MBSProducer(PollingProducer):

-     frequency = timedelta(seconds=conf.polling_interval)

- 

-     @events.mbs_event_handler()

-     def poll(self):

-         try:

-             self.log_summary()

-             self.process_waiting_module_builds()

-             self.process_open_component_builds()

-             self.fail_lost_builds()

-             self.process_paused_module_builds(conf)

-             self.retrigger_new_repo_on_failure(conf)

-             self.delete_old_koji_targets(conf)

-             self.cleanup_stale_failed_builds(conf)

-             self.sync_koji_build_tags(conf)

-             self.poll_greenwave(conf)

-         except Exception:

-             msg = "Error in poller execution:"

-             log.exception(msg)

- 

-         # Poller runs in its own thread. Database session can be removed safely.

-         db_session.remove()

- 

-         log.info('Poller will now sleep for "{}" seconds'.format(conf.polling_interval))

- 

-     def fail_lost_builds(self):

-         # This function is supposed to be handling only the part which can't be

-         # updated through messaging (e.g. srpm-build failures). Please keep it

-         # fit `n` slim. We do want rest to be processed elsewhere

-         # TODO re-use

- 

-         if conf.system == "koji":

-             # We don't do this on behalf of users

-             koji_session = KojiModuleBuilder.get_session(conf, login=False)

-             log.info("Querying tasks for statuses:")

-             res = (

-                 db_session.query(models.ComponentBuild)

-                 .filter_by(state=koji.BUILD_STATES["BUILDING"])

-                 .options(lazyload("module_build"))

-                 .all()

-             )

- 

-             log.info("Checking status for {0} tasks".format(len(res)))

-             for component_build in res:

-                 log.debug(component_build.json(db_session))

-                 # Don't check tasks which haven't been triggered yet

-                 if not component_build.task_id:

-                     continue

- 

-                 # Don't check tasks for components which have been reused,

-                 # they may have BUILDING state temporarily before we tag them

-                 # to new module tag. Checking them would be waste of resources.

-                 if component_build.reused_component_id:

-                     log.debug(

-                         'Skipping check for task "{0}", '

-                         'the component has been reused ("{1}").'.format(

-                             component_build.task_id, component_build.reused_component_id)

-                     )

-                     continue

- 

-                 task_id = component_build.task_id

- 

-                 log.info('Checking status of task_id "{0}"'.format(task_id))

-                 task_info = koji_session.getTaskInfo(task_id)

- 

-                 state_mapping = {

-                     # Cancelled and failed builds should be marked as failed.

-                     koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"],

-                     koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"],

-                     # Completed tasks should be marked as complete.

-                     koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"],

-                 }

- 

-                 # If it is a closed/completed task, then we can extract the NVR

-                 build_version, build_release = None, None  # defaults

-                 if task_info["state"] == koji.TASK_STATES["CLOSED"]:

-                     builds = koji_session.listBuilds(taskID=task_id)

-                     if not builds:

-                         log.warning(

-                             "Task ID %r is closed, but we found no builds in koji." % task_id)

-                     elif len(builds) > 1:

-                         log.warning(

-                             "Task ID %r is closed, but more than one build is present!" % task_id)

-                     else:

-                         build_version = builds[0]["version"]

-                         build_release = builds[0]["release"]

- 

-                 log.info("  task {0!r} is in state {1!r}".format(task_id, task_info["state"]))

-                 if task_info["state"] in state_mapping:

-                     # Fake a fedmsg message on our internal queue

-                     msg = {

-                         "msg_id": "producer::fail_lost_builds fake msg",

-                         "event": events.KOJI_BUILD_CHANGE,

-                         "build_id": component_build.task_id,

-                         "task_id": component_build.task_id,

-                         "build_new_state": state_mapping[task_info["state"]],

-                         "build_name": component_build.package,

-                         "build_release": build_release,

-                         "build_version": build_version,

-                         "module_build_id": None,

-                         "state_reason": None

-                     }

-                     module_build_service.scheduler.consumer.work_queue_put(msg)

- 

-         elif conf.system == "mock":

-             pass

- 

-     def cleanup_stale_failed_builds(self, conf):

-         """ Does various clean up tasks on stale failed module builds

-         :param conf: the MBS configuration object

-         :param db_session: a SQLAlchemy database session

-         """

-         if conf.system == "koji":

-             stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time)

-             stale_module_builds = (

-                 db_session.query(models.ModuleBuild)

-                 .filter(

-                     models.ModuleBuild.state == models.BUILD_STATES["failed"],

-                     models.ModuleBuild.time_modified <= stale_date,

-                 )

-                 .all()

-             )

-             if stale_module_builds:

-                 log.info(

-                     "{0} stale failed module build(s) will be cleaned up".format(

-                         len(stale_module_builds))

-                 )

-             for module in stale_module_builds:

-                 log.info("{0!r} is stale and is being cleaned up".format(module))

-                 # Find completed artifacts in the stale build

-                 artifacts = [c for c in module.component_builds if c.is_completed]

-                 # If there are no completed artifacts, then there is nothing to tag

-                 if artifacts:

-                     # Set buildroot_connect=False so it doesn't recreate the Koji target and etc.

-                     builder = GenericBuilder.create_from_module(

-                         db_session, module, conf, buildroot_connect=False

-                     )

-                     builder.untag_artifacts([c.nvr for c in artifacts])

-                     # Mark the artifacts as untagged in the database

-                     for c in artifacts:

-                         c.tagged = False

-                         c.tagged_in_final = False

-                         db_session.add(c)

-                 state_reason = (

-                     "The module was garbage collected since it has failed over {0}"

-                     " day(s) ago".format(conf.cleanup_failed_builds_time)

-                 )

-                 module.transition(

-                     db_session,

-                     conf,

-                     models.BUILD_STATES["garbage"],

-                     state_reason=state_reason,

-                     failure_type="user",

-                 )

-                 db_session.add(module)

-                 db_session.commit()

- 

-     def log_summary(self):

-         log.info("Current status:")

-         consumer = module_build_service.scheduler.consumer.get_global_consumer()

-         backlog = consumer.incoming.qsize()

-         log.info("  * internal queue backlog is {0}".format(backlog))

-         states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))

-         for name, code in states:

-             query = db_session.query(models.ModuleBuild).filter_by(state=code)

-             count = query.count()

-             if count:

-                 log.info("  * {0} module builds in the {1} state".format(count, name))

-             if name == "build":

-                 for module_build in query.all():

-                     log.info("    * {0!r}".format(module_build))

-                     # First batch is number '1'.

-                     for i in range(1, module_build.batch + 1):

-                         n = len([c for c in module_build.component_builds if c.batch == i])

-                         log.info("      * {0} components in batch {1}".format(n, i))

- 

-     def _nudge_module_builds_in_state(self, state_name, older_than_minutes):

-         """

-         Finds all the module builds in the `state` with `time_modified` older

-         than `older_than_minutes` and adds fake MBSModule message to the

-         work queue.

-         """

-         log.info("Looking for module builds stuck in the %s state", state_name)

-         builds = models.ModuleBuild.by_state(db_session, state_name)

-         log.info(" %r module builds in the %s state...", len(builds), state_name)

-         now = datetime.utcnow()

-         time_modified_threshold = timedelta(minutes=older_than_minutes)

-         for build in builds:

+ from module_build_service.scheduler.consumer import ON_MODULE_CHANGE_HANDLERS

+ from module_build_service.scheduler.handlers.components import build_task_finalize

+ from module_build_service.scheduler.handlers.tags import tagged

+ 

+ 

+ @celery_app.on_after_configure.connect

+ def setup_periodic_tasks(sender, **kwargs):

+     tasks = (

+         (log_summary, "Log summary of module builds and component builds"),

+         (process_waiting_module_builds, "Process waiting module builds"),

+         (fail_lost_builds, "Fail lost builds"),

+         (process_paused_module_builds, "Process paused module builds"),

+         (delete_old_koji_targets, "Delete old koji targets"),

+         (cleanup_stale_failed_builds, "Cleanup stale failed builds"),

+         (cancel_stuck_module_builds, "Cancel stuck module builds"),

+         (sync_koji_build_tags, "Sync Koji build tags"),

+         (poll_greenwave, "Gating module build to ready state"),

+     )

+ 

+     for task, name in tasks:

+         sender.add_periodic_task(conf.polling_interval, task.s(), name=name)

+ 

+ 

+ @celery_app.task

+ def log_summary():

+     states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))

+     for name, code in states:

+         query = db_session.query(models.ModuleBuild).filter_by(state=code)

+         count = query.count()

+         if count:

+             log.info("  * %s module builds in the %s state", count, name)

+         if name == "build":

+             for module_build in query.all():

+                 log.info("    * %r", module_build)

+                 # First batch is number '1'.

+                 for i in range(1, module_build.batch + 1):

+                     n = len([c for c in module_build.component_builds if c.batch == i])

+                     log.info("      * %s components in batch %s", n, i)

+ 

+ 

+ @celery_app.task

+ def process_waiting_module_builds():

+     for state in ["init", "wait"]:

+         nudge_module_builds_in_state(state, 10)

+ 

+ 

+ def nudge_module_builds_in_state(state_name, older_than_minutes):

+     """

+     Finds all the module builds in `state_name` with `time_modified` older

+     than `older_than_minutes` and schedules the corresponding module

+     state-change handler task for each of them.

+     """

+     log.info("Looking for module builds stuck in the %s state", state_name)

+     builds = models.ModuleBuild.by_state(db_session, state_name)

+     log.info(" %r module builds in the %s state...", len(builds), state_name)

+     now = datetime.utcnow()

+     time_modified_threshold = timedelta(minutes=older_than_minutes)

+     for build in builds:

+ 

+         # Only give builds a nudge if stuck for more than ten minutes

+         if (now - build.time_modified) < time_modified_threshold:

+             continue

+ 

+         # Pretend the build is modified, so we don't tight spin.

+         build.time_modified = now

+         db_session.commit()

  

-             # Only give builds a nudge if stuck for more than ten minutes

-             if (now - build.time_modified) < time_modified_threshold:

-                 continue

+         # Schedule the state-change handler to kickstart the build anew

+         state = module_build_service.models.BUILD_STATES[state_name]

+         handler = ON_MODULE_CHANGE_HANDLERS[state]

+         handler.delay("internal:mbs.module.state.change", build.id, state)

  

-             # Pretend the build is modified, so we don't tight spin.

-             build.time_modified = now

  

-             # Fake a message to kickstart the build anew in the consumer

-             state = module_build_service.models.BUILD_STATES[state_name]

-             msg = {

-                 "msg_id": "nudge_module_builds_fake_message",

-                 "event": events.MBS_MODULE_STATE_CHANGE,

-                 "module_build_id": build.id,

-                 "module_build_state": state,

-             }

-             log.info("  Scheduling faked event %r", msg)

-             module_build_service.scheduler.consumer.work_queue_put(msg)

+ def process_open_component_builds():

+     log.warning("process_open_component_builds is not yet implemented...")

  

-         db_session.commit()

- 

-     def process_waiting_module_builds(self):

-         for state in ["init", "wait"]:

-             self._nudge_module_builds_in_state(state, 10)

  

-     def process_open_component_builds(self):

-         log.warning("process_open_component_builds is not yet implemented...")

+ @celery_app.task

+ def fail_lost_builds():

+     # This function is supposed to handle only the part which can't be

+     # updated through messaging (e.g. srpm-build failures). Please keep it

+     # fit `n` slim; we want the rest to be processed elsewhere.

+     # TODO re-use

  

-     def process_paused_module_builds(self, config):

-         log.info("Looking for paused module builds in the build state")

-         if module_build_service.utils.at_concurrent_component_threshold(config):

-             log.debug(

-                 "Will not attempt to start paused module builds due to "

-                 "the concurrent build threshold being met"

-             )

-             return

- 

-         ten_minutes = timedelta(minutes=10)

-         # Check for module builds that are in the build state but don't have any active component

-         # builds. Exclude module builds in batch 0. This is likely a build of a module without

-         # components.

-         module_builds = (

-             db_session.query(models.ModuleBuild)

-             .filter(

-                 models.ModuleBuild.state == models.BUILD_STATES["build"],

-                 models.ModuleBuild.batch > 0,

-             )

-             .all()

-         )

-         for module_build in module_builds:

-             now = datetime.utcnow()

-             # Only give builds a nudge if stuck for more than ten minutes

-             if (now - module_build.time_modified) < ten_minutes:

+     if conf.system == "koji":

+         # We don't do this on behalf of users

+         koji_session = KojiModuleBuilder.get_session(conf, login=False)

+         log.info("Querying tasks for statuses:")

+         res = db_session.query(models.ComponentBuild).filter_by(

+             state=koji.BUILD_STATES["BUILDING"]

+         ).options(lazyload("module_build")).all()

+ 

+         log.info("Checking status for %s tasks", len(res))

+         for component_build in res:

+             log.debug(component_build.json(db_session))

+             # Don't check tasks which haven't been triggered yet

+             if not component_build.task_id:

                  continue

-             # If there are no components in the build state on the module build,

-             # then no possible event will start off new component builds.

-             # But do not try to start new builds when we are waiting for the

-             # repo-regen.

-             if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]):

-                 # Initialize the builder...

-                 builder = GenericBuilder.create_from_module(db_session, module_build, config)

- 

-                 if _has_missed_new_repo_message(module_build, builder.koji_session):

-                     log.info("  Processing the paused module build %r", module_build)

-                     module_build_service.utils.start_next_batch_build(

-                         config, module_build, builder)

- 

-             # Check if we have met the threshold.

-             if module_build_service.utils.at_concurrent_component_threshold(config, db_session):

-                 break

  

-     def retrigger_new_repo_on_failure(self, config):

-         """

-         Retrigger failed new repo tasks for module builds in the build state.

+             # Don't check tasks for components which have been reused;

+             # they may be in the BUILDING state temporarily before we tag them

+             # to the new module tag. Checking them would be a waste of resources.

+             if component_build.reused_component_id:

+                 log.debug(

+                     'Skipping check for task "%s", the component has been reused ("%s").',

+                     component_build.task_id, component_build.reused_component_id

+                 )

+                 continue

  

-         The newRepo task may fail for various reasons outside the scope of MBS.

-         This method will detect this scenario and retrigger the newRepo task

-         if needed to avoid the module build from being stuck in the "build" state.

-         """

-         if config.system != "koji":

-             return

+             task_id = component_build.task_id

  

-         koji_session = module_build_service.builder.KojiModuleBuilder.KojiModuleBuilder.get_session(

-             config)

+             log.info('Checking status of task_id "%s"', task_id)

+             task_info = koji_session.getTaskInfo(task_id)

  

-         for module_build in (

-             db_session.query(models.ModuleBuild).filter_by(state=models.BUILD_STATES["build"]).all()

-         ):

-             if not module_build.new_repo_task_id:

-                 continue

+             state_mapping = {

+                 # Cancelled and failed builds should be marked as failed.

+                 koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"],

+                 koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"],

+                 # Completed tasks should be marked as complete.

+                 koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"],

+             }

  

-             task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)

-             if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]:

-                 log.info(

-                     "newRepo task %s for %r failed, starting another one",

-                     str(module_build.new_repo_task_id), module_build,

+             # If it is a closed/completed task, then we can extract the NVR

+             build_version, build_release = None, None  # defaults

+             if task_info["state"] == koji.TASK_STATES["CLOSED"]:

+                 builds = koji_session.listBuilds(taskID=task_id)

+                 if not builds:

+                     log.warning(

+                         "Task ID %r is closed, but we found no builds in koji.", task_id)

+                 elif len(builds) > 1:

+                     log.warning(

+                         "Task ID %r is closed, but more than one build is present!", task_id)

+                 else:

+                     build_version = builds[0]["version"]

+                     build_release = builds[0]["release"]

+ 

+             log.info("  task %r is in state %r", task_id, task_info["state"])

+             if task_info["state"] in state_mapping:

+                 build_task_finalize.delay(

+                     msg_id="producer::fail_lost_builds fake msg",

+                     build_id=component_build.task_id,

+                     task_id=component_build.task_id,

+                     build_new_state=state_mapping[task_info["state"]],

+                     build_name=component_build.package,

+                     build_release=build_release,

+                     build_version=build_version,

                  )

-                 taginfo = koji_session.getTag(module_build.koji_tag + "-build")

-                 module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"])

- 

-         db_session.commit()

  

-     def delete_old_koji_targets(self, config):

-         """

-         Deletes targets older than `config.koji_target_delete_time` seconds

-         from Koji to cleanup after the module builds.

-         """

-         if config.system != "koji":

-             return

+     elif conf.system == "mock":

+         pass

  

-         log.info("Looking for module builds which Koji target can be removed")

  

+ @celery_app.task

+ def process_paused_module_builds():

+     log.info("Looking for paused module builds in the build state")

+     if module_build_service.utils.at_concurrent_component_threshold(conf):

+         log.debug(

+             "Will not attempt to start paused module builds due to "

+             "the concurrent build threshold being met"

+         )

+         return

+ 

+     ten_minutes = timedelta(minutes=10)

+     # Check for module builds that are in the build state but don't have any active component

+     # builds. Exclude module builds in batch 0. This is likely a build of a module without

+     # components.

+     module_builds = db_session.query(models.ModuleBuild).filter(

+         models.ModuleBuild.state == models.BUILD_STATES["build"],

+         models.ModuleBuild.batch > 0,

+     ).all()

+     for module_build in module_builds:

          now = datetime.utcnow()

+         # Only give builds a nudge if stuck for more than ten minutes

+         if (now - module_build.time_modified) < ten_minutes:

+             continue

+         # If there are no components in the build state on the module build,

+         # then no possible event will start off new component builds.

+         # But do not try to start new builds when we are waiting for the

+         # repo-regen.

+         if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]):

+             # Initialize the builder...

+             builder = GenericBuilder.create_from_module(

+                 db_session, module_build, conf)

+ 

+             if has_missed_new_repo_message(module_build, builder.koji_session):

+                 log.info("  Processing the paused module build %r", module_build)

+                 module_build_service.utils.start_next_batch_build(

+                     conf, module_build, builder)

+ 

+         # Check if we have met the threshold.

+         if module_build_service.utils.at_concurrent_component_threshold(conf):

+             break

+ 

+ 

+ @celery_app.task

+ def retrigger_new_repo_on_failure():

+     """

+     Retrigger failed new repo tasks for module builds in the build state.

  

-         koji_session = KojiModuleBuilder.get_session(config)

-         for target in koji_session.getBuildTargets():

-             koji_tag = target["dest_tag_name"]

-             module = db_session.query(models.ModuleBuild).filter_by(koji_tag=koji_tag).first()

-             if (

-                 not module

-                 or module.name in conf.base_module_names

-                 or module.state in [

-                     models.BUILD_STATES["init"],

-                     models.BUILD_STATES["wait"],

-                     models.BUILD_STATES["build"],

-                 ]

-             ):

-                 continue

+     The newRepo task may fail for various reasons outside the scope of MBS.

+     This task detects this scenario and retriggers the newRepo task

+     if needed so the module build does not get stuck in the "build" state.

+     """

+     if conf.system != "koji":

+         return

+ 

+     koji_session = KojiModuleBuilder.get_session(conf)

+     module_builds = db_session.query(models.ModuleBuild).filter(

+         models.ModuleBuild.state == models.BUILD_STATES["build"],

+         models.ModuleBuild.new_repo_task_id.isnot(None),

+     ).all()

+ 

+     for module_build in module_builds:

+         task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)

+         if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]:

+             log.info(

+                 "newRepo task %s for %r failed, starting another one",

+                 str(module_build.new_repo_task_id), module_build,

+             )

+             taginfo = koji_session.getTag(module_build.koji_tag + "-build")

+             module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"])

  

-             # Double-check that the target we are going to remove is prefixed

-             # by our prefix, so we won't remove f26 when there is some garbage

-             # in DB or Koji.

-             for allowed_prefix in config.koji_tag_prefixes:

-                 if target["name"].startswith(allowed_prefix + "-"):

-                     break

-             else:

-                 log.error("Module %r has Koji target with not allowed prefix.", module)

-                 continue

+     db_session.commit()

  

-             delta = now - module.time_completed

-             if delta.total_seconds() > config.koji_target_delete_time:

-                 log.info("Removing target of module %r", module)

-                 koji_session.deleteBuildTarget(target["id"])

- 

-     def cancel_stuck_module_builds(self, config):

-         """

-         Method transitions builds which are stuck in one state too long to the "failed" state.

-         The states are defined with the "cleanup_stuck_builds_states" config option and the

-         time is defined by the "cleanup_stuck_builds_time" config option.

-         """

-         log.info(

-             'Looking for module builds stuck in the states "{states}" more than {days} days'

-             .format(

-                 states=" and ".join(config.cleanup_stuck_builds_states),

-                 days=config.cleanup_stuck_builds_time,

-             )

-         )

  

-         delta = timedelta(days=config.cleanup_stuck_builds_time)

-         now = datetime.utcnow()

-         threshold = now - delta

-         states = [

-             module_build_service.models.BUILD_STATES[state]

-             for state in config.cleanup_stuck_builds_states

-         ]

- 

-         module_builds = (

-             db_session.query(models.ModuleBuild)

-             .filter(

-                 models.ModuleBuild.state.in_(states), models.ModuleBuild.time_modified < threshold

-             )

-             .all()

-         )

+ @celery_app.task

+ def delete_old_koji_targets():

+     """

+     Deletes targets older than `conf.koji_target_delete_time` seconds

+     from Koji to clean up after the module builds.

+     """

+     if conf.system != "koji":

+         return

+ 

+     log.info("Looking for module builds which Koji target can be removed")

+ 

+     now = datetime.utcnow()

+ 

+     koji_session = KojiModuleBuilder.get_session(conf)

+     for target in koji_session.getBuildTargets():

+         module = db_session.query(models.ModuleBuild).filter(

+             models.ModuleBuild.koji_tag == target["dest_tag_name"],

+             models.ModuleBuild.name.notin_(conf.base_module_names),

+             models.ModuleBuild.state.notin_([

+                 models.BUILD_STATES["init"],

+                 models.BUILD_STATES["wait"],

+                 models.BUILD_STATES["build"],

+             ]),

+         ).options(

+             load_only("time_completed"),

+         ).first()

+ 

+         if module is None:

+             continue

+ 

+         # Double-check that the target we are going to remove is prefixed

+         # by our prefix, so we won't remove f26 when there is some garbage

+         # in DB or Koji.

+         for allowed_prefix in conf.koji_tag_prefixes:

+             if target["name"].startswith(allowed_prefix + "-"):

+                 break

+         else:

+             log.error("Module %r has Koji target with not allowed prefix.", module)

+             continue

  

-         log.info(" {0!r} module builds are stuck...".format(len(module_builds)))

+         delta = now - module.time_completed

+         if delta.total_seconds() > conf.koji_target_delete_time:

+             log.info("Removing target of module %r", module)

+             koji_session.deleteBuildTarget(target["id"])

  

-         for build in module_builds:

-             nsvc = ":".join([build.name, build.stream, build.version, build.context])

-             log.info('Transitioning build "{nsvc}" to "Failed" state.'.format(nsvc=nsvc))

  

-             state_reason = "The module was in {state} for more than {days} days".format(

-                 state=build.state, days=config.cleanup_stuck_builds_time

-             )

-             build.transition(

-                 db_session,

-                 config,

-                 state=models.BUILD_STATES["failed"],

-                 state_reason=state_reason,

-                 failure_type="user",

+ @celery_app.task

+ def cleanup_stale_failed_builds():

+     """Does various clean up tasks on stale failed module builds"""

+ 

+     if conf.system != "koji":

+         return

+ 

+     stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time)

+     stale_module_builds = db_session.query(models.ModuleBuild).filter(

+         models.ModuleBuild.state == models.BUILD_STATES["failed"],

+         models.ModuleBuild.time_modified <= stale_date,

+     ).all()

+     if stale_module_builds:

+         log.info(

+             "%s stale failed module build(s) will be cleaned up",

+             len(stale_module_builds)

+         )

+     for module in stale_module_builds:

+         log.info("%r is stale and is being cleaned up", module)

+         # Find completed artifacts in the stale build

+         artifacts = [c for c in module.component_builds if c.is_completed]

+         # If there are no completed artifacts, then there is nothing to tag

+         if artifacts:

+             # Set buildroot_connect=False so it doesn't recreate the Koji target and etc.

+             builder = GenericBuilder.create_from_module(

+                 db_session, module, conf, buildroot_connect=False

              )

-             db_session.commit()

+             builder.untag_artifacts([c.nvr for c in artifacts])

+             # Mark the artifacts as untagged in the database

+             for c in artifacts:

+                 c.tagged = False

+                 c.tagged_in_final = False

+                 db_session.add(c)

+         state_reason = (

+             "The module was garbage collected since it has failed over {0}"

+             " day(s) ago".format(conf.cleanup_failed_builds_time)

+         )

+         module.transition(

+             db_session,

+             conf,

+             models.BUILD_STATES["garbage"],

+             state_reason=state_reason,

+             failure_type="user",

+         )

+         db_session.add(module)

+         db_session.commit()

  

-     def sync_koji_build_tags(self, config):

-         """

-         Method checking the "tagged" and "tagged_in_final" attributes of

-         "complete" ComponentBuilds in the current batch of module builds

-         in "building" state against the Koji.

  

-         In case the Koji shows the build as tagged/tagged_in_final,

-         fake "tagged" message is added to work queue.

-         """

-         if conf.system != "koji":

-             return

+ @celery_app.task

+ def cancel_stuck_module_builds():

+     """

+     Transitions builds which have been stuck in one state for too long to the "failed" state.

+     The states are defined with the "cleanup_stuck_builds_states" config option and the

+     time is defined by the "cleanup_stuck_builds_time" config option.

+     """

+     log.info(

+         'Looking for module builds stuck in the states "%s" more than %s days',

+         " and ".join(conf.cleanup_stuck_builds_states),

+         conf.cleanup_stuck_builds_time,

+     )

+ 

+     threshold = datetime.utcnow() - timedelta(days=conf.cleanup_stuck_builds_time)

+     states = [

+         module_build_service.models.BUILD_STATES[state]

+         for state in conf.cleanup_stuck_builds_states

+     ]

+ 

+     module_builds = db_session.query(models.ModuleBuild).filter(

+         models.ModuleBuild.state.in_(states),

+         models.ModuleBuild.time_modified < threshold

+     ).all()

+ 

+     log.info(" %s module builds are stuck...", len(module_builds))

+ 

+     for build in module_builds:

+         log.info(

+             'Transitioning build "%s:%s:%s:%s" to "Failed" state.',

+             build.name, build.stream, build.version, build.context

+         )

+         state_reason = "The module was in {} for more than {} days".format(

+             build.state, conf.cleanup_stuck_builds_time

+         )

+         build.transition(

+             db_session,

+             conf,

+             state=models.BUILD_STATES["failed"],

+             state_reason=state_reason,

+             failure_type="user",

+         )

+         db_session.commit()

  

-         koji_session = KojiModuleBuilder.get_session(conf, login=False)

  

-         threshold = datetime.utcnow() - timedelta(minutes=10)

-         module_builds = db_session.query(models.ModuleBuild).filter(

-             models.ModuleBuild.time_modified < threshold,

-             models.ModuleBuild.state == models.BUILD_STATES["build"]

-         ).all()

-         for module_build in module_builds:

-             complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"])

-             for c in complete_components:

-                 # In case the component is tagged in the build tag and

-                 # also tagged in the final tag (or it is build_time_only

-                 # and therefore should not be tagged in final tag), skip it.

-                 if c.tagged and (c.tagged_in_final or c.build_time_only):

-                     continue

+ @celery_app.task

+ def sync_koji_build_tags():

+     """

+     Checks the "tagged" and "tagged_in_final" attributes of "complete"

+     ComponentBuilds in the current batch of module builds in the

+     "building" state against Koji.

  

-                 log.info(

-                     "%r: Component %r is complete, but not tagged in the "

-                     "final and/or build tags.",

-                     module_build, c,

-                 )

+     In case Koji shows the build as tagged/tagged_in_final, the

+     corresponding "tagged" handler task is scheduled.

+     """

+     if conf.system != "koji":

+         return

+ 

+     koji_session = KojiModuleBuilder.get_session(conf, login=False)

+ 

+     threshold = datetime.utcnow() - timedelta(minutes=10)

+     module_builds = db_session.query(models.ModuleBuild).filter(

+         models.ModuleBuild.time_modified < threshold,

+         models.ModuleBuild.state == models.BUILD_STATES["build"]

+     ).all()

+     for module_build in module_builds:

+         complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"])

+         for c in complete_components:

+             # In case the component is tagged in the build tag and

+             # also tagged in the final tag (or it is build_time_only

+             # and therefore should not be tagged in final tag), skip it.

+             if c.tagged and (c.tagged_in_final or c.build_time_only):

+                 continue

  

-                 # Check in which tags the component is tagged.

-                 tag_dicts = koji_session.listTags(c.nvr)

-                 tags = [tag_dict["name"] for tag_dict in tag_dicts]

- 

-                 # If it is tagged in final tag, but MBS does not think so,

-                 # schedule fake message.

-                 if not c.tagged_in_final and module_build.koji_tag in tags:

-                     msg = {

-                         "msg_id": "sync_koji_build_tags_fake_message",

-                         "event": events.KOJI_TAG_CHANGE,

-                         "tag_name": module_build.koji_tag,

-                         "build_name": c.package,

-                         "build_nvr": c.nvr,

-                     }

-                     log.info("  Scheduling faked event %r", msg)

-                     module_build_service.scheduler.consumer.work_queue_put(msg)

- 

-                 # If it is tagged in the build tag, but MBS does not think so,

-                 # schedule fake message.

-                 build_tag = module_build.koji_tag + "-build"

-                 if not c.tagged and build_tag in tags:

-                     msg = {

-                         "msg_id": "sync_koji_build_tags_fake_message",

-                         "event": events.KOJI_TAG_CHANGE,

-                         "tag_name": build_tag,

-                         "build_name": c.package,

-                         "build_nvr": c.nvr,

-                     }

-                     log.info("  Scheduling faked event %r", msg)

-                     module_build_service.scheduler.consumer.work_queue_put(msg)

- 

-     def poll_greenwave(self, config):

-         """

-         Polls Greenwave for all builds in done state

-         :param db_session: SQLAlchemy DB session

-         :return: None

-         """

-         if greenwave is None:

-             return

- 

-         module_builds = (

-             db_session.query(models.ModuleBuild)

-             .filter_by(state=models.BUILD_STATES["done"], scratch=False).all()

-         )

+             log.info(

+                 "%r: Component %r is complete, but not tagged in the "

+                 "final and/or build tags.",

+                 module_build, c,

+             )

  

-         log.info("Checking Greenwave for %d builds", len(module_builds))

+             # Check in which tags the component is tagged.

+             tag_dicts = koji_session.listTags(c.nvr)

+             tags = [tag_dict["name"] for tag_dict in tag_dicts]

  

-         for build in module_builds:

-             if greenwave.check_gating(build):

-                 build.transition(db_session, config, state=models.BUILD_STATES["ready"])

-             else:

-                 build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format(

-                     conf.polling_interval

-                 )

-                 if greenwave.error_occurred:

-                     build.state_reason += " (Error occured while querying Greenwave)"

-                 build.time_modified = datetime.utcnow()

-             db_session.commit()

+             # If it is tagged in final tag, but MBS does not think so,

+             # schedule fake message.

+             if not c.tagged_in_final and module_build.koji_tag in tags:

+                 log.info(

+                     "Apply tag %s to module build %r",

+                     module_build.koji_tag, module_build)

+                 tagged.delay(

+                     "internal:sync_koji_build_tags",

+                     module_build.koji_tag, c.package, c.nvr)

+ 

+             # If it is tagged in the build tag, but MBS does not think so,

+             # schedule fake message.

+             build_tag = module_build.koji_tag + "-build"

+             if not c.tagged and build_tag in tags:

+                 log.info(

+                     "Apply build tag %s to module build %r",

+                     build_tag, module_build)

+                 tagged.delay(

+                     "internal:sync_koji_build_tags",

+                     build_tag, c.package, c.nvr)

+ 

+ 

+ @celery_app.task

+ def poll_greenwave():

+     """Polls Greenwave for all builds in done state"""

+     if greenwave is None:

+         return

+ 

+     module_builds = db_session.query(models.ModuleBuild).filter_by(

+         state=models.BUILD_STATES["done"],

+         scratch=False

+     ).all()

+ 

+     log.info("Checking Greenwave for %d builds", len(module_builds))

+ 

+     for build in module_builds:

+         if greenwave.check_gating(build):

+             build.transition(db_session, conf, state=models.BUILD_STATES["ready"])

+         else:

+             build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format(

+                 conf.polling_interval

+             )

+             if greenwave.error_occurred:

+                 build.state_reason += " (Error occurred while querying Greenwave)"

+             build.time_modified = datetime.utcnow()

+         db_session.commit()

  

  

- def _has_missed_new_repo_message(module_build, koji_session):

+ def has_missed_new_repo_message(module_build, koji_session):

      """

      Returns whether or not a new repo message has probably been missed.

      """
@@ -504,7 +473,8 @@ 

          # message so module build can recover.

          return True

      log.debug(

-         'Checking status of newRepo task "%d" for %s', module_build.new_repo_task_id, module_build)

+         'Checking status of newRepo task "%d" for %s',

+         module_build.new_repo_task_id, module_build)

      task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)

      # Other final states, FAILED and CANCELED, are handled by retrigger_new_repo_on_failure

      return task_info["state"] == koji.TASK_STATES["CLOSED"]
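A side effect of the conversion that the test changes below rely on: a Celery task invoked as a plain function executes synchronously in the current process, so tests can call producer.process_paused_module_builds() or producer.delete_old_koji_targets() directly, without the fedmsg hub and consumer-queue scaffolding. A tiny self-contained illustration of that behaviour, using a throwaway Celery app rather than MBS's:

    from celery import Celery

    app = Celery("example")


    @app.task
    def add(x, y):
        return x + y


    # Calling the task like an ordinary function runs it inline, which is
    # what the updated tests do:
    assert add(2, 3) == 5
    # Production code queues the same task for a worker instead:
    #   add.delay(2, 3)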

@@ -8,7 +8,6 @@ 

  from os import path

  

  import module_build_service.messaging

- import module_build_service.scheduler.handlers.repos  # noqa

  from module_build_service import models, conf, build_logs, Modulemd

  from module_build_service.db_session import db_session

  from module_build_service.utils.general import mmd_to_str

@@ -2,15 +2,13 @@ 

  # SPDX-License-Identifier: MIT

  import re

  import pytest

- from mock import patch

+ from mock import call, patch

  from module_build_service import models, conf

  from tests import clean_database, make_module_in_db

  import mock

  import koji

  from module_build_service.db_session import db_session

- from module_build_service.scheduler import events

- from module_build_service.scheduler.producer import MBSProducer

- import six.moves.queue as queue

+ from module_build_service.scheduler import producer

  from datetime import datetime, timedelta

  

  
@@ -19,7 +17,6 @@ 

      "module_build_service.builder.GenericBuilder.default_buildroot_groups",

      return_value={"build": [], "srpm-build": []},

  )

- @patch("module_build_service.scheduler.consumer.get_global_consumer")

  @patch("module_build_service.builder.GenericBuilder.create_from_module")

  class TestPoller:

      def setup_method(self, test_method):
@@ -40,15 +37,11 @@ 

      @pytest.mark.parametrize("fresh", [True, False])

      @patch("module_build_service.utils.batches.start_build_component")

      def test_process_paused_module_builds(

-         self, start_build_component, create_builder, global_consumer, dbg, fresh

+         self, start_build_component, create_builder, dbg, fresh

      ):

          """

          Tests general use-case of process_paused_module_builds.

          """

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          builder = mock.MagicMock()

          create_builder.return_value = builder

  
@@ -64,9 +57,7 @@ 

          db_session.commit()

  

          # Poll :)

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.poll()

+         producer.process_paused_module_builds()

  

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  
@@ -92,16 +83,12 @@ 

      ))

      @patch("module_build_service.utils.batches.start_build_component")

      def test_process_paused_module_builds_with_new_repo_task(

-         self, start_build_component, create_builder, global_consumer, dbg, task_state,

+         self, start_build_component, create_builder, dbg, task_state,

          expect_start_build_component

      ):

          """

          Tests general use-case of process_paused_module_builds.

          """

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          builder = mock.MagicMock()

          create_builder.return_value = builder

  
@@ -118,9 +105,7 @@ 

          db_session.commit()

  

          # Poll :)

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.poll()

+         producer.process_paused_module_builds()

  

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  
@@ -139,16 +124,10 @@ 

  

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

-     def test_retrigger_new_repo_on_failure(

-         self, ClientSession, create_builder, global_consumer, dbg

-     ):

+     def test_retrigger_new_repo_on_failure(self, ClientSession, create_builder, dbg):

          """

          Tests that we call koji_session.newRepo when the newRepo task failed.

          """

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          koji_session = ClientSession.return_value

          koji_session.getTag = lambda tag_name: {"name": tag_name}

          koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["FAILED"]}
@@ -165,26 +144,18 @@ 

          module_build.new_repo_task_id = 123456

          db_session.commit()

  

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.poll()

+         producer.retrigger_new_repo_on_failure()

  

          koji_session.newRepo.assert_called_once_with(

              "module-testmodule-master-20170219191323-c40c156c-build")

  

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

-     def test_trigger_new_repo_when_succeeded(

-         self, ClientSession, create_builder, global_consumer, dbg

-     ):

+     def test_trigger_new_repo_when_succeeded(self, ClientSession, create_builder, dbg):

          """

          Tests that we do not call koji_session.newRepo when the newRepo task

          succeeded.

          """

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          koji_session = ClientSession.return_value

          koji_session.getTag = lambda tag_name: {"name": tag_name}

          koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["CLOSED"]}
@@ -201,26 +172,18 @@ 

          module_build.new_repo_task_id = 123456

          db_session.commit()

  

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.poll()

+         producer.retrigger_new_repo_on_failure()

  

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  

          assert not koji_session.newRepo.called

          assert module_build.new_repo_task_id == 123456

  

-     def test_process_paused_module_builds_waiting_for_repo(

-         self, create_builder, global_consumer, dbg

-     ):

+     def test_process_paused_module_builds_waiting_for_repo(self, create_builder, dbg):

          """

          Tests that process_paused_module_builds does not start new batch

          when we are waiting for repo.

          """

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          builder = mock.MagicMock()

          create_builder.return_value = builder

  
@@ -232,9 +195,7 @@ 

          db_session.commit()

  

          # Poll :)

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.poll()

+         producer.process_paused_module_builds()

  

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  
@@ -246,12 +207,8 @@ 

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

      def test_old_build_targets_are_not_associated_with_any_module_builds(

-         self, ClientSession, create_builder, global_consumer, dbg

+         self, ClientSession, create_builder, dbg

      ):

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          koji_session = ClientSession.return_value

          # No created module build has any of these tags.

          koji_session.getBuildTargets.return_value = [
@@ -259,16 +216,14 @@ 

              {"dest_tag_name": "module-yyy-2"},

          ]

  

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

-         poller.delete_old_koji_targets(conf)

+         producer.delete_old_koji_targets()

  

          koji_session.deleteBuildTarget.assert_not_called()

  

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

      def test_dont_delete_base_module_build_target(

-         self, ClientSession, create_builder, global_consumer, dbg

+         self, ClientSession, create_builder, dbg

      ):

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  
@@ -276,24 +231,16 @@ 

          # No created module build has any of these tags.

          koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}]

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          # If module build's name is one of base module names, build target

          # should not be deleted.

          with patch.object(conf, "base_module_names", new=[module_build.name]):

- 

-             hub = mock.MagicMock()

-             poller = MBSProducer(hub)

-             poller.delete_old_koji_targets(conf)

- 

+             producer.delete_old_koji_targets()

              koji_session.deleteBuildTarget.assert_not_called()

  

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

      def test_dont_delete_build_target_for_unfinished_module_builds(

-         self, ClientSession, create_builder, global_consumer, dbg

+         self, ClientSession, create_builder, dbg

      ):

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

  
@@ -301,26 +248,20 @@ 

          # No created module build has any of these tags.

          koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}]

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          # Each time when a module build is in one of these state, build target

          # should not be deleted.

          for state in ["init", "wait", "build"]:

              module_build.state = state

              db_session.commit()

  

-             hub = mock.MagicMock()

-             poller = MBSProducer(hub)

-             poller.delete_old_koji_targets(conf)

+             producer.delete_old_koji_targets()

  

              koji_session.deleteBuildTarget.assert_not_called()

  

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

      def test_only_delete_build_target_with_allowed_koji_tag_prefix(

-         self, ClientSession, create_builder, global_consumer, dbg

+         self, ClientSession, create_builder, dbg

      ):

          module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)

          # Only module build 1's build target should be deleted.
@@ -343,15 +284,9 @@ 

              {"id": 2, "dest_tag_name": module_build_3.koji_tag, "name": module_build_3.koji_tag},

          ]

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          with patch.object(conf, "koji_tag_prefixes", new=["module", "another-prefix"]):

              with patch.object(conf, "koji_target_delete_time", new=60):

-                 hub = mock.MagicMock()

-                 poller = MBSProducer(hub)

-                 poller.delete_old_koji_targets(conf)

+                 producer.delete_old_koji_targets()

  

              koji_session.deleteBuildTarget.assert_called_once_with(1)

              koji_session.krb_login.assert_called_once()
@@ -359,7 +294,7 @@ 

      @patch.dict("sys.modules", krbV=mock.MagicMock())

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

      def test_cant_delete_build_target_if_not_reach_delete_time(

-         self, ClientSession, create_builder, global_consumer, dbg

+         self, ClientSession, create_builder, dbg

      ):

          module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)

          # Only module build 1's build target should be deleted.
@@ -377,31 +312,22 @@ 

              {"id": 1, "dest_tag_name": module_build_2.koji_tag, "name": module_build_2.koji_tag}

          ]

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

          with patch.object(conf, "koji_tag_prefixes", new=["module"]):

              # Use default koji_target_delete_time in config. That time is long

              # enough for test.

-             hub = mock.MagicMock()

-             poller = MBSProducer(hub)

-             poller.delete_old_koji_targets(conf)

+             producer.delete_old_koji_targets()

  

              koji_session.deleteBuildTarget.assert_not_called()

  

      @pytest.mark.parametrize("state", ["init", "wait"])

-     def test_process_waiting_module_build(

-         self, create_builder, global_consumer, dbg, state

-     ):

+     @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={

+         models.BUILD_STATES["init"]: mock.Mock(),

+         models.BUILD_STATES["wait"]: mock.Mock(),

+     })

+     def test_process_waiting_module_build(self, create_builder, dbg, state):

          """ Test that processing old waiting module builds works. """

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

+         handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]]

  

          # Change the batch to 2, so the module build is in state where

          # it is not building anything, but the state is "build".
@@ -413,32 +339,32 @@ 

          db_session.commit()

          db_session.refresh(module_build)

  

-         # Ensure the queue is empty before we start.

-         assert consumer.incoming.qsize() == 0

- 

          # Poll :)

-         poller.process_waiting_module_builds()

+         producer.process_waiting_module_builds()

  

-         assert consumer.incoming.qsize() == 1

+         handler.delay.assert_called_once_with(

+             "internal:mbs.module.state.change",

+             module_build.id,

+             module_build.state

+         )

  

          db_session.refresh(module_build)

          # ensure the time_modified was changed.

          assert module_build.time_modified > original

  

      @pytest.mark.parametrize("state", ["init", "wait"])

+     @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={

+         models.BUILD_STATES["init"]: mock.Mock(),

+         models.BUILD_STATES["wait"]: mock.Mock(),

+     })

      def test_process_waiting_module_build_not_old_enough(

-         self, create_builder, global_consumer, dbg, state

+         self, create_builder, dbg, state

      ):

          """ Test that we do not process young waiting builds. """

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

+         handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]]

  

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

- 

-         # Change the batch to 2, so the module build is in state where

+         # Change the batch to build, so the module build is in state where

          # it is not building anything, but the state is "build".

          module_build = models.ModuleBuild.get_by_id(db_session, 3)

          module_build.state = models.BUILD_STATES[state]
@@ -448,37 +374,25 @@ 

          db_session.commit()

          db_session.refresh(module_build)

  

-         # Ensure the queue is empty before we start.

-         assert consumer.incoming.qsize() == 0

- 

          # Poll :)

-         poller.process_waiting_module_builds()

+         producer.process_waiting_module_builds()

  

-         # Ensure we did *not* process the 9 minute-old build.

-         assert consumer.incoming.qsize() == 0

+         handler.assert_not_called()

  

-     def test_process_waiting_module_build_none_found(

-         self, create_builder, global_consumer, dbg

-     ):

+     @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={

+         models.BUILD_STATES["init"]: mock.Mock(),

+         models.BUILD_STATES["wait"]: mock.Mock(),

+     })

+     def test_process_waiting_module_build_none_found(self, create_builder, dbg):

          """ Test nothing happens when no module builds are waiting. """

- 

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

- 

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

- 

-         # Ensure the queue is empty before we start.

-         assert consumer.incoming.qsize() == 0

- 

          # Poll :)

-         poller.process_waiting_module_builds()

+         producer.process_waiting_module_builds()

  

          # Ensure we did *not* process any of the non-waiting builds.

-         assert consumer.incoming.qsize() == 0

+         for handler in producer.ON_MODULE_CHANGE_HANDLERS.values():

+             handler.assert_not_called()

  

-     def test_cleanup_stale_failed_builds(self, create_builder, global_consumer, dbg):

+     def test_cleanup_stale_failed_builds(self, create_builder, dbg):

          """ Test that one of the two module builds gets to the garbage state when running

          cleanup_stale_failed_builds.

          """
@@ -502,15 +416,8 @@ 

  

          db_session.commit()

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

+         producer.cleanup_stale_failed_builds()

  

-         # Ensure the queue is empty before we start

-         assert consumer.incoming.qsize() == 0

-         poller.cleanup_stale_failed_builds(conf)

          db_session.refresh(module_build_two)

          # Make sure module_build_one was transitioned to garbage

          assert module_build_one.state == models.BUILD_STATES["garbage"]
@@ -533,9 +440,7 @@ 

              "module-build-macros-0.1-1.module+0+d027b723",

          ])

  

-     def test_cleanup_stale_failed_builds_no_components(

-         self, create_builder, global_consumer, dbg

-     ):

+     def test_cleanup_stale_failed_builds_no_components(self, create_builder, dbg):

          """ Test that a module build without any components built gets to the garbage state when

          running cleanup_stale_failed_builds.

          """
@@ -555,15 +460,8 @@ 

  

          db_session.commit()

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

+         producer.cleanup_stale_failed_builds()

  

-         # Ensure the queue is empty before we start

-         assert consumer.incoming.qsize() == 0

-         poller.cleanup_stale_failed_builds(conf)

          db_session.refresh(module_build_two)

          # Make sure module_build_two was transitioned to garbage

          assert module_build_two.state == models.BUILD_STATES["garbage"]
@@ -580,9 +478,7 @@ 

      @pytest.mark.parametrize(

          "test_state", [models.BUILD_STATES[state] for state in conf.cleanup_stuck_builds_states]

      )

-     def test_cancel_stuck_module_builds(

-         self, create_builder, global_consumer, dbg, test_state

-     ):

+     def test_cancel_stuck_module_builds(self, create_builder, dbg, test_state):

  

          module_build1 = models.ModuleBuild.get_by_id(db_session, 1)

          module_build1.state = test_state
@@ -601,17 +497,9 @@ 

  

          db_session.commit()

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

- 

-         assert consumer.incoming.qsize() == 0

- 

-         poller.cancel_stuck_module_builds(conf)

+         producer.cancel_stuck_module_builds()

  

-         module = db_session.query(models.ModuleBuild).filter_by(state=4).all()

+         module = models.ModuleBuild.by_state(db_session, "failed")

          assert len(module) == 1

          assert module[0].id == 2

  
@@ -619,8 +507,10 @@ 

      @pytest.mark.parametrize("tagged_in_final", (True, False))

      @pytest.mark.parametrize("btime", (True, False))

      @patch("module_build_service.builder.KojiModuleBuilder.KojiClientSession")

+     @patch("module_build_service.scheduler.producer.tagged")

      def test_sync_koji_build_tags(

-         self, ClientSession, create_builder, global_consumer, dbg, tagged, tagged_in_final, btime

+         self, tagged_handler, ClientSession, create_builder, dbg,

+         tagged, tagged_in_final, btime

      ):

          module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)

          # Only module build 1's build target should be deleted.
@@ -639,48 +529,35 @@ 

  

          koji_session = ClientSession.return_value

          # No created module build has any of these tags.

-         ret = []

  

-         if btime:

-             if tagged:

-                 ret.append({"id": 1, "name": module_build_2.koji_tag + "-build"})

-             if tagged_in_final:

-                 ret.append({"id": 2, "name": module_build_2.koji_tag})

-         koji_session.listTags.return_value = ret

- 

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

- 

-         assert consumer.incoming.qsize() == 0

+         listtags_return_value = []

+         expected_tagged_calls = []

  

-         poller.sync_koji_build_tags(conf)

- 

-         assert consumer.incoming.qsize() == len(ret)

- 

-         expected_msg_tags = []

          if btime:

              if tagged:

-                 expected_msg_tags.append(module_build_2.koji_tag + "-build")

+                 listtags_return_value.append(

+                     {"id": 1, "name": module_build_2.koji_tag + "-build"})

+                 expected_tagged_calls.append(call(

+                     "internal:sync_koji_build_tags",

+                     module_build_2.koji_tag + "-build", c.package, c.nvr

+                 ))

              if tagged_in_final:

-                 expected_msg_tags.append(module_build_2.koji_tag)

+                 listtags_return_value.append(

+                     {"id": 2, "name": module_build_2.koji_tag})

+                 expected_tagged_calls.append(call(

+                     "internal:sync_koji_build_tags",

+                     module_build_2.koji_tag, c.package, c.nvr

+                 ))

+         koji_session.listTags.return_value = listtags_return_value

  

-         assert len(expected_msg_tags) == consumer.incoming.qsize()

+         producer.sync_koji_build_tags()

  

-         for i in range(consumer.incoming.qsize()):

-             msg = consumer.incoming.get()

-             assert events.KOJI_TAG_CHANGE == msg["event"]

-             assert c.package == msg["build_name"]

-             assert c.nvr == msg["build_nvr"]

-             assert msg["tag_name"] in expected_msg_tags

+         tagged_handler.delay.assert_has_calls(

+             expected_tagged_calls, any_order=True)

  

      @pytest.mark.parametrize("greenwave_result", [True, False])

      @patch("module_build_service.utils.greenwave.Greenwave.check_gating")

-     def test_poll_greenwave(

-         self, mock_gw, create_builder, global_consumer, dbg, greenwave_result

-     ):

+     def test_poll_greenwave(self, mock_gw, create_builder, dbg, greenwave_result):

  

          module_build1 = models.ModuleBuild.get_by_id(db_session, 1)

          module_build1.state = models.BUILD_STATES["ready"]
@@ -697,17 +574,9 @@ 

  

          db_session.commit()

  

-         consumer = mock.MagicMock()

-         consumer.incoming = queue.Queue()

-         global_consumer.return_value = consumer

-         hub = mock.MagicMock()

-         poller = MBSProducer(hub)

- 

-         assert consumer.incoming.qsize() == 0

- 

          mock_gw.return_value = greenwave_result

  

-         poller.poll_greenwave(conf)

+         producer.poll_greenwave()

  

          mock_gw.assert_called_once()

          modules = models.ModuleBuild.by_state(db_session, "ready")

@@ -26,7 +26,6 @@ 

  from module_build_service.models import ModuleBuild, BUILD_STATES, ComponentBuild

  from module_build_service import version

  import module_build_service.config as mbs_config

- import module_build_service.scheduler.handlers.modules

  import module_build_service.utils.submit

  from module_build_service.utils.general import (

      import_mmd, mmd_to_str, load_mmd,

Poller methods within the original MBSProducer class become module-level
functions and are registered as Celery periodic tasks.
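
For illustration, a minimal sketch of the shape of that conversion; the app
instance and the task body below are illustrative, not the actual MBS code:

from celery import Celery

celery_app = Celery("mbs")  # illustrative app instance

@celery_app.task
def process_paused_module_builds():
    # Formerly a method on MBSProducer; now a module-level function that the
    # Celery beat scheduler can invoke periodically (body omitted here).
    ...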

Code logging the size of the fedmsg-hub queue is removed from log_summary.

cancel_stuck_module_builds, which is not called anywhere, is removed.
Related configs are removed as well.

process_open_component_builds is kept as is and is not converted to a
periodic task.

There are a few small refactors:

  • do not format strings in logging method calls (see the sketch after this
    list).
  • reformat some lines of code doing SQLAlchemy database queries to make
    them more readable.
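
The first point refers to the standard logging idiom of passing arguments to
the logger instead of pre-formatting the string; a generic example:

import logging

log = logging.getLogger(__name__)
build_id = 3  # example value

# Before: the message is formatted even when DEBUG logging is disabled.
log.debug("Processing module build %s" % build_id)

# After: formatting is deferred to the logging framework.
log.debug("Processing module build %s", build_id)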

Signed-off-by: Chenxiong Qi cqi@redhat.com

Build #592 failed (commit: 9cd9fcfde7a9d89ca1373ddf32524df283fd53d6).
Rebase or make new commits to rebuild.

Could you remove this please?

@cqi did any of the code change inside the poller functions? It's hard to tell in Pagure what actually changed.

@mprahl I just refactored a few lines of code in some functions; not all poller functions were changed. The original purposes of the related tests were not changed.

@cqi the issue I see is that all handlers that are called from the poller are now called by the same Celery worker. This means that tasks for a particular module build can happen in parallel and the design document states that per module build, the tasks should be run serially to start.

One example is the nudge_module_builds_in_state function. This should actually schedule a Celery task rather than be run directly by the poller.

I think this PR will have to wait until after the handlers have been migrated to Celery tasks.

One example is the nudge_module_builds_in_state function. This should actually schedule a Celery task rather than be run directly by the poller.

@mprahl I'm sorry I don't get your point here. nudge_module_builds_in_state is called by process_waiting_module_builds which is registered as a periodic task. Do you mean the code should look like this?

@celery_app.task
def process_module_builds_in_init_state():
    nudge_module_builds_in_state("init", 10)

@celery_app.task
def process_module_builds_in_wait_state():
    nudge_module_builds_in_state("wait", 10)

One example is the nudge_module_builds_in_state function. This should actually schedule a Celery task rather than be run directly by the poller.

@mprahl I'm sorry I don't get your point here. nudge_module_builds_in_state is called by process_waiting_module_builds which is registered as a periodic task. Do you mean the code should look like this?
@celery_app.task
def process_module_builds_in_init_state():
    nudge_module_builds_in_state("init", 10)

@celery_app.task
def process_module_builds_in_wait_state():
    nudge_module_builds_in_state("wait", 10)

@cqi what I mean is that it looks like you aren't calling the handlers with delay, so won't the handlers get executed in the same thread?

@mprahl Oh, all the tasks are registered as periodic tasks in this function.

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    ...
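
For reference, a minimal sketch of how periodic registration typically looks
in Celery; the task and interval below are illustrative, not taken from this
PR:

from celery import Celery

celery_app = Celery("mbs")

@celery_app.task
def process_waiting_module_builds():
    ...  # body omitted

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # "sender" is the Celery app; run the task every 10 minutes
    # (the interval here is only an example).
    sender.add_periodic_task(
        600.0, process_waiting_module_builds.s(),
        name="process_waiting_module_builds")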

@cqi, what I mean is the scheduled tasks call other handlers, and I think those handlers should have delay called on them.

For example:

 ON_MODULE_CHANGE_HANDLERS[state]("internal:mbs.module.state.change", build.id, state)

Should be:

 ON_MODULE_CHANGE_HANDLERS[state].delay("internal:mbs.module.state.change", build.id, state)
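
For .delay() to be available, each handler stored in the dictionary has to be
a Celery task itself; a self-contained sketch, with the handler body, app
setup, and example values being illustrative:

from celery import Celery

celery_app = Celery("mbs")
celery_app.conf.task_always_eager = True  # run locally in this sketch; real MBS uses a broker

@celery_app.task
def wait(msg_id, module_build_id, module_build_state):
    # Illustrative stand-in for the real modules.wait handler.
    print("handling", msg_id, module_build_id, module_build_state)

state = "wait"  # stand-in for models.BUILD_STATES["wait"]
ON_MODULE_CHANGE_HANDLERS = {state: wait}

# The poller only schedules the work; a Celery worker executes the handler.
ON_MODULE_CHANGE_HANDLERS[state].delay("internal:mbs.module.state.change", 3, state)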

@mprahl

I see. The code does not call delay because, when I submitted this PR, the event handlers hadn't been converted to Celery tasks yet. The final version of this PR should be what you mentioned. :)

@mprahl
I see. The code does not call delay because, when I submitted this PR, the event handlers hadn't been converted to Celery tasks yet. The final version of this PR should be what you mentioned. :)

Okay great! Please let me know when the PR is ready.

rebased onto 776cef8ea0cfae2466d271789ba8ff620e47f4e1

4 years ago

Build #648 failed (commit: 776cef8ea0cfae2466d271789ba8ff620e47f4e1).
Rebase or make new commits to rebuild.

rebased onto 04e818dd8037a5605a75d5018a04d759ed909e7b

4 years ago

Build #650 failed (commit: 04e818dd8037a5605a75d5018a04d759ed909e7b).
Rebase or make new commits to rebuild.

I don't think this change was meant to be in this PR. Could you please remove it?

What happened to the cancel_stuck_module_builds method?

Optional: To match the rest of the code, the formatting should be:

if (
    not module or module.name in conf.base_module_names or module.state in [

This is based on the "black" autoformatting tool.

Why not use delay?

I thought this would make it easy to pass the queue name. But, as we discussed over email, we can use the Celery routing mechanism instead. I'll fix this to use delay.
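
A minimal sketch of what queue selection via Celery routing configuration
could look like; the queue name and module pattern are illustrative:

from celery import Celery

celery_app = Celery("mbs")

# Route handler tasks to a dedicated queue by name pattern, so callers can
# simply use .delay() without passing a queue explicitly.
celery_app.conf.task_routes = {
    "module_build_service.scheduler.handlers.*": {"queue": "mbs-handlers"},
}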

What happened to the cancel_stuck_module_builds method?

I have no idea about this. I searched through the project and found no reference to this function.

Optional: To match the rest of the code, the formatting should be:

These if conditions were refactored into the SQL query.

@mprahl All the comments are addressed. PTAL. Thanks.

rebased onto 3f31529f024e9f7fbfc1087d805c1864e564ee73

4 years ago

Build #658 failed (commit: 3f31529f024e9f7fbfc1087d805c1864e564ee73).
Rebase or make new commits to rebuild.

What happened to the cancel_stuck_module_builds method?

I have no idea about this. I searched through the project and found no reference to this function.

You're right... how strange. I see the commit here:
https://pagure.io/fm-orchestrator/c/5a3d3d99d3210b44d4b739a565754342087bdf38?branch=5a3d3d99d3210b44d4b739a565754342087bdf38

Having this was intentional, so if you can, could you please enable it rather than delete it?

@cqi, once the comments are addressed, feel free to merge this. This looks great!

Could you use double quotes here please?

This string contains double quotes around "%d".

rebased onto 940a2fb

4 years ago

Build #662 failed (commit: 940a2fb).
Rebase or make new commits to rebuild.

Pull-Request has been merged by cqi

4 years ago