From 801e9851dd992a7bd8aec847887bcc53ea4e0e1f Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Sep 27 2021 06:43:38 +0000 Subject: backend: use lock(timeout=5) to work around unfair locks It is not easy to implement fair locking. Having unfair locks, previously some of the copr-repo processes were waiting awfully long for the lock even though other copr-repo processes had already finished their tasks. So don't wait indefinitely for the lock (implement a timeout of several seconds), and try checking for the status periodically (without the lock). If the task status is filled, finish early. Because the oslo.concurrency lock() doesn't have a timeout= argument, and fair=True doesn't work - I'm switching the code over to the filelock module (as per the rhbz#1968789 suggestion). Some new commands appeared, so add a test-case to cover them. Relates: #1423 --- diff --git a/backend/copr-backend.spec b/backend/copr-backend.spec index ad30486..afd1a2e 100644 --- a/backend/copr-backend.spec +++ b/backend/copr-backend.spec @@ -42,13 +42,13 @@ BuildRequires: python3-copr-messaging BuildRequires: python3-daemon BuildRequires: python3-dateutil BuildRequires: python3-fedmsg +BuildRequires: python3-filelock BuildRequires: python3-gobject BuildRequires: python3-httpretty BuildRequires: python3-humanize BuildRequires: python3-libmodulemd1 >= 1.7.0 BuildRequires: python3-munch BuildRequires: python3-netaddr -BuildRequires: python3-oslo-concurrency BuildRequires: python3-packaging BuildRequires: python3-pytest BuildRequires: python3-pytest-cov @@ -81,6 +81,7 @@ Requires: python3-copr-messaging Requires: python3-daemon Requires: python3-dateutil Requires: python3-fedmsg +Requires: python3-filelock Requires: python3-gobject Requires: python3-humanize Requires: python3-jinja2 @@ -88,7 +89,6 @@ Requires: python3-libmodulemd1 >= 1.7.0 Requires: python3-munch Requires: python3-netaddr Requires: python3-novaclient -Requires: python3-oslo-concurrency Requires: python3-packaging Requires: python3-pytz Requires: 
python3-requests diff --git a/backend/copr_backend/createrepo.py b/backend/copr_backend/createrepo.py index 0c332be..10078b1 100644 --- a/backend/copr_backend/createrepo.py +++ b/backend/copr_backend/createrepo.py @@ -90,23 +90,30 @@ class BatchedCreaterepo: self.redis.hset(self.key, "task", self._json_redis_task) return self.key - def check_processed(self): + def check_processed(self, delete_if_not=True): """ Drop our entry from Redis DB (if any), and return True if the task is - already processed. Requires lock! + already processed. When 'delete_if_not=True', we delete the self.key + from Redis even if the task is not yet processed (meaning that the caller + plans to finish the task right away). """ if self.noop: return False - self.log.debug("Checking if we have to start actually") - status = self.redis.hget(self.key, "status") - self.redis.delete(self.key) - if status is None: - # not yet processed - return False + status = self.redis.hget(self.key, "status") == "success" + self.log.debug("Has already a status? %s", status) + + try: + if not status: + # not yet processed + return False + finally: + # This is an atomic operation, so other processes may not re-start doing this + # task again. 
https://github.com/redis/redis/issues/9531 + if status or delete_if_not: + self.redis.delete(self.key) - self.log.debug("Task has already status %s", status) - return status == "success" + return status def options(self): """ diff --git a/backend/run/copr-repo b/backend/run/copr-repo index 25688f8..b3af224 100755 --- a/backend/run/copr-repo +++ b/backend/run/copr-repo @@ -19,7 +19,7 @@ import shutil import subprocess import sys -import oslo_concurrency.lockutils +import filelock from copr_backend.helpers import ( BackendConfigReader, @@ -295,19 +295,28 @@ def assert_new_createrepo(): assert b'--recycle-pkglist' in out +class LockTimeout(OSError): + """ Raised for lock() timeout, if timeout= option is set to value >= 0 """ + @contextlib.contextmanager -def lock(opts): +def lock(opts, timeout=-1): lock_path = os.environ.get('COPR_TESTSUITE_LOCKPATH', "/var/lock/copr-backend") - # TODO: better lock filename once we can remove craterepo.py - lock_name = os.path.join(opts.directory, 'createrepo.lock') + lock_basename = opts.directory.replace("/", "_@_") + '.lock' + lock_filename = os.path.join(lock_path, lock_basename) opts.log.debug("acquiring lock") - with oslo_concurrency.lockutils.lock(name=lock_name, external=True, - lock_path=lock_path): - opts.log.debug("acquired lock") - yield + try: + with filelock.FileLock(lock_filename, timeout=timeout): + opts.log.debug("acquired lock") + yield + except filelock.Timeout as err: + opts.log.debug("lock timeouted") + raise LockTimeout("Timeouted on lock file: {}".format(lock_path)) from err def main_locked(opts, batch, log): + """ + Main method, executed under lock. + """ if batch.check_processed(): log.info("Task processed by other process") return @@ -368,6 +377,37 @@ def process_directory_path(opts): opts.dirname, opts.chroot) +def main_try_lock(opts, batch): + """ + Periodically try to acquire the lock, and execute the main_locked() method. 
+ """ + + while True: + + # We don't have fair locking (locks-first => processes-first). So to + # avoid potential indefinite waiting (see issue #1423) we check if the + # task isn't already processed _without_ having the lock. + + if batch.check_processed(delete_if_not=False): + opts.log.info("Task processed by other process (no-lock)") + return + + try: + with lock(opts, timeout=5): + main_locked(opts, batch, opts.log) + # skip commit if main_locked() raises exception + batch.commit() + # Unless there's an exception, the batch.commit() is done and we are + # done. + opts.log.debug("Metadata built by this process") + break + except LockTimeout: + continue # Try again... + + # Unreachable: every iteration breaks, returns, or raises — we only + # loop again upon lock timeout. + assert False + + def main(): opts = get_arg_parser().parse_args() @@ -402,11 +442,7 @@ def main(): batch.make_request() try: - with lock(opts): - main_locked(opts, batch, opts.log) - # skip commit if main_locked() raises exception - batch.commit() - + main_try_lock(opts, batch) except CommandException: opts.log.exception("Sub-command failed") return 1 diff --git a/backend/tests/test_createrepo.py b/backend/tests/test_createrepo.py index ffab910..b229c85 100644 --- a/backend/tests/test_createrepo.py +++ b/backend/tests/test_createrepo.py @@ -233,7 +233,7 @@ class TestBatchedCreaterepo: log_entries_expected = { "Batch copr-repo limit", - "Checking if we have to start actually", + "Has already a status? 
False", } msg_count = len(caplog.record_tuples) diff --git a/backend/tests/test_modifyrepo.py b/backend/tests/test_modifyrepo.py index ec6cca4..95d4201 100644 --- a/backend/tests/test_modifyrepo.py +++ b/backend/tests/test_modifyrepo.py @@ -5,6 +5,7 @@ import runpy import shutil import subprocess import tempfile +import time import glob from unittest import mock @@ -509,3 +510,56 @@ class TestModifyRepo(object): lines = fd.readlines() assert len(lines) == 1 assert "pruned on" in lines[0] + + @pytest.mark.parametrize('run_bg', [True, False]) + @mock.patch.dict(os.environ, {'COPR_TESTSUITE_NO_OUTPUT': '1'}) + def test_copr_repo_timeouted_check(self, f_second_build, run_bg): + _unused = self + ctx = f_second_build + chroot = ctx.chroots[0] + chrootdir = os.path.join(ctx.empty_dir, chroot) + repodata = os.path.join(chrootdir, 'repodata') + + # empty repodata at the beginning + empty_repodata = load_primary_xml(repodata) + assert empty_repodata['names'] == set() + + pid = os.fork() + if not pid: + # give parent some time to lock the repo + time.sleep(1) + # Run the blocked (by parent) createrepo, it must finish soon + # anyway because parent will claim the task is done. + assert call_copr_repo(chrootdir, add=[ctx.builds[1]]) + # sys.exit() cannot be used in testsuite + os._exit(0) # pylint: disable=protected-access + + with _lock(chrootdir): + # give the child some time to fill its Redis keys + sleeper = 1 + while True: + if len(self.redis.keys()) > 0: + break + sleeper += 1 + time.sleep(0.1) + assert sleeper < 10*15 # 15s + + assert len(self.redis.keys()) == 1 + key = self.redis.keys()[0] + + # Claim we did that task (even if not) and check that the child + # finishes after some time. + if not run_bg: + self.redis.hset(key, "status", "success") + assert os.wait()[1] == 0 + assert self.redis.get(key) is None + + if run_bg: + # actually process the background job! 
+ assert os.wait()[1] == 0 + + repodata = load_primary_xml(repodata) + if run_bg: + assert repodata['names'] == {'example'} + else: + assert repodata['names'] == set()