From 7f0ef2b058f8948373e5dcac1dc7677dadfc8f42 Mon Sep 17 00:00:00 2001 From: Michael Simacek Date: Oct 12 2015 09:07:48 +0000 Subject: Start parallel_generator immediately --- diff --git a/koschei/resolver.py b/koschei/resolver.py index 2c1b550..4111561 100644 --- a/koschei/resolver.py +++ b/koschei/resolver.py @@ -260,26 +260,29 @@ class GenerateRepoTask(AbstractResolverTask): brs = util.get_rpm_requires(self.koji_session, [p.srpm_nvra for p in packages]) brs = util.parallel_generator(brs, queue_size=None) - with self.repo_cache.get_sack(repo_id) as sack: - if not sack: - self.log.error('Cannot generate repo: {}'.format(repo_id)) - self.db.rollback() - return - repo.base_resolved, base_problems, _ = self.resolve_dependencies(sack, []) - resolution_time.stop() - if not repo.base_resolved: - self.log.info("Build group not resolvable") - self.db.add(repo) - self.db.flush() - self.db.execute(BuildrootProblem.__table__.insert(), - [{'repo_id': repo.repo_id, 'problem': problem} - for problem in base_problems]) - self.db.commit() - return - self.log.info("Resolving dependencies...") - resolution_time.start() - self.generate_dependency_changes(sack, packages, brs, repo_id) - resolution_time.stop() + try: + with self.repo_cache.get_sack(repo_id) as sack: + if not sack: + self.log.error('Cannot generate repo: {}'.format(repo_id)) + self.db.rollback() + return + repo.base_resolved, base_problems, _ = self.resolve_dependencies(sack, []) + resolution_time.stop() + if not repo.base_resolved: + self.log.info("Build group not resolvable") + self.db.add(repo) + self.db.flush() + self.db.execute(BuildrootProblem.__table__.insert(), + [{'repo_id': repo.repo_id, 'problem': problem} + for problem in base_problems]) + self.db.commit() + return + self.log.info("Resolving dependencies...") + resolution_time.start() + self.generate_dependency_changes(sack, packages, brs, repo_id) + resolution_time.stop() + finally: + brs.stop() self.db.add(repo) self.db.commit() total_time.stop() diff --git a/koschei/util.py b/koschei/util.py index 83b051e..5372cc1 100644 --- a/koschei/util.py +++ b/koschei/util.py @@ -168,26 +168,40 @@ def itercall(koji_session, args, koji_call): args = args[chunk_size:] -def parallel_generator(generator, queue_size=1000): - queue = Queue(maxsize=queue_size) +class parallel_generator(object): sentinel = object() - worker_exception = [StopIteration] - def worker(): + + def __init__(self, generator, queue_size=1000): + self.generator = generator + self.queue = Queue(maxsize=queue_size) + self.worker_exception = StopIteration + self.stop_thread = False + self.thread = Thread(target=self.worker_fn) + self.thread.daemon = True + self.thread.start() + + def worker_fn(self): try: - for item in generator: - queue.put(item) + for item in self.generator: + self.queue.put(item) + if self.stop_thread: + return except Exception as e: - worker_exception[0] = e + self.worker_exception = e finally: - queue.put(sentinel) - thread = Thread(target=worker) - thread.daemon = True - thread.start() - while True: - item = queue.get() - if item is sentinel: - raise worker_exception[0] - yield item + self.queue.put(self.sentinel) + + def __iter__(self): + return self + + def next(self): + item = self.queue.get() + if item is self.sentinel: + raise self.worker_exception # StopIteration in case of success + return item + + def stop(self): + self.stop_thread = True def prepare_build_opts(opts=None):