#1416 backend: automatically batch the createrepo requests
Merged 3 years ago by praiskup. Opened 3 years ago by praiskup.
Unknown source batch-createrepo  into  master

@@ -1,9 +1,11 @@

+ import json

  import os

  from subprocess import Popen, PIPE

  

  from shlex import split

  from setproctitle import getproctitle, setproctitle

  from oslo_concurrency import lockutils

+ from copr_backend.helpers import get_redis_connection

  

  # todo: add logging here

  # from copr_backend.helpers import BackendConfigReader, get_redis_logger
@@ -13,6 +15,11 @@

  from .exceptions import CreateRepoError

  

  

+ # Some reasonable limit here for exceptional (probably buggy) situations.

+ # This is here mostly to not overflow the execve() stack limits.

+ MAX_IN_BATCH = 100

+ 

+ 

  def run_cmd_unsafe(comm_str, lock_name, lock_path="/var/lock/copr-backend"):

      # log.info("Running command: {}".format(comm_str))

      comm = split(comm_str)
@@ -184,3 +191,182 @@

      # Automatic createrepo disabled.  Even so, we still need to createrepo in

      # special "devel" directory so we can later build packages against it.

      return createrepo_unsafe(path, base_url=base_url, dest_dir="devel")

+ 

+ 

+ class BatchedCreaterepo:

+     """

+     Group a "group-able" set of pending createrepo tasks, and execute

+     the createrepo_c binary only once for the batch.  As a result, some

+     `copr-repo` processes do slightly more work (negligible difference compared

+     to overall createrepo_c cost) but some do nothing.

+ 

+     Note that this is wrapped in a separate class mostly to make unit testing

+     easier.

+ 

+     The process goes like this:

+ 

+     1. BatchedCreaterepo() is instantiated by the caller.

+     2. Before the caller acquires the createrepo lock, it notifies the other

+        processes by calling make_request().

+     3. Caller acquires the createrepo lock.

+     4. Caller makes sure that no other process has already done its task, by

+        calling the check_processed() method (if it has been done, the caller

+        _ends_).  The others are now waiting for the lock, so they cannot

+        process our task in the meantime.

+     5. Caller gets the "unified" createrepo options needed by the other queued

+        processes by calling the options() method.  These options are then

+        merged with the options needed by the caller's task, and createrepo_c

+        is executed.  This is where the resources are saved.

+     6. The commit() method is called (still under the lock) to notify the

+        others that they don't have to duplicate the effort and waste

+        resources.

+     """

+     # pylint: disable=too-many-instance-attributes

+ 

+     def __init__(self, dirname, full, add, delete, log,

+                  devel=False,

+                  appstream=True,

+                  backend_opts=None,

+                  noop=False):

+         self.noop = noop

+         self.log = log

+         self.dirname = dirname

+         self.devel = devel

+         self.appstream = appstream

+ 

+ 

+         if not backend_opts:

+             self.log.error("can't get access to redis, batch disabled")

+             self.noop = True

+             return

+ 

+         self._pid = os.getpid()

+         self._json_redis_task = json.dumps({

+             "appstream": appstream,

+             "devel": devel,

+             "add": add,

+             "delete": delete,

+             "full": full,

+         })

+ 

+         self.notify_keys = []

+         self.redis = get_redis_connection(backend_opts)

+ 

+     @property

+     def key(self):

+         """ Our instance ID (key in Redis DB) """

+         return "createrepo_batched::{}::{}".format(

+             self.dirname, self._pid)

+ 

+     @property

+     def key_pattern(self):

+         """ Redis key pattern for potential tasks we can batch-process """

+         return "createrepo_batched::{}::*".format(self.dirname)

+ 

+     def make_request(self):

+         """ Request the task into Redis DB.  Run _before_ lock! """

+         if self.noop:

+             return None

+         self.redis.hset(self.key, "task", self._json_redis_task)

+         return self.key

+ 

+     def check_processed(self):

+         """

+         Drop our entry from the Redis DB (if any), and return True if the task

+         has already been processed.  Requires the lock!

+         """

+         if self.noop:

+             return False

+ 

+         self.log.debug("Checking if we have to start actually")

+         status = self.redis.hget(self.key, "status")

+         self.redis.delete(self.key)

+         if status is None:

+             # not yet processed

+             return False

+ 

+         self.log.debug("Task has already status %s", status)

+         return status == "success"

+ 

+     def options(self):

+         """

+         Get the options from other _compatible_ (see below) Redis tasks, and

+         plan the list of tasks in self.notify_keys[] that we will notify in

+         commit().

+ 

+         We don't merge tasks that have a different 'devel' parameter.  We

+         wouldn't be able to tell which sub-tasks are to be created inside and

+         which outside the devel subdirectory.

+ 

+         Similarly, we don't group tasks that have a different 'appstream'

+         value.  That's because normally (in the non-grouped situation) the

+         final state of the repository would be order dependent, e.g. if

+         build_A required appstream metadata and build_B didn't, the appstream

+         metadata would only end up in the repo if build_A was processed after

+         build_B (not vice versa).  This is a problem we don't want to solve at

+         the options() level; we rather let the two concurrent processes race

+         (it requires at least one more createrepo run, but the "appstream"

+         flag shouldn't change frequently anyway).

+         """

+         add = set()

+         delete = set()

+         full = False

+ 

+         if self.noop:

+             return (full, add, delete)

+ 

+         for key in self.redis.keys(self.key_pattern):

+             assert key != self.key

+ 

+             task_dict = self.redis.hgetall(key)

+             if task_dict.get("status") is not None:

+                 # skip processed tasks

+                 self.log.info("Key %s already processed, skip", key)

+                 continue

+ 

+             task_opts = json.loads(task_dict["task"])

+ 

+             skip = False

+             for attr in ["devel", "appstream"]:

+                 our_value = getattr(self, attr)

+                 if task_opts[attr] != our_value:

+                     self.log.info("'%s' attribute doesn't match: %s/%s",

+                                   attr, task_opts[attr], our_value)

+                     skip = True

+                     break

+ 

+             if skip:

+                 continue

+ 

+             # we can process this task!

+             self.notify_keys.append(key)

+ 

+             # inherit "full" request from others

+             if task_opts["full"]:

+                 full = True

+                 add = set()

+ 

+             # append "add" tasks, if that makes sense

+             if not full:

+                 add.update(task_opts["add"])

+ 

+             # always process the delete requests

+             delete.update(task_opts["delete"])

+ 

+             if len(self.notify_keys) >= MAX_IN_BATCH:

+                 self.log.info("Batch copr-repo limit %s reached, skip the rest",

+                               MAX_IN_BATCH)

+                 break

+ 

+         return (full, add, delete)

+ 

+     def commit(self):

+         """

+         Report that we have processed the other createrepo requests.  We don't

+         report failures; we rather kindly let the responsible processes re-try

+         their createrepo tasks themselves.  Requires the lock!

+         """

+         if self.noop:

+             return

+ 

+         for key in self.notify_keys:

+             self.log.info("Notifying %s that we succeeded", key)

+             self.redis.hset(key, "status", "success")

@@ -587,7 +587,7 @@

      """

      Execute 'copr-repo' tool, and return True if the command succeeded.

      """

-     cmd = ['copr-repo', directory]

+     cmd = ["copr-repo", "--batched", directory]

      def subdirs(option, subdirs):

          args = []

          if not subdirs:

file modified
+84 -17
@@ -21,7 +21,7 @@

  import oslo_concurrency.lockutils

  

  from copr_backend.helpers import get_redis_logger, BackendConfigReader

- 

+ from copr_backend.createrepo import BatchedCreaterepo

  

  class CommandException(Exception):

      pass
@@ -70,9 +70,13 @@

      parser.add_argument('--add', action='append', metavar='SUBDIR', default=[],

                          type=arg_parser_subdir_type)

      parser.add_argument('--devel', action='store_true', default=False)

-     parser.add_argument('--no-appstream-metadata', action='store_true',

-                         default=False)

+     parser.add_argument('--no-appstream-metadata', dest="appstream",

+                         action='store_false', default=True)

      parser.add_argument('--log-to-stdout', action='store_true')

+     parser.add_argument("--batched", action="store_true",

+                         help="Try try to batch this request with requests "

+                              "from other processes.  When specified, the "

+                              "process needs an access to Redis DB.")

      parser.add_argument('directory')

      return parser

  
@@ -94,6 +98,7 @@

      except:

          # Useful if copr-backend isn't correctly configured, or when

          # copr-backend isn't installed (mostly developing and unittesting).

+         opts.backend_opts = None

          opts.results_baseurl = 'https://example.com/results'

  

      # obtain logger object
@@ -112,6 +117,18 @@

      opts.log = get_redis_logger(opts.backend_opts, logger_name, "modifyrepo")

  

  

+ def filter_existing(opts, subdirs):

+     """ Return items from ``subdirs`` that exist """

+     new_subdirs = []

+     for subdir in subdirs:

+         full_path = os.path.join(opts.directory, subdir)

+         if not os.path.exists(full_path):

+             opts.log.warning("Subdirectory %s doesn't exist", subdir)

+             continue

+         new_subdirs.append(subdir)

+     return new_subdirs

+ 

+ 

  def run_createrepo(opts):

      createrepo_cmd = ['/usr/bin/createrepo_c', '--database', '--ignore-lock',

                        '--local-sqlite', '--cachedir', '/tmp/', '--workers', '8',
@@ -124,7 +141,7 @@

      # we assume that we could skip the call at the beginning

      createrepo_run_needed = False

  

-     if opts.delete or opts.add:

+     if not opts.full:

          # request for modification, is the repo actually initialized?  Otherwise

          # we do full createrepo_c run.

          repodata_xml = os.path.join(opts.directory, 'repodata', 'repomd.xml')
@@ -133,21 +150,17 @@

          else:

              # Either --add or --delete was specified, but there are no metadata,

              # yet.  This would be bug in most cases (most likely the initial

-             # createrepo run after project creation failed).  But would be wrong

-             # to not run "full" createrepo in such case.

+             # createrepo run after project creation failed).  But it would be

+             # wrong to not run "full" createrepo in such case.

              createrepo_run_needed = True

      else:

          # full createrepo run requested

          createrepo_run_needed = True

  

-     for subdir in opts.delete:

-         subdir_full = os.path.join(opts.directory, subdir)

- 

-         # if the sub-directory doesn't exist (removed by prunerepo, e.g.) it is

-         # better to save one argument and perhaps save one whole createrepo run.

-         if not os.path.exists(subdir_full):

-             continue

+     opts.add = filter_existing(opts, opts.add)

+     opts.delete = filter_existing(opts, opts.delete)

  

+     for subdir in opts.delete:

          # something is going to be deleted

          createrepo_run_needed = True

          createrepo_cmd += ['--excludes', '*{}/*'.format(subdir)]
@@ -201,7 +214,7 @@

  

  

  def add_appdata(opts):

-     if opts.devel or opts.no_appstream_metadata:

+     if opts.devel or not opts.appstream:

          opts.log.info("appstream-builder skipped, /devel subdir or "

                        "--no-appstream-metadata specified")

          return
@@ -285,7 +298,36 @@

          yield

  

  

- def main_locked(opts, log):

+ def main_locked(opts, batch, log):

+     if batch.check_processed():

+         log.info("Task processed by other process")

+         return

+ 

+     # Merge others' tasks with ours (if any).

+     (batch_full, batch_add, batch_delete) = batch.options()

+ 

+     if batch_full:

+         log.info("Others requested full createrepo")

+         opts.full = True

+         # There's no point in searching for directories to be added, but we

+         # still want to delete!

+         opts.add = []

+ 

+     if batch_add:

+         if opts.full:

+             log.info("Ignoring subdirs %s requested to --add by others",

+                      ", ".join(batch_add))

+         else:

+             opts.add += list(batch_add)

+ 

+     opts.delete += list(batch_delete)

+ 

+     dont_add = set(opts.delete).intersection(opts.add)

+     if dont_add:

+         log.info("Subdirs %s are requested to both added and removed, "

+                  "so we only remove them", ", ".join(dont_add))

+         opts.add = list(set(opts.add) - dont_add)

+ 

      # (re)create the repository

      if not run_createrepo(opts):

          opts.log.warning("no-op")
@@ -300,11 +342,14 @@

      add_appdata(opts)

      add_modular_metadata(opts)

  

+     # while we still hold the lock, notify others we processed their task

+     batch.commit()

+ 

      log.info("%s run successful", sys.argv[0])

  

  

  def process_directory_path(opts):

-     helper_path = os.path.abspath(opts.directory)

+     helper_path = opts.directory = os.path.realpath(opts.directory)

      helper_path, opts.chroot = os.path.split(helper_path)

      opts.projectdir = helper_path

      helper_path, opts.dirname = os.path.split(helper_path)
@@ -317,6 +362,9 @@

  def main():

      opts = get_arg_parser().parse_args()

  

+     # neither --add nor --delete means we do a full createrepo run

+     opts.full = not(opts.add or opts.delete)

+ 

      # try to setup logging based on copr-be.conf

      process_backend_config(opts)

  
@@ -326,9 +374,28 @@

  

      assert_new_createrepo()

  

+     # Initialize the batch structure.  Its methods are no-ops when the

+     # --batched option isn't specified.

+     batch = BatchedCreaterepo(

+         opts.directory,

+         opts.full,

+         opts.add,

+         opts.delete,

+         log=opts.log,

+         devel=opts.devel,

+         appstream=opts.appstream,

+         backend_opts=opts.backend_opts,

+         noop=not opts.batched)

+ 

+     # If appropriate, put our task into the Redis DB and allow _others_ to

+     # process it for us.  This needs to be run _before_ the lock() call.

+     batch.make_request()

+ 

      try:

          with lock(opts):

-             main_locked(opts, opts.log)

+             main_locked(opts, batch, opts.log)

+             # skip commit if main_locked() raises exception

+             batch.commit()

  

      except CommandException:

          opts.log.error("Sub-command failed")

file modified
+17
@@ -112,3 +112,20 @@

          shutil.copy(source, chdir)

  

      yield ctx

+ 

+ @fixture

+ def f_third_build(f_second_build):

+     """

+     Same as ``f_second_build``, but with one more build.

+     """

+     ctx = f_second_build

+     source = os.path.join(os.environ["TEST_DATA_DIRECTORY"],

+                           "build_results", "00848963-example",

+                           "example-1.0.14-1.fc30.x86_64.rpm")

+     build = '00000003-example'

+     ctx.builds.append(build)

+     for chroot in ctx.chroots:

+         chdir = os.path.join(ctx.empty_dir, chroot, build)

+         os.mkdir(chdir)

+         shutil.copy(source, chdir)

+     yield ctx

file modified
+2 -2
@@ -191,7 +191,7 @@

          for call in mc_call.call_args_list:

              args = call[0][0]

              assert args[0] == 'copr-repo'

-             dirs.add(args[1])

+             dirs.add(args[2])

  

          for chroot in ['srpm-builds', 'fedora-17-i386', 'fedora-17-x86_64']:

              dir = '/var/lib/copr/public_html/results/thrnciar/destination-copr/' + chroot
@@ -764,7 +764,7 @@

          assert test_action.run() == ActionResult.SUCCESS

  

          for chroot in ['fedora-20-x86_64', 'epel-6-i386']:

-             cmd = ['copr-repo',

+             cmd = ["copr-repo", "--batched",

                     os.path.join(self.test_project_dir, chroot)]

              exp_call = mock.call(cmd, timeout=None)

              assert exp_call in mc_sp_call.call_args_list

@@ -30,6 +30,7 @@

  from copr_backend.exceptions import CoprBackendSrpmError

  

  import testlib

+ from testlib import assert_logs_exist, assert_logs_dont_exist

  from testlib.repodata import load_primary_xml

  

  # pylint: disable=redefined-outer-name,protected-access
@@ -615,34 +616,6 @@

                  found = True

          assert (found, topic) == (True, topic)

  

- def assert_logs_exist(messages, caplog):

-     """

-     Search through caplog entries for log records having all the messages in

-     ``messages`` list.

-     """

-     search_for = set(messages)

-     found = set()

-     for record in caplog.record_tuples:

-         _, _, msg = record

-         for search in search_for:

-             if search in msg:

-                 found.add(search)

-     assert found == search_for

- 

- def assert_logs_dont_exist(messages, caplog):

-     """

-     Search through caplog entries for log records having all the messages in

-     ``messages`` list.

-     """

-     search_for = set(messages)

-     found = set()

-     for record in caplog.record_tuples:

-         _, _, msg = record

-         for search in search_for:

-             if search in msg:

-                 found.add(search)

-     assert found == set({})

- 

  def test_fe_disallowed_start(f_build_rpm_sign_on, caplog):

      config = f_build_rpm_sign_on

      worker = config.bw

@@ -1,5 +1,7 @@

  import os

  import copy

+ import json

+ import logging

  import tarfile

  import tempfile

  import shutil
@@ -9,9 +11,22 @@

  from unittest import mock

  from unittest.mock import MagicMock

  

- from copr_backend.createrepo import createrepo, createrepo_unsafe, add_appdata, run_cmd_unsafe

+ from copr_backend.createrepo import (

+     BatchedCreaterepo,

+     createrepo,

+     createrepo_unsafe,

+     MAX_IN_BATCH,

+     run_cmd_unsafe,

+ )

+ 

+ from copr_backend.helpers import BackendConfigReader, get_redis_connection

  from copr_backend.exceptions import CreateRepoError

  

+ import testlib

+ from testlib import assert_logs_exist, AsyncCreaterepoRequestFactory

+ 

+ # pylint: disable=attribute-defined-outside-init

+ 

  @mock.patch('copr_backend.createrepo.createrepo_unsafe')

  @mock.patch('copr_backend.createrepo.add_appdata')

  @mock.patch('copr_backend.helpers.CoprClient')
@@ -235,3 +250,203 @@

  

              createrepo_unsafe(path, base_url=self.base_url, dest_dir="devel")

              assert os.path.exists(os.path.join(path, "devel"))

+ 

+ 

+ class TestBatchedCreaterepo:

+     def setup_method(self):

+         self.workdir = tempfile.mkdtemp(prefix="copr-batched-cr-test-")

+         self.config_file = testlib.minimal_be_config(self.workdir, {

+             "redis_db": 9,

+             "redis_port": 7777,

+         })

+         self.config = BackendConfigReader(self.config_file).read()

+         self.redis = get_redis_connection(self.config)

+         self.request_createrepo = AsyncCreaterepoRequestFactory(self.redis)

+         self.redis.flushdb()

+         self._pid = os.getpid()

+ 

+     def teardown_method(self):

+         shutil.rmtree(self.workdir)

+         self.redis.flushdb()

+ 

+     def _prep_batched_repo(self, some_dir, full=False, add=None, delete=None):

+         self.bcr = BatchedCreaterepo(

+             some_dir,

+             full,

+             add if add is not None else ["subdir_add_1", "subdir_add_2"],

+             delete if delete is not None else ["subdir_del_1", "subdir_del_2"],

+             logging.getLogger(),

+             backend_opts=self.config,

+         )

+         return self.bcr

+ 

+     def test_batched_createrepo_normal(self):

+         some_dir = "/some/dir/name:pr:135"

+         bcr = self._prep_batched_repo(some_dir)

+         bcr.make_request()

+ 

+         keys = self.redis.keys()

+         assert len(keys) == 1

+         assert keys[0].startswith("createrepo_batched::{}::".format(some_dir))

+         redis_dict = self.redis.hgetall(keys[0])

+         redis_task = json.loads(redis_dict["task"])

+         assert len(redis_dict) == 1

+         assert redis_task == {

+             "appstream": True,

+             "devel": False,

+             "add": ["subdir_add_1", "subdir_add_2"],

+             "delete": ["subdir_del_1", "subdir_del_2"],

+             "full": False,

+         }

+         self.request_createrepo.get(some_dir)

+         # The appstream=False request doesn't match ours (appstream=True), so

+         # it is skipped and its "add_2" has no effect.

+         self.request_createrepo.get(some_dir, {"add": ["add_2"], "appstream": False})

+         self.request_createrepo.get(some_dir, {"add": [], "delete": ["del_1"]})

+         self.request_createrepo.get(some_dir, {"add": [], "delete": ["del_2"]})

+         assert not bcr.check_processed()

+         assert bcr.options() == (False,

+                                  set(["add_1"]),

+                                  set(["del_1", "del_2"]))

+         assert len(bcr.notify_keys) == 3

+ 

+         our_key = keys[0]

+ 

+         bcr.commit()

+         keys = self.redis.keys()

+         count_non_finished = 0

+         for key in keys:

+             assert key != our_key

+             task_dict = self.redis.hgetall(key)

+             if "status" in task_dict:

+                 assert task_dict["status"] == "success"

+             else:

+                 count_non_finished += 1

+         assert count_non_finished == 1

+ 

+ 

+     def test_batched_createrepo_already_done(self):

+         some_dir = "/some/dir/name"

+         bcr = self._prep_batched_repo(some_dir)

+         key = bcr.make_request()

+         self.request_createrepo.get(some_dir)

+         self.redis.hset(key, "status", "success")

+         assert bcr.check_processed()

+         assert self.redis.hgetall(key) == {}

+         assert bcr.notify_keys == []  # nothing to commit()

+ 

+     def test_batched_createrepo_other_already_done(self, caplog):

+         some_dir = "/some/dir/name:pr:3"

+         bcr = self._prep_batched_repo(some_dir)

+         key = bcr.make_request()

+ 

+         # create two other requests, one is not to be processed

+         self.request_createrepo.get(some_dir)

+         self.request_createrepo.get(some_dir, {"add": ["add_2"]}, done=True)

+ 

+         # nobody processed us

+         assert not bcr.check_processed()

+ 

+         # we only process the first other request

+         assert bcr.options() == (False, set(["add_1"]), set())

+         assert len(bcr.notify_keys) == 1  # still one to notify

+         assert self.redis.hgetall(key) == {}

+         assert len(caplog.record_tuples) == 2

+         assert_logs_exist("already processed, skip", caplog)

+ 

+     def test_batched_createrepo_devel_mismatch(self, caplog):

+         some_dir = "/some/dir/name:pr:5"

+         bcr = self._prep_batched_repo(some_dir)

+         key = bcr.make_request()

+ 

+         # create two other requests, one is not to be processed

+         self.request_createrepo.get(some_dir, {"add": ["add_2"], "devel": True})

+         self.request_createrepo.get(some_dir)

+ 

+         # nobody processed us

+         assert not bcr.check_processed()

+ 

+         # we only process the first other request

+         assert bcr.options() == (False, set(["add_1"]), set())

+         assert len(bcr.notify_keys) == 1  # still one to notify

+         assert self.redis.hgetall(key) == {}

+         assert len(caplog.record_tuples) == 2

+         assert_logs_exist("'devel' attribute doesn't match", caplog)

+ 

+     def test_batched_createrepo_full_we_take_others(self):

+         some_dir = "/some/dir/name:pr:take_others"

+         bcr = self._prep_batched_repo(some_dir, full=True, add=[], delete=[])

+         key = bcr.make_request()

+ 

+         task = self.redis.hgetall(key)

+         task_json = json.loads(task["task"])

+         assert task_json["full"]

+         assert task_json["add"] == [] == task_json["delete"]

+ 

+         # create three other requests, one is not to be processed

+         self.request_createrepo.get(some_dir, {"add": ["add_2"], "devel": True})

+         self.request_createrepo.get(some_dir)

+         self.request_createrepo.get(some_dir, {"add": [], "delete": ["del_1"]})

+ 

+         assert len(self.redis.keys()) == 4

+         assert not bcr.check_processed()

+         assert len(self.redis.keys()) == 3

+ 

+         assert bcr.options() == (False, {"add_1"}, {"del_1"})

+         assert len(bcr.notify_keys) == 2

+ 

+     def test_batched_createrepo_full_others_take_us(self):

+         some_dir = "/some/dir/name:pr:others_take_us"

+         bcr = self._prep_batched_repo(some_dir)

+         key = bcr.make_request()

+         task = self.redis.hgetall(key)

+         task_json = json.loads(task["task"])

+         assert not task_json["full"]

+         assert not bcr.check_processed()

+ 

+         # create four other requests, one is not to be processed

+         self.request_createrepo.get(some_dir, {"add": ["add_2"]})

+         self.request_createrepo.get(some_dir, done=True)

+         self.request_createrepo.get(some_dir, {"add": [], "delete": [], "full": True})

+         self.request_createrepo.get(some_dir, {"add": [], "delete": ["del_1"]})

+ 

+         assert bcr.options() == (True, set(), {"del_1"})

+         assert len(bcr.notify_keys) == 3

+ 

+     def test_batched_createrepo_task_limit(self, caplog):

+         some_dir = "/some/dir/name:pr:limit"

+         bcr = self._prep_batched_repo(some_dir)

+         key = bcr.make_request()

+ 

+         # create MAX_IN_BATCH + 2 other requests; one is not to be processed

+         # (devel mismatch) and one is skipped because it exceeds the limit

+         self.request_createrepo.get(some_dir)

+         self.request_createrepo.get(some_dir, {"add": ["add_2"], "devel": True})

+         for i in range(3, 3 + MAX_IN_BATCH):

+             add_dir = "add_{}".format(i)

+             self.request_createrepo.get(some_dir, {"add": [add_dir]})

+ 

+         # nobody processed us

+         assert not bcr.check_processed()

+ 

+         expected = {"add_{}".format(i) for i in range(1, MAX_IN_BATCH + 2)}

+         expected.remove("add_2")

+         assert len(expected) == MAX_IN_BATCH

+ 

+         full, add, remove = bcr.options()

+         assert (full, remove) == (False, set())

+         assert len(add) == MAX_IN_BATCH

+ 

+         # The redis.keys() output isn't sorted, and even if it was, the (fake)

+         # PID would determine the order.  Therefore we don't know which request

+         # gets skipped, only that exactly one is.

+         assert len(add - expected) == 1

+         assert len(expected - add) == 1

+ 

+         assert len(bcr.notify_keys) == MAX_IN_BATCH

+         assert self.redis.hgetall(key) == {}

+         assert len(caplog.record_tuples) == 3

+         assert_logs_exist({

+             "'devel' attribute doesn't match",

+             "Batch copr-repo limit",

+             "Checking if we have to start actually",

+         }, caplog)

file modified
+309 -15
@@ -1,3 +1,4 @@

+ import contextlib

  import os

  import importlib

  import logging
@@ -7,36 +8,57 @@

  import shutil

  import subprocess

  import tempfile

- from copr_backend.helpers import call_copr_repo

+ import munch

+ 

+ from copr_backend.helpers import (

+     BackendConfigReader,

+     call_copr_repo,

+     get_redis_connection,

+ )

+ 

  from testlib.repodata import load_primary_xml

+ from testlib import (

+     assert_files_in_dir,

+     AsyncCreaterepoRequestFactory,

+     minimal_be_config,

+ )

  

  modifyrepo = 'run/copr-repo'

  

+ # pylint: disable=attribute-defined-outside-init

+ 

+ @contextlib.contextmanager

+ def _lock(directory="non-existent"):

+     filedict = runpy.run_path(modifyrepo)

+     opts = munch.Munch()

+     opts.log = logging.getLogger()

+     opts.directory = directory

+     lock = filedict['lock']

+     with lock(opts):

+         yield opts

  

  class TestModifyRepo(object):

      def setup_method(self, method):

-         self.lockpath = tempfile.mkdtemp(prefix="copr-test-lockpath")

+         self.workdir = tempfile.mkdtemp(prefix="copr-test-copr-repo")

+         self.be_config = minimal_be_config(self.workdir)

          self.os_env_patcher = mock.patch.dict(os.environ, {

              'PATH': os.environ['PATH']+':run',

-             'COPR_TESTSUITE_LOCKPATH': self.lockpath,

+             'COPR_TESTSUITE_LOCKPATH': self.workdir,

+             'COPR_BE_CONFIG': self.be_config,

          })

          self.os_env_patcher.start()

+         self.redis = get_redis_connection(

+             BackendConfigReader(self.be_config).read())

+         self.request_createrepo = AsyncCreaterepoRequestFactory(self.redis)

  

      def teardown_method(self, method):

-         shutil.rmtree(self.lockpath)

+         shutil.rmtree(self.workdir)

          self.os_env_patcher.stop()

+         self.redis.flushdb()

  

      def test_copr_modifyrepo_locks(self):

-         filedict = runpy.run_path(modifyrepo)

-         class XXX:

-             pass

-         opts = XXX()

-         opts.log = logging.getLogger()

-         opts.directory = 'non-existent'

-         lock = filedict['lock']

- 

-         cmd = [modifyrepo, opts.directory, '--log-to-stdout']

-         with lock(opts):

+         with _lock(self.workdir) as opts:

+             cmd = [modifyrepo, opts.directory, '--log-to-stdout']

              proc = subprocess.Popen(cmd,

                                      stdout=subprocess.PIPE,

                                      stderr=subprocess.PIPE)
@@ -58,8 +80,15 @@

          proc.kill()

          assert b"acquired lock" in err

  

+     @staticmethod

+     def _run_copr_repo(args):

+         with mock.patch("sys.argv", ["copr-repo"] + args):

+             filedict = runpy.run_path(modifyrepo)

+             filedict["main"]()

+ 

      @mock.patch.dict(os.environ, {'COPR_TESTSUITE_NO_OUTPUT': '1'})

      def test_copr_repo_add_subdir(self, f_second_build):

+         _unused = self

          ctx = f_second_build

          chroot = ctx.chroots[0]

          chrootdir = os.path.join(ctx.empty_dir, chroot)
@@ -77,9 +106,220 @@

              '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

              '00000002-example/example-1.0.4-1.fc23.x86_64.rpm'

          }

+         assert_files_in_dir(chrootdir,

+                             ["00000002-example", "00000001-prunerepo"], [])

+ 

+     def test_copr_repo_batched_createrepo(self, f_second_build):

+         ctx = f_second_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+         assert call_copr_repo(chrootdir, add=[ctx.builds[0]])

+         first_repodata = load_primary_xml(repodata)

+         assert first_repodata['hrefs'] == {

+             '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

+         }

+         # call copr-repo for the second build while a separate request for

+         # removal of the first build is pending

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": ["00000001-prunerepo"],

+         })

+         assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+ 

+         second_repodata = load_primary_xml(repodata)

+         assert second_repodata['hrefs'] == {

+             '00000002-example/example-1.0.4-1.fc23.x86_64.rpm'

+         }

+         assert_files_in_dir(chrootdir, ["00000002-example"],

+                             ["00000001-prunerepo"])

+ 

+     def test_copr_repo_batched_already_processed(self, f_second_build):

+         ctx = f_second_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+         with _lock(chrootdir) as opts:

+             # delay processing by lock

+             cmd = [modifyrepo, "--batched", "--log-to-stdout",

+                    opts.directory, "--add", ctx.builds[1]]

+             proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,

+                                     stderr=subprocess.PIPE)

+             try:

+                 # start the process

+                 proc.communicate(timeout=2)

+                 assert 0 # this shouldn't happen

+             except subprocess.TimeoutExpired:

+                 pass

+             while True:

+                 keys = self.redis.keys("createrepo_batched*")

+                 if len(keys) == 1:

+                     break

+             key = keys[0]

+             # claim we did it!

+             self.redis.hset(key, "status", "success")

+ 

+         (out, err) = proc.communicate()

+         assert out == b""

+         err_decoded = err.decode("utf-8")

+         assert "Task processed by other process" in err_decoded

+         assert proc.returncode == 0

+         # nothing changed, copr-repo went no-op because it thinks we already

+         # processed it

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+         assert_files_in_dir(chrootdir,

+                             ["00000001-prunerepo", "00000002-example"], [])

+ 

+     def test_copr_repo_batched_two_builds(self, f_third_build):

+         """ Two finished builds requesting createrepo at the same time """

+         ctx = f_third_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+         assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000002-example/example-1.0.4-1.fc23.x86_64.rpm'

+         }

+         # other process requested adding build[0]

+         self.request_createrepo.get(chrootdir, {

+             "add": [ctx.builds[0]],

+             "delete": [],

+         })

+         assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

+             '00000002-example/example-1.0.4-1.fc23.x86_64.rpm'

+         }

+ 

+         # build 3 stays undeleted

+         assert_files_in_dir(chrootdir,

+                             ["00000001-prunerepo", "00000002-example",

+                              "00000003-example"], [])

+ 

+     def test_copr_repo_batched_full(self, f_third_build):

+         """

+         Full createrepo which also does one removal and one addition

+         for others.

+         """

+         ctx = f_third_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+ 

+         # createrepo was not run for the packages, yet

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+ 

+         # one other process requested removal, another addition

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": [ctx.builds[1]],

+         })

+         self.request_createrepo.get(chrootdir, {

+             "add": [ctx.builds[0]],

+             "delete": [],

+         })

+ 

+         # merged full createrepo run, still does the removal

+         assert call_copr_repo(chrootdir)

+ 

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

+             '00000003-example/example-1.0.14-1.fc30.x86_64.rpm',

+         }

+         assert_files_in_dir(chrootdir,

+                             ["00000001-prunerepo", "00000003-example"],

+                             ["00000002-example"])

+ 

+     def test_copr_repo_batched_others_full(self, f_third_build):

+         """

+         We add one build, but other request is to

+         - remove one build

+         - run full createrepo

+         """

+         ctx = f_third_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+ 

+         # check that no one has run createrepo against the builds yet

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+ 

+         # one build finished (the second one)

+         assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000002-example/example-1.0.4-1.fc23.x86_64.rpm'

+         }

+ 

+         # other process requested full run

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": [],

+             "full": True,

+         })

+ 

+         # other requested removal of first build

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": [ctx.builds[0]],

+             "full": True,

+         })

+ 

+         # we request addition of third build

+         assert call_copr_repo(chrootdir, add=[ctx.builds[2]])

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000002-example/example-1.0.4-1.fc23.x86_64.rpm',

+             '00000003-example/example-1.0.14-1.fc30.x86_64.rpm',

+         }

+         assert_files_in_dir(chrootdir,

+                             ["00000002-example", "00000003-example"],

+                             ["00000001-prunerepo"])

+ 

+     def test_copr_repo_add_del_mixup(self, f_third_build):

+         """

+         Check that if one process requests adding one build, and another

+         removal, we still remove it.

+         """

+         ctx = f_third_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+ 

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == set()

+ 

+         # delete request

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": [ctx.builds[1]],

+         })

+ 

+         # and add request for the same build

+         assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+ 

+         repoinfo = load_primary_xml(repodata)

+         # TODO: the output should be empty

+         # https://github.com/rpm-software-management/createrepo_c/issues/222

+         # assert repoinfo['hrefs'] == set()

+         assert repoinfo['hrefs'] == set([

+             '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

+             '00000003-example/example-1.0.14-1.fc30.x86_64.rpm',

+         ])

  

      @mock.patch.dict(os.environ, {'COPR_TESTSUITE_NO_OUTPUT': '1'})

      def test_copr_repo_add_subdir_devel(self, f_acr_on_and_first_build):

+         _unused = self

          ctx = f_acr_on_and_first_build

          chroot = ctx.chroots[0]

          chrootdir = os.path.join(ctx.empty_dir, chroot)
@@ -120,4 +360,58 @@

          assert True == call_copr_repo('/some/dir', add=['xxx', None])

          assert len(call.call_args_list) == 1

          call = call.call_args_list[0]

-         assert call[0][0] == ['copr-repo', '/some/dir', '--add', 'xxx']

+         assert call[0][0] == ['copr-repo', '--batched', '/some/dir', '--add', 'xxx']

+ 

+     def test_copr_repo_el5(self, f_third_build):

+         """

+         Test that special createrepo_c arguments are used when creating

+         el5 repositories.

+         """

+         _unused = self

+         ctx = f_third_build

+         chroot = ctx.chroots[0]

+         old_chrootdir = os.path.join(ctx.empty_dir, chroot)

+         # make sure it looks like an el5 directory

+         chrootdir = os.path.join(ctx.empty_dir, "rhel-5-x86_64")

+         repodata = os.path.join(chrootdir, 'repodata')

+         subprocess.check_call(["cp", "-r", old_chrootdir, chrootdir])

+         assert call_copr_repo(old_chrootdir, add=[ctx.builds[0]],

+                               delete=[ctx.builds[2]])

+         assert call_copr_repo(chrootdir, add=[ctx.builds[0]],

+                               delete=[ctx.builds[2]])

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo['hrefs'] == {

+             '00000001-prunerepo/prunerepo-1.1-1.fc23.noarch.rpm',

+         }

+ 

+         # rhel-5 contains md5 checksums

+         assert_files_in_dir(chrootdir,

+                             ["00000002-example", "00000001-prunerepo"],

+                             ["00000003-example"])

+         assert repoinfo["packages"]["prunerepo"]["chksum_type"] == "md5"

+ 

+         # other chroots are sha256

+         repodata = os.path.join(old_chrootdir, 'repodata')

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo["packages"]["prunerepo"]["chksum_type"] == "sha256"

+ 

+     def test_copr_repo_noop(self, f_second_build):

+         """

+         When anyone requests removal (or addition) of directories which do not

+         exist, there's no point in running createrepo_c at all.

+         """

+         ctx = f_second_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         self.request_createrepo.get(chrootdir, {

+             "add": [],

+             "delete": ["non-existing-dir"],

+         })

+         assert call_copr_repo(chrootdir, add=["non-existing-dir-2"])

+         repodata = os.path.join(chrootdir, 'repodata')

+         repoinfo = load_primary_xml(repodata)

+         assert repoinfo["hrefs"] == set()

+         keys = self.redis.keys("createrepo_batch*")

+         assert len(keys) == 1

+         task_dict = self.redis.hgetall(keys[0])

+         assert task_dict["status"] == "success"

@@ -11,7 +11,7 @@

  from copr_backend.sshcmd import SSHConnection, SSHConnectionError

  

  

- def minimal_be_config(where):

+ def minimal_be_config(where, overrides=None):

      """

      Create minimal be config which is parseable by BackendConfigReader.

      """
@@ -22,12 +22,17 @@

      except FileExistsError:

          pass

  

-     minimal_config_snippet = (

-         "[backend]\n"

-         "destdir={}\n"

-         "redis_port=7777\n"

-         "results_baseurl=https://example.com/results\n"

-     ).format(destdir)

+     setup = {

+         "redis_port": "7777",

+         "results_baseurl": "https://example.com/results",

+         "destdir": destdir,

+     }

+     if overrides:

+         setup.update(overrides)

+ 

+     minimal_config_snippet = "[backend]\n"

+     for key, value in setup.items():

+         minimal_config_snippet += "{}={}\n".format(key, value)

  

      be_config_file = os.path.join(where, "copr-be.conf")

      with open(be_config_file, "w") as cfg_fd:
@@ -186,3 +191,64 @@

  

          if "PROJECT_2" in dest:

              os.unlink(os.path.join(dest, "example-1.0.14-1.fc30.x86_64.rpm"))

+ 

+ def assert_logs_exist(messages, caplog):

+     """

+     Search through the caplog entries, and check that every message in the

+     ``messages`` list appears in at least one log record.

+     """

+     search_for = set(messages)

+     found = set()

+     for record in caplog.record_tuples:

+         _, _, msg = record

+         for search in search_for:

+             if search in msg:

+                 found.add(search)

+     assert found == search_for

+ 

+ def assert_logs_dont_exist(messages, caplog):

+     """

+     Search through the caplog entries, and check that none of the messages in

+     the ``messages`` list appear in any log record.

+     """

+     search_for = set(messages)

+     found = set()

+     for record in caplog.record_tuples:

+         _, _, msg = record

+         for search in search_for:

+             if search in msg:

+                 found.add(search)

+     assert found == set({})

+ 

+ def assert_files_in_dir(directory, exist, dont_exist):

+     """

+     Check for (non-)existence of files in directory

+     """

+     for subdir in exist + dont_exist:

+         filename = os.path.join(directory, subdir)

+         expected_to_exist = subdir in exist

+         assert expected_to_exist == os.path.exists(filename)

+ 

+ class AsyncCreaterepoRequestFactory:

+     """ Generator for asynchronous craeterepo requests """

+     def __init__(self, redis):

+         self.pid = os.getpid()

+         self.redis = redis

+ 

+     def get(self, dirname, overrides=None, done=False):

+         """ put new request to redis """

+         task = {

+             "appstream": True,

+             "devel": False,

+             "add": ["add_1"],

+             "delete": [],

+             "full": False,

+         }

+         self.pid += 1

+         if overrides:

+             task.update(overrides)

+         key = "createrepo_batched::{}::{}".format(dirname, self.pid)

+         task_json = json.dumps(task)

+         self.redis.hset(key, "task", task_json)

+         if done:

+             self.redis.hset(key, "status", "success")

@@ -23,8 +23,9 @@

  

      for d_package in dom.getElementsByTagName('package'):

          name = d_package.getElementsByTagName('name')[0].firstChild.nodeValue

+         checksum = d_package.getElementsByTagName('checksum')[0].getAttribute('type')

          names.add(name)

-         packages[name] = {'name': name}

+         packages[name] = {'name': name, 'chksum_type': checksum}

          package = packages[name]

          package['href'] = d_package.getElementsByTagName('location')[0].getAttribute('href')

          package['xml:base'] = d_package.getElementsByTagName('location')[0].getAttribute('xml:base')

backend: automatically batch the createrepo requests

Before we start running createrepo_c, we store the createrepo request
into the Redis DB and give the potential lock() rivals a chance to also
finish our task.  When that happens, we just do nothing.  OTOH, when we
win the lock(), we also check the others' requests and process them.

Fixes: #813
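
For reference, the intended copr-repo caller flow (steps 1-6 in the new
BatchedCreaterepo docstring) boils down to roughly the following sketch;
lock() and run_createrepo() are hypothetical placeholders for the real
copr-repo internals, not code from this PR:

    from copr_backend.createrepo import BatchedCreaterepo

    def copr_repo_main(directory, full, add, delete, log, backend_opts, batched):
        """Sketch only -- lock() and run_createrepo() are illustrative stand-ins."""
        batch = BatchedCreaterepo(directory, full, add, delete, log,
                                  backend_opts=backend_opts, noop=not batched)
        batch.make_request()                 # 2. publish our task before locking
        with lock(directory):                # 3. acquire the createrepo lock
            if batch.check_processed():      # 4. someone else already did our task
                return
            b_full, b_add, b_delete = batch.options()  # 5. merge compatible tasks
            run_createrepo(directory,
                           full=full or b_full,
                           add=set(add) | b_add,
                           delete=set(delete) | b_delete)
            batch.commit()                   # 6. tell the others their tasks are done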

rebased onto ae8b4081a802d45d039e7a44b0bb251e8343d01e

3 years ago

rebased onto 8247d39c98c9a3e4ce3482d73500cd61680f24fc

3 years ago

rebased onto 06b1b2386b7609ef724c3d07d933f6602e1a2fcd

3 years ago

Any reason to remove this `not os.path.exists` check?

The os.path.exists check was moved here ^^ and is now done for both add and delete, not only for delete.

2 new commits added

  • backend: test for noop copr-repo run
  • backend: test checksums in el5 chroots
3 years ago

This is a duplicate of the batch.commit() call at the end of main_locked(). One of them should be removed.

Isn't this logic reversed? This code does "if at least one task does not have appstream enabled, then we disable it for the whole batch", which is not what we want.

Isn't it better to write this as an else branch?

These double negatives are hard for me to read. I would prefer to rename all no_appstream to appstream and flip the logic.

full may hold a different value than the currently processed task_opts["full"]

Hmm, why do you prefer this? If those requests were handled separately, any of them would beat all the previous ones - and the metadata would be generated in the end. OTOH, to stay on the safe side, we probably shouldn't match requests that differ in this? Similarly to the "devel" flag ... wdyt?

rebased onto f0d090aff7f0f89bfc619da7fe7043a4511e495a

3 years ago

rebased onto 533a80927aa5e65e10e4fcf3a4ffe12160b0c09e

3 years ago

6 new commits added

  • backend: copr-repo to work with absolute paths
  • backend: invert no_appstream_metadata logic
  • backend: test for noop copr-repo run
  • backend: test checksums in el5 chroots
  • backend: automatically batch the createrepo requests
  • backend: slightly more thrifty copr-repo
3 years ago

6 new commits added

  • backend: copr-repo to work with absolute paths
  • backend: invert no_appstream_metadata logic
  • backend: test for noop copr-repo run
  • backend: test checksums in el5 chroots
  • backend: automatically batch the createrepo requests
  • backend: slightly more thrifty copr-repo
3 years ago

rebased onto fe3889e

3 years ago

Pull-Request has been merged by praiskup

3 years ago