#1638 Drop pungi-orchestrator code
Merged 10 months ago by lsedlar. Opened 2 years ago by lsedlar.
lsedlar/pungi drop-orchestrator into master

file modified
-1
@@ -22,4 +22,3 @@ 

      comps

      contributing

      testing

-     multi_compose

file removed
-107
@@ -1,107 +0,0 @@ 

- .. _multi_compose:

- 

- Managing compose from multiple parts

- ====================================

- 

- There may be cases where it makes sense to split a big compose into separate

- parts, but create a compose output that links all output into one familiar

- structure.

- 

- The `pungi-orchestrate` tool allows that.

- 

- It works with an INI-style configuration file. The ``[general]`` section

- contains information about identity of the main compose. Other sections define

- individual parts.

- 

- The parts are scheduled to run in parallel, with the minimal amount of

- serialization. The final compose directory will contain hard-links to the

- files.

- 

- 

- General settings

- ----------------

- 

- **target**

-    Path to directory where the final compose should be created.

- **compose_type**

-    Type of compose to make.

- **release_name**

-    Name of the product for the final compose.

- **release_short**

-    Short name of the product for the final compose.

- **release_version**

-    Version of the product for the final compose.

- **release_type**

-    Type of the product for the final compose.

- **extra_args**

-    Additional arguments that will be passed to the child Pungi processes.

- **koji_profile**

-    If specified, a current event will be retrieved from the Koji instance and

-    used for all parts.

- 

- **kerberos**

-    If set to yes, a kerberos ticket will be automatically created at the start.

-    Set keytab and principal as well.

- **kerberos_keytab**

-    Path to keytab file used to create the kerberos ticket.

- **kerberos_principal**

-    Kerberos principal for the ticket

- 

- **pre_compose_script**

-    Commands to execute before first part is started. Can contain multiple

-    commands on separate lines.

- **post_compose_script**

-    Commands to execute after the last part finishes and final status is

-    updated. Can contain multiple commands on separate lines. ::

- 

-       post_compose_script =

-           compose-latest-symlink $COMPOSE_PATH

-           custom-post-compose-script.sh

- 

-    Multiple environment variables are defined for the scripts:

- 

-     * ``COMPOSE_PATH``

-     * ``COMPOSE_ID``

-     * ``COMPOSE_DATE``

-     * ``COMPOSE_TYPE``

-     * ``COMPOSE_RESPIN``

-     * ``COMPOSE_LABEL``

-     * ``RELEASE_ID``

-     * ``RELEASE_NAME``

-     * ``RELEASE_SHORT``

-     * ``RELEASE_VERSION``

-     * ``RELEASE_TYPE``

-     * ``RELEASE_IS_LAYERED`` – ``YES`` for layered products, empty otherwise

-     * ``BASE_PRODUCT_NAME`` – only set for layered products

-     * ``BASE_PRODUCT_SHORT`` – only set for layered products

-     * ``BASE_PRODUCT_VERSION`` – only set for layered products

-     * ``BASE_PRODUCT_TYPE`` – only set for layered products

- 

- **notification_script**

-    Executable name (or path to a script) that will be used to send a message

-    once the compose is finished. In order for a valid URL to be included in the

-    message, at least one part must configure path translation that would apply

-    to location of main compose.

- 

-    Only two messages will be sent, one for start and one for finish (either

-    successful or not).

- 

- 

- Partial compose settings

- ------------------------

- 

- Each part should have a separate section in the config file.

- 

- It can specify these options:

- 

- **config**

-    Path to configuration file that describes this part. If relative, it is

-    resolved relative to the file with parts configuration.

- **just_phase**, **skip_phase**

-    Customize which phases should run for this part.

- **depends_on**

-    A comma separated list of other parts that must be finished before this part

-    starts.

- **failable**

-    A boolean toggle to mark a part as failable. A failure in such part will

-    mark the final compose as incomplete, but still successful.
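For readers unfamiliar with the format, the removed documentation describes an INI file with one ``[general]`` section and one section per part. The following is a minimal sketch, not taken from this PR: the part names ``base`` and ``extras``, the paths, and the release values are hypothetical, and only the option names come from the documentation above. It uses Python's standard ``configparser`` to read such a file and collect each part with its dependencies.

```python
import configparser
import re

# Hypothetical multi-compose configuration. Only the option names come from
# the removed documentation; every value here is made up for illustration.
EXAMPLE = """
[general]
target = /mnt/composes
compose_type = nightly
release_name = Example Linux
release_short = EL
release_version = 1.0
release_type = ga

[base]
config = base.conf

[extras]
config = extras.conf
depends_on = base
failable = yes
"""


def split_list(value):
    """Split a comma or space separated option into a list of names."""
    return [x for x in re.split(r"[, ]+", value.strip()) if x]


parser = configparser.RawConfigParser()
parser.read_string(EXAMPLE)

# Every section other than [general] describes one part of the compose.
parts = {}
for section in parser.sections():
    if section == "general":
        continue
    parts[section] = {
        "config": parser.get(section, "config"),
        "depends_on": split_list(parser.get(section, "depends_on", fallback="")),
        "failable": parser.getboolean(section, "failable", fallback=False),
    }

for name, part in parts.items():
    print(name, part)
```

In this sketch ``extras`` would start only after ``base`` finishes, and a failure in ``extras`` would leave the overall compose incomplete rather than failed, as the ``failable`` description above states.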

file modified
-1
@@ -100,7 +100,6 @@ 

  %{_bindir}/%{name}-config-validate

  %{_bindir}/%{name}-fedmsg-notification

  %{_bindir}/%{name}-notification-report-progress

- %{_bindir}/%{name}-orchestrate

  %{_bindir}/%{name}-patch-iso

  %{_bindir}/%{name}-compare-depsolving

  %{_bindir}/%{name}-wait-for-signed-ostree-handler

@@ -171,32 +171,11 @@ 

      group.add_argument(

          "--offline", action="store_true", help="Do not resolve git references."

      )

-     parser.add_argument(

-         "--multi",

-         metavar="DIR",

-         help=(

-             "Treat source as config for pungi-orchestrate and store dump into "

-             "given directory."

-         ),

-     )

  

      args = parser.parse_args()

  

      defines = config_utils.extract_defines(args.define)

  

-     if args.multi:

-         if len(args.sources) > 1:

-             parser.error("Only one multi config can be specified.")

- 

-         return dump_multi_config(

-             args.sources[0],

-             dest=args.multi,

-             defines=defines,

-             just_dump=args.just_dump,

-             event=args.freeze_event,

-             offline=args.offline,

-         )

- 

      return process_file(

          args.sources,

          defines=defines,

@@ -1,705 +0,0 @@ 

- # -*- coding: utf-8 -*-

- 

- from __future__ import print_function

- 

- import argparse

- import atexit

- import errno

- import json

- import logging

- import os

- import re

- import shutil

- import subprocess

- import sys

- import tempfile

- import time

- import threading

- from collections import namedtuple

- 

- import kobo.conf

- import kobo.log

- import productmd

- from kobo import shortcuts

- from six.moves import configparser, shlex_quote

- 

- import pungi.util

- from pungi.compose import get_compose_dir

- from pungi.linker import linker_pool

- from pungi.phases.pkgset.sources.source_koji import get_koji_event_raw

- from pungi.util import find_old_compose, parse_koji_event, temp_dir

- from pungi.wrappers.kojiwrapper import KojiWrapper

- 

- 

- Config = namedtuple(

-     "Config",

-     [

-         # Path to directory with the compose

-         "target",

-         "compose_type",

-         "label",

-         # Path to the selected old compose that will be reused

-         "old_compose",

-         # Path to directory with config file copies

-         "config_dir",

-         # Which koji event to use (if any)

-         "event",

-         # Additional arguments to pungi-koji executable

-         "extra_args",

-     ],

- )

- 

- log = logging.getLogger(__name__)

- 

- 

- class Status(object):

-     # Ready to start

-     READY = "READY"

-     # Waiting for dependencies to finish.

-     WAITING = "WAITING"

-     # Part is currently running

-     STARTED = "STARTED"

-     # A dependency failed, this one will never start.

-     BLOCKED = "BLOCKED"

- 

- 

- class ComposePart(object):

-     def __init__(self, name, config, just_phase=[], skip_phase=[], dependencies=[]):

-         self.name = name

-         self.config = config

-         self.status = Status.WAITING if dependencies else Status.READY

-         self.just_phase = just_phase

-         self.skip_phase = skip_phase

-         self.blocked_on = set(dependencies)

-         self.depends_on = set(dependencies)

-         self.path = None

-         self.log_file = None

-         self.failable = False

- 

-     def __str__(self):

-         return self.name

- 

-     def __repr__(self):

-         return (

-             "ComposePart({0.name!r},"

-             " {0.config!r},"

-             " {0.status!r},"

-             " just_phase={0.just_phase!r},"

-             " skip_phase={0.skip_phase!r},"

-             " dependencies={0.depends_on!r})"

-         ).format(self)

- 

-     def refresh_status(self):

-         """Refresh status of this part with the result of the compose. This

-         should only be called once the compose finished.

-         """

-         try:

-             with open(os.path.join(self.path, "STATUS")) as fh:

-                 self.status = fh.read().strip()

-         except IOError as exc:

-             log.error("Failed to update status of %s: %s", self.name, exc)

-             log.error("Assuming %s is DOOMED", self.name)

-             self.status = "DOOMED"

- 

-     def is_finished(self):

-         return "FINISHED" in self.status

- 

-     def unblock_on(self, finished_part):

-         """Update set of blockers for this part. If it's empty, mark us as ready."""

-         self.blocked_on.discard(finished_part)

-         if self.status == Status.WAITING and not self.blocked_on:

-             log.debug("%s is ready to start", self)

-             self.status = Status.READY

- 

-     def setup_start(self, global_config, parts):

-         substitutions = dict(

-             ("part-%s" % name, p.path) for name, p in parts.items() if p.is_finished()

-         )

-         substitutions["configdir"] = global_config.config_dir

- 

-         config = pungi.util.load_config(self.config)

- 

-         for f in config.opened_files:

-             # apply substitutions

-             fill_in_config_file(f, substitutions)

- 

-         self.status = Status.STARTED

-         self.path = get_compose_dir(

-             os.path.join(global_config.target, "parts"),

-             config,

-             compose_type=global_config.compose_type,

-             compose_label=global_config.label,

-         )

-         self.log_file = os.path.join(global_config.target, "logs", "%s.log" % self.name)

-         log.info("Starting %s in %s", self.name, self.path)

- 

-     def get_cmd(self, global_config):

-         cmd = ["pungi-koji", "--config", self.config, "--compose-dir", self.path]

-         cmd.append("--%s" % global_config.compose_type)

-         if global_config.label:

-             cmd.extend(["--label", global_config.label])

-         for phase in self.just_phase:

-             cmd.extend(["--just-phase", phase])

-         for phase in self.skip_phase:

-             cmd.extend(["--skip-phase", phase])

-         if global_config.old_compose:

-             cmd.extend(

-                 ["--old-compose", os.path.join(global_config.old_compose, "parts")]

-             )

-         if global_config.event:

-             cmd.extend(["--koji-event", str(global_config.event)])

-         if global_config.extra_args:

-             cmd.extend(global_config.extra_args)

-         cmd.extend(["--no-latest-link"])

-         return cmd

- 

-     @classmethod

-     def from_config(cls, config, section, config_dir):

-         part = cls(

-             name=section,

-             config=os.path.join(config_dir, config.get(section, "config")),

-             just_phase=_safe_get_list(config, section, "just_phase", []),

-             skip_phase=_safe_get_list(config, section, "skip_phase", []),

-             dependencies=_safe_get_list(config, section, "depends_on", []),

-         )

-         if config.has_option(section, "failable"):

-             part.failable = config.getboolean(section, "failable")

-         return part

- 

- 

- def _safe_get_list(config, section, option, default=None):

-     """Get a value from config parser. The result is split into a list on

-     commas or spaces, and `default` is returned if the key does not exist.

-     """

-     if config.has_option(section, option):

-         value = config.get(section, option)

-         return [x.strip() for x in re.split(r"[, ]+", value) if x]

-     return default

- 

- 

- def fill_in_config_file(fp, substs):

-     """Templating function. It works with Jinja2 style placeholders such as

-     {{foo}}. Whitespace around the key name is fine. The file is modified in place.

- 

-     :param fp string: path to the file to process

-     :param substs dict: a mapping for values to put into the file

-     """

- 

-     def repl(match):

-         try:

-             return substs[match.group(1)]

-         except KeyError as exc:

-             raise RuntimeError(

-                 "Unknown placeholder %s in %s" % (exc, os.path.basename(fp))

-             )

- 

-     with open(fp, "r") as f:

-         contents = re.sub(r"{{ *([a-zA-Z-_]+) *}}", repl, f.read())

-     with open(fp, "w") as f:

-         f.write(contents)

- 

- 

- def start_part(global_config, parts, part):

-     part.setup_start(global_config, parts)

-     fh = open(part.log_file, "w")

-     cmd = part.get_cmd(global_config)

-     log.debug("Running command %r", " ".join(shlex_quote(x) for x in cmd))

-     return subprocess.Popen(cmd, stdout=fh, stderr=subprocess.STDOUT)

- 

- 

- def handle_finished(global_config, linker, parts, proc, finished_part):

-     finished_part.refresh_status()

-     log.info("%s finished with status %s", finished_part, finished_part.status)

-     if proc.returncode == 0:

-         # Success, unblock other parts...

-         for part in parts.values():

-             part.unblock_on(finished_part.name)

-         # ...and link the results into final destination.

-         copy_part(global_config, linker, finished_part)

-         update_metadata(global_config, finished_part)

-     else:

-         # Failure, other stuff may be blocked.

-         log.info("See details in %s", finished_part.log_file)

-         block_on(parts, finished_part.name)

- 

- 

- def copy_part(global_config, linker, part):

-     c = productmd.Compose(part.path)

-     for variant in c.info.variants:

-         data_path = os.path.join(part.path, "compose", variant)

-         link = os.path.join(global_config.target, "compose", variant)

-         log.info("Hardlinking content %s -> %s", data_path, link)

-         hardlink_dir(linker, data_path, link)

- 

- 

- def hardlink_dir(linker, srcdir, dstdir):

-     for root, dirs, files in os.walk(srcdir):

-         root = os.path.relpath(root, srcdir)

-         for f in files:

-             src = os.path.normpath(os.path.join(srcdir, root, f))

-             dst = os.path.normpath(os.path.join(dstdir, root, f))

-             linker.queue_put((src, dst))

- 

- 

- def update_metadata(global_config, part):

-     part_metadata_dir = os.path.join(part.path, "compose", "metadata")

-     final_metadata_dir = os.path.join(global_config.target, "compose", "metadata")

-     for f in os.listdir(part_metadata_dir):

-         # Load the metadata

-         with open(os.path.join(part_metadata_dir, f)) as fh:

-             part_metadata = json.load(fh)

-         final_metadata = os.path.join(final_metadata_dir, f)

-         if os.path.exists(final_metadata):

-             # We already have this file, will need to merge.

-             merge_metadata(final_metadata, part_metadata)

-         else:

-             # A new file, just copy it.

-             copy_metadata(global_config, final_metadata, part_metadata)

- 

- 

- def copy_metadata(global_config, final_metadata, source):

-     """Copy file to final location, but update compose information."""

-     with open(

-         os.path.join(global_config.target, "compose/metadata/composeinfo.json")

-     ) as f:

-         composeinfo = json.load(f)

-     try:

-         source["payload"]["compose"].update(composeinfo["payload"]["compose"])

-     except KeyError:

-         # No [payload][compose], probably OSBS metadata

-         pass

-     with open(final_metadata, "w") as f:

-         json.dump(source, f, indent=2, sort_keys=True)

- 

- 

- def merge_metadata(final_metadata, source):

-     with open(final_metadata) as f:

-         metadata = json.load(f)

- 

-     try:

-         key = {

-             "productmd.composeinfo": "variants",

-             "productmd.modules": "modules",

-             "productmd.images": "images",

-             "productmd.rpms": "rpms",

-         }[source["header"]["type"]]

-         # TODO what if multiple parts create images for the same variant

-         metadata["payload"][key].update(source["payload"][key])

-     except KeyError:

-         # OSBS metadata, merge whole file

-         metadata.update(source)

-     with open(final_metadata, "w") as f:

-         json.dump(metadata, f, indent=2, sort_keys=True)

- 

- 

- def block_on(parts, name):

-     """Part ``name`` failed, mark everything depending on it as blocked."""

-     for part in parts.values():

-         if name in part.blocked_on:

-             log.warning("%s is blocked now and will not run", part)

-             part.status = Status.BLOCKED

-             block_on(parts, part.name)

- 

- 

- def check_finished_processes(processes):

-     """Walk through all active processes and check if something finished."""

-     for proc in processes.keys():

-         proc.poll()

-         if proc.returncode is not None:

-             yield proc, processes[proc]

- 

- 

- def run_all(global_config, parts):

-     # Mapping subprocess.Popen -> ComposePart

-     processes = dict()

-     remaining = set(p.name for p in parts.values() if not p.is_finished())

- 

-     with linker_pool("hardlink") as linker:

-         while remaining or processes:

-             update_status(global_config, parts)

- 

-             for proc, part in check_finished_processes(processes):

-                 del processes[proc]

-                 handle_finished(global_config, linker, parts, proc, part)

- 

-             # Start new available processes.

-             for name in list(remaining):

-                 part = parts[name]

-                 # Start all ready parts

-                 if part.status == Status.READY:

-                     remaining.remove(name)

-                     processes[start_part(global_config, parts, part)] = part

-                 # Remove blocked parts from todo list

-                 elif part.status == Status.BLOCKED:

-                     remaining.remove(part.name)

- 

-             # Wait for any child process to finish if there is any.

-             if processes:

-                 pid, reason = os.wait()

-                 for proc in processes.keys():

-                     # Set the return code for process that we caught by os.wait().

-                     # Calling poll() on it would not set the return code properly

-                     # since the value was already consumed by os.wait().

-                     if proc.pid == pid:

-                         proc.returncode = (reason >> 8) & 0xFF

- 

-         log.info("Waiting for linking to finish...")

-     return update_status(global_config, parts)

- 

- 

- def get_target_dir(config, compose_info, label, reldir=""):

-     """Find directory where this compose will be.

- 

-     @param reldir: if target path in config is relative, it will be resolved

-                    against this directory

-     """

-     dir = os.path.realpath(os.path.join(reldir, config.get("general", "target")))

-     target_dir = get_compose_dir(

-         dir,

-         compose_info,

-         compose_type=config.get("general", "compose_type"),

-         compose_label=label,

-     )

-     return target_dir

- 

- 

- def setup_logging(debug=False):

-     FORMAT = "%(asctime)s: %(levelname)s: %(message)s"

-     level = logging.DEBUG if debug else logging.INFO

-     kobo.log.add_stderr_logger(log, log_level=level, format=FORMAT)

-     log.setLevel(level)

- 

- 

- def compute_status(statuses):

-     if any(map(lambda x: x[0] in ("STARTED", "WAITING"), statuses)):

-         # If there is anything still running or waiting to start, the whole is

-         # still running.

-         return "STARTED"

-     elif any(map(lambda x: x[0] in ("DOOMED", "BLOCKED") and not x[1], statuses)):

-         # If any required part is doomed or blocked, the whole is doomed

-         return "DOOMED"

-     elif all(map(lambda x: x[0] == "FINISHED", statuses)):

-         # If all parts are complete, the whole is complete

-         return "FINISHED"

-     else:

-         return "FINISHED_INCOMPLETE"

- 

- 

- def update_status(global_config, parts):

-     log.debug("Updating status metadata")

-     metadata = {}

-     statuses = set()

-     for part in parts.values():

-         metadata[part.name] = {"status": part.status, "path": part.path}

-         statuses.add((part.status, part.failable))

-     metadata_path = os.path.join(

-         global_config.target, "compose", "metadata", "parts.json"

-     )

-     with open(metadata_path, "w") as fh:

-         json.dump(metadata, fh, indent=2, sort_keys=True, separators=(",", ": "))

- 

-     status = compute_status(statuses)

-     log.info("Overall status is %s", status)

-     with open(os.path.join(global_config.target, "STATUS"), "w") as fh:

-         fh.write(status)

- 

-     return status != "DOOMED"

- 

- 

- def prepare_compose_dir(config, args, main_config_file, compose_info):

-     if not hasattr(args, "compose_path"):

-         # Creating a brand new compose

-         target_dir = get_target_dir(

-             config, compose_info, args.label, reldir=os.path.dirname(main_config_file)

-         )

-         for dir in ("logs", "parts", "compose/metadata", "work/global"):

-             try:

-                 os.makedirs(os.path.join(target_dir, dir))

-             except OSError as exc:

-                 if exc.errno != errno.EEXIST:

-                     raise

-         with open(os.path.join(target_dir, "STATUS"), "w") as fh:

-             fh.write("STARTED")

-         # Copy initial composeinfo for new compose

-         shutil.copy(

-             os.path.join(target_dir, "work/global/composeinfo-base.json"),

-             os.path.join(target_dir, "compose/metadata/composeinfo.json"),

-         )

-     else:

-         # Restarting a particular compose

-         target_dir = args.compose_path

- 

-     return target_dir

- 

- 

- def load_parts_metadata(global_config):

-     parts_metadata = os.path.join(global_config.target, "compose/metadata/parts.json")

-     with open(parts_metadata) as f:

-         return json.load(f)

- 

- 

- def setup_for_restart(global_config, parts, to_restart):

-     has_stuff_to_do = False

-     metadata = load_parts_metadata(global_config)

-     for key in metadata:

-         # Update state to match what is on disk

-         log.debug(

-             "Reusing %s (%s) from %s",

-             key,

-             metadata[key]["status"],

-             metadata[key]["path"],

-         )

-         parts[key].status = metadata[key]["status"]

-         parts[key].path = metadata[key]["path"]

-     for key in to_restart:

-         # Set restarted parts to run again

-         parts[key].status = Status.WAITING

-         parts[key].path = None

- 

-     for key in to_restart:

-         # Remove blockers that are already finished

-         for blocker in list(parts[key].blocked_on):

-             if parts[blocker].is_finished():

-                 parts[key].blocked_on.discard(blocker)

-         if not parts[key].blocked_on:

-             log.debug("Part %s in not blocked", key)

-             # Nothing blocks it; let's go

-             parts[key].status = Status.READY

-             has_stuff_to_do = True

- 

-     if not has_stuff_to_do:

-         raise RuntimeError("All restarted parts are blocked. Nothing to do.")

- 

- 

- def run_kinit(config):

-     if not config.getboolean("general", "kerberos"):

-         return

- 

-     keytab = config.get("general", "kerberos_keytab")

-     principal = config.get("general", "kerberos_principal")

- 

-     fd, fname = tempfile.mkstemp(prefix="krb5cc_pungi-orchestrate_")

-     os.close(fd)

-     os.environ["KRB5CCNAME"] = fname

-     shortcuts.run(["kinit", "-k", "-t", keytab, principal])

-     log.debug("Created a kerberos ticket for %s", principal)

- 

-     atexit.register(os.remove, fname)

- 

- 

- def get_compose_data(compose_path):

-     try:

-         compose = productmd.compose.Compose(compose_path)

-         data = {

-             "compose_id": compose.info.compose.id,

-             "compose_date": compose.info.compose.date,

-             "compose_type": compose.info.compose.type,

-             "compose_respin": str(compose.info.compose.respin),

-             "compose_label": compose.info.compose.label,

-             "release_id": compose.info.release_id,

-             "release_name": compose.info.release.name,

-             "release_short": compose.info.release.short,

-             "release_version": compose.info.release.version,

-             "release_type": compose.info.release.type,

-             "release_is_layered": compose.info.release.is_layered,

-         }

-         if compose.info.release.is_layered:

-             data.update(

-                 {

-                     "base_product_name": compose.info.base_product.name,

-                     "base_product_short": compose.info.base_product.short,

-                     "base_product_version": compose.info.base_product.version,

-                     "base_product_type": compose.info.base_product.type,

-                 }

-             )

-         return data

-     except Exception:

-         return {}

- 

- 

- def get_script_env(compose_path):

-     env = os.environ.copy()

-     env["COMPOSE_PATH"] = compose_path

-     for key, value in get_compose_data(compose_path).items():

-         if isinstance(value, bool):

-             env[key.upper()] = "YES" if value else ""

-         else:

-             env[key.upper()] = str(value) if value else ""

-     return env

- 

- 

- def run_scripts(prefix, compose_dir, scripts):

-     env = get_script_env(compose_dir)

-     for idx, script in enumerate(scripts.strip().splitlines()):

-         command = script.strip()

-         logfile = os.path.join(compose_dir, "logs", "%s%s.log" % (prefix, idx))

-         log.debug("Running command: %r", command)

-         log.debug("See output in %s", logfile)

-         shortcuts.run(command, env=env, logfile=logfile)

- 

- 

- def try_translate_path(parts, path):

-     translation = []

-     for part in parts.values():

-         conf = pungi.util.load_config(part.config)

-         translation.extend(conf.get("translate_paths", []))

-     return pungi.util.translate_path_raw(translation, path)

- 

- 

- def send_notification(compose_dir, command, parts):

-     if not command:

-         return

-     from pungi.notifier import PungiNotifier

- 

-     data = get_compose_data(compose_dir)

-     data["location"] = try_translate_path(parts, compose_dir)

-     notifier = PungiNotifier([command])

-     with open(os.path.join(compose_dir, "STATUS")) as f:

-         status = f.read().strip()

-     notifier.send("status-change", workdir=compose_dir, status=status, **data)

- 

- 

- def setup_progress_monitor(global_config, parts):

-     """Update configuration so that each part send notifications about its

-     progress to the orchestrator.

- 

-     There is a file to which the notification is written. The orchestrator is

-     reading it and mapping the entries to particular parts. The path to this

-     file is stored in an environment variable.

-     """

-     tmp_file = tempfile.NamedTemporaryFile(prefix="pungi-progress-monitor_")

-     os.environ["_PUNGI_ORCHESTRATOR_PROGRESS_MONITOR"] = tmp_file.name

-     atexit.register(os.remove, tmp_file.name)

- 

-     global_config.extra_args.append(

-         "--notification-script=pungi-notification-report-progress"

-     )

- 

-     def reader():

-         while True:

-             line = tmp_file.readline()

-             if not line:

-                 time.sleep(0.1)

-                 continue

-             path, msg = line.split(":", 1)

-             for part in parts:

-                 if parts[part].path == os.path.dirname(path):

-                     log.debug("%s: %s", part, msg.strip())

-                     break

- 

-     monitor = threading.Thread(target=reader)

-     monitor.daemon = True

-     monitor.start()

- 

- 

- def run(work_dir, main_config_file, args):

-     config_dir = os.path.join(work_dir, "config")

-     shutil.copytree(os.path.dirname(main_config_file), config_dir)

- 

-     # Read main config

-     parser = configparser.RawConfigParser(

-         defaults={

-             "kerberos": "false",

-             "pre_compose_script": "",

-             "post_compose_script": "",

-             "notification_script": "",

-         }

-     )

-     parser.read(main_config_file)

- 

-     # Create kerberos ticket

-     run_kinit(parser)

- 

-     compose_info = dict(parser.items("general"))

-     compose_type = parser.get("general", "compose_type")

- 

-     target_dir = prepare_compose_dir(parser, args, main_config_file, compose_info)

-     kobo.log.add_file_logger(log, os.path.join(target_dir, "logs", "orchestrator.log"))

-     log.info("Composing %s", target_dir)

- 

-     run_scripts("pre_compose_", target_dir, parser.get("general", "pre_compose_script"))

- 

-     old_compose = find_old_compose(

-         os.path.dirname(target_dir),

-         compose_info["release_short"],

-         compose_info["release_version"],

-         "",

-     )

-     if old_compose:

-         log.info("Reusing old compose %s", old_compose)

- 

-     global_config = Config(

-         target=target_dir,

-         compose_type=compose_type,

-         label=args.label,

-         old_compose=old_compose,

-         config_dir=os.path.dirname(main_config_file),

-         event=args.koji_event,

-         extra_args=_safe_get_list(parser, "general", "extra_args"),

-     )

- 

-     if not global_config.event and parser.has_option("general", "koji_profile"):

-         koji_wrapper = KojiWrapper(parser.get("general", "koji_profile"))

-         event_file = os.path.join(global_config.target, "work/global/koji-event")

-         result = get_koji_event_raw(koji_wrapper, None, event_file)

-         global_config = global_config._replace(event=result["id"])

- 

-     parts = {}

-     for section in parser.sections():

-         if section == "general":

-             continue

-         parts[section] = ComposePart.from_config(parser, section, config_dir)

- 

-     if hasattr(args, "part"):

-         setup_for_restart(global_config, parts, args.part)

- 

-     setup_progress_monitor(global_config, parts)

- 

-     send_notification(target_dir, parser.get("general", "notification_script"), parts)

- 

-     retcode = run_all(global_config, parts)

- 

-     if retcode:

-         # Only run the script if we are not doomed.

-         run_scripts(

-             "post_compose_", target_dir, parser.get("general", "post_compose_script")

-         )

- 

-     send_notification(target_dir, parser.get("general", "notification_script"), parts)

- 

-     return retcode

- 

- 

- def parse_args(argv):

-     parser = argparse.ArgumentParser()

-     parser.add_argument("--debug", action="store_true")

-     parser.add_argument("--koji-event", metavar="ID", type=parse_koji_event)

-     subparsers = parser.add_subparsers()

-     start = subparsers.add_parser("start")

-     start.add_argument("config", metavar="CONFIG")

-     start.add_argument("--label")

- 

-     restart = subparsers.add_parser("restart")

-     restart.add_argument("config", metavar="CONFIG")

-     restart.add_argument("compose_path", metavar="COMPOSE_PATH")

-     restart.add_argument(

-         "part", metavar="PART", nargs="*", help="which parts to restart"

-     )

-     restart.add_argument("--label")

- 

-     return parser.parse_args(argv)

- 

- 

- def main(argv=None):

-     args = parse_args(argv)

-     setup_logging(args.debug)

- 

-     main_config_file = os.path.abspath(args.config)

- 

-     with temp_dir() as work_dir:

-         try:

-             if not run(work_dir, main_config_file, args):

-                 sys.exit(1)

-         except Exception:

-             log.exception("Unhandled exception!")

-             sys.exit(1)

file modified
-1
@@ -36,7 +36,6 @@ 

              "pungi-patch-iso = pungi.scripts.patch_iso:cli_main",

              "pungi-make-ostree = pungi.ostree:main",

              "pungi-notification-report-progress = pungi.scripts.report_progress:main",

-             "pungi-orchestrate = pungi_utils.orchestrator:main",

              "pungi-wait-for-signed-ostree-handler = pungi.scripts.wait_for_signed_ostree_handler:main",  # noqa: E501

              "pungi-koji = pungi.scripts.pungi_koji:cli_main",

              "pungi-gather = pungi.scripts.pungi_gather:cli_main",

file removed
-934
@@ -1,934 +0,0 @@ 

- # -*- coding: utf-8 -*-

- 

- import itertools

- import json

- from functools import wraps

- import operator

- import os

- import shutil

- import subprocess

- from textwrap import dedent

- 

- import mock

- import six

- from six.moves import configparser

- 

- from parameterized import parameterized

- 

- from tests.helpers import BaseTestCase, PungiTestCase, touch, FIXTURE_DIR

- from pungi_utils import orchestrator as o

- 

- 

- class TestConfigSubstitute(PungiTestCase):

-     def setUp(self):

-         super(TestConfigSubstitute, self).setUp()

-         self.fp = os.path.join(self.topdir, "config.conf")

- 

-     @parameterized.expand(

-         [

-             ("hello = 'world'", "hello = 'world'"),

-             ("hello = '{{foo}}'", "hello = 'bar'"),

-             ("hello = '{{  foo}}'", "hello = 'bar'"),

-             ("hello = '{{foo  }}'", "hello = 'bar'"),

-         ]

-     )

-     def test_substitutions(self, initial, expected):

-         touch(self.fp, initial)

-         o.fill_in_config_file(self.fp, {"foo": "bar"})

-         with open(self.fp) as f:

-             self.assertEqual(expected, f.read())

- 

-     def test_missing_key(self):

-         touch(self.fp, "hello = '{{unknown}}'")

-         with self.assertRaises(RuntimeError) as ctx:

-             o.fill_in_config_file(self.fp, {})

-         self.assertEqual(

-             "Unknown placeholder 'unknown' in config.conf", str(ctx.exception)

-         )

- 

- 

- class TestSafeGetList(BaseTestCase):

-     @parameterized.expand(

-         [

-             ("", []),

-             ("foo", ["foo"]),

-             ("foo,bar", ["foo", "bar"]),

-             ("foo  bar", ["foo", "bar"]),

-         ]

-     )

-     def test_success(self, value, expected):

-         cf = configparser.RawConfigParser()

-         cf.add_section("general")

-         cf.set("general", "key", value)

-         self.assertEqual(o._safe_get_list(cf, "general", "key"), expected)

- 

-     def test_default(self):

-         cf = configparser.RawConfigParser()

-         cf.add_section("general")

-         self.assertEqual(o._safe_get_list(cf, "general", "missing", "hello"), "hello")

- 

- 

- class TestComposePart(PungiTestCase):

-     def test_from_minimal_config(self):

-         cf = configparser.RawConfigParser()

-         cf.add_section("test")

-         cf.set("test", "config", "my.conf")

- 

-         part = o.ComposePart.from_config(cf, "test", "/tmp/config")

-         deps = "set()" if six.PY3 else "set([])"

-         self.assertEqual(str(part), "test")

-         self.assertEqual(

-             repr(part),

-             "ComposePart('test', '/tmp/config/my.conf', 'READY', "

-             "just_phase=[], skip_phase=[], dependencies=%s)" % deps,

-         )

-         self.assertFalse(part.failable)

- 

-     def test_from_full_config(self):

-         cf = configparser.RawConfigParser()

-         cf.add_section("test")

-         cf.set("test", "config", "my.conf")

-         cf.set("test", "depends_on", "base")

-         cf.set("test", "skip_phase", "skip")

-         cf.set("test", "just_phase", "just")

-         cf.set("test", "failable", "yes")

- 

-         part = o.ComposePart.from_config(cf, "test", "/tmp/config")

-         deps = "{'base'}" if six.PY3 else "set(['base'])"

-         self.assertEqual(

-             repr(part),

-             "ComposePart('test', '/tmp/config/my.conf', 'WAITING', "

-             "just_phase=['just'], skip_phase=['skip'], dependencies=%s)" % deps,

-         )

-         self.assertTrue(part.failable)

- 

-     def test_get_cmd(self):

-         conf = o.Config(

-             "/tgt/", "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]

-         )

-         part = o.ComposePart(

-             "test", "/tmp/my.conf", just_phase=["just"], skip_phase=["skip"]

-         )

-         part.path = "/compose"

- 

-         self.assertEqual(

-             part.get_cmd(conf),

-             [

-                 "pungi-koji",

-                 "--config",

-                 "/tmp/my.conf",

-                 "--compose-dir",

-                 "/compose",

-                 "--production",

-                 "--label",

-                 "RC-1.0",

-                 "--just-phase",

-                 "just",

-                 "--skip-phase",

-                 "skip",

-                 "--old-compose",

-                 "/old/parts",

-                 "--koji-event",

-                 "1234",

-                 "--quiet",

-                 "--no-latest-link",

-             ],

-         )

- 

-     def test_refresh_status(self):

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.path = os.path.join(self.topdir)

-         touch(os.path.join(self.topdir, "STATUS"), "FINISHED")

-         part.refresh_status()

-         self.assertEqual(part.status, "FINISHED")

- 

-     def test_refresh_status_missing_file(self):

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.path = os.path.join(self.topdir)

-         part.refresh_status()

-         self.assertEqual(part.status, "DOOMED")

- 

-     @parameterized.expand(["FINISHED", "FINISHED_INCOMPLETE"])

-     def test_is_finished(self, status):

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.status = status

-         self.assertTrue(part.is_finished())

- 

-     @parameterized.expand(["STARTED", "WAITING"])

-     def test_is_not_finished(self, status):

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.status = status

-         self.assertFalse(part.is_finished())

- 

-     @mock.patch("pungi_utils.orchestrator.fill_in_config_file")

-     @mock.patch("pungi_utils.orchestrator.get_compose_dir")

-     @mock.patch("kobo.conf.PyConfigParser")

-     def test_setup_start(self, Conf, gcd, ficf):

-         def pth(*path):

-             return os.path.join(self.topdir, *path)

- 

-         conf = o.Config(

-             pth("tgt"), "production", "RC-1.0", "/old", pth("cfg"), None, None

-         )

-         part = o.ComposePart("test", "/tmp/my.conf")

-         parts = {"base": mock.Mock(path="/base", is_finished=lambda: True)}

-         Conf.return_value.opened_files = ["foo.conf"]

- 

-         part.setup_start(conf, parts)

- 

-         self.assertEqual(part.status, "STARTED")

-         self.assertEqual(part.path, gcd.return_value)

-         self.assertEqual(part.log_file, pth("tgt", "logs", "test.log"))

-         self.assertEqual(

-             ficf.call_args_list,

-             [mock.call("foo.conf", {"part-base": "/base", "configdir": pth("cfg")})],

-         )

-         self.assertEqual(

-             gcd.call_args_list,

-             [

-                 mock.call(

-                     pth("tgt/parts"),

-                     Conf.return_value,

-                     compose_type="production",

-                     compose_label="RC-1.0",

-                 )

-             ],

-         )

- 

-     @parameterized.expand(

-         [

-             # Nothing blocking, no change

-             ([], [], o.Status.READY),

-             # Remove last blocker and switch to READY

-             (["finished"], [], o.Status.READY),

-             # Blocker remaining, stay in WAITING

-             (["finished", "block"], ["block"], o.Status.WAITING),

-         ]

-     )

-     def test_unblock_on(self, deps, blockers, status):

-         part = o.ComposePart("test", "/tmp/my.conf", dependencies=deps)

-         part.unblock_on("finished")

-         six.assertCountEqual(self, part.blocked_on, blockers)

-         self.assertEqual(part.status, status)

- 

- 

- class TestStartPart(PungiTestCase):

-     @mock.patch("subprocess.Popen")

-     def test_start(self, Popen):

-         part = mock.Mock(log_file=os.path.join(self.topdir, "log"))

-         config = mock.Mock()

-         parts = mock.Mock()

-         cmd = ["pungi-koji", "..."]

- 

-         part.get_cmd.return_value = cmd

- 

-         proc = o.start_part(config, parts, part)

- 

-         self.assertEqual(

-             part.mock_calls,

-             [mock.call.setup_start(config, parts), mock.call.get_cmd(config)],

-         )

-         self.assertEqual(proc, Popen.return_value)

-         self.assertEqual(

-             Popen.call_args_list,

-             [mock.call(cmd, stdout=mock.ANY, stderr=subprocess.STDOUT)],

-         )

- 

- 

- class TestHandleFinished(BaseTestCase):

-     def setUp(self):

-         self.config = mock.Mock()

-         self.linker = mock.Mock()

-         self.parts = {"a": mock.Mock(), "b": mock.Mock()}

- 

-     @mock.patch("pungi_utils.orchestrator.update_metadata")

-     @mock.patch("pungi_utils.orchestrator.copy_part")

-     def test_handle_success(self, cp, um):

-         proc = mock.Mock(returncode=0)

-         o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])

- 

-         self.assertEqual(

-             self.parts["a"].mock_calls,

-             [mock.call.refresh_status(), mock.call.unblock_on(self.parts["a"].name)],

-         )

-         self.assertEqual(

-             self.parts["b"].mock_calls, [mock.call.unblock_on(self.parts["a"].name)]

-         )

-         self.assertEqual(

-             cp.call_args_list, [mock.call(self.config, self.linker, self.parts["a"])]

-         )

-         self.assertEqual(um.call_args_list, [mock.call(self.config, self.parts["a"])])

- 

-     @mock.patch("pungi_utils.orchestrator.block_on")

-     def test_handle_failure(self, bo):

-         proc = mock.Mock(returncode=1)

-         o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])

- 

-         self.assertEqual(self.parts["a"].mock_calls, [mock.call.refresh_status()])

- 

-         self.assertEqual(

-             bo.call_args_list, [mock.call(self.parts, self.parts["a"].name)]

-         )

- 

- 

- class TestBlockOn(BaseTestCase):

-     def test_single(self):

-         parts = {"b": o.ComposePart("b", "b.conf", dependencies=["a"])}

- 

-         o.block_on(parts, "a")

- 

-         self.assertEqual(parts["b"].status, o.Status.BLOCKED)

- 

-     def test_chain(self):

-         parts = {

-             "b": o.ComposePart("b", "b.conf", dependencies=["a"]),

-             "c": o.ComposePart("c", "c.conf", dependencies=["b"]),

-             "d": o.ComposePart("d", "d.conf", dependencies=["c"]),

-         }

- 

-         o.block_on(parts, "a")

- 

-         self.assertEqual(parts["b"].status, o.Status.BLOCKED)

-         self.assertEqual(parts["c"].status, o.Status.BLOCKED)

-         self.assertEqual(parts["d"].status, o.Status.BLOCKED)

- 

- 

- class TestUpdateMetadata(PungiTestCase):

-     def assertEqualJSON(self, f1, f2):

-         with open(f1) as f:

-             actual = json.load(f)

-         with open(f2) as f:

-             expected = json.load(f)

-         self.assertEqual(actual, expected)

- 

-     def assertEqualMetadata(self, expected):

-         expected_dir = os.path.join(FIXTURE_DIR, expected, "compose/metadata")

-         for f in os.listdir(expected_dir):

-             self.assertEqualJSON(

-                 os.path.join(self.tgt, "compose/metadata", f),

-                 os.path.join(expected_dir, f),

-             )

- 

-     @parameterized.expand(["empty-metadata", "basic-metadata"])

-     def test_merge_into_empty(self, fixture):

-         self.tgt = os.path.join(self.topdir, "target")

- 

-         conf = o.Config(self.tgt, "production", None, None, None, None, [])

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20181001.n.0")

- 

-         shutil.copytree(os.path.join(FIXTURE_DIR, fixture), self.tgt)

- 

-         o.update_metadata(conf, part)

- 

-         self.assertEqualMetadata(fixture + "-merged")

- 

- 

- class TestCopyPart(PungiTestCase):

-     @mock.patch("pungi_utils.orchestrator.hardlink_dir")

-     def test_copy(self, hd):

-         self.tgt = os.path.join(self.topdir, "target")

-         conf = o.Config(self.tgt, "production", None, None, None, None, [])

-         linker = mock.Mock()

-         part = o.ComposePart("test", "/tmp/my.conf")

-         part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")

- 

-         o.copy_part(conf, linker, part)

- 

-         six.assertCountEqual(

-             self,

-             hd.call_args_list,

-             [

-                 mock.call(

-                     linker,

-                     os.path.join(part.path, "compose", variant),

-                     os.path.join(self.tgt, "compose", variant),

-                 )

-                 for variant in ["Client", "Server"]

-             ],

-         )

- 

- 

- class TestHardlinkDir(PungiTestCase):

-     def test_hardlinking(self):

-         linker = mock.Mock()

-         src = os.path.join(self.topdir, "src")

-         dst = os.path.join(self.topdir, "dst")

-         files = ["file.txt", "nested/deep/another.txt"]

- 

-         for f in files:

-             touch(os.path.join(src, f))

- 

-         o.hardlink_dir(linker, src, dst)

- 

-         six.assertCountEqual(

-             self,

-             linker.queue_put.call_args_list,

-             [mock.call((os.path.join(src, f), os.path.join(dst, f))) for f in files],

-         )

- 

- 

- class TestCheckFinishedProcesses(BaseTestCase):

-     def test_nothing_finished(self):

-         k1 = mock.Mock(returncode=None)

-         v1 = mock.Mock()

-         processes = {k1: v1}

- 

-         six.assertCountEqual(self, o.check_finished_processes(processes), [])

- 

-     def test_yields_finished(self):

-         k1 = mock.Mock(returncode=None)

-         v1 = mock.Mock()

-         k2 = mock.Mock(returncode=0)

-         v2 = mock.Mock()

-         processes = {k1: v1, k2: v2}

- 

-         six.assertCountEqual(self, o.check_finished_processes(processes), [(k2, v2)])

- 

-     def test_yields_failed(self):

-         k1 = mock.Mock(returncode=1)

-         v1 = mock.Mock()

-         processes = {k1: v1}

- 

-         six.assertCountEqual(self, o.check_finished_processes(processes), [(k1, v1)])

- 

- 

- class _Part(object):

-     def __init__(self, name, parent=None, fails=False, status=None):

-         self.name = name

-         self.finished = False

-         self.status = o.Status.WAITING if parent else o.Status.READY

-         if status:

-             self.status = status

-         self.proc = mock.Mock(name="proc_%s" % name, pid=hash(self))

-         self.parent = parent

-         self.fails = fails

-         self.failable = False

-         self.path = "/path/to/%s" % name

-         self.blocked_on = set([parent]) if parent else set()

- 

-     def is_finished(self):

-         return self.finished or self.status == "FINISHED"

- 

-     def __repr__(self):

-         return "<_Part(%r, parent=%r)>" % (self.name, self.parent)

- 

- 

- def with_mocks(parts, finish_order, wait_results):

-     """Setup all mocks and create dict with the parts.

-     :param finish_order: nested list: first element contains parts that finish

-                          in first iteration, etc.

-     :param wait_results: list of names of processes that are returned by wait in each

-                          iteration

-     """

- 

-     def decorator(func):

-         @wraps(func)

-         def worker(self, lp, update_status, cfp, hf, sp, wait):

-             self.parts = dict((p.name, p) for p in parts)

-             self.linker = lp.return_value.__enter__.return_value

- 

-             update_status.side_effect = self.mock_update

-             hf.side_effect = self.mock_finish

-             sp.side_effect = self.mock_start

- 

-             finish = [[]]

-             for grp in finish_order:

-                 finish.append([(self.parts[p].proc, self.parts[p]) for p in grp])

- 

-             cfp.side_effect = finish

-             wait.side_effect = [(self.parts[p].proc.pid, 0) for p in wait_results]

- 

-             func(self)

- 

-             self.assertEqual(lp.call_args_list, [mock.call("hardlink")])

- 

-         return worker

- 

-     return decorator

- 

- 

- @mock.patch("os.wait")

- @mock.patch("pungi_utils.orchestrator.start_part")

- @mock.patch("pungi_utils.orchestrator.handle_finished")

- @mock.patch("pungi_utils.orchestrator.check_finished_processes")

- @mock.patch("pungi_utils.orchestrator.update_status")

- @mock.patch("pungi_utils.orchestrator.linker_pool")

- class TestRunAll(BaseTestCase):

-     def setUp(self):

-         self.maxDiff = None

-         self.conf = mock.Mock(name="global_config")

-         self.calls = []

- 

-     def mock_update(self, global_config, parts):

-         self.assertEqual(global_config, self.conf)

-         self.assertEqual(parts, self.parts)

-         self.calls.append("update_status")

- 

-     def mock_start(self, global_config, parts, part):

-         self.assertEqual(global_config, self.conf)

-         self.assertEqual(parts, self.parts)

-         self.calls.append(("start_part", part.name))

-         part.status = o.Status.STARTED

-         return part.proc

- 

-     @property

-     def sorted_calls(self):

-         """Sort the consecutive calls of the same function based on the argument."""

- 

-         def key(val):

-             return val[0] if isinstance(val, tuple) else val

- 

-         return list(

-             itertools.chain.from_iterable(

-                 sorted(grp, key=operator.itemgetter(1))

-                 for _, grp in itertools.groupby(self.calls, key)

-             )

-         )

- 

-     def mock_finish(self, global_config, linker, parts, proc, part):

-         self.assertEqual(global_config, self.conf)

-         self.assertEqual(linker, self.linker)

-         self.assertEqual(parts, self.parts)

-         self.calls.append(("handle_finished", part.name))

-         for child in parts.values():

-             if child.parent == part.name:

-                 child.status = o.Status.BLOCKED if part.fails else o.Status.READY

-         part.status = "DOOMED" if part.fails else "FINISHED"

- 

-     @with_mocks(

-         [_Part("fst"), _Part("snd", parent="fst")], [["fst"], ["snd"]], ["fst", "snd"]

-     )

-     def test_sequential(self):

-         o.run_all(self.conf, self.parts)

- 

-         self.assertEqual(

-             self.sorted_calls,

-             [

-                 # First iteration starts fst

-                 "update_status",

-                 ("start_part", "fst"),

-                 # Second iteration handles finish of fst and starts snd

-                 "update_status",

-                 ("handle_finished", "fst"),

-                 ("start_part", "snd"),

-                 # Third iteration handles finish of snd

-                 "update_status",

-                 ("handle_finished", "snd"),

-                 # Final update of status

-                 "update_status",

-             ],

-         )

- 

-     @with_mocks([_Part("fst"), _Part("snd")], [["fst", "snd"]], ["fst"])

-     def test_parallel(self):

-         o.run_all(self.conf, self.parts)

- 

-         self.assertEqual(

-             self.sorted_calls,

-             [

-                 # First iteration starts both fst and snd

-                 "update_status",

-                 ("start_part", "fst"),

-                 ("start_part", "snd"),

-                 # Second iteration handles finish of both of them

-                 "update_status",

-                 ("handle_finished", "fst"),

-                 ("handle_finished", "snd"),

-                 # Final update of status

-                 "update_status",

-             ],

-         )

- 

-     @with_mocks(

-         [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],

-         [["1"], ["2", "3"]],

-         ["1", "2"],

-     )

-     def test_waits_for_dep_then_parallel_with_simultaneous_end(self):

-         o.run_all(self.conf, self.parts)

- 

-         self.assertEqual(

-             self.sorted_calls,

-             [

-                 # First iteration starts first part

-                 "update_status",

-                 ("start_part", "1"),

-                 # Second iteration starts 2 and 3

-                 "update_status",

-                 ("handle_finished", "1"),

-                 ("start_part", "2"),

-                 ("start_part", "3"),

-                 # Both 2 and 3 end in third iteration

-                 "update_status",

-                 ("handle_finished", "2"),

-                 ("handle_finished", "3"),

-                 # Final update of status

-                 "update_status",

-             ],

-         )

- 

-     @with_mocks(

-         [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],

-         [["1"], ["3"], ["2"]],

-         ["1", "3", "2"],

-     )

-     def test_waits_for_dep_then_parallel_with_different_end_times(self):

-         o.run_all(self.conf, self.parts)

- 

-         self.assertEqual(

-             self.sorted_calls,

-             [

-                 # First iteration starts first part

-                 "update_status",

-                 ("start_part", "1"),

-                 # Second iteration starts 2 and 3

-                 "update_status",

-                 ("handle_finished", "1"),

-                 ("start_part", "2"),

-                 ("start_part", "3"),

-                 # Third iteration sees 3 finish

-                 "update_status",

-                 ("handle_finished", "3"),

-                 # Fourth iteration, 2 finishes

-                 "update_status",

-                 ("handle_finished", "2"),

-                 # Final update of status

-                 "update_status",

-             ],

-         )

- 

-     @with_mocks(

-         [_Part("fst", fails=True), _Part("snd", parent="fst")], [["fst"]], ["fst"]

-     )

-     def test_blocked(self):

-         o.run_all(self.conf, self.parts)

- 

-         self.assertEqual(

-             self.sorted_calls,

-             [

-                 # First iteration starts first part

-                 "update_status",

-                 ("start_part", "fst"),

-                 # Second iteration handles fail of first part

-                 "update_status",

-                 ("handle_finished", "fst"),

-                 # Final update of status

-                 "update_status",

-             ],

-         )

- 

- 

- @mock.patch("pungi_utils.orchestrator.get_compose_dir")

- class TestGetTargetDir(BaseTestCase):

-     def test_with_absolute_path(self, gcd):

-         config = {"target": "/tgt", "compose_type": "nightly"}

-         cfg = mock.Mock()

-         cfg.get.side_effect = lambda _, k: config[k]

-         ci = mock.Mock()

-         res = o.get_target_dir(cfg, ci, None, reldir="/checkout")

-         self.assertEqual(res, gcd.return_value)

-         self.assertEqual(

-             gcd.call_args_list,

-             [mock.call("/tgt", ci, compose_type="nightly", compose_label=None)],

-         )

- 

-     def test_with_relative_path(self, gcd):

-         config = {"target": "tgt", "compose_type": "nightly"}

-         cfg = mock.Mock()

-         cfg.get.side_effect = lambda _, k: config[k]

-         ci = mock.Mock()

-         res = o.get_target_dir(cfg, ci, None, reldir="/checkout")

-         self.assertEqual(res, gcd.return_value)

-         self.assertEqual(

-             gcd.call_args_list,

-             [

-                 mock.call(

-                     "/checkout/tgt", ci, compose_type="nightly", compose_label=None

-                 )

-             ],

-         )

- 

- 

- class TestComputeStatus(BaseTestCase):

-     @parameterized.expand(

-         [

-             ([("FINISHED", False)], "FINISHED"),

-             ([("FINISHED", False), ("STARTED", False)], "STARTED"),

-             ([("FINISHED", False), ("STARTED", False), ("WAITING", False)], "STARTED"),

-             ([("FINISHED", False), ("DOOMED", False)], "DOOMED"),

-             (

-                 [("FINISHED", False), ("BLOCKED", True), ("DOOMED", True)],

-                 "FINISHED_INCOMPLETE",

-             ),

-             ([("FINISHED", False), ("BLOCKED", False), ("DOOMED", True)], "DOOMED"),

-             ([("FINISHED", False), ("DOOMED", True)], "FINISHED_INCOMPLETE"),

-             ([("FINISHED", False), ("STARTED", False), ("DOOMED", False)], "STARTED"),

-         ]

-     )

-     def test_cases(self, statuses, expected):

-         self.assertEqual(o.compute_status(statuses), expected)

- 

- 

- class TestUpdateStatus(PungiTestCase):

-     def test_updating(self):

-         os.makedirs(os.path.join(self.topdir, "compose/metadata"))

-         conf = o.Config(

-             self.topdir, "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]

-         )

-         o.update_status(

-             conf,

-             {"1": _Part("1", status="FINISHED"), "2": _Part("2", status="STARTED")},

-         )

-         self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED")

-         self.assertFileContent(

-             os.path.join(self.topdir, "compose/metadata/parts.json"),

-             dedent(

-                 """\

-                 {

-                   "1": {

-                     "path": "/path/to/1",

-                     "status": "FINISHED"

-                   },

-                   "2": {

-                     "path": "/path/to/2",

-                     "status": "STARTED"

-                   }

-                 }

-                 """

-             ),

-         )

- 

- 

- @mock.patch("pungi_utils.orchestrator.get_target_dir")

- class TestPrepareComposeDir(PungiTestCase):

-     def setUp(self):

-         super(TestPrepareComposeDir, self).setUp()

-         self.conf = mock.Mock(name="config")

-         self.main_config = "/some/config"

-         self.compose_info = mock.Mock(name="compose_info")

- 

-     def test_new_compose(self, gtd):

-         def mock_get_target(conf, compose_info, label, reldir):

-             self.assertEqual(conf, self.conf)

-             self.assertEqual(compose_info, self.compose_info)

-             self.assertEqual(label, args.label)

-             self.assertEqual(reldir, "/some")

-             touch(os.path.join(self.topdir, "work/global/composeinfo-base.json"), "WOO")

-             return self.topdir

- 

-         gtd.side_effect = mock_get_target

-         args = mock.Mock(name="args", spec=["label"])

-         retval = o.prepare_compose_dir(

-             self.conf, args, self.main_config, self.compose_info

-         )

-         self.assertEqual(retval, self.topdir)

-         self.assertFileContent(

-             os.path.join(self.topdir, "compose/metadata/composeinfo.json"), "WOO"

-         )

-         self.assertTrue(os.path.isdir(os.path.join(self.topdir, "logs")))

-         self.assertTrue(os.path.isdir(os.path.join(self.topdir, "parts")))

-         self.assertTrue(os.path.isdir(os.path.join(self.topdir, "work/global")))

-         self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED")

- 

-     def test_restarting_compose(self, gtd):

-         args = mock.Mock(name="args", spec=["label", "compose_path"])

-         retval = o.prepare_compose_dir(

-             self.conf, args, self.main_config, self.compose_info

-         )

-         self.assertEqual(gtd.call_args_list, [])

-         self.assertEqual(retval, args.compose_path)

- 

- 

- class TestLoadPartsMetadata(PungiTestCase):

-     def test_loading(self):

-         touch(

-             os.path.join(self.topdir, "compose/metadata/parts.json"), '{"foo": "bar"}'

-         )

-         conf = mock.Mock(target=self.topdir)

- 

-         self.assertEqual(o.load_parts_metadata(conf), {"foo": "bar"})

- 

- 

- @mock.patch("pungi_utils.orchestrator.load_parts_metadata")

- class TestSetupForRestart(BaseTestCase):

-     def setUp(self):

-         self.conf = mock.Mock(name="global_config")

- 

-     def test_restart_ok(self, lpm):

-         lpm.return_value = {

-             "p1": {"status": "FINISHED", "path": "/p1"},

-             "p2": {"status": "DOOMED", "path": "/p2"},

-         }

-         parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}

- 

-         o.setup_for_restart(self.conf, parts, ["p2"])

- 

-         self.assertEqual(parts["p1"].status, "FINISHED")

-         self.assertEqual(parts["p1"].path, "/p1")

-         self.assertEqual(parts["p2"].status, "READY")

-         self.assertEqual(parts["p2"].path, None)

- 

-     def test_restart_one_blocked_one_ok(self, lpm):

-         lpm.return_value = {

-             "p1": {"status": "DOOMED", "path": "/p1"},

-             "p2": {"status": "DOOMED", "path": "/p2"},

-             "p3": {"status": "WAITING", "path": None},

-         }

-         parts = {

-             "p1": _Part("p1"),

-             "p2": _Part("p2", parent="p1"),

-             "p3": _Part("p3", parent="p2"),

-         }

- 

-         o.setup_for_restart(self.conf, parts, ["p1", "p3"])

- 

-         self.assertEqual(parts["p1"].status, "READY")

-         self.assertEqual(parts["p1"].path, None)

-         self.assertEqual(parts["p2"].status, "DOOMED")

-         self.assertEqual(parts["p2"].path, "/p2")

-         self.assertEqual(parts["p3"].status, "WAITING")

-         self.assertEqual(parts["p3"].path, None)

- 

-     def test_restart_all_blocked(self, lpm):

-         lpm.return_value = {

-             "p1": {"status": "DOOMED", "path": "/p1"},

-             "p2": {"status": "STARTED", "path": "/p2"},

-         }

-         parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}

- 

-         with self.assertRaises(RuntimeError):

-             o.setup_for_restart(self.conf, parts, ["p2"])

- 

-         self.assertEqual(parts["p1"].status, "DOOMED")

-         self.assertEqual(parts["p1"].path, "/p1")

-         self.assertEqual(parts["p2"].status, "WAITING")

-         self.assertEqual(parts["p2"].path, None)

- 

- 

- @mock.patch("atexit.register")

- @mock.patch("kobo.shortcuts.run")

- class TestRunKinit(BaseTestCase):

-     def test_without_config(self, run, register):

-         conf = mock.Mock()

-         conf.getboolean.return_value = False

- 

-         o.run_kinit(conf)

- 

-         self.assertEqual(run.call_args_list, [])

-         self.assertEqual(register.call_args_list, [])

- 

-     @mock.patch.dict("os.environ")

-     def test_with_config(self, run, register):

-         conf = mock.Mock()

-         conf.getboolean.return_value = True

-         conf.get.side_effect = lambda section, option: option

- 

-         o.run_kinit(conf)

- 

-         self.assertEqual(

-             run.call_args_list,

-             [mock.call(["kinit", "-k", "-t", "kerberos_keytab", "kerberos_principal"])],

-         )

-         self.assertEqual(

-             register.call_args_list, [mock.call(os.remove, os.environ["KRB5CCNAME"])]

-         )

- 

- 

- @mock.patch.dict("os.environ", {}, clear=True)

- class TestGetScriptEnv(BaseTestCase):

-     def test_without_metadata(self):

-         env = o.get_script_env("/foobar")

-         self.assertEqual(env, {"COMPOSE_PATH": "/foobar"})

- 

-     def test_with_metadata(self):

-         compose_dir = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")

-         env = o.get_script_env(compose_dir)

-         self.maxDiff = None

-         self.assertEqual(

-             env,

-             {

-                 "COMPOSE_PATH": compose_dir,

-                 "COMPOSE_ID": "DP-1.0-20161013.t.4",

-                 "COMPOSE_DATE": "20161013",

-                 "COMPOSE_TYPE": "test",

-                 "COMPOSE_RESPIN": "4",

-                 "COMPOSE_LABEL": "",

-                 "RELEASE_ID": "DP-1.0",

-                 "RELEASE_NAME": "Dummy Product",

-                 "RELEASE_SHORT": "DP",

-                 "RELEASE_VERSION": "1.0",

-                 "RELEASE_TYPE": "ga",

-                 "RELEASE_IS_LAYERED": "",

-             },

-         )

- 

- 

- class TestRunScripts(BaseTestCase):

-     @mock.patch("pungi_utils.orchestrator.get_script_env")

-     @mock.patch("kobo.shortcuts.run")

-     def test_run_scripts(self, run, get_env):

-         commands = """

-            date

-            env

-            """

- 

-         o.run_scripts("pref_", "/tmp/compose", commands)

- 

-         self.assertEqual(

-             run.call_args_list,

-             [

-                 mock.call(

-                     "date",

-                     logfile="/tmp/compose/logs/pref_0.log",

-                     env=get_env.return_value,

-                 ),

-                 mock.call(

-                     "env",

-                     logfile="/tmp/compose/logs/pref_1.log",

-                     env=get_env.return_value,

-                 ),

-             ],

-         )

- 

- 

- @mock.patch("pungi.notifier.PungiNotifier")

- class TestSendNotification(BaseTestCase):

-     def test_no_command(self, notif):

-         o.send_notification("/foobar", None, None)

-         self.assertEqual(notif.mock_calls, [])

- 

-     @mock.patch("pungi.util.load_config")

-     def test_with_command_and_translate(self, load_config, notif):

-         compose_dir = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")

-         load_config.return_value = {

-             "translate_paths": [(os.path.dirname(compose_dir), "http://example.com")],

-         }

-         parts = {"foo": mock.Mock()}

- 

-         o.send_notification(compose_dir, "handler", parts)

- 

-         self.assertEqual(len(notif.mock_calls), 2)

-         self.assertEqual(notif.mock_calls[0], mock.call(["handler"]))

-         _, args, kwargs = notif.mock_calls[1]

-         self.assertEqual(args, ("status-change",))

-         self.assertEqual(

-             kwargs,

-             {

-                 "status": "FINISHED",

-                 "workdir": compose_dir,

-                 "location": "http://example.com/DP-1.0-20161013.t.4",

-                 "compose_id": "DP-1.0-20161013.t.4",

-                 "compose_date": "20161013",

-                 "compose_type": "test",

-                 "compose_respin": "4",

-                 "compose_label": None,

-                 "release_id": "DP-1.0",

-                 "release_name": "Dummy Product",

-                 "release_short": "DP",

-                 "release_version": "1.0",

-                 "release_type": "ga",

-                 "release_is_layered": False,

-             },

-         )

-         self.assertEqual(load_config.call_args_list, [mock.call(parts["foo"].config)])

This was never actually used as far as I know. It does not solve any problem Fedora has, and internally a different solution was used.

Technically, removing the executable is a breaking change, which should be reflected in the version number. If releasing this as 4.4 covers that sufficiently, it may be possible to merge it.

There are still some references to the orchestrator in the repo: pungi.spec, pungi/scripts/config_dump.py, Makefile and doc/multi_compose.rst.
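
A check like this can be repeated after each rebase. The following is only a minimal sketch, not part of the pull request: the helper name `find_mentions` and the search string `orchestrat` are assumptions chosen for illustration, and it is meant to be run from the repository root.

```python
import os


def find_mentions(root, needle="orchestrat", skip_dirs=(".git",)):
    """Return sorted relative paths of files under root that contain needle.

    Hypothetical helper for illustration; the default needle is an assumed
    substring that matches both "pungi-orchestrate" and "orchestrator".
    """
    hits = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune directories that should not be searched (e.g. VCS metadata).
        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                # Ignore undecodable bytes so binary files do not abort the scan.
                with open(path, encoding="utf-8", errors="ignore") as f:
                    if needle in f.read():
                        hits.append(os.path.relpath(path, root))
            except OSError:
                # Unreadable files (broken symlinks, permissions) are skipped.
                continue
    return sorted(hits)


if __name__ == "__main__":
    for path in find_mentions("."):
        print(path)
```

An empty result would suggest that no leftover references remain, at least for the chosen search string.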

rebased onto c4a4e04d7865b7484a1fb642ec76ea9c1b5a3e41

2 years ago

Looks good to me. :thumbsup:

rebased onto b7adbf8

10 months ago

Pull-Request has been merged by lsedlar

10 months ago