From 7a4894bc01805dcdc3e53d7bbb43dbac729f1b31 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Nov 04 2020 12:44:15 +0000 Subject: Merge #49 `cli plugin: replicate-tasks` --- diff --git a/src/plugins/cli/replicate-tasks.py b/src/plugins/cli/replicate-tasks.py new file mode 100644 index 0000000..852a676 --- /dev/null +++ b/src/plugins/cli/replicate-tasks.py @@ -0,0 +1,442 @@ +import queue +import logging +import threading +from enum import Enum +from functools import reduce + +import six + +import koji +import koji.tasks +from koji.plugin import export_cli +from koji_cli.lib import OptionParser, _, watch_tasks, activate_session + + +REPOCACHE = {} + +logger = logging.getLogger('koji.replicateTasks') + + +class Strategy(Enum): + # reusing the *versioned* build tag of the original task + reuse = 0 + # cloning the *versioned* group/pkglist/build to a new tag, then the build tag inherits from it + # and a overriiden-tag specified by --override-tag, where extra_config and others can be + # overridden. + clone = 1 + + +@export_cli +def handle_replicate_tasks(options, session, args): + """[admin] Replicate tasks""" + (parser, opts, task_ids) = parse_options(options, args) + if options.debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + task_ids = check_options(parser, opts, task_ids) + activate_session(session, options) + channel_override = getattr(opts, 'channel_override', None) + if channel_override: + opts.channels_override = session.getChannel(channel_override, + strict=True)['id'] + if not task_ids: + tasks = get_tasks(session, parser, opts) + else: + tasks = task_ids + b_queue = queue.Queue() + for task in tasks: + b_queue.put(task) + threads = [] + for i in range(4): + subsession = session.subsession() + thread = threading.Thread(name='replicator %i' % i, + target=replicate_handler, + args=(subsession, b_queue, opts)) + thread.setDaemon(True) + thread.start() + threads.append(thread) + for t in threads: + t.join() + + +def get_tasks(session, parser, opts): + channels = opts.channels + hosts = opts.hosts + methods = opts.methods + states = getattr(opts, 'states', None) + limit = getattr(opts, 'limit', None) + offset = getattr(opts, 'offset', None) + channel_ids = [] + if channels: + for channel in channels: + channel_ids.append(session.getChannel(channel, strict=True)['id']) + host_ids = [] + if hosts: + for host in hosts: + host_ids.append(session.getHost(host, strict=True)['id']) + state_nums = [] + for state in states: + if isinstance(state, six.integer_types): + if 0 <= state <= 5: + state_nums.append(state) + else: + raise koji.GenericError("integer state should >=0 and <=5") + elif isinstance(state, six.string_types): + state_nums.append(koji.TASK_STATES[state]) + else: + raise koji.GenericError("unacceptable state type") + options = {} + if state_nums: + options['state'] = state_nums + options['parent'] = None + options['decode'] = True + + queryOpts = {} + if limit: + queryOpts['limit'] = limit + if offset: + queryOpts['offset'] = offset + queryOpts['order'] = '-id' + cvs = [] + with session.multicall() as m: + if channel_ids: + for channel_id in channel_ids: + if methods: + for method in methods: + cvs.append((m.listTasks(dict(options, method=method, + channel_id=channel_id), + queryOpts))) + else: + cvs.append(m.listTasks(dict(options, channel_id=channel_id), queryOpts)) + elif host_ids: + for host_id in host_ids: + if methods: + for method in methods: + cvs.append(m.listTasks(dict(options, method=method, host_id=host_id), + queryOpts)) + else: + cvs.append(m.listTasks(dict(options, host_id=host_id), queryOpts)) + else: + if methods: + for method in methods: + cvs.append(m.listTasks(dict(options, method=method), queryOpts)) + else: + cvs.append(m.listTasks(options, queryOpts)) + tasks = sum([cv.result for cv in cvs], []) + print(tasks) + if not tasks: + raise koji.GenericError("no tasks to replicate.") + else: + logger.debug("to replicate tasks:\n%s", "\n".join([str(t['id']) for t in tasks])) + return tasks + + +def parse_options(options, args): + usage = _("usage: %prog replicate-tasks [options] [...]") + usage += _( + "\nto replicate scratch tasks from existing tasks with specified IDs" + " or by query" + "\n(Specify the --help global option for a list of" + " other help options)") + parser = OptionParser(usage=usage) + parser.disable_interspersed_args() + parser.add_option("-s", "--strategy", default=Strategy.reuse.name, + help=_("specify the strategy to construct the buildroot for" + " replicating the task, Options: %s," + " [Default: %%default]." % ", ".join(Strategy.__members__.keys()))) + parser.add_option("-T", "--override-tag", + help=_("specify the tag in the inheritance to override the content / config" + " of the origin build tag when strategy is clone")) + parser.add_option("-C", "--channel", dest="channels", action="append", + default=[], + help=_("specify channels where tasks are from")) + parser.add_option("-H", "--host", dest="hosts", action="append", + default=[], + help=_("specify hosts where tasks are replicated from")) + parser.add_option("-m", "--method", dest="methods", action="append", + default=[], + help=_("specify methods that original tasks are. Only supports 'build' now")) + parser.add_option("-S", "--state", dest="states", action="append", + default=['CLOSED'], + help=_("specify states of tasks which are replicated, [Default: %default]")) + parser.add_option("-w", "--weight", type='int', help=_("set task weight")) + parser.add_option("--channel-override", + help=_("use a non-standard channel to replicate tasks")) + parser.add_option("--arch-override", dest="arches", action="append", + default=[], + help=_("to override arches to replicate tasks")) + parser.add_option("--include-scratch", action="store_true", + help=_("also replicate scratch tasks")) + # parser.add_option("--limit-by", default='channel', + # help=_("specify field used by --limit")) + parser.add_option("--limit", type='int', default=3, + help=_("limit per method and/or per channel/host, [Default: %default]")) + parser.add_option("--offset", type="int", default=0, + help=_("offset of limit, [Default: %default]")) + parser.add_option("--quiet", action="store_true", default=options.quiet, + help=_("Do not print the task information")) + + return (parser,) + parser.parse_args(args) + + +def check_options(parser, options, args): + ints = [] + try: + for arg in args: + ints.append(int(arg)) + except ValueError: + parser.error(_("Only task ids are accepted as arguments")) + if options.channels and options.hosts: + parser.error(_("Options: --channel and --host are conflicted")) + if options.strategy not in Strategy.__members__.keys(): + parser.error(_("--strategy must be one of %s" % ", ".join(Strategy.__members__.keys()))) + if options.override_tag and options.strategy != Strategy.clone.name: + parser.error(_("--override-tag is only available when --strategy=%s" + % Strategy.clone.name)) + return ints + + +def replicate_build_task(session, task, options): + if isinstance(task, six.integer_types): + task = session.getTaskInfo(task, request=True) + task_id = task['id'] + logger.info("%i: Looking at task", task_id) + if task['parent'] is not None: + raise koji.GenericError("%(id)i: not a parent task" % task) + if task['method'] == 'build': + params = replicate_build_request(session, task, options) + # TODO: ONLY build task is supported right now + # elif task['method'] in ['image', 'livemedia', 'livecd', 'appliance']: + # params = replicate_image_request(session, task, options) + else: + raise koji.GenericError("%(id)i: can not replicate %(method)s task" % task) + channel = task['channel_id'] + if options.channel_override: + channel = options.channel_override + new_task_id = session.makeTask(task['method'], koji.encode_args(**params), channel=channel) + logger.info("Original task %i replicated as task %i", task_id, new_task_id) + rv = watch_tasks(session, [new_task_id], quiet=options.quiet) + # always True + return True + + +def replicate_build_request(session, task, options): + task_id = task['id'] + include_scratch = getattr(options, 'include_scratch', False) + arches = options.arches + orig_repo = None + for subtask in session.getTaskChildren(task_id, request=True): + if subtask['method'] != 'buildArch': + continue + sub_params = koji.tasks.parse_task_params(subtask['method'], + subtask['request']) + logger.debug('[task#%s] sub-params: %s', task_id, sub_params) + sub_opts = sub_params.get('opts', {}) + if not orig_repo: + orig_repo = sub_opts.get('repo_id', None) + + # duplicate build task + params = koji.tasks.parse_task_params(task['method'], task['request']) + request = params.copy() + opts = params.get('opts', {}) + if not include_scratch and opts.get('scratch'): + raise koji.GenericError("#%i: Skipping scratch build" % task_id) + + if not orig_repo: + raise koji.GenericError( + "#%i: Could not determine original repo" % task_id) + if options.strategy == Strategy.reuse.name: + repo_info = replicate_repo(session, orig_repo) + elif options.strategy == Strategy.clone.name: + repo_info = clone_tag(session, orig_repo, task_id, options.override_tag) + request['target'] = None + else: + raise koji.GenericError("strategy: %s is incorrect (NEVER HAPPENS)" % options.strategy) + opts['repo_id'] = repo_info['id'] + opts['scratch'] = True + if arches: + opts['arch_override'] = ' '.join(arches) + logger.info("%i: override arches: %s", task_id, arches) + request['opts'] = opts + return request + + +# TODO: finish this +def replicate_image_request(session, task, options): + task_id = task['id'] + include_scratch = getattr(options, 'include_scratch', False) + arches = options.arches + + params = koji.tasks.parse_task_params(task['method'], task['request']) + opts = params.get('opts', {}) + noopts = bool(opts) + if not include_scratch and opts.get('scratch'): + raise koji.GenericError("#%i: Skipping scratch build" % task_id) + opts['scratch'] = True + request = task['request'] + if arches: + request[2] = arches + logger.info("%i: override arches: %s", task_id, arches) + if noopts: + request.append(opts) + else: + request[-1] = opts + return request + + +def replicate_repo(session, repo_id, tag_id=None): + global REPOCACHE + logger.info("Replicating repo: #%s, tag: #%s", repo_id, tag_id) + orig_repo_id = repo_id + # FOUND + if repo_id in REPOCACHE: + repo_id = REPOCACHE[repo_id] + repo_info = session.repoInfo(repo_id, strict=True) + if koji.REPO_STATES[repo_info['state']] in ['READY', 'EXPIRED']: + # we'll just reuse this repo + return repo_info + else: + # cached repo has been deleted, regen it! + logger.info('[DELETED] Duplicate repo: #%s, based on: repo: %s ' % (orig_repo_id, + repo_info)) + return new_repo(session, None, repo_info, orig_repo_id) + # NOT FOUND + return new_repo(session, tag_id, repo_id, orig_repo_id) + + +def new_repo(session, tag, src_repo, orig_repo_id): + # cloning + # tag_id is the ID of cloned tag + if tag: + act_repo = session.getRepo(tag, event=None) + if act_repo: + REPOCACHE.setdefault(orig_repo_id, act_repo['id']) + return session.repoInfo(act_repo['id']) + event = None + # reusing/refreshing + else: + if isinstance(src_repo, six.integer_types): + src_repo = session.repoInfo(src_repo) + if src_repo and isinstance(src_repo, dict): + event = src_repo['create_event'] + tag = src_repo['tag_id'] + rtaskid = session.newRepo(tag, event=event) + watch_tasks(session, [rtaskid]) + new_repo_id, event_id = session.getTaskResult(rtaskid) + repo_info = session.repoInfo(new_repo_id) + REPOCACHE.setdefault(orig_repo_id, new_repo_id) + return repo_info + + +def replicate_handler(session, b_queue, options): + while True: + try: + taskinfo = b_queue.get(False) + except queue.Empty: + break + if isinstance(taskinfo, six.integer_types): + task_id = taskinfo + else: + task_id = taskinfo['id'] + logger.info("Replicating build from task #%s", task_id) + try: + replicate_build_task(session, taskinfo, options) + except Exception: + logger.error("An Error occurs when replicating build from task #%s", task_id, + exc_info=True) + + +def clone_tag(session, repo_id, task_id, override_tag_name=None): + if override_tag_name: + override_tag = session.getTag(override_tag_name, strict=True) + # create the tag + prefix = 'task-replication-%s-' % task_id + base_tag_name = prefix + 'base' + build_tag_name = prefix + 'build' + orig_repo_info = session.repoInfo(repo_id, strict=True) + event = orig_repo_info['create_event'] + orig_tag = session.getTag(orig_repo_info['tag_id'], strict=True, event=event) + arches = orig_tag['arches'] + extra = orig_tag['extra'] + extra['cloned_base_tag'] = True + base_tag = session.getTag(base_tag_name, strict=False) + if not base_tag: + logger.info("base tag: %s doesn't exist, creating one", base_tag_name) + base_tag_id = session.createTag(base_tag_name, arches=arches, extra=extra) + force = False + else: + base_tag_id = base_tag['id'] + force = True + logger.info("base tag: %s(#%s) exists", base_tag_name, base_tag_id) + # set the package list + dup_package_list(session, orig_tag, base_tag_id, event, force=force) + # tag our builds + dup_builds(session, orig_tag, base_tag_id, event, force=force) + # groups data + dup_groups(session, orig_tag, base_tag_id, event, force=force) + + base_tag = session.getTag(base_tag_id, strict=True) + build_tag = session.getTag(build_tag_name, strict=False) + if not build_tag: + build_tag_id = session.createTag(build_tag_name, arches=arches, + extra={'clonded_build_tag': True}) + build_tag = session.getTag(build_tag_id, strict=True) + else: + session.editTag2(build_tag['id'], arches=arches, extra={'clonded_build_tag': True}) + old_inheritance = session.getInheritanceData(build_tag['id']) + for p in old_inheritance: + del p['name'] + links = [] + if override_tag: + links.append({ + 'parent_id': override_tag['id'], + 'priority': 5, + 'maxdepth': None, + 'intransitive': False, + 'noconfig': False, + 'pkg_filter': '', + }) + links.append({ + 'parent_id': base_tag['id'], + 'priority': 15, + 'maxdepth': None, + 'intransitive': False, + 'noconfig': False, + 'pkg_filter': '', + }) + comp = [a for a in links if a in old_inheritance] + if comp != links: + session.setInheritanceData(build_tag['id'], links) + + return replicate_repo(session, repo_id, build_tag['id']) + + +def dup_package_list(session, orig_tag, tag_id, event_id, force=False): + pkgs = session.listPackages(tagID=orig_tag['id'], inherited=True, event=event_id) + with session.multicall() as m: + for pkg in pkgs: + m.packageListAdd(tag_id, pkg['package_id'], owner=pkg['owner_id'], + block=pkg['blocked'], extra_arches=pkg['extra_arches'], + force=force, update=False) + + +def dup_builds(session, orig_tag, tag_id, event_id, force=False): + builds = session.listTagged(orig_tag['id'], inherit=True, latest=True, event=event_id) + with session.multicall() as m: + for binfo in builds: + m.tagBuildBypass(tag_id, binfo['id'], force=force, notify=False) + + +def dup_groups(session, orig_tag, tag_id, event_id, force=False): + groups = session.getTagGroups(orig_tag['id'], event=event_id) + with session.multicall() as m: + for grp in groups: + m.groupListAdd(tag_id, grp['name'], block=bool(grp['blocked']), force=False, **grp) + # note: groupListAdd ignores the extra fields in **grp + for pkg in grp['packagelist']: + m.groupPackageListAdd(tag_id, grp['name'], pkg['package'], block=pkg['blocked'], + force=force, **pkg) + for req in grp['grouplist']: + m.groupReqListAdd(tag_id, grp['name'], req['req_id'], block=req['blocked'], + force=force, **req)