#1927 backend: use lock(timeout=5) to work-around fair-locks
Merged 2 years ago by praiskup. Opened 2 years ago by praiskup.
Unknown source fair-lock  into  main

file modified
+2 -2
@@ -42,13 +42,13 @@

  BuildRequires: python3-daemon

  BuildRequires: python3-dateutil

  BuildRequires: python3-fedmsg

+ BuildRequires: python3-filelock

  BuildRequires: python3-gobject

  BuildRequires: python3-httpretty

  BuildRequires: python3-humanize

  BuildRequires: python3-libmodulemd1 >= 1.7.0

  BuildRequires: python3-munch

  BuildRequires: python3-netaddr

- BuildRequires: python3-oslo-concurrency

  BuildRequires: python3-packaging

  BuildRequires: python3-pytest

  BuildRequires: python3-pytest-cov
@@ -81,6 +81,7 @@

  Requires:   python3-daemon

  Requires:   python3-dateutil

  Requires:   python3-fedmsg

+ Requires:   python3-filelock

  Requires:   python3-gobject

  Requires:   python3-humanize

  Requires:   python3-jinja2
@@ -88,7 +89,6 @@

  Requires:   python3-munch

  Requires:   python3-netaddr

  Requires:   python3-novaclient

- Requires:   python3-oslo-concurrency

  Requires:   python3-packaging

  Requires:   python3-pytz

  Requires:   python3-requests

@@ -90,23 +90,30 @@

          self.redis.hset(self.key, "task", self._json_redis_task)

          return self.key

  

-     def check_processed(self):

+     def check_processed(self, delete_if_not=True):

          """

          Drop our entry from Redis DB (if any), and return True if the task is

-         already processed.  Requires lock!

+         already processed.  When delete_if_not=True, we delete the self.key

+         from Redis even if the task is not yet processed (meaning that caller

+         plans to finish the task right away).

          """

          if self.noop:

              return False

  

-         self.log.debug("Checking if we have to start actually")

-         status = self.redis.hget(self.key, "status")

-         self.redis.delete(self.key)

-         if status is None:

-             # not yet processed

-             return False

+         status = self.redis.hget(self.key, "status") == "success"

+         self.log.debug("Has already a status? %s", status)

+ 

+         try:

+             if not status:

+                 # not yet processed

+                 return False

+         finally:

+             # This is atomic operation, other processes may not re-start doing this

+             # task again.  https://github.com/redis/redis/issues/9531

+             if status or delete_if_not:

+                 self.redis.delete(self.key)

  

-         self.log.debug("Task has already status %s", status)

-         return status == "success"

+         return status

  

      def options(self):

          """

file modified
+49 -13
@@ -19,7 +19,7 @@

  import subprocess

  import sys

  

- import oslo_concurrency.lockutils

+ import filelock

  

  from copr_backend.helpers import (

      BackendConfigReader,
@@ -295,19 +295,28 @@

      assert b'--recycle-pkglist' in out

  

  

+ class LockTimeout(OSError):

+     """ Raised for lock() timeout, if timeout= option is set to value >= 0 """

+ 

  @contextlib.contextmanager

- def lock(opts):

+ def lock(opts, timeout=-1):

      lock_path = os.environ.get('COPR_TESTSUITE_LOCKPATH', "/var/lock/copr-backend")

-     # TODO: better lock filename once we can remove craterepo.py

-     lock_name = os.path.join(opts.directory, 'createrepo.lock')

+     lock_basename = opts.directory.replace("/", "_@_") + '.lock'

+     lock_filename = os.path.join(lock_path, lock_basename)

      opts.log.debug("acquiring lock")

-     with oslo_concurrency.lockutils.lock(name=lock_name, external=True,

-                                          lock_path=lock_path):

-         opts.log.debug("acquired lock")

-         yield

+     try:

+         with filelock.FileLock(lock_filename, timeout=timeout):

+             opts.log.debug("acquired lock")

+             yield

+     except filelock.Timeout as err:

+         opts.log.debug("lock timeouted")

+         raise LockTimeout("Timeouted on lock file: {}".format(lock_path)) from err

  

  

  def main_locked(opts, batch, log):

+     """

+     Main method, executed under lock.

+     """

      if batch.check_processed():

          log.info("Task processed by other process")

          return
@@ -368,6 +377,37 @@

                                  opts.dirname, opts.chroot)

  

  

+ def main_try_lock(opts, batch):

+     """

+     Periodically try to acquire the lock, and execute the main_locked() method.

+     """

+ 

+     while True:

+ 

+         # We don't have fair locking (locks-first => processes-first).  So to

+         # avoid potential indefinite waiting (see issue #1423) we check if the

+         # task isn't already processed _without_ having the lock.

+ 

+         if batch.check_processed(delete_if_not=False):

+             opts.log.info("Task processed by other process (no-lock)")

+             return

+ 

+         try:

+             with lock(opts, timeout=5):

+                 main_locked(opts, batch, opts.log)

+                 # skip commit if main_locked() raises exception

+                 batch.commit()

+                 # Unless there's an exception, the batch.commit() is done and we are

+                 # done.

+                 opts.log.debug("Metadata built by this process")

+                 break

+         except LockTimeout:

+             continue  # Try again...

+ 

+         # unreachable: we either break, return, or continue above — we loop only upon timeout

+         assert False

+ 

+ 

  def main():

      opts = get_arg_parser().parse_args()

  
@@ -402,11 +442,7 @@

      batch.make_request()

  

      try:

-         with lock(opts):

-             main_locked(opts, batch, opts.log)

-             # skip commit if main_locked() raises exception

-             batch.commit()

- 

+         main_try_lock(opts, batch)

      except CommandException:

          opts.log.exception("Sub-command failed")

          return 1

@@ -233,7 +233,7 @@

  

          log_entries_expected = {

              "Batch copr-repo limit",

-             "Checking if we have to start actually",

+             "Has already a status? False",

          }

  

          msg_count = len(caplog.record_tuples)

@@ -5,6 +5,7 @@

  import shutil

  import subprocess

  import tempfile

+ import time

  import glob

  from unittest import mock

  
@@ -509,3 +510,56 @@

              lines = fd.readlines()

              assert len(lines) == 1

              assert "pruned on" in lines[0]

+ 

+     @pytest.mark.parametrize('run_bg', [True, False])

+     @mock.patch.dict(os.environ, {'COPR_TESTSUITE_NO_OUTPUT': '1'})

+     def test_copr_repo_timeouted_check(self, f_second_build, run_bg):

+         _unused = self

+         ctx = f_second_build

+         chroot = ctx.chroots[0]

+         chrootdir = os.path.join(ctx.empty_dir, chroot)

+         repodata = os.path.join(chrootdir, 'repodata')

+ 

+         # empty repodata at the beginning

+         empty_repodata = load_primary_xml(repodata)

+         assert empty_repodata['names'] == set()

+ 

+         pid = os.fork()

+         if not pid:

+             # give parent some time to lock the repo

+             time.sleep(1)

+             # Run the blocked (by parent) createrepo, it must finish soon

+             # anyway because parent will claim the task is done.

+             assert call_copr_repo(chrootdir, add=[ctx.builds[1]])

+             # sys.exit() can not be used in testsuite

+             os._exit(0)  # pylint: disable=protected-access

+ 

+         with _lock(chrootdir):

+             # give the child some time to fill its Redis keys

+             sleeper = 1

+             while True:

+                 if len(self.redis.keys()) > 0:

+                     break

+                 sleeper += 1

+                 time.sleep(0.1)

+                 assert sleeper < 10*15  # 15s

+ 

+             assert len(self.redis.keys()) == 1

+             key = self.redis.keys()[0]

+ 

+             # Claim we did that task (even if not) and check that the child

+             # finishes after some time.

+             if not run_bg:

+                 self.redis.hset(key, "status", "success")

+                 assert os.wait()[1] == 0

+                 assert self.redis.get(key) is None

+ 

+         if run_bg:

+             # actually process the background job!

+             assert os.wait()[1] == 0

+ 

+         repodata = load_primary_xml(repodata)

+         if run_bg:

+             assert repodata['names'] == {'example'}

+         else:

+             assert repodata['names'] == set()

It is not easy to implement fair locking.  Having unfair locks,
previously some of the copr-repo processes were waiting awfully long for
the lock even though other copr-repo processes had already finished their
tasks.

So don't wait indefinitely for the lock (implement a timeout of several
seconds), and instead periodically check the task status (without holding
the lock).  If the task status is already filled, finish early.

Relates: #1423

rebased onto bc485b29c4a154ebaa3ccd627ef6ad3dd0684019

2 years ago

Build succeeded.

rebased onto bfda41ac2561dc738b9cf7d60ec1b554983a0212

2 years ago

Updated, with additional test-case to cover the new statements in copr-repo.

Build succeeded.

Can this be reviewed please? This should bring significant speedup in the rubygems project.

Can we move this break right after batch.commit() so it is more clear when it happens?

Can you please describe the purpose of delete_if_not in the docstring?

I was surprised why we needed to switch from python3-oslo-concurrency to python3-filelock instead of simply acquiring the lock in a loop with time.sleep, so thank you for linking

https://bugzilla.redhat.com/show_bug.cgi?id=1968789

Apart from those two minor things, LGTM

rebased onto 801e985

2 years ago

Thanks! Updated, can you take another look?

Build succeeded.

Pull-Request has been merged by praiskup

2 years ago