#8 koji-check-builds script
Merged 6 years ago by mikem. Opened 6 years ago by mikem.
mikem/koji-tools checkfiles into master

comments
Mike McLean • 6 years ago  
@@ -0,0 +1,683 @@ 

+ #!/usr/bin/python

+ 

+ import Queue   # for exceptions

+ import cProfile

+ import errno

+ from functools import partial

+ import logging

+ import multiprocessing

+ import optparse

+ import os

+ import sys

+ import threading

+ import time

+ 

+ import dateutil.parser  # 'import dateutil' alone does not pull in the parser submodule

+ import koji as _koji  # the global koji is set from the chosen profile module in main()

+ from koji.util import md5_constructor

+ from koji.util import sha1_constructor

+ import rpm

+ 

+ 

+ logger = logging.getLogger('koji.checkbuilds')

+ 
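
+ # Pipeline overview: a feeder process queries the hub and enqueues builds;

+ # worker processes consume the queue and run the checks on each build;

+ # a stats thread in the main process aggregates counters from stats_queue.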

+ # an event to indicate that the feeder is done

+ feeder_done = multiprocessing.Event()

+ 

+ # queue to hold actions in the pipeline

+ queue = multiprocessing.Queue()

+ 

+ 

+ def main():

+     global koji

+     parser = optparse.OptionParser(usage='%prog [options]')

+     parser.add_option('-p', '--profile', default='koji',

+                       help='pick a profile')

+     parser.add_option('-j', '--jobs', default=5, type='int',

+                       help='worker count')

+     parser.add_option('--with-profiler', action='store_true',

+                       help='use python profiler')

+ 

+     # verbosity options

+     parser.add_option("-d", "--debug", action="store_true",

+                       help="show debug output")

+     parser.add_option("-v", "--verbose", action="store_true",

+                       help="show verbose output")

+     parser.add_option("-q", "--quiet", action="store_true", default=False,

+                       help="run quietly")

+ 

+     # build selection options

+     parser.add_option("--buildid", help="Check specific build from ID or nvr")

+     parser.add_option("--package", help="Check builds for this package")

+     parser.add_option("--before",

+                       help="Check builds built before this time")

+     parser.add_option("--after",

+                       help="Check builds built after this time")

+     parser.add_option("--type", help="Check builds of this type.")

+     parser.add_option("--owner", help="Check builds built by this owner")

+     parser.add_option("--volume", help="Check builds by volume ID")

+     parser.add_option("--prefix", help="Only check packages starting with this prefix")

+     parser.add_option("--tag", help="Only builds in the given tag (with inheritance)")

+ 

+     # options for what to check

+     parser.add_option("--ignore-strays", action='store_true',

+                       help="Ignore stray files")

+     parser.add_option("--ignore-rpm-size", action='store_true',

+                       help="Ignore rpm size")

+     # ^ if an rpm has been signed in place, the sigmd5 will still match the db,

+     #   but the size will likely change

+     parser.add_option("--no-sums", action='store_true',

+                       help="Don't validate checksums")

+ 

+     opts, args = parser.parse_args()

+ 

+     if args:

+         parser.error('This command takes no arguments. See --help for options')

+ 

+     koji = _koji.get_profile_module(opts.profile)

+ 

+     top_logger = logging.getLogger("koji")

+     handler = logging.StreamHandler(sys.stdout)

+     handler.setLevel(logging.DEBUG)

+     handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s'))

+     top_logger.addHandler(handler)

+     if opts.debug:

+         top_logger.setLevel(logging.DEBUG)

+     elif opts.quiet:

+         top_logger.setLevel(logging.ERROR)

+     elif opts.verbose:

+         top_logger.setLevel(logging.INFO)

+     else:

+         top_logger.setLevel(logging.WARN)

+ 

+     Process = multiprocessing.Process

+     if opts.with_profiler:

+         Process = ProfiledProcess

+ 

+     tracker = StatsTracker()

+     # stats handling stays in a thread, so the tracker state lives in this process

+     s_thread = threading.Thread(name='stats', target=stats_thread, args=(tracker,))

+     s_thread.daemon = True

+     s_thread.start()

+ 

+     # start feeder

+     feeder = Process(name='feeder', target=feeder_main,

+                                args=(opts, args))

+     feeder.daemon = True

+     feeder.start()

+ 

+     # start workers

+     workers = []

+     for i in range(opts.jobs):

+         worker = Process(name='worker %i' % i, target=worker_main,

+                                    args=(opts,))

+         worker.daemon = True

+         worker.start()

+         workers.append(worker)

+ 

+     # input thread

+     i_thread = threading.Thread(name='input', target=input_thread, args=(tracker,))

+     i_thread.daemon = True

+     i_thread.start()

+ 

+     # feeder will be the first to finish

+     feeder.join()

+     feeder_done.set()

+ 

+     # wait for the queue to be empty

+     queue.close()

+     queue.join_thread()  # XXX is this right?

+ 

+     assert queue.empty()

+ 

+     logger.info('Finished. Waiting for workers to stop.')

+     for worker in workers:

+         worker.join()

+     logger.info('Workers finished')

+ 

+     s_thread.join()

+     tracker.report()

+     if tracker.failed_builds:

+         sys.exit(1)

+ 

+ 

+ class ProfiledProcess(multiprocessing.Process):
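
+     '''Process that runs its target under cProfile and prints stats on exit'''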

+ 

+     profile_lock = multiprocessing.Lock()

+ 

+     def run(self):

+         profiler = cProfile.Profile()

+         try:

+             return profiler.runcall(multiprocessing.Process.run, self)

+         finally:

+             with self.profile_lock:

+                 profiler.print_stats(sort='cumulative')

+ 

+ 

+ def input_thread(tracker):

+     '''Show stats if user hits enter'''

+     while True:

+         sys.stdin.readline()

+         tracker.report(sys.stderr)

+ 

+ 

+ def new_session():

+     '''Get a new anonymous session'''

+     session_opts = koji.grab_session_options(koji.config)

+     session_opts['anon_retry'] = True

+     session_opts['offline_retry'] = True

+     return koji.ClientSession(koji.config.server, session_opts)

+ 

+ 

+ # queue for stats

+ stats_queue = multiprocessing.Queue()

+ 

+ 

+ def stats_thread(tracker):

+     '''Handle stats queue'''

+     # runs as a thread in the main process, so it can update the tracker directly

+     while True:

+         try:

+             method, args, kw = stats_queue.get(block=True, timeout=5)

+         except Queue.Empty:

+             if feeder_done.is_set() and queue.empty():

+                 # is this enough?

+                 break

+             continue

+         if method not in ('increment', 'fail_build'):

+             raise ValueError('Invalid stats method: %s' % method)

+         handler = getattr(tracker, method)

+         handler(*args, **kw)

+ 

+ 

+ class StatsProxy(object):
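
+     '''Forwards stats calls from worker processes to the stats thread via stats_queue'''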

+ 

+     def increment(self, *args, **kw):

+         stats_queue.put(['increment', args, kw])

+ 

+     def fail_build(self, *args, **kw):

+         stats_queue.put(['fail_build', args, kw])

+ 

+ 

+ stats = StatsProxy()

+ 

+ 

+ class StatsTracker(object):

+ 

+     def __init__(self):

+         self.counters = {}

+         self.start = time.time()

+         self.failed_builds = {}

+ 

+     def increment(self, name, delta=1):

+         # optimizing for the default case

+         try:

+             self.counters[name] += delta

+         except KeyError:

+             self.counters[name] = delta

+ 

+     def fail_build(self, build, key):

+         build_id = build['build_id']

+         if build_id in self.failed_builds:

+             return

+         self.failed_builds[build_id] = [build['nvr'], key]

+         logger.warn('Build check failed: %s (%s)', build['nvr'], key)

+ 

+     def report(self, fp=None):

+         if fp is None:

+             fp = sys.stdout

+         elapsed = time.time() - self.start

+         fp.write('Elapsed time: %i seconds\n' % elapsed)

+         n_bytes = 0

+         b_time = 0

+         for key in sorted(self.counters):

+             val = self.counters[key]

+             if key.endswith('.bytes'):

+                 n_bytes += val

+             elif key.endswith('.time'):

+                 b_time += val

+             fp.write('%13i  %s\n' % (val, key))

+         fp.write('%13i  failed builds\n' % len(self.failed_builds))

+         fp.write('Bytes average throughput: %s\n' % format_bw(n_bytes, b_time))

+         fp.write('Bytes overall throughput: %s\n' % format_bw(n_bytes, elapsed))

+ 

+ 

+ def feeder_main(opts, args):

+     '''Fetch builds and feed them into the queue'''

+     global session

+     session = new_session()

+     i = 0

+     for i, build in enumerate(get_builds(opts, args), start=1):

+         while queue.qsize() > 1000:

+             # avoid overloading the queue

+             logger.debug('feeder waiting. queue is large')

+             time.sleep(5)

+         stats.increment('build.queued')

+         queue.put([build, opts])

+         logger.debug("%i: queueing build %s", i, build['nvr'])

+     logger.info('%i builds queued', i)

+     queue.close()

+ 

+ 

+ def get_builds(options, args):

+     '''Yield all requested builds'''

+     if options.buildid:

+         try:

+             buildid = int(options.buildid)

+         except ValueError:

+             buildid = options.buildid

+         binfo = session.getBuild(buildid, strict=True)

+         yield binfo

+         return

+     if options.tag:

+         for binfo in get_tagged_builds(options, args):

+             yield binfo

+         return

+     chunksize = 10000

+     opts = {}

+     opts['queryOpts'] = {'order': 'build.id', 'offset': 0, 'limit': chunksize}

+     for key in ('type', 'prefix'):

+         value = getattr(options, key)

+         if value is not None:

+             opts[key] = value

+     if options.package:

+         try:

+             opts['packageID'] = int(options.package)

+         except ValueError:

+             package = session.getPackageID(options.package)

+             if package is None:

+                 raise ValueError('Invalid package option')

+             opts['packageID'] = package

+     if options.owner:

+         try:

+             opts['userID'] = int(options.owner)

+         except ValueError:

+             user = session.getUser(options.owner)

+             if user is None:

+                 raise ValueError("Invalid owner option")

+             opts['userID'] = user['id']

+     if options.volume:

+         try:

+             opts['volumeID'] = int(options.volume)

+         except ValueError:

+             volumes = session.listVolumes()

+             volumeID = None

+             for volume in volumes:

+                 if options.volume == volume['name']:

+                     volumeID = volume['id']

+             if volumeID is None:

+                 raise ValueError("Invalid volume option")

+             opts['volumeID'] = volumeID

+     for opt in ('before', 'after'):

+         val = getattr(options, opt)

+         if not val:

+             continue

+         try:

+             ts = float(val)

+             setattr(options, opt, ts)

+             continue

+         except ValueError:

+             pass

+         try:

+             dt = dateutil.parser.parse(val)

+             ts = time.mktime(dt.timetuple())

+             setattr(options, opt, ts)

+         except Exception:

+             raise ValueError("Invalid time specification: %s" % val)

+     if options.before:

+         opts['completeBefore'] = options.before

+     if options.after:

+         opts['completeAfter'] = options.after

+ 
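
+     # page through listBuilds, advancing the queryOpts offset by one chunk each pass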

+     while True:

+         chunk = session.listBuilds(**opts)

+         if not chunk:

+             break

+         opts['queryOpts']['offset'] += chunksize

+         for build in chunk:

+             yield build

+ 

+ 

+ def get_tagged_builds(options, args):
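
+     '''Return builds from the given tag (with inheritance)'''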

+     opts = {'inherit': True}

+     opts['tag'] = options.tag

+     if options.type is not None:

+         opts['type'] = options.type

+     if options.package:

+         try:

+             opts['package'] = int(options.package)  # listTagged takes 'package', not 'packageID'

+         except ValueError:

+             package = session.getPackageID(options.package)

+             if package is None:

+                 raise ValueError('Invalid package option')

+             opts['package'] = package

+     if options.owner:

+         try:

+             opts['owner'] = int(options.owner)  # listTagged takes 'owner', not 'userID'

+         except ValueError:

+             user = session.getUser(options.owner)

+             if user is None:

+                 raise ValueError("Invalid owner option")

+             opts['owner'] = user['id']

+     if options.volume:

+         raise ValueError('The --volume option is incompatible with --tag')

+     for opt in ('before', 'after'):

+         val = getattr(options, opt)

+         if not val:

+             continue

+         raise ValueError('The --%s option is incompatible with --tag' % opt)

+ 

+     return session.listTagged(**opts)

+ 

+ 

+ def worker_main(opts):

+     '''Handle tasks in queue'''

+     global session

+     session = new_session()

+     while True:

+         try:

+             build, opts = queue.get(block=True, timeout=5)

+         except Queue.Empty:

+             if feeder_done.is_set():

+                 # is this enough?

+                 break

+             continue

+         try:

+             checker = BuildChecker(build, opts)

+             checker.check()

+         except Exception:

+             stats.fail_build(build, 'unknown_error')

+             logger.exception('Unexpected error')

+ 

+ 

+ def format_bw(n_bytes, seconds):

+     if seconds == 0:

+         return '??? Mbytes/sec'

+     return '%.3f Mbytes/sec' % (n_bytes/seconds/1024/1024)

+ 

+ 

+ class BuildChecker(object):
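
+     '''Run the configured checks for a single build'''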

+ 

+     def __init__(self, build, options):

+         stats.increment('build.started')

+         self.build = build

+         self.options = options

+ 

+     def check(self):

+         if not self.check_build():

+             return

+         self.check_rpms()

+         self.check_rpm_sigs()

+         if not self.options.no_sums:

+             self.verify_rpms()

+         self.check_archives()

+         if not self.options.no_sums:

+             self.verify_archives()

+         stats.increment('build.done')

+ 

+     def check_build(self):

+         '''Initial build check, plus queue deeper checks'''

+         build = self.build

+         state = koji.BUILD_STATES[build['state']]

+         self.build_dir = koji.pathinfo.build(build)

+         if state == 'BUILDING':

+             # ignore these

+             return False

+         elif state in ('FAILED', 'DELETED', 'CANCELED'):

+             if not self.options.ignore_strays and os.path.isdir(self.build_dir):

+                 logger.warn('Stray build directory: %s (build is %s)',

+                              self.build_dir, state)

+                 self.fail('build.stray_dir')

+             # don't check further for these

+             return False

+         elif state == 'COMPLETE':

+             if not os.path.isdir(self.build_dir):

+                 logger.warn('Build directory missing: %s', self.build_dir)

+                 self.fail('build.missing_dir')

+                 return False

+             return True

+         else:

+             # should not happen

+             raise ValueError('Build state is %s' % state)

+ 

+     def fail(self, key=None):

+         stats.fail_build(self.build, key)

+         if key:

+             stats.increment(key)

+ 

+     def get_rpms(self):

+         '''Get rpms to check'''

+         rpms = []

+         for rpminfo in session.listRPMs(buildID=self.build['build_id']):

+             fn = '%s/%s' % (self.build_dir, koji.pathinfo.rpm(rpminfo))

+             rpminfo['_fn'] = fn

+             if rpminfo['metadata_only']:

+                 if os.path.lexists(fn):

+                     logger.warn('Metadata-only rpm is present: %s', fn)

+                     self.fail('rpm.metadata_only_exists')

+                 logger.debug('Skipping metadata-only rpm: %s', fn)

+                 continue

+             rpms.append(rpminfo)

+         self.rpms = rpms

+         return rpms

+ 

+     def check_rpms(self):
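
+         '''Check rpm files on disk against the db (presence, size, payload hash)'''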

+         self.get_rpms()

+         for rpminfo in self.rpms:

+             stats.increment('rpm.checked')

+             fn = rpminfo['_fn']

+             try:

+                 st = os.stat(fn)

+             except OSError as ex:

+                 if ex.errno == errno.ENOENT:

+                     logger.warn('Missing rpm: %s', fn)

+                     self.fail('rpm.missing')

+                     continue

+                 raise

+             db_size = int(rpminfo['size'])  # might be string

+             rpminfo['_size'] = st.st_size

+             if not self.options.ignore_rpm_size and st.st_size != db_size:

+                 logger.warn('Wrong size for: %s\n'

+                             '  db: %s, file: %s',

+                             fn, db_size, st.st_size)

+                 self.fail('rpm.wrong_size')

+             try:

+                 hdr = koji.get_rpm_header(fn)

+                 # ^NOTE: this call does not *verify*

+             except Exception:

+                 logger.warn('Unable to read header for: %s', fn)

+                 self.fail('rpm.bad_header')

+                 continue

+             sigmd5 = koji.hex_string(hdr[rpm.RPMTAG_SIGMD5])

+             if rpminfo['payloadhash'] != sigmd5:

+                 logger.warn('Wrong sigmd5 for: %s\n'

+                             '  db: %s, file: %s',

+                             fn, rpminfo['payloadhash'], sigmd5)

+                 self.fail('rpm.bad_hash')

+ 

+     def verify_rpms(self):

+         '''Actually verify the embedded checksums'''

+         # should we just combine this with the earlier header check?

+         ts = rpm.TransactionSet()

+         ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)

+         for rpminfo in self.rpms:

+             if '_size' not in rpminfo:

+                 # doesn't exist

+                 continue

+             fn = rpminfo['_fn']

+             self.verify_rpm(fn, ts, rpminfo['_size'])

+ 

+     def verify_rpm(self, fn, ts, n_bytes):

+         logger.debug('Verifying rpm %s', fn)

+         start = time.time()

+         with open(fn, 'rb') as fp:

+             try:

+                 ts.hdrFromFdno(fp.fileno())

+             except rpm.error as ex:

+                 logger.warn('Could not verify rpm %s: %s', fn, ex)

+                 self.fail('rpm.failed_verify')

+             except Exception as ex:

+                 logger.exception("Error verifying rpm: %s", fn)

+                 self.fail('rpm.failed_verify')

+         elapsed = time.time() - start

+         stats.increment('rpm.bytes', n_bytes)

+         stats.increment('rpm.time', elapsed)

+         logger.debug('RPM verify %i bytes at %s',

+                      n_bytes, format_bw(n_bytes, elapsed))

+ 

+     def check_rpm_sigs(self):

+         '''Validate signature data on disk'''

+         build = self.build

+         sig_idx = {}

+         session.multicall = True

+         for rpminfo in self.rpms:

+             session.queryRPMSigs(rpm_id=rpminfo['id'])

+         for rpminfo, [sigs] in zip(self.rpms, session.multiCall(strict=True)):

+             for sig in sigs:

+                 sig_idx.setdefault(sig['sigkey'], []).append([rpminfo, sig])

+         logger.debug('Keys for %s: %s', build['nvr'], sig_idx.keys())

+         for sigkey in sig_idx:

+             cachedir = os.path.join(self.build_dir, 'data/sigcache/%s' % sigkey)

+             if not os.path.isdir(cachedir):

+                 logger.warning("Signature cache dir missing: %s", cachedir)

+                 self.fail('sigcachedir.missing')

+                 continue

+             # else

+             for rpminfo, sig in sig_idx[sigkey]:

+                 stats.increment('sigcache.checked')

+                 cachefile = os.path.join(self.build_dir, koji.pathinfo.sighdr(rpminfo, sigkey))

+                 if not os.path.isfile(cachefile):

+                     logger.warn("Cached signature missing: %s", cachefile)

+                     self.fail('sigcache.missing')

+                     continue

+                 with open(cachefile, 'rb') as cache_fp:

+                     sighash = md5_constructor(cache_fp.read()).hexdigest()

+                 if sighash != sig['sighash']:

+                     logger.warn('Cached signature mismatch for %s\n'

+                             '  db: %s, file: %s',

+                             cachefile, sig['sighash'], sighash)

+                     self.fail('sigcache.mismatch')

+         signed_to_check = []

+         for sigkey in sig_idx:

+             signeddir = os.path.join(self.build_dir, 'data/signed/%s' % sigkey)

+             if not os.path.isdir(signeddir):

+                 # ok - signed copies are temporary

+                 continue

+             for rpminfo, sig in sig_idx[sigkey]:

+                 signed = os.path.join(self.build_dir, koji.pathinfo.signed(rpminfo, sigkey))

+                 if not os.path.exists(signed):

+                     # still ok

+                     continue

+                 signed_to_check.append(signed)

+                 # check that sig header matches

+                 stats.increment('sigheader.checked')

+                 start = time.time()

+                 hdr = koji.rip_rpm_sighdr(signed)

+                 sighash = md5_constructor(hdr).hexdigest()

+                 elapsed = time.time() - start

+                 stats.increment('sigheader.bytes', len(hdr))

+                 stats.increment('sigheader.time', elapsed)

+                 if sighash != sig['sighash']:

+                     logger.warn('Signed copy sighdr mismatch: %s\n'

+                             '  db: %s, file: %s',

+                             signed, sig['sighash'], sighash)

+                     self.fail('signedcopy.mismatch')

+         if not self.options.no_sums:

+             ts = rpm.TransactionSet()

+             ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)

+             logger.debug('Verifying %i signed copies for %s',

+                     len(signed_to_check), self.build['nvr'])

+             for fn in signed_to_check:

+                 size = os.stat(fn).st_size

+                 stats.increment('signed_copy.check')

+                 self.verify_rpm(fn, ts, size)

+ 

+     def check_archives(self):
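
+         '''Check archive files on disk against the db (presence and size)'''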

+         build = self.build

+         # first gather archives

+         archives = {}

+         for legacy in ['maven', 'win', 'image']:

+             for archive in session.listArchives(buildID=build['build_id'], type=legacy):

+                 archives.setdefault(archive['id'], archive)

+         for archive in session.listArchives(buildID=build['build_id']):

+             archives.setdefault(archive['id'], archive)

+ 

+         logger.debug('Found %i archives for %s', len(archives), self.build['nvr'])

+         self.archives = []

+         for archive in archives.values():

+             stats.increment('archive.checked')

+             if archive['btype'] == 'maven':

+                 fn = '%s/%s' % (koji.pathinfo.mavenbuild(build),

+                         koji.pathinfo.mavenfile(archive))

+             elif archive['btype'] == 'win':

+                 fn = '%s/%s' % (koji.pathinfo.winbuild(build),

+                         koji.pathinfo.winfile(archive))

+             elif archive['btype'] == 'image':

+                 fn = '%s/%s' % (koji.pathinfo.imagebuild(build),

+                         archive['filename'])

+             else:

+                 fn = '%s/%s' % (koji.pathinfo.typedir(build, archive['btype']),

+                         archive['filename'])

+             archive['_fn'] = fn

+             if archive['metadata_only']:

+                 if os.path.lexists(fn):

+                     logger.warn('Metadata-only archive is present: %s', fn)

+                     self.fail('archive.metadata_only_exists')

+                 logger.debug('Skipping metadata-only archive: %s', fn)

+                 continue

+             try:

+                 st = os.stat(fn)

+             except OSError as ex:

+                 if ex.errno == errno.ENOENT:

+                     logger.warn('Missing archive: %s', fn)

+                     self.fail('archive.missing')

+                     continue

+                 raise

+             self.archives.append(archive)

+             db_size = int(archive['size'])  # might be string

+             if st.st_size != db_size:

+                 logger.warn('Wrong size for: %s\n'

+                             '  db: %i, file: %i',

+                             fn, db_size, st.st_size)

+                 self.fail('archive.wrong_size')

+ 

+     def verify_archives(self):

+         logger.debug('Checking %i archive checksums for %s', len(self.archives),

+                 self.build['nvr'])

+         for archive in self.archives:

+             self.verify_checksum(archive['_fn'], archive['checksum_type'],

+                     archive['checksum'])

+ 

+     def verify_checksum(self, fn, sumtype, expect):

+         try:

+             sumtype = koji.CHECKSUM_TYPES[sumtype]

+         except KeyError:

+             logger.error('Unknown sum type %s for %s', sumtype, fn)

+             return

+         if sumtype == 'md5':

+             chk = md5_constructor()

+         elif sumtype == 'sha1':

+             chk = sha1_constructor()

+         else:

+             logger.error('Unsupported sum type %s for %s', sumtype, fn)

+             return

+         logger.debug('Checking %s for %s', sumtype, fn)

+         stats.increment('checksum.checked')

+         start = time.time()

+         with open(fn, 'rb') as fp:

+             # read in 800k chunks to bound memory use

+             for chunk in iter(partial(fp.read, 819200), b''):

+                 chk.update(chunk)

+             n_bytes = fp.tell()

+         elapsed = time.time() - start

+         stats.increment('checksum.bytes', n_bytes)

+         stats.increment('checksum.time', elapsed)

+         logger.debug('Summed %i bytes at %s',

+                      n_bytes, format_bw(n_bytes, elapsed))

+         value = chk.hexdigest()

+         if value != expect:

+             logger.warn('Checksum mismatch (%s) for %s\n'

+                     '  db: %s, file: %s',

+                     sumtype, fn, expect, value)

+             self.fail('checksum.bad')

+ 

+ 

+ if __name__ == '__main__':

+     main()

This script checks the build data on disk against the data in the database.
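
For context, here is a rough sketch (not part of the change) of the core per-rpm comparison the script performs, using the same client API calls as the diff above. The profile name and NVR are placeholders, and it assumes the configured topdir is mounted locally:

    import os

    import koji as _koji
    import rpm

    koji = _koji.get_profile_module('koji')  # placeholder profile name
    session = koji.ClientSession(koji.config.server)

    build = session.getBuild('foo-1.0-1.fc29', strict=True)  # placeholder NVR
    build_dir = koji.pathinfo.build(build)
    for rpminfo in session.listRPMs(buildID=build['build_id']):
        fn = os.path.join(build_dir, koji.pathinfo.rpm(rpminfo))
        st = os.stat(fn)  # raises OSError if the rpm is missing on disk
        if st.st_size != int(rpminfo['size']):
            print('size mismatch: %s' % fn)
        hdr = koji.get_rpm_header(fn)
        if koji.hex_string(hdr[rpm.RPMTAG_SIGMD5]) != rpminfo['payloadhash']:
            print('sigmd5 mismatch: %s' % fn)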

Commit d9342c7 fixes this pull-request

Pull-Request has been merged by mikem 6 years ago