#2340 kojira: threaded repo deletion
Merged 2 years ago by tkopecek. Opened 2 years ago by tkopecek.
tkopecek/koji issue2336  into  master

file modified
+75 -32
@@ -64,7 +64,7 @@ 


  class ManagedRepo(object):


-     def __init__(self, manager, data):

+     def __init__(self, manager, data, repodata=None):

          self.manager = manager

          self.session = manager.session

          self.options = manager.options
@@ -81,7 +81,7 @@ 

          self.expire_ts = None

          if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']:

              self.current = False

-             self.expire_ts = time.time()

+             self._find_expire_time(repodata)

              # TODO use hub data to find the actual expiration time

          self.first_seen = time.time()

          if self.current:
@@ -92,6 +92,17 @@ 

                  tags[x['parent_id']] = 1

              self.taglist = to_list(tags.keys())


+     def _find_expire_time(self, repodata):

+         # find all newer repos for same tag and set oldest as expire_ts for our repo

+         if repodata:

+             repos = [r for r in repodata if

+                      r['tag_id'] == self.tag_id and r['create_event'] > self.event_id]

+             if repos:

+                 invalidated_by = sorted(repos, key=lambda x: x['create_event'])

+                 self.expire_ts = invalidated_by[0]['create_ts']

+         if not self.expire_ts:

+             self.expire_ts = time.time()



      def dist(self):

          # TODO: remove this indirection once we can rely on the hub to return
@@ -180,7 +191,7 @@ 

          age = time.time() - max(times)

          return age > timeout


-     def tryDelete(self):

+     def tryDelete(self, logger):

          """Remove the repo from disk, if possible"""

          path = self.get_path()

          if not path:
@@ -199,52 +210,55 @@ 

              if e.errno == 2:

                  # No such file or directory, so the repo either never existed,

                  # or has already been deleted, so allow it to be marked deleted.

-                 self.logger.info("Repo directory does not exist: %s" % path)

+                 logger.info("Repo directory does not exist: %s" % path)



-                 self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))

+                 logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))

                  return False


              times = [self.event_ts, mtime, self.expire_ts]

              times = [ts for ts in times if ts is not None]

              age = time.time() - max(times)

-             self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)

+             logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)

              if age < lifetime:

                  return False

-         self.logger.debug("Attempting to delete repo %s.." % self.repo_id)

+         logger.debug("Attempting to delete repo %s.." % self.repo_id)

          if self.state != koji.REPO_EXPIRED:

              raise koji.GenericError("Repo not expired")

          if self.session.repoDelete(self.repo_id) > 0:

              # cannot delete, we are referenced by a buildroot

-             self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)

+             logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)

              return False

-         self.logger.info("Deleted repo %s" % self.repo_id)

+         logger.info("Deleted repo %s" % self.repo_id)

          self.state = koji.REPO_DELETED

          if os.path.islink(path):

              # expected for repos on other volumes

              info = self.get_info()

              if not os.path.exists(path):

-                 self.logger.error('Repo volume link broken: %s', path)

+                 logger.error('Repo volume link broken: %s', path)

                  return False

              if not info or 'volume' not in info:

-                 self.logger.error('Missing repo.json in %s', path)

+                 logger.error('Missing repo.json in %s', path)

                  return False

              realpath = self.get_path(volume=info['volume'])

              if not os.path.exists(realpath):

-                 self.logger.error('Repo real path missing: %s', realpath)

+                 logger.error('Repo real path missing: %s', realpath)

                  return False

              if not os.path.samefile(path, realpath):

-                 self.logger.error('Incorrect volume link: %s', path)

+                 logger.error('Incorrect volume link: %s', path)

                  return False

              # ok, try to remove the symlink



              except OSError:

-                 self.logger.error('Unable to remove volume link: %s', path)

-             # and remove the real path

-             self.manager.rmtree(realpath)

+                 logger.error('Unable to remove volume link: %s', path)


-             self.manager.rmtree(path)

+             realpath = path

+         try:

+             rmtree(realpath)

+         except BaseException:

+             logger.error(''.join(traceback.format_exception(*sys.exc_info())))


          return True


      def ready(self):
@@ -371,7 +385,7 @@ 


                  self.logger.info('Found repo %s, state=%s'

                                   % (repo_id, koji.REPO_STATES[data['state']]))

-                 repo = ManagedRepo(self, data)

+                 repo = ManagedRepo(self, data, repodata)

                  self.repos[repo_id] = repo

              if not getTag(self.session, repo.tag_id) and not repo.expired():

                  self.logger.info('Tag %d for repo %d disappeared, expiring.', repo.tag_id, repo_id)
@@ -519,6 +533,21 @@ 




+     def deleteLoop(self, session):

+         """Triggers regens as needed/possible. Runs in a separate thread"""

+         self.session = session

+         self.delete_logger = logging.getLogger("koji.repo.delete")

+         self.delete_logger.info('deleteLoop starting')

+         try:

+             while True:

+                 self.deleteRepos()

+                 time.sleep(self.options.sleeptime)

+         except Exception:

+             self.delete_logger.exception('Error in delete thread')

+             raise

+         finally:

+             session.logout()


      def pruneLocalRepos(self):

          for volinfo in self.session.listVolumes():

              volumedir = pathinfo.volumedir(volinfo['name'])
@@ -689,17 +718,6 @@ 

                      time_expired = time.time() - tag['expire_ts']

                      f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score'])))


-         # trigger deletes

-         n_deletes = 0

-         for repo in to_list(self.repos.values()):

-             if n_deletes >= self.options.delete_batch_size:

-                 break

-             if repo.expired():

-                 # try to delete

-                 if repo.tryDelete():

-                     n_deletes += 1

-                     del self.repos[repo.repo_id]


      def checkTasks(self):

          """Check on newRepo tasks

@@ -850,12 +868,12 @@ 

          order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)

          for tag in order:

              if running_tasks >= self.options.max_repo_tasks:

-                 self.logger.debug("Running tasks (%s): %s" % list(running_tasks))

+                 self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks)))

                  self.logger.info("Maximum number of repo tasks reached")


              elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:

-                 self.logger.debug("Tracked tasks (%s): %s" % list(self.tasks))

-                 self.logger.debug("Untracked tasks (%s): %s" % list(self.other_tasks))

+                 self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks)))

+                 self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks)))

                  self.logger.info("Repo task limit reached")


              tagname = tag['taginfo']['name']
@@ -905,6 +923,18 @@ 

          if running_tasks_maven >= self.options.max_repo_tasks_maven:

              self.logger.info("Maximum number of maven repo tasks reached")


+     def deleteRepos(self):

+         # trigger deletes

+         self.delete_logger.debug("Starting delete repos")

+         n = 0

+         for repo in to_list(self.repos.values()):

+             if repo.expired():

+                 # try to delete

+                 if repo.tryDelete(self.delete_logger):

+                     del self.repos[repo.repo_id]

+                     n += 1

+         self.delete_logger.debug("Ending delete repos (deleted: %s)" % n)



  def start_currency_checker(session, repomgr):

      subsession = session.subsession()
@@ -933,6 +963,15 @@ 

      return thread



+ def start_delete_loop(session, repomgr):

+     subsession = session.subsession()

+     thread = threading.Thread(name='deleteLoop',

+                               target=repomgr.deleteLoop, args=(subsession,))

+     thread.setDaemon(True)

+     thread.start()

+     return thread



  def main(options, session):

      repomgr = RepoManager(options, session)

@@ -944,6 +983,7 @@ 

      if options.check_external_repos:

          curr_ext_chk_thread = start_external_currency_checker(session, repomgr)

      regen_thread = start_regen_loop(session, repomgr)

+     delete_thread = start_delete_loop(session, repomgr)

      # TODO also move rmtree jobs to threads

      logger.info("Entering main loop")

      while True:
@@ -961,6 +1001,9 @@ 

              if not regen_thread.is_alive():

                  logger.error("Regeneration thread died. Restarting it.")

                  regen_thread = start_regen_loop(session, repomgr)

+             if not delete_thread.is_alive():

+                 logger.error("Delete thread died. Restarting it.")

+                 delete_thread = start_delete_loop(session, repomgr)

          except KeyboardInterrupt:

              logger.warning("User exit")


Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

2 years ago

1 new commit added

  • kojira: estimate better expire_ts
2 years ago

This is going to require further review, but here is what I have so far.

Passing data on all active repos to each ManagedRepo instance seems wrong. It seems like there has to be a better way, though I appreciate it's a messy problem.

Overall, handling the tryDelete loop in its own thread seems fine.

However, it looks like some of the data access is not thread-safe. The checkQueue function is called both from rmtree in the delete thread and in main in the main thread. This function both reads and modifies self.delete_pids and self.delete_queue. We could easily get some odd behavior two calls overlap.

This could be resolved different ways -- restricting the checkQueue calls to only one thread or the other, or using locking.

I'll likely have more to say after a second look.

I can add the lock - it is simple. What I'm afraid of more is logic behind _find_expire_time. I've gut feeling that it doesn't work well for recreated older repos, but wasn't able to find any concrete case which could break it.

Active repos could be saved to RepoManager._active_repos and deleted after the loop ends, so it is not being copied around.

I would also like to add rmtree logic to deleteRepos thread but it is worth another PR later.

Metadata Update from @mfilip:
- Pull-request tagged with: testing-done

2 years ago

Regarding _find_expire_time(). I was worried about the case where a repo is manually triggered from an old event and kojira gives it an expire_ts based on some about-to-be-deleted repo from the same tag.

However, since we also check mtime in tryDelete, the age of such repos will end up being based on that instead.

So I think that part is ok, or at least as ok as it can be given the data we're working with.

Commit ee93da4 fixes this pull-request

Pull-Request has been merged by tkopecek

2 years ago

Did you merge this without fixing the threading issue?

Because of the similarly named functions, I did not realize at first review that you'd removed queuing from tryDelete. Easy to confuse the rmtree function from koji.util with self.rmtree.

So, the good news is that allays my threading concerns, since self.rmtree and self.checkQueue are only called from the main thread.

However, I'm worried that serializing the deletes, even in a dedicated thread, could be a problem when there are maven repos involved. Deleting a normal repo is almost instantaneous, since it's only a few files that need to be unlinked. However, maven repos are large and complicated trees with lots and lots of files that need to be unlinked. This tends to be very slow, particularly over nfs.

Anyway, I think that we should look into making the actual deletes parallel again. We still have the delete queue in place. We'd just have to make sure to do it in a thread-safe way. It seems inconsistent to have the main thread queuing deletes in pruneLocalRepos, while the delete thread is running them directly.

I'll file a follow up for that. I don't think we need to hold 1.22 for it, though.

I've filed #2398 as a followup and marked it for 1.23