#1543 Rebase v3 branch on top of latest master branch
Closed a month ago by qwan. Opened a month ago by qwan.
qwan/fm-orchestrator rebase-v3 into v3

file modified
+1

@@ -24,3 +24,4 @@ 

  mbstest.db

  htmlcov/

  test.env.yaml

+ report.html

file modified
+24 -73

@@ -7,73 +7,36 @@ 

      echo "export MODULE_BUILD_SERVICE_DEVELOPER_ENV=1" > /etc/profile.d/module_build_service_developer_env.sh

      source /etc/profile.d/module_build_service_developer_env.sh

      dnf install -y \

-         fedmsg-hub \

-         fedmsg-relay \

-         fedpkg \

-         gcc \

-         gcc-c++ \

-         git \

-         koji \

-         krb5-devel \

-         krb5-workstation \

-         libffi-devel \

-         mock-scm \

-         openssl-devel \

-         python \

-         python-devel \

-         python2-dnf \

-         python-docutils \

-         python-flask \

-         python2-libmodulemd \

-         python-m2ext \

-         python-mock \

-         python-qpid \

-         python-solv \

-         python-sqlalchemy \

-         python-futures \

-         python2-pungi \

-         python3 \

-         python3-devel \

-         python3-docutils \

-         python3-pungi \

-         python3-virtualenv \

-         redhat-rpm-config \

-         redhat-rpm-config \

-         rpm-build \

-         swig \

-         sqlite \

-         bash-completion \

-         wget \

-         which

- 

+       bash-completion \

+       python3-celery \

+       python3-flake8 \

+       python3-mock \

+       python3-pytest \

+       python3-pytest-cov \

+       python3-tox \

+       rpm-build \

+       sqlite

+     # Install the runtime dependencies from the module-build-service spec file

+     curl -s https://src.fedoraproject.org/rpms/module-build-service/raw/master/f/module-build-service.spec -o /tmp/module-build-service.spec

+     dnf install -y $(rpmspec --parse /tmp/module-build-service.spec | grep ^Requires: | tr -s ' ' | cut -d ' ' -f2)

      mbs_config_dir=/etc/module-build-service

      [ -e "$mbs_config_dir" ] || mkdir "$mbs_config_dir"

-     cp -r /opt/module_build_service/conf/* "$mbs_config_dir"

+     cd /opt/module_build_service

+     cp -r conf/* "$mbs_config_dir"

+ 

+     # Workaround because python3-koji has no egg-info file

+     sed -i '/koji/d' requirements.txt

+     # Remove Python 2 only dependencies

+     sed -i '/futures/d' requirements.txt

+     sed -i '/enum34/d' requirements.txt

+ 

+     python3 setup.py develop --no-deps

+     python3 setup.py egg_info

  SCRIPT

  

  $make_devenv = <<DEVENV

    set -e

-   env_dir=~/devenv

-   pip=${env_dir}/bin/pip

-   py=${env_dir}/bin/python

    code_dir=/opt/module_build_service

- 

-   test -e $env_dir && rm -rf $env_dir

- 

-   # solv is not availabe from pypi.org. libsolv has to be installed by dnf.

-   (cd; virtualenv -p python2 --system-site-packages devenv)

- 

-   $pip install --upgrade pip kobo

-   $pip install -r $code_dir/test-requirements.txt

-   $pip install ipython

- 

-   cd $code_dir

-   $py setup.py develop

-   $py setup.py egg_info

- 

-   if ! grep ". $env_dir/bin/activate" ~/.bashrc >/dev/null; then

-       echo ". $env_dir/bin/activate" >> ~/.bashrc

-   fi

    if ! grep "^cd $code_dir" ~/.bashrc >/dev/null; then

        # Go to working directory after login

        echo "cd $code_dir" >> ~/.bashrc

@@ -81,7 +44,7 @@ 

  DEVENV

  

  $config_pgsql = <<PGSQL

- dnf install -y postgresql postgresql-server python2-psycopg2

+ dnf install -y postgresql postgresql-server python3-psycopg2

  

  pg_hba_conf=/var/lib/pgsql/data/pg_hba.conf

  

@@ -112,22 +75,12 @@ 

  psql -U postgres -h 127.0.0.1 -c "DROP DATABASE IF EXISTS mbstest"

  psql -U postgres -h 127.0.0.1 -c "CREATE DATABASE mbstest"

  

- bashrc=/home/vagrant/.bashrc

- 

  echo "******** Run Tests with PostgreSQL ********"

  echo "Set this environment variable to test with PostgreSQL"

  echo "export DATABASE_URI=postgresql+psycopg2://postgres:@127.0.0.1/mbstest"

  echo

  PGSQL

  

- $script_services = <<SCRIPT_SERVICES

-     bin_dir=~/devenv/bin

-     cd /opt/module_build_service

-     $bin_dir/mbs-upgradedb > /tmp/mbs-base.out 2>&1

-     $bin_dir/fedmsg-relay < /dev/null >& /tmp/fedmsg-relay.out &

-     $bin_dir/fedmsg-hub < /dev/null >& /tmp/fedmsg-hub.out &

-     $bin_dir/mbs-frontend < /dev/null >& /tmp/mbs-frontend.out &

- SCRIPT_SERVICES

  

  Vagrant.configure("2") do |config|

    config.vm.box = "fedora/31-cloud-base"

@@ -135,12 +88,10 @@ 

    # Disable the default share

    config.vm.synced_folder ".", "/vagrant", disabled: true

    config.vm.network "forwarded_port", guest_ip: "0.0.0.0", guest: 5000, host: 5000

-   config.vm.network "forwarded_port", guest_ip: "0.0.0.0", guest: 13747, host: 13747

    config.vm.provision "shell", inline: $script

    config.vm.provision "shell", inline: "usermod -a -G mock vagrant"

    config.vm.provision "shell", inline: $config_pgsql

    config.vm.provision "shell", inline: $make_devenv, privileged: false

-   config.vm.provision "shell", inline: $script_services, privileged: false, run: "always"

    config.vm.provider "libvirt" do |v, override|

      override.vm.synced_folder "./", "/opt/module_build_service", type: "sshfs"

      v.memory = 1024
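
Note: after "vagrant up", the PostgreSQL provisioning script above prints the environment variable needed to run the test suite against PostgreSQL instead of the default SQLite database; inside the guest that is simply:

    export DATABASE_URI=postgresql+psycopg2://postgres:@127.0.0.1/mbstest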

file modified
+23 -35

@@ -19,26 +19,15 @@ 

      HOST = "0.0.0.0"

      PORT = 5000

  

-     # Global network-related values, in seconds

-     NET_TIMEOUT = 120

-     NET_RETRY_INTERVAL = 30

- 

-     SYSTEM = "koji"

-     MESSAGING = "fedmsg"  # or amq

      MESSAGING_TOPIC_PREFIX = ["org.fedoraproject.prod"]

      KOJI_CONFIG = "/etc/module-build-service/koji.conf"

      KOJI_PROFILE = "koji"

      ARCHES = ["i686", "armv7hl", "x86_64"]

-     ALLOW_ARCH_OVERRIDE = False

      KOJI_REPOSITORY_URL = "https://kojipkgs.fedoraproject.org/repos"

-     KOJI_TAG_PREFIXES = ["module", "scrmod"]

-     KOJI_ENABLE_CONTENT_GENERATOR = True

-     CHECK_FOR_EOL = False

      PDC_URL = "https://pdc.fedoraproject.org/rest_api/v1"

      PDC_INSECURE = False

      PDC_DEVELOP = True

      SCMURLS = ["https://src.fedoraproject.org/modules/"]

-     YAML_SUBMIT_ALLOWED = False

  

      # How often should we resort to polling, in seconds

      # Set to zero to disable polling

@@ -48,23 +37,10 @@ 

      # and be in the build state at a time. Set this to 0 for no restrictions

      NUM_CONCURRENT_BUILDS = 5

  

-     ALLOW_CUSTOM_SCMURLS = False

- 

      RPMS_DEFAULT_REPOSITORY = "https://src.fedoraproject.org/rpms/"

-     RPMS_ALLOW_REPOSITORY = False

      RPMS_DEFAULT_CACHE = "http://pkgs.fedoraproject.org/repo/pkgs/"

-     RPMS_ALLOW_CACHE = False

  

      MODULES_DEFAULT_REPOSITORY = "https://src.fedoraproject.org/modules/"

-     MODULES_ALLOW_REPOSITORY = False

-     MODULES_ALLOW_SCRATCH = False

- 

-     ALLOWED_GROUPS = {"packager"}

- 

-     ALLOWED_GROUPS_TO_IMPORT_MODULE = set()

- 

-     # Available backends are: console and file

-     LOG_BACKEND = "console"

  

      # Path to log file when LOG_BACKEND is set to "file".

      LOG_FILE = "module_build_service.log"

@@ -89,14 +65,26 @@ 

      AMQ_PRIVATE_KEY_FILE = "/etc/module_build_service/msg-m8y-client.key"

      AMQ_TRUSTED_CERT_FILE = "/etc/module_build_service/Root-CA.crt"

  

-     # Disable Client Authorization

-     NO_AUTH = False

+     # Configs for running tasks asynchronously with Celery

+     # For details of Celery configs, refer to Celery documentation:

+     # https://docs.celeryproject.org/en/latest/userguide/configuration.html

+     #

+     # Each config name consists of the namespace CELERY_ plus the Celery config

+     # name converted to upper case. For example, the broker URL's Celery config

+     # name is broker_url, so as you can see below, the corresponding config name

+     # in MBS is CELERY_BROKER_URL.

+     CELERY_BROKER_URL = ""

+     CELERY_RESULT_BACKEND = ""

+     CELERY_IMPORTS = [

+         "module_build_service.scheduler.handlers.components",

+         "module_build_service.scheduler.handlers.modules",

+         "module_build_service.scheduler.handlers.repos",

+         "module_build_service.scheduler.handlers.tags",

+         "module_build_service.scheduler.handlers.greenwave",

+     ]

  

  

  class TestConfiguration(BaseConfiguration):

-     BUILD_LOGS_DIR = "/tmp"

-     BUILD_LOGS_NAME_FORMAT = "build-{id}.log"

-     LOG_BACKEND = "console"

      LOG_LEVEL = "debug"

      SQLALCHEMY_DATABASE_URI = environ.get(

          "DATABASE_URI", "sqlite:///{0}".format(path.join(dbdir, "mbstest.db")))

@@ -117,8 +105,6 @@ 

  

      KOJI_REPOSITORY_URL = "https://kojipkgs.stg.fedoraproject.org/repos"

      SCMURLS = ["https://src.stg.fedoraproject.org/modules/"]

-     AUTH_METHOD = "oidc"

-     RESOLVER = "db"

  

      ALLOWED_GROUPS_TO_IMPORT_MODULE = {"mbs-import-module"}

  

@@ -129,6 +115,9 @@ 

  

      STREAM_SUFFIXES = {r"^el\d+\.\d+\.\d+\.z$": 0.1}

  

+     # Ensures task.delay executes locally instead of scheduling a task to a queue.

+     CELERY_TASK_ALWAYS_EAGER = True

+ 

  

  class ProdConfiguration(BaseConfiguration):

      pass
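
Note: CELERY_TASK_ALWAYS_EAGER in TestConfiguration above maps to Celery's task_always_eager setting, so a .delay() call runs synchronously in the calling process. A minimal illustration with a hypothetical task (not MBS code):

    from celery import Celery

    app = Celery("example")
    app.conf.task_always_eager = True

    @app.task
    def add(x, y):
        return x + y

    result = add.delay(1, 2)  # executes immediately, no broker or worker involved
    assert result.get() == 3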

@@ -139,9 +128,6 @@ 

      LOG_LEVEL = "debug"

      MESSAGING = "in_memory"

  

-     ARCH_AUTODETECT = True

-     ARCH_FALLBACK = "x86_64"

- 

      ALLOW_CUSTOM_SCMURLS = True

      RESOLVER = "mbs"

      RPMS_ALLOW_REPOSITORY = True

@@ -154,4 +140,6 @@ 

  

  class DevConfiguration(LocalBuildConfiguration):

      DEBUG = True

-     LOG_BACKEND = "console"

+ 

+     CELERY_BROKER_URL = "redis://localhost:6379/0"

+     CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

file modified
+6 -1

@@ -12,6 +12,7 @@ 

  # --with-pgsql: run tests with PostgreSQL, otherwise SQLite is used.

  # --no-tty: don't use tty for containers

  # --sudo: run Docker via sudo

+ # --podman: use Podman instead of Docker

  # --no-pull: don't update Docker images

  #

  # Please note that, both of them can have arbitrary value as long as one of

@@ -22,6 +23,7 @@ 

  with_pgsql=

  no_tty=

  use_sudo=

+ use_podman=

  do_pull=1

  

  while (( "$#" )); do

@@ -30,6 +32,7 @@ 

          --with-pgsql) with_pgsql=1 ;;

          --no-tty) no_tty=1 ;;

          --sudo) use_sudo=1 ;;

+         --podman) use_podman=1 ;;

          --no-pull) do_pull= ;;

          *) break ;;

      esac

@@ -58,7 +61,9 @@ 

  if [ -n "$with_pgsql" ]; then

      test_container_name="${test_container_name}-pgsql"

  fi

- if [ -n "$use_sudo" ]; then

+ if [ -n "$use_podman" ]; then

+     docker="podman"

+ elif [ -n "$use_sudo" ]; then

      # use sudo for docker

      docker="sudo /usr/bin/docker"

  else
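
With the new flag, the same wrapper script drives the test containers through Podman (typically rootless) instead of Docker, for example:

    contrib/run-unittests.sh --podman --with-pgsql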

file modified
+2

@@ -42,6 +42,8 @@ 

      python-solv \

      python-sqlalchemy \

      python-tox \

+     python2-distro \

+     python2-celery \

      python2-libmodulemd2 \

      python2-pyyaml \

      python2-pungi \

@@ -9,6 +9,8 @@ 

      git-core \

      createrepo_c \

      rsync \

+     python3-distro \

+     python3-celery \

      python3-fedmsg \

      python3-kobo-rpmlib \

      python3-rpm \

file modified
+1 -1

@@ -22,4 +22,4 @@ 

  

  # Since tox seems to ignore `usedevelop` when we have `sitepackages` on, we have to run it manually

  python3 setup.py develop --no-deps

- /usr/bin/tox -e flake8,py3 "$@"

+ /usr/bin/tox -e flake8,py3,intflake "$@"

file modified
+12

@@ -1,6 +1,18 @@ 

  Change Log

  ==========

  

+ v2.30.4

+ -------

+ * Allow component reuse in some cases when a component is added

+ 

+ v2.30.3

+ -------

+ * Fixed a local build bug caused by the refactoring of how database sessions are handled

+ 

+ v2.30.2

+ -------

+ * Fixed bugs that caused local builds to fail on Fedora 31

+ 

  v2.30.1

  -------

  * Fixed a bug that caused local builds to fail depending on the version of DNF being used

file modified
+1

@@ -31,6 +31,7 @@ 

  * ``--with-pgsql``: run tests with PostgreSQL database.

  * ``--no-tty``: don't use tty for containers

  * ``--sudo``: run Docker via sudo

+ * ``--podman``: use Podman instead of Docker

  * ``--no-pull``: don't update Docker images

  

  For example, ``contrib/run-unittests.sh --py3 --with-pgsql``.

@@ -19,10 +19,12 @@ 

  """

  

  import pkg_resources

+ from celery import Celery

  from flask import Flask, has_app_context, url_for

  from flask_sqlalchemy import SQLAlchemy

  from sqlalchemy.pool import StaticPool

  from logging import getLogger

+ 

  import gi  # noqa

  gi.require_version("Modulemd", "2.0")  # noqa

  from gi.repository import Modulemd  # noqa

@@ -46,6 +48,17 @@ 

  

  conf = init_config(app)

  

+ celery_app = Celery("module-build-service")

+ # Convert config names specific for Celery like this:

+ # celery_broker_url -> broker_url

+ celery_configs = {

+     name[7:]: getattr(conf, name)

+     for name in dir(conf) if name.startswith("celery_")

+ }

+ # Only allow a single process so that tasks are always serial per worker

+ celery_configs["worker_concurrency"] = 1

+ celery_app.conf.update(**celery_configs)

+ 

  

  class MBSSQLAlchemy(SQLAlchemy):

      """

@@ -1,6 +1,7 @@ 

  # -*- coding: utf-8 -*-

  # SPDX-License-Identifier: MIT

  import calendar

+ import distro

  import hashlib

  import logging

  import json

@@ -293,12 +294,12 @@ 

  

      def _get_buildroot(self):

          version = pkg_resources.get_distribution("module-build-service").version

-         distro = platform.linux_distribution()

+         distro_info = distro.linux_distribution()

          ret = {

              u"id": 1,

              u"host": {

                  u"arch": text_type(platform.machine()),

-                 u"os": u"%s %s" % (distro[0], distro[1]),

+                 u"os": u"%s %s" % (distro_info[0], distro_info[1]),

              },

              u"content_generator": {

                  u"name": u"module-build-service",

@@ -24,12 +24,12 @@ 

  import module_build_service.scm

  import module_build_service.utils

  from module_build_service.builder.utils import execute_cmd

- from module_build_service.builder.koji_backports import ClientSession as KojiClientSession

  from module_build_service.db_session import db_session

  from module_build_service.errors import ProgrammingError

  

- from module_build_service.builder.base import GenericBuilder

+ from module_build_service.builder import GenericBuilder

  from module_build_service.builder.KojiContentGenerator import KojiContentGenerator

+ from module_build_service.scheduler import events

  from module_build_service.utils import get_reusable_components, get_reusable_module, set_locale

  

  logging.basicConfig(level=logging.DEBUG)

@@ -487,7 +487,7 @@ 

  

          address = koji_config.server

          log.info("Connecting to koji %r.", address)

-         koji_session = KojiClientSession(address, opts=koji_config)

+         koji_session = koji.ClientSession(address, opts=koji_config)

  

          if not login:

              return koji_session

@@ -528,11 +528,13 @@ 

          # only if we are creating the build_tag for first time.

          build_tag_exists = self.koji_session.getTag(self.tag_name + "-build")

  

+         tag_perm = self.config.koji_tag_permission

+ 

          # Create or update individual tags

          # the main tag needs arches so pungi can dump it

-         self.module_tag = self._koji_create_tag(self.tag_name, self.arches, perm="admin")

+         self.module_tag = self._koji_create_tag(self.tag_name, self.arches, perm=tag_perm)

          self.module_build_tag = self._koji_create_tag(

-             self.tag_name + "-build", self.arches, perm="admin")

+             self.tag_name + "-build", self.arches, perm=tag_perm)

  

          buildopts = self.mmd.get_buildopts()

          if buildopts and buildopts.get_rpm_whitelist():

@@ -698,6 +700,11 @@ 

          :param component_build: a ComponentBuild object

          :return: a list of msgs that MBS needs to process

          """

+         # Imported here because of circular dependencies.

+         from module_build_service.scheduler.handlers.tags import tagged as tagged_handler

+         from module_build_service.scheduler.handlers.components import (

+             build_task_finalize as build_task_finalize_handler)

+ 

          opts = {"latest": True, "package": component_build.package, "inherit": False}

          build_tagged = self.koji_session.listTagged(self.module_build_tag["name"], **opts)

          dest_tagged = None

@@ -725,10 +732,10 @@ 

                      nvr = "{name}-{version}-{release}".format(**untagged_build)

                      build = self.koji_session.getBuild(nvr)

                      break

-         further_work = []

-         # If the build doesn't exist, then return

+ 

+         # If the build doesn't exist, then return False

          if not build:

-             return further_work

+             return False

  

          # Start setting up MBS' database to use the existing build

          log.info('Skipping build of "{0}" since it already exists.'.format(build["nvr"]))

@@ -739,19 +746,11 @@ 

          component_build.state_reason = "Found existing build"

          nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)

          # Trigger a completed build message

-         further_work.append(

-             module_build_service.messaging.KojiBuildChange(

-                 "recover_orphaned_artifact: fake message",

-                 build["build_id"],

-                 build["task_id"],

-                 koji.BUILD_STATES["COMPLETE"],

-                 component_build.package,

-                 nvr_dict["version"],

-                 nvr_dict["release"],

-                 component_build.module_build.id,

-             )

-         )

- 

+         args = (

+             "recover_orphaned_artifact: fake message", build["build_id"], build["task_id"],

+             koji.BUILD_STATES["COMPLETE"], component_build.package, nvr_dict["version"],

+             nvr_dict["release"], component_build.module_build.id, None)

+         events.scheduler.add(build_task_finalize_handler, args)

          component_tagged_in = []

          if build_tagged:

              component_tagged_in.append(self.module_build_tag["name"])

@@ -770,15 +769,11 @@ 

                  'The build being skipped isn\'t tagged in the "{0}" tag. Will send a message to '

                  "the tag handler".format(tag)

              )

-             further_work.append(

-                 module_build_service.messaging.KojiTagChange(

-                     "recover_orphaned_artifact: fake message",

-                     tag,

-                     component_build.package,

-                     component_build.nvr,

-                 )

-             )

-         return further_work

+             args = ("recover_orphaned_artifact: fake message", tag, component_build.package,

+                     component_build.nvr)

+             events.scheduler.add(tagged_handler, args)

+ 

+         return True

  

      def build(self, artifact_name, source):

          """

@@ -1336,6 +1331,9 @@ 

              the module build in the build system.

          :return: list of architectures

          """

+         if not module.koji_tag:

+             log.warning("No Koji tag associated with module %r", module)

+             return []

          koji_session = KojiModuleBuilder.get_session(conf, login=False)

          tag = koji_session.getTag(module.koji_tag)

          if not tag:

@@ -14,9 +14,8 @@ 

  import module_build_service.scm

  import module_build_service.utils

  import module_build_service.scheduler

- import module_build_service.scheduler.consumer

  

- from module_build_service.builder.base import GenericBuilder

+ from module_build_service.builder import GenericBuilder

  from module_build_service.builder.utils import (

      create_local_repo_from_koji_tag,

      execute_cmd,

@@ -26,6 +25,7 @@ 

  from module_build_service.utils.general import mmd_to_str

  from module_build_service.db_session import db_session

  from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder

+ from module_build_service.scheduler import events

  

  from module_build_service import models

  

@@ -109,7 +109,13 @@ 

              os.makedirs(self.configdir)

  

          # Generate path to mock config and add local repository there.

-         self._add_repo("localrepo", "file://" + self.resultsdir, "metadata_expire=1\n")

+         # Set skip_if_unavailable=True since the repo isn't available until after

+         # module-build-macros is built.

+         self._add_repo(

+             "localrepo",

+             "file://" + self.resultsdir,

+             "metadata_expire=1\nskip_if_unavailable=True\n",

+         )

  

          # Remove old files from the previous build of this tag but only

          # before the first build is done, otherwise we would remove files
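
With the extra option, the repo section this writes into the mock configuration would read roughly as follows (path illustrative):

    [localrepo]
    name=localrepo
    baseurl=file:///path/to/resultsdir
    metadata_expire=1
    skip_if_unavailable=True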

@@ -311,6 +317,7 @@ 

          pass

  

      def buildroot_add_artifacts(self, artifacts, install=False):

+         from module_build_service.scheduler.handlers.repos import done as repos_done_handler

          self._createrepo()

  

          # TODO: This is just hack to install module-build-macros into the

@@ -323,9 +330,7 @@ 

                  self.groups.append("module-build-macros")

                  self._write_mock_config()

  

-         from module_build_service.scheduler.consumer import fake_repo_done_message

- 

-         fake_repo_done_message(self.tag_name)

+         events.scheduler.add(repos_done_handler, ("fake_msg", self.tag_name + "-build"))

  

      def tag_artifacts(self, artifacts):

          pass

@@ -367,22 +372,28 @@ 

                  repo_name = tag = source

                  koji_config = get_koji_config(self.config)

                  koji_session = koji.ClientSession(koji_config.server, opts=koji_config)

+                 # Check to see if there are any external repos tied to the tag

+                 for ext_repo in koji_session.getTagExternalRepos(tag):

+                     self._add_repo(ext_repo["external_repo_name"], ext_repo["url"])

+ 

                  repo = koji_session.getRepo(repo_name)

                  if repo:

                      baseurl = koji.PathInfo(topdir=koji_config.topurl).repo(repo["id"], repo_name)

                      baseurl = "{0}/{1}/".format(baseurl, self.arch)

                  else:

                      repo_dir = os.path.join(self.config.cache_dir, "koji_tags", tag)

-                     create_local_repo_from_koji_tag(

+                     should_add_repo = create_local_repo_from_koji_tag(

                          self.config, tag, repo_dir, [self.arch, "noarch"])

+                     if not should_add_repo:

+                         continue

                      baseurl = "file://" + repo_dir

-                 # Check to see if there are any external repos tied to the tag

-                 for ext_repo in koji_session.getTagExternalRepos(repo_name):

-                     self._add_repo(ext_repo["external_repo_name"], ext_repo["url"])

+ 

              self._add_repo(repo_name, baseurl)

          self._write_mock_config()

  

      def _send_build_change(self, state, source, build_id):

+         from module_build_service.scheduler.handlers.components import (

+             build_task_finalize as build_task_finalize_handler)

          try:

              nvr = kobo.rpmlib.parse_nvr(source)

          except ValueError:

@@ -390,16 +401,10 @@ 

  

          # build_id=1 and task_id=1 are OK here, because we are building just

          # one RPM at the time.

-         msg = module_build_service.messaging.KojiBuildChange(

-             msg_id="a faked internal message",

-             build_id=build_id,

-             task_id=build_id,

-             build_name=nvr["name"],

-             build_new_state=state,

-             build_release=nvr["release"],

-             build_version=nvr["version"],

-         )

-         module_build_service.scheduler.consumer.work_queue_put(msg)

+         args = (

+             "a faked internal message", build_id, build_id, state, nvr["name"], nvr["version"],

+             nvr["release"], None, None)

+         events.scheduler.add(build_task_finalize_handler, args)

  

      def _save_log(self, resultsdir, log_name, artifact_name):

          old_log = os.path.join(resultsdir, log_name)

@@ -1,100 +0,0 @@ 

- # -*- coding: utf-8 -*-

- # SPDX-License-Identifier: MIT

- # flake8: noqa

- import base64

- import traceback

- 

- import koji

- # Import krbV from here so we don't have to redo the whole try except that Koji does

- from koji import krbV, PythonImportError, AuthError, AUTHTYPE_KERB

- 

- 

- class ClientSession(koji.ClientSession):

-     """The koji.ClientSession class with patches from upstream."""

- 

-     # This backport comes from https://pagure.io/koji/pull-request/1187

-     def krb_login(self, principal=None, keytab=None, ccache=None, proxyuser=None, ctx=None):

-         """Log in using Kerberos.  If principal is not None and keytab is

-         not None, then get credentials for the given principal from the given keytab.

-         If both are None, authenticate using existing local credentials (as obtained

-         from kinit).  ccache is the absolute path to use for the credential cache. If

-         not specified, the default ccache will be used.  If proxyuser is specified,

-         log in the given user instead of the user associated with the Kerberos

-         principal.  The principal must be in the "ProxyPrincipals" list on

-         the server side.  ctx is the Kerberos context to use, and should be unique

-         per thread.  If ctx is not specified, the default context is used."""

- 

-         try:

-             # Silently try GSSAPI first

-             if self.gssapi_login(principal, keytab, ccache, proxyuser=proxyuser):

-                 return True

-         except Exception as e:

-             if krbV:

-                 e_str = ''.join(traceback.format_exception_only(type(e), e))

-                 self.logger.debug('gssapi auth failed: %s', e_str)

-                 pass

-             else:

-                 raise

- 

-         if not krbV:

-             raise PythonImportError(

-                 "Please install python-krbV to use kerberos."

-             )

- 

-         if not ctx:

-             ctx = krbV.default_context()

- 

-         if ccache != None:

-             ccache = krbV.CCache(name=ccache, context=ctx)

-         else:

-             ccache = ctx.default_ccache()

- 

-         if principal != None:

-             if keytab != None:

-                 cprinc = krbV.Principal(name=principal, context=ctx)

-                 keytab = krbV.Keytab(name=keytab, context=ctx)

-                 ccache.init(cprinc)

-                 ccache.init_creds_keytab(principal=cprinc, keytab=keytab)

-             else:

-                 raise AuthError('cannot specify a principal without a keytab')

-         else:

-             # We're trying to log ourself in.  Connect using existing credentials.

-             cprinc = ccache.principal()

- 

-         self.logger.debug('Authenticating as: %s', cprinc.name)

-         sprinc = krbV.Principal(name=self._serverPrincipal(cprinc), context=ctx)

- 

-         ac = krbV.AuthContext(context=ctx)

-         ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE | krbV.KRB5_AUTH_CONTEXT_DO_TIME

-         ac.rcache = ctx.default_rcache()

- 

-         # create and encode the authentication request

-         (ac, req) = ctx.mk_req(server=sprinc, client=cprinc,

-                                auth_context=ac, ccache=ccache,

-                                options=krbV.AP_OPTS_MUTUAL_REQUIRED)

-         req_enc = base64.encodestring(req)

- 

-         # ask the server to authenticate us

-         (rep_enc, sinfo_enc, addrinfo) = self.callMethod('krbLogin', req_enc, proxyuser)

-         # Set the addrinfo we received from the server

-         # (necessary before calling rd_priv())

-         # addrinfo is in (serveraddr, serverport, clientaddr, clientport)

-         # format, so swap the pairs because clientaddr is now the local addr

-         ac.addrs = tuple((addrinfo[2], addrinfo[3], addrinfo[0], addrinfo[1]))

- 

-         # decode and read the reply from the server

-         rep = base64.decodestring(rep_enc)

-         ctx.rd_rep(rep, auth_context=ac)

- 

-         # decode and decrypt the login info

-         sinfo_priv = base64.decodestring(sinfo_enc)

-         sinfo_str = ac.rd_priv(sinfo_priv)

-         sinfo = dict(zip(['session-id', 'session-key'], sinfo_str.split()))

- 

-         if not sinfo:

-             self.logger.warn('No session info received')

-             return False

-         self.setSession(sinfo)

- 

-         self.authtype = AUTHTYPE_KERB

-         return True

@@ -72,6 +72,8 @@ 

      Downloads the packages build for one of `archs` (defaults to ['x86_64',

      'noarch']) in Koji tag `tag` to `repo_dir` and creates repository in that

      directory. Needs config.koji_profile and config.koji_config to be set.

+ 

+     If there are no builds associated with the tag, False is returned.

      """

  

      # Placed here to avoid py2/py3 conflicts...

@@ -92,6 +94,10 @@ 

      except koji.GenericError:

          log.exception("Failed to list rpms in tag %r" % tag)

  

+     if not builds:

+         log.debug("No builds are associated with the tag %r", tag)

+         return False

+ 

      # Reformat builds so they are dict with build_id as a key.

      builds = {build["build_id"]: build for build in builds}

  

@@ -162,3 +168,5 @@ 

  

          log.info("Creating local repository in %s" % repo_dir)

          execute_cmd(["/usr/bin/createrepo_c", repo_dir])

+ 

+     return True

@@ -164,6 +164,11 @@ 

              "default": ["module", "scrmod"],

              "desc": "List of allowed koji tag prefixes.",

          },

+         "koji_tag_permission": {

+             "type": str,

+             "default": "admin",

+             "desc": "Permission name to require for newly created Koji tags.",

+         },

          "koji_tag_extra_opts": {

              "type": dict,

              "default": {

@@ -244,7 +249,7 @@ 

          "log_level": {"type": str, "default": 0, "desc": "Log level"},

          "build_logs_dir": {

              "type": Path,

-             "default": "",

+             "default": tempfile.gettempdir(),

              "desc": "Directory to store module build logs to.",

          },

          "build_logs_name_format": {

@@ -656,6 +661,13 @@ 

              "desc": "The minrate configuration on a DNF repo. This configuration will cause DNF to "

                      "timeout loading a repo if the download speed is below minrate for the "

                      "duration of the timeout."

+         },

+         "celery_worker_prefetch_multiplier": {

+             "type": int,

+             "default": 1,

+             "desc": "This defaults to 1 so that the worker doesn't fetch more messages than it can "

+                     "handle at a time. This so that general tasks aren't starved when running "

+                     "a long handler.",

          }

      }
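
Via the CELERY_ prefix convention this option becomes Celery's worker_prefetch_multiplier setting; together with the worker_concurrency = 1 set in module_build_service/__init__.py above, the net effect on the Celery app is equivalent to:

    celery_app.conf.update(worker_prefetch_multiplier=1, worker_concurrency=1)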

  

@@ -751,7 +763,7 @@ 

  

      def _setifok_log_backend(self, s):

          if s is None:

-             self._log_backend = "console"

+             s = "console"

          elif s not in logger.supported_log_backends():

              raise ValueError("Unsupported log backend")

          self._log_backend = str(s)

@@ -44,3 +44,7 @@ 

      response = jsonify({"status": status, "error": error, "message": message})

      response.status_code = status

      return response

+ 

+ 

+ class IgnoreMessage(Exception):

+     """Raise if message received from message bus should be ignored"""

@@ -23,6 +23,7 @@ 

  from module_build_service.errors import StreamAmbigous

  import module_build_service.messaging

  import module_build_service.scheduler.consumer

+ import module_build_service.scheduler.local

  

  

  manager = Manager(create_app)

@@ -144,6 +145,9 @@ 

          os.remove(dbpath)

  

      db.create_all()

+     # Reconfigure the backend database session registry to use the new database location

+     db_session.remove()

+     db_session.configure(bind=db.session.bind)

  

      params = {

          "local_build": True,

@@ -158,8 +162,6 @@ 

  

      yaml_file_path = os.path.abspath(yaml_file)

  

-     from module_build_service.db_session import db_session

- 

      if offline:

          import_builds_from_local_dnf_repos(platform_id)

      load_local_builds(local_build_nsvs)

@@ -180,10 +182,7 @@ 

  

          module_build_ids = [build.id for build in module_builds]

  

-     stop = module_build_service.scheduler.make_simple_stop_condition()

- 

-     # Run the consumer until stop_condition returns True

-     module_build_service.scheduler.main([], stop)

+     module_build_service.scheduler.local.main(module_build_ids)

  

      has_failed_module = db_session.query(models.ModuleBuild).filter(

          models.ModuleBuild.id.in_(module_build_ids),

@@ -2,280 +2,11 @@ 

  # SPDX-License-Identifier: MIT

  """Generic messaging functions."""

  

- import re

  import pkg_resources

  

- try:

-     from inspect import signature

- except ImportError:

-     from funcsigs import signature

+ from module_build_service.scheduler.parser import FedmsgMessageParser

  

- from module_build_service import log

- 

- 

- class IgnoreMessage(Exception):

-     pass

- 

- 

- class BaseMessage(object):

-     def __init__(self, msg_id):

-         """

-         A base class to abstract messages from different backends

-         :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-         """

-         self.msg_id = msg_id

- 

-         # Moksha calls `consumer.validate` on messages that it receives, and

-         # even though we have validation turned off in the config there's still

-         # a step that tries to access `msg['body']`, `msg['topic']` and

-         # `msg.get('topic')`.

-         # These are here just so that the `validate` method won't raise an

-         # exception when we push our fake messages through.

-         # Note that, our fake message pushing has worked for a while... but the

-         # *latest* version of fedmsg has some code that exercises the bug.  I

-         # didn't hit this until I went to test in jenkins.

-         self.body = {}

-         self.topic = None

- 

-     def __repr__(self):

-         init_sig = signature(self.__init__)

- 

-         args_strs = (

-             "{}={!r}".format(name, getattr(self, name))

-             if param.default != param.empty

-             else repr(getattr(self, name))

-             for name, param in init_sig.parameters.items()

-         )

- 

-         return "{}({})".format(type(self).__name__, ", ".join(args_strs))

- 

-     def __getitem__(self, key):

-         """ Used to trick moksha into thinking we are a dict. """

-         return getattr(self, key)

- 

-     def __setitem__(self, key, value):

-         """ Used to trick moksha into thinking we are a dict. """

-         return setattr(self, key, value)

- 

-     def get(self, key, value=None):

-         """ Used to trick moksha into thinking we are a dict. """

-         return getattr(self, key, value)

- 

-     def __json__(self):

-         return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)

- 

- 

- class MessageParser(object):

-     def parse(self, msg):

-         raise NotImplementedError()

- 

- 

- class FedmsgMessageParser(MessageParser):

-     def parse(self, msg):

-         """

-         Takes a fedmsg topic and message and converts it to a message object

-         :param msg: the message contents from the fedmsg message

-         :return: an object of BaseMessage descent if the message is a type

-         that the app looks for, otherwise None is returned

-         """

-         if "body" in msg:

-             msg = msg["body"]

-         topic = msg["topic"]

-         topic_categories = _messaging_backends["fedmsg"]["services"]

-         categories_re = "|".join(map(re.escape, topic_categories))

-         regex_pattern = re.compile(

-             r"(?P<category>" + categories_re + r")"

-             r"(?:(?:\.)(?P<object>build|repo|module|decision))?"

-             r"(?:(?:\.)(?P<subobject>state|build))?"

-             r"(?:\.)(?P<event>change|done|end|tag|update)$"

-         )

-         regex_results = re.search(regex_pattern, topic)

- 

-         if regex_results:

-             category = regex_results.group("category")

-             object = regex_results.group("object")

-             subobject = regex_results.group("subobject")

-             event = regex_results.group("event")

- 

-             msg_id = msg.get("msg_id")

-             msg_inner_msg = msg.get("msg")

- 

-             # If there isn't a msg dict in msg then this message can be skipped

-             if not msg_inner_msg:

-                 log.debug(

-                     "Skipping message without any content with the " 'topic "{0}"'.format(topic))

-                 return None

- 

-             msg_obj = None

- 

-             # Ignore all messages from the secondary koji instances.

-             if category == "buildsys":

-                 instance = msg_inner_msg.get("instance", "primary")

-                 if instance != "primary":

-                     log.debug("Ignoring message from %r koji hub." % instance)

-                     return

- 

-             if (

-                 category == "buildsys"

-                 and object == "build"

-                 and subobject == "state"

-                 and event == "change"

-             ):

-                 build_id = msg_inner_msg.get("build_id")

-                 task_id = msg_inner_msg.get("task_id")

-                 build_new_state = msg_inner_msg.get("new")

-                 build_name = msg_inner_msg.get("name")

-                 build_version = msg_inner_msg.get("version")

-                 build_release = msg_inner_msg.get("release")

- 

-                 msg_obj = KojiBuildChange(

-                     msg_id,

-                     build_id,

-                     task_id,

-                     build_new_state,

-                     build_name,

-                     build_version,

-                     build_release,

-                 )

- 

-             elif (

-                 category == "buildsys"

-                 and object == "repo"

-                 and subobject is None

-                 and event == "done"

-             ):

-                 repo_tag = msg_inner_msg.get("tag")

-                 msg_obj = KojiRepoChange(msg_id, repo_tag)

- 

-             elif category == "buildsys" and event == "tag":

-                 tag = msg_inner_msg.get("tag")

-                 name = msg_inner_msg.get("name")

-                 version = msg_inner_msg.get("version")

-                 release = msg_inner_msg.get("release")

-                 nvr = None

-                 if name and version and release:

-                     nvr = "-".join((name, version, release))

-                 msg_obj = KojiTagChange(msg_id, tag, name, nvr)

- 

-             elif (

-                 category == "mbs"

-                 and object == "module"

-                 and subobject == "state"

-                 and event == "change"

-             ):

-                 msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state"))

- 

-             elif (

-                 category == "greenwave"

-                 and object == "decision"

-                 and subobject is None

-                 and event == "update"

-             ):

-                 msg_obj = GreenwaveDecisionUpdate(

-                     msg_id=msg_id,

-                     decision_context=msg_inner_msg.get("decision_context"),

-                     policies_satisfied=msg_inner_msg.get("policies_satisfied"),

-                     subject_identifier=msg_inner_msg.get("subject_identifier"),

-                 )

- 

-             # If the message matched the regex and is important to the app,

-             # it will be returned

-             if msg_obj:

-                 return msg_obj

- 

-         return None

- 

- 

- class KojiBuildChange(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a build's info (in fedmsg this replaces the msg dictionary)

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param build_id: the id of the build (e.g. 264382)

-     :param build_new_state: the new build state, this is currently a Koji

-     integer

-     :param build_name: the name of what is being built

-     (e.g. golang-googlecode-tools)

-     :param build_version: the version of the build (e.g. 6.06.06)

-     :param build_release: the release of the build (e.g. 4.fc25)

-     :param module_build_id: the optional id of the module_build in the database

-     :param state_reason: the optional reason as to why the state changed

-     """

- 

-     def __init__(

-         self,

-         msg_id,

-         build_id,

-         task_id,

-         build_new_state,

-         build_name,

-         build_version,

-         build_release,

-         module_build_id=None,

-         state_reason=None,

-     ):

-         if task_id is None:

-             raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")

-         super(KojiBuildChange, self).__init__(msg_id)

-         self.build_id = build_id

-         self.task_id = task_id

-         self.build_new_state = build_new_state

-         self.build_name = build_name

-         self.build_version = build_version

-         self.build_release = build_release

-         self.module_build_id = module_build_id

-         self.state_reason = state_reason

- 

- 

- class KojiTagChange(BaseMessage):

-     """

-     A class that inherits from BaseMessage to provide a message

-     object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)

-     :param tag: the name of tag (e.g. module-123456789-build)

-     :param artifact: the name of tagged artifact (e.g. module-build-macros)

-     :param nvr: the nvr of the tagged artifact

-     """

- 

-     def __init__(self, msg_id, tag, artifact, nvr):

-         super(KojiTagChange, self).__init__(msg_id)

-         self.tag = tag

-         self.artifact = artifact

-         self.nvr = nvr

- 

- 

- class KojiRepoChange(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a repo's info (in fedmsg this replaces the msg dictionary)

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)

-     """

- 

-     def __init__(self, msg_id, repo_tag):

-         super(KojiRepoChange, self).__init__(msg_id)

-         self.repo_tag = repo_tag

- 

- 

- class MBSModule(BaseMessage):

-     """ A class that inherits from BaseMessage to provide a message

-     object for a module event generated by module_build_service

-     :param msg_id: the id of the msg (e.g. 2016-SomeGUID)

-     :param module_build_id: the id of the module build

-     :param module_build_state: the state of the module build

-     """

- 

-     def __init__(self, msg_id, module_build_id, module_build_state):

-         super(MBSModule, self).__init__(msg_id)

-         self.module_build_id = module_build_id

-         self.module_build_state = module_build_state

- 

- 

- class GreenwaveDecisionUpdate(BaseMessage):

-     """A class representing message send to topic greenwave.decision.update"""

- 

-     def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):

-         super(GreenwaveDecisionUpdate, self).__init__(msg_id)

-         self.decision_context = decision_context

-         self.policies_satisfied = policies_satisfied

-         self.subject_identifier = subject_identifier

+ from module_build_service import conf, log

  

  

  def publish(topic, msg, conf, service):

@@ -331,7 +62,7 @@ 

      # Create fake fedmsg from the message so we can reuse

      # the BaseMessage.from_fedmsg code to get the particular BaseMessage

      # class instance.

-     wrapped_msg = FedmsgMessageParser().parse({

+     wrapped_msg = FedmsgMessageParser(known_fedmsg_services).parse({

          "msg_id": str(_in_memory_msg_id),

          "topic": service + "." + topic,

          "msg": msg

@@ -352,16 +83,19 @@ 

          _initial_messages.append(wrapped_msg)

  

  

+ known_fedmsg_services = ["buildsys", "mbs", "greenwave"]

+ 

+ 

  _fedmsg_backend = {

      "publish": _fedmsg_publish,

-     "services": ["buildsys", "mbs", "greenwave"],

-     "parser": FedmsgMessageParser(),

+     "parser": FedmsgMessageParser(known_fedmsg_services),

+     "services": known_fedmsg_services,

      "topic_suffix": ".",

  }

  _in_memory_backend = {

      "publish": _in_memory_publish,

+     "parser": FedmsgMessageParser(known_fedmsg_services),  # re-used.  :)

      "services": [],

-     "parser": FedmsgMessageParser(),  # re-used.  :)

      "topic_suffix": ".",

  }

  

@@ -375,3 +109,7 @@ 

  

  if not _messaging_backends:

      raise ValueError("No messaging plugins are installed or available.")

+ 

+ # After loading the registered messaging backends, the default backend

+ # is determined by the configured messaging backend.

+ default_messaging_backend = _messaging_backends[conf.messaging]
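
Callers can then look up whichever piece of the configured backend they need, since each backend is a plain dict (illustrative):

    parser = default_messaging_backend["parser"]
    publish = default_messaging_backend["publish"]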

file modified
+13 -44

@@ -19,6 +19,7 @@ 

  import module_build_service.messaging

  from module_build_service import db, log, get_url_for, conf

  from module_build_service.errors import UnprocessableEntity

+ from module_build_service.scheduler import events

  

  DEFAULT_MODULE_CONTEXT = "00000000"

  

@@ -473,7 +474,7 @@ 

          raise ValueError("%s: %s, not in %r" % (key, field, BUILD_STATES))

  

      @validates("rebuild_strategy")

-     def validate_rebuild_stategy(self, key, rebuild_strategy):

+     def validate_rebuild_strategy(self, key, rebuild_strategy):

          if rebuild_strategy not in self.rebuild_strategies.keys():

              choices = ", ".join(self.rebuild_strategies.keys())

              raise ValueError(

@@ -484,7 +485,7 @@ 

  

      @classmethod

      def from_module_event(cls, db_session, event):

-         if type(event) == module_build_service.messaging.MBSModule:

+         if type(event) == events.MBSModule:

              return db_session.query(cls).filter(cls.id == event.module_build_id).first()

          else:

              raise ValueError("%r is not a module message." % type(event).__name__)

@@ -746,41 +747,15 @@ 

          return db_session.query(ModuleBuild).filter_by(state=BUILD_STATES[state]).all()

  

      @classmethod

-     def from_repo_done_event(cls, db_session, event):

-         """ Find the ModuleBuilds in our database that should be in-flight...

-         ... for a given koji tag.

- 

-         There should be at most one.

-         """

-         if event.repo_tag.endswith("-build"):

-             tag = event.repo_tag[:-6]

-         else:

-             tag = event.repo_tag

-         query = (

-             db_session.query(cls)

-             .filter(cls.koji_tag == tag)

-             .filter(cls.state == BUILD_STATES["build"])

-         )

- 

-         count = query.count()

-         if count > 1:

-             raise RuntimeError("%r module builds in flight for %r" % (count, tag))

- 

-         return query.first()

- 

-     @classmethod

-     def from_tag_change_event(cls, db_session, event):

-         tag = event.tag[:-6] if event.tag.endswith("-build") else event.tag

-         query = (

-             db_session.query(cls)

-             .filter(cls.koji_tag == tag)

-             .filter(cls.state == BUILD_STATES["build"])

+     def get_by_tag(cls, db_session, tag_name):

+         tag = tag_name[:-6] if tag_name.endswith("-build") else tag_name

+         query = db_session.query(cls).filter(

+             cls.koji_tag == tag,

+             cls.state == BUILD_STATES["build"]

          )

- 

          count = query.count()

          if count > 1:

              raise RuntimeError("%r module builds in flight for %r" % (count, tag))

- 

          return query.first()

  

      def short_json(self, show_stream_version=False, show_scratch=True):

@@ -1127,18 +1102,12 @@ 

      weight = db.Column(db.Float, default=0)

  

      @classmethod

-     def from_component_event(cls, db_session, event):

-         if isinstance(event, module_build_service.messaging.KojiBuildChange):

-             if event.module_build_id:

-                 return (

-                     db_session.query(cls)

-                     .filter_by(task_id=event.task_id, module_id=event.module_build_id)

-                     .one()

-                 )

-             else:

-                 return db_session.query(cls).filter(cls.task_id == event.task_id).first()

+     def from_component_event(cls, db_session, task_id, module_id=None):

+         _filter = db_session.query(cls).filter

+         if module_id is None:

+             return _filter(cls.task_id == task_id).first()

          else:

-             raise ValueError("%r is not a koji message." % event["topic"])

+             return _filter(cls.task_id == task_id, cls.module_id == module_id).one()

  

      @classmethod

      def from_component_name(cls, db_session, component_name, module_id):

@@ -1,87 +1,3 @@ 

  # -*- coding: utf-8 -*-

  # SPDX-License-Identifier: MIT

  """ This is a sub-module for backend/scheduler functionality. """

- 

- import fedmsg

- import moksha.hub

- 

- import module_build_service.models

- import module_build_service.scheduler.consumer

- 

- from module_build_service.db_session import db_session

- 

- import logging

- 

- log = logging.getLogger(__name__)

- 

- 

- def main(initial_messages, stop_condition):

-     """ Run the consumer until some condition is met.

- 

-     Setting stop_condition to None will run the consumer forever.

-     """

- 

-     config = fedmsg.config.load_config()

-     config["mbsconsumer"] = True

-     config["mbsconsumer.stop_condition"] = stop_condition

-     config["mbsconsumer.initial_messages"] = initial_messages

- 

-     # Moksha requires that we subscribe to *something*, so tell it /dev/null

-     # since we'll just be doing in-memory queue-based messaging for this single

-     # build.

-     config["zmq_enabled"] = True

-     config["zmq_subscribe_endpoints"] = "ipc:///dev/null"

- 

-     consumers = [module_build_service.scheduler.consumer.MBSConsumer]

- 

-     # Note that the hub we kick off here cannot send any message.  You

-     # should use fedmsg.publish(...) still for that.

-     moksha.hub.main(

-         # Pass in our config dict

-         options=config,

-         # Only run the specified consumers if any are so specified.

-         consumers=consumers,

-         # Do not run default producers.

-         producers=[],

-         # Tell moksha to quiet its logging.

-         framework=False,

-     )

- 

- 

- def make_simple_stop_condition():

-     """ Return a simple stop_condition callable.

- 

-     Intended to be used with the main() function here in manage.py and tests.

- 

-     The stop_condition returns true when the latest module build enters the any

-     of the finished states.

-     """

- 

-     def stop_condition(message):

-         # XXX - We ignore the message here and instead just query the DB.

- 

-         # Grab the latest module build.

-         module = (

-             db_session.query(module_build_service.models.ModuleBuild)

-             .order_by(module_build_service.models.ModuleBuild.id.desc())

-             .first()

-         )

-         done = (

-             module_build_service.models.BUILD_STATES["failed"],

-             module_build_service.models.BUILD_STATES["ready"],

-             module_build_service.models.BUILD_STATES["done"],

-         )

-         result = module.state in done

-         log.debug("stop_condition checking %r, got %r" % (module, result))

- 

-         # moksha.hub.main starts the hub and runs it in a separate thread. When

-         # the result is True, remove the db_session from that thread local so

-         # that any pending queries in the transaction will not block other

-         # queries made from other threads.

-         # This is useful for testing particularly.

-         if result:

-             db_session.remove()

- 

-         return result

- 

-     return stop_condition

@@ -5,7 +5,6 @@ 

  to use.

  """

  

- import inspect

  import itertools

  

  try:

@@ -18,21 +17,50 @@ 

  import koji

  import fedmsg.consumers

  import moksha.hub

- import six

  import sqlalchemy.exc

  

  import module_build_service.messaging

- import module_build_service.scheduler.handlers.repos

- import module_build_service.scheduler.handlers.components

- import module_build_service.scheduler.handlers.modules

- import module_build_service.scheduler.handlers.tags