From e3fd2a256c4473eda0dd258987791023bd3656c4 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Aug 18 2020 12:34:26 +0000 Subject: kojira: parallelize rmtree Fixes: https://pagure.io/koji/issue/2398 --- diff --git a/util/kojira b/util/kojira index 3f75160..82294ff 100755 --- a/util/kojira +++ b/util/kojira @@ -255,7 +255,7 @@ class ManagedRepo(object): else: realpath = path try: - rmtree(realpath) + self.manager.rmtree(realpath) except BaseException: logger.error(''.join(traceback.format_exception(*sys.exc_info()))) @@ -309,7 +309,6 @@ class RepoManager(object): """Spawn (or queue) and rmtree job""" self.logger.info("Queuing rmtree job for %s", path) self.delete_queue.append(path) - self.checkQueue() def checkQueue(self): finished = [pid for pid in self.delete_pids if self.waitPid(pid)] @@ -548,6 +547,20 @@ class RepoManager(object): finally: session.logout() + def rmtreeLoop(self, session): + self.session = session + logger = logging.getLogger("koji.repo.rmtree") + try: + while True: + logger.debug('queue length: %d', len(self.delete_queue)) + self.checkQueue() + time.sleep(self.options.sleeptime) + except Exception: + logger.exception('Error in delete thread') + raise + finally: + session.logout() + def pruneLocalRepos(self): for volinfo in self.session.listVolumes(): volumedir = pathinfo.volumedir(volinfo['name']) @@ -933,7 +946,7 @@ class RepoManager(object): if repo.tryDelete(self.delete_logger): del self.repos[repo.repo_id] n += 1 - self.delete_logger.debug("Ending delete repos (deleted: %s)" % n) + self.delete_logger.debug("Ending delete repos (queued for deletion: %s)" % n) def start_currency_checker(session, repomgr): @@ -972,6 +985,15 @@ def start_delete_loop(session, repomgr): return thread +def start_rmtree_loop(session, repomgr): + subsession = session.subsession() + thread = threading.Thread(name='rmtreeLoop', + target=repomgr.rmtreeLoop, args=(subsession,)) + thread.setDaemon(True) + thread.start() + return thread + + def main(options, session): repomgr = RepoManager(options, session) repomgr.readCurrentRepos() @@ -984,12 +1006,12 @@ def main(options, session): curr_ext_chk_thread = start_external_currency_checker(session, repomgr) regen_thread = start_regen_loop(session, repomgr) delete_thread = start_delete_loop(session, repomgr) + rmtree_thread = start_rmtree_loop(session, repomgr) # TODO also move rmtree jobs to threads logger.info("Entering main loop") while True: try: repomgr.updateRepos() - repomgr.checkQueue() repomgr.printState() repomgr.pruneLocalRepos() if not curr_chk_thread.is_alive(): @@ -1004,6 +1026,9 @@ def main(options, session): if not delete_thread.is_alive(): logger.error("Delete thread died. Restarting it.") delete_thread = start_delete_loop(session, repomgr) + if not rmtree_thread.is_alive(): + logger.error("rmtree thread died. Restarting it.") + rmtree_thread = start_rmtree_loop(session, repomgr) except KeyboardInterrupt: logger.warning("User exit") break