#11 koji-change-volumes script
Merged 5 years ago by mikem. Opened 5 years ago by mikem.
mikem/koji-tools change-volumes into master

@@ -0,0 +1,722 @@ 

+ #!/usr/bin/python

+ 

+ import Queue   # for exceptions

+ import cProfile

+ import fnmatch

+ import logging

+ import multiprocessing

+ import optparse

+ import os

+ import sys

+ import threading

+ import time

+ 

+ import dateutil.parser

+ import koji as _koji  # main() rebinds `koji` to the selected profile module

+ koji = _koji  # default binding until main() runs

+ import koji.policy

+ from koji.util import multi_fnmatch

+ from koji_cli.lib import activate_session

+ 

+ 

+ logger = logging.getLogger('koji.changevolumes')

+ 

+ # an event to indicate that the feeder is done

+ feeder_done = multiprocessing.Event()

+ 

+ # queue to hold actions in the pipeline

+ queue = multiprocessing.Queue()

+ 

+ 

+ def main():

+     global koji

+     global volume_policy

+     parser = optparse.OptionParser(usage='%prog [options] <policy_file>')

+     parser.add_option('-p', '--profile', default='koji',

+                       help='pick a profile')

+     parser.add_option('-j', '--jobs', default=5, type='int',

+                       help='worker count')

+     parser.add_option('--with-profiler', action='store_true',

+                       help='use python profiler')

+     parser.add_option('-n', '--test', action='store_true',

+                       help="don't actually change volumes")

+     parser.add_option('--max-use', type='int', metavar='PERCENT',

+                       help='skip move if target volume use reaches PERCENT')

+ 

+     # verbosity options

+     parser.add_option("-d", "--debug", action="store_true",

+                       help="show debug output")

+     parser.add_option("-v", "--verbose", action="store_true",

+                       help="show verbose output")

+     parser.add_option("-q", "--quiet", action="store_true", default=False,

+                       help="run quietly")

+ 

+     # build selection options

+     parser.add_option("--buildid", help="Check specific build from ID or nvr")

+     parser.add_option("--package", help="Check builds for this package")

+     parser.add_option("--before",

+                       help="Check builds built before this time")

+     parser.add_option("--after",

+                       help="Check builds built after this time")

+     parser.add_option("--type", help="Check builds of this type.")

+     parser.add_option("--owner", help="Check builds built by this owner")

+     parser.add_option("--volume", help="Check builds by volume ID")

+     parser.add_option("--prefix", help="Only check packages starting with this prefix")

+     parser.add_option("--tag", help="Only builds in the given tag (with inheritance)")

+ 

+     opts, args = parser.parse_args()

+ 

+     if not args:

+         parser.error('Please specify a policy file')

+     elif len(args) > 1:

+         parser.error('You can only specify one policy file')

+ 

+     if not opts.test:

+         sys.stderr.write('Running without --test -- builds will be moved\n')

+         time.sleep(5)

+ 

+     volume_policy = get_policy(args[0])

+ 

+     koji = _koji.get_profile_module(opts.profile)

+ 

+     for name in ('cert', 'serverca'):

+         value = os.path.expanduser(getattr(koji.config, name))

+         setattr(koji.config, name, value)

+ 

+     top_logger = logging.getLogger("koji")

+     handler = logging.StreamHandler(sys.stdout)

+     handler.setLevel(logging.DEBUG)

+     handler.setFormatter(logging.Formatter('%(asctime)s %(process)s [%(levelname)s] %(name)s: %(message)s'))

+     top_logger.addHandler(handler)

+     if opts.debug:

+         top_logger.setLevel(logging.DEBUG)

+     elif opts.quiet:

+         top_logger.setLevel(logging.ERROR)

+     elif opts.verbose:

+         top_logger.setLevel(logging.INFO)

+     else:

+         top_logger.setLevel(logging.WARN)

+ 

+     Process = multiprocessing.Process

+     if opts.with_profiler:

+         Process = ProfiledProcess

+ 

+     tracker = StatsTracker()

+     # still a thread

+     s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))

+     s_thread.daemon = True

+     s_thread.start()

+ 

+     # start feeder

+     feeder = Process(name='feeder', target=feeder_main,

+                                args=(opts, args))

+     feeder.daemon = True

+     feeder.start()

+ 

+     # start workers

+     workers = []

+     for i in range(opts.jobs):

+         worker = Process(name='worker %i' % i, target=worker_main,

+                                    args=(opts,))

+         worker.daemon = True

+         worker.start()

+         workers.append(worker)

+ 

+     # input thread

+     i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))

+     i_thread.daemon = True

+     i_thread.start()

+ 

+     # feeder will be the first to finish

+     feeder.join()

+     feeder_done.set()

+ 

+     # wait for the queue to be empty

+     queue.close()

+     queue.join_thread()

+ 

+     assert queue.empty()

+ 

+     logger.info('Finished. Waiting for workers to stop.')

+     for worker in workers:

+         worker.join()

+     logger.info('Workers finished')

+ 

+     s_thread.join()

+     tracker.report()

+     if tracker.failed_builds:

+         sys.exit(1)

+ 

+ 

+ def get_session():

+     """Get a subsession if logged in, clone session if not"""

+     session_opts = koji.grab_session_options(koji.config)

+     session_opts['anon_retry'] = True

+     session_opts['offline_retry'] = True

+     return koji.ClientSession(koji.config.server, session_opts)

+ 

+ 

+ def get_policy(filename):

+     tests = koji.policy.findSimpleTests([globals(), vars(koji.policy)])

+     with open(filename) as fp:

+         return koji.policy.SimpleRuleSet(fp.readlines(), tests)

+ 

+ 

+ class ProfiledProcess(multiprocessing.Process):

+ 

+     profile_lock = multiprocessing.Lock()

+ 

+     def run(self):

+         profiler = cProfile.Profile()

+         try:

+             return profiler.runcall(multiprocessing.Process.run, self)

+         finally:

+             with self.profile_lock:

+                 profiler.print_stats(sort='cumulative')

+ 

+ 

+ def input_thread(tracker):

+     '''Show stats if user hits enter'''

+     while True:

+         sys.stdin.readline()

+         tracker.report(sys.stderr)

+ 

+ 

+ # queue for stats

+ stats_queue = multiprocessing.Queue()

+ 

+ 

+ def stats_thread(tracker):

+     '''Handle stats queue'''

+     # this one remains a thread

+     while True:

+         try:

+             method, args, kw = stats_queue.get(block=True, timeout=5)

+         except Queue.Empty:

+             if feeder_done.is_set() and queue.empty():

+                 # is this enough?

+                 break

+             continue

+         if method not in ('increment', 'fail_build'):

+             raise ValueError('Invalid stats method: %s' % method)

+         handler = getattr(tracker, method)

+         handler(*args, **kw)

+ 

+ 

+ class StatsProxy(object):

+ 

+     def increment(self, *args, **kw):

+         stats_queue.put(['increment', args, kw])

+ 

+     def fail_build(self, *args, **kw):

+         stats_queue.put(['fail_build', args, kw])

+ 

+ 

+ stats = StatsProxy()

+ 

+ 

+ class StatsTracker(object):

+ 

+     def __init__(self):

+         self.counters = {}

+         self.start = time.time()

+         self.failed_builds = {}

+ 

+     def increment(self, name, delta=1):

+         # optimizing for the default case

+         try:

+             self.counters[name] += delta

+         except KeyError:

+             self.counters[name] = delta

+ 

+     def fail_build(self, build, key):

+         build_id = build['build_id']

+         if build_id in self.failed_builds:

+             return

+         self.failed_builds[build_id] = [build['nvr'], key]

+         logger.warn('Action failed: %s (%s)', build['nvr'], key)

+ 

+     def report(self, fp=None):

+         if fp is None:

+             fp = sys.stdout

+         elapsed = time.time() - self.start

+         fp.write('Elapsed time: %i seconds\n' % elapsed)

+         n_bytes = 0

+         b_time = 0

+         for key in sorted(self.counters):

+             val = self.counters[key]

+             if key.endswith('.bytes'):

+                 n_bytes += val

+             elif key.endswith('.time'):

+                 b_time += val

+             fp.write('%13i  %s\n' % (val, key))

+         fp.write('%13i  failed builds\n' % len(self.failed_builds))

+         fp.write('Bytes average throughput: %s\n' % format_bw(n_bytes, b_time))

+         fp.write('Bytes overall throughput: %s\n' % format_bw(n_bytes, elapsed))

+ 

+ 

+ def feeder_main(opts, args):

+     '''Fetch builds and feed them into the queue'''

+     global session

+     session = get_session()

+ 

+     for i, build in enumerate(get_builds(opts, args), start=1):

+         while queue.qsize() > 1000:

+             # avoid overloading the queue

+             logger.debug('feeder waiting. queue is large')

+             time.sleep(5)

+         stats.increment('build.queued')

+         queue.put([build, opts])

+         logger.debug("%i: queueing build %s", i, build['nvr'])

+     logger.info('%i builds queued', i)

+     queue.close()

+ 

+ 

+ def get_builds(options, args):

+     '''Yield all requested builds'''

+     if options.buildid:

+         try:

+             buildid = int(options.buildid)

+         except ValueError:

+             buildid = options.buildid

+         binfo = session.getBuild(buildid, strict=True)

+         yield binfo

+         return

+     if options.tag:

+         for binfo in get_tagged_builds(options, args):

+             yield binfo

+         return

+     chunksize = 10000

+     opts = {}

+     opts['queryOpts'] = {'order': 'build.id', 'offset': 0, 'limit': chunksize}

+     opts['state'] = koji.BUILD_STATES['COMPLETE']

+     for key in ('type', 'prefix'):

+         value = getattr(options, key)

+         if value is not None:

+             opts[key] = value

+     if options.package:

+         try:

+             opts['packageID'] = int(options.package)

+         except ValueError:

+             package = session.getPackageID(options.package)

+             if package is None:

+                 raise ValueError('invalid package option')

+             opts['packageID'] = package

+     if options.owner:

+         try:

+             opts['userID'] = int(options.owner)

+         except ValueError:

+             user = session.getUser(options.owner)

+             if user is None:

+                 raise ValueError("Invalid owner option")

+             opts['userID'] = user['id']

+     if options.volume:

+         try:

+             opts['volumeID'] = int(options.volume)

+         except ValueError:

+             volumes = session.listVolumes()

+             volumeID = None

+             for volume in volumes:

+                 if options.volume == volume['name']:

+                     volumeID = volume['id']

+             if volumeID is None:

+                 raise ValueError("Invalid volume option")

+             opts['volumeID'] = volumeID

+     for opt in ('before', 'after'):

+         val = getattr(options, opt)

+         if not val:

+             continue

+         try:

+             ts = float(val)

+             setattr(options, opt, ts)

+             continue

+         except ValueError:

+             pass

+         try:

+             dt = dateutil.parser.parse(val)

+             ts = time.mktime(dt.timetuple())

+             setattr(options, opt, ts)

+         except Exception:

+             raise ValueError("Invalid time specification: %s" % val)

+     if options.before:

+         opts['completeBefore'] = getattr(options, 'before')

+     if options.after:

+         opts['completeAfter'] = getattr(options, 'after')

+ 

+     while True:

+         chunk = session.listBuilds(**opts)

+         if not chunk:

+             break

+         opts['queryOpts']['offset'] += chunksize

+         for build in chunk:

+             yield build

+ 

+ 

+ def get_tagged_builds(options, args):

+     opts = {'inherit': True}

+     opts['tag'] = options.tag

+     if options.type is not None:

+         opts['type'] = options.type

+     if options.package:

+         try:

+             opts['package'] = int(options.package)

+         except ValueError:

+             package = session.getPackageID(options.package)

+             if package is None:

+                 raise ValueError('invalid package option')

+             opts['package'] = package

+     if options.owner:

+         try:

+             opts['owner'] = int(options.owner)

+         except ValueError:

+             user = session.getUser(options.owner)

+             if user is None:

+                 raise ValueError("Invalid owner option")

+             opts['owner'] = user['id']

+     if options.volume:

+         raise ValueError('The --volume option is incompatible with --tag')

+     for opt in ('before', 'after'):

+         val = getattr(options, opt)

+         if not val:

+             continue

+         raise ValueError('The --%s option is incompatible with --tag' % opt)

+ 

+     return session.listTagged(**opts)

+ 

+ 

+ def worker_main(opts):

+     '''Handle tasks in queue'''

+     global session

+     session = get_session()

+     if not opts.test:

+         activate_session(session, koji.config)

+ 

+     while True:

+         try:

+             build, opts = queue.get(block=True, timeout=5)

+         except Queue.Empty:

+             if feeder_done.is_set():

+                 # is this enough?

+                 break

+             continue

+         try:

+             checker = BuildHandler(build, opts)

+             checker.run()

+         except Exception:

+             stats.fail_build(build, 'unknown_error')

+             logger.exception('Unexpected error')

+ 

+ 

+ def format_bw(n_bytes, seconds):

+     if seconds == 0:

+         return '??? Mbytes/sec'

+     return '%.3f Mbytes/sec' % (n_bytes/seconds/1024/1024)

+ 

+ 

+ class BuildHandler(object):

+ 

+     def __init__(self, build, options):

+         stats.increment('build.checked')

+         self.build = build

+         self.options = options

+ 

+     def run(self):

+         try:

+             newvol = self.check_policy()

+         except Exception:

+             logger.exception('Error in policy')

+             stats.fail_build(self.build, 'policy_error')

+             return

+         self.set_volume(newvol)

+ 

+     def set_volume(self, newvol):

+         if newvol is None:

+             logger.debug('Build %s: no match', self.build['nvr'])

+             stats.increment('policy.nomatch')

+             return

+         if newvol == 'skip':

+             logger.debug('Build %s: skipped', self.build['nvr'])

+             stats.increment('policy.skipped')

+             return

+         if newvol == self.build['volume_name']:

+             logger.debug('Build %s: same volume', self.build['nvr'])

+             stats.increment('policy.same_volume')

+             return

+         if not self.check_use(newvol):

+             # check_use records the failure

+             return

+ 

+         # change the volume

+         start = time.time()

+         try:

+             if not self.options.test:

+                 session.changeBuildVolume(self.build['build_id'], newvol)

+         except Exception:

+             logger.exception('Move failed: %(nvr)s', self.build)

+             stats.fail_build(self.build, 'move_failed')

+             return

+ 

+         # record stats

+         elapsed = time.time() - start

+         size = self.get_size()

+         stats.increment('policy.volume_changed')

+         logger.info('Build %s: %s -> %s', self.build['nvr'],

+                 self.build['volume_name'], newvol)

+         stats.increment('volume.%s.builds_added' % newvol)

+         stats.increment('volume.%s.builds_removed' % self.build['volume_name'])

+         stats.increment('moved.bytes', size)

+         stats.increment('moved.time', elapsed)

+         stats.increment('volume.%s.bytes_added' % newvol, size)

+         stats.increment('volume.%s.bytes_removed' % self.build['volume_name'], size)

+ 

+     def check_use(self, newvol):

+         """Check usage on target volume

+ 

+         Returns True if we can proceed with the move

+         """

+ 

+         if not self.options.max_use:

+             # check not enabled

+             return True

+         path = '%s/packages' % koji.pathinfo.volumedir(newvol)

+         if not os.path.exists(path):

+             logger.error("No such directory: %s", path)

+             stats.fail_build(self.build, 'volume_missing')

+             return False

+         fs_stat = os.statvfs(path)

+         avail = fs_stat.f_bavail * fs_stat.f_bsize

+         total = fs_stat.f_blocks * fs_stat.f_bsize

+         if 100.0 * (total - avail) / total > self.options.max_use:

+             stats.fail_build(self.build, 'volume_max_use')

+             return False

+         return True

+ 

+     def check_policy(self):

+         data = {

+                 'build': self.build,

+                 'package': self.build['name'],

+                 'version': self.build['version'],

+                 'release': self.build['release'],

+                 'volume_name': self.build['volume_name'],

+                 'source': self.build['source'] or 'NOSOURCE',

+                 'handler': self,

+                 }

+         data = koji.util.LazyDict(data)

+         data.lazyset('rpms', self.get_rpms, ())

+         data.lazyset('archives', self.get_archives, ())

+         data.lazyset('buildroots', self.get_brs, ())

+         data.lazyset('cg_list', self.get_cgs, ())

+         data.lazyset('build_tags', self.get_build_tags, ())

+         data.lazyset('tags', self.get_tags, ())

+         try:

+             result = volume_policy.apply(data)

+         except Exception:

+             logger.error('policy data: %r', data)

+             raise

+         return result

+ 

+     def get_rpms(self):

+         '''Get rpms for build'''

+         if not hasattr(self, '_rpms'):

+             self._rpms = session.listRPMs(buildID=self.build['build_id'])

+         return self._rpms

+ 

+     def get_archives(self):

+         if not hasattr(self, '_archives'):

+             self._archives = session.listArchives(buildID=self.build['build_id'])

+         return self._archives

+ 

+     def get_size(self):

+         return sum([int(c['size']) for c in self.get_rpms() + self.get_archives()])

+ 

+     def get_brs(self):

+         """Determine content generators from policy data"""

+         if not hasattr(self, '_buildroots'):

+             rpm_brs = [r['buildroot_id'] for r in self.get_rpms()]

+             archive_brs = [a['buildroot_id'] for a in self.get_archives()]

+             self._buildroots = set(rpm_brs + archive_brs)

+         return self._buildroots

+ 

+     def get_brinfo(self):

+         """Get detailed buildroot info

+ 

+         Returns a list of buildroot infos

+         The list will include a None if any components lack buildroot info

+         """

+         if hasattr(self, '_brinfo'):

+             return self._brinfo

+         brs = self.get_brs()

+         result = []

+         session.multicall = True

+         for br_id in brs:

+             if br_id is None:

+                 result.append(None)

+             else:

+                 session.getBuildroot(br_id, strict=True)

+         for [brinfo] in session.multiCall(strict=True):

+             result.append(brinfo)

+         self._brinfo = result

+         return result

+ 

+     def get_cgs(self):

+         if not hasattr(self, '_cgs'):

+             cgs = set()

+             for brinfo in self.get_brinfo():

+                 if brinfo is None:

+                     cgs.add(None)

+                 else:

+                     cgs.add(brinfo['cg_name'])

+             self._cgs = cgs

+         return self._cgs

+ 

+     def get_build_tags(self):

+         if not hasattr(self, '_build_tags'):

+             tags = set()

+             for brinfo in self.get_brinfo():

+                 if brinfo is None:

+                     tags.add(None)

+                 else:

+                     tags.add(brinfo['tag_name'])

+             self._build_tags = tags

+         return self._build_tags

+ 

+     def get_tags(self):

+         if not hasattr(self, '_tags'):

+             self._tags = session.listTags(build=self.build['build_id'])

+         return self._tags

+ 

+ 

+ # Policy tests

+ 

+ # we are emulating the build-oriented tests from the hub code, presuming

+ # that the policy data is simply the build info

+ 

+ 

+ class PackageTest(koji.policy.MatchTest):

+     """Checks package against glob patterns"""

+     name = 'package'

+     field = 'package'

+ 

+ 

+ class VolumeTest(koji.policy.MatchTest):

+     """Checks volume against glob patterns"""

+     name = 'volume'

+     field = 'volume_name'

+ 

+ 

+ class SourceTest(koji.policy.MatchTest):

+     """Match build source

+ 

+     True if build source matches any of the supplied patterns

+     """

+     name = 'source'

+     field = 'source'

+ 

+ 

+ class CGMatchAnyTest(koji.policy.BaseSimpleTest):

+     """Checks content generator against glob patterns

+ 

+     The 'any' means that if any of the cgs for the build (there can be more

+     than one) match the pattern list, then the result is True

+     """

+ 

+     name = 'cg_match_any'

+ 

+     def run(self, data):

+         # get the list of content generators from the policy data

+         cgs = data['cg_list']

+         patterns = self.str.split()[1:]

+         for cg_name in cgs:

+             if cg_name is None:

+                 # component with no br, or br with no cg

+                 continue

+             if multi_fnmatch(cg_name, patterns):

+                 return True

+         # else

+         return False

+ 

+ 

+ class CGMatchAllTest(koji.policy.BaseSimpleTest):

+     """Checks content generator against glob patterns

+ 

+     The 'all' means that all of the cgs for the build (there can be more

+     than one) must match the pattern list for the result to be true.

+     """

+ 

+     name = 'cg_match_all'

+ 

+     def run(self, data):

+         # get the list of content generators from the policy data

+         cgs = data['cg_list']

+         if not cgs:

+             return False

+         patterns = self.str.split()[1:]

+         for cg_name in cgs:

+             if cg_name is None:

+                 return False

+             if not multi_fnmatch(cg_name, patterns):

+                 return False

+         # else

+         return True

+ 

+ 

+ class HasTagTest(koji.policy.BaseSimpleTest):

+     """Check to see if build (currently) has a given tag"""

+ 

+     name = 'hastag'

+ 

+     def run(self, data):

+         tags = data['tags']

+         # True if any of these tags match any of the patterns

+         args = self.str.split()[1:]

+         for tag in tags:

+             for pattern in args:

+                 if fnmatch.fnmatch(tag['name'], pattern):

+                     return True

+         # otherwise...

+         return False

+ 

+ 

+ class BuildTagTest(koji.policy.BaseSimpleTest):

+     """Check the build tag(s) of the build

+ 

+     If build_tag is not provided in policy data, it is determined by the

+     buildroots of the component rpms

+     """

+ 

+     name = 'buildtag'

+ 

+     def run(self, data):

+         args = self.str.split()[1:]

+         for tagname in data['build_tags']:

+             if tagname is None:

+                 # content generator buildroots might not have tag info

+                 continue

+             if multi_fnmatch(tagname, args):

+                 return True

+         # otherwise...

+         return False

+ 

+ 

+ class ImportedTest(koji.policy.BaseSimpleTest):

+     """Check if any part of a build was imported

+ 

+     This is determined by checking the buildroots of the rpms and archives

+     True if any of them lack a buildroot (strict)"""

+ 

+     name = 'imported'

+ 

+     def run(self, data):

+         # no test args

+         for rpminfo in data['rpms']:

+             if rpminfo['buildroot_id'] is None:

+                 return True

+         for archive in data['archives']:

+             if archive['buildroot_id'] is None:

+                 return True

+         # otherwise...

+         return False

+ 

+ 

+ if __name__ == '__main__':

+     main()

This new script applies a volume policy to all builds in the system, or to a subset of builds.
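For illustration, the policy file passed to the script uses the standard koji SimpleRuleSet syntax: each rule is one or more tests followed by "::" and a result, where the result is the name of a target volume or the special value "skip". A minimal sketch using the tests defined in the script; the volume names "bigdisk" and "archive", the package patterns, the date, and the file name "policy.conf" are placeholders, not part of this change:

    imported :: skip
    package kernel* libreoffice* :: bigdisk
    volume DEFAULT :: archive

Builds that match no rule are left where they are. Assuming the script is installed as koji-change-volumes, a dry run over a subset of builds might then look like:

    koji-change-volumes --test --package kernel --after "2019-01-01" policy.conf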

1 new commit added

  • ensure sizes are ints before adding (5 years ago)

Commit a664aa7 fixes this pull-request

Pull-Request has been merged by mikem (5 years ago)