From ca9d3a2af8499e42bb2969eb32499c921865d003 Mon Sep 17 00:00:00 2001 From: Qixiang Wan Date: Jan 16 2020 19:39:45 +0000 Subject: Add celery task router Add route_task function to route celery tasks to different queues. If we can figure out what the module build is a task ran for by checking the task arguments, then we route this task to a queue named: "mbs-{}".format(module_build_id % num_workers) "num_workers" has default value of 1, and can be changed in backend_config.py. If module build id can't be figured out, task will be routed to the default queue which is named "mbs-default". While setting up the workers, the number of workers should match with "num_workers" in config, and each worker will listen on two queues: 1. mbs-default 2. mbs-{number} # for example, the first worker listens on "mbs-0" By this design, all tasks for a particular module build will be routed to the same queue and run on the same worker serially. --- diff --git a/module_build_service/config.py b/module_build_service/config.py index ac4af45..dde5485 100644 --- a/module_build_service/config.py +++ b/module_build_service/config.py @@ -667,6 +667,19 @@ class Config(object): "default": 30, "desc": "The timeout configuration for dnf operations, in seconds." }, + "num_workers": {"type": int, "default": 1, "desc": "Number of Celery workers"}, + "celery_task_always_eager": { + "type": bool, + "default": False, + "desc": "All Celery tasks will be executed locally by blocking until the task returns " + "when this is True", + }, + "celery_task_routes": { + "type": list, + "default": ["module_build_service.route.route_task"], + "desc": "A list of Celery routers. When deciding the final destination queue of a " + "Celery task the routers are consulted in order", + }, "celery_worker_prefetch_multiplier": { "type": int, "default": 1, diff --git a/module_build_service/route.py b/module_build_service/route.py new file mode 100644 index 0000000..d130347 --- /dev/null +++ b/module_build_service/route.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT +""" Define the router used to route Celery tasks to queues.""" +import inspect + +from module_build_service import conf, log, models +from module_build_service.db_session import db_session +from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build + + +def route_task(name, args, kwargs, options, task=None, **kw): + """ + Figure out module build id from task args and route task to queue + per the module build id. + + Each celery worker will listens on two queues: + 1. mbs-default + 2. mbs-{number} # where number is "module_build_id % conf.num_workers" + If a task is associated with a module build, route it to the queue + named "mbs-{number}", otherwise, route it to "mbs-default", this is to ensure + tasks for a module build can run on the same worker serially. + """ + queue_name = "mbs-default" + + module_build_id = None + num_workers = conf.num_workers + + module, handler_name = name.rsplit(".", 1) + handler = getattr(__import__(module, fromlist=[handler_name]), handler_name) + # handlers can be decorated, inspect the original function + while getattr(handler, "__wrapped__", None): + handler = handler.__wrapped__ + handler_args = inspect.getargspec(handler).args + + def _get_handler_arg(name): + index = handler_args.index(name) + arg_value = kwargs.get(name, None) + if arg_value is None and len(args) > index: + arg_value = args[index] + return arg_value + + if "module_build_id" in handler_args: + module_build_id = _get_handler_arg("module_build_id") + + # if module_build_id is not found, we may be able to figure it out + # by checking other arguments + if module_build_id is None: + if "task_id" in handler_args: + task_id = _get_handler_arg("task_id") + component_build = models.ComponentBuild.from_component_event(db_session, task_id) + if component_build: + module_build_id = component_build.module_build.id + elif "tag_name" in handler_args: + tag_name = _get_handler_arg("tag_name") + module_build = models.ModuleBuild.get_by_tag(db_session, tag_name) + if module_build: + module_build_id = module_build.id + elif "subject_identifier" in handler_args: + module_build_nvr = _get_handler_arg("subject_identifier") + module_build = get_corresponding_module_build(module_build_nvr) + if module_build is not None: + module_build_id = module_build.id + + if module_build_id is not None: + queue_name = "mbs-{}".format(module_build_id % num_workers) + + taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw} + log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo)) + return {"queue": queue_name} diff --git a/module_build_service/scheduler/events.py b/module_build_service/scheduler/events.py index d1843fe..e4f2e5a 100644 --- a/module_build_service/scheduler/events.py +++ b/module_build_service/scheduler/events.py @@ -66,22 +66,20 @@ class Scheduler(sched.scheduler): scheduler = Scheduler(time.time, delayfunc=lambda x: x) -def mbs_event_handler(): +def mbs_event_handler(func): """ A decorator for MBS event handlers. It implements common tasks which should otherwise be repeated in every MBS event handler, for example: - at the end of handler, call events.scheduler.run(). """ - - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - finally: - scheduler.run() - - return wrapper - - return decorator + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + finally: + scheduler.run() + # save origin function as functools.wraps from python2 doesn't preserve the signature + if not hasattr(wrapper, "__wrapped__"): + wrapper.__wrapped__ = func + return wrapper diff --git a/module_build_service/scheduler/handlers/components.py b/module_build_service/scheduler/handlers/components.py index 4b46960..d6e8679 100644 --- a/module_build_service/scheduler/handlers/components.py +++ b/module_build_service/scheduler/handlers/components.py @@ -17,7 +17,7 @@ logging.basicConfig(level=logging.DEBUG) @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def build_task_finalize( msg_id, build_id, task_id, build_new_state, build_name, build_version, build_release, diff --git a/module_build_service/scheduler/handlers/greenwave.py b/module_build_service/scheduler/handlers/greenwave.py index b62a13c..73a6535 100644 --- a/module_build_service/scheduler/handlers/greenwave.py +++ b/module_build_service/scheduler/handlers/greenwave.py @@ -33,7 +33,7 @@ def get_corresponding_module_build(nvr): @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied): """Move module build to ready or failed according to Greenwave result diff --git a/module_build_service/scheduler/handlers/modules.py b/module_build_service/scheduler/handlers/modules.py index d303cd2..929710d 100644 --- a/module_build_service/scheduler/handlers/modules.py +++ b/module_build_service/scheduler/handlers/modules.py @@ -41,7 +41,7 @@ def get_artifact_from_srpm(srpm_path): @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def failed(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'failed' state. @@ -102,7 +102,7 @@ def failed(msg_id, module_build_id, module_build_state): @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def done(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'done' state. @@ -141,7 +141,7 @@ def done(msg_id, module_build_id, module_build_state): @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def init(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'init' state. @@ -317,7 +317,7 @@ def get_content_generator_build_koji_tag(module_deps): @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def wait(msg_id, module_build_id, module_build_state): """ Called whenever a module enters the 'wait' state. diff --git a/module_build_service/scheduler/handlers/repos.py b/module_build_service/scheduler/handlers/repos.py index 72c8c92..e5f9029 100644 --- a/module_build_service/scheduler/handlers/repos.py +++ b/module_build_service/scheduler/handlers/repos.py @@ -14,7 +14,7 @@ logging.basicConfig(level=logging.DEBUG) @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def done(msg_id, tag_name): """Called whenever koji rebuilds a repo, any repo. diff --git a/module_build_service/scheduler/handlers/tags.py b/module_build_service/scheduler/handlers/tags.py index ac38ca1..f1ff03a 100644 --- a/module_build_service/scheduler/handlers/tags.py +++ b/module_build_service/scheduler/handlers/tags.py @@ -13,7 +13,7 @@ logging.basicConfig(level=logging.DEBUG) @celery_app.task -@events.mbs_event_handler() +@events.mbs_event_handler def tagged(msg_id, tag_name, build_name, build_nvr): """Called whenever koji tags a build to tag. diff --git a/tests/test_celery_route_task.py b/tests/test_celery_route_task.py new file mode 100644 index 0000000..be2c7a0 --- /dev/null +++ b/tests/test_celery_route_task.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT +import mock + +from module_build_service import celery_app, conf +from module_build_service.scheduler.handlers import components, greenwave, modules, repos, tags +from module_build_service.scheduler.producer import fail_lost_builds + +from tests import scheduler_init_data + + +@mock.patch.object(conf, "num_workers", create=True, new=3) +@mock.patch("celery.app.amqp.AMQP.send_task_message") +class TestCeleryRouteTask: + def setup_method(self, test_method): + self.old_task_always_eager = celery_app.conf.get("task_always_eager") + celery_app.conf.update(task_always_eager=False) + + def teardown_method(self, test_method): + celery_app.conf.update(task_always_eager=self.old_task_always_eager) + + def test_route_modules_init_task(self, send_task_message): + modules.init.delay("fakemsg", 2, 0) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_modules_init_task_call_with_kwargs(self, send_task_message): + kwargs = { + "msg_id": "fakemsg", + "module_build_id": 2, + "module_build_state": 0, + } + modules.init.delay(**kwargs) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_modules_wait_task(self, send_task_message): + modules.wait.delay("fakemsg", 3, 1) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-0" + + def test_route_modules_done_task(self, send_task_message): + modules.done.delay("fakemsg", 22, 3) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-1" + + def test_route_modules_failed_task(self, send_task_message): + modules.failed.delay("fakemsg", 23, 4) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_components_build_task_finalize_task(self, send_task_message): + scheduler_init_data() + components.build_task_finalize.delay( + "fakemsg", 123, 90276228, 1, "perl-Tangerine", "0.23", "1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_components_build_task_finalize_task_without_a_module(self, send_task_message): + scheduler_init_data() + components.build_task_finalize.delay( + "fakemsg", 123, 123456, 1, "hostname", "0.1", "1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default" + + def test_route_repos_done_task(self, send_task_message): + scheduler_init_data() + repos.done.delay("fakemsg", "module-testmodule-master-20170109091357-7c29193d-build") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_repos_done_task_without_a_module(self, send_task_message): + scheduler_init_data() + repos.done.delay("fakemsg", "no-module-build-exist") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default" + + def test_route_tags_tagged_task(self, send_task_message): + scheduler_init_data() + tags.tagged.delay( + "fakemsg", "module-testmodule-master-20170109091357-7c29193d-build", + "perl-Tangerine", "perl-Tangerine-0.23-1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + @mock.patch("koji.ClientSession") + def test_route_greenwave_decision_update_task(self, kojisession, send_task_message): + kojisession.return_value.getBuild.return_value = { + "extra": {"typeinfo": {"module": {"module_build_service_id": 1}}} + } + scheduler_init_data() + greenwave.decision_update.delay( + "fakemsg", + decision_context="test_dec_context", + subject_identifier="module-testmodule-master-20170109091357-7c29193d-build", + policies_satisfied=False + ) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-1" + + def test_route_fail_lost_builds_task(self, send_task_message): + fail_lost_builds.delay() + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default"