From e31ecea1c8decadda97c5c35cd3bc09b0e28a5a8 Mon Sep 17 00:00:00 2001 From: Kevin Fenzi Date: Feb 19 2021 22:28:26 +0000 Subject: koji / kojira: switch back to the shipped version of kojira We had a patched version, but the patches have been merged upstream so we should switch back to using the 'stock' kojira. Signed-off-by: Kevin Fenzi --- diff --git a/roles/koji_hub/files/kojira b/roles/koji_hub/files/kojira deleted file mode 100755 index 3def615..0000000 --- a/roles/koji_hub/files/kojira +++ /dev/null @@ -1,1213 +0,0 @@ -#!/usr/bin/python3 - -# Koji Repository Administrator (kojira) -# Copyright (c) 2005-2014 Red Hat, Inc. -# -# Koji is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; -# version 2.1 of the License. -# -# This software is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this software; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: -# Mike McLean - -from __future__ import absolute_import, division - -import errno -import json -import logging -import logging.handlers -import os -import pprint -import signal -import stat -import sys -import threading -import time -import traceback -from optparse import OptionParser -from xml.etree import ElementTree - -import requests -import six - -import koji -from koji.util import deprecated, parseStatus, rmtree, to_list - -tag_cache = {} - - -def getTag(session, tag, event=None): - """A caching version of the hub call""" - cache = tag_cache - now = time.time() - if (tag, event) in cache: - ts, info = cache[(tag, event)] - if now - ts < 600: - # use the cache - return info - info = session.getTag(tag, event=event) - if info: - cache[(info['id'], event)] = (now, info) - cache[(info['name'], event)] = (now, info) - return info - - -class ManagedRepo(object): - - def __init__(self, manager, data, repodata=None): - self.manager = manager - self.session = manager.session - self.options = manager.options - self.logger = logging.getLogger("koji.repo") - self.current = True - self.repo_id = data['id'] - self.event_id = data['create_event'] - self.event_ts = data['create_ts'] - self.tag_id = data['tag_id'] - self.state = data['state'] - if 'dist' in data: - self._dist = data['dist'] - self.tag_name = data['tag_name'] - self.expire_ts = None - if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']: - self.current = False - self._find_expire_time(repodata) - # TODO use hub data to find the actual expiration time - self.first_seen = time.time() - if self.current: - order = self.session.getFullInheritance(self.tag_id, event=self.event_id) - # order may contain same tag more than once - tags = {self.tag_id: 1} - for x in order: - tags[x['parent_id']] = 1 - self.taglist = to_list(tags.keys()) - - def _find_expire_time(self, repodata): - # find all newer repos for same tag and set oldest as expire_ts for our repo - if repodata: - repos = [r for r in repodata if - r['tag_id'] == self.tag_id and r['create_event'] > self.event_id] - if repos: - invalidated_by = sorted(repos, key=lambda x: x['create_event']) - self.expire_ts = invalidated_by[0]['create_ts'] - if not self.expire_ts: - self.expire_ts = time.time() - - @property - def dist(self): - # TODO: remove this indirection once we can rely on the hub to return - # dist field in getActiveRepos - if hasattr(self, '_dist'): - return self._dist - rinfo = self.session.repoInfo(self.repo_id) - self._dist = rinfo['dist'] - - def get_info(self): - "Fetch data from repo.json" - path = self.get_path() - if not path: - # can this be an error yet? - return None - fn = '%s/repo.json' % path - if not os.path.exists(fn): - self.logger.warning('Repo info file missing: %s', fn) - return None - with open(fn, 'r') as fp: - return json.load(fp) - - def get_path(self, volume=None): - """Return the path to the repo directory""" - tag_info = getTag(self.session, self.tag_id) - if not tag_info: - tag_info = getTag(self.session, self.tag_id, self.event_id) - if not tag_info: - self.logger.warning('Could not get info for tag %i, referenced by repo %i' % - (self.tag_id, self.repo_id)) - return None - tag_name = tag_info['name'] - if self.dist: - path = pathinfo.distrepo(self.repo_id, tag_name, volume=volume) - else: - # currently only dist repos can be on another volume - path = pathinfo.repo(self.repo_id, tag_name) - return path - - def expire(self): - """Mark the repo expired""" - if self.state == koji.REPO_EXPIRED: - return - elif self.state == koji.REPO_DELETED: - raise koji.GenericError("Repo already deleted") - self.logger.info("Expiring repo %s.." % self.repo_id) - self.session.repoExpire(self.repo_id) - self.state = koji.REPO_EXPIRED - - def expired(self): - return self.state == koji.REPO_EXPIRED - - def pending(self, timeout=180): - """Determine if repo generation appears to be in progress and not already obsolete""" - if self.state != koji.REPO_INIT: - return False - age = time.time() - self.event_ts - return self.current and age < timeout - - def stale(self): - """Determine if repo seems stale - - By stale, we mean: - - state=INIT - - timestamp really, really old - """ - timeout = 36000 - # XXX - config - if self.state != koji.REPO_INIT: - return False - times = [self.event_ts] - # the mtime is also factored in because a repo can be - # created from an older event and should not be expired based solely on - # that event's timestamp. - path = self.get_path() - if os.path.exists(path): - try: - times.append(os.stat(path).st_mtime) - except Exception: - self.logger.error("Can't read mtime for %s" % path) - return False - else: - times.append(self.first_seen) - self.logger.warning("Repo %d is in INIT state, " - "but doesn't have directory %s yet?" % (self.repo_id, path)) - age = time.time() - max(times) - return age > timeout - - def tryDelete(self, logger): - """Remove the repo from disk, if possible""" - path = self.get_path() - if not path: - # get_path already warned - return False - if self.dist: - lifetime = self.options.dist_repo_lifetime - else: - lifetime = self.options.deleted_repo_lifetime - # (should really be called expired_repo_lifetime) - try: - # also check dir age. We do this because a repo can be created from an older event - # and should not be removed based solely on that event's timestamp. - mtime = os.stat(path).st_mtime - except OSError as e: - if e.errno == 2: - # No such file or directory, so the repo either never existed, - # or has already been deleted, so allow it to be marked deleted. - logger.info("Repo directory does not exist: %s" % path) - pass - else: - logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror)) - return False - else: - times = [self.event_ts, mtime, self.expire_ts] - times = [ts for ts in times if ts is not None] - age = time.time() - max(times) - logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age) - if age < lifetime: - return False - logger.debug("Attempting to delete repo %s.." % self.repo_id) - if self.state != koji.REPO_EXPIRED: - raise koji.GenericError("Repo not expired") - if self.session.repoDelete(self.repo_id) > 0: - # cannot delete, we are referenced by a buildroot - logger.debug("Cannot delete repo %s, still referenced" % self.repo_id) - return False - logger.info("Deleted repo %s" % self.repo_id) - self.state = koji.REPO_DELETED - if os.path.islink(path): - # expected for repos on other volumes - info = self.get_info() - if not os.path.exists(path): - logger.error('Repo volume link broken: %s', path) - return False - if not info or 'volume' not in info: - logger.error('Missing repo.json in %s', path) - return False - realpath = self.get_path(volume=info['volume']) - if not os.path.exists(realpath): - logger.error('Repo real path missing: %s', realpath) - return False - if not os.path.samefile(path, realpath): - logger.error('Incorrect volume link: %s', path) - return False - # ok, try to remove the symlink - try: - os.unlink(path) - except OSError: - logger.error('Unable to remove volume link: %s', path) - else: - realpath = path - try: - rmtree(realpath) - except BaseException: - logger.error(''.join(traceback.format_exception(*sys.exc_info()))) - - return True - - def ready(self): - return self.state == koji.REPO_READY - - def deleted(self): - return self.state == koji.REPO_DELETED - - def problem(self): - return self.state == koji.REPO_PROBLEM - - -class RepoManager(object): - - def __init__(self, options, session): - self.options = options - self._local = threading.local() - self._local.session = session - self.repos = {} - self.external_repos = {} - self.tasks = {} - self.recent_tasks = {} - self.other_tasks = {} - self.needed_tags = {} - self.tag_use_stats = {} - self.delete_pids = {} - self.delete_queue = [] - self.logger = logging.getLogger("koji.repo.manager") - - @property - def session(self): - # session is stored in our threadlocal instance - return self._local.session - - @session.setter - def session(self, value): - self._local.session = value - - def printState(self): - self.logger.debug('Tracking %i repos, %i child processes', - len(self.repos), len(self.delete_pids)) - for tag_id, task_id in six.iteritems(self.tasks): - self.logger.debug("Tracking task %s for tag %s", task_id, tag_id) - for pid, desc in six.iteritems(self.delete_pids): - self.logger.debug("Delete job %s: %r", pid, desc) - - def rmtree(self, path): - """Spawn (or queue) and rmtree job""" - self.logger.info("Queuing rmtree job for %s", path) - self.delete_queue.append(path) - self.checkQueue() - - def checkQueue(self): - finished = [pid for pid in self.delete_pids if self.waitPid(pid)] - for pid in finished: - path = self.delete_pids[pid] - self.logger.info("Completed rmtree job for %s", path) - del self.delete_pids[pid] - while self.delete_queue and len(self.delete_pids) <= self.options.max_delete_processes: - path = self.delete_queue.pop(0) - pid = self._rmtree(path) - self.logger.info("Started rmtree (pid %i) for %s", pid, path) - self.delete_pids[pid] = path - - def waitPid(self, pid): - # XXX - can we unify with TaskManager? - prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid)) - try: - (childpid, status) = os.waitpid(pid, os.WNOHANG) - except OSError as e: - if e.errno != errno.ECHILD: - # should not happen - raise - # otherwise assume the process is gone - self.logger.info("%s: %s" % (prefix, e)) - return True - if childpid != 0: - self.logger.info(parseStatus(status, prefix)) - return True - return False - - def _rmtree(self, path): - pid = os.fork() - if pid: - return pid - # no return - try: - status = 1 - self.session._forget() - try: - rmtree(path) - status = 0 - except BaseException: - logger.error(''.join(traceback.format_exception(*sys.exc_info()))) - logging.shutdown() - finally: - os._exit(status) - - def killChildren(self): - # XXX - unify with TaskManager? - sig = signal.SIGTERM - for pid in self.delete_pids: - try: - os.kill(pid, sig) - except OSError as e: - if e.errno != errno.ESRCH: - logger.error("Unable to kill process %s", pid) - - def readCurrentRepos(self): - self.logger.debug("Reading current repo data") - repodata = self.session.getActiveRepos() - self.logger.debug("Repo data: %r" % repodata) - - for data in repodata: - repo_id = data['id'] - repo = self.repos.get(repo_id) - if repo: - # we're already tracking it - if repo.state != data['state']: - self.logger.info( - 'State changed for repo %s: %s -> %s', - repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']]) - repo.state = data['state'] - else: - self.logger.info('Found repo %s, state=%s' - % (repo_id, koji.REPO_STATES[data['state']])) - repo = ManagedRepo(self, data, repodata) - self.repos[repo_id] = repo - if not getTag(self.session, repo.tag_id) and not repo.expired(): - self.logger.info('Tag %d for repo %d disappeared, expiring.', repo.tag_id, repo_id) - repo.expire() - if len(self.repos) > len(repodata): - # This shouldn't normally happen, but might if someone else calls - # repoDelete or similar - active = set([r['id'] for r in repodata]) - for repo_id in to_list(self.repos.keys()): - if repo_id not in active: - self.logger.info('Dropping entry for inactive repo: %s', repo_id) - del self.repos[repo_id] - - def checkExternalRepo(self, ts, repodata, tag): - """Determine which external repos are current, return True if remote repo is newer""" - url = repodata['url'] - if url not in self.external_repos: - self.external_repos[url] = 0 - arches = [] # placeholder for repos without $arch bit - try: - arches = getTag(self.session, tag)['arches'].split() - except AttributeError: - pass - for arch in arches: - if '$arch' in url: - arch_url = url.replace('$arch', arch) - else: - arch_url = url - arch_url = os.path.join(arch_url, 'repodata/repomd.xml') - self.logger.debug('Checking external url: %s' % arch_url) - try: - r = requests.get(arch_url, timeout=5) - root = ElementTree.fromstring(r.text) - for child in root.iter('{http://linux.duke.edu/metadata/repo}timestamp'): - remote_ts = int(child.text) - if remote_ts > self.external_repos[url]: - self.external_repos[url] = remote_ts - except Exception: - # inaccessible or without timestamps - # treat repo as unchanged (ts = 0) - pass - return ts < self.external_repos[url] - - def reposToCheck(self): - to_check = [] - repo_ids = to_list(self.repos.keys()) - for repo_id in repo_ids: - repo = self.repos.get(repo_id) - if repo is None: - # removed by main thread - continue - if not repo.current: - # no point in checking again - continue - if repo.state not in (koji.REPO_READY, koji.REPO_INIT): - repo.current = False - if repo.expire_ts is None: - repo.expire_ts = time.time() - # also no point in further checking - continue - to_check.append(repo) - if self.logger.isEnabledFor(logging.DEBUG): - skipped = set(repo_ids).difference([r.repo_id for r in to_check]) - self.logger.debug("Skipped check for repos: %r", skipped) - return to_check - - def checkExternalRepos(self): - """Determine which external repos changed""" - # clean external repo cache - self.external_repos = {} - for repo in self.reposToCheck(): - changed = False - for tag in repo.taglist: - try: - external_repos = self.session.getExternalRepoList(tag) - except koji.GenericError: - # in case tag was deleted, checkCurrentRepos is - # responsible for cleanup, ignore it here - external_repos = [] - for external_repo in external_repos: - changed = self.checkExternalRepo(repo.event_ts, external_repo, tag) - self.logger.debug("Check external repo %s [%s] for tag %s: %s" % ( - external_repo['external_repo_id'], external_repo['url'], - tag, changed)) - if changed: - break - if changed: - break - if changed: - self.logger.info("Repo %i no longer current due to external repo change" % - repo.repo_id) - repo.current = False - repo.expire_ts = time.time() - - def checkCurrentRepos(self): - """Determine which repos are current""" - for repo in self.reposToCheck(): - if self.session.tagChangedSinceEvent(repo.event_id, repo.taglist): - self.logger.info("Repo %i no longer current", repo.repo_id) - repo.current = False - repo.expire_ts = time.time() - - def currencyChecker(self, session): - """Continually checks repos for currency. Runs as a separate thread""" - self.session = session - self.logger = logging.getLogger("koji.repo.currency") - self.logger.info('currencyChecker starting') - try: - while True: - self.checkCurrentRepos() - time.sleep(self.options.sleeptime) - except Exception: - self.logger.exception('Error in currency checker thread') - raise - finally: - session.logout() - - def currencyExternalChecker(self, session): - """Continually checks repos for external repo currency. Runs as a separate thread""" - self.session = session - self.logger = logging.getLogger("koji.repo.currency_external") - self.logger.info('currencyExternalChecker starting') - try: - while True: - self.checkExternalRepos() - time.sleep(self.options.sleeptime) - except Exception: - self.logger.exception('Error in external currency checker thread') - raise - finally: - session.logout() - - def regenLoop(self, session): - """Triggers regens as needed/possible. Runs in a separate thread""" - self.session = session - self.logger = logging.getLogger("koji.repo.regen") - self.logger.info('regenLoop starting') - try: - while True: - self.regenRepos() - time.sleep(self.options.sleeptime) - except Exception: - self.logger.exception('Error in regen thread') - raise - finally: - session.logout() - - def deleteLoop(self, session): - """Triggers regens as needed/possible. Runs in a separate thread""" - self.session = session - self.delete_logger = logging.getLogger("koji.repo.delete") - self.delete_logger.info('deleteLoop starting') - try: - while True: - self.deleteRepos() - time.sleep(self.options.sleeptime) - except Exception: - self.delete_logger.exception('Error in delete thread') - raise - finally: - session.logout() - - def pruneLocalRepos(self): - for volinfo in self.session.listVolumes(): - volumedir = pathinfo.volumedir(volinfo['name']) - repodir = "%s/repos" % volumedir - self._pruneLocalRepos(repodir, self.options.deleted_repo_lifetime) - distrepodir = "%s/repos-dist" % volumedir - self._pruneLocalRepos(distrepodir, self.options.dist_repo_lifetime) - - def _pruneLocalRepos(self, topdir, max_age): - """Scan filesystem for repos and remove any deleted ones - - Also, warn about any oddities""" - if self.delete_pids: - # skip - return - if not os.path.exists(topdir): - self.logger.debug("%s doesn't exist, skipping", topdir) - return - if not os.path.isdir(topdir): - self.logger.warning("%s is not directory, skipping", topdir) - return - self.logger.debug("Scanning %s for repos", topdir) - self.logger.debug('max age allowed: %s seconds', max_age) - for tag in os.listdir(topdir): - tagdir = "%s/%s" % (topdir, tag) - if not os.path.isdir(tagdir): - self.logger.debug("%s is not a directory, skipping", tagdir) - continue - for repo_id in os.listdir(tagdir): - if repo_id == 'latest': - # ignore latest symlinks - continue - try: - repo_id = int(repo_id) - except ValueError: - self.logger.debug("%s/%s not an int, skipping", tagdir, repo_id) - continue - if repo_id in self.repos: - # we're already managing it, no need to deal with it here - continue - repodir = "%s/%s" % (tagdir, repo_id) - try: - # lstat because it could be link to another volume - dirstat = os.lstat(repodir) - except OSError: - # just in case something deletes the repo out from under us - self.logger.debug("%s deleted already?!", repodir) - continue - symlink = False - if stat.S_ISLNK(dirstat.st_mode): - symlink = True - elif not stat.S_ISDIR(dirstat.st_mode): - self.logger.debug("%s not a directory, skipping", repodir) - continue - dir_ts = dirstat.st_mtime - rinfo = self.session.repoInfo(repo_id) - if rinfo is None: - if not self.options.ignore_stray_repos: - age = time.time() - dir_ts - self.logger.debug("did not expect %s; age: %s", - repodir, age) - if age > max_age: - self.logger.info( - "Removing unexpected directory (no such repo): %s", repodir) - if symlink: - os.unlink(repodir) - else: - self.rmtree(repodir) - continue - if rinfo['tag_name'] != tag: - self.logger.warning( - "Tag name mismatch (rename?): %s vs %s", tag, rinfo['tag_name']) - continue - if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM): - age = time.time() - max(rinfo['create_ts'], dir_ts) - self.logger.debug("potential removal candidate: %s; age: %s" % (repodir, age)) - if age > max_age: - logger.info("Removing stray repo (state=%s): %s", - koji.REPO_STATES[rinfo['state']], repodir) - if symlink: - os.unlink(repodir) - else: - self.rmtree(repodir) - - def tagUseStats(self, tag_id): - stats = self.tag_use_stats.get(tag_id) - now = time.time() - if stats and now - stats['ts'] < 3600: - # use the cache - return stats - data = self.session.listBuildroots(tagID=tag_id, - queryOpts={'order': '-create_event_id', 'limit': 100}) - # XXX magic number (limit) - if data: - tag_name = data[0]['tag_name'] - else: - tag_name = "#%i" % tag_id - stats = {'data': data, 'ts': now, 'tag_name': tag_name} - recent = [x for x in data if now - x['create_ts'] < 3600 * 24] - # XXX magic number - stats['n_recent'] = len(recent) - self.tag_use_stats[tag_id] = stats - self.logger.debug("tag %s recent use count: %i" % (tag_name, len(recent))) - return stats - - def setTagScore(self, entry): - """Set score for needed_tag entry - - We score the tags by two factors - - age of current repo - - last use in a buildroot - - Having an older repo or a higher use count gives the tag a higher - priority for regen. The formula attempts to keep the last use factor - from overpowering, so that tags with very old repos still get priority - """ - - stats = self.tagUseStats(entry['taginfo']['id']) - # normalize use count - max_n = max([t.get('n_recent', 0) for t in self.needed_tags.values()] or [1]) - if max_n == 0: - # no recent use or missing data - max_n = 1 - adj = stats['n_recent'] * 9.0 // max_n + 1 # 1.0 to 10.0 - ts = entry['expire_ts'] - age = time.time() - ts - # XXX - need to make sure our times aren't far off, otherwise this - # scoring could have the opposite of the desired effect - if age < 0: - self.logger.warning("Needed tag has future expire_ts: %r", entry) - age = 0 - entry['score'] = age * adj - self.logger.debug("Needed tag %s got score %.2f", - entry['taginfo']['name'], entry['score']) - # so a day old unused repo gets about the regen same score as a - # 2.4-hour-old, very popular repo - - def updateTagScores(self): - for entry in self.needed_tags.values(): - self.setTagScore(entry) - - def updateRepos(self): - self.logger.debug("Updating repos") - - self.readCurrentRepos() - - # check for stale repos - for repo in to_list(self.repos.values()): - if repo.stale(): - repo.expire() - - # find out which tags require repos - self.checkNeeded() - - self.updateTagScores() - - if self.options.queue_file: - with open(self.options.queue_file, "wt") as f: - fmt = "%-40s %7s %5s\n" - f.write(fmt % ("Tag", "Expired", "Score")) - for tag in sorted(self.needed_tags.values(), key=lambda t: t['score'], - reverse=True): - time_expired = time.time() - tag['expire_ts'] - f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score']))) - - def checkTasks(self): - """Check on newRepo tasks - - - update taskinfo - - remove finished tasks - - check for other newRepo tasks (not generated by us) - """ - - # prune recent tasks - now = time.time() - for task_id in list(self.recent_tasks): - if now - self.recent_tasks[task_id] > self.options.recent_tasks_lifetime: - del self.recent_tasks[task_id] - - # check on current tasks - task_ids = list(self.tasks) - self.session.multicall = True - for task_id in task_ids: - self.session.getTaskInfo(task_id) - for task_id, [tinfo] in zip(task_ids, self.session.multiCall(strict=True)): - tstate = koji.TASK_STATES[tinfo['state']] - tag_id = self.tasks[task_id]['tag_id'] - if tstate == 'CLOSED': - self.logger.info("Finished: newRepo task %s for tag %s", task_id, tag_id) - self.recent_tasks[task_id] = time.time() - del self.tasks[task_id] - elif tstate in ('CANCELED', 'FAILED'): - self.logger.info( - "Problem: newRepo task %s for tag %s is %s", task_id, tag_id, tstate) - self.recent_tasks[task_id] = time.time() - del self.tasks[task_id] - else: - self.tasks[task_id]['taskinfo'] = tinfo - # TODO: implement a timeout - - # also check other newRepo tasks - repo_tasks = self.session.listTasks(opts={'method': 'newRepo', - 'state': ([koji.TASK_STATES[s] - for s in ('FREE', 'OPEN')])}) - others = [t for t in repo_tasks if t['id'] not in self.tasks] - for tinfo in others: - if tinfo['id'] not in self.other_tasks: - self.logger.info("Untracked newRepo task: %(id)i", tinfo) - # note: possible race here, but only a log message - # TODO - determine tag and maven support - self.other_tasks = dict([(t['id'], t) for t in others]) - - def checkNeeded(self): - """Determine which tags currently need regeneration""" - - n_need = len(self.needed_tags) - ignore = self.options.ignore_tags.split() - self.build_tags = set([ - t['build_tag'] for t in self.session.getBuildTargets() - if not koji.util.multi_fnmatch(t['build_tag_name'], ignore) - ]) - # index repos by tag - tag_repos = {} - for repo in to_list(self.repos.values()): - tag_repos.setdefault(repo.tag_id, []).append(repo) - - for tag_id in self.build_tags: - covered = False - for repo in tag_repos.get(tag_id, []): - if repo.current: - covered = True - break - elif repo.pending(): - # one on the way - covered = True - break - if tag_id in self.needed_tags: - entry = self.needed_tags[tag_id] - if covered: - # no longer needed - self.logger.info("Tag %(name)s has a current or in " - "progress repo", entry['taginfo']) - del self.needed_tags[tag_id] - # if not covered, we already know - continue - if covered: - continue - - # we haven't noted this need yet - taginfo = self.session.getTag(tag_id) - # (not using the caching version since we only call upon discovery) - if not taginfo: - self.logger.warning('Tag disappeared: %i', tag_id) - continue - self.logger.info('Tag needs regen: %(name)s', taginfo) - - # how expired are we? - ts = 0 - for repo in tag_repos.get(tag_id, []): - if repo.expire_ts: - if repo.expire_ts > ts: - ts = repo.expire_ts - else: - self.logger.warning("No expire timestamp for repo: %s", repo.repo_id) - if ts == 0: - ts = time.time() - - entry = { - 'taginfo': taginfo, - 'expire_ts': ts, - 'needed_since': time.time(), - } - self.setTagScore(entry) - self.needed_tags[tag_id] = entry - - # some cleanup - for tag_id in list(self.needed_tags): - entry = self.needed_tags.get(tag_id) - if tag_id not in self.build_tags: - self.logger.info("Tag %(name)s is no longer a build tag", - entry['taginfo']) - del self.needed_tags[tag_id] - for tag_id, repolist in tag_repos.items(): - if tag_id not in self.build_tags: - # repos for these tags are no longer required - for repo in repolist: - if repo.ready(): - repo.expire() - - if n_need != len(self.needed_tags): - self.logger.info('Needed tags count went from %i to %i', n_need, - len(self.needed_tags)) - - def regenRepos(self): - """Trigger newRepo tasks for needed tags""" - - self.checkTasks() - self.logger.debug("Current tasks: %r" % self.tasks) - if self.other_tasks: - self.logger.debug("Found %i untracked newRepo tasks", - len(self.other_tasks)) - - # first note currently running tasks - running_tasks = 0 - running_tasks_maven = 0 - for task in self.tasks.values(): - if task['taskinfo']['waiting']: - self.logger.debug("Task %(id)i is waiting", task) - else: - # The largest hub impact is from the first part of the newRepo - # task. Once it is waiting on subtasks, that part is over - running_tasks += 1 - if task['maven']: - running_tasks_maven += 1 - - debuginfo_pat = self.options.debuginfo_tags.split() - src_pat = self.options.source_tags.split() - separate_src_pat = self.options.separate_source_tags.split() - order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True) - for tag in order: - if running_tasks >= self.options.max_repo_tasks: - self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks))) - self.logger.info("Maximum number of repo tasks reached") - return - elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit: - self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks))) - self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks))) - self.logger.info("Repo task limit reached") - return - tagname = tag['taginfo']['name'] - task_id = tag.get('task_id') - if task_id: - if task_id in self.tasks: - # we already have a task - continue - elif task_id in self.recent_tasks: - # avoiding a race, see https://pagure.io/koji/issue/942 - continue - else: - # should not happen - logger.warning('Needed tag refers to unknown task. ' - '%s -> %i', tagname, task_id) - # we'll advance and create a new task - taskopts = {} - if koji.util.multi_fnmatch(tagname, debuginfo_pat): - taskopts['debuginfo'] = True - if koji.util.multi_fnmatch(tagname, src_pat): - taskopts['src'] = True - if koji.util.multi_fnmatch(tagname, separate_src_pat): - taskopts['separate_src'] = True - maven = tag['taginfo']['maven_support'] - if maven: - if running_tasks_maven >= self.options.max_repo_tasks_maven: - continue - task_id = self.session.newRepo(tagname, **taskopts) - running_tasks += 1 - if maven: - running_tasks_maven += 1 - expire_ts = tag['expire_ts'] - if expire_ts == 0: # can this still happen? - time_expired = '???' - else: - time_expired = "%.1f" % (time.time() - expire_ts) - self.logger.info("Created newRepo task %s for tag %s (%s), " - "expired for %s sec", task_id, tag['taginfo']['id'], - tag['taginfo']['name'], time_expired) - self.tasks[task_id] = { - 'id': task_id, - 'taskinfo': self.session.getTaskInfo(task_id), - 'tag_id': tag['taginfo']['id'], - 'maven': maven, - } - tag['task_id'] = task_id - if running_tasks_maven >= self.options.max_repo_tasks_maven: - self.logger.info("Maximum number of maven repo tasks reached") - - def deleteRepos(self): - # trigger deletes - self.delete_logger.debug("Starting delete repos") - n = 0 - for repo in to_list(self.repos.values()): - if repo.expired(): - # try to delete - if repo.tryDelete(self.delete_logger): - del self.repos[repo.repo_id] - n += 1 - self.delete_logger.debug("Ending delete repos (deleted: %s)" % n) - - -def start_currency_checker(session, repomgr): - subsession = session.subsession() - thread = threading.Thread(name='currencyChecker', - target=repomgr.currencyChecker, args=(subsession,)) - thread.setDaemon(True) - thread.start() - return thread - - -def start_external_currency_checker(session, repomgr): - subsession = session.subsession() - thread = threading.Thread(name='currencyExternalChecker', - target=repomgr.currencyExternalChecker, args=(subsession,)) - thread.setDaemon(True) - thread.start() - return thread - - -def start_regen_loop(session, repomgr): - subsession = session.subsession() - thread = threading.Thread(name='regenLoop', - target=repomgr.regenLoop, args=(subsession,)) - thread.setDaemon(True) - thread.start() - return thread - - -def start_delete_loop(session, repomgr): - subsession = session.subsession() - thread = threading.Thread(name='deleteLoop', - target=repomgr.deleteLoop, args=(subsession,)) - thread.setDaemon(True) - thread.start() - return thread - - -def main(options, session): - repomgr = RepoManager(options, session) - repomgr.readCurrentRepos() - - def shutdown(*args): - raise SystemExit - signal.signal(signal.SIGTERM, shutdown) - curr_chk_thread = start_currency_checker(session, repomgr) - if options.check_external_repos: - curr_ext_chk_thread = start_external_currency_checker(session, repomgr) - regen_thread = start_regen_loop(session, repomgr) - delete_thread = start_delete_loop(session, repomgr) - # TODO also move rmtree jobs to threads - logger.info("Entering main loop") - while True: - try: - repomgr.updateRepos() - repomgr.checkQueue() - repomgr.printState() - repomgr.pruneLocalRepos() - if not curr_chk_thread.is_alive(): - logger.error("Currency checker thread died. Restarting it.") - curr_chk_thread = start_currency_checker(session, repomgr) - if options.check_external_repos and not curr_ext_chk_thread.is_alive(): - logger.error("External currency checker thread died. Restarting it.") - curr_ext_chk_thread = start_external_currency_checker(session, repomgr) - if not regen_thread.is_alive(): - logger.error("Regeneration thread died. Restarting it.") - regen_thread = start_regen_loop(session, repomgr) - if not delete_thread.is_alive(): - logger.error("Delete thread died. Restarting it.") - delete_thread = start_delete_loop(session, repomgr) - except KeyboardInterrupt: - logger.warning("User exit") - break - except koji.AuthExpired: - logger.warning("Session expired") - break - except SystemExit: - logger.warning("Shutting down") - break - except Exception: - # log the exception and continue - logger.error(''.join(traceback.format_exception(*sys.exc_info()))) - try: - time.sleep(options.sleeptime) - except KeyboardInterrupt: - logger.warning("User exit") - break - try: - repomgr.checkQueue() - repomgr.killChildren() - finally: - session.logout() - - -def get_options(): - """process options from command line and config file""" - # parse command line args - parser = OptionParser("usage: %prog [opts]") - parser.add_option("-c", "--config", dest="configFile", - help="use alternate configuration file", metavar="FILE", - default="/etc/kojira/kojira.conf") - parser.add_option("--user", help="specify user") - parser.add_option("--password", help="specify password") - parser.add_option("--principal", help="Kerberos principal") - parser.add_option("--keytab", help="Kerberos keytab") - parser.add_option("-f", "--fg", dest="daemon", - action="store_false", default=True, - help="run in foreground") - parser.add_option("-d", "--debug", action="store_true", - help="show debug output") - parser.add_option("-q", "--quiet", action="store_true", - help="don't show warnings") - parser.add_option("-v", "--verbose", action="store_true", - help="show verbose output") - parser.add_option("--force-lock", action="store_true", default=False, - help="force lock for exclusive session") - parser.add_option("--debug-xmlrpc", action="store_true", default=False, - help="show xmlrpc debug output") - parser.add_option("--skip-main", action="store_true", default=False, - help="don't actually run main") - parser.add_option("--show-config", action="store_true", default=False, - help="Show config and exit") - parser.add_option("--sleeptime", type='int', help="Specify the polling interval") - parser.add_option("-s", "--server", help="URL of XMLRPC server") - parser.add_option("--topdir", help="Specify topdir") - parser.add_option("--logfile", help="Specify logfile") - parser.add_option("--queue-file", - help="If specified, queue is dumped to separate status file each cycle") - (options, args) = parser.parse_args() - - config = koji.read_config_files(options.configFile) - section = 'kojira' - for x in config.sections(): - if x != section: - quit('invalid section found in config file: %s' % x) - defaults = {'debuginfo_tags': '', - 'source_tags': '', - 'separate_source_tags': '', - 'ignore_tags': '', - 'verbose': False, - 'debug': False, - 'ignore_stray_repos': False, - 'topdir': '/mnt/koji', - 'server': None, - 'logfile': '/var/log/kojira.log', - 'principal': None, - 'keytab': '/etc/kojira/kojira.keytab', - 'ccache': '/var/tmp/kojira.ccache', - 'retry_interval': 60, - 'max_retries': 120, - 'offline_retry': True, - 'offline_retry_interval': 120, - 'no_ssl_verify': False, - 'max_delete_processes': 4, - 'max_repo_tasks': 4, - 'max_repo_tasks_maven': 2, - 'repo_tasks_limit': 10, - 'deleted_repo_lifetime': 7 * 24 * 3600, - # XXX should really be called expired_repo_lifetime - 'dist_repo_lifetime': 7 * 24 * 3600, - 'check_external_repos': False, - 'recent_tasks_lifetime': 600, - 'sleeptime': 15, - 'cert': None, - 'ca': '', # FIXME: unused, remove in next major release - 'serverca': None, - 'queue_file': None, - } - if config.has_section(section): - int_opts = ('deleted_repo_lifetime', 'max_repo_tasks', 'repo_tasks_limit', - 'retry_interval', 'max_retries', 'offline_retry_interval', - 'max_delete_processes', 'max_repo_tasks_maven', 'dist_repo_lifetime', - 'sleeptime', 'recent_tasks_lifetime') - str_opts = ('topdir', 'server', 'user', 'password', 'logfile', 'principal', 'keytab', - 'cert', 'ca', 'serverca', 'debuginfo_tags', 'queue_file', - 'source_tags', 'separate_source_tags', 'ignore_tags') # FIXME: remove ca here - bool_opts = ('verbose', 'debug', 'ignore_stray_repos', 'offline_retry', - 'no_ssl_verify', 'check_external_repos') - legacy_opts = ('with_src', 'delete_batch_size') - for name in config.options(section): - if name in int_opts: - defaults[name] = config.getint(section, name) - elif name in str_opts: - defaults[name] = config.get(section, name) - elif name in bool_opts: - defaults[name] = config.getboolean(section, name) - elif name in legacy_opts: - deprecated('The %s configuration option is no longer used\n' % name) - else: - quit("unknown config option: %s" % name) - for name, value in defaults.items(): - if getattr(options, name, None) is None: - setattr(options, name, value) - if options.logfile in ('', 'None', 'none'): - options.logfile = None - # special handling for cert defaults - cert_defaults = { - 'cert': '/etc/kojira/client.crt', - 'serverca': '/etc/kojira/serverca.crt', - } - for name in cert_defaults: - if getattr(options, name, None) is None: - fn = cert_defaults[name] - if os.path.exists(fn): - setattr(options, name, fn) - return options - - -def quit(msg=None, code=1): - if msg: - logging.getLogger("koji.repo").error(msg) - sys.stderr.write('%s\n' % msg) - sys.stderr.flush() - sys.exit(code) - - -if __name__ == "__main__": - - options = get_options() - topdir = getattr(options, 'topdir', None) - pathinfo = koji.PathInfo(topdir) - if options.show_config: - pprint.pprint(options.__dict__) - sys.exit() - if options.logfile: - if not os.path.exists(options.logfile): - try: - logfile = open(options.logfile, "w") - logfile.close() - except Exception: - sys.stderr.write("Cannot create logfile: %s\n" % options.logfile) - sys.exit(1) - if not os.access(options.logfile, os.W_OK): - sys.stderr.write("Cannot write to logfile: %s\n" % options.logfile) - sys.exit(1) - koji.add_file_logger("koji", options.logfile) - # note we're setting logging for koji.* - logger = logging.getLogger("koji") - if options.debug: - logger.setLevel(logging.DEBUG) - elif options.verbose: - logger.setLevel(logging.INFO) - elif options.quiet: - logger.setLevel(logging.ERROR) - else: - logger.setLevel(logging.WARNING) - - session_opts = koji.grab_session_options(options) - session = koji.ClientSession(options.server, session_opts) - if options.cert is not None and os.path.isfile(options.cert): - # authenticate using SSL client certificates - session.ssl_login(options.cert, None, options.serverca) - elif options.user: - # authenticate using user/password - session.login() - elif koji.reqgssapi and options.principal and options.keytab: - session.gssapi_login(options.principal, options.keytab, options.ccache) - else: - quit("No username/password/certificate supplied and Kerberos missing or not configured") - # get an exclusive session - try: - session.exclusiveSession(force=options.force_lock) - except koji.AuthLockError: - quit("Error: Unable to get lock. Trying using --force-lock") - if not session.logged_in: - quit("Error: Unknown login error") - if not session.logged_in: - print("Error: unable to log in") - sys.exit(1) - if options.skip_main: - sys.exit() - elif options.daemon: - koji.daemonize() - else: - koji.add_stderr_logger("koji") - main(options, session) diff --git a/roles/koji_hub/tasks/main.yml b/roles/koji_hub/tasks/main.yml index c2569ac..c08d362 100644 --- a/roles/koji_hub/tasks/main.yml +++ b/roles/koji_hub/tasks/main.yml @@ -465,15 +465,3 @@ tags: - files - koji_hub - -# for now we have a patched version of kojira -# Based on the 1.22.0 version -# With: -# https://pagure.io/koji/pull-request/2140.patch -# Hopefully all merged in 1.23. -# -- name: copy in patched kojira - copy: src=kojira dest=/usr/sbin/kojira - tags: - - files - - koji_hub