#260 Allow deploying ODCS as Celery app.
Merged 5 years ago by jkaluza. Opened 5 years ago by jkaluza.
jkaluza/odcs celery  into  master

@@ -0,0 +1,103 @@ 

+ # -*- coding: utf-8 -*-

+ # Copyright (c) 2019  Red Hat, Inc.

+ #

+ # Permission is hereby granted, free of charge, to any person obtaining a copy

+ # of this software and associated documentation files (the "Software"), to deal

+ # in the Software without restriction, including without limitation the rights

+ # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ # copies of the Software, and to permit persons to whom the Software is

+ # furnished to do so, subject to the following conditions:

+ #

+ # The above copyright notice and this permission notice shall be included in

+ # all copies or substantial portions of the Software.

+ #

+ # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE

+ # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE

+ # SOFTWARE.

+ #

+ # Written by Jan Kaluza <jkaluza@redhat.com>

+ 

+ import os

+ from celery import Celery

+ 

+ from odcs.server import conf, db

+ from odcs.server.backend import (

+     generate_compose as backend_generate_compose,

+     ComposerThread,

+     RemoveExpiredComposesThread)

+ from odcs.server.utils import retry

+ from odcs.server.models import Compose, COMPOSE_STATES

+ 

+ 

+ # Prepare the instances of classes with worker methods.

+ # They are created at import time of this module. Only their methods are

+ # called from the tasks below (see run_cleanup); the objects are never

+ # started as threads in this module.

+ composer_thread = ComposerThread()

+ remove_expired_compose_thread = RemoveExpiredComposesThread()

+ 

+ 

+ # Create the Celery app.

+ # The broker URL is resolved in this order: the ODCS_CELERY_BROKER_URL

+ # environment variable, then conf.celery_broker_url, then the hardcoded

+ # "amqp://localhost" fallback. Unset or empty values are skipped.

+ if os.environ.get("ODCS_CELERY_BROKER_URL"):

+     broker_url = os.environ["ODCS_CELERY_BROKER_URL"]

+ elif conf.celery_broker_url:

+     broker_url = conf.celery_broker_url

+ else:

+     broker_url = "amqp://localhost"

+ celery_app = Celery("backend", broker=broker_url)

+ # Apply the extra Celery settings from the ODCS configuration on top.

+ celery_app.conf.update(conf.celery_config)

+ 

+ 

+ @celery_app.on_after_configure.connect

+ def setup_periodic_tasks(sender, **kwargs):

+     """

+     Registers the periodic `run_cleanup` task on `sender`.

+     Connected to the `on_after_configure` signal of `celery_app`.

+     :param sender: object the periodic task is added to using its

+         `add_periodic_task` method.

+     :param kwargs: extra signal arguments, unused here.

+     """

+     # Add the cleanup task every 10 minutes. This can be hardcoded here.

+     # It is just internal task to clean up the expired composes, mark

+     # stuck composes as failed and so on.

+     # `run_cleanup.s()` is called on purpose: the result of the call (the

+     # task signature) is what gets scheduled, not the `s` method itself.

+     sender.add_periodic_task(10 * 60, run_cleanup.s())

Should this be run_cleanup.s or run_cleanup.s() ?

+ 

+ 

+ # NOTE(review): retry() comes from odcs.server.utils and is not shown here;

+ # presumably it calls the function again while RuntimeError is raised, which

+ # would cover a compose not yet visible in the DB -- confirm.

+ @retry(wait_on=RuntimeError)

+ def get_odcs_compose(compose_id):

+     """

+     Gets the compose from ODCS DB.

+     :param int compose_id: ID of the compose to look up.

+     :return: the first `Compose` whose `id` equals `compose_id`.

+     :raises RuntimeError: when the query returns no compose.

+     """

+     compose = Compose.query.filter(Compose.id == compose_id).first()

+     if not compose:

+         raise RuntimeError("No compose with id %d in ODCS DB." % compose_id)

+     return compose

+ 

+ 

+ def generate_compose(compose_id):

+     """

+     Generates the compose with id `compose_id`.

+     Shared helper for the compose generating Celery tasks in this module.

+     :param int compose_id: ID of the compose to generate.

+     :raises RuntimeError: propagated from `get_odcs_compose` when the

+         compose cannot be found.

+     """

+     compose = get_odcs_compose(compose_id)

+     # Move the compose to the "generating" state and commit the change

+     # before the backend is asked to generate it.

+     compose.transition(COMPOSE_STATES["generating"], "Compose thread started")

+     db.session.commit()

+     backend_generate_compose(compose.id)

+ 

+ 

+ @celery_app.task(queue=conf.celery_pungi_composes_queue)

+ def generate_pungi_compose(compose_id):

+     """

+     Generates the Pungi based compose.

+     Celery task bound to the queue named by

+     `conf.celery_pungi_composes_queue`; delegates to `generate_compose`.

+     :param int compose_id: ID of the compose to generate.

+     """

+     generate_compose(compose_id)

+ 

+ 

+ @celery_app.task(queue=conf.celery_pulp_composes_queue)

+ def generate_pulp_compose(compose_id):

+     """

+     Generates the Pulp based compose.

+     Celery task bound to the queue named by

+     `conf.celery_pulp_composes_queue`; delegates to `generate_compose`.

+     :param int compose_id: ID of the compose to generate.

+     """

+     generate_compose(compose_id)

+ 

+ 

+ @celery_app.task

+ def run_cleanup():

+     """

+     Runs the cleanup.

+     Celery task scheduled periodically by `setup_periodic_tasks`. It calls

+     `do_work()` of the module-level `RemoveExpiredComposesThread` instance

+     and then `fail_lost_generating_composes()` of the module-level

+     `ComposerThread` instance.

+     """

+     remove_expired_compose_thread.do_work()

+     composer_thread.fail_lost_generating_composes()

file modified
+21 -1
@@ -357,7 +357,27 @@ 

              'default': {},

              'desc': 'Command line argument for raw_config source type, which '

                      'overwrite arguments listed PUNGI_KOJI_ARGS.'

-         }

+         },

+         'celery_config': {

+             'type': dict,

+             'default': {},

+             'desc': 'Configuration dict to pass to Celery.'

+         },

+         'celery_broker_url': {

+             'type': str,

+             'default': "",

+             'desc': 'Celery broker URL'

+         },

+         'celery_pulp_composes_queue': {

+             'type': str,

+             'default': "pulp_composes",

+             'desc': 'Name of the Celery queue for Pulp composes.'

+         },

+         'celery_pungi_composes_queue': {

+             'type': str,

+             'default': "pungi_composes",

+             'desc': 'Name of the Celery queue for Pungi composes.'

+         },

      }

  

      def __init__(self, conf_section_obj):

@@ -39,6 +39,15 @@ 

  from odcs.server.auth import requires_role, login_required

  from odcs.server.auth import require_scopes

  

+ # Celery support is optional: when the celery_tasks module cannot be

+ # imported, the failure is logged and CELERY_AVAILABLE is set to False so

+ # the rest of this module can check the flag before using the tasks.

+ try:

+     from odcs.server.celery_tasks import (

+         generate_pulp_compose, generate_pungi_compose)

+     CELERY_AVAILABLE = True

+ except ImportError:

+     log.exception(

+         "Cannot import celery_tasks. The Celery support is turned off.")

+     CELERY_AVAILABLE = False

+ 

  

  api_v1 = {

      'composes': {
@@ -348,6 +357,12 @@ 

          db.session.flush()

          db.session.commit()

  

+         if CELERY_AVAILABLE:

+             if source_type == PungiSourceType.PULP:

+                 generate_pulp_compose.delay(compose.id)

+             else:

+                 generate_pungi_compose.delay(compose.id)

+ 

          return jsonify(compose.json()), 200

  

      @login_required

file modified
+2 -2
@@ -1,9 +1,9 @@ 

  #!/bin/bash

  

- export PYTHONPATH=./common:./server:./client

+ export PYTHONPATH=`pwd`/common:`pwd`/server:`pwd`/client

  export ODCS_DEVELOPER_ENV=1

  

- python server/odcs//server/manage.py runbackend &

+ celery -A odcs.server.celery_tasks worker --loglevel=info -Q pungi_composes,pulp_composes &

  PIDS[0]=$!

  

  trap "kill ${PIDS[*]}" SIGINT

This should replace fedmsg-hub as the way the frontend communicates
with the backends. Celery brings a lot of improvements and actually
enables ODCS scaling on Fedora, because fedmsg cannot do
round-robin. We can also have different backends for different
kinds of composes and also monitor the backends thanks to Celery.

An example of how to start the backend using Celery is in ./start_odcs_from_here.

pretty please pagure-ci rebuild

5 years ago

Should this be run_cleanup.s or run_cleanup.s() ?

run_cleanup.s() should be correct
Ah! That call must return a curried callable.

:thumbsup:

Pull-Request has been merged by jkaluza

5 years ago