From 653a1d21d26fc03e0fe5fcb3c5f47432e0b0ac78 Mon Sep 17 00:00:00 2001
From: Mike McLean
Date: Feb 12 2021 16:18:58 +0000
Subject: koji-clean-repos: refactor processes

---

diff --git a/src/bin/koji-clean-repos b/src/bin/koji-clean-repos
index 08c1d64..60bad57 100755
--- a/src/bin/koji-clean-repos
+++ b/src/bin/koji-clean-repos
@@ -1,20 +1,12 @@
 #!/usr/bin/python
 
-from multiprocessing.queues import Empty, Full
-import cProfile
-from functools import partial
-import hashlib
 import logging
 import multiprocessing
 import optparse
 import os
 import sys
-import threading
-import time
 
-import dateutil
 import koji as _koji  # koji declared using profile module in main()
-import rpm
 
 
 '''
@@ -40,8 +32,6 @@ def main():
                       help='pick a profile')
     parser.add_option('-j', '--jobs', default=5, type='int',
                       help='worker count')
-    parser.add_option('--with-profiler', action='store_true',
-                      help='use python profiler')
 
     # verbosity options
     parser.add_option("-d", "--debug", action="store_true",
@@ -76,173 +66,32 @@ def main():
     else:
         top_logger.setLevel(logging.WARN)
 
-    Process = multiprocessing.Process
-    if opts.with_profiler:
-        Process = ProfiledProcess
+    logger.info('Starting pool')
+    pool = multiprocessing.Pool(opts.jobs)
 
-    tracker = StatsTracker()
-    # still a thread
-    s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))
-    s_thread.daemon = True
-    s_thread.start()
-
-    # start feeder
-    feeder = Process(name='feeder', target=feeder_main,
-                     args=(opts, args))
-    feeder.daemon = True
-    feeder.start()
-
-    # start workers
-    workers = []
-    for i in range(opts.jobs):
-        worker = Process(name='worker %i' % i, target=worker_main,
-                         args=(opts,))
-        worker.daemon = True
-        worker.start()
-        workers.append(worker)
-
-    # input thread
-    i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))
-    i_thread.daemon = True
-    i_thread.start()
-
-    # feeder will be the first to finish
-    feeder.join()
-    feeder_done.set()
-
-    # wait for the queue to be empty
-    queue.close()
-    queue.join_thread()  # XXX is this right?
-
-    # assert queue.empty()
-
-    logger.info('Finished. Waiting for workers to stop.')
-    for worker in workers:
-        worker.join()
-    logger.info('Workers finished')
-
-    s_thread.join()
-    tracker.report()
-    if tracker.failed_builds:
-        sys.exit(1)
-
-
-class ProfiledProcess(multiprocessing.Process):
-
-    profile_lock = multiprocessing.Lock()
-
-    def run(self):
-        profiler = cProfile.Profile()
-        try:
-            return profiler.runcall(multiprocessing.Process.run, self)
-        finally:
-            with self.profile_lock:
-                profiler.print_stats(sort='cumulative')
+    jobs = []
+    maxqueue = 1000
+    for repo_id, path in iter_repo_dirs():
+        # wait for room in queue
+        while len(jobs) > maxqueue:
+            jobs = [j for j in jobs if not j.ready()]
+        jobs.append(pool.apply_async(check_repo, [repo_id, path]))
 
 
-def input_thread(tracker):
-    '''Show stats if user hits enter'''
-    while True:
-        sys.stdin.readline()
-        tracker.report(sys.stderr)
+    # wait for remaining jobs
+    while jobs:
+        jobs = [j for j in jobs if not j.ready()]
 
 
 def new_session():
     '''Get a new anonymous session'''
+    # TODO better session management
     session_opts = koji.grab_session_options(koji.config)
     session_opts['anon_retry'] = True
     session_opts['offline_retry'] = True
     return koji.ClientSession(koji.config.server, session_opts)
 
 
-# queue for stats
-stats_queue = multiprocessing.Queue()
-
-
-def stats_thread(tracker):
-    '''Handle stats queue'''
-    # this one remains a thread
-    while True:
-        try:
-            method, args, kw = stats_queue.get(block=True, timeout=5)
-        except Empty:
-            if feeder_done.is_set() and queue.empty():
-                # is this enough?
-                break
-            continue
-        if method not in ('increment', 'fail_build'):
-            raise ValueError('Invalid stats method: %s' % method)
-        handler = getattr(tracker, method)
-        handler(*args, **kw)
-
-
-class StatsProxy(object):
-
-    def increment(self, *args, **kw):
-        stats_queue.put(['increment', args, kw])
-
-    def fail_build(self, *args, **kw):
-        stats_queue.put(['fail_build', args, kw])
-
-
-stats = StatsProxy()
-
-
-class StatsTracker(object):
-
-    def __init__(self):
-        self.counters = {}
-        self.start = time.time()
-        self.failed_builds = {}
-
-    def increment(self, name, delta=1):
-        # optimizing for the default case
-        try:
-            self.counters[name] += delta
-        except KeyError:
-            self.counters[name] = delta
-
-    def fail_build(self, build, key):
-        build_id = build['build_id']
-        if build_id in self.failed_builds:
-            return
-        self.failed_builds[build_id] = [build['nvr'], key]
-        logger.warn('Build check failed: %s (%s)', build['nvr'], key)
-
-    def report(self, fp=None):
-        if fp is None:
-            fp = sys.stdout
-        elapsed = time.time() - self.start
-        fp.write('Elapsed time: %i seconds\n' % elapsed)
-        n_bytes = 0
-        b_time = 0
-        for key in sorted(self.counters):
-            val = self.counters[key]
-            if key.endswith('.bytes'):
-                n_bytes += val
-            elif key.endswith('.time'):
-                b_time += val
-            fp.write('%13i %s\n' % (val, key))
-        fp.write('%13i failed builds\n' % len(self.failed_builds))
-        fp.write('Bytes average throughput: %s\n' % format_bw(n_bytes, b_time))
-        fp.write('Bytes overall throughput: %s\n' % format_bw(n_bytes, elapsed))
-
-
-def feeder_main(opts, args):
-    '''Scan repos on disc and feed them into a queue'''
-    reposdir = '%s/repos' % (koji.pathinfo.topdir)
-    for repo_id, path in iter_repo_dirs():
-        while queue.qsize() > 5000:
-            # avoid overloading the queue
-            logger.debug('feeder waiting. queue is large')
-            time.sleep(5)
-        stats.increment('repo.queued')
-        queue.put([repo_id, path, opts])
-        logger.debug("%i: queueing repo %s", repo_id, path)
-    logger.info('all repos queued')
-    queue.close()
 
 
 def iter_repo_dirs():
     """Scan repos on disk and yield (id, path) pairs"""
     reposdir = '%s/repos' % (koji.pathinfo.topdir)
@@ -257,57 +106,24 @@ def iter_repo_dirs():
         yield int(repodir.name), repodir.path
 
 
-def worker_main(opts):
-    '''Handle tasks in queue'''
-    global session
+def check_repo(repo_id, path):
     session = new_session()
-    while True:
-        try:
-            repo_id, path, opts = queue.get(block=True, timeout=5)
-        except Empty:
-            if feeder_done.is_set():
-                # is this enough?
-                break
-            continue
-        try:
-            checker = RepoChecker(repo_id, path, opts)
-            checker.check()
-        except Exception:
-            stats.fail_build(build, 'unknown_error')
-            logger.exception('Unexpected error')
-
-
-def format_bw(n_bytes, seconds):
-    if seconds == 0:
-        return '??? Mbytes/sec'
-    return '%.3f Mbytes/sec' % (n_bytes/seconds/1024/1024)
-
-
-class RepoChecker(object):
-
-    def __init__(self, repo_id, path, options):
-        stats.increment('repo.started')
-        self.repo_id = repo_id
-        self.path = path
-        self.options = options
-
-    def check(self):
-        rinfo = session.repoInfo(self.repo_id, strict=False)
-        if not rinfo:
-            return
-        state = koji.REPO_STATES[rinfo['state']]
-        if state != 'DELETED':
-            logger.info('Ignoring repo %s in state %s', self.repo_id, state)
-
-        # ok, let's delete it
-        logger.info('Removing deleted repo: %s', self.path)
-        self.delete()
-        stats.increment('repo.finished')
-
-    def delete(self):
-        # TODO
-        pass
-        stats.increment('repo.deleted')
+    rinfo = session.repoInfo(repo_id, strict=False)
+    if not rinfo:
+        return
+    state = koji.REPO_STATES[rinfo['state']]
+    if state != 'DELETED':
+        logger.info('Ignoring repo %s in state %s', repo_id, state)
+        return
+
+    # ok, let's delete it
+    logger.info('Removing deleted repo: %s', path)
+    do_delete(path)
+
+
+def do_delete(path):
+    # TODO
+    pass
 
 
 if __name__ == '__main__':
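
Note on the pattern above: the refactor replaces the hand-rolled feeder/worker processes, stats thread, and shared queues with a single multiprocessing.Pool, capping in-flight work by polling AsyncResult.ready(). What follows is a minimal standalone sketch of that pattern, for illustration only and not part of the patch; check_work(), run_all(), and the 0.1-second poll interval are hypothetical stand-ins, and the short sleep is an assumption added so the ready() polling does not busy-spin a CPU the way a bare list-comprehension wait can while the pool is saturated.

    #!/usr/bin/python
    # Illustration only: bounded apply_async pattern, roughly as adopted
    # by the refactor above. check_work() is a hypothetical stand-in for
    # check_repo(repo_id, path).

    import multiprocessing
    import time


    def check_work(item):
        # stand-in for the real per-repo check
        return item * 2


    def run_all(items, jobs=5, maxqueue=1000):
        pool = multiprocessing.Pool(jobs)
        pending = []
        for item in items:
            # block while too many results are still in flight
            while len(pending) > maxqueue:
                pending = [r for r in pending if not r.ready()]
                time.sleep(0.1)  # avoid busy-spinning while the pool is full
            pending.append(pool.apply_async(check_work, [item]))
        # drain the remaining results
        while pending:
            pending = [r for r in pending if not r.ready()]
            time.sleep(0.1)
        pool.close()
        pool.join()


    if __name__ == '__main__':
        run_all(range(100))

One trade-off worth noting: ready() polling discards each AsyncResult without calling get(), so worker exceptions pass silently; the patch accepts that because check_repo() does its own logging, but a variant that cares about failures would collect results via get() instead.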