From 4900b8d3f5745c474f2074368bf6490969e02db3 Mon Sep 17 00:00:00 2001
From: Mike McLean
Date: Feb 12 2021 15:51:56 +0000
Subject: new script: koji-clean-repos

---

diff --git a/src/bin/koji-clean-repos b/src/bin/koji-clean-repos
new file mode 100755
index 0000000..08c1d64
--- /dev/null
+++ b/src/bin/koji-clean-repos
@@ -0,0 +1,314 @@
#!/usr/bin/python3

from queue import Empty
import cProfile
import logging
import multiprocessing
import optparse
import os
import sys
import threading
import time

import koji as _koji  # the selected profile module is assigned to the global 'koji' in main()


'''
This script is for cleaning up a large backlog of deleted repos.
Normally kojira should take care of this, but there have been situations
where a kojira bug leaves repos in place.
'''


logger = logging.getLogger('koji.cleanrepos')

# an event to indicate that the feeder is done
feeder_done = multiprocessing.Event()

# queue of repos to process
queue = multiprocessing.Queue()


def main():
    global koji
    parser = optparse.OptionParser(usage='%prog [options]')
    parser.add_option('-p', '--profile', default='koji',
                      help='pick a profile')
    parser.add_option('-j', '--jobs', default=5, type='int',
                      help='worker count')
    parser.add_option('--with-profiler', action='store_true',
                      help='use python profiler')

    # verbosity options
    parser.add_option("-d", "--debug", action="store_true",
                      help="show debug output")
    parser.add_option("-v", "--verbose", action="store_true",
                      help="show verbose output")
    parser.add_option("-q", "--quiet", action="store_true", default=False,
                      help="run quietly")

    opts, args = parser.parse_args()

    if args:
        parser.error('This command takes no arguments. See --help for options')

    koji = _koji.get_profile_module(opts.profile)

    for name in ('cert', 'serverca'):
        value = os.path.expanduser(getattr(koji.config, name))
        setattr(koji.config, name, value)

    top_logger = logging.getLogger("koji")
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s'))
    top_logger.addHandler(handler)
    if opts.debug:
        top_logger.setLevel(logging.DEBUG)
    elif opts.quiet:
        top_logger.setLevel(logging.ERROR)
    elif opts.verbose:
        top_logger.setLevel(logging.INFO)
    else:
        top_logger.setLevel(logging.WARN)

    Process = multiprocessing.Process
    if opts.with_profiler:
        Process = ProfiledProcess

    tracker = StatsTracker()
    # stats handling stays in the main process, in a thread
    s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))
    s_thread.daemon = True
    s_thread.start()

    # start feeder
    feeder = Process(name='feeder', target=feeder_main,
                     args=(opts, args))
    feeder.daemon = True
    feeder.start()

    # start workers
    workers = []
    for i in range(opts.jobs):
        worker = Process(name='worker %i' % i, target=worker_main,
                         args=(opts,))
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # input thread
    i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))
    i_thread.daemon = True
    i_thread.start()

    # feeder will be the first to finish
    feeder.join()
    feeder_done.set()

    # wait for the queue to be empty
    queue.close()
    queue.join_thread()  # XXX is this right?

    # assert queue.empty()

    logger.info('Finished.  Waiting for workers to stop.')
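    # workers poll the queue with a timeout and exit once feeder_done is set
    # and the queue is drained, so these joins should not block forever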
    for worker in workers:
        worker.join()
    logger.info('Workers finished')

    s_thread.join()
    tracker.report()
    if tracker.failed_repos:
        sys.exit(1)


class ProfiledProcess(multiprocessing.Process):

    profile_lock = multiprocessing.Lock()

    def run(self):
        profiler = cProfile.Profile()
        try:
            return profiler.runcall(multiprocessing.Process.run, self)
        finally:
            # serialize output so the per-process reports don't interleave
            with self.profile_lock:
                profiler.print_stats(sort='cumulative')


def input_thread(tracker):
    '''Show stats when the user hits enter'''
    while True:
        sys.stdin.readline()
        tracker.report(sys.stderr)


def new_session():
    '''Get a new anonymous session'''
    session_opts = koji.grab_session_options(koji.config)
    session_opts['anon_retry'] = True
    session_opts['offline_retry'] = True
    return koji.ClientSession(koji.config.server, session_opts)


# queue for stats updates from the feeder and workers
stats_queue = multiprocessing.Queue()


def stats_thread(tracker):
    '''Apply queued stats updates to the tracker'''
    # unlike the feeder and workers, this remains a thread in the main process
    while True:
        try:
            method, args, kw = stats_queue.get(block=True, timeout=5)
        except Empty:
            if feeder_done.is_set() and queue.empty():
                # is this enough?
                break
            continue
        if method not in ('increment', 'fail_repo'):
            raise ValueError('Invalid stats method: %s' % method)
        handler = getattr(tracker, method)
        handler(*args, **kw)


class StatsProxy(object):
    '''Forward stats calls to the main process via stats_queue'''

    def increment(self, *args, **kw):
        stats_queue.put(['increment', args, kw])

    def fail_repo(self, *args, **kw):
        stats_queue.put(['fail_repo', args, kw])


stats = StatsProxy()


class StatsTracker(object):

    def __init__(self):
        self.counters = {}
        self.start = time.time()
        self.failed_repos = {}

    def increment(self, name, delta=1):
        # optimizing for the default case
        try:
            self.counters[name] += delta
        except KeyError:
            self.counters[name] = delta

    def fail_repo(self, repo_id, path, key):
        if repo_id in self.failed_repos:
            return
        self.failed_repos[repo_id] = [path, key]
        logger.warning('Repo check failed: %s (%s)', path, key)

    def report(self, fp=None):
        if fp is None:
            fp = sys.stdout
        elapsed = time.time() - self.start
        fp.write('Elapsed time: %i seconds\n' % elapsed)
        n_bytes = 0
        b_time = 0
        for key in sorted(self.counters):
            val = self.counters[key]
            if key.endswith('.bytes'):
                n_bytes += val
            elif key.endswith('.time'):
                b_time += val
            fp.write('%13i %s\n' % (val, key))
        fp.write('%13i failed repos\n' % len(self.failed_repos))
        fp.write('Bytes average throughput: %s\n' % format_bw(n_bytes, b_time))
        fp.write('Bytes overall throughput: %s\n' % format_bw(n_bytes, elapsed))


def feeder_main(opts, args):
    '''Scan repos on disk and feed them into the queue'''
    for repo_id, path in iter_repo_dirs():
        while queue.qsize() > 5000:
            # avoid overloading the queue
            logger.debug('feeder waiting.  queue is large')
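            # crude backpressure: Queue.qsize() is only approximate, but it
            # is good enough to keep the backlog bounded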
            time.sleep(5)
        stats.increment('repo.queued')
        queue.put([repo_id, path, opts])
        logger.debug("%i: queueing repo %s", repo_id, path)
    logger.info('all repos queued')
    queue.close()


def iter_repo_dirs():
    """Scan repos on disk and yield (id, path) pairs"""
    reposdir = '%s/repos' % koji.pathinfo.topdir
    for tagdir in os.scandir(reposdir):
        if not tagdir.is_dir() or tagdir.is_symlink():
            continue
        for repodir in os.scandir(tagdir.path):
            if not repodir.name.isdigit():
                continue
            if not repodir.is_dir() or repodir.is_symlink():
                logger.error("unexpected file: %r", repodir.path)
                continue
            yield int(repodir.name), repodir.path


def worker_main(opts):
    '''Process repos from the queue'''
    global session
    session = new_session()
    while True:
        try:
            repo_id, path, opts = queue.get(block=True, timeout=5)
        except Empty:
            if feeder_done.is_set():
                # is this enough?
                break
            continue
        try:
            checker = RepoChecker(repo_id, path, opts)
            checker.check()
        except Exception:
            stats.fail_repo(repo_id, path, 'unknown_error')
            logger.exception('Unexpected error')


def format_bw(n_bytes, seconds):
    if seconds == 0:
        return '??? Mbytes/sec'
    return '%.3f Mbytes/sec' % (n_bytes / seconds / 1024 / 1024)


class RepoChecker(object):

    def __init__(self, repo_id, path, options):
        stats.increment('repo.started')
        self.repo_id = repo_id
        self.path = path
        self.options = options

    def check(self):
        rinfo = session.repoInfo(self.repo_id, strict=False)
        if not rinfo:
            return
        state = koji.REPO_STATES[rinfo['state']]
        if state != 'DELETED':
            logger.info('Ignoring repo %s in state %s', self.repo_id, state)
            return

        # ok, let's delete it
        logger.info('Removing deleted repo: %s', self.path)
        self.delete()
        stats.increment('repo.finished')

    def delete(self):
        # TODO: actually remove self.path from disk
        stats.increment('repo.deleted')


if __name__ == '__main__':
    main()
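Note: RepoChecker.delete is left as a TODO in this patch. A minimal sketch of
what the removal step might look like, assuming a plain recursive removal of
the repo directory is acceptable (the shutil import, the path sanity check,
and the 'rmtree_error' key are illustrative, not part of the patch):

    import shutil

    def delete(self):
        # refuse to remove anything outside the repos tree
        reposdir = '%s/repos/' % koji.pathinfo.topdir
        if not self.path.startswith(reposdir):
            raise ValueError('refusing to remove %s' % self.path)
        try:
            shutil.rmtree(self.path)
        except OSError:
            # count the failure but keep the worker running
            stats.fail_repo(self.repo_id, self.path, 'rmtree_error')
            logger.exception('Failed to remove %s', self.path)
            return
        stats.increment('repo.deleted')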