From ac25ead3477b775cd1c3f68d0ff42b8527b15fbf Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Aug 04 2017 18:17:12 +0000 Subject: Thread the other side of the equation too. Signed-off-by: Ralph Bean --- diff --git a/scripts/pdc/sync-branches-from-pkgdb.py b/scripts/pdc/sync-branches-from-pkgdb.py index 1b7429c..7ab0428 100644 --- a/scripts/pdc/sync-branches-from-pkgdb.py +++ b/scripts/pdc/sync-branches-from-pkgdb.py @@ -16,6 +16,7 @@ from __future__ import print_function import argparse import json +import multiprocessing.pool import os import sys import time @@ -59,7 +60,7 @@ def _pkgdb_data_by_page(page, tries=1): return json.loads(f.read()) url = 'https://admin.fedoraproject.org/pkgdb/api/packages/?acls=True&limit=5&page=%i' % page - print(" Querying %r" % url, end='...') + print(" Querying %r" % url) sys.stdout.flush() start = time.time() response = requests.get(url) @@ -68,7 +69,7 @@ def _pkgdb_data_by_page(page, tries=1): raise IOError("Tried 5 times. Giving up.") print(" ! Failed, %r, %i times. Trying again." % (response, tries)) return _pkgdb_data_by_page(page, tries+1) - print(" Query took %r seconds" % (time.time() - start)) + print(" pkgdb query took %r seconds" % (time.time() - start)) data = response.json() print("Writing cache of pkgdb information to %r" % cache_file) @@ -77,12 +78,18 @@ def _pkgdb_data_by_page(page, tries=1): return data -def pkgdb_data(): +def feed_pkgdb_data(q): initial = 0 - for page in range(initial, _pkgdb_data_by_page(initial)['page_total']): + total = _pkgdb_data_by_page(initial)['page_total'] + pages = range(total) + + def _handle_page(page): data = _pkgdb_data_by_page(page) for entry in data['packages']: - yield entry + q.put(entry) + + pool = multiprocessing.pool.ThreadPool(5) + pool.map(_handle_page, pages) def get_implicit_slas(branchname): @@ -209,7 +216,7 @@ if __name__ == '__main__': q = queue.Queue() # Set up N workers to pull work from a queue of pkgdb entries - N = 5 + N = 10 def pull_work(): while True: print("Worker found %i items on the queue" % q.qsize()) @@ -217,7 +224,6 @@ if __name__ == '__main__': if entry is StopIteration: print("Worker found StopIteration. Shutting down.") break - print("Worker is handling a pkgdb entry.") do_work(entry) workers = [threading.Thread(target=pull_work) for i in range(N)] for worker in workers: @@ -225,13 +231,12 @@ if __name__ == '__main__': try: # Feed their queue of pkgdb entries. They work on them in parallel. - export = pkgdb_data() - for entry in export: - q.put(entry) + feed_pkgdb_data(q) except: print("Clearing the queue for premature shutdown.") with q.mutex: q.queue.clear() + raise finally: # Wrap up. Tell the workers to stop and wait for them to be done. for worker in workers: diff --git a/scripts/pdc/utilities.py b/scripts/pdc/utilities.py index 5b254de..5879f9c 100644 --- a/scripts/pdc/utilities.py +++ b/scripts/pdc/utilities.py @@ -41,18 +41,21 @@ def ensure_component_branches(pdc, package, slas, eol, branch, type, critpath, f endpoint = pdc['component-branch-slas'] # A base query base = dict(branch=branch, global_component=package, branch_type=type) + modified = [] for sla in slas: # See if the sla already exists on the branch: results = list(pdc.get_paged(endpoint, sla=sla, **base)) if results: - print(" sla {sla: <16} already exists for {branch_type}/" - "{global_component}#{branch}".format(sla=sla, **base)) continue + + # See if user wants intervention message = "Apply sla %r to %r" % (sla, base) if not prompt(message, force): print("Not applying sla %r to %r" % (sla, base)) continue - print("Applying sla %r to %r (critpath %r)" % (sla, base, critpath)) + + # Do it. + modified.append(sla) payload = dict( sla=sla, eol=eol, @@ -64,3 +67,9 @@ def ensure_component_branches(pdc, package, slas, eol, branch, type, critpath, f ) ) endpoint._(payload) + + # Report at the end. + if modified: + print("Applied %r to %r (critpath %r)" % (modified, base, critpath)) + else: + print("Did not apply any slas to %r (critpath %r)" % (base, critpath))