#1488 Changes for FACTORY-5226 in stages and work on removing argument msg will continue based on this PR
Closed 4 years ago by cqi. Opened 4 years ago by cqi.
cqi/fm-orchestrator refactor-multiple-backend-workers  into  v3

@@ -29,6 +29,7 @@ 

  

  from module_build_service.builder.base import GenericBuilder

  from module_build_service.builder.KojiContentGenerator import KojiContentGenerator

+ from module_build_service.scheduler import events

  from module_build_service.utils import get_reusable_components, get_reusable_module, set_locale

  

  logging.basicConfig(level=logging.DEBUG)
@@ -741,7 +742,7 @@ 

          nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)

          # Trigger a completed build message

          further_work.append(

-             module_build_service.messaging.KojiBuildChange(

+             events.KojiBuildChange(

                  "recover_orphaned_artifact: fake message",

                  build["build_id"],

                  build["task_id"],
@@ -772,7 +773,7 @@ 

                  "the tag handler".format(tag)

              )

              further_work.append(

-                 module_build_service.messaging.KojiTagChange(

+                 events.KojiTagChange(

                      "recover_orphaned_artifact: fake message",

                      tag,

                      component_build.package,

@@ -2,280 +2,11 @@ 

  # SPDX-License-Identifier: MIT

  """Generic messaging functions."""

  

- import re

  import pkg_resources

  

- try:

-     from inspect import signature

- except ImportError:

-     from funcsigs import signature

+ from module_build_service.scheduler.parser import FedmsgMessageParser

  

- from module_build_service import log

- 

- 

- class IgnoreMessage(Exception):

-     pass

- 

- 

- class BaseMessage(object):

-     def __init__(self, msg_id):

-         """

-         A base class to abstract messages from different backends

-         :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-         """

-         self.msg_id = msg_id

- 

-         # Moksha calls `consumer.validate` on messages that it receives, and

-         # even though we have validation turned off in the config there's still

-         # a step that tries to access `msg['body']`, `msg['topic']` and

-         # `msg.get('topic')`.

-         # These are here just so that the `validate` method won't raise an

-         # exception when we push our fake messages through.

-         # Note that, our fake message pushing has worked for a while... but the

-         # *latest* version of fedmsg has some code that exercises the bug.  I

-         # didn't hit this until I went to test in jenkins.

-         self.body = {}

-         self.topic = None

- 

-     def __repr__(self):

-         init_sig = signature(self.__init__)

- 

-         args_strs = (

-             "{}={!r}".format(name, getattr(self, name))

-             if param.default != param.empty

-             else repr(getattr(self, name))

-             for name, param in init_sig.parameters.items()

-         )

- 

-         return "{}({})".format(type(self).__name__, ", ".join(args_strs))

- 

-     def __getitem__(self, key):

-         """ Used to trick moksha into thinking we are a dict. """

-         return getattr(self, key)

- 

-     def __setitem__(self, key, value):

-         """ Used to trick moksha into thinking we are a dict. """

-         return setattr(self, key, value)

- 

-     def get(self, key, value=None):

-         """ Used to trick moksha into thinking we are a dict. """

-         return getattr(self, key, value)

- 

-     def __json__(self):

-         return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)

- 

- 

- class MessageParser(object):

-     def parse(self, msg):

-         raise NotImplementedError()

- 

- 

- class FedmsgMessageParser(MessageParser):

-     def parse(self, msg):

-         """

-         Takes a fedmsg topic and message and converts it to a message object

-         :param msg: the message contents from the fedmsg message

-         :return: an object of BaseMessage descent if the message is a type

-         that the app looks for, otherwise None is returned

-         """

-         if "body" in msg:

-             msg = msg["body"]

-         topic = msg["topic"]

-         topic_categories = _messaging_backends["fedmsg"]["services"]

-         categories_re = "|".join(map(re.escape, topic_categories))

-         regex_pattern = re.compile(

-             r"(?P<category>" + categories_re + r")"

-             r"(?:(?:\.)(?P<object>build|repo|module|decision))?"

-             r"(?:(?:\.)(?P<subobject>state|build))?"

-             r"(?:\.)(?P<event>change|done|end|tag|update)$"

-         )

-         regex_results = re.search(regex_pattern, topic)

- 

-         if regex_results:

-             category = regex_results.group("category")

-             object = regex_results.group("object")

-             subobject = regex_results.group("subobject")

-             event = regex_results.group("event")

- 

-             msg_id = msg.get("msg_id")

-             msg_inner_msg = msg.get("msg")

- 

-             # If there isn't a msg dict in msg then this message can be skipped

-             if not msg_inner_msg:

-                 log.debug(

-                     "Skipping message without any content with the " 'topic "{0}"'.format(topic))

-                 return None

- 

-             msg_obj = None

- 

-             # Ignore all messages from the secondary koji instances.

-             if category == "buildsys":

-                 instance = msg_inner_msg.get("instance", "primary")

-                 if instance != "primary":

-                     log.debug("Ignoring message from %r koji hub." % instance)

-                     return

- 

-             if (

-                 category == "buildsys"

-                 and object == "build"

-                 and subobject == "state"

-                 and event == "change"

-             ):

-                 build_id = msg_inner_msg.get("build_id")

-                 task_id = msg_inner_msg.get("task_id")

-                 build_new_state = msg_inner_msg.get("new")

-                 build_name = msg_inner_msg.get("name")

-                 build_version = msg_inner_msg.get("version")

-                 build_release = msg_inner_msg.get("release")

- 

-                 msg_obj = KojiBuildChange(

-                     msg_id,

-                     build_id,

-                     task_id,

-                     build_new_state,

-                     build_name,

-                     build_version,

-                     build_release,

-                 )

- 

-             elif (

-                 category == "buildsys"

-                 and object == "repo"

-                 and subobject is None

-                 and event == "done"

-             ):

-                 repo_tag = msg_inner_msg.get("tag")

-                 msg_obj = KojiRepoChange(msg_id, repo_tag)

- 

-             elif category == "buildsys" and event == "tag":

-                 tag = msg_inner_msg.get("tag")

-                 name = msg_inner_msg.get("name")

-                 version = msg_inner_msg.get("version")

-                 release = msg_inner_msg.get("release")

-                 nvr = None

-                 if name and version and release:

-                     nvr = "-".join((name, version, release))

-                 msg_obj = KojiTagChange(msg_id, tag, name, nvr)

- 

-             elif (

-                 category == "mbs"

-                 and object == "module"

-                 and subobject == "state"

-                 and event == "change"

-             ):

-                 msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state"))

- 

-             elif (

-                 category == "greenwave"

-                 and object == "decision"

-                 and subobject is None

-                 and event == "update"

-             ):

-                 msg_obj = GreenwaveDecisionUpdate(

-                     msg_id=msg_id,

-                     decision_context=msg_inner_msg.get("decision_context"),

-                     policies_satisfied=msg_inner_msg.get("policies_satisfied"),

-                     subject_identifier=msg_inner_msg.get("subject_identifier"),

-                 )

- 

-             # If the message matched the regex and is important to the app,

-             # it will be returned

-             if msg_obj:

-                 return msg_obj

- 

-         return None

- 

- 

- class KojiBuildChange(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a build's info (in fedmsg this replaces the msg dictionary)

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param build_id: the id of the build (e.g. 264382)

-     :param build_new_state: the new build state, this is currently a Koji

-     integer

-     :param build_name: the name of what is being built

-     (e.g. golang-googlecode-tools)

-     :param build_version: the version of the build (e.g. 6.06.06)

-     :param build_release: the release of the build (e.g. 4.fc25)

-     :param module_build_id: the optional id of the module_build in the database

-     :param state_reason: the optional reason as to why the state changed

-     """

- 

-     def __init__(

-         self,

-         msg_id,

-         build_id,

-         task_id,

-         build_new_state,

-         build_name,

-         build_version,

-         build_release,

-         module_build_id=None,

-         state_reason=None,

-     ):

-         if task_id is None:

-             raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")

-         super(KojiBuildChange, self).__init__(msg_id)

-         self.build_id = build_id

-         self.task_id = task_id

-         self.build_new_state = build_new_state

-         self.build_name = build_name

-         self.build_version = build_version

-         self.build_release = build_release

-         self.module_build_id = module_build_id

-         self.state_reason = state_reason

- 

- 

- class KojiTagChange(BaseMessage):

-     """

-     A class that inherits from BaseMessage to provide a message

-     object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)

-     :param tag: the name of tag (e.g. module-123456789-build)

-     :param artifact: the name of tagged artifact (e.g. module-build-macros)

-     :param nvr: the nvr of the tagged artifact

-     """

- 

-     def __init__(self, msg_id, tag, artifact, nvr):

-         super(KojiTagChange, self).__init__(msg_id)

-         self.tag = tag

-         self.artifact = artifact

-         self.nvr = nvr

- 

- 

- class KojiRepoChange(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a repo's info (in fedmsg this replaces the msg dictionary)

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)

-     """

- 

-     def __init__(self, msg_id, repo_tag):

-         super(KojiRepoChange, self).__init__(msg_id)

-         self.repo_tag = repo_tag

- 

- 

- class MBSModule(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a module event generated by module_build_service

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param module_build_id: the id of the module build

-     :param module_build_state: the state of the module build

-     """

- 

-     def __init__(self, msg_id, module_build_id, module_build_state):

-         super(MBSModule, self).__init__(msg_id)

-         self.module_build_id = module_build_id

-         self.module_build_state = module_build_state

- 

- 

- class GreenwaveDecisionUpdate(BaseMessage):

-     """A class representing message send to topic greenwave.decision.update"""

- 

-     def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):

-         super(GreenwaveDecisionUpdate, self).__init__(msg_id)

-         self.decision_context = decision_context

-         self.policies_satisfied = policies_satisfied

-         self.subject_identifier = subject_identifier

+ from module_build_service import conf, log

  

  

  def publish(topic, msg, conf, service):
@@ -331,7 +62,7 @@ 

      # Create fake fedmsg from the message so we can reuse

      # the BaseMessage.from_fedmsg code to get the particular BaseMessage

      # class instance.

-     wrapped_msg = FedmsgMessageParser().parse({

+     wrapped_msg = FedmsgMessageParser(known_fedmsg_services).parse({

          "msg_id": str(_in_memory_msg_id),

          "topic": service + "." + topic,

          "msg": msg
@@ -352,16 +83,19 @@ 

          _initial_messages.append(wrapped_msg)

  

  

+ known_fedmsg_services = ["buildsys", "mbs", "greenwave"]

+ 

+ 

  _fedmsg_backend = {

      "publish": _fedmsg_publish,

-     "services": ["buildsys", "mbs", "greenwave"],

-     "parser": FedmsgMessageParser(),

+     "parser": FedmsgMessageParser(known_fedmsg_services),

+     "services": known_fedmsg_services,

      "topic_suffix": ".",

  }

  _in_memory_backend = {

      "publish": _in_memory_publish,

+     "parser": FedmsgMessageParser(known_fedmsg_services),  # re-used.  :)

      "services": [],

-     "parser": FedmsgMessageParser(),  # re-used.  :)

      "topic_suffix": ".",

  }

  
@@ -375,3 +109,7 @@ 

  

  if not _messaging_backends:

      raise ValueError("No messaging plugins are installed or available.")

+ 

+ # Once the registered messaging backends have been loaded, the default

+ # messaging backend can be looked up by the configured backend name.

+ default_messaging_backend = _messaging_backends[conf.messaging]

@@ -21,6 +21,7 @@ 

  import module_build_service.messaging

  from module_build_service import db, log, get_url_for, conf

  from module_build_service.errors import UnprocessableEntity

+ from module_build_service.scheduler import events

  

  DEFAULT_MODULE_CONTEXT = "00000000"

  
@@ -600,7 +601,7 @@ 

  

      @classmethod

      def from_module_event(cls, db_session, event):

-         if type(event) == module_build_service.messaging.MBSModule:

+         if type(event) == events.MBSModule:

              return db_session.query(cls).filter(cls.id == event.module_build_id).first()

          else:

              raise ValueError("%r is not a module message." % type(event).__name__)
@@ -1244,7 +1245,7 @@ 

  

      @classmethod

      def from_component_event(cls, db_session, event):

-         if isinstance(event, module_build_service.messaging.KojiBuildChange):

+         if isinstance(event, events.KojiBuildChange):

              if event.module_build_id:

                  return (

                      db_session.query(cls)

@@ -6,7 +6,6 @@ 

  import moksha.hub

  

  import module_build_service.models

- import module_build_service.scheduler.consumer

  

  import logging

  
@@ -30,6 +29,12 @@ 

      config["zmq_enabled"] = True

      config["zmq_subscribe_endpoints"] = "ipc:///dev/null"

  

+     # Import the consumer lazily to avoid a potential import cycle.

+     # For example, in some cases importing an event message from events.py would

+     # trigger an import of the consumer module, which then imports related code

+     # in the handlers module, which eventually imports the original module again.

+     import module_build_service.scheduler.consumer

+ 

      consumers = [module_build_service.scheduler.consumer.MBSConsumer]

  

      # Note that the hub we kick off here cannot send any message.  You

@@ -30,8 +30,10 @@ 

  import module_build_service.monitor as monitor

  

  from module_build_service import models, log, conf

+ from module_build_service.messaging import default_messaging_backend

  from module_build_service.scheduler.handlers import greenwave

  from module_build_service.utils import module_build_state_from_msg

+ from module_build_service.scheduler import events

  

  

  class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
@@ -49,10 +51,9 @@ 

      def __init__(self, hub):

          # Topic setting needs to be done *before* the call to `super`.

  

-         backends = module_build_service.messaging._messaging_backends

          prefixes = conf.messaging_topic_prefix  # This is a list.

-         services = backends[conf.messaging]["services"]

-         suffix = backends[conf.messaging]["topic_suffix"]

+         services = default_messaging_backend["services"]

+         suffix = default_messaging_backend["topic_suffix"]

          self.topic = [

              "{}.{}{}".format(prefix.rstrip("."), category, suffix)

              for prefix, category in itertools.product(prefixes, services)
@@ -85,7 +86,7 @@ 

  

          # These are our main lookup tables for figuring out what to run in

          # response to what messaging events.

-         self.NO_OP = NO_OP = lambda config, db_session, msg: True

+         self.NO_OP = NO_OP = lambda db_session, msg: True

          self.on_build_change = {

              koji.BUILD_STATES["BUILDING"]: NO_OP,

              koji.BUILD_STATES[
@@ -123,7 +124,7 @@ 

      def validate(self, message):

          if conf.messaging == "fedmsg":

              # If this is a faked internal message, don't bother.

-             if isinstance(message, module_build_service.messaging.BaseMessage):

+             if isinstance(message, events.BaseMessage):

                  log.info("Skipping crypto validation for %r" % message)

                  return

              # Otherwise, if it is a real message from the network, pass it
@@ -138,7 +139,7 @@ 

          # messages, then just use them as-is.  If they are not already

          # instances of our message abstraction base class, then first transform

          # them before proceeding.

-         if isinstance(message, module_build_service.messaging.BaseMessage):

+         if isinstance(message, events.BaseMessage):

              msg = message

          else:

              msg = self.get_abstracted_msg(message)
@@ -163,11 +164,11 @@ 

              self.shutdown()

  

      def get_abstracted_msg(self, message):

-         parser = module_build_service.messaging._messaging_backends[conf.messaging].get("parser")

+         parser = default_messaging_backend.get("parser")

          if parser:

              try:

                  return parser.parse(message)

-             except module_build_service.messaging.IgnoreMessage:

+             except events.IgnoreMessage:

                  pass

          else:

              raise ValueError("{0} backend does not define a message parser".format(conf.messaging))
@@ -184,7 +185,7 @@ 

  

          all_fns = list(self.on_build_change.items()) + list(self.on_module_change.items())

          for key, callback in all_fns:

-             expected = ["config", "db_session", "msg"]

+             expected = ["db_session", "msg"]

              if six.PY2:

                  argspec = inspect.getargspec(callback)[0]

              else:
@@ -196,32 +197,32 @@ 

      def _map_message(self, db_session, msg):

          """Map message to its corresponding event handler and module build"""

  

-         if isinstance(msg, module_build_service.messaging.KojiBuildChange):

+         if isinstance(msg, events.KojiBuildChange):

              handler = self.on_build_change[msg.build_new_state]

              build = models.ComponentBuild.from_component_event(db_session, msg)

              if build:

                  build = build.module_build

              return handler, build

  

-         if isinstance(msg, module_build_service.messaging.KojiRepoChange):

+         if isinstance(msg, events.KojiRepoChange):

              return (

                  self.on_repo_change,

                  models.ModuleBuild.from_repo_done_event(db_session, msg)

              )

  

-         if isinstance(msg, module_build_service.messaging.KojiTagChange):

+         if isinstance(msg, events.KojiTagChange):

              return (

                  self.on_tag_change,

                  models.ModuleBuild.from_tag_change_event(db_session, msg)

              )

  

-         if isinstance(msg, module_build_service.messaging.MBSModule):

+         if isinstance(msg, events.MBSModule):

              return (

                  self.on_module_change[module_build_state_from_msg(msg)],

                  models.ModuleBuild.from_module_event(db_session, msg)

              )

  

-         if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate):

+         if isinstance(msg, events.GreenwaveDecisionUpdate):

              return (

                  self.on_decision_update,

                  greenwave.get_corresponding_module_build(
@@ -253,7 +254,7 @@ 

          log.info("Calling %s", idx)

  

          try:

-             further_work = handler(conf, db_session, msg) or []

+             further_work = handler(db_session, msg) or []

          except Exception as e:

              log.exception()

              db_session.rollback()
@@ -304,6 +305,6 @@ 

  

  

  def fake_repo_done_message(tag_name):

-     msg = module_build_service.messaging.KojiRepoChange(

+     msg = events.KojiRepoChange(

          msg_id="a faked internal message", repo_tag=tag_name + "-build")

      work_queue_put(msg)

@@ -19,7 +19,7 @@ 

  from module_build_service.utils import retry

  

  

- def add_default_modules(db_session, mmd, arches):

+ def add_default_modules(db_session, mmd):

      """

      Add default modules as buildrequires to the input modulemd.

  
@@ -29,8 +29,6 @@ 

  

      :param db_session: a SQLAlchemy database session

      :param Modulemd.ModuleStream mmd: the modulemd of the module to add the module defaults to

-     :param list arches: the arches to limit the external repo queries to; this should be the arches

-         the module will be built with

      :raises RuntimeError: if the buildrequired base module isn't in the database or the default

          modules list can't be downloaded

      """

@@ -0,0 +1,151 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: MIT

+ 

+ try:

+     from inspect import signature

+ except ImportError:

+     from funcsigs import signature

+ 

+ 

+ class IgnoreMessage(Exception):

+     pass

+ 

+ 

+ class BaseMessage(object):

+     def __init__(self, msg_id):

+         """

+         A base class to abstract messages from different backends

+         :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

+         """

+         self.msg_id = msg_id

+ 

+         # Moksha calls `consumer.validate` on messages that it receives, and

+         # even though we have validation turned off in the config there's still

+         # a step that tries to access `msg['body']`, `msg['topic']` and

+         # `msg.get('topic')`.

+         # These are here just so that the `validate` method won't raise an

+         # exception when we push our fake messages through.

+         # Note that, our fake message pushing has worked for a while... but the

+         # *latest* version of fedmsg has some code that exercises the bug.  I

+         # didn't hit this until I went to test in jenkins.

+         self.body = {}

+         self.topic = None

+ 

+     def __repr__(self):

+         init_sig = signature(self.__init__)

+ 

+         args_strs = (

+             "{}={!r}".format(name, getattr(self, name))

+             if param.default != param.empty

+             else repr(getattr(self, name))

+             for name, param in init_sig.parameters.items()

+         )

+ 

+         return "{}({})".format(type(self).__name__, ", ".join(args_strs))

+ 

+     def __getitem__(self, key):

+         """ Used to trick moksha into thinking we are a dict. """

+         return getattr(self, key)

+ 

+     def __setitem__(self, key, value):

+         """ Used to trick moksha into thinking we are a dict. """

+         return setattr(self, key, value)

+ 

+     def get(self, key, value=None):

+         """ Used to trick moksha into thinking we are a dict. """

+         return getattr(self, key, value)

+ 

+     def __json__(self):

+         return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)

+ 

+ 

+ class KojiBuildChange(BaseMessage):

+     """ A class that inherits from BaseMessage to provide a message

+     object for a build's info (in fedmsg this replaces the msg dictionary)

+     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

+     :param build_id: the id of the build (e.g. 264382)

+     :param build_new_state: the new build state, this is currently a Koji

+     integer

+     :param build_name: the name of what is being built

+     (e.g. golang-googlecode-tools)

+     :param build_version: the version of the build (e.g. 6.06.06)

+     :param build_release: the release of the build (e.g. 4.fc25)

+     :param module_build_id: the optional id of the module_build in the database

+     :param state_reason: the optional reason as to why the state changed

+     """

+ 

+     def __init__(

+             self,

+             msg_id,

+             build_id,

+             task_id,

+             build_new_state,

+             build_name,

+             build_version,

+             build_release,

+             module_build_id=None,

+             state_reason=None,

+     ):

+         if task_id is None:

+             raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")

+         super(KojiBuildChange, self).__init__(msg_id)

+         self.build_id = build_id

+         self.task_id = task_id

+         self.build_new_state = build_new_state

+         self.build_name = build_name

+         self.build_version = build_version

+         self.build_release = build_release

+         self.module_build_id = module_build_id

+         self.state_reason = state_reason

+ 

+ 

+ class KojiTagChange(BaseMessage):

+     """

+     A class that inherits from BaseMessage to provide a message

+     object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)

+     :param tag: the name of tag (e.g. module-123456789-build)

+     :param artifact: the name of tagged artifact (e.g. module-build-macros)

+     :param nvr: the nvr of the tagged artifact

+     """

+ 

+     def __init__(self, msg_id, tag, artifact, nvr):

+         super(KojiTagChange, self).__init__(msg_id)

+         self.tag = tag

+         self.artifact = artifact

+         self.nvr = nvr

+ 

+ 

+ class KojiRepoChange(BaseMessage):

+     """ A class that inherits from BaseMessage to provide a message

+     object for a repo's info (in fedmsg this replaces the msg dictionary)

+     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

+     :param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)

+     """

+ 

+     def __init__(self, msg_id, repo_tag):

+         super(KojiRepoChange, self).__init__(msg_id)

+         self.repo_tag = repo_tag

+ 

+ 

+ class MBSModule(BaseMessage):

+     """ A class that inherits from BaseMessage to provide a message

+     object for a module event generated by module_build_service

+     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

+     :param module_build_id: the id of the module build

+     :param module_build_state: the state of the module build

+     """

+ 

+     def __init__(self, msg_id, module_build_id, module_build_state):

+         super(MBSModule, self).__init__(msg_id)

+         self.module_build_id = module_build_id

+         self.module_build_state = module_build_state

+ 

+ 

+ class GreenwaveDecisionUpdate(BaseMessage):

+     """A class representing message sent to topic greenwave.decision.update"""

+ 

+     def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):

+         super(GreenwaveDecisionUpdate, self).__init__(msg_id)

+         self.decision_context = decision_context

+         self.policies_satisfied = policies_satisfied

+         self.subject_identifier = subject_identifier

@@ -6,14 +6,15 @@ 

  import koji

  import module_build_service.builder

  

+ from module_build_service import conf, models, log

  from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder

+ from module_build_service.scheduler import events

  from module_build_service.utils.general import mmd_to_str

- from module_build_service import models, log, messaging

  

  logging.basicConfig(level=logging.DEBUG)

  

  

- def _finalize(config, db_session, msg, state):

+ def _finalize(db_session, msg, state):

      """ Called whenever a koji build completes or fails. """

  

      # First, find our ModuleBuild associated with this component, if any.
@@ -48,7 +49,7 @@ 

      if component_build.package == "module-build-macros" and state != koji.BUILD_STATES["COMPLETE"]:

          parent.transition(

              db_session,

-             config,

+             conf,

              state=models.BUILD_STATES["failed"],

              state_reason=state_reason,

              failure_type="user",
@@ -58,10 +59,10 @@ 

  

      if (

          component_build.buildonly

-         and config.system in ["koji", "test"]

+         and conf.system in ["koji", "test"]

          and state == koji.BUILD_STATES["COMPLETE"]

      ):

-         koji_session = KojiModuleBuilder.get_session(config)

+         koji_session = KojiModuleBuilder.get_session(conf)

          rpms = koji_session.listBuildRPMs(component_build.nvr)

          mmd = parent.mmd()

          for artifact in rpms:
@@ -84,7 +85,7 @@ 

          built_components_in_batch = [c for c in parent_current_batch if c.is_completed]

  

          builder = module_build_service.builder.GenericBuilder.create_from_module(

-             db_session, parent, config

+             db_session, parent, conf

          )

  

          if failed_components_in_batch:
@@ -96,7 +97,7 @@ 

                  ", ".join(c.package for c in failed_components_in_batch))

              parent.transition(

                  db_session,

-                 config,

+                 conf,

                  state=models.BUILD_STATES["failed"],

                  state_reason=state_reason,

                  failure_type="user",
@@ -109,7 +110,7 @@ 

              # change message here.

              log.info("Batch done. No component to tag")

              further_work += [

-                 messaging.KojiRepoChange(

+                 events.KojiRepoChange(

                      "components::_finalize: fake msg", builder.module_build_tag["name"])

              ]

          else:
@@ -147,20 +148,20 @@ 

          # build, try to call continue_batch_build again so in case we hit the

          # threshold previously, we will submit another build from this batch.

          builder = module_build_service.builder.GenericBuilder.create_from_module(

-             db_session, parent, config)

+             db_session, parent, conf)

          further_work += module_build_service.utils.continue_batch_build(

-             config, parent, db_session, builder)

+             conf, parent, db_session, builder)

  

      return further_work

  

  

- def complete(config, db_session, msg):

-     return _finalize(config, db_session, msg, state=koji.BUILD_STATES["COMPLETE"])

+ def complete(db_session, msg):

+     return _finalize(db_session, msg, state=koji.BUILD_STATES["COMPLETE"])

  

  

- def failed(config, db_session, msg):

-     return _finalize(config, db_session, msg, state=koji.BUILD_STATES["FAILED"])

+ def failed(db_session, msg):

+     return _finalize(db_session, msg, state=koji.BUILD_STATES["FAILED"])

  

  

- def canceled(config, db_session, msg):

-     return _finalize(config, db_session, msg, state=koji.BUILD_STATES["CANCELED"])

+ def canceled(db_session, msg):

+     return _finalize(db_session, msg, state=koji.BUILD_STATES["CANCELED"])

@@ -31,7 +31,7 @@ 

      return ModuleBuild.get_by_id(db_session, module_build_id)

  

  

- def decision_update(config, db_session, msg):

+ def decision_update(db_session, msg):

      """Move module build to ready or failed according to Greenwave result

  

      :param config: the config object returned from function :func:`init_config`,
@@ -42,7 +42,7 @@ 

          ``greenwave.decision.update``.

      :type msg: :class:`GreenwaveDecisionUpdate`

      """

-     if not config.greenwave_decision_context:

+     if not conf.greenwave_decision_context:

          log.debug(

              "Skip Greenwave message %s as MBS does not have GREENWAVE_DECISION_CONTEXT "

              "configured",
@@ -50,12 +50,12 @@ 

          )

          return

  

-     if msg.decision_context != config.greenwave_decision_context:

+     if msg.decision_context != conf.greenwave_decision_context:

          log.debug(

              "Skip Greenwave message %s as MBS only handles messages with the "

              'decision context "%s"',

              msg.msg_id,

-             config.greenwave_decision_context,

+             conf.greenwave_decision_context,

          )

          return

  

@@ -6,7 +6,6 @@ 

  import module_build_service.builder

  import module_build_service.resolver

  import module_build_service.utils

- import module_build_service.messaging

  from module_build_service.utils import (

      attempt_to_reuse_all_components,

      record_component_builds,
@@ -16,6 +15,7 @@ 

      record_module_build_arches

  )

  from module_build_service.errors import UnprocessableEntity, Forbidden, ValidationError

+ from module_build_service.scheduler import events

  from module_build_service.utils.greenwave import greenwave

  from module_build_service.scheduler.default_modules import (

      add_default_modules, handle_collisions_with_base_module_rpms)
@@ -39,7 +39,7 @@ 

      return os.path.basename(srpm_path).replace(".src.rpm", "")

  

  

- def failed(config, db_session, msg):

+ def failed(db_session, msg):

      """

      Called whenever a module enters the 'failed' state.

  
@@ -59,7 +59,7 @@ 

  

      if build.koji_tag:

          builder = module_build_service.builder.GenericBuilder.create_from_module(

-             db_session, build, config)

+             db_session, build, conf)

  

          if build.new_repo_task_id:

              builder.cancel_build(build.new_repo_task_id)
@@ -79,7 +79,7 @@ 

              reason = "Missing koji tag. Assuming previously failed module lookup."

              log.error(reason)

              build.transition(

-                 db_session, config,

+                 db_session, conf,

                  state=models.BUILD_STATES["failed"],

                  state_reason=reason, failure_type="infra")

              db_session.commit()
@@ -88,7 +88,7 @@ 

      # Don't transition it again if it's already been transitioned

      if build.state != models.BUILD_STATES["failed"]:

          build.transition(

-             db_session, config, state=models.BUILD_STATES["failed"], failure_type="user")

+             db_session, conf, state=models.BUILD_STATES["failed"], failure_type="user")

  

      db_session.commit()

  
@@ -96,7 +96,7 @@ 

      module_build_service.builder.GenericBuilder.clear_cache(build)

  

  

- def done(config, db_session, msg):

+ def done(db_session, msg):

      """Called whenever a module enters the 'done' state.

  

      We currently don't do anything useful, so moving to ready.
@@ -116,7 +116,7 @@ 

      # Scratch builds stay in 'done' state

      if not build.scratch:

          if greenwave is None or greenwave.check_gating(build):

-             build.transition(db_session, config, state=models.BUILD_STATES["ready"])

+             build.transition(db_session, conf, state=models.BUILD_STATES["ready"])

          else:

              build.state_reason = "Gating failed"

              if greenwave.error_occurred:
@@ -128,7 +128,7 @@ 

      module_build_service.builder.GenericBuilder.clear_cache(build)

  

  

- def init(config, db_session, msg):

+ def init(db_session, msg):

      """ Called whenever a module enters the 'init' state."""

      # Sleep for a few seconds to make sure the module in the database is committed

      # TODO: Remove this once messaging is implemented in SQLAlchemy hooks
@@ -156,7 +156,7 @@ 

          mmd = build.mmd()

          record_module_build_arches(mmd, build, db_session)

          arches = [arch.name for arch in build.arches]

-         defaults_added = add_default_modules(db_session, mmd, arches)

+         defaults_added = add_default_modules(db_session, mmd)

  

          # Format the modulemd by putting in defaults and replacing streams that

          # are branches with commit hashes
@@ -296,7 +296,7 @@ 

          return conf.koji_cg_default_build_tag

  

  

- def wait(config, db_session, msg):

+ def wait(db_session, msg):

      """ Called whenever a module enters the 'wait' state.

  

      We transition to this state shortly after a modulebuild is first requested.
@@ -336,7 +336,7 @@ 

          reason = "Failed to get module info from MBS. Max retries reached."

          log.exception(reason)

          build.transition(

-             db_session, config,

+             db_session, conf,

              state=models.BUILD_STATES["failed"],

              state_reason=reason, failure_type="infra")

          db_session.commit()
@@ -365,7 +365,7 @@ 

          log.debug("Skip to assign Content Generator build koji tag to module build.")

  

      builder = module_build_service.builder.GenericBuilder.create_from_module(

-         db_session, build, config)

+         db_session, build, conf)

  

      log.debug(

          "Adding dependencies %s into buildroot for module %s:%s:%s",
@@ -375,13 +375,13 @@ 

  

      if not build.component_builds:

          log.info("There are no components in module %r, skipping build" % build)

-         build.transition(db_session, config, state=models.BUILD_STATES["build"])

+         build.transition(db_session, conf, state=models.BUILD_STATES["build"])

          db_session.add(build)

          db_session.commit()

          # Return a KojiRepoChange message so that the build can be transitioned to done

          # in the repos handler

          return [

-             module_build_service.messaging.KojiRepoChange(

+             events.KojiRepoChange(

                  "handlers.modules.wait: fake msg", builder.module_build_tag["name"])

          ]

  
@@ -389,7 +389,7 @@ 

      # module-build-macros, because there won't be any build done.

      if attempt_to_reuse_all_components(builder, db_session, build):

          log.info("All components have been reused for module %r, skipping build" % build)

-         build.transition(db_session, config, state=models.BUILD_STATES["build"])

+         build.transition(db_session, conf, state=models.BUILD_STATES["build"])

          db_session.add(build)

          db_session.commit()

          return []
@@ -445,19 +445,19 @@ 

              component_build.nvr = nvr

  

      db_session.add(component_build)

-     build.transition(db_session, config, state=models.BUILD_STATES["build"])

+     build.transition(db_session, conf, state=models.BUILD_STATES["build"])

      db_session.add(build)

      db_session.commit()

  

      # We always have to regenerate the repository.

-     if config.system == "koji":

+     if conf.system == "koji":

          log.info("Regenerating the repository")

          task_id = builder.koji_session.newRepo(builder.module_build_tag["name"])

          build.new_repo_task_id = task_id

          db_session.commit()

      else:

          further_work.append(

-             module_build_service.messaging.KojiRepoChange(

+             events.KojiRepoChange(

                  "fake msg", builder.module_build_tag["name"])

          )

      return further_work

@@ -5,18 +5,18 @@ 

  import module_build_service.builder

  import logging

  from datetime import datetime

- from module_build_service import models, log

+ from module_build_service import conf, models, log

  from module_build_service.utils import start_next_batch_build

  

  logging.basicConfig(level=logging.DEBUG)

  

  

- def done(config, db_session, msg):

+ def done(db_session, msg):

      """ Called whenever koji rebuilds a repo, any repo. """

  

      # First, find our ModuleBuild associated with this repo, if any.

      tag = msg.repo_tag

-     if config.system in ("koji", "test") and not tag.endswith("-build"):

+     if conf.system in ("koji", "test") and not tag.endswith("-build"):

          log.debug("Tag %r does not end with '-build' suffix, ignoring" % tag)

          return

      tag = tag[:-6] if tag.endswith("-build") else tag
@@ -40,7 +40,7 @@ 

  

      # Get the list of untagged components in current/previous batches which

      # have been built successfully

-     if config.system in ("koji", "test") and current_batch:

+     if conf.system in ("koji", "test") and current_batch:

          if any(c.is_completed and not c.is_tagged for c in module_build.up_to_current_batch()):

              log.info("Ignoring repo regen, because not all components are tagged.")

              return
@@ -69,7 +69,7 @@ 

          state_reason = "Component(s) {} failed to build.".format(

              ", ".join(c.package for c in current_batch if c.is_unsuccessful))

          module_build.transition(

-             db_session, config, models.BUILD_STATES["failed"], state_reason, failure_type="infra")

+             db_session, conf, models.BUILD_STATES["failed"], state_reason, failure_type="infra")

          db_session.commit()

          log.warning("Odd!  All components in batch failed for %r." % module_build)

          return
@@ -81,8 +81,8 @@ 

          db_session,

          module_build.owner,

          module_build,

-         config.system,

-         config,

+         conf.system,

+         conf,

          tag_name=tag,

          components=[c.package for c in module_build.component_builds],

      )
@@ -111,7 +111,7 @@ 

  

          # Try to start next batch build, because there are still unbuilt

          # components in a module.

-         further_work += start_next_batch_build(config, module_build, db_session, builder)

+         further_work += start_next_batch_build(conf, module_build, db_session, builder)

  

      else:

          if has_failed_components:
@@ -122,7 +122,7 @@ 

              )

              module_build.transition(

                  db_session,

-                 config,

+                 conf,

                  state=models.BUILD_STATES["failed"],

                  state_reason=state_reason,

                  failure_type="user",
@@ -132,7 +132,7 @@ 

              module_build.time_completed = datetime.utcnow()

              builder.finalize(succeeded=True)

  

-             module_build.transition(db_session, config, state=models.BUILD_STATES["done"])

+             module_build.transition(db_session, conf, state=models.BUILD_STATES["done"])

          db_session.commit()

  

      return further_work

@@ -5,14 +5,15 @@ 

  import module_build_service.builder

  import logging

  import koji

- from module_build_service import models, log, messaging

+ from module_build_service import conf, models, log

+ from module_build_service.scheduler import events

  

  logging.basicConfig(level=logging.DEBUG)

  

  

- def tagged(config, db_session, msg):

+ def tagged(db_session, msg):

      """ Called whenever koji tags a build to tag. """

-     if config.system not in ("koji", "test"):

+     if conf.system not in ("koji", "test"):

          return []

  

      # Find our ModuleBuild associated with this tagged artifact.
@@ -49,7 +50,7 @@ 

      # If all components are tagged, start newRepo task.

      if not any(c.is_completed and not c.is_tagged for c in module_build.up_to_current_batch()):

          builder = module_build_service.builder.GenericBuilder.create_from_module(

-             db_session, module_build, config)

+             db_session, module_build, conf)

  

          if any(c.is_unbuilt for c in module_build.component_builds):

              if not _is_new_repo_generating(module_build, builder.koji_session):
@@ -69,7 +70,7 @@ 

              log.info(

                  "All components in module tagged and built, skipping the last repo regeneration")

              further_work += [

-                 messaging.KojiRepoChange(

+                 events.KojiRepoChange(

                      "components::_finalize: fake msg", builder.module_build_tag["name"])

              ]

          db_session.commit()

@@ -0,0 +1,118 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: MIT

+ 

+ import re

+ 

+ from module_build_service import log

+ from module_build_service.scheduler import events

+ 

+ 

+ class MessageParser(object):

+     """Base class for parsing messages received from a specific message bus

+ 

+     :param topic_categories: list of known services whose messages MBS can

+         handle. For example, a value could be

+         ``["buildsys", "mbs", "greenwave"]``.

+     :type topic_categories: list[str]

+     """

+ 

+     def __init__(self, topic_categories):

+         self.topic_categories = topic_categories

+ 

+     def parse(self, msg):

+         raise NotImplementedError()

+ 

+ 

+ class FedmsgMessageParser(MessageParser):

+ 

+     def parse(self, msg):

+         """

+         Parse a received message and convert it to a consistent format

+ 

+         :param dict msg: the message contents from the message bus.

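+         :return: the event object created from the message, an instance of one

+             of the classes in ``module_build_service.scheduler.events``. None is

+             returned if the topic isn't recognized, the message has no content,

+             or it was sent by a non-primary Koji instance.

+         :rtype: object or None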

+         """

+ 

+         if "body" in msg:

+             msg = msg["body"]

+         topic = msg["topic"]

+         categories_re = "|".join(map(re.escape, self.topic_categories))

+         regex_pattern = re.compile(

+             r"(?P<category>" + categories_re + r")"

+             r"(?:(?:\.)(?P<object>build|repo|module|decision))?"

+             r"(?:(?:\.)(?P<subobject>state|build))?"

+             r"(?:\.)(?P<event>change|done|end|tag|update)$"

+         )

+         regex_results = re.search(regex_pattern, topic)

+ 

+         if regex_results:

+             category = regex_results.group("category")

+             object = regex_results.group("object")

+             subobject = regex_results.group("subobject")

+             event = regex_results.group("event")

+ 

+             msg_id = msg.get("msg_id")

+             msg_inner_msg = msg.get("msg")

+ 

+             # If there isn't a msg dict in msg then this message can be skipped

+             if not msg_inner_msg:

+                 log.debug(

+                     "Skipping message without any content with the " 'topic "{0}"'.format(topic))

+                 return None

+ 

+             # Ignore all messages from the secondary koji instances.

+             if category == "buildsys":

+                 instance = msg_inner_msg.get("instance", "primary")

+                 if instance != "primary":

+                     log.debug("Ignoring message from %r koji hub." % instance)

+                     return

+ 

+                 if object == "build" and subobject == "state" and event == "change":

+                     build_id = msg_inner_msg.get("build_id")

+                     task_id = msg_inner_msg.get("task_id")

+                     build_new_state = msg_inner_msg.get("new")

+                     build_name = msg_inner_msg.get("name")

+                     build_version = msg_inner_msg.get("version")

+                     build_release = msg_inner_msg.get("release")

+ 

+                     return events.KojiBuildChange(

+                         msg_id,

+                         build_id,

+                         task_id,

+                         build_new_state,

+                         build_name,

+                         build_version,

+                         build_release,

+                     )

+ 

+                 if object == "repo" and subobject is None and event == "done":

+                     repo_tag = msg_inner_msg.get("tag")

+                     return events.KojiRepoChange(msg_id, repo_tag)

+ 

+                 if event == "tag":

+                     tag = msg_inner_msg.get("tag")

+                     name = msg_inner_msg.get("name")

+                     version = msg_inner_msg.get("version")

+                     release = msg_inner_msg.get("release")

+                     nvr = None

+                     if name and version and release:

+                         nvr = "-".join((name, version, release))

+                     return events.KojiTagChange(msg_id, tag, name, nvr)

+ 

+             if (category == "mbs"

+                     and object == "module" and subobject == "state" and event == "change"):

+                 return events.MBSModule(

+                     msg_id,

+                     msg_inner_msg.get("id"),

+                     msg_inner_msg.get("state"))

+ 

+             if (category == "greenwave"

+                     and object == "decision" and subobject is None and event == "update"):

+                 return events.GreenwaveDecisionUpdate(

+                     msg_id=msg_id,

+                     decision_context=msg_inner_msg.get("decision_context"),

+                     policies_satisfied=msg_inner_msg.get("policies_satisfied"),

+                     subject_identifier=msg_inner_msg.get("subject_identifier"),

+                 )

@@ -16,6 +16,7 @@ 

  from module_build_service import conf, models, log

  from module_build_service.builder import GenericBuilder

  from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder

+ from module_build_service.scheduler import events

  from module_build_service.utils.greenwave import greenwave

  

  
@@ -106,7 +107,7 @@ 

                  log.info("  task {0!r} is in state {1!r}".format(task_id, task_info["state"]))

                  if task_info["state"] in state_mapping:

                      # Fake a fedmsg message on our internal queue

-                     msg = module_build_service.messaging.KojiBuildChange(

+                     msg = events.KojiBuildChange(

                          msg_id="producer::fail_lost_builds fake msg",

                          build_id=component_build.task_id,

                          task_id=component_build.task_id,
@@ -211,7 +212,7 @@ 

  

              # Fake a message to kickstart the build anew in the consumer

              state = module_build_service.models.BUILD_STATES[state_name]

-             msg = module_build_service.messaging.MBSModule(

+             msg = events.MBSModule(

                  "nudge_module_builds_fake_message", build.id, state)

              log.info("  Scheduling faked event %r" % msg)

              module_build_service.scheduler.consumer.work_queue_put(msg)
@@ -433,7 +434,7 @@ 

                  # If it is tagged in final tag, but MBS does not think so,

                  # schedule fake message.

                  if not c.tagged_in_final and module_build.koji_tag in tags:

-                     msg = module_build_service.messaging.KojiTagChange(

+                     msg = events.KojiTagChange(

                          "sync_koji_build_tags_fake_message", module_build.koji_tag, c.package, c.nvr

                      )

                      log.info("  Scheduling faked event %r" % msg)
@@ -443,7 +444,7 @@ 

                  # schedule fake message.

                  build_tag = module_build.koji_tag + "-build"

                  if not c.tagged and build_tag in tags:

-                     msg = module_build_service.messaging.KojiTagChange(

+                     msg = events.KojiTagChange(

                          "sync_koji_build_tags_fake_message", build_tag, c.package, c.nvr)

                      log.info("  Scheduling faked event %r" % msg)

                      module_build_service.scheduler.consumer.work_queue_put(msg)

@@ -4,7 +4,7 @@ 

  import concurrent.futures

  

  from module_build_service import conf, log, models

- import module_build_service.messaging

+ from module_build_service.scheduler.events import KojiRepoChange

  from .reuse import get_reusable_components, reuse_component

  

  
@@ -275,8 +275,7 @@ 

      # message and return

      if components_reused and not unbuilt_components_after_reuse:

          further_work.append(

-             module_build_service.messaging.KojiRepoChange(

-                 "start_build_batch: fake msg", builder.module_build_tag["name"])

+             KojiRepoChange("start_build_batch: fake msg", builder.module_build_tag["name"])

          )

          return further_work

  

@@ -2,10 +2,10 @@ 

  # SPDX-License-Identifier: MIT

  import kobo.rpmlib

  

- import module_build_service.messaging

  from module_build_service import log, models, conf

- from module_build_service.utils.mse import get_base_module_mmds

  from module_build_service.resolver import GenericResolver

+ from module_build_service.scheduler.events import KojiBuildChange

+ from module_build_service.utils.mse import get_base_module_mmds

  

  

  def reuse_component(component, previous_component_build, change_state_now=False):
@@ -40,7 +40,7 @@ 

      # Add this message to further_work so that the reused

      # component will be tagged properly

      return [

-         module_build_service.messaging.KojiBuildChange(

+         KojiBuildChange(

              msg_id="reuse_component: fake msg",

              build_id=None,

              task_id=component.task_id,

@@ -28,7 +28,7 @@ 

  

  from module_build_service.builder.base import GenericBuilder

  from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder

- from module_build_service.messaging import MBSModule

+ from module_build_service.scheduler import events

  from tests import (

      app, clean_database, read_staged_data, staged_data_filename

  )
@@ -231,7 +231,7 @@ 

          return {"name": self.tag_name + "-build"}

  

      def _send_repo_done(self):

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = events.KojiRepoChange(

              msg_id="a faked internal message", repo_tag=self.tag_name + "-build")

          module_build_service.scheduler.consumer.work_queue_put(msg)

  
@@ -240,14 +240,14 @@ 

              tag = self.tag_name

          else:

              tag = self.tag_name + "-build"

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = events.KojiTagChange(

              msg_id="a faked internal message", tag=tag, artifact=artifact, nvr=nvr)

          module_build_service.scheduler.consumer.work_queue_put(msg)

  

      def _send_build_change(self, state, name, build_id):

          # build_id=1 and task_id=1 are OK here, because we are building just

          # one RPM at a time.

-         msg = module_build_service.messaging.KojiBuildChange(

+         msg = events.KojiBuildChange(

              msg_id="a faked internal message",

              build_id=build_id,

              task_id=build_id,
@@ -298,7 +298,7 @@ 

              nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)

              # Send a message stating the build is complete

              msgs.append(

-                 module_build_service.messaging.KojiBuildChange(

+                 events.KojiBuildChange(

                      "recover_orphaned_artifact: fake message",

                      randint(1, 9999999),

                      component_build.task_id,
@@ -311,7 +311,7 @@ 

              )

              # Send a message stating that the build was tagged in the build tag

              msgs.append(

-                 module_build_service.messaging.KojiTagChange(

+                 events.KojiTagChange(

                      "recover_orphaned_artifact: fake message",

                      component_build.module_build.koji_tag + "-build",

                      component_build.package,
@@ -1085,7 +1085,9 @@ 

  

          # Create a dedicated database session for scheduler to avoid hang

          with models.make_db_session(conf) as scheduler_db_session:

-             self.run_scheduler(scheduler_db_session, msgs=[MBSModule("local module build", 3, 1)])

+             self.run_scheduler(

+                 scheduler_db_session,

+                 msgs=[events.MBSModule("local module build", 3, 1)])

  

          reused_component_ids = {

              "module-build-macros": None,
@@ -1163,7 +1165,9 @@ 

          FakeModuleBuilder.on_buildroot_add_artifacts_cb = on_buildroot_add_artifacts_cb

  

          with models.make_db_session(conf) as scheduler_db_session:

-             self.run_scheduler(scheduler_db_session, msgs=[MBSModule("local module build", 3, 1)])

+             self.run_scheduler(

+                 scheduler_db_session,

+                 msgs=[events.MBSModule("local module build", 3, 1)])

  

          # All components should be built and module itself should be in "done"

          # or "ready" state.
@@ -1731,7 +1735,7 @@ 

          cleanup_moksha()

          module = models.ModuleBuild.get_by_id(db_session, module_build_id)

          msgs = [

-             module_build_service.messaging.KojiRepoChange(

+             events.KojiRepoChange(

                  msg_id="a faked internal message", repo_tag=module.koji_tag + "-build"

              )

          ]

@@ -16,6 +16,7 @@ 

  import module_build_service.builder

  from module_build_service import Modulemd

  from module_build_service.utils.general import mmd_to_str

+ from module_build_service.scheduler import events

  

  import pytest

  from mock import patch, MagicMock
@@ -145,7 +146,7 @@ 

  

          actual = builder.recover_orphaned_artifact(component_build)

          assert len(actual) == 3

-         assert type(actual[0]) == module_build_service.messaging.KojiBuildChange

+         assert type(actual[0]) == events.KojiBuildChange

          assert actual[0].build_id == 91

          assert actual[0].task_id == 12345

          assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"]
@@ -153,10 +154,10 @@ 

          assert actual[0].build_version == "1.0"

          assert actual[0].build_release == "1.module+e0095747"

          assert actual[0].module_build_id == 4

-         assert type(actual[1]) == module_build_service.messaging.KojiTagChange

+         assert type(actual[1]) == events.KojiTagChange

          assert actual[1].tag == "module-foo-build"

          assert actual[1].artifact == "rubygem-rails"

-         assert type(actual[2]) == module_build_service.messaging.KojiTagChange

+         assert type(actual[2]) == events.KojiTagChange

          assert actual[2].tag == "module-foo"

          assert actual[2].artifact == "rubygem-rails"

          assert component_build.state == koji.BUILD_STATES["COMPLETE"]
@@ -197,7 +198,7 @@ 

  

          actual = builder.recover_orphaned_artifact(component_build)

          assert len(actual) == 1

-         assert type(actual[0]) == module_build_service.messaging.KojiBuildChange

+         assert type(actual[0]) == events.KojiBuildChange

          assert actual[0].build_id == 91

          assert actual[0].task_id == 12345

          assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"]
@@ -248,7 +249,7 @@ 

  

          actual = builder.recover_orphaned_artifact(component_build)

          assert len(actual) == 1

-         assert type(actual[0]) == module_build_service.messaging.KojiBuildChange

+         assert type(actual[0]) == events.KojiBuildChange

          assert actual[0].build_id == 91

          assert actual[0].task_id == 12345

          assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"]

file modified
+8 -4
@@ -1,7 +1,8 @@ 

  # -*- coding: utf-8 -*-

  # SPDX-License-Identifier: MIT

+ 

  from module_build_service import messaging

- from module_build_service.messaging import KojiRepoChange  # noqa

+ from module_build_service.scheduler.parser import FedmsgMessageParser

  

  

  class TestFedmsgMessaging:
@@ -25,7 +26,8 @@ 

              "topic": "org.fedoraproject.prod.buildsys.build.state.change",

          }

  

-         msg = messaging.FedmsgMessageParser().parse(buildsys_state_change_msg)

+         parser = FedmsgMessageParser(messaging.known_fedmsg_services)

+         msg = parser.parse(buildsys_state_change_msg)

  

          assert msg.build_id == 614503

          assert msg.build_new_state == 1
@@ -49,7 +51,8 @@ 

              "topic": "org.fedoraproject.prod.buildsys.tag",

          }

  

-         msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg)

+         parser = FedmsgMessageParser(messaging.known_fedmsg_services)

+         msg = parser.parse(buildsys_tag_msg)

  

          assert msg.tag == "module-debugging-tools-master-20170405115403-build"

          assert msg.artifact == "module-build-macros"
@@ -68,6 +71,7 @@ 

              "topic": "org.fedoraproject.prod.buildsys.repo.done",

          }

  

-         msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg)

+         parser = FedmsgMessageParser(messaging.known_fedmsg_services)

+         msg = parser.parse(buildsys_tag_msg)

  

          assert msg.repo_tag == "module-f0f7e44f3c6cccab-build"

@@ -2,7 +2,7 @@ 

  # SPDX-License-Identifier: MIT

  from mock import patch, MagicMock

  from module_build_service.scheduler.consumer import MBSConsumer

- from module_build_service.messaging import KojiTagChange, KojiRepoChange

+ from module_build_service.scheduler.events import KojiTagChange, KojiRepoChange

  

  

  class TestConsumer:

@@ -48,7 +48,7 @@ 

          "python": "3",

          "ruby": "2.6",

      }

-     defaults_added = default_modules.add_default_modules(db_session, mmd, ["x86_64"])

+     defaults_added = default_modules.add_default_modules(db_session, mmd)

      # Make sure that the default modules were added. ruby:2.6 will be ignored since it's not in

      # the database

      assert set(mmd.get_xmd()["mbs"]["buildrequires"].keys()) == {"nodejs", "platform", "python"}
@@ -68,7 +68,7 @@ 

      clean_database()

      mmd = load_mmd(read_staged_data("formatted_testmodule.yaml"))

      assert set(mmd.get_xmd()["mbs"]["buildrequires"].keys()) == {"platform"}

-     default_modules.add_default_modules(db_session, mmd, ["x86_64"])

+     default_modules.add_default_modules(db_session, mmd)

      assert set(mmd.get_xmd()["mbs"]["buildrequires"].keys()) == {"platform"}

      mock_get_dm.assert_not_called()

  
@@ -84,7 +84,7 @@ 

  

      expected_error = "Failed to retrieve the module platform:f28:3:00000000 from the database"

      with pytest.raises(RuntimeError, match=expected_error):

-         default_modules.add_default_modules(db_session, mmd, ["x86_64"])

+         default_modules.add_default_modules(db_session, mmd)

  

  

  @patch("module_build_service.scheduler.default_modules._get_default_modules")
@@ -118,7 +118,7 @@ 

      mock_get_dm.side_effect = ValueError(expected_error)

  

      with pytest.raises(ValueError, match=expected_error):

-         default_modules.add_default_modules(db_session, mmd, ["x86_64"])

+         default_modules.add_default_modules(db_session, mmd)

  

  

  @pytest.mark.parametrize("is_rawhide", (True, False))

@@ -76,7 +76,7 @@ 

      @patch("module_build_service.scheduler.handlers.greenwave.log")

      def test_decision_context_is_not_match(self, log, db_session):

          msg = Mock(msg_id="msg-id-1", decision_context="bodhi_update_push_testing")

-         decision_update(conf, db_session, msg)

+         decision_update(db_session, msg)

          log.debug.assert_called_once_with(

              'Skip Greenwave message %s as MBS only handles messages with the decision context "%s"',

              "msg-id-1",
@@ -91,7 +91,7 @@ 

              policies_satisfied=False,

              subject_identifier="pkg-0.1-1.c1",

          )

-         decision_update(conf, db_session, msg)

+         decision_update(db_session, msg)

          log.debug.assert_called_once_with(

              "Skip to handle module build %s because it has not satisfied Greenwave policies.",

              msg.subject_identifier,

@@ -11,6 +11,7 @@ 

  from module_build_service import build_logs

  from module_build_service.models import make_db_session, ModuleBuild

  from module_build_service.utils.general import mmd_to_str, load_mmd

+ from module_build_service.scheduler.events import MBSModule

  

  

  class TestModuleInit:
@@ -71,11 +72,8 @@ 

          platform_build.modulemd = mmd_to_str(mmd)

          db_session.commit()

  

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="init"

-         )

- 

-         self.fn(config=conf, db_session=db_session, msg=msg)

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init")

+         self.fn(db_session=db_session, msg=msg)

  

          build = ModuleBuild.get_by_id(db_session, 2)

          # Make sure the module entered the wait state
@@ -115,9 +113,8 @@ 

              get_latest_error=RuntimeError("Failed in mocked_scm_get_latest")

          )

  

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="init")

-         self.fn(config=conf, db_session=db_session, msg=msg)

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init")

+         self.fn(db_session=db_session, msg=msg)

  

          build = ModuleBuild.get_by_id(db_session, 2)

          # Make sure the module entered the failed state
@@ -142,9 +139,8 @@ 

          scmurl = "git://pkgs.domain.local/modules/includedmodule?#da95886"

          ModuleBuild.create(

              db_session, conf, "includemodule", "1", 3, mmd_to_str(mmd), scmurl, "mprahl")

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=3, module_build_state="init")

-         self.fn(config=conf, db_session=db_session, msg=msg)

+         msg = MBSModule(msg_id=None, module_build_id=3, module_build_state="init")

+         self.fn(db_session=db_session, msg=msg)

          build = ModuleBuild.get_by_id(db_session, 3)

          assert build.state == 1

          assert build.name == "includemodule"
@@ -178,12 +174,11 @@ 

              "7035bd33614972ac66559ac1fdd019ff6027ad22",

              get_latest_raise=True,

          )

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="init")

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init")

          build = ModuleBuild.get_by_id(db_session, 2)

          mocked_from_module_event.return_value = build

  

-         self.fn(config=conf, db_session=db_session, msg=msg)

+         self.fn(db_session=db_session, msg=msg)

  

          # Query the database again to make sure the build object is updated

          db_session.refresh(build)

@@ -12,6 +12,7 @@ 

  from module_build_service import build_logs, Modulemd

  from module_build_service.utils.general import load_mmd

  from module_build_service.models import ComponentBuild, ModuleBuild

+ from module_build_service.scheduler.events import MBSModule

  

  base_dir = os.path.dirname(os.path.dirname(__file__))

  
@@ -58,10 +59,9 @@ 

  

          from_module_event.return_value = mocked_module_build

  

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=1, module_build_state="some state")

+         msg = MBSModule(msg_id=None, module_build_id=1, module_build_state="some state")

          with patch("module_build_service.resolver.GenericResolver.create"):

-             self.fn(config=self.config, db_session=self.session, msg=msg)

+             self.fn(db_session=self.session, msg=msg)

  

      @patch(

          "module_build_service.builder.GenericBuilder.default_buildroot_groups",
@@ -97,11 +97,10 @@ 

          resolver.get_module_tag.return_value = "module-testmodule-master-20170109091357"

  

          generic_resolver.create.return_value = resolver

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="some state")

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state")

  

          module_build_service.scheduler.handlers.modules.wait(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          koji_session.newRepo.assert_called_once_with("module-123-build")

  
@@ -145,11 +144,10 @@ 

          resolver.get_module_tag.return_value = "module-testmodule-master-20170109091357"

  

          generic_resolver.create.return_value = resolver

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="some state")

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state")

  

          module_build_service.scheduler.handlers.modules.wait(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          assert koji_session.newRepo.called

  
@@ -192,11 +190,10 @@ 

          }

  

          generic_resolver.create.return_value = resolver

-         msg = module_build_service.messaging.MBSModule(

-             msg_id=None, module_build_id=2, module_build_state="some state")

+         msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state")

  

          module_build_service.scheduler.handlers.modules.wait(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          module_build = ModuleBuild.get_by_id(db_session, 2)

          assert module_build.cg_build_koji_tag == "modular-updates-candidate"
@@ -265,11 +262,9 @@ 

              new=koji_cg_tag_build,

          ):

              generic_resolver.create.return_value = resolver

-             msg = module_build_service.messaging.MBSModule(

-                 msg_id=None, module_build_id=2, module_build_state="some state"

-             )

+             msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state")

              module_build_service.scheduler.handlers.modules.wait(

-                 config=conf, db_session=db_session, msg=msg

+                 db_session=db_session, msg=msg

              )

              module_build = ModuleBuild.get_by_id(db_session, 2)

              assert module_build.cg_build_koji_tag == expected_cg_koji_build_tag

@@ -8,7 +8,7 @@ 

  import mock

  import koji

  from module_build_service.scheduler.producer import MBSProducer

- from module_build_service.messaging import KojiTagChange

+ from module_build_service.scheduler.events import KojiTagChange

  import six.moves.queue as queue

  from datetime import datetime, timedelta

  

@@ -6,7 +6,8 @@ 

  import module_build_service.scheduler.handlers.repos

  import module_build_service.models

  from module_build_service.models import ComponentBuild

- from tests import conf, scheduler_init_data

+ from module_build_service.scheduler.events import KojiRepoChange

+ from tests import scheduler_init_data

  

  

  class TestRepoDone:
@@ -18,10 +19,10 @@ 

          """

          scheduler_init_data(db_session)

          from_repo_done_event.return_value = None

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "no matches for this...", "2016-some-nonexistent-build")

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

      @mock.patch(

          "module_build_service.builder.KojiModuleBuilder."
@@ -56,10 +57,10 @@ 

          get_session.return_value = mock.Mock(), "development"

          build_fn.return_value = 1234, 1, "", None

  

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build")

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          build_fn.assert_called_once_with(

              artifact_name="tangerine",

              source=(
@@ -118,10 +119,10 @@ 

  

          finalizer.side_effect = mocked_finalizer

  

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build")

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          finalizer.assert_called_once()

  
@@ -159,10 +160,10 @@ 

          config.return_value = mock.Mock(), "development"

          build_fn.return_value = None, 4, "Failed to submit artifact tangerine to Koji", None

  

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build")

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          build_fn.assert_called_once_with(

              artifact_name="tangerine",

              source=(
@@ -186,11 +187,11 @@ 

          component_build.tagged = False

          db_session.commit()

  

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build")

  

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          mock_log_info.assert_called_with(

              "Ignoring repo regen, because not all components are tagged."
@@ -227,10 +228,10 @@ 

          config.return_value = mock.Mock(), "development"

          build_fn.return_value = None, 4, "Failed to submit artifact x to Koji", None

  

-         msg = module_build_service.messaging.KojiRepoChange(

+         msg = KojiRepoChange(

              "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build")

          module_build_service.scheduler.handlers.repos.done(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          module_build = module_build_service.models.ModuleBuild.get_by_id(db_session, 2)

  

          assert module_build.state == module_build_service.models.BUILD_STATES["failed"]

@@ -9,7 +9,8 @@ 

  import module_build_service.scheduler.handlers.repos

  import module_build_service.scheduler.handlers.tags

  import module_build_service.models

- from tests import conf

+ 

+ from module_build_service.scheduler.events import KojiTagChange

  

  import koji

  
@@ -23,23 +24,23 @@ 

          that we do nothing gracefully.

          """

          from_tag_change_event.return_value = None

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "no matches for this...", "2016-some-nonexistent-build", "artifact", "artifact-1.2-1")

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

      def test_no_matching_artifact(self, db_session):

          """ Test that when a tag msg hits us and we have no match,

          that we do nothing gracefully.

          """

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "artifact",

              "artifact-1.2-1",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

      @patch(

          "module_build_service.builder.GenericBuilder.default_buildroot_groups",
@@ -85,24 +86,24 @@ 

          db_session.commit()

  

          # Tag the first component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg

+             db_session=db_session, msg=msg

          )

          # Tag the first component to the final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg

+             db_session=db_session, msg=msg

          )

  

          # newRepo should not be called, because there are still components
@@ -110,14 +111,14 @@ 

          assert not koji_session.newRepo.called

  

          # Tag the second component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg

+             db_session=db_session, msg=msg

          )

  

          # newRepo should not be called, because the component has not been
@@ -125,14 +126,14 @@ 

          assert not koji_session.newRepo.called

  

          # Tag the first component to the final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should be called now - all components have been tagged.

          koji_session.newRepo.assert_called_once_with(
@@ -181,23 +182,23 @@ 

          db_session.commit()

  

          # Tag the perl-List-Compare component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the perl-List-Compare component to final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should not be called, because perl-List-Compare has not been

          # built yet.
@@ -251,24 +252,24 @@ 

          db_session.commit()

  

          # Tag the perl-List-Compare component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg

+             db_session=db_session, msg=msg

          )

          # Tag the perl-List-Compare component to final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should be called now - all successfully built

          # components have been tagged.
@@ -327,69 +328,69 @@ 

          db_session.commit()

  

          # Tag the first component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the first component to the final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should not be called, because there are still components

          # to tag.

          assert not koji_session.newRepo.called

  

          # Tag the second component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the second component to final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should not be called, because there are still components

          # to tag.

          assert not koji_session.newRepo.called

  

          # Tag the component from first batch to final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "module-build-macros",

              "module-build-macros-0.1-1.module+0+b0a1d1f7",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the component from first batch to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "module-build-macros",

              "module-build-macros-0.1-1.module+0+b0a1d1f7",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should be called now - all components have been tagged.

          koji_session.newRepo.assert_called_once_with(
@@ -454,33 +455,33 @@ 

          db_session.commit()

  

          # Tag the perl-Tangerine component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          assert not koji_session.newRepo.called

          # Tag the perl-List-Compare component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the perl-List-Compare component to final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # newRepo should be called now - all successfully built

          # components have been tagged.
@@ -553,41 +554,41 @@ 

          db_session.commit()

  

          # Tag the first component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the first component to the final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-Tangerine",

              "perl-Tangerine-0.23-1.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the second component to the buildroot.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c-build",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

          # Tag the second component to the final tag.

-         msg = module_build_service.messaging.KojiTagChange(

+         msg = KojiTagChange(

              "id",

              "module-testmodule-master-20170219191323-c40c156c",

              "perl-List-Compare",

              "perl-List-Compare-0.53-5.module+0+d027b723",

          )

          module_build_service.scheduler.handlers.tags.tagged(

-             config=conf, db_session=db_session, msg=msg)

+             db_session=db_session, msg=msg)

  

          # All components are tagged, newRepo should be called if there are no active tasks.

          if expect_new_repo:

@@ -16,6 +16,7 @@ 

  from module_build_service.errors import ProgrammingError, ValidationError, UnprocessableEntity

  from module_build_service.utils.general import load_mmd

  from module_build_service.utils.submit import format_mmd

+ from module_build_service.scheduler.events import KojiBuildChange, KojiRepoChange

  from tests import (

      clean_database,

      init_data,
@@ -1172,7 +1173,7 @@ 

          # to BUILDING, so KojiBuildChange message handler handles the change

          # properly.

          for msg in further_work:

-             if type(msg) == module_build_service.messaging.KojiBuildChange:

+             if type(msg) == KojiBuildChange:

                  assert msg.build_new_state == koji.BUILD_STATES["COMPLETE"]

                  component_build = models.ComponentBuild.from_component_event(db_session, msg)

                  assert component_build.state == koji.BUILD_STATES["BUILDING"]
@@ -1180,12 +1181,12 @@ 

          # When we handle these KojiBuildChange messages, MBS should tag all

          # the components just once.

          for msg in further_work:

-             if type(msg) == module_build_service.messaging.KojiBuildChange:

-                 module_build_service.scheduler.handlers.components.complete(conf, db_session, msg)

+             if type(msg) == KojiBuildChange:

+                 module_build_service.scheduler.handlers.components.complete(db_session, msg)

  

          # Since we have reused all the components in the batch, there should

          # be fake KojiRepoChange message.

-         assert type(further_work[-1]) == module_build_service.messaging.KojiRepoChange

+         assert type(further_work[-1]) == KojiRepoChange

  

          # Check that packages have been tagged just once.

          assert len(DummyModuleBuilder.TAGGED_COMPONENTS) == 2

no initial comment

Build #539 failed (commit: 6ce63f2).
Rebase or make new commits to rebuild.

@cqi I'll review this after #1487 is merged and this is rebased.

I think I don't understand the whole picture here, but I presume other PRs will show me your intention with this one :).

This PR seems to move code between Python modules. I think it is OK if it unblocks your other work or makes it easier.

Generally +1 on this change.

rebased onto cb34e87

4 years ago

I have to close it because pagure.io seems to have a bug: the base branch shown in a PR is not refreshed after that base branch is updated by a force push.

Pull-Request has been closed by cqi

4 years ago
Changes Summary 28
+3 -2
file changed
module_build_service/builder/KojiModuleBuilder.py
+13 -275
file changed
module_build_service/messaging.py
+3 -2
file changed
module_build_service/models.py
+6 -1
file changed
module_build_service/scheduler/__init__.py
+17 -16
file changed
module_build_service/scheduler/consumer.py
+1 -3
file changed
module_build_service/scheduler/default_modules.py
+151
file added
module_build_service/scheduler/events.py
+17 -16
file changed
module_build_service/scheduler/handlers/components.py
+4 -4
file changed
module_build_service/scheduler/handlers/greenwave.py
+18 -18
file changed
module_build_service/scheduler/handlers/modules.py
+10 -10
file changed
module_build_service/scheduler/handlers/repos.py
+6 -5
file changed
module_build_service/scheduler/handlers/tags.py
+118
file added
module_build_service/scheduler/parser.py
+5 -4
file changed
module_build_service/scheduler/producer.py
+2 -3
file changed
module_build_service/utils/batches.py
+3 -3
file changed
module_build_service/utils/reuse.py
+13 -9
file changed
tests/test_build/test_build.py
+6 -5
file changed
tests/test_builder/test_koji.py
+8 -4
file changed
tests/test_messaging.py
+1 -1
file changed
tests/test_scheduler/test_consumer.py
+4 -4
file changed
tests/test_scheduler/test_default_modules.py
+2 -2
file changed
tests/test_scheduler/test_greenwave.py
+9 -14
file changed
tests/test_scheduler/test_module_init.py
+11 -16
file changed
tests/test_scheduler/test_module_wait.py
+1 -1
file changed
tests/test_scheduler/test_poller.py
+14 -13
file changed
tests/test_scheduler/test_repo_done.py
+48 -47
file changed
tests/test_scheduler/test_tag_tagged.py
+5 -4
file changed
tests/test_utils/test_utils.py