#16 new script for fixing build volume symlinks
Merged 5 years ago by mikem. Opened 5 years ago by mikem.
mikem/koji-tools fix-volume-symlink  into  master

@@ -0,0 +1,509 @@ 

+ #!/usr/bin/python

+ 

+ import Queue   # for exceptions

+ import cProfile

+ import grp

+ import logging

+ import multiprocessing

+ import optparse

+ import os

+ import pwd

+ import sys

+ import threading

+ import time

+ 

+ import dateutil

+ import koji as _koji  # koji declared using profile module in main()

+ koji = _koji  # until main() replaces

+ 

+ 

# Child of the "koji" logger, so it inherits the handler and level that
# main() installs on logging.getLogger("koji").
logger = logging.getLogger('koji.fixsymlinks')

# events to indicate various completions
# feeder_done is set by main() after the feeder process has been joined;
# workers_done is set by main() after every worker has been joined.
feeder_done = multiprocessing.Event()
workers_done = multiprocessing.Event()

# queue for builds to consider (filled by feeder_main, drained by worker_main)
build_queue = multiprocessing.Queue()

+ 

+ 

def main():
    """Parse options, start the feeder/worker processes and wait for them.

    The feeder process queues builds, the worker processes check and fix the
    volume symlink of each queued build, and a local thread collects the
    stats the subprocesses report.  Exits with status 1 if any build failed
    or if the feeder process itself exited abnormally.
    """
    global koji
    parser = optparse.OptionParser(usage='%prog [options]')
    parser.add_option('-p', '--profile', default='koji',
                      help='pick a profile')
    parser.add_option('-j', '--jobs', default=5, type='int',
                      help='worker count')
    parser.add_option('--with-profiler', action='store_true',
                      help='use python profiler')
    parser.add_option('-n', '--test', action='store_true',
                      help="don't actually change volumes")
    parser.add_option('--uid', default=-1,
                      help="set symlink owner to uid")
    parser.add_option('--gid', default=-1,
                      help="set symlink group to gid")

    # verbosity options
    parser.add_option("-d", "--debug", action="store_true",
                      help="show debug output")
    parser.add_option("-v", "--verbose", action="store_true",
                      help="show verbose output")
    parser.add_option("-q", "--quiet", action="store_true", default=False,
                      help="run quietly")

    # build selection options
    parser.add_option("--buildid", help="Check specific build from ID or nvr")
    parser.add_option("--package", help="Check builds for this package")
    parser.add_option("--before",
                      help="Check builds built before this time")
    parser.add_option("--after",
                      help="Check builds built after this time")
    parser.add_option("--type", help="Check builds of this type.")
    parser.add_option("--owner", help="Check builds built by this owner")
    parser.add_option("--volume", help="Check builds by volume ID")
    parser.add_option("--prefix", help="Only check packages starting with this prefix")
    parser.add_option("--tag", help="Only builds in the given tag (with inheritance)")
    parser.add_option("--file", help="Only builds listed in given file")

    opts, args = parser.parse_args()

    if args:
        parser.error('This script takes no arguments')

    if opts.jobs < 1:
        # with no workers nothing would ever drain the build queue
        parser.error('--jobs must be at least 1')

    for opt in 'uid', 'gid':
        val = getattr(opts, opt)
        if val == -1:
            # option not given; -1 means "leave unchanged" for os.lchown
            continue
        if os.getuid():
            parser.error('Setting uid/gid requires root')
        if val.isdigit():
            setattr(opts, opt, int(val))

    # anything still not an int is a user/group name to resolve
    if not isinstance(opts.uid, int):
        try:
            opts.uid = pwd.getpwnam(opts.uid).pw_uid
        except KeyError:
            parser.error('Unknown user: %s' % opts.uid)
    if not isinstance(opts.gid, int):
        try:
            opts.gid = grp.getgrnam(opts.gid).gr_gid
        except KeyError:
            parser.error('Unknown group: %s' % opts.gid)

    if not opts.test:
        # give the operator a moment to abort
        sys.stderr.write('Running without --test -- changes will be made\n')
        time.sleep(5)

    koji = _koji.get_profile_module(opts.profile)

    for name in ('cert', 'serverca'):
        value = os.path.expanduser(getattr(koji.config, name))
        setattr(koji.config, name, value)

    top_logger = logging.getLogger("koji")
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s %(process)s [%(levelname)s] %(name)s: %(message)s'))
    top_logger.addHandler(handler)
    if opts.debug:
        top_logger.setLevel(logging.DEBUG)
    elif opts.quiet:
        top_logger.setLevel(logging.ERROR)
    elif opts.verbose:
        top_logger.setLevel(logging.INFO)
    else:
        top_logger.setLevel(logging.WARNING)

    Process = multiprocessing.Process
    if opts.with_profiler:
        Process = ProfiledProcess

    tracker = StatsTracker()
    # still a thread
    s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))
    s_thread.daemon = True
    s_thread.start()

    # start feeder
    feeder = Process(name='feeder', target=feeder_main,
                     args=(opts, args))
    feeder.daemon = True
    feeder.start()

    # start workers
    workers = []
    for i in range(opts.jobs):
        worker = Process(name='worker %i' % i, target=worker_main,
                         args=(opts,))
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # input thread
    i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))
    i_thread.daemon = True
    i_thread.start()

    # feeder will be the first to finish
    feeder.join()
    feeder_done.set()
    feeder_failed = bool(feeder.exitcode)
    if feeder_failed:
        # e.g. an invalid selection option raised inside get_builds()
        logger.error('Feeder exited abnormally (exit code %s)', feeder.exitcode)

    # all the work is in subprocesses, so we just wait
    # the order of waiting matters
    build_queue.close()
    build_queue.join_thread()
    # Deliberately no emptiness check here: the workers may still be draining
    # builds that the feeder queued just before it exited.

    logger.info('Waiting for workers to stop.')
    for worker in workers:
        worker.join()
    logger.info('Workers finished')
    workers_done.set()

    s_thread.join()
    tracker.report()
    if tracker.failed_builds or feeder_failed:
        sys.exit(1)

+ 

+ 

def get_session():
    """Create a new client session for the active koji profile.

    The session options come from the profile's config, with the
    anon_retry and offline_retry options switched on.  No login is done here.
    """
    options = koji.grab_session_options(koji.config)
    options.update(anon_retry=True, offline_retry=True)
    hub_url = koji.config.server
    return koji.ClientSession(hub_url, options)

+ 

+ 

class ProfiledProcess(multiprocessing.Process):
    """Process variant that runs under cProfile and prints the stats at the end."""

    # held while printing so that the reports of different processes
    # are not interleaved
    profile_lock = multiprocessing.Lock()

    def run(self):
        """Run the normal Process.run() under a profiler, then print its stats."""
        prof = cProfile.Profile()
        try:
            result = prof.runcall(multiprocessing.Process.run, self)
        finally:
            # report even when the target raised
            with self.profile_lock:
                prof.print_stats(sort='cumulative')
        return result

+ 

+ 

def input_thread(tracker):
    '''Show stats each time the user hits enter

    Writes tracker.report() to stderr for every line read from stdin and
    returns once stdin reaches end-of-file.
    '''
    while True:
        line = sys.stdin.readline()
        if not line:
            # EOF (stdin closed or redirected): readline() would keep
            # returning '' immediately, so stop instead of spinning
            break
        tracker.report(sys.stderr)

+ 

+ 

# queue for stats (written by StatsProxy, read by stats_thread)
stats_queue = multiprocessing.Queue()


def stats_thread(tracker):
    '''Apply queued stats messages to the tracker

    Each message is a (method, args, kw) triple; only the 'increment' and
    'fail_build' tracker methods may be named.  The loop ends when the queue
    has been idle for the 5 second timeout and workers_done is set.
    '''
    # this one remains a thread
    allowed = ('increment', 'fail_build')
    while True:
        try:
            message = stats_queue.get(block=True, timeout=5)
        except Queue.Empty:
            if not workers_done.is_set():
                continue
            # is this enough?
            break
        method, args, kw = message
        if method not in allowed:
            raise ValueError('Invalid stats method: %s' % method)
        getattr(tracker, method)(*args, **kw)

+ 

+ 

class StatsProxy(object):
    """Forward stats calls through stats_queue instead of applying them locally.

    Each call is queued as a [method name, args, kw] list.
    """

    def _send(self, method, args, kw):
        # single place that defines the message layout
        stats_queue.put([method, args, kw])

    def increment(self, *args, **kw):
        self._send('increment', args, kw)

    def fail_build(self, *args, **kw):
        self._send('fail_build', args, kw)


# shared proxy instance used by the feeder and worker code
stats = StatsProxy()

+ 

+ 

class StatsTracker(object):
    """Accumulate named counters and the set of failed builds."""

    def __init__(self):
        self.start = time.time()
        # counter name -> running total
        self.counters = {}
        # build id -> [nvr, failure key]
        self.failed_builds = {}

    def increment(self, name, delta=1):
        """Add delta to the named counter, creating it on first use."""
        if name in self.counters:
            self.counters[name] += delta
        else:
            self.counters[name] = delta

    def fail_build(self, build, key):
        """Record a failure for the build; only the first failure per build is kept."""
        build_id = build['build_id']
        if build_id not in self.failed_builds:
            self.failed_builds[build_id] = [build['nvr'], key]
            logger.warn('Action failed: %s (%s)', build['nvr'], key)

    def report(self, fp=None):
        """Write elapsed time, all counters (sorted by name) and the failure count.

        Output goes to fp, or to sys.stdout when fp is None.
        """
        out = sys.stdout if fp is None else fp
        out.write('Elapsed time: %i seconds\n' % (time.time() - self.start))
        for name, count in sorted(self.counters.items()):
            out.write('%13i  %s\n' % (count, name))
        out.write('%13i  failed builds\n' % len(self.failed_builds))

+ 

+ 

def feeder_main(opts, args):
    '''Fetch builds and feed them into the build queue

    Runs in its own process: opens a session (stored in the module-level
    `session`), queues every build that get_builds() yields together with
    the options, then closes the queue.
    '''
    global session
    session = get_session()

    # stays 0 when no builds match, so the summary below is always valid
    count = 0
    for count, build in enumerate(get_builds(opts, args), start=1):
        while build_queue.qsize() > 1000:
            # avoid overloading the queue
            logger.debug('feeder waiting. build queue is large')
            time.sleep(5)
        stats.increment('build.queued')
        build_queue.put([build, opts])
        logger.debug("%i: queueing build %s", count, build['nvr'])
    logger.info('%i builds queued', count)
    build_queue.close()

+ 

+ 

def _parse_timestamp(val):
    """Convert a --before/--after value into a float timestamp.

    Accepts either a number (used as-is) or a date string understood by
    dateutil.  Raises ValueError for anything else.
    """
    try:
        return float(val)
    except ValueError:
        pass
    # a plain `import dateutil` does not load the parser submodule
    import dateutil.parser
    try:
        dt = dateutil.parser.parse(val)
        return time.mktime(dt.timetuple())
    except (ValueError, OverflowError):
        raise ValueError("Invalid time specification: %s" % val)


def get_builds(options, args):
    '''Yield all requested builds

    Selection precedence: --buildid, then --file, then --tag; otherwise
    completed builds are listed from the hub in chunks, filtered by the
    remaining selection options.  Uses the module-level `session`.

    Raises ValueError for an unknown package, owner or volume, or for an
    unparseable --before/--after value.
    '''
    if options.buildid:
        try:
            buildid = int(options.buildid)
        except ValueError:
            # not a number, so treat it as an nvr
            buildid = options.buildid
        yield session.getBuild(buildid, strict=True)
        return
    if options.file:
        with open(options.file, 'r') as fobj:
            for line in fobj:
                nvr = line.strip()
                if not nvr:
                    # ignore blank lines rather than querying the hub for ''
                    continue
                binfo = session.getBuild(nvr)
                if not binfo:
                    logger.warning('No such build: %s', nvr)
                    continue
                yield binfo
        return
    if options.tag:
        for binfo in get_tagged_builds(options, args):
            yield binfo
        return
    chunksize = 10000
    opts = {}
    opts['queryOpts'] = {'order': 'build.id', 'offset': 0, 'limit': chunksize}
    opts['state'] = koji.BUILD_STATES['COMPLETE']
    for key in ('type', 'prefix'):
        value = getattr(options, key)
        if value is not None:
            opts[key] = value
    if options.package:
        try:
            opts['packageID'] = int(options.package)
        except ValueError:
            package = session.getPackageID(options.package)
            if package is None:
                raise ValueError('invalid package option')
            opts['packageID'] = package
    if options.owner:
        try:
            opts['userID'] = int(options.owner)
        except ValueError:
            user = session.getUser(options.owner)
            if user is None:
                raise ValueError("Invalid owner option")
            opts['userID'] = user['id']
    if options.volume:
        try:
            opts['volumeID'] = int(options.volume)
        except ValueError:
            # not a number: look the volume up by name
            volumeID = None
            for volume in session.listVolumes():
                if options.volume == volume['name']:
                    volumeID = volume['id']
            if volumeID is None:
                raise ValueError("Invalid volume option")
            opts['volumeID'] = volumeID
    for opt, key in (('before', 'completeBefore'), ('after', 'completeAfter')):
        val = getattr(options, opt)
        if not val:
            continue
        ts = _parse_timestamp(val)
        # keep the parsed value on options, as before
        setattr(options, opt, ts)
        # set the filter directly so that a timestamp of 0 is not dropped
        opts[key] = ts

    # page through the results until the hub returns an empty chunk
    while True:
        chunk = session.listBuilds(**opts)
        if not chunk:
            break
        opts['queryOpts']['offset'] += chunksize
        for build in chunk:
            yield build

+ 

+ 

def get_tagged_builds(options, args):
    """Return the builds in options.tag (with inheritance).

    Honors --type, --package and --owner.  --volume, --before and --after
    cannot be combined with --tag and raise ValueError, as does an unknown
    package or owner.  Uses the module-level `session`.
    """
    # reject incompatible options before making any hub calls
    if options.volume:
        raise ValueError('The --volume option is incompatible with --tag')
    for opt in ('before', 'after'):
        if getattr(options, opt):
            raise ValueError('The --%s option is incompatible with --tag' % opt)

    opts = {'inherit': True}
    opts['tag'] = options.tag
    if options.type is not None:
        opts['type'] = options.type

    # NOTE(review): the hub's listTagged call is documented as taking
    # `package` and `owner` keyword arguments that filter by *name*; the
    # previous code sent packageID/userID for numeric input and numeric ids
    # as values.  Confirm against the deployed hub version.
    if options.package:
        try:
            package_id = int(options.package)
        except ValueError:
            if session.getPackageID(options.package) is None:
                raise ValueError('invalid package option')
            opts['package'] = options.package
        else:
            # numeric id given: resolve it to the package name
            package = session.getPackage(package_id)
            if package is None:
                raise ValueError('invalid package option')
            opts['package'] = package['name']
    if options.owner:
        try:
            owner = int(options.owner)
        except ValueError:
            owner = options.owner
        user = session.getUser(owner)
        if user is None:
            raise ValueError("Invalid owner option")
        opts['owner'] = user['name']

    return session.listTagged(**opts)

+ 

+ 

def worker_main(opts):
    '''Process builds from the build queue until the feeder is done

    Runs in its own process: opens a session (stored in the module-level
    `session`) and hands each queued (build, options) pair to a BuildHandler.
    The loop ends when the queue has been idle for the 5 second timeout and
    feeder_done is set.
    '''
    global session
    session = get_session()

    while True:
        try:
            item = build_queue.get(block=True, timeout=5)
        except Queue.Empty:
            if not feeder_done.is_set():
                continue
            # is this enough?
            break
        # the options travel with each build and replace the argument
        build, opts = item
        try:
            BuildHandler(build, opts).run()
        except Exception:
            stats.fail_build(build, 'unknown_error')
            logger.exception('Unexpected error')

+ 

+ 

def format_bw(n_bytes, seconds):
    """Format a transfer rate as 'N.NNN Mbytes/sec'.

    Returns '??? Mbytes/sec' when seconds is 0, since no rate can be computed.
    """
    if seconds == 0:
        return '??? Mbytes/sec'
    # float() avoids truncating integer division on Python 2
    return '%.3f Mbytes/sec' % (float(n_bytes) / seconds / 1024 / 1024)

+ 

+ 

class BuildHandler(object):
    """Check, and if needed repair, the volume symlink of a single build.

    For a build stored on a non-default volume, the build's path on the
    default volume is expected to be a relative symlink to the real build
    directory.
    """

    def __init__(self, build, options):
        stats.increment('build.checked')
        self.build = build
        self.options = options

    def run(self):
        """Fix the symlink, recording any unexpected error as a failed build."""
        try:
            self.fix_symlink()
        except Exception:
            logger.exception('Unable to fix symlink')
            stats.fail_build(self.build, 'unknown_error')
            return

    def fix_symlink(self):
        """Make the default-volume path of the build a symlink to its real location.

        Does nothing for builds on the DEFAULT volume or when the link is
        already correct.  With options.test set, only reports what would be
        done.
        """
        if self.build['volume_name'] == 'DEFAULT':
            stats.increment('build.default_volume')
            return

        build_dir = koji.pathinfo.build(self.build)
        if not os.path.isdir(build_dir):
            logger.error('Build directory missing: %s', build_dir)
            stats.fail_build(self.build, 'missing_dir')
            return

        # figure out symlink location and target
        base_binfo = self.build.copy()
        base_binfo['volume_name'] = 'DEFAULT'
        base_binfo['volume_id'] = 0
        basedir = koji.pathinfo.build(base_binfo)
        # the link target is relative to the directory holding the link
        relpath = os.path.relpath(build_dir, os.path.dirname(basedir))

        if os.path.islink(basedir):
            actual = os.readlink(basedir)
            if actual == relpath:
                # we're good
                stats.increment('build.link_ok')
                return
            logger.warning('Wrong volume symlink: %s -> %s', basedir, actual)
            stats.increment('build.wrong_link')
            if not self.options.test:
                # remove the bad link; the correct one is created below
                os.unlink(basedir)
        elif os.path.exists(basedir):
            # exists, but not a symlink -- never replace real content
            logger.warning('Unexpected base volume content: %s', basedir)
            stats.fail_build(self.build, 'vol_symlink.not_a_link')
            return
        else:
            stats.increment('build.link_missing')
            self.ensuredir(os.path.dirname(basedir))

        if self.options.test:
            logger.warning('Would have fixed symlink: %s -> %s', basedir, relpath)
            return
        logger.warning('Fixing volume symlink: %s -> %s', basedir, relpath)
        os.symlink(relpath, basedir)
        if self.options.uid != -1 or self.options.gid != -1:
            # lchown so the link itself is changed, not its target
            os.lchown(basedir, self.options.uid, self.options.gid)

    def ensuredir(self, directory):
        """Mostly same as koji lib function, but honors our options

        Creates the directory and any missing parents, applying the uid/gid
        options to each directory it handles.  With options.test set, nothing
        is created and None is returned for a missing directory; otherwise
        the directory path is returned.

        Raises OSError if the path exists but is not a directory.
        """
        if os.path.exists(directory):
            if not os.path.isdir(directory):
                raise OSError("Not a directory: %s" % directory)
        else:
            head, tail = os.path.split(directory)
            if not tail and head == directory:
                # can only happen if directory == '/' or equivalent
                # (which obviously should not happen)
                raise OSError("root directory missing? %s" % directory)
            if head:
                self.ensuredir(head)
            # note: if head is blank, then we've reached the top of a relative path
            if self.options.test:
                logger.info('Would have created directory: %s', directory)
                return
            try:
                os.mkdir(directory)
            except OSError:
                # thrown when dir already exists (could happen in a race)
                if not os.path.isdir(directory):
                    # something else must have gone wrong
                    raise
            if self.options.uid != -1 or self.options.gid != -1:
                os.lchown(directory, self.options.uid, self.options.gid)
        return directory

+ 

+ 

# script entry point: run only when executed directly, not on import
if __name__ == '__main__':
    main()

no initial comment

I noticed that if the symlink is pointing to the volume this script unlinks it and makes note of it in the stats - would there be enough information here to fix the symlink with the script directly?

Otherwise LGTM

I noticed that if the symlink is pointing to the volume this script unlinks it and makes note of it in the stats

Here's that section:

        if os.path.islink(basedir):
            actual = os.readlink(basedir)
            if actual == relpath:
                # we're good
                stats.increment('build.link_ok')
                return

(Note that we return if the symlink is already correct. If execution continues past that point in this branch, the existing link is incorrect, so to correct it we have to unlink it and replace it with the correct symlink.)

            logger.warn('Wrong volume symlink: %s -> %s', basedir, actual)
            stats.increment('build.wrong_link')
            if not self.options.test:
                os.unlink(basedir)

Does this make sense? Still concerned?

Ah, right, it is unlinked there and the new, correct symlink is created on line 475.

LGTM!

rebased onto 9f1b37bd97fa630b91764eaab516cd85d515ae2a

5 years ago

rebased onto 9781bcc

5 years ago

Commit 34423a2 fixes this pull-request

Pull-Request has been merged by mikem

5 years ago
Metadata