From 74f18b687e54911fd5e44c01b68c3a7c5a85d44a Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jul 24 2020 11:40:26 +0000 Subject: kojira: threaded repo deletion Fixes: https://pagure.io/koji/issue/2336 --- diff --git a/util/kojira b/util/kojira index 452a292..c8c41d5 100755 --- a/util/kojira +++ b/util/kojira @@ -180,7 +180,7 @@ class ManagedRepo(object): age = time.time() - max(times) return age > timeout - def tryDelete(self): + def tryDelete(self, logger): """Remove the repo from disk, if possible""" path = self.get_path() if not path: @@ -199,52 +199,55 @@ class ManagedRepo(object): if e.errno == 2: # No such file or directory, so the repo either never existed, # or has already been deleted, so allow it to be marked deleted. - self.logger.info("Repo directory does not exist: %s" % path) + logger.info("Repo directory does not exist: %s" % path) pass else: - self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror)) + logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror)) return False else: times = [self.event_ts, mtime, self.expire_ts] times = [ts for ts in times if ts is not None] age = time.time() - max(times) - self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age) + logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age) if age < lifetime: return False - self.logger.debug("Attempting to delete repo %s.." % self.repo_id) + logger.debug("Attempting to delete repo %s.." % self.repo_id) if self.state != koji.REPO_EXPIRED: raise koji.GenericError("Repo not expired") if self.session.repoDelete(self.repo_id) > 0: # cannot delete, we are referenced by a buildroot - self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id) + logger.debug("Cannot delete repo %s, still referenced" % self.repo_id) return False - self.logger.info("Deleted repo %s" % self.repo_id) + logger.info("Deleted repo %s" % self.repo_id) self.state = koji.REPO_DELETED if os.path.islink(path): # expected for repos on other volumes info = self.get_info() if not os.path.exists(path): - self.logger.error('Repo volume link broken: %s', path) + logger.error('Repo volume link broken: %s', path) return False if not info or 'volume' not in info: - self.logger.error('Missing repo.json in %s', path) + logger.error('Missing repo.json in %s', path) return False realpath = self.get_path(volume=info['volume']) if not os.path.exists(realpath): - self.logger.error('Repo real path missing: %s', realpath) + logger.error('Repo real path missing: %s', realpath) return False if not os.path.samefile(path, realpath): - self.logger.error('Incorrect volume link: %s', path) + logger.error('Incorrect volume link: %s', path) return False # ok, try to remove the symlink try: os.unlink(path) except OSError: - self.logger.error('Unable to remove volume link: %s', path) - # and remove the real path - self.manager.rmtree(realpath) + logger.error('Unable to remove volume link: %s', path) else: - self.manager.rmtree(path) + realpath = path + try: + rmtree(realpath) + except BaseException: + logger.error(''.join(traceback.format_exception(*sys.exc_info()))) + return True def ready(self): @@ -519,6 +522,21 @@ class RepoManager(object): finally: session.logout() + def deleteLoop(self, session): + """Triggers regens as needed/possible. Runs in a separate thread""" + self.session = session + self.delete_logger = logging.getLogger("koji.repo.delete") + self.delete_logger.info('deleteLoop starting') + try: + while True: + self.deleteRepos() + time.sleep(self.options.sleeptime) + except Exception: + self.delete_logger.exception('Error in delete thread') + raise + finally: + session.logout() + def pruneLocalRepos(self): for volinfo in self.session.listVolumes(): volumedir = pathinfo.volumedir(volinfo['name']) @@ -689,17 +707,6 @@ class RepoManager(object): time_expired = time.time() - tag['expire_ts'] f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score']))) - # trigger deletes - n_deletes = 0 - for repo in to_list(self.repos.values()): - if n_deletes >= self.options.delete_batch_size: - break - if repo.expired(): - # try to delete - if repo.tryDelete(): - n_deletes += 1 - del self.repos[repo.repo_id] - def checkTasks(self): """Check on newRepo tasks @@ -850,12 +857,12 @@ class RepoManager(object): order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True) for tag in order: if running_tasks >= self.options.max_repo_tasks: - self.logger.debug("Running tasks (%s): %s" % list(running_tasks)) + self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks))) self.logger.info("Maximum number of repo tasks reached") return elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit: - self.logger.debug("Tracked tasks (%s): %s" % list(self.tasks)) - self.logger.debug("Untracked tasks (%s): %s" % list(self.other_tasks)) + self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks))) + self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks))) self.logger.info("Repo task limit reached") return tagname = tag['taginfo']['name'] @@ -905,6 +912,18 @@ class RepoManager(object): if running_tasks_maven >= self.options.max_repo_tasks_maven: self.logger.info("Maximum number of maven repo tasks reached") + def deleteRepos(self): + # trigger deletes + self.delete_logger.debug("Starting delete repos") + n = 0 + for repo in to_list(self.repos.values()): + if repo.expired(): + # try to delete + if repo.tryDelete(self.delete_logger): + del self.repos[repo.repo_id] + n += 1 + self.delete_logger.debug("Ending delete repos (deleted: %s)" % n) + def start_currency_checker(session, repomgr): subsession = session.subsession() @@ -933,6 +952,15 @@ def start_regen_loop(session, repomgr): return thread +def start_delete_loop(session, repomgr): + subsession = session.subsession() + thread = threading.Thread(name='deleteLoop', + target=repomgr.deleteLoop, args=(subsession,)) + thread.setDaemon(True) + thread.start() + return thread + + def main(options, session): repomgr = RepoManager(options, session) repomgr.readCurrentRepos() @@ -944,6 +972,7 @@ def main(options, session): if options.check_external_repos: curr_ext_chk_thread = start_external_currency_checker(session, repomgr) regen_thread = start_regen_loop(session, repomgr) + delete_thread = start_delete_loop(session, repomgr) # TODO also move rmtree jobs to threads logger.info("Entering main loop") while True: @@ -961,6 +990,9 @@ def main(options, session): if not regen_thread.is_alive(): logger.error("Regeneration thread died. Restarting it.") regen_thread = start_regen_loop(session, repomgr) + if not delete_thread.is_alive(): + logger.error("Delete thread died. Restarting it.") + delete_thread = start_delete_loop(session, repomgr) except KeyboardInterrupt: logger.warning("User exit") break