#272 Added custom celery router
Merged a month ago by jkaluza. Opened a month ago by mcurlej.
mcurlej/odcs odcs_celery_router  into  master

@@ -82,6 +82,65 @@ 

  

  celery_app = Celery("backend", broker=broker_url)

  celery_app.conf.update(conf.celery_config)

# Route every task through the custom TaskRouter defined in this module.
# The trailing comma makes the value a real one-element tuple of routers;
# without it the parentheses are only grouping and the value is a bare string.
celery_app.conf.update({
    'task_routes': ('odcs.server.celery_tasks.TaskRouter',),
})

+ 

+ 

class TaskRouter:
    """Custom Celery router that selects a queue from compose metadata.

    The router is driven by ``conf.celery_router_config``, a dict with:

    * ``routing_rules`` -- maps a task name to a ``{queue: rule}`` dict. A
      ``rule`` maps compose property names to the value, or the list of
      accepted values, the compose must have for the rule to match.
    * ``cleanup_task`` -- name of the task that is always sent to
      ``conf.celery_cleanup_queue``.
    * ``default_queue`` -- queue returned when no rule matches.
    """

    def __init__(self):
        self.config = conf.celery_router_config

    def route_for_task(self, task_name, *args, **kwargs):
        """
        Method which celery expects to be defined on a custom router. Returns the payload
        with the queue selected for task.

        :param str task_name: full name of the task being routed.
        :param args: ``args[0]`` is expected to hold the positional arguments
            of the task, whose first item is the compose id.
        :return: dict of the form ``{"queue": <queue name>}``.
        :raises ValueError: if a routing rule refers to a property which the
            compose metadata does not contain.
        """
        # The cleanup task has no compose to inspect, so it is routed first.
        if task_name == self.config["cleanup_task"]:
            return {"queue": conf.celery_cleanup_queue}

        compose_id = args[0][0]
        compose = get_odcs_compose(compose_id)
        compose_md = compose.json()

        queue = self.__get_queue_for_compose(compose_md, task_name)

        return {"queue": queue}

    def __get_queue_for_compose(self, compose_md, task_name):
        """
        Goes through the routing rules configured for a task and returns the
        queue of the first rule that matches.

        :param dict compose_md: compose metadata the rules are compared with.
        :param str task_name: task whose routing rules should be applied.
        :return: name of the matching queue, or the default queue when no
            rule matches or no rules are configured for the task.
        :raises ValueError: if a rule names a property missing from
            ``compose_md``.
        """
        rules = self.config["routing_rules"].get(task_name) or {}

        for queue, rule in rules.items():
            # A rule without properties is a wildcard and matches automatically.
            if not rule:
                return queue

            for key, expected in rule.items():
                # Check for the key itself rather than the truthiness of its
                # value, so properties holding 0, "", False, None or an empty
                # list can be matched instead of being reported as invalid.
                if key not in compose_md:
                    raise ValueError(
                        ("Task Router: Routing rule for queue %s for task %s contains an "
                         "invalid property: %s") % (queue, task_name, key))

                actual = compose_md[key]
                if isinstance(expected, list):
                    matched = actual in expected
                else:
                    matched = actual == expected

                # One mismatching property rejects the whole rule; continue
                # with the next rule.
                if not matched:
                    break
            else:
                # Every property of the rule matched.
                return queue

        # None of the rules applies, so the default queue is used.
        return self.config["default_queue"]

  

  

  @celery_app.on_after_configure.connect

@@ -113,7 +172,7 @@ 

      backend_generate_compose(compose.id)

  

  

- @celery_app.task(queue=conf.celery_pungi_composes_queue)

+ @celery_app.task

  def generate_pungi_compose(compose_id):

      """

      Generates the Pungi based compose.

@@ -121,7 +180,7 @@ 

      generate_compose(compose_id)

  

  

- @celery_app.task(queue=conf.celery_pulp_composes_queue)

+ @celery_app.task

  def generate_pulp_compose(compose_id):

      """

      Generates the Pungi based compose.

@@ -129,7 +188,7 @@ 

      generate_compose(compose_id)

  

  

- @celery_app.task(queue=conf.celery_cleanup_queue)

+ @celery_app.task

  def run_cleanup():

      """

      Runs the cleanup.

@@ -383,6 +383,26 @@ 

              'default': "cleanup",

              'desc': 'Name of the Celery queue for cleanup task.'

          },

+         'celery_router_config': {

+             'type': dict,

+             'default': {

+                 "routing_rules": {

+                     "odcs.server.celery_tasks.generate_pungi_compose": {

+                         "pungi_composes": {

+                             "source_type": 3,

+                         },

+                     },

+                     "odcs.server.celery_tasks.generate_pulp_compose": {

+                         "pulp_composes": {

+                             "source_type": 4,

+                         },

+                     },

+                 },

+                 "cleanup_task": "odcs.server.celery_tasks.run_cleanup",

+                 "default_queue": "pungi_composes",

+             },

+             'desc': 'Configuration for custom celery router.'

+         }

      }

  

      def __init__(self, conf_section_obj):

@@ -512,3 +532,18 @@ 

          if not os.path.isfile(s):

              raise ValueError("Pungi config template doesn't exist: %s" % s)

          self._pungi_conf_path = s

+ 

+     def _setifok_celery_router_config(self, celery_router_config):

+         if type(celery_router_config) != dict:

+             raise TypeError("celery_router_config must be a dict.")

+ 

+         required_config_keys = ["routing_rules", "cleanup_task", "default_queue"]

+ 

+         for conf_key in required_config_keys:

+             if not celery_router_config.get(conf_key):

+                 raise KeyError("celery_router_config is missing %s" % conf_key)

+ 

+         if type(celery_router_config["routing_rules"]) != dict:

+             raise TypeError("routing_rules must be a dict.")

+ 

+         self._celery_router_config = celery_router_config

file modified
+1

@@ -25,3 +25,4 @@ 

  pygobject

  koji

  pyldap

+ celery 

\ No newline at end of file

@@ -0,0 +1,211 @@ 

+ from mock import patch, Mock

+ 

+ import pytest

+ 

+ from odcs.server import conf

+ from odcs.server.celery_tasks import TaskRouter

+ 

+ 

class TestCeleryRouter():
    """Tests of the queue selection done by ``TaskRouter.route_for_task``.

    Each test replaces ``TaskRouter.config`` with a hand-written dict and
    patches ``get_odcs_compose`` so that the compose metadata is supplied by
    a mock.
    """

    PUNGI_TASK = "odcs.server.celery_tasks.generate_pungi_compose"
    CLEANUP_TASK = "odcs.server.celery_tasks.run_cleanup"

    def _route(self, mock_get_compose, routing_rules, compose_md, task_name=None):
        """Route a task for a fake compose and return the router's answer.

        :param mock_get_compose: the patched ``get_odcs_compose``.
        :param dict routing_rules: value of the ``routing_rules`` config key.
        :param dict compose_md: metadata returned by the fake compose.
        :param str task_name: task to route; defaults to ``PUNGI_TASK``.
        """
        mock_compose = Mock()
        mock_compose.json.return_value = compose_md
        mock_get_compose.return_value = mock_compose

        tr = TaskRouter()
        tr.config = {
            "routing_rules": routing_rules,
            "cleanup_task": self.CLEANUP_TASK,
            "default_queue": "default_queue",
        }

        # The router receives the positional arguments of the task (the
        # compose id) followed by its keyword arguments.
        return tr.route_for_task(task_name or self.PUNGI_TASK, [1], {})

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_empty_rule(self, mock_get_compose):
        # A rule with no properties is a wildcard and wins immediately.
        rules = {
            self.PUNGI_TASK: {
                "pungi_composes": {},
                "other_composes": {
                    "source_type": 4,
                },
            },
        }
        queue = self._route(mock_get_compose, rules, {"source_type": 3})
        assert queue == {"queue": "pungi_composes"}

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_default_queue(self, mock_get_compose):
        # No rules are configured for the routed task.
        rules = {
            "some.other.task": {
                "pungi_composes": {},
                "other_composes": {
                    "source_type": 4,
                },
            },
        }
        queue = self._route(mock_get_compose, rules, {"source_type": 3})
        assert queue == {"queue": "default_queue"}

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_rule_with_single_property(self, mock_get_compose):
        rules = {
            self.PUNGI_TASK: {
                "pungi_composes": {
                    "source_type": 3,
                },
                "other_composes": {
                    "source_type": 4,
                },
            },
        }
        queue = self._route(mock_get_compose, rules, {"source_type": 3})
        assert queue == {"queue": "pungi_composes"}

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_rule_with_list_property(self, mock_get_compose):
        # The first rule must be skipped because the user is not in its list.
        rules = {
            self.PUNGI_TASK: {
                "pungi_composes": {
                    "source_type": 3,
                    "user": ["mcurlej", "jkaluza"],
                },
                "other_composes": {
                    "source_type": 3,
                    "user": ["mprahl", "lucarval"],
                },
            },
        }
        compose_md = {
            "source_type": 3,
            "user": "mprahl",
        }
        queue = self._route(mock_get_compose, rules, compose_md)
        assert queue == {"queue": "other_composes"}

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_cleanup_queue(self, mock_get_compose):
        rules = {
            self.PUNGI_TASK: {
                "pungi_composes": {
                    "source_type": 3,
                },
                "other_composes": {
                    "source_type": 4,
                },
            },
        }
        queue = self._route(mock_get_compose, rules, {"source_type": 3},
                            task_name=self.CLEANUP_TASK)
        assert queue == {"queue": conf.celery_cleanup_queue}

    @patch("odcs.server.celery_tasks.get_odcs_compose")
    def test_invalid_rule_property_exception(self, mock_get_compose):
        rules = {
            self.PUNGI_TASK: {
                "pungi_composes": {
                    "bad_compose_prop": 3,
                },
            },
        }
        with pytest.raises(ValueError) as e:
            self._route(mock_get_compose, rules, {"source_type": 3})

        # The message is checked after the ``with`` block: statements placed
        # after the raising call inside the block would never run. The raised
        # exception is available as ``e.value``.
        message = str(e.value)
        assert "invalid property" in message
        assert "bad_compose_prop" in message

This change is needed so that ODCS can route tasks to different queues, which
forward them to different execution backends. Routing is driven by the compose
metadata: its values are compared against the configured router rules.

Signed-off-by: Martin Curlej mcurlej@redhat.com

@jkaluza @mprahl @lucarval Can you take a look?

Can you change this to use mprahl, so we match the second rule (the other_composes queue)? I would like to test it skips the first rule in this case ;).

Can this be done like this?

if type(value) is list and compose_md[key] not in value:
    break
elif compose_md[key] != value:
    break

I was thinking about adding a _setifok method for this, but maybe it's not worth doing so... It could just check that routing_rules, cleanup_task and default_queue are set...

@jkaluza If I do that, then when the value is a list and compose_md[key] is in it, the elif branch will still break, because compose_md[key] (a string) is not equal to value (a list) - and in that case it should not break.

rebased onto 44dae25

a month ago

Pull-Request has been merged by jkaluza

a month ago