From f45fd2e1c462281c2494ef39e13730f306d62ed0 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Aug 24 2022 06:54:28 +0000 Subject: Merge #60 `Port koji-change-volumes to py3` --- diff --git a/src/bin/koji-change-volumes b/src/bin/koji-change-volumes index b3a47c4..b501d68 100755 --- a/src/bin/koji-change-volumes +++ b/src/bin/koji-change-volumes @@ -1,6 +1,7 @@ #!/usr/bin/python -import Queue # for exceptions +from __future__ import absolute_import +import six.moves.queue # for exceptions import cProfile import fnmatch import logging @@ -13,6 +14,7 @@ import time import dateutil import koji as _koji # koji declared using profile module in main() +from six.moves import range koji = _koji # until main() replaces import koji.policy from koji.util import multi_fnmatch @@ -144,9 +146,7 @@ def main(): # all the work is in subprocesses, so we just wait # the order of waiting matters - build_queue.close() - build_queue.join_thread() - assert build_queue.empty() + wait_queue(build_queue) logger.info('Waiting for workers to stop.') for worker in workers: @@ -155,9 +155,7 @@ def main(): workers_done.set() logger.info('Waiting for move queue to clear') - move_queue.close() - move_queue.join_thread() - assert move_queue.empty() + wait_queue(move_queue) logger.info('Waiting for movers to stop') for mover in movers: @@ -170,6 +168,18 @@ def main(): sys.exit(1) +def wait_queue(a_queue): + a_queue.close() + a_queue.join_thread() + for i in range(5): + if a_queue.empty(): + break + time.sleep(1) + else: + # this should not happen, since queue should be empty after join_thread() + logger.error('Queue not empty. Work may be incomplete.') + + def get_session(): """Get a subsession if logged in, clone session if not""" session_opts = koji.grab_session_options(koji.config) @@ -214,7 +224,7 @@ def stats_thread(tracker): while True: try: method, args, kw = stats_queue.get(block=True, timeout=5) - except Queue.Empty: + except six.moves.queue.Empty: if workers_done.is_set() and move_queue.empty(): # is this enough? break @@ -416,7 +426,7 @@ def worker_main(opts): while True: try: build, opts = build_queue.get(block=True, timeout=5) - except Queue.Empty: + except six.moves.queue.Empty: if feeder_done.is_set(): # is this enough? break @@ -439,7 +449,7 @@ def mover_main(opts): while True: try: build, volume, size = move_queue.get(block=True, timeout=5) - except Queue.Empty: + except six.moves.queue.Empty: if workers_done.is_set(): break continue