From a842bd748f0e5a0fd4df85b69e8b2f58581a4569 Mon Sep 17 00:00:00 2001 From: mprahl Date: Jan 16 2020 19:39:46 +0000 Subject: Move route.py to scheduler/route.py --- diff --git a/module_build_service/common/config.py b/module_build_service/common/config.py index b84d52a..372ba01 100644 --- a/module_build_service/common/config.py +++ b/module_build_service/common/config.py @@ -707,7 +707,7 @@ class Config(object): }, "celery_task_routes": { "type": list, - "default": ["module_build_service.route.route_task"], + "default": ["module_build_service.scheduler.route.route_task"], "desc": "A list of Celery routers. When deciding the final destination queue of a " "Celery task the routers are consulted in order", }, diff --git a/module_build_service/route.py b/module_build_service/route.py deleted file mode 100644 index b0b3e14..0000000 --- a/module_build_service/route.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# SPDX-License-Identifier: MIT -""" Define the router used to route Celery tasks to queues.""" -import inspect - -from module_build_service import conf, log -from module_build_service.common import models -from module_build_service.scheduler.db_session import db_session -from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build - - -def route_task(name, args, kwargs, options, task=None, **kw): - """ - Figure out module build id from task args and route task to queue - per the module build id. - - Each celery worker will listens on two queues: - 1. mbs-default - 2. mbs-{number} # where number is "module_build_id % conf.num_workers" - If a task is associated with a module build, route it to the queue - named "mbs-{number}", otherwise, route it to "mbs-default", this is to ensure - tasks for a module build can run on the same worker serially. - """ - queue_name = "mbs-default" - - module_build_id = None - num_workers = conf.num_workers - - module, handler_name = name.rsplit(".", 1) - handler = getattr(__import__(module, fromlist=[handler_name]), handler_name) - # handlers can be decorated, inspect the original function - while getattr(handler, "__wrapped__", None): - handler = handler.__wrapped__ - handler_args = inspect.getargspec(handler).args - - def _get_handler_arg(name): - index = handler_args.index(name) - arg_value = kwargs.get(name, None) - if arg_value is None and len(args) > index: - arg_value = args[index] - return arg_value - - if "module_build_id" in handler_args: - module_build_id = _get_handler_arg("module_build_id") - - # if module_build_id is not found, we may be able to figure it out - # by checking other arguments - if module_build_id is None: - if "task_id" in handler_args: - task_id = _get_handler_arg("task_id") - component_build = models.ComponentBuild.from_component_event(db_session, task_id) - if component_build: - module_build_id = component_build.module_build.id - elif "tag_name" in handler_args: - tag_name = _get_handler_arg("tag_name") - module_build = models.ModuleBuild.get_by_tag(db_session, tag_name) - if module_build: - module_build_id = module_build.id - elif "subject_identifier" in handler_args: - module_build_nvr = _get_handler_arg("subject_identifier") - module_build = get_corresponding_module_build(module_build_nvr) - if module_build is not None: - module_build_id = module_build.id - - if module_build_id is not None: - queue_name = "mbs-{}".format(module_build_id % num_workers) - - taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw} - log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo)) - return {"queue": queue_name} diff --git a/module_build_service/scheduler/route.py b/module_build_service/scheduler/route.py new file mode 100644 index 0000000..b0b3e14 --- /dev/null +++ b/module_build_service/scheduler/route.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT +""" Define the router used to route Celery tasks to queues.""" +import inspect + +from module_build_service import conf, log +from module_build_service.common import models +from module_build_service.scheduler.db_session import db_session +from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build + + +def route_task(name, args, kwargs, options, task=None, **kw): + """ + Figure out module build id from task args and route task to queue + per the module build id. + + Each celery worker will listens on two queues: + 1. mbs-default + 2. mbs-{number} # where number is "module_build_id % conf.num_workers" + If a task is associated with a module build, route it to the queue + named "mbs-{number}", otherwise, route it to "mbs-default", this is to ensure + tasks for a module build can run on the same worker serially. + """ + queue_name = "mbs-default" + + module_build_id = None + num_workers = conf.num_workers + + module, handler_name = name.rsplit(".", 1) + handler = getattr(__import__(module, fromlist=[handler_name]), handler_name) + # handlers can be decorated, inspect the original function + while getattr(handler, "__wrapped__", None): + handler = handler.__wrapped__ + handler_args = inspect.getargspec(handler).args + + def _get_handler_arg(name): + index = handler_args.index(name) + arg_value = kwargs.get(name, None) + if arg_value is None and len(args) > index: + arg_value = args[index] + return arg_value + + if "module_build_id" in handler_args: + module_build_id = _get_handler_arg("module_build_id") + + # if module_build_id is not found, we may be able to figure it out + # by checking other arguments + if module_build_id is None: + if "task_id" in handler_args: + task_id = _get_handler_arg("task_id") + component_build = models.ComponentBuild.from_component_event(db_session, task_id) + if component_build: + module_build_id = component_build.module_build.id + elif "tag_name" in handler_args: + tag_name = _get_handler_arg("tag_name") + module_build = models.ModuleBuild.get_by_tag(db_session, tag_name) + if module_build: + module_build_id = module_build.id + elif "subject_identifier" in handler_args: + module_build_nvr = _get_handler_arg("subject_identifier") + module_build = get_corresponding_module_build(module_build_nvr) + if module_build is not None: + module_build_id = module_build.id + + if module_build_id is not None: + queue_name = "mbs-{}".format(module_build_id % num_workers) + + taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw} + log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo)) + return {"queue": queue_name} diff --git a/tests/test_celery_route_task.py b/tests/test_celery_route_task.py deleted file mode 100644 index b2ed59e..0000000 --- a/tests/test_celery_route_task.py +++ /dev/null @@ -1,116 +0,0 @@ -# -*- coding: utf-8 -*- -# SPDX-License-Identifier: MIT -import mock - -from module_build_service import celery_app, conf -from module_build_service.scheduler.handlers import components, greenwave, modules, repos, tags -from module_build_service.scheduler.producer import fail_lost_builds - -from tests import scheduler_init_data - - -@mock.patch.object(conf, "num_workers", create=True, new=3) -@mock.patch("celery.app.amqp.AMQP.send_task_message") -class TestCeleryRouteTask: - def setup_method(self, test_method): - self.old_task_always_eager = celery_app.conf.get("task_always_eager") - celery_app.conf.update(task_always_eager=False) - - def teardown_method(self, test_method): - celery_app.conf.update(task_always_eager=self.old_task_always_eager) - - def test_route_modules_init_task(self, send_task_message): - modules.init.delay("fakemsg", 2, 0) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - def test_route_modules_init_task_call_with_kwargs(self, send_task_message): - kwargs = { - "msg_id": "fakemsg", - "module_build_id": 2, - "module_build_state": 0, - } - modules.init.delay(**kwargs) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - def test_route_modules_wait_task(self, send_task_message): - modules.wait.delay("fakemsg", 3, 1) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-0" - - def test_route_modules_done_task(self, send_task_message): - modules.done.delay("fakemsg", 22, 3) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-1" - - def test_route_modules_failed_task(self, send_task_message): - modules.failed.delay("fakemsg", 23, 4) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - def test_route_components_build_task_finalize_task(self, send_task_message): - scheduler_init_data() - components.build_task_finalize.delay( - "fakemsg", 90276228, 1, "perl-Tangerine", "0.23", "1.module+f28+2+814cfa39") - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - def test_route_components_build_task_finalize_task_without_a_module(self, send_task_message): - scheduler_init_data() - components.build_task_finalize.delay( - "fakemsg", 123456, 1, "hostname", "0.1", "1.module+f28+2+814cfa39") - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-default" - - def test_route_repos_done_task(self, send_task_message): - scheduler_init_data() - repos.done.delay("fakemsg", "module-testmodule-master-20170109091357-7c29193d-build") - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - def test_route_repos_done_task_without_a_module(self, send_task_message): - scheduler_init_data() - repos.done.delay("fakemsg", "no-module-build-exist") - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-default" - - def test_route_tags_tagged_task(self, send_task_message): - scheduler_init_data() - tags.tagged.delay( - "fakemsg", "module-testmodule-master-20170109091357-7c29193d-build", - "perl-Tangerine-0.23-1.module+f28+2+814cfa39") - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-2" - - @mock.patch("koji.ClientSession") - def test_route_greenwave_decision_update_task(self, kojisession, send_task_message): - kojisession.return_value.getBuild.return_value = { - "extra": {"typeinfo": {"module": {"module_build_service_id": 1}}} - } - scheduler_init_data() - greenwave.decision_update.delay( - "fakemsg", - decision_context="test_dec_context", - subject_identifier="module-testmodule-master-20170109091357-7c29193d-build", - policies_satisfied=False - ) - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-1" - - def test_route_fail_lost_builds_task(self, send_task_message): - fail_lost_builds.delay() - queue = send_task_message.call_args[1].get("queue") - qname = queue.__dict__.get("name") - assert qname == "mbs-default" diff --git a/tests/test_scheduler/test_celery_route_task.py b/tests/test_scheduler/test_celery_route_task.py new file mode 100644 index 0000000..b2ed59e --- /dev/null +++ b/tests/test_scheduler/test_celery_route_task.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT +import mock + +from module_build_service import celery_app, conf +from module_build_service.scheduler.handlers import components, greenwave, modules, repos, tags +from module_build_service.scheduler.producer import fail_lost_builds + +from tests import scheduler_init_data + + +@mock.patch.object(conf, "num_workers", create=True, new=3) +@mock.patch("celery.app.amqp.AMQP.send_task_message") +class TestCeleryRouteTask: + def setup_method(self, test_method): + self.old_task_always_eager = celery_app.conf.get("task_always_eager") + celery_app.conf.update(task_always_eager=False) + + def teardown_method(self, test_method): + celery_app.conf.update(task_always_eager=self.old_task_always_eager) + + def test_route_modules_init_task(self, send_task_message): + modules.init.delay("fakemsg", 2, 0) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_modules_init_task_call_with_kwargs(self, send_task_message): + kwargs = { + "msg_id": "fakemsg", + "module_build_id": 2, + "module_build_state": 0, + } + modules.init.delay(**kwargs) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_modules_wait_task(self, send_task_message): + modules.wait.delay("fakemsg", 3, 1) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-0" + + def test_route_modules_done_task(self, send_task_message): + modules.done.delay("fakemsg", 22, 3) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-1" + + def test_route_modules_failed_task(self, send_task_message): + modules.failed.delay("fakemsg", 23, 4) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_components_build_task_finalize_task(self, send_task_message): + scheduler_init_data() + components.build_task_finalize.delay( + "fakemsg", 90276228, 1, "perl-Tangerine", "0.23", "1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_components_build_task_finalize_task_without_a_module(self, send_task_message): + scheduler_init_data() + components.build_task_finalize.delay( + "fakemsg", 123456, 1, "hostname", "0.1", "1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default" + + def test_route_repos_done_task(self, send_task_message): + scheduler_init_data() + repos.done.delay("fakemsg", "module-testmodule-master-20170109091357-7c29193d-build") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + def test_route_repos_done_task_without_a_module(self, send_task_message): + scheduler_init_data() + repos.done.delay("fakemsg", "no-module-build-exist") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default" + + def test_route_tags_tagged_task(self, send_task_message): + scheduler_init_data() + tags.tagged.delay( + "fakemsg", "module-testmodule-master-20170109091357-7c29193d-build", + "perl-Tangerine-0.23-1.module+f28+2+814cfa39") + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-2" + + @mock.patch("koji.ClientSession") + def test_route_greenwave_decision_update_task(self, kojisession, send_task_message): + kojisession.return_value.getBuild.return_value = { + "extra": {"typeinfo": {"module": {"module_build_service_id": 1}}} + } + scheduler_init_data() + greenwave.decision_update.delay( + "fakemsg", + decision_context="test_dec_context", + subject_identifier="module-testmodule-master-20170109091357-7c29193d-build", + policies_satisfied=False + ) + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-1" + + def test_route_fail_lost_builds_task(self, send_task_message): + fail_lost_builds.delay() + queue = send_task_message.call_args[1].get("queue") + qname = queue.__dict__.get("name") + assert qname == "mbs-default"