| |
@@ -0,0 +1,509 @@
|
| |
+ #!/usr/bin/python
|
| |
+
|
| |
+ import Queue # for exceptions
|
| |
+ import cProfile
|
| |
+ import grp
|
| |
+ import logging
|
| |
+ import multiprocessing
|
| |
+ import optparse
|
| |
+ import os
|
| |
+ import pwd
|
| |
+ import sys
|
| |
+ import threading
|
| |
+ import time
|
| |
+
|
| |
+ import dateutil
|
| |
+ import koji as _koji # koji declared using profile module in main()
|
| |
+ koji = _koji # until main() replaces
|
| |
+
|
| |
+
|
| |
+ logger = logging.getLogger('koji.fixsymlinks`')
|
| |
+
|
| |
+ # events to indicate various completions
|
| |
+ feeder_done = multiprocessing.Event()
|
| |
+ workers_done = multiprocessing.Event()
|
| |
+
|
| |
+ # queue for builds to consider
|
| |
+ build_queue = multiprocessing.Queue()
|
| |
+
|
| |
+
|
| |
+ def main():
|
| |
+ global koji
|
| |
+ parser = optparse.OptionParser(usage='%prog [options]')
|
| |
+ parser.add_option('-p', '--profile', default='koji',
|
| |
+ help='pick a profile')
|
| |
+ parser.add_option('-j', '--jobs', default=5, type='int',
|
| |
+ help='worker count')
|
| |
+ parser.add_option('--with-profiler', action='store_true',
|
| |
+ help='use python profiler')
|
| |
+ parser.add_option('-n', '--test', action='store_true',
|
| |
+ help="don't actually change volumes")
|
| |
+ parser.add_option('--uid', default=-1,
|
| |
+ help="set symlink owner to uid")
|
| |
+ parser.add_option('--gid', default=-1,
|
| |
+ help="set symlink group to gid")
|
| |
+
|
| |
+ # verbosity options
|
| |
+ parser.add_option("-d", "--debug", action="store_true",
|
| |
+ help="show debug output")
|
| |
+ parser.add_option("-v", "--verbose", action="store_true",
|
| |
+ help="show verbose output")
|
| |
+ parser.add_option("-q", "--quiet", action="store_true", default=False,
|
| |
+ help="run quietly")
|
| |
+
|
| |
+ # build selection options
|
| |
+ parser.add_option("--buildid", help="Check specific build from ID or nvr")
|
| |
+ parser.add_option("--package", help="Check builds for this package")
|
| |
+ parser.add_option("--before",
|
| |
+ help="Check builds built before this time")
|
| |
+ parser.add_option("--after",
|
| |
+ help="Check builds built after this time")
|
| |
+ parser.add_option("--type", help="Check builds of this type.")
|
| |
+ parser.add_option("--owner", help="Check builds built by this owner")
|
| |
+ parser.add_option("--volume", help="Check builds by volume ID")
|
| |
+ parser.add_option("--prefix", help="Only check packages starting with this prefix")
|
| |
+ parser.add_option("--tag", help="Only builds in the given tag (with inheritance)")
|
| |
+ parser.add_option("--file", help="Only builds listed in given file")
|
| |
+
|
| |
+ opts, args = parser.parse_args()
|
| |
+
|
| |
+ if args:
|
| |
+ parser.error('This script takes no arguments')
|
| |
+
|
| |
+ for opt in 'uid', 'gid':
|
| |
+ val = getattr(opts, opt)
|
| |
+ if val == -1:
|
| |
+ continue
|
| |
+ if os.getuid():
|
| |
+ parser.error('Setting uid/gid requires root')
|
| |
+ if val.isdigit():
|
| |
+ setattr(opts, opt, int(val))
|
| |
+
|
| |
+ if not isinstance(opts.uid, int):
|
| |
+ opts.uid = pwd.getpwnam(opts.uid).pw_uid
|
| |
+ if not isinstance(opts.gid, int):
|
| |
+ opts.gid = grp.getgrnam(opts.gid).gr_gid
|
| |
+
|
| |
+ if not opts.test:
|
| |
+ sys.stderr.write('Running without --test -- changes will be made\n')
|
| |
+ time.sleep(5)
|
| |
+
|
| |
+ koji = _koji.get_profile_module(opts.profile)
|
| |
+
|
| |
+ for name in ('cert', 'serverca'):
|
| |
+ value = os.path.expanduser(getattr(koji.config, name))
|
| |
+ setattr(koji.config, name, value)
|
| |
+
|
| |
+ top_logger = logging.getLogger("koji")
|
| |
+ handler = logging.StreamHandler(sys.stdout)
|
| |
+ handler.setLevel(logging.DEBUG)
|
| |
+ handler.setFormatter(logging.Formatter('%(asctime)s %(process)s [%(levelname)s] %(name)s: %(message)s'))
|
| |
+ top_logger.addHandler(handler)
|
| |
+ if opts.debug:
|
| |
+ top_logger.setLevel(logging.DEBUG)
|
| |
+ elif opts.quiet:
|
| |
+ top_logger.setLevel(logging.ERROR)
|
| |
+ elif opts.verbose:
|
| |
+ top_logger.setLevel(logging.INFO)
|
| |
+ else:
|
| |
+ top_logger.setLevel(logging.WARN)
|
| |
+
|
| |
+ Process = multiprocessing.Process
|
| |
+ if opts.with_profiler:
|
| |
+ Process = ProfiledProcess
|
| |
+
|
| |
+ tracker = StatsTracker()
|
| |
+ # still a thread
|
| |
+ s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))
|
| |
+ s_thread.daemon = True
|
| |
+ s_thread.start()
|
| |
+
|
| |
+ # start feeder
|
| |
+ feeder = Process(name='feeder', target=feeder_main,
|
| |
+ args=(opts, args))
|
| |
+ feeder.daemon = True
|
| |
+ feeder.start()
|
| |
+
|
| |
+ # start workers
|
| |
+ workers = []
|
| |
+ for i in range(opts.jobs):
|
| |
+ worker = Process(name='worker %i' % i, target=worker_main,
|
| |
+ args=(opts,))
|
| |
+ worker.daemon = True
|
| |
+ worker.start()
|
| |
+ workers.append(worker)
|
| |
+
|
| |
+ # input thread
|
| |
+ i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))
|
| |
+ i_thread.daemon = True
|
| |
+ i_thread.start()
|
| |
+
|
| |
+ # feeder will be the first to finish
|
| |
+ feeder.join()
|
| |
+ feeder_done.set()
|
| |
+
|
| |
+ # all the work is in subprocesses, so we just wait
|
| |
+ # the order of waiting matters
|
| |
+ build_queue.close()
|
| |
+ build_queue.join_thread()
|
| |
+ assert build_queue.empty()
|
| |
+
|
| |
+ logger.info('Waiting for workers to stop.')
|
| |
+ for worker in workers:
|
| |
+ worker.join()
|
| |
+ logger.info('Workers finished')
|
| |
+ workers_done.set()
|
| |
+
|
| |
+ s_thread.join()
|
| |
+ tracker.report()
|
| |
+ if tracker.failed_builds:
|
| |
+ sys.exit(1)
|
| |
+
|
| |
+
|
| |
+ def get_session():
|
| |
+ """Get a subsession if logged in, clone session if not"""
|
| |
+ session_opts = koji.grab_session_options(koji.config)
|
| |
+ session_opts['anon_retry'] = True
|
| |
+ session_opts['offline_retry'] = True
|
| |
+ return koji.ClientSession(koji.config.server, session_opts)
|
| |
+
|
| |
+
|
| |
+ class ProfiledProcess(multiprocessing.Process):
|
| |
+
|
| |
+ profile_lock = multiprocessing.Lock()
|
| |
+
|
| |
+ def run(self):
|
| |
+ profiler = cProfile.Profile()
|
| |
+ try:
|
| |
+ return profiler.runcall(multiprocessing.Process.run, self)
|
| |
+ finally:
|
| |
+ with self.profile_lock:
|
| |
+ profiler.print_stats(sort='cumulative')
|
| |
+
|
| |
+
|
| |
+ def input_thread(tracker):
|
| |
+ '''Show stats if user hits enter'''
|
| |
+ while True:
|
| |
+ sys.stdin.readline()
|
| |
+ tracker.report(sys.stderr)
|
| |
+
|
| |
+
|
| |
+ # queue for stats
|
| |
+ stats_queue = multiprocessing.Queue()
|
| |
+
|
| |
+
|
| |
+ def stats_thread(tracker):
|
| |
+ '''Handle stats queue'''
|
| |
+ # this one remains a thread
|
| |
+ while True:
|
| |
+ try:
|
| |
+ method, args, kw = stats_queue.get(block=True, timeout=5)
|
| |
+ except Queue.Empty:
|
| |
+ if workers_done.is_set():
|
| |
+ # is this enough?
|
| |
+ break
|
| |
+ continue
|
| |
+ if method not in ('increment', 'fail_build'):
|
| |
+ raise ValueError('Invalid stats method: %s' % method)
|
| |
+ handler = getattr(tracker, method)
|
| |
+ handler(*args, **kw)
|
| |
+
|
| |
+
|
| |
+ class StatsProxy(object):
|
| |
+
|
| |
+ def increment(self, *args, **kw):
|
| |
+ stats_queue.put(['increment', args, kw])
|
| |
+
|
| |
+ def fail_build(self, *args, **kw):
|
| |
+ stats_queue.put(['fail_build', args, kw])
|
| |
+
|
| |
+
|
| |
+ stats = StatsProxy()
|
| |
+
|
| |
+
|
| |
+ class StatsTracker(object):
|
| |
+
|
| |
+ def __init__(self):
|
| |
+ self.counters = {}
|
| |
+ self.start = time.time()
|
| |
+ self.failed_builds = {}
|
| |
+
|
| |
+ def increment(self, name, delta=1):
|
| |
+ # optimizing for the default case
|
| |
+ try:
|
| |
+ self.counters[name] += delta
|
| |
+ except KeyError:
|
| |
+ self.counters[name] = delta
|
| |
+
|
| |
+ def fail_build(self, build, key):
|
| |
+ build_id = build['build_id']
|
| |
+ if build_id in self.failed_builds:
|
| |
+ return
|
| |
+ self.failed_builds[build_id] = [build['nvr'], key]
|
| |
+ logger.warn('Action failed: %s (%s)', build['nvr'], key)
|
| |
+
|
| |
+ def report(self, fp=None):
|
| |
+ if fp is None:
|
| |
+ fp = sys.stdout
|
| |
+ elapsed = time.time() - self.start
|
| |
+ fp.write('Elapsed time: %i seconds\n' % elapsed)
|
| |
+ for key in sorted(self.counters):
|
| |
+ val = self.counters[key]
|
| |
+ fp.write('%13i %s\n' % (val, key))
|
| |
+ fp.write('%13i failed builds\n' % len(self.failed_builds))
|
| |
+
|
| |
+
|
| |
+ def feeder_main(opts, args):
|
| |
+ '''Fetch builds and feed them into the build queue'''
|
| |
+ global session
|
| |
+ session = get_session()
|
| |
+
|
| |
+ for i, build in enumerate(get_builds(opts, args), start=1):
|
| |
+ while build_queue.qsize() > 1000:
|
| |
+ # avoid overloading the queue
|
| |
+ logger.debug('feeder waiting. build queue is large')
|
| |
+ time.sleep(5)
|
| |
+ stats.increment('build.queued')
|
| |
+ build_queue.put([build, opts])
|
| |
+ logger.debug("%i: queueing build %s", i, build['nvr'])
|
| |
+ logger.info('%i builds queued', i)
|
| |
+ build_queue.close()
|
| |
+
|
| |
+
|
| |
+ def get_builds(options, args):
|
| |
+ '''Yield all requested builds'''
|
| |
+ if options.buildid:
|
| |
+ try:
|
| |
+ buildid = int(options.buildid)
|
| |
+ except ValueError:
|
| |
+ buildid = options.buildid
|
| |
+ binfo = session.getBuild(buildid, strict=True)
|
| |
+ yield binfo
|
| |
+ return
|
| |
+ if options.file:
|
| |
+ for line in open(options.file, 'r'):
|
| |
+ binfo = session.getBuild(line.strip())
|
| |
+ if not binfo:
|
| |
+ logger.warn('No such build: %s', line)
|
| |
+ continue
|
| |
+ yield binfo
|
| |
+ return
|
| |
+ if options.tag:
|
| |
+ for binfo in get_tagged_builds(options, args):
|
| |
+ yield binfo
|
| |
+ return
|
| |
+ chunksize = 10000
|
| |
+ opts = {}
|
| |
+ opts['queryOpts'] = {'order': 'build.id', 'offset': 0, 'limit': chunksize}
|
| |
+ opts['state'] = koji.BUILD_STATES['COMPLETE']
|
| |
+ for key in ('type', 'prefix'):
|
| |
+ value = getattr(options, key)
|
| |
+ if value is not None:
|
| |
+ opts[key] = value
|
| |
+ if options.package:
|
| |
+ try:
|
| |
+ opts['packageID'] = int(options.package)
|
| |
+ except ValueError:
|
| |
+ package = session.getPackageID(options.package)
|
| |
+ if package is None:
|
| |
+ raise ValueError('invalid package option')
|
| |
+ opts['packageID'] = package
|
| |
+ if options.owner:
|
| |
+ try:
|
| |
+ opts['userID'] = int(options.owner)
|
| |
+ except ValueError:
|
| |
+ user = session.getUser(options.owner)
|
| |
+ if user is None:
|
| |
+ raise ValueError("Invalid owner option")
|
| |
+ opts['userID'] = user['id']
|
| |
+ if options.volume:
|
| |
+ try:
|
| |
+ opts['volumeID'] = int(options.volume)
|
| |
+ except ValueError:
|
| |
+ volumes = session.listVolumes()
|
| |
+ volumeID = None
|
| |
+ for volume in volumes:
|
| |
+ if options.volume == volume['name']:
|
| |
+ volumeID = volume['id']
|
| |
+ if volumeID is None:
|
| |
+ raise ValueError("Invalid volume option")
|
| |
+ opts['volumeID'] = volumeID
|
| |
+ for opt in ('before', 'after'):
|
| |
+ val = getattr(options, opt)
|
| |
+ if not val:
|
| |
+ continue
|
| |
+ try:
|
| |
+ ts = float(val)
|
| |
+ setattr(options, opt, ts)
|
| |
+ continue
|
| |
+ except ValueError:
|
| |
+ pass
|
| |
+ try:
|
| |
+ dt = dateutil.parser.parse(val)
|
| |
+ ts = time.mktime(dt.timetuple())
|
| |
+ setattr(options, opt, ts)
|
| |
+ except:
|
| |
+ raise ValueError("Invalid time specification: %s" % val)
|
| |
+ if options.before:
|
| |
+ opts['completeBefore'] = getattr(options, 'before')
|
| |
+ if options.after:
|
| |
+ opts['completeAfter'] = getattr(options, 'after')
|
| |
+
|
| |
+ while True:
|
| |
+ chunk = session.listBuilds(**opts)
|
| |
+ if not chunk:
|
| |
+ break
|
| |
+ opts['queryOpts']['offset'] += chunksize
|
| |
+ for build in chunk:
|
| |
+ yield build
|
| |
+
|
| |
+
|
| |
+ def get_tagged_builds(options, args):
|
| |
+ opts = {'inherit': True}
|
| |
+ opts['tag'] = options.tag
|
| |
+ if options.type is not None:
|
| |
+ opts['type'] = options.type
|
| |
+ if options.package:
|
| |
+ try:
|
| |
+ opts['packageID'] = int(options.package)
|
| |
+ except ValueError:
|
| |
+ package = session.getPackageID(options.package)
|
| |
+ if package is None:
|
| |
+ raise ValueError('invalid package option')
|
| |
+ opts['package'] = package
|
| |
+ if options.owner:
|
| |
+ try:
|
| |
+ opts['userID'] = int(options.owner)
|
| |
+ except ValueError:
|
| |
+ user = session.getUser(options.owner)
|
| |
+ if user is None:
|
| |
+ raise ValueError("Invalid owner option")
|
| |
+ opts['owner'] = user['id']
|
| |
+ if options.volume:
|
| |
+ raise ValueError('The --volume option is incompatible with --tag')
|
| |
+ for opt in ('before', 'after'):
|
| |
+ val = getattr(options, opt)
|
| |
+ if not val:
|
| |
+ continue
|
| |
+ raise ValueError('The --%s option is incompatible with --tag' % opt)
|
| |
+
|
| |
+ return session.listTagged(**opts)
|
| |
+
|
| |
+
|
| |
+ def worker_main(opts):
|
| |
+ '''Handle builds in queue'''
|
| |
+ global session
|
| |
+ session = get_session()
|
| |
+
|
| |
+ while True:
|
| |
+ try:
|
| |
+ build, opts = build_queue.get(block=True, timeout=5)
|
| |
+ except Queue.Empty:
|
| |
+ if feeder_done.is_set():
|
| |
+ # is this enough?
|
| |
+ break
|
| |
+ continue
|
| |
+ try:
|
| |
+ checker = BuildHandler(build, opts)
|
| |
+ checker.run()
|
| |
+ except Exception:
|
| |
+ stats.fail_build(build, 'unknown_error')
|
| |
+ logger.exception('Unexpected error')
|
| |
+
|
| |
+
|
| |
+ def format_bw(n_bytes, seconds):
|
| |
+ if seconds == 0:
|
| |
+ return '??? Mbytes/sec'
|
| |
+ return '%.3f Mbytes/sec' % (n_bytes/seconds/1024/1024)
|
| |
+
|
| |
+
|
| |
+ class BuildHandler(object):
|
| |
+
|
| |
+ def __init__(self, build, options):
|
| |
+ stats.increment('build.checked')
|
| |
+ self.build = build
|
| |
+ self.options = options
|
| |
+
|
| |
+ def run(self):
|
| |
+ try:
|
| |
+ self.fix_symlink()
|
| |
+ except Exception:
|
| |
+ logger.exception('Unable to fix symlink')
|
| |
+ stats.fail_build(self.build, 'unknown_error')
|
| |
+ return
|
| |
+
|
| |
+ def fix_symlink(self):
|
| |
+ if self.build['volume_name'] == 'DEFAULT':
|
| |
+ stats.increment('build.default_volume')
|
| |
+ return
|
| |
+
|
| |
+ build_dir = koji.pathinfo.build(self.build)
|
| |
+ if not os.path.isdir(build_dir):
|
| |
+ logger.error('Build directory missing: %s', build_dir)
|
| |
+ stats.fail_build(self.build, 'missing_dir')
|
| |
+ return
|
| |
+
|
| |
+ # figure out symlink location and target
|
| |
+ base_binfo = self.build.copy()
|
| |
+ base_binfo['volume_name'] = 'DEFAULT'
|
| |
+ base_binfo['volume_id'] = 0
|
| |
+ basedir = koji.pathinfo.build(base_binfo)
|
| |
+ relpath = os.path.relpath(build_dir, os.path.dirname(basedir))
|
| |
+
|
| |
+ if os.path.islink(basedir):
|
| |
+ actual = os.readlink(basedir)
|
| |
+ if actual == relpath:
|
| |
+ # we're good
|
| |
+ stats.increment('build.link_ok')
|
| |
+ return
|
| |
+ logger.warn('Wrong volume symlink: %s -> %s', basedir, actual)
|
| |
+ stats.increment('build.wrong_link')
|
| |
+ if not self.options.test:
|
| |
+ os.unlink(basedir)
|
| |
+ elif os.path.exists(basedir):
|
| |
+ # exists, but not a symlink
|
| |
+ logger.warn('Unexpected base volume content: %s', basedir)
|
| |
+ self.fail('vol_symlink.not_a_link')
|
| |
+ else:
|
| |
+ stats.increment('build.link_missing')
|
| |
+ self.ensuredir(os.path.dirname(basedir))
|
| |
+
|
| |
+ if self.options.test:
|
| |
+ logger.warn('Would have fixed symlink: %s -> %s', basedir, relpath)
|
| |
+ return
|
| |
+ logger.warn('Fixing volume symlink: %s -> %s', basedir, relpath)
|
| |
+ os.symlink(relpath, basedir)
|
| |
+ if self.options.uid != -1 or self.options.gid != -1:
|
| |
+ os.lchown(basedir, self.options.uid, self.options.gid)
|
| |
+
|
| |
+ def ensuredir(self, directory):
|
| |
+ """Mostly same as koji lib function, but honors our options"""
|
| |
+ if os.path.exists(directory):
|
| |
+ if not os.path.isdir(directory):
|
| |
+ raise OSError("Not a directory: %s" % directory)
|
| |
+ else:
|
| |
+ head, tail = os.path.split(directory)
|
| |
+ if not tail and head == directory:
|
| |
+ # can only happen if directory == '/' or equivalent
|
| |
+ # (which obviously should not happen)
|
| |
+ raise OSError("root directory missing? %s" % directory)
|
| |
+ if head:
|
| |
+ self.ensuredir(head)
|
| |
+ # note: if head is blank, then we've reached the top of a relative path
|
| |
+ if self.options.test:
|
| |
+ logger.info('Would have created directory: %s', directory)
|
| |
+ return
|
| |
+ try:
|
| |
+ os.mkdir(directory)
|
| |
+ except OSError:
|
| |
+ #thrown when dir already exists (could happen in a race)
|
| |
+ if not os.path.isdir(directory):
|
| |
+ #something else must have gone wrong
|
| |
+ raise
|
| |
+ if self.options.uid != -1 or self.options.gid != -1:
|
| |
+ os.lchown(directory, self.options.uid, self.options.gid)
|
| |
+ return directory
|
| |
+
|
| |
+
|
| |
+ if __name__ == '__main__':
|
| |
+ main()
|
| |