#1534 Add celery task router
Merged 8 months ago by qwan. Opened 8 months ago by qwan.
qwan/fm-orchestrator celery-route-task  into  v3

file modified
+3

@@ -133,6 +133,9 @@ 

      RPMS_ALLOW_REPOSITORY = True

      MODULES_ALLOW_REPOSITORY = True

  

+     # Celery tasks will be executed locally for local builds

+     CELERY_TASK_ALWAYS_EAGER = True

+ 

  

  class OfflineLocalBuildConfiguration(LocalBuildConfiguration):

      RESOLVER = "local"

@@ -662,13 +662,26 @@ 

                      "timeout loading a repo if the download speed is below minrate for the "

                      "duration of the timeout."

          },

+         "num_workers": {"type": int, "default": 1, "desc": "Number of Celery workers"},

+         "celery_task_always_eager": {

+             "type": bool,

+             "default": False,

+             "desc": "All Celery tasks will be executed locally by blocking until the task returns "

+                     "when this is True",

+         },

+         "celery_task_routes": {

+             "type": list,

+             "default": ["module_build_service.route.route_task"],

+             "desc": "A list of Celery routers. When deciding the final destination queue of a "

+                     "Celery task the routers are consulted in order",

+         },

          "celery_worker_prefetch_multiplier": {

              "type": int,

              "default": 1,

              "desc": "This defaults to 1 so that the worker doesn't fetch more messages than it can "

                      "handle at a time. This so that general tasks aren't starved when running "

                      "a long handler.",

-         }

+         },

      }

  

      def __init__(self, conf_section_obj):

@@ -0,0 +1,69 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: MIT

+ """ Define the router used to route Celery tasks to queues."""

+ import inspect

+ 

+ from module_build_service import conf, log, models

+ from module_build_service.db_session import db_session

+ from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build

+ 

+ 

+ def route_task(name, args, kwargs, options, task=None, **kw):

+     """

+     Figure out module build id from task args and route task to queue

+     per the module build id.
cqi commented 8 months ago
Figure out module build id from task args and route task to queue
per the module build id.

This is misleading. The purpose of route_task is to route the event handlers related to one module build to a specific queue, and periodic tasks to mbs-default. So the title of the function's docstring could simply say "Route event handlers and periodic tasks to specific queues." It's not good practice to describe implementation details in the title.

+ 

+     Each celery worker will listens on two queues:
cqi commented 8 months ago
Each celery worker will listens on two queues:

I don't think it is a good idea to write this in the docstring, because "Each celery worker will listens on two queues" is entirely a deployment decision rather than an implemented feature. The Ansible playbook is the proper place to write down this statement if the deployment is done this way.

+         1. mbs-default

+         2. mbs-{number}  # where number is "module_build_id % conf.num_workers"

+     If a task is associated with a module build, route it to the queue

+     named "mbs-{number}", otherwise, route it to "mbs-default", this is to ensure

+     tasks for a module build can run on the same worker serially.

+     """

+     queue_name = "mbs-default"

+ 

+     module_build_id = None

+     num_workers = conf.num_workers

+ 

+     module, handler_name = name.rsplit(".", 1)

+     handler = getattr(__import__(module, fromlist=[handler_name]), handler_name)

+     # handlers can be decorated, inspect the original function

+     while getattr(handler, "__wrapped__", None):

+         handler = handler.__wrapped__

+     handler_args = inspect.getargspec(handler).args

+ 

+     def _get_handler_arg(name):

+         index = handler_args.index(name)

+         arg_value = kwargs.get(name, None)

+         if arg_value is None and len(args) > index:

+             arg_value = args[index]

+         return arg_value

+ 

+     if "module_build_id" in handler_args:

+         module_build_id = _get_handler_arg("module_build_id")

+ 

+     # if module_build_id is not found, we may be able to figure it out

+     # by checking other arguments

+     if module_build_id is None:

+         if "task_id" in handler_args:

+             task_id = _get_handler_arg("task_id")

+             component_build = models.ComponentBuild.from_component_event(db_session, task_id)

+             if component_build:

+                 module_build_id = component_build.module_build.id

+         elif "tag_name" in handler_args:

+             tag_name = _get_handler_arg("tag_name")

+             module_build = models.ModuleBuild.get_by_tag(db_session, tag_name)

+             if module_build:

+                 module_build_id = module_build.id

+         elif "subject_identifier" in handler_args:

+             module_build_nvr = _get_handler_arg("subject_identifier")

+             module_build = get_corresponding_module_build(module_build_nvr)

+             if module_build is not None:

+                 module_build_id = module_build.id

+ 

+     if module_build_id is not None:

+         queue_name = "mbs-{}".format(module_build_id % num_workers)

+ 

+     taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw}

+     log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo))

+     return {"queue": queue_name}

@@ -204,7 +204,7 @@ 

          if event == events.KOJI_REPO_CHANGE:

              return (

                  ON_REPO_CHANGE_HANDLER,

-                 models.ModuleBuild.get_by_tag(db_session, event_info["repo_tag"])

+                 models.ModuleBuild.get_by_tag(db_session, event_info["tag_name"])

              )

  

          if event == events.KOJI_TAG_CHANGE:

@@ -66,22 +66,20 @@ 

  scheduler = Scheduler(time.time, delayfunc=lambda x: x)

  

  

- def mbs_event_handler():

+ def mbs_event_handler(func):

      """

      A decorator for MBS event handlers. It implements common tasks which should otherwise

      be repeated in every MBS event handler, for example:

  

        - at the end of handler, call events.scheduler.run().

      """

- 

-     def decorator(func):

-         @wraps(func)

-         def wrapper(*args, **kwargs):

-             try:

-                 return func(*args, **kwargs)

-             finally:

-                 scheduler.run()

- 

-         return wrapper

- 

-     return decorator

+     @wraps(func)

+     def wrapper(*args, **kwargs):

+         try:

+             return func(*args, **kwargs)

+         finally:

+             scheduler.run()

+     # save origin function as functools.wraps from python2 doesn't preserve the signature

+     if not hasattr(wrapper, "__wrapped__"):

+         wrapper.__wrapped__ = func

+     return wrapper

@@ -17,7 +17,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def build_task_finalize(

          msg_id, build_id, task_id, build_new_state,

          build_name, build_version, build_release,

@@ -33,7 +33,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied):

      """Move module build to ready or failed according to Greenwave result

  

@@ -41,7 +41,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def failed(msg_id, module_build_id, module_build_state):

      """Called whenever a module enters the 'failed' state.

  

@@ -102,7 +102,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def done(msg_id, module_build_id, module_build_state):

      """Called whenever a module enters the 'done' state.

  

@@ -141,7 +141,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def init(msg_id, module_build_id, module_build_state):

      """Called whenever a module enters the 'init' state.

  

@@ -317,7 +317,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def wait(msg_id, module_build_id, module_build_state):

      """ Called whenever a module enters the 'wait' state.

  

@@ -14,21 +14,21 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

- def done(msg_id, repo_tag):

+ @events.mbs_event_handler

+ def done(msg_id, tag_name):

      """Called whenever koji rebuilds a repo, any repo.

  

      :param str msg_id: the original id of the message being handled which is

          received from the message bus.

-     :param str repo_tag: the tag name from which the repo is generated.

+     :param str tag_name: the tag name from which the repo is generated.

      """

  

      # First, find our ModuleBuild associated with this repo, if any.

-     if conf.system in ("koji", "test") and not repo_tag.endswith("-build"):

-         log.debug("Tag %r does not end with '-build' suffix, ignoring", repo_tag)

+     if conf.system in ("koji", "test") and not tag_name.endswith("-build"):

+         log.debug("Tag %r does not end with '-build' suffix, ignoring", tag_name)

          return

-     tag = repo_tag[:-6] if repo_tag.endswith("-build") else repo_tag

-     module_build = models.ModuleBuild.get_by_tag(db_session, repo_tag)

+     tag = tag_name[:-6] if tag_name.endswith("-build") else tag_name

+     module_build = models.ModuleBuild.get_by_tag(db_session, tag_name)

      if not module_build:

          log.debug("No module build found associated with koji tag %r" % tag)

          return

@@ -13,7 +13,7 @@ 

  

  

  @celery_app.task

- @events.mbs_event_handler()

+ @events.mbs_event_handler

  def tagged(msg_id, tag_name, build_name, build_nvr):

      """Called whenever koji tags a build to tag.

  

@@ -92,7 +92,7 @@ 

                      return {

                          "msg_id": msg_id,

                          "event": events.KOJI_REPO_CHANGE,

-                         "repo_tag": msg_inner_msg.get("tag")

+                         "tag_name": msg_inner_msg.get("tag")

                      }

  

                  if event == "tag":

@@ -1825,7 +1825,7 @@ 

          events_info = [{

              "msg_id": "a faked internal message",

              "event": events.KOJI_REPO_CHANGE,

-             "repo_tag": module.koji_tag + "-build"

+             "tag_name": module.koji_tag + "-build"

          }]

          db_session.expire_all()

          # Stop after processing the seeded message

@@ -0,0 +1,116 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: MIT

+ import mock

+ 

+ from module_build_service import celery_app, conf

+ from module_build_service.scheduler.handlers import components, greenwave, modules, repos, tags

+ from module_build_service.scheduler.producer import fail_lost_builds

+ 

+ from tests import scheduler_init_data

+ 

+ 

+ @mock.patch.object(conf, "num_workers", create=True, new=3)

+ @mock.patch("celery.app.amqp.AMQP.send_task_message")

+ class TestCeleryRouteTask:

+     def setup_method(self, test_method):

+         self.old_task_always_eager = celery_app.conf.get("task_always_eager")

+         celery_app.conf.update(task_always_eager=False)

+ 

+     def teardown_method(self, test_method):

+         celery_app.conf.update(task_always_eager=self.old_task_always_eager)

+ 

+     def test_route_modules_init_task(self, send_task_message):

+         modules.init.delay("fakemsg", 2, 0)

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     def test_route_modules_init_task_call_with_kwargs(self, send_task_message):

+         kwargs = {

+             "msg_id": "fakemsg",

+             "module_build_id": 2,

+             "module_build_state": 0,

+         }

+         modules.init.delay(**kwargs)

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     def test_route_modules_wait_task(self, send_task_message):

+         modules.wait.delay("fakemsg", 3, 1)

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-0"

+ 

+     def test_route_modules_done_task(self, send_task_message):

+         modules.done.delay("fakemsg", 22, 3)

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-1"

+ 

+     def test_route_modules_failed_task(self, send_task_message):

+         modules.failed.delay("fakemsg", 23, 4)

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     def test_route_components_build_task_finalize_task(self, send_task_message):

+         scheduler_init_data()

+         components.build_task_finalize.delay(

+             "fakemsg", 123, 90276228, 1, "perl-Tangerine", "0.23", "1.module+f28+2+814cfa39")

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     def test_route_components_build_task_finalize_task_without_a_module(self, send_task_message):

+         scheduler_init_data()

+         components.build_task_finalize.delay(

+             "fakemsg", 123, 123456, 1, "hostname", "0.1", "1.module+f28+2+814cfa39")

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-default"

+ 

+     def test_route_repos_done_task(self, send_task_message):

+         scheduler_init_data()

+         repos.done.delay("fakemsg", "module-testmodule-master-20170109091357-7c29193d-build")

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     def test_route_repos_done_task_without_a_module(self, send_task_message):

+         scheduler_init_data()

+         repos.done.delay("fakemsg", "no-module-build-exist")

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-default"

+ 

+     def test_route_tags_tagged_task(self, send_task_message):

+         scheduler_init_data()

+         tags.tagged.delay(

+             "fakemsg", "module-testmodule-master-20170109091357-7c29193d-build",

+             "perl-Tangerine", "perl-Tangerine-0.23-1.module+f28+2+814cfa39")

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-2"

+ 

+     @mock.patch("koji.ClientSession")

+     def test_route_greenwave_decision_update_task(self, kojisession, send_task_message):

+         kojisession.return_value.getBuild.return_value = {

+             "extra": {"typeinfo": {"module": {"module_build_service_id": 1}}}

+         }

+         scheduler_init_data()

+         greenwave.decision_update.delay(

+             "fakemsg",

+             decision_context="test_dec_context",

+             subject_identifier="module-testmodule-master-20170109091357-7c29193d-build",

+             policies_satisfied=False

+         )

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-1"

+ 

+     def test_route_fail_lost_builds_task(self, send_task_message):

+         fail_lost_builds.delay()

+         queue = send_task_message.call_args[1].get("queue")

+         qname = queue.__dict__.get("name")

+         assert qname == "mbs-default"

file modified
+1 -1

@@ -74,4 +74,4 @@ 

          parser = FedmsgMessageParser(messaging.known_fedmsg_services)

          event_info = parser.parse(buildsys_tag_msg)

  

-         assert event_info["repo_tag"] == "module-f0f7e44f3c6cccab-build"

+         assert event_info["tag_name"] == "module-f0f7e44f3c6cccab-build"

@@ -76,4 +76,4 @@ 

          event_info = process_message.call_args[0][0]

          assert event_info["event"] == events.KOJI_REPO_CHANGE

          assert event_info["msg_id"] == msg["body"]["msg_id"]

-         assert event_info["repo_tag"] == msg["body"]["msg"]["tag"]

+         assert event_info["tag_name"] == msg["body"]["msg"]["tag"]

@@ -21,7 +21,7 @@ 

          get_by_tag.return_value = None

          module_build_service.scheduler.handlers.repos.done(

              msg_id="no matches for this...",

-             repo_tag="2016-some-nonexistent-build")

+             tag_name="2016-some-nonexistent-build")

  

      @mock.patch(

          "module_build_service.builder.KojiModuleBuilder."

@@ -58,7 +58,7 @@ 

  

          module_build_service.scheduler.handlers.repos.done(

              msg_id="some_msg_id",

-             repo_tag="module-testmodule-master-20170109091357-7c29193d-build")

+             tag_name="module-testmodule-master-20170109091357-7c29193d-build")

          build_fn.assert_called_once_with(

              artifact_name="tangerine",

              source=(

@@ -118,7 +118,7 @@ 

  

          module_build_service.scheduler.handlers.repos.done(

              msg_id="some_msg_id",

-             repo_tag="module-testmodule-master-20170109091357-7c29193d-build")

+             tag_name="module-testmodule-master-20170109091357-7c29193d-build")

  

          finalizer.assert_called_once()

  

@@ -158,7 +158,7 @@ 

  

          module_build_service.scheduler.handlers.repos.done(

              msg_id="some_msg_id",

-             repo_tag="module-testmodule-master-20170109091357-7c29193d-build")

+             tag_name="module-testmodule-master-20170109091357-7c29193d-build")

  

          build_fn.assert_called_once_with(

              artifact_name="tangerine",

@@ -185,7 +185,7 @@ 

  

          module_build_service.scheduler.handlers.repos.done(

              msg_id="some_msg_id",

-             repo_tag="module-testmodule-master-20170109091357-7c29193d-build")

+             tag_name="module-testmodule-master-20170109091357-7c29193d-build")

  

          mock_log_info.assert_called_with(

              "Ignoring repo regen, because not all components are tagged."

@@ -224,7 +224,7 @@ 

  

          module_build_service.scheduler.handlers.repos.done(

              msg_id="some_msg_id",

-             repo_tag="module-testmodule-master-20170109091357-7c29193d-build")

+             tag_name="module-testmodule-master-20170109091357-7c29193d-build")

  

          module_build = module_build_service.models.ModuleBuild.get_by_id(db_session, 2)

          assert module_build.state == module_build_service.models.BUILD_STATES["failed"]

Add route_task function to route celery tasks to different queues.
If we can figure out which module build a task is running for by
checking the task arguments, then we route this task to a queue
named:

"mbs-{}".format(module_build_id % num_workers)

"num_workers" has default value of 1, and can be changed in
backend_config.py. If module build id can't be figured out, task will
be routed to the default queue which is named "mbs-default".

When setting up the workers, the number of workers should match
"num_workers" in config, and each worker will listen on two queues:

1. mbs-default
2. mbs-{number} # for example, the first worker listens on "mbs-0"

By this design, all tasks for a particular module build will be routed
to the same queue and run on the same worker serially.

Sent for early review. I'm trying to generate fake messages to test the task router, and will add tests.

rebased onto bd955606cde7f38a17ffb3dd1d9070c898069449

8 months ago

Build #666 failed (commit: 639977d558a5e489c9f0dac93816e3d5927e4fb7).
Rebase or make new commits to rebuild.

Build #667 failed (commit: bd955606cde7f38a17ffb3dd1d9070c898069449).
Rebase or make new commits to rebuild.

rebased onto d04e9e1a2c4d0b4946884dcf790e172e479c3f58

8 months ago

Build #670 failed (commit: d04e9e1a2c4d0b4946884dcf790e172e479c3f58).
Rebase or make new commits to rebuild.

@qwan this is looking good.

What do you think about only checking the arguments instead of the handler names and arguments for the module build ID? I haven't tried it, but I think it could shorten the code and make it less fragile. If it doesn't work out, it's not a big deal. We rarely ever add any new handlers.

elif name.startswith("module_build_service.scheduler.handlers.repos"):
    repo_tag = kwargs.get("repo_tag")
    module_build = models.ModuleBuild.get_by_tag(db_session, repo_tag)

How should the case be handled if the module build is not in the build state? get_by_tag only finds a module build with the given tag among builds in the build state.

get_by_tag could raise a RuntimeError if there is more than one module build in the build state with the given tag (the repo_tag in this case). How would that affect this routing, and what should we do if it happens?

     module_build = models.ModuleBuild.get_by_tag(db_session, tag_name)

Same as my previous comment.

if name.startswith("module_build_service.scheduler.handlers.modules"):
    module_build_id = kwargs.get("module_build_id", None)

This function is the core of how Celery tasks are scheduled. IMO, we should do something if the code cannot get, from the specific argument, the info needed to find the corresponding module build. At the very least we should log something, or just make it fail and terminate the process, since we want to ensure that event handlers are routed to one specific worker.

The reason why I'm thinking those are:

  • it would make debugging easier for developers, e.g. if some task is not scheduled to the correct queue.
  • the code would explicitly express how a task should be routed, making it easy for developers to understand how the different cases should be handled.

I guess the route function route_task also covers the routing of periodic tasks, which will be scheduled to a default queue. If I'm right, I would suggest making this explicit rather than implicit.

That is,

  • define a queue name specifically for periodic tasks; mbs-default is too general, I think.
  • check the name argument in route_task. If it is a periodic task, return that queue.

rebased onto 28a14f4a60ff17b6d66b76359d5d480563bd39e3

8 months ago

Updated with changes:

  1. Add log in route_task for routing info
  2. Task can be called in different ways:
    task.delay(arg1, arg2, ...)
    task.delay(**kwargs)
    so we need to check args or kwargs accordingly.
  3. Add unittests
  4. Init config for tests from backend_config. (Will explain this more in another comment).

get_by_tag could raise error RuntimeError if there are more than one module builds in build state with the give tag (the repo_tag in this case). How would it affect this route and what should do in case it happens?

If this can happen, the code will raise an exception before routing, because the same lookup happens before the task is routed; see _map_message in module_build_service/scheduler/consumer.py.

I guess route function route_task covers the routing of periodic tasks and those will be scheduled to a default queue. If I'm right, I would suggest to make it explicit rather than implicit.
That is,

to define a queue name for periodic tasks specifically, mbs-default is too general I think.
check name argument in route_task. If it is a periodic task, return the queue.

I don't think it's necessary to introduce a new queue that does nothing more than the "mbs-default" queue; it would also make deployment more confusing, since each worker would need to listen on three queues.

Build #677 failed (commit: 28a14f4a60ff17b6d66b76359d5d480563bd39e3).
Rebase or make new commits to rebuild.

I added "py.test" to backend_commands because, without this change, init_config uses "web_config.py" to initialize the Config object. However, in tests we need some Celery-related settings, and the Celery app is created at an early stage while importing module_build_service. It's obviously not a good idea to add such settings to web_config.py for testing purposes. That's why I made this change.

Personally, I'd prefer to revert the "split config" change completely (it has already been partially reverted). The purpose of splitting the config was to have frontend settings in "web_config.py" and "WebConfig", and backend settings in "backend_config.py" and "BackendConfig". However, it turns out to be a little hard at the current stage to move these settings out of Config without hacks around the tests and the local build code. So until the MBS code is split into sub-packages or sub-components, I don't think the splitting change brings any benefit.
@cqi @mprahl, what's your opinion?

elif name.startswith("module_build_service.scheduler.handlers.repos"):
repo_tag = kwargs.get("repo_tag")
module_build = models.ModuleBuild.get_by_tag(db_session, repo_tag)

How should the case be handled is the module build is in build state? get_by_tag finds out module build with a specific tag from builds in build state.
get_by_tag could raise error RuntimeError if there are more than one module builds in build state with the give tag (the repo_tag in this case). How would it affect this route and what should do in case it happens?

There shouldn't be more than one module build with the same tag name since the tag name is derived from the NSVC, which is unique per module build.

I added "py.test" in backend_commands, because without this change, init_config uses "web_config.py" to initialize the Config object, however in tests, we need some celery related settings, and celery app is created in a early phase while importing module_build_service. It's obviously not a good idea to add such settings in web_config.py for testing purpose. That's why I made this change.
Personally, I'd prefer to revert the "split config" change completely (it was reverted partially), the purpose of splitting the config is to have frontend settings in "web_config.py" and "WebConfig", and backend settings in "backend_config.py" and "BackendConfig", however it turns out it's a little hard in current stage if we move these settings out from Config without hacks around tests and local build code. So before splitting MBS code to sub-packages or sub-components, I don't think there is benefit from the splitting change.
@cqi @mprahl, what's your opinion?

I'm fine if you feel the need to revert the change.

@qwan I think the following approach would make the code a lot simpler and shorter (the example assumes it has access to all of the arguments in route_task):

import inspect
module, handler_name = name.rsplit(".", 1)
handler = getattr(__import__(module, fromlist=[handler_name]), handler_name)
# Handlers may be wrapped by any number of decorators; unwrap them all so that
# inspect sees the original handler's parameter names. On Python 2,
# functools.wraps does not set __wrapped__, so each decorator must set it itself.
while getattr(handler, "__wrapped__", None):
    handler = handler.__wrapped__
handler_args = inspect.getargspec(handler).args

def _get_handler_arg(arg_name):
    # Return the value passed for ``arg_name``, whether it was given as a
    # keyword or a positional argument; None if it was not passed at all.
    # Using args[index] as the default of kwargs.get() would evaluate it
    # eagerly and raise IndexError for keyword-only calls such as
    # task.delay(**kwargs), so check kwargs first.
    if arg_name in kwargs:
        return kwargs[arg_name]
    index = handler_args.index(arg_name)
    return args[index] if index < len(args) else None

module_build_id = None
if "module_build_id" in handler_args:
    module_build_id = _get_handler_arg("module_build_id")
elif "task_id" in handler_args:
    task_id = _get_handler_arg("task_id")
    component_build = models.ComponentBuild.from_component_event(db_session, task_id)
    if component_build:
        module_build_id = component_build.module_build.id
# TODO: Continue with the other elifs

There shouldn't be more than one module build with the same tag name since the tag name is derived from the NSVC, which is unique per module build.

@mprahl Was a unique constraint created for field koji_tag in the database already?

I don't think it's necessary to introduce a new queue but do nothing more than the "mbs-default" queue, it also brings confusion for deploying, which need each worker to listen on three queues.

What is queue mbs-default used for, what's the default?

For this multiple-backend-workers refactor, there would be two kinds of queues for the workers to pick up tasks from. One is the queues with the name pattern mbs-{}, used to schedule event handler tasks; the other is for delivering periodic tasks to workers. So, do you think the name mbs-default makes sense in this use case? I didn't mean to create a third kind of queue specifically for periodic tasks. Can we just rename mbs-default to a more specific name for periodic tasks, so that any code reader can easily understand it?

In addition, can we put the queue name for periodic tasks, and the queue name format pattern of event handler tasks in config?

Instead of making the route_task function more and more complicated in order to parse args and kwargs, I would suggest reconsidering the use of sched and removing it to make things simpler.

sched was added to remove the dependency on fedmsg-hub for fake events scheduled directly from inside event handlers. However, everything it does could be replaced with Celery, i.e. by just calling task.delay. That also works for local builds when task_always_eager is enabled. Meanwhile, sched has a limitation in Python 2.7 that prevents passing arguments as kwargs.

So, after removing sched, we can adopt a convention of calling task.delay with keyword arguments only. This convention makes the code easier to read, and also makes the route_task function simpler, since it can extract the relevant info directly from kwargs.

What is queue mbs-default used for, what's the default?
For this multiple backend workers refactor, there would be two kind of queues for the workers to pick up tasks. One is the queues with name pattern mbs-{} to schedule event handler tasks, another one is for deliver periodic tasks to workers. So, do you think does name mbs-default make sense in this use case? I didn't mean to create a third kind of queue for periodic tasks specifically. Can we just rename mbs-default to some more specific name for periodic tasks to make it easy to understand for any code reader?

Besides the periodic tasks, some tasks can be scheduled without an associated module build, so we need a default queue to handle them. Tasks in handlers.modules should always have an associated module build, but that is not guaranteed for the other tasks under handlers.

So, after removing sched, we can make a convention to call task.delay by passing arguments as kwargs only. This convention can make code easier to read, and also benefit the task_route function to be more simpler to extract relative info from just kwargs directly.

I don't think it's a good idea: if a function's signature says we can call it with positional arguments, but we actually can't because only keyword arguments work, that would be weird.

Beside of the periodic tasks, some tasks can be scheduled without a module build associated, so we need a default queue to handle these tasks. Tasks in handlers.modules should always have a module build associated, but that is not guaranteed for other tasks under handlers.

What are the "some tasks"?

What do you mean by "but that is not guaranteed for other tasks under handlers."? Can you be more specific?

The design is all event handlers should be scheduled to one queue to ensure they run serially.

I don't think it's a good idea, if a function signature says we can call it with a variable number of arguments, but actually we can't because only keyworded args can work, that would be weird.

Can you be more specific as well? Does MBS have such a case you mentioned? Can you give an example?

Beside of the periodic tasks, some tasks can be scheduled without a module build associated, so we need a default queue to handle these tasks. Tasks in handlers.modules should always have a module build associated, but that is not guaranteed for other tasks under handlers.

What are the "some tasks"?
What do you mean by "but that is not guaranteed for other tasks under handlers."? Can you be more specific?
The design is all event handlers should be scheduled to one queue to ensure they run serially.

For example:
1. handlers.components.build_task_finalize when there is no component build in the DB associated with the brew task.
2. repos.done and tags.tagged when there is no module build associated with the tag, or the module build is in a state other than "build".

I don't think it's a good idea, if a function signature says we can call it with a variable number of arguments, but actually we can't because only keyworded args can work, that would be weird.

Can you be more specific as well? Does MBS have such a case you mentioned? Can you give an example?

When you have a function defined as

def task(arg1, arg2):
    pass

but you actually can't call it as task(arg1, arg2), and only task(arg1=v1, arg2=v2) works, do you think that makes sense?

@qwan I think the following approach would make the code a lot simpler and shorter (the example assumes it has access to all of the arguments in route_task)

This doesn't work under Python 2: functools.wraps in Python 2 doesn't preserve the original function's signature, so we would need to introduce a third-party decorator module or write that logic in our own code.

btw, this is not ready for merge as I reverted the config change with PR #1537, if there is no objection against that PR, I'll merge that and rebase this PR on top of it.

There shouldn't be more than one module build with the same tag name since the tag name is derived from the NSVC, which is unique per module build.

@mprahl Was a unique constraint created for field koji_tag in the database already?

There isn't, but it's a good idea.

What is queue mbs-default used for, what's the default?
For this multiple-backend-workers refactor, there would be two kinds of queues from which the workers pick up tasks. One is the set of queues with the name pattern mbs-{}, used to schedule event handler tasks; the other is for delivering periodic tasks to workers. So, do you think the name mbs-default makes sense in this use case? I didn't mean to create a third kind of queue specifically for periodic tasks. Can we just rename mbs-default to a more specific name for periodic tasks, to make it easy to understand for any code reader?

Besides the periodic tasks, some tasks can be scheduled without an associated module build, so we need a default queue to handle these tasks. Tasks in handlers.modules should always have an associated module build, but that is not guaranteed for other tasks under handlers.

@qwan I think the following approach would make the code a lot simpler and shorter (the example assumes it has access to all of the arguments in route_task)

This doesn't work under Python 2: functools.wraps in Python 2 doesn't preserve the original function's signature, so we would need to introduce a third-party decorator module or write that logic in our own code.

If you alter the mbs_event_handler slightly, then it works in Python 2. You also need to remove () when calling the decorator so that the function is passed in. Here's an example that worked for me in Python 2:

from functools import wraps
import inspect
from celery import Celery

app = Celery("module-build-service")


def some_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    wrapper.__wrapped__ = func
    return wrapper


@app.task
@some_decorator
def some_handler(module_build_id, state, verbose=False):
    pass


print(inspect.getargspec(some_handler.__wrapped__.__wrapped__).args)

Besides the periodic tasks, some tasks can be scheduled without an associated module build, so we need a default queue to handle these tasks. Tasks in handlers.modules should always have an associated module build, but that is not guaranteed for other tasks under handlers.

What are the "some tasks"?
What do you mean by "but that is not guaranteed for other tasks under handlers."? Can you be more specific?
The design is all event handlers should be scheduled to one queue to ensure they run serially.

@cqi the idea is that all periodic tasks would get sent to something like mbs-default. The underlying periodic tasks could schedule other tasks that are worker specific to ensure they are processed serially.

rebased onto 7283aaec6351adb2889eadb344ada7f3c840b180

8 months ago

PR is updated and rebased on top of latest v3, unittests passed locally.
Changes to the previous version:
1. Rename handler.repos.done argument name repo_tag to tag_name, so we don't need to deal with different names in function signatures of handler.repos.done and handler.tags.tagged.
2. Update the decorator of mbs_event_handler per @mprahl 's comments.

Optional: Since in Python 3 this is already set, we could do the following:

if not hasattr(wrapper, "__wrapped__"):
    wrapper.__wrapped__ = func

Also adding a comment explaining this would be nice.

Could you please add a docstring explaining the general logic of this? I think it'd be appreciated by the future maintainers of MBS. :smile:

I think you meant modules.done.delay("fakemsg", 22, 3)

I think you meant modules.failed.delay("fakemsg", 23, 4)

This is a good test, but we are planning to remove the Greenwave code soon. Could you pick one of the tasks in producer.py instead that will use the mbs-default queue?

PR is updated and rebased on top of latest v3, unittests passed locally.
Changes to the previous version:
1. Rename handler.repos.done argument name repo_tag to tag_name, so we don't need to deal with different names in function signatures of handler.repos.done and handler.tags.tagged.

I like this idea but you'll also need to modify:
https://github.com/release-engineering/mbs-messaging-umb/blob/f139c91f5d7da0334c024a3b3c2e04e2a89539e8/conf/config.py#L33

@qwan this PR looks great. Once you address my comments, and @cqi reviews it, this can be merged. There's no need to wait for me to review it again.

Can you make this configurable?

Can you make mbs-{} configurable as well?

I still think we should think about the root problem instead of making something new on top of that to make a complicated solution, which is error-prone and confusing. Feel free to merge. But, sorry, I would give -1 to this PR.

I still think we should think about the root problem instead of making something new on top of that to make a complicated solution, which is error-prone and confusing. Feel free to merge. But, sorry, I would give -1 to this PR.

Could you explain more on what's the problem here?

2 new commits added

  • Add celery task router
  • Rename `handler.repos.done` argument name "repo_tag" to "tag_name"
8 months ago

Patch is updated to address @mprahl 's comments.

Could you add this to module_build_service/config.py instead?

I still think we should think about the root problem instead of making something new on top of that to make a complicated solution, which is error-prone and confusing. Feel free to merge. But, sorry, I would give -1 to this PR.

Could you explain more on what's the problem here?

@cqi could you please elaborate? I'm open to other ideas, but I think having this complicated function is better than needing to compute the correct queue every time a Celery task is called. This makes the task routing logic centrally located rather than distributed.

rebased onto 9d39497b377c0fd934b7761ab462e860a0b01646

8 months ago

Could you add this to module_build_service/config.py instead?

Done.

2 new commits added

  • Add celery task router
  • Rename `handler.repos.done` argument name "repo_tag" to "tag_name"
8 months ago

2 new commits added

  • Add celery task router
  • Rename `handler.repos.done` argument name "repo_tag" to "tag_name"
8 months ago

Patch is updated to fix a test issue after the rebase (task_always_eager is enabled in the test config, so our route_task tests failed).

Build #686 failed (commit: 4b2553742d1661b2e10eea9a02b83d6a383be7ea).
Rebase or make new commits to rebuild.

1 new commit added

  • Enable CELERY_TASK_ALWAYS_EAGER for local builds
8 months ago

Build #687 failed (commit: d16793bc50a8fb7fcd85d44abba2e17bcada6293).
Rebase or make new commits to rebuild.

Build #688 failed (commit: e81fc8240716889558386ad776b06573932db158).
Rebase or make new commits to rebuild.

rebased onto 84ffa8e

8 months ago

3 new commits added

  • Enable task_always_eager for local builds
  • Add celery task router
  • Rename `handler.repos.done` argument name "repo_tag" to "tag_name"
8 months ago

Rebased on top of latest (and rebased) v3 branch. Unit tests passed locally.

Build #692 failed (commit: 178934f2b594389acaf6e473f9aba94dae4cd763).
Rebase or make new commits to rebuild.

Build #693 failed (commit: 178934f2b594389acaf6e473f9aba94dae4cd763).
Rebase or make new commits to rebuild.

3 new commits added

  • Enable task_always_eager for local builds
  • Add celery task router
  • Rename `handler.repos.done` argument name "repo_tag" to "tag_name"
8 months ago

Build #694 failed (commit: 7c88912).
Rebase or make new commits to rebuild.

Talked with @cqi on irc, he's ok with merging this. So I'm going to do that.

Pull-Request has been merged by qwan

8 months ago

@mprahl

could you please elaborate? I'm open to other ideas, but I think having this complicated function is better than needing to compute the correct queue every time a Celery task is called. This makes the task routing logic centrally located rather than distributed.

I'm not against adding the function route_task. I've been suggesting simplifying route_task and handling the issue as explicitly as possible. I still hold the views from my previous comments[1][2]. Let me summarize the major points again here.

  • The queue name mbs-default is too general. The name does not reflect the purpose the queue will be used for. As my comment mentioned, MBS only has two kinds of queues: one for scheduling periodic tasks, and another for the module builds. No more for now. From my perspective, the name should be more specific about its purpose. Choosing specific message queue names is also a good design practice for a service. And for developers who maintain MBS in the future, it is clear and easy to understand how many kinds of queues there are, which kinds of messages are sent to which queues, etc.
  • queue name mbs-default and queue name pattern mbs-{} are not configurable but hardcoded in the route_task.
  • The current implementation of route_task is clever: it inspects the possible args and kwargs and the different ways to call an event handler. But we don't have to do it in this complicated way. My previous comment suggested a convention of calling every event handler with kwargs only.
  • route_task does not handle periodic tasks explicitly. I can understand that periodic tasks are routed to the mbs-default queue, because I know the original design and the implementation details. What about other people who are maintaining MBS? I believe such a person could eventually understand everything as we do, but he/she would have to spend more time learning all the necessary things in order to understand this router function. That is why I suggested that qwan check for periodic tasks and return the queue name for them explicitly.

[1] https://pagure.io/fm-orchestrator/pull-request/1534#comment-104418
[2] https://pagure.io/fm-orchestrator/pull-request/1534#comment-104419

Figure out module build id from task args and route task to queue
per the module build id.

This is misleading. The purpose of route_task is to schedule event handlers related to one module build to a specific queue, and periodic tasks to mbs-default. So, the title of function's docstring could just tell Route event handlers and periodic tasks to specific queue. It's not a good practice to describe implementation details in the title.

Each celery worker will listens on two queues:

I don't think it is a good idea to write this in the docstring, because "Each celery worker will listens on two queues" is purely a deployment decision rather than an implemented feature. The ansible playbook is the proper place to write down this statement if the deployment is done this way.

Hi @cqi, sorry, I was not aware that you still had these objections when you said "feel free to merge" but didn't respond to "could you add a quick comment?". Anyway, we can still make improvements later, even though this is merged.

queue name mbs-default and queue name pattern mbs-{} are not configurable but hardcoded in the route_task.

Regarding your concern about making the queue names configurable, I don't think it's a good idea to make them changeable; that brings confusion to deployment. At the moment, when deploying the service, we just need to check "num_workers" in the config, and all workers listen on the mbs-default and mbs-{num} queues. This is clear. If we make the names configurable, the deployment configuration becomes unclear and easy to get wrong. And I don't see any need to make them configurable: changing the current mbs-{module_build_id % num_workers} scheme would break our implementation, so we don't want people to change it either.

The queue name mbs-default is too general. The name does not reflect the purpose the queue will be used for. As my comment mentioned, MBS only has two kinds of queues: one for scheduling periodic tasks, and another for the module builds.

As I explained in previous comments, besides periodic tasks, other handler tasks can also have no associated module build, and we need to route them to mbs-default. We only care about the tasks associated with a particular module build, and route each of them to the corresponding mbs-{num} queue. All other tasks go to mbs-default, no matter whether they come from "periodic tasks" or "normal handlers".

The current implementation of route_task is clever: it inspects the possible args and kwargs and the different ways to call an event handler. But we don't have to do it in this complicated way. My previous comment suggested a convention of calling every event handler with kwargs only.

This was also explained in my previous comments, I think it's a bad idea when you have a function defined as:

def handler_task(arg1, arg2):
    pass

but you actually can't call it as handler_task(arg1, arg2) and must use handler_task(arg1=v1, arg2=v2), otherwise it breaks. This brings confusion to future maintainers. Although you could restrict such functions to keyword-only calls through their signatures, that's not a good fit here.

Regarding the docstring comment, I can improve them later.

Hi @mprahl, since @cqi still has some concerns about the design here, I'd like to hear what you think about them. I'm open to any suggestions and will make changes if you agree with them.

@mprahl

could you please elaborate? I'm open to other ideas, but I think having this complicated function is better than needing to compute the correct queue every time a Celery task is called. This makes the task routing logic centrally located rather than distributed.

I'm not against adding the function route_task. I've been suggesting simplifying route_task and handling the issue as explicitly as possible. I still hold the views from my previous comments[1][2]. Let me summarize the major points again here.

The queue name mbs-default is too general. The name does not reflect the purpose the queue will be used for. As my comment mentioned, MBS only has two kinds of queues: one for scheduling periodic tasks, and another for the module builds. No more for now. From my perspective, the name should be more specific about its purpose. Choosing specific message queue names is also a good design practice for a service. And for developers who maintain MBS in the future, it is clear and easy to understand how many kinds of queues there are, which kinds of messages are sent to which queues, etc.

In my view, there are only two types of queues. A queue for non-serial tasks, and a queue for serial tasks. The non-serial tasks just go to mbs-default, which just so happens to be all the scheduled tasks.

queue name mbs-default and queue name pattern mbs-{} are not configurable but hardcoded in the route_task.

Sure, making this configurable would be fine.

The current implementation of route_task is clever: it inspects the possible args and kwargs and the different ways to call an event handler. But we don't have to do it in this complicated way. My previous comment suggested a convention of calling every event handler with kwargs only.

This is hard to enforce though. When a new developer writes a handler, how are they supposed to know it needs to be called with only kwargs? I think the added complexity in the routing function is worth it for a better developer experience.

route_task does not handle periodic tasks explicitly. I can understand that periodic tasks are routed to the mbs-default queue, because I know the original design and the implementation details. What about other people who are maintaining MBS? I believe such a person could eventually understand everything as we do, but he/she would have to spend more time learning all the necessary things in order to understand this router function. That is why I suggested that qwan check for periodic tasks and return the queue name for them explicitly.

[1] https://pagure.io/fm-orchestrator/pull-request/1534#comment-104418
[2] https://pagure.io/fm-orchestrator/pull-request/1534#comment-104419

See my previous answer.

Each celery worker will listens on two queues:

I don't think it is a good idea to write this in the docstring, because "Each celery worker will listens on two queues" is purely a deployment decision rather than an implemented feature. The ansible playbook is the proper place to write down this statement if the deployment is done this way.

In general, I agree with your sentiment. Unfortunately, due to the handlers needing to be run serially, the deployment details have to bleed into the code. I'd rather have this information in the docstring to explain this. Perhaps it would have been better if the docstring described a "typical scenario" or a "recommendation" rather than explicitly saying this is the way it has to be because you're right, it's possible that a different configuration would work.

@qwan, there are no blockers from me. I stated my opinion, and as long as the majority of us agree, I think we can move forward.

@qwan

As I mentioned before, I didn't have any objection to merging this PR and moving it forward. IMO, raising a different thought on a PR during review does not always mean it blocks the merge. I just want to share what I'm thinking so that we can exchange ideas, understand each other clearly, and make things better. I also think this is good practice in the open source way.

By the way, when reading written comments, it is harder to understand a point without a specific example than in a face-to-face talk. For example, you mentioned "other tasks" several times, and I asked "what are the other tasks?" to make sure we were on the same page. But I haven't gotten a concrete answer, so I don't know how our thoughts differ, and I keep being confused by your explanation based on "other tasks". Again, in my mind, there are two kinds of tasks: the periodic tasks written in producer.py and the event handler tasks under scheduler/handlers. I'm not sure what the "other tasks" are.

Anyway, I don't have any further comments to this PR. Thanks @mprahl and @qwan for the discussion. I can understand most of your thoughts on the implementation now. Let's keep moving it forward.

Metadata