From b392566d1caa4c2e61f24dbce4a6ebcf93c734c4 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 14 2022 11:31:31 +0000 Subject: [PATCH 1/20] Merge branch 'scheduler2' into scheduler-all --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index 1217bc8..e3dabba 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7742,3 +7742,62 @@ def anon_handle_userinfo(goptions, session, args): print("Number of tasks: %d" % tasks.result) print("Number of builds: %d" % builds.result) print('') + + +def handle_scheduler_logs(goptions, session, args): + "[monitor] Query scheduler logs" + usage = "usage: %prog scheduler-logs " + parser = OptionParser(usage=get_usage_str(usage)) + parser.add_option("--task", type="int", action="store", + help="Filter by task ID") + parser.add_option("--host", type="str", action="store", + help="Filter by host (name/ID)") + parser.add_option("--level", type="str", action="store", + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + help="Filter by message level") + parser.add_option("--from", type="float", action="store", dest="from_ts", + help="Logs from given timestamp") + parser.add_option("--to", type="float", action="store", dest="to_ts", + help="Logs until given timestamp (included)") + parser.add_option("--logger", type="str", action="store", + help="Filter by logger name") + (options, args) = parser.parse_args(args) + if len(args) != 0: + parser.error("There are no arguments for this command") + + kwargs = {} + if options.task: + kwargs['taskID'] = options.task + if options.host: + try: + kwargs['hostID'] = int(options.host) + except ValueError: + kwargs['hostID'] = session.getHost(options.host)['id'] + if options.level: + kwargs['level'] = options.level + if options.from_ts: + kwargs['from_ts'] = options.from_ts + if options.to_ts: + kwargs['to_ts'] = options.to_ts + if options.logger: + kwargs['logger_name'] = options.logger + + logs = session.scheduler.getLogs(**kwargs) + + mask = 
("%(task_id)s\t%(host_name)s\t%(msg_time)s\t%(logger_name)s" + "\t%(level)s\t%(location)s\t%(msg)s") + if not goptions.quiet: + h = mask % { + 'task_id': 'Task', + 'host_name': 'Host', + 'msg_time': 'Time', + 'logger_name': 'Logger', + 'level': 'Level', + 'location': 'Location', + 'msg': 'Message', + } + print(h) + print('-' * len(h)) + + for log in logs: + print(mask % log) diff --git a/docs/schema.sql b/docs/schema.sql index 39b7893..1b689cb 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -955,4 +955,21 @@ CREATE TABLE proton_queue ( ) WITHOUT OIDS; +-- Scheduler tables +CREATE TYPE logger_level AS ENUM ('NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'); +CREATE TABLE scheduler_log_messages ( + id SERIAL NOT NULL PRIMARY KEY, + task_id INTEGER REFERENCES task (id), + host_id INTEGER REFERENCES host (id), + msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + logger_name VARCHAR(200) NOT NULL, + level logger_level NOT NULL, + location VARCHAR(200), + msg TEXT NOT NULL +) WITHOUT OIDS; +CREATE INDEX scheduler_log_messages_task_id ON scheduler_log_messages(task_id); +CREATE INDEX scheduler_log_messages_host_id ON scheduler_log_messages(host_id); +CREATE INDEX scheduler_log_messages_msg_time ON scheduler_log_messages(msg_time); +CREATE INDEX scheduler_log_messages_level ON scheduler_log_messages(level); + COMMIT WORK; diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index 1881a88..eb4ee58 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -42,6 +42,8 @@ from koji.context import context from koji.server import ServerError, BadRequest, RequestTimeout from koji.xmlrpcplus import ExtendedMarshaller, Fault, dumps, getparser +from . 
+        if taskID is not None:
+            clauses.append("task_id = %(taskID)s")
+            values['taskID'] = taskID
+        if hostID is not None:
+            clauses.append("host_id = %(hostID)s")
+            values['hostID'] = hostID
+        if logger_name is not None:
+            clauses.append("logger_name = %(logger_name)s")
+            values['logger_name'] = logger_name
+            'location': None,
+            'msg': 'text',
+        })
+
+    def test_all(self):
+        logger = scheduler.DBLogger()
+        logger.log("text", logger_name="logger_name", level=logging.ERROR,
+                   task_id=123, host_id=456, location="location")
+        self.assertEqual(len(self.inserts), 1)
+        insert = self.inserts[0]
+        self.assertEqual(insert.data, {
+            'task_id': 123,
+            'host_id': 456,
+            'logger_name': 'logger_name',
+            'level': 'ERROR',
+            'location': 'location',
+            'msg': 'text',
+        })
+    if not test:
+        delete = DeleteProcessor(table="scheduler_log_messages", clauses=clauses)
+        delete.execute()
OptionParser(usage=get_usage_str(usage)) + parser.add_option("-t", "--task", action="store", type=int, default=None, + help="Limit data to given task id") + parser.add_option("--host", action="store", default=None, + help="Limit data to given builder (name/id)") + parser.add_option("--state", action="store", type='str', default=None, + choices=[x for x in koji.TASK_STATES.keys()], + help="Limit data to task state") + (options, args) = parser.parse_args(args) + if len(args) > 0: + parser.error("This command takes no arguments") + + ensure_connection(session, goptions) + + host_id = None + if options.host: + try: + host_id = int(options.host) + except ValueError: + host_id = session.getHost(options.host, strict=True)['id'] + + if options.state: + state = koji.TASK_STATES[options.state] + else: + state = None + + # get the data + runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, state=state) + mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s' + if not goptions.quiet: + header = mask % { + 'task_id': 'Task', + 'host_name': 'Host', + 'state': 'State', + 'create_time': 'Created', + 'start_time': 'Started', + 'end_time': 'Ended' + } + print(header) + print('-' * len(header)) + for run in runs: + run['state'] = koji.TASK_STATES[runs['state']] + print(mask % run) + + if host_id: + print('Host data for %s:' % options.host) + host_data = session.scheduler.getHostData(hostID=host_id) + if len(host_data) > 0: + print(host_data[0]['data']) + else: + print('-') + + def handle_scheduler_logs(goptions, session, args): "[monitor] Query scheduler logs" usage = "usage: %prog scheduler-logs " @@ -7801,3 +7857,4 @@ def handle_scheduler_logs(goptions, session, args): for log in logs: print(mask % log) + diff --git a/docs/schema.sql b/docs/schema.sql index 1b689cb..3f9aadf 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -972,4 +972,23 @@ CREATE INDEX scheduler_log_messages_host_id ON scheduler_log_messages(host_id); 
CREATE INDEX scheduler_log_messages_msg_time ON scheduler_log_messages(msg_time); CREATE INDEX scheduler_log_messages_level ON scheduler_log_messages(level); +CREATE TABLE scheduler_task_runs ( + id SERIAL NOT NULL PRIMARY KEY, + task_id INTEGER REFERENCES task (id) NOT NULL, + host_id INTEGER REFERENCES host (id) NOT NULL, + state INTEGER NOT NULL, + create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + start_time TIMESTAMPTZ, + end_time TIMESTAMPTZ, +) WITHOUT OIDS; +CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id); +CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id); +CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state); +CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time); + +CREATE TABLE scheduler_host_data ( + host_id INTEGER REFERENCES host (id) PRIMARY KEY, + data JSONB, +) WITHOUT OIDS; + COMMIT WORK; diff --git a/koji/__init__.py b/koji/__init__.py index e1422d9..974d2bf 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -193,6 +193,8 @@ TASK_STATES = Enum(( 'CANCELED', 'ASSIGNED', 'FAILED', + 'SCHEDULED', + 'REFUSED', )) BUILD_STATES = Enum(( diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index ea1160b..2444d50 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -95,6 +95,7 @@ from koji.db import ( # noqa: F401 logger = logging.getLogger('koji.hub') +sched_logger = scheduler.DBLogger() NUMERIC_TYPES = (int, float) @@ -315,10 +316,12 @@ class Task(object): else: return None - def free(self): + def free(self, newstate=koji.TASK_STATES['FREE']): """Free a task""" + if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]: + raise koji.GenericError("Can't be called with other than FREE/REFUSED states") info = self.getInfo(request=True) - self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE']) + self.runCallbacks('preTaskStateChange', info, 'state', newstate) self.runCallbacks('preTaskStateChange', info, 'host_id', 
None) # access checks should be performed by calling function query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'], @@ -327,14 +330,13 @@ class Task(object): if not oldstate: raise koji.GenericError("No such task: %i" % self.id) if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']: - raise koji.GenericError("Cannot free task %i, state is %s" % + raise koji.GenericError("Cannot free/refuse task %i, state is %s" % (self.id, koji.TASK_STATES[oldstate])) - newstate = koji.TASK_STATES['FREE'] newhost = None update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id}, data={'state': newstate, 'host_id': newhost}) update.execute() - self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE']) + self.runCallbacks('postTaskStateChange', info, 'state', newstate) self.runCallbacks('postTaskStateChange', info, 'host_id', None) return True @@ -2537,44 +2539,6 @@ def set_channel_enabled(channelname, enabled=True, comment=None): update.execute() -def get_ready_hosts(): - """Return information about hosts that are ready to build. - - Hosts set the ready flag themselves - Note: We ignore hosts that are late checking in (even if a host - is busy with tasks, it should be checking in quite often). 
- """ - query = QueryProcessor( - tables=['host'], - columns=['host.id', 'name', 'arches', 'task_load', 'capacity'], - aliases=['id', 'name', 'arches', 'task_load', 'capacity'], - clauses=[ - 'enabled IS TRUE', - 'ready IS TRUE', - 'expired IS FALSE', - 'master IS NULL', - 'active IS TRUE', - "update_time > NOW() - '5 minutes'::interval" - ], - joins=[ - 'sessions USING (user_id)', - 'host_config ON host.id = host_config.host_id' - ] - ) - hosts = query.execute() - for host in hosts: - query = QueryProcessor( - tables=['host_channels'], - columns=['channel_id'], - clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'], - joins=['channels ON host_channels.channel_id = channels.id'], - values=host - ) - rows = query.execute() - host['channels'] = [row['channel_id'] for row in rows] - return hosts - - def get_all_arches(): """Return a list of all (canonical) arches available from hosts""" ret = {} @@ -2590,27 +2554,6 @@ def get_all_arches(): return list(ret.keys()) -def get_active_tasks(host=None): - """Return data on tasks that are yet to be run""" - fields = ['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority', 'create_time'] - values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED')) - if host: - values['arches'] = host['arches'].split() + ['noarch'] - values['channels'] = host['channels'] - values['host_id'] = host['id'] - clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)' - if values['channels']: - clause += ''' OR (state = %(FREE)i AND arch IN %(arches)s \ -AND channel_id IN %(channels)s)''' - clauses = [clause] - else: - clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)'] - queryOpts = {'limit': 100, 'order': 'priority,create_time'} - query = QueryProcessor(columns=fields, tables=['task'], clauses=clauses, - values=values, opts=queryOpts) - return query.execute() - - def get_task_descendents(task, childMap=None, request=False): if childMap is None: childMap = {} @@ -14095,16 +14038,9 @@ class Host(object): This data is relatively 
+        else:
+            insert = InsertProcessor(table=table,
+                                     data={'host_id': host.id, 'data': hostdata})
+            insert.execute()
task.free(newstate=koji.TASK_STATES['REFUSED']) + sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id, + location="refuseTask") + return True + def getHostTasks(self): host = Host() host.verify() diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index eb4ee58..65c7152 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -710,6 +710,7 @@ def setup_logging2(opts): log_handler.setFormatter(HubFormatter(opts['LogFormat'])) + import scheduler def get_memory_usage(): pagesize = resource.getpagesize() statm = [pagesize * int(y) // 1024 From 237f6ebc3ff33dccde95e4e9d6602f64875af347 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 14 2022 11:53:27 +0000 Subject: [PATCH 3/20] fix aftermerge issues --- diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 2444d50..dc92327 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -92,6 +92,7 @@ from koji.db import ( # noqa: F401 nextval, currval, ) +from . import scheduler logger = logging.getLogger('koji.hub') diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index 65c7152..eb4ee58 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -710,7 +710,6 @@ def setup_logging2(opts): log_handler.setFormatter(HubFormatter(opts['LogFormat'])) - import scheduler def get_memory_usage(): pagesize = resource.getpagesize() statm = [pagesize * int(y) // 1024 From ce0ee090327e5423c015da0c2746c7ddc4d7c3a9 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 14 2022 14:31:19 +0000 Subject: [PATCH 4/20] wip --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index c78f230..69bbaa0 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7753,7 +7753,6 @@ def anon_handle_scheduler_info(goptions, session, args): parser.add_option("--host", action="store", default=None, help="Limit data to given builder (name/id)") parser.add_option("--state", action="store", type='str', default=None, - choices=[x for x in koji.TASK_STATES.keys()], 
help="Limit data to task state") (options, args) = parser.parse_args(args) if len(args) > 0: @@ -7769,17 +7768,17 @@ def anon_handle_scheduler_info(goptions, session, args): host_id = session.getHost(options.host, strict=True)['id'] if options.state: - state = koji.TASK_STATES[options.state] + states = [koji.TASK_STATES[options.state]] else: - state = None + states = None # get the data - runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, state=state) + runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, states=states) mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s' if not goptions.quiet: header = mask % { 'task_id': 'Task', - 'host_name': 'Host', + 'host_id': 'Host', 'state': 'State', 'create_time': 'Created', 'start_time': 'Started', @@ -7788,7 +7787,7 @@ def anon_handle_scheduler_info(goptions, session, args): print(header) print('-' * len(header)) for run in runs: - run['state'] = koji.TASK_STATES[runs['state']] + run['state'] = koji.TASK_STATES[run['state']] print(mask % run) if host_id: @@ -7799,7 +7798,7 @@ def anon_handle_scheduler_info(goptions, session, args): else: print('-') - + def handle_scheduler_logs(goptions, session, args): "[monitor] Query scheduler logs" usage = "usage: %prog scheduler-logs " diff --git a/docs/schema.sql b/docs/schema.sql index 3f9aadf..86c70f8 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -979,7 +979,7 @@ CREATE TABLE scheduler_task_runs ( state INTEGER NOT NULL, create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), start_time TIMESTAMPTZ, - end_time TIMESTAMPTZ, + end_time TIMESTAMPTZ ) WITHOUT OIDS; CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id); CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id); @@ -988,7 +988,8 @@ CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time) CREATE TABLE scheduler_host_data ( host_id INTEGER REFERENCES host (id) PRIMARY KEY, - data JSONB, + 
data JSONB ) WITHOUT OIDS; +CREATE INDEX scheduler_host_data_host ON scheduler_host_data(host_id); COMMIT WORK; diff --git a/koji/daemon.py b/koji/daemon.py index 41ac10c..490e46d 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -1024,123 +1024,21 @@ class TaskManager(object): if not self.ready: self.logger.info("Not ready for task") return False - hosts, tasks = self.session.host.getLoadData() - self.logger.debug("Load Data:") - self.logger.debug(" hosts: %r" % hosts) - self.logger.debug(" tasks: %r" % tasks) - # now we organize this data into channel-arch bins - bin_hosts = {} # hosts indexed by bin - bins = {} # bins for this host - our_avail = None - for host in hosts: - host['bins'] = [] - if host['id'] == self.host_id: - # note: task_load reported by server might differ from what we - # sent due to precision variation - our_avail = host['capacity'] - host['task_load'] - for chan in host['channels']: - for arch in host['arches'].split() + ['noarch']: - bin = "%s:%s" % (chan, arch) - bin_hosts.setdefault(bin, []).append(host) - if host['id'] == self.host_id: - bins[bin] = 1 - self.logger.debug("bins: %r" % bins) - if our_avail is None: - self.logger.info("Server did not report this host. Are we disabled?") + tasks = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id']) + if not tasks: return False - elif not bins: - self.logger.info("No bins for this host. 
Missing channel/arch config?") - # Note: we may still take an assigned task below - # sort available capacities for each of our bins - avail = {} - for bin in bins: - avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]] - avail[bin].sort() - avail[bin].reverse() - self.cleanDelayTimes() - for task in tasks: - # note: tasks are in priority order - self.logger.debug("task: %r" % task) - if task['method'] not in self.handlers: - self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task) - continue - if task['id'] in self.tasks: - # we were running this task, but it apparently has been - # freed or reassigned. We can't do anything with it until - # updateTasks notices this and cleans up. - self.logger.debug("Task %(id)s freed or reassigned", task) - continue - if task['state'] == koji.TASK_STATES['ASSIGNED']: - self.logger.debug("task is assigned") - if self.host_id == task['host_id']: - # assigned to us, we can take it regardless - if self.takeTask(task): - return True - elif task['state'] == koji.TASK_STATES['FREE']: - bin = "%(channel_id)s:%(arch)s" % task - self.logger.debug("task is free, bin=%r" % bin) - if bin not in bins: - continue - # see where our available capacity is compared to other hosts for this bin - # (note: the hosts in this bin are exactly those that could - # accept this task) - bin_avail = avail.get(bin, [0]) - if self.checkAvailDelay(task, bin_avail, our_avail): - # decline for now and give the upper half a chance - continue - # otherwise, we attempt to open the task - if self.takeTask(task): - return True - else: - # should not happen - raise Exception("Invalid task state reported by server") - return False - - def checkAvailDelay(self, task, bin_avail, our_avail): - """Check to see if we should still delay taking a task - - Returns True if we are still in the delay period and should skip the - task. Otherwise False (delay has expired). 
- """ - - now = time.time() - ts = self.skipped_tasks.get(task['id']) - if not ts: - ts = self.skipped_tasks[task['id']] = now - - # determine our normalized bin rank - for pos, cap in enumerate(bin_avail): - if our_avail >= cap: - break - if len(bin_avail) > 1: - rank = float(pos) / (len(bin_avail) - 1) - else: - rank = 0.0 - # so, 0.0 for highest available capacity, 1.0 for lowest - - delay = getattr(self.options, 'task_avail_delay', 180) - delay *= rank - - # return True if we should delay - if now - ts < delay: - self.logger.debug("skipping task %i, age=%s rank=%s" - % (task['id'], int(now - ts), rank)) - return True - # otherwise - del self.skipped_tasks[task['id']] - return False - - def cleanDelayTimes(self): - """Remove old entries from skipped_tasks""" - now = time.time() - delay = getattr(self.options, 'task_avail_delay', 180) - cutoff = now - delay * 10 - # After 10x the delay, we've had plenty of opportunity to take the - # task, so either it has already been taken or we can't take it. - for task_id in list(self.skipped_tasks): - ts = self.skipped_tasks[task_id] - if ts < cutoff: - del self.skipped_tasks[task_id] + task = self.session.getTaskInfo(tasks[0]['task_id']) + if task['method'] not in self.handlers: + self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task) + return False + if task['id'] in self.tasks: + # we were running this task, but it apparently has been + # freed or reassigned. We can't do anything with it until + # updateTasks notices this and cleans up. 
+ self.logger.debug("Task %(id)s freed or reassigned", task) + return False + self.takeTask(task) + return True def _waitTask(self, task_id, pid=None): """Wait (nohang) on the task, return true if finished""" diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index dc92327..3fce76f 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -288,6 +288,16 @@ class Task(object): if state == koji.TASK_STATES['OPEN']: update.rawset(start_time='NOW()') update.execute() + + update = UpdateProcessor( + table='scheduler_task_runs', + clauses=['task_id = %(task_id)i', 'host_id = %(host_id)i'], + values={'task_id': task_id, 'host_id': host_id}, + data={'state': state}, + ) + if state == koji.TASK_STATES['OPEN']: + update.rawset(start_time='NOW()') + update.execute() self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES[newstate]) self.runCallbacks('postTaskStateChange', info, 'host_id', host_id) return True @@ -744,6 +754,7 @@ def make_task(method, arglist, **opts): opts['id'] = task_id koji.plugin.run_callbacks( 'postTaskStateChange', attribute='state', old=None, new='FREE', info=opts) + scheduler.schedule(task_id=task_id) return task_id @@ -14040,7 +14051,12 @@ class Host(object): This data is relatively small and the necessary load analysis is relatively complex, so we let the host machines crunch it.""" host = get_host(self.id) - tasks = scheduler.getTaskRuns(hostID=self.id) + query = QueryProcessor(tables=['host_channels'], columns=['channel_id'], + clauses=['host_id = %(id)s', 'active IS TRUE'], + values={'id': self.id}, + opts={'asList': True}) + host['channels'] = [x[0] for x in query.execute()] + tasks = scheduler.get_task_runs(hostID=self.id) return [[host], tasks] def getTask(self): @@ -14189,21 +14205,9 @@ class HostExports(object): host = Host() host.verify() - query = QueryProcessor( - tables=['scheduler_task_runs'], - clauses=[ - 'host_id = %(host_id)s', - 'state in %(states)s' - ], - values={ - 'host_id': host.id, - 'states': [ - 
+        opts={'order': 'host_id'}
+ + query = QueryProcessor( + tables=['scheduler_task_runs'], columns=columns, + clauses=clauses, values=locals() + ) + return query.execute() + + +def schedule(task_id=None): + """Run scheduler""" + + # stupid for now, just add new task to first builder + query = QueryProcessor( + tables=['host'], + columns=['id'], + joins=['host_config ON host.id=host_config.host_id'], + clauses=['enabled IS TRUE'], + opts={'limit': 1} + ) + logger.error('xxxxxxxxxxxxxxx %s', str(query)) + host = query.executeOne() + if not host: + return + + insert = InsertProcessor( + table='scheduler_task_runs', + data={ + 'task_id': task_id, + 'host_id': host['id'], + 'state': koji.TASK_STATES['SCHEDULED'], + } + ) + insert.execute() class SchedulerExports(): + getTaskRuns = staticmethod(get_task_runs) + getHostData = staticmethod(get_host_data) + def getLogs(self, taskID=None, hostID=None, level=None, from_ts=None, to_ts=None, logger_name=None): """Return all related log messages From 7b8cdf641ac1894a43617dc5eb070798dec0fb10 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 15 2022 15:01:13 +0000 Subject: [PATCH 5/20] wip --- diff --git a/koji/daemon.py b/koji/daemon.py index 490e46d..268714c 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -1024,7 +1024,8 @@ class TaskManager(object): if not self.ready: self.logger.info("Not ready for task") return False - tasks = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id']) + tasks = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id'], + states=[koji.TASK_STATES['SCHEDULED']]) if not tasks: return False task = self.session.getTaskInfo(tasks[0]['task_id']) diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 3fce76f..216ce71 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -291,7 +291,11 @@ class Task(object): update = UpdateProcessor( table='scheduler_task_runs', - clauses=['task_id = %(task_id)i', 'host_id = %(host_id)i'], + clauses=[ + 'task_id = %(task_id)i', + 'host_id = %(host_id)i', + 'id = 
max(id)', + ], values={'task_id': task_id, 'host_id': host_id}, data={'state': state}, ) @@ -400,6 +404,12 @@ class Task(object): data={'result': info['result'], 'state': state}, rawdata={'completion_time': 'NOW()'}) update.execute() + update = UpdateProcessor('scheduler_task_runs', + clauses=['task_id = %(task_id)i', 'host_id = %(host_id)i'], + data={'state': state}, + rawdata={'end_time': 'NOW()'}, + values={'task_id': self.id, 'host_id': info['host_id']}) + update.execute() self.runCallbacks('postTaskStateChange', info, 'state', state) self.runCallbacks('postTaskStateChange', info, 'completion_ts', now) @@ -14123,6 +14133,7 @@ class HostExports(object): host = Host() host.verify() host.updateHost(task_load, ready) + scheduler.schedule() def getLoadData(self): host = Host() diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index d727848..10f18ae 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -2,7 +2,13 @@ import functools import logging import koji -from koji.db import InsertProcessor, QueryProcessor, DeleteProcessor +from koji.db import ( + BulkInsertProcessor, + DeleteProcessor, + InsertProcessor, + QueryProcessor, + UpdateProcessor, +) logger = logging.getLogger('koji.scheduler') @@ -47,18 +53,18 @@ def get_task_runs(taskID=None, hostID=None, states=None): :returns list[dict]: list of dicts """ - columns = ['task_id', 'host_id', 'state', 'create_time', 'start_time', 'end_time'] + columns = ['id', 'task_id', 'host_id', 'state', 'create_time', 'start_time', 'end_time'] clauses = [] if taskID is not None: clauses.append('task_id = %(taskID)i') if hostID is not None: clauses.append('host_id = %(hostID)i') if states is not None: - clauses.append('states IN %(states)s') + clauses.append('state IN %(states)s') query = QueryProcessor( tables=['scheduler_task_runs'], columns=columns, - clauses=clauses, values=locals() + clauses=clauses, values=locals(), opts={'order': 'id'}, ) return query.execute() @@ -67,26 +73,65 @@ def 
schedule(task_id=None): """Run scheduler""" # stupid for now, just add new task to first builder + logger.error("SCHEDULER RUN") query = QueryProcessor( tables=['host'], columns=['id'], joins=['host_config ON host.id=host_config.host_id'], clauses=['enabled IS TRUE'], - opts={'limit': 1} + opts={'asList': True}, ) - logger.error('xxxxxxxxxxxxxxx %s', str(query)) - host = query.executeOne() - if not host: + hosts = [x[0] for x in query.execute()] + if not hosts: return - insert = InsertProcessor( - table='scheduler_task_runs', - data={ - 'task_id': task_id, + # FAIL inconsistent runs + query = QueryProcessor( + tables=['scheduler_task_runs', 'task'], + columns=['scheduler_task_runs.id'], + clauses=[ + 'task.id = scheduler_task_runs.task_id', + 'scheduler_task_runs.state = %(state)s', + 'scheduler_task_runs.state != task.state', + ], + values={'state': koji.TASK_STATES['OPEN']}, + opts={'asList': True}, + ) + run_ids = [x[0] for x in query.execute()] + if run_ids: + update = UpdateProcessor( + table='scheduler_task_runs', + clauses=['id IN %(run_ids)s'], + values={'run_ids': run_ids}, + data={'state': koji.TASK_STATES['FAILED']}, + rawdata={'end_time': 'NOW()'}, + ) + update.execute() + + # add unscheduled tasks + data = [] + if not task_id: + query = QueryProcessor( + columns=['id'], + tables=['task'], + clauses=[ + 'state IN %(states)s', + 'id NOT IN (SELECT task_id FROM scheduler_task_runs WHERE state = 6)' + ], + values={'states': [koji.TASK_STATES['FREE'], koji.TASK_STATES['ASSIGNED']]}, + opts={'asList': True} + ) + task_ids = [x[0] for x in query.execute()] + else: + task_ids = [task_id] + + for task in task_ids: + data.append({ 'host_id': host['id'], + 'task_id': task[0], 'state': koji.TASK_STATES['SCHEDULED'], - } - ) + }) + insert = BulkInsertProcessor(table='scheduler_task_runs', data=data) insert.execute() From 16a9e62cd37f6879f3d2854216fe371ca4f55897 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 19 2022 13:48:25 +0000 Subject: [PATCH 6/20] wip 
--- diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 216ce71..5ec25e3 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -289,12 +289,12 @@ class Task(object): update.rawset(start_time='NOW()') update.execute() + # update last task run update = UpdateProcessor( table='scheduler_task_runs', clauses=[ - 'task_id = %(task_id)i', - 'host_id = %(host_id)i', - 'id = max(id)', + 'id = (SELECT MAX(id) FROM scheduler_task_runs ' + 'WHERE task_id = %(task_id)i AND host_id = %(host_id)i)', ], values={'task_id': task_id, 'host_id': host_id}, data={'state': state}, diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 10f18ae..c08a008 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -1,5 +1,6 @@ import functools import logging +import random import koji from koji.db import ( @@ -125,10 +126,10 @@ def schedule(task_id=None): else: task_ids = [task_id] - for task in task_ids: + for task_id in task_ids: data.append({ - 'host_id': host['id'], - 'task_id': task[0], + 'host_id': random.choice(hosts), + 'task_id': task_id, 'state': koji.TASK_STATES['SCHEDULED'], }) insert = BulkInsertProcessor(table='scheduler_task_runs', data=data) From f7afb213e822e9a50ccca3e7f1a8eea1c56eea0b Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 19 2022 16:20:33 +0000 Subject: [PATCH 7/20] wip --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index 69bbaa0..f3c54ff 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7808,7 +7808,6 @@ def handle_scheduler_logs(goptions, session, args): parser.add_option("--host", type="str", action="store", help="Filter by host (name/ID)") parser.add_option("--level", type="str", action="store", - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help="Filter by message level") parser.add_option("--from", type="float", action="store", dest="from_ts", help="Logs from given timestamp") diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 5ec25e3..a565e7f 100644 --- 
a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -7370,7 +7370,6 @@ def add_archive_type(name, description, extensions, compression_type=None): :param str description: eg. "YAML Ain't Markup Language" :param str extensions: space-separated list of descriptions, eg. "yaml yml" """ - print(context) context.session.assertPerm('admin') verify_name_internal(name) convert_value(description, cast=str, check_only=True) diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index c08a008..7b3ca82 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -1,6 +1,6 @@ import functools +import inspect import logging -import random import koji from koji.db import ( @@ -14,6 +14,57 @@ from koji.db import ( logger = logging.getLogger('koji.scheduler') +class HostHashTable(object): + """multiindexed host table for fast filtering""" + def __init__(self, hosts=None): + self.arches = {} + self.channels = {} + self.hosts = {} + self.host_ids = set() + if hosts is None: + hosts = get_ready_hosts() + for hostinfo in hosts: + self.add_host(hostinfo) + + def add_host(self, hostinfo): + host_id = hostinfo['id'] + # priority is based on available capacity + hostinfo['priority'] = hostinfo['capacity'] - hostinfo['task_load'] + # but builders running zero tasks should be always better fit + if hostinfo['task_load'] == 0: + # TODO: better heuristic? 
+ hostinfo['priority'] += 100 + + self.hosts[host_id] = hostinfo + self.host_ids.add(host_id) + for arch in hostinfo['arches']: + self.arches.setdefault(arch, set()).add(host_id) + for channel in hostinfo['channels']: + self.channels.setdefault(channel, set()).add(host_id) + + def get(self, task): + # filter by requirements + host_ids = set(self.host_ids) + if task.get('arch') is not None: + host_ids &= self.arches[task['arch']] + if task.get('channel_id') is not None: + host_ids &= self.channels[task['channel_id']] + + # select best from filtered + hosts = [self.hosts[host_id] for host_id in host_ids] + hosts = sorted(hosts, key=lambda x: -x['priority']) + if not hosts: + return None + + host = hosts[0] + # TODO: lower capacity by some heuritics + # TODO: reduce resources (reserved memory, cpus) + # TODO: reduce capacity by ?, placeholder 1.5 as for buildArch, + # otherwise it is high chance that it could be overcomitted + self.hosts[host['id']]['task_load'] += 1.5 + return host + + def drop_from_queue(task_id): """Delete scheduled run without checking its existence""" delete = DeleteProcessor( @@ -70,23 +121,57 @@ def get_task_runs(taskID=None, hostID=None, states=None): return query.execute() -def schedule(task_id=None): - """Run scheduler""" +def get_ready_hosts(): + """Return information about hosts that are ready to build. - # stupid for now, just add new task to first builder - logger.error("SCHEDULER RUN") + Hosts set the ready flag themselves + Note: We ignore hosts that are late checking in (even if a host + is busy with tasks, it should be checking in quite often). 
+ + host dict contains: + - id + - name + - list(arches) + - task_load + - capacity + - list(channels) (ids) + - [resources] + """ query = QueryProcessor( tables=['host'], - columns=['id'], - joins=['host_config ON host.id=host_config.host_id'], - clauses=['enabled IS TRUE'], - opts={'asList': True}, + columns=['host.id', 'name', 'arches', 'task_load', 'capacity'], + aliases=['id', 'name', 'arches', 'task_load', 'capacity'], + clauses=[ + 'enabled IS TRUE', + 'ready IS TRUE', + 'expired IS FALSE', + 'master IS NULL', + 'active IS TRUE', + "update_time > NOW() - '5 minutes'::interval" + ], + joins=[ + 'sessions USING (user_id)', + 'host_config ON host.id = host_config.host_id' + ] ) - hosts = [x[0] for x in query.execute()] - if not hosts: - return + hosts = query.execute() + for host in hosts: + query = QueryProcessor( + tables=['host_channels'], + columns=['channel_id'], + clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'], + joins=['channels ON host_channels.channel_id = channels.id'], + values=host, + opts={'asList': True}, + ) + rows = query.execute() + host['channels'] = [row[0] for row in rows] + host['arches'] = host['arches'].split() + ['noarch'] + return hosts - # FAIL inconsistent runs + +def clean_scheduler_queue(): + # FAIL inconsistent runs, but not tasks query = QueryProcessor( tables=['scheduler_task_runs', 'task'], columns=['scheduler_task_runs.id'], @@ -99,6 +184,20 @@ def schedule(task_id=None): opts={'asList': True}, ) run_ids = [x[0] for x in query.execute()] + # FAIL (timeout) also runs which are scheduled for too long and were not picked + # by their respective workers, try to find new builders for them + query = QueryProcessor( + tables=['scheduler_task_runs'], + columns=['id'], + clauses=[ + "create_time < NOW() + '5 minutes'::interval", + "state = %(state)i", + ], + values={'state': koji.TASK_STATES['SCHEDULED']}, + opts={'asList': True}, + ) + # TODO: does it make sense to have TIMEOUTED state for runs? 
+ run_ids += [x[0] for x in query.execute()] if run_ids: update = UpdateProcessor( table='scheduler_task_runs', @@ -109,30 +208,53 @@ def schedule(task_id=None): ) update.execute() - # add unscheduled tasks - data = [] + +def schedule(task_id=None): + """Run scheduler""" + + # stupid for now, just add new task to random builder + logger.error("SCHEDULER RUN") + hosts = HostHashTable() + if not hosts.hosts: + # early fail if there is nothing available + return + + # find unscheduled tasks + columns = ['id', 'arch', 'method', 'channel_id', 'priority'] if not task_id: + clean_scheduler_queue() query = QueryProcessor( - columns=['id'], - tables=['task'], + tables=['task'], columns=columns, clauses=[ 'state IN %(states)s', 'id NOT IN (SELECT task_id FROM scheduler_task_runs WHERE state = 6)' ], values={'states': [koji.TASK_STATES['FREE'], koji.TASK_STATES['ASSIGNED']]}, - opts={'asList': True} + opts={'order': '-priority'}, ) - task_ids = [x[0] for x in query.execute()] else: - task_ids = [task_id] + query = QueryProcessor( + tables=['task'], columns=columns, + clauses=['id = %(id)i'], values={'id': task_id}, + opts={'order': '-priority'}, + ) + tasks = list(query.execute()) - for task_id in task_ids: - data.append({ - 'host_id': random.choice(hosts), - 'task_id': task_id, + # assign them to random builders fulfiling criteria in priority order + runs = [] + for task in tasks: + host = hosts.get(task) + if not host: + # TODO: log that there is not available builder + dblogger.warning("Can't find adequate builder", task_id=task['id']) + continue + runs.append({ + 'host_id': host['id'], + 'task_id': task['id'], 'state': koji.TASK_STATES['SCHEDULED'], }) - insert = BulkInsertProcessor(table='scheduler_task_runs', data=data) + dblogger.info("Scheduling", task_id=task['id'], host_id=host['id']) + insert = BulkInsertProcessor(table='scheduler_task_runs', data=runs) insert.execute() @@ -161,7 +283,7 @@ class SchedulerExports(): ('level', 'level'), ('location', 'location'), 
('msg', 'msg'), - ('hosts.name', 'host_name'), + ('host.name', 'host_name'), ) clauses = [] values = {} @@ -187,7 +309,7 @@ class SchedulerExports(): columns, aliases = zip(*fields) query = QueryProcessor(tables=['scheduler_log_messages'], columns=columns, aliases=aliases, - joins=['hosts ON host_id = hosts.id'], + joins=['host ON host_id = host.id'], clauses=clauses, values=values, opts={'order': 'msg_time'}) return query.execute() @@ -207,6 +329,11 @@ class DBLogger(object): task_id=None, host_id=None, location=None): if not logger_name: logger_name = self.logger + if location is None: + frame = inspect.currentframe() + frames = inspect.getouterframes(frame) + frame = frames[1] + location = frame.function # log to regular log text = f"task: {task_id}, host: {host_id}, location: {location}, message: {msg}" logging.getLogger(logger_name).log(level, text) @@ -229,3 +356,6 @@ class DBLogger(object): warning = functools.partialmethod(log, level=logging.WARNING) error = functools.partialmethod(log, level=logging.ERROR) critical = functools.partialmethod(log, level=logging.CRITICAL) + + +dblogger = DBLogger() diff --git a/tests/test_hub/test_scheduler.py b/tests/test_hub/test_scheduler.py index a1359bd..bdea94b 100644 --- a/tests/test_hub/test_scheduler.py +++ b/tests/test_hub/test_scheduler.py @@ -38,8 +38,8 @@ class TestDBLogger(unittest.TestCase): 'host_id': None, 'logger_name': 'koji.scheduler', 'level': 'NOTSET', - 'location': None, - 'text': 'text', + 'location': 'test_basic', + 'msg': 'text', }) def test_all(self): @@ -54,7 +54,7 @@ class TestDBLogger(unittest.TestCase): 'logger_name': 'logger_name', 'level': 'ERROR', 'location': 'location', - 'text': 'text', + 'msg': 'text', }) def test_levels(self): @@ -65,3 +65,43 @@ class TestDBLogger(unittest.TestCase): insert = self.inserts[0] self.assertEqual(insert.data['level'], level) self.inserts = [] + + +class TestHostHashTable(unittest.TestCase): + def test_get(self): + hosts = [ + { + 'id': 1, + 'arches': ['i386', 
'x86_64', 'noarch'], + 'channels': [1], + 'capacity': 2.0, + 'task_load': 0.0, + }, + { + 'id': 2, + 'arches': ['i386'], + 'channels': [1, 2], + 'capacity': 2.0, + 'task_load': 0.0, + }, + { + 'id': 3, + 'arches': ['x86_64', 'noarch'], + 'channels': [2], + 'capacity': 3.0, + 'task_load': 0.0, + } + ] + ht = scheduler.HostHashTable(hosts) + + result = ht.get({'arch': 'noarch'}) + self.assertEqual(result['id'], 3) + + result = ht.get({'arch': 'x86_64'}) + self.assertEqual(result['id'], 3) + + result = ht.get({'channel_id': 2}) + self.assertEqual(result['id'], 3) + + result = ht.get({'channel_id': 2, 'arch': 'i386'}) + self.assertEqual(result['id'], 2) From 32b9e673ea42c72148bd8d94417a11993b6cf590 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 22 2022 12:38:12 +0000 Subject: [PATCH 8/20] upload host data --- diff --git a/koji/daemon.py b/koji/daemon.py index 268714c..84fb781 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -1018,28 +1018,51 @@ class TaskManager(object): else: self.logger.info("Lingering task %r (pid %r)" % (id, pid)) + def setHostData(self): + """Report all runtime data to scheduler""" + hostdata = { + 'task_load': self.task_load, + 'ready': self.ready, + 'methods': list(self.handlers.keys()), + 'maxjobs': self.options.maxjobs, + # kernel + # cpu_total + # cpu_available (total - reserved) + # memory_total + # memory_available (total - reserved) + } + self.session.host.setHostData(hostdata) + self.logger.debug("Reported hostdata %s", hostdata) + def getNextTask(self): self.ready = self.readyForTask() self.session.host.updateHost(self.task_load, self.ready) + self.setHostData() if not self.ready: self.logger.info("Not ready for task") return False - tasks = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id'], - states=[koji.TASK_STATES['SCHEDULED']]) - if not tasks: - return False - task = self.session.getTaskInfo(tasks[0]['task_id']) - if task['method'] not in self.handlers: - self.logger.warning("Skipping task %(id)i, no handler 
for method %(method)s", task) - return False - if task['id'] in self.tasks: - # we were running this task, but it apparently has been - # freed or reassigned. We can't do anything with it until - # updateTasks notices this and cleans up. - self.logger.debug("Task %(id)s freed or reassigned", task) - return False - self.takeTask(task) - return True + scheduled = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id'], + states=[koji.TASK_STATES['SCHEDULED']]) + with self.session.multicall() as m: + tasks = [m.getTaskInfo(task['task_id']) for task in scheduled] + + taken = False + for task in tasks: + self.ready = self.readyForTask() + if not self.ready: + self.logger.info("Not ready for task") + break + task = task.result + # last check - it should be already filtered by updateHost + if task['method'] not in self.handlers: + self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task) + if task['id'] in self.tasks: + # we were running this task, but it apparently has been + # freed or reassigned. We can't do anything with it until + # updateTasks notices this and cleans up. 
+ self.logger.debug("Task %(id)s freed or reassigned", task) + self.takeTask(task) + return taken def _waitTask(self, task_id, pid=None): """Wait (nohang) on the task, return true if finished""" diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index a565e7f..373469a 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -14192,21 +14192,27 @@ class HostExports(object): - available task methods - maxjobs - host readiness + + obsoletes updateHost """ host = Host() host.verify() + if 'task_load' in hostdata and 'ready' in hostdata: + self.updateHost(hostdata['task_load'], hostdata['ready']) + # TODO: maybe obsolete it completely and put this data into scheduler_host_data + hostdata = hostdata clauses = ['host_id = %(host_id)i'] values = {'host_id': host.id} table = 'scheduler_host_data' query = QueryProcessor(tables=[table], clauses=clauses, values=values, opts={'countOnly': True}) if query.singleValue() > 0: - update = UpdateProcessor(table=table, data={'data': hostdata}, - clauses=clauses, values=values) + update = UpdateProcessor(table=table, clauses=clauses, values=values, + data={'data': json.dumps(hostdata)}) update.execute() else: - insert = InsertProcessor(table=table, data={'data': hostdata}, - clauses=clauses, values=values) + insert = InsertProcessor(table=table, + data={'host_id': host.id, 'data': json.dumps(hostdata)}) insert.execute() sched_logger.debug(f"Updating host data with: {hostdata}", host_id=host.id, location='setHostData') From 330448b1f91c0a1c00b8fea386e05eeb9c660699 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 22 2022 12:53:38 +0000 Subject: [PATCH 9/20] make hostdata available to scheduler --- diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 7b3ca82..32231ac 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -34,6 +34,10 @@ class HostHashTable(object): if hostinfo['task_load'] == 0: # TODO: better heuristic? 
hostinfo['priority'] += 100 + # TODO: one query for all hosts + query = QueryProcessor(tables=['task'], clauses=['host_id = %(host_id)i'], + values={'host_id': host_id}, opts={'countOnly': True}) + hostinfo['tasks'] = query.executeOne() self.hosts[host_id] = hostinfo self.host_ids.add(host_id) @@ -139,19 +143,21 @@ def get_ready_hosts(): """ query = QueryProcessor( tables=['host'], - columns=['host.id', 'name', 'arches', 'task_load', 'capacity'], - aliases=['id', 'name', 'arches', 'task_load', 'capacity'], + columns=['host.id', 'name', 'arches', 'task_load', 'capacity', 'data'], + aliases=['id', 'name', 'arches', 'task_load', 'capacity', 'data'], clauses=[ 'enabled IS TRUE', 'ready IS TRUE', 'expired IS FALSE', 'master IS NULL', 'active IS TRUE', - "update_time > NOW() - '5 minutes'::interval" + "update_time > NOW() - '5 minutes'::interval", + 'capacity > task_load', ], joins=[ 'sessions USING (user_id)', - 'host_config ON host.id = host_config.host_id' + 'host_config ON host.id = host_config.host_id', + 'scheduler_host_data ON host.id = scheduler_host_data.host_id', ] ) hosts = query.execute() @@ -240,7 +246,7 @@ def schedule(task_id=None): ) tasks = list(query.execute()) - # assign them to random builders fulfiling criteria in priority order + # assign them to builders fulfiling criteria in priority order runs = [] for task in tasks: host = hosts.get(task) From ee8af4c6601cc765da1e9cbc6e5dc9c6fc6fda93 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 22 2022 13:10:27 +0000 Subject: [PATCH 10/20] configurable log level for scheduler --- diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index eb4ee58..9cd0436 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -475,6 +475,7 @@ def load_config(environ): ['LogFormat', 'string', '%(asctime)s [%(levelname)s] m=%(method)s u=%(user_name)s p=%(process)s r=%(remoteaddr)s ' '%(name)s: %(message)s'], + ['SchedulerLogLevel', 'string', 'WARNING'], ['MissingPolicyOk', 'boolean', True], 
['EnableMaven', 'boolean', False], diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 32231ac..30d489c 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -3,6 +3,7 @@ import inspect import logging import koji +from koji.context import context from koji.db import ( BulkInsertProcessor, DeleteProcessor, @@ -326,6 +327,7 @@ class DBLogger(object): as both logging parts do this per se (loggind + DB handler via context)""" def __init__(self, logger_name=None): + self.log_level = None if logger_name: self.logger = logger_name else: @@ -333,6 +335,15 @@ class DBLogger(object): def log(self, msg, logger_name=None, level=logging.NOTSET, task_id=None, host_id=None, location=None): + if self.log_level is None: + # can't be done in constructor, as config is not loaded in that time + log_level = context.opts.get('SchedulerLogLevel') + valid_levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') + if log_level not in valid_levels: + raise koji.GenericError(f"Invalid log level: {log_level}") + self.log_level = logging.getLevelName(log_level) + if level < self.log_level: + return if not logger_name: logger_name = self.logger if location is None: From 27b1ac5267c8d5ae868d61944118494100688a75 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 22 2022 14:34:52 +0000 Subject: [PATCH 11/20] test rich interface --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index f3c54ff..468e3f6 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -22,6 +22,11 @@ import six import six.moves.xmlrpc_client from six.moves import filter, map, range, zip +from rich.console import Console +from rich.style import Style +from rich.table import Table, Column +from rich.theme import Theme + import koji from koji.util import base64encode, md5_constructor, to_list from koji_cli.lib import ( @@ -7774,6 +7779,7 @@ def anon_handle_scheduler_info(goptions, session, args): # get the data runs = session.scheduler.getTaskRuns(taskID=options.task, 
hostID=host_id, states=states) + ''' mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s' if not goptions.quiet: header = mask % { @@ -7789,6 +7795,42 @@ def anon_handle_scheduler_info(goptions, session, args): for run in runs: run['state'] = koji.TASK_STATES[run['state']] print(mask % run) + ''' + + theme = Theme({ + 'free': Style(color="#00030c"), + 'scheduled': Style(color="#00030c"), + 'open': Style(color="#ff8c00"), + 'closed': Style(color="#008000"), + 'assigned': Style(color="#000c0f"), + 'canceled': Style(color="#000c90"), + 'failed': Style(color="#ff0000"), + }) + + table = Table( + Column("Task", justify="right"), + "Host", + "State", + "Created", + "Started", + "Ended", + ) + for run in runs: + run['state'] = koji.TASK_STATES[run['state']] + for ts in ('create_ts', 'start_ts', 'end_ts'): + if run[ts] is not None: + run[ts] = time.asctime(time.localtime(run[ts])) + else: + run[ts] = '' + table.add_row( + str(run["task_id"]), + str(run["host_id"]), + f'[{run["state"].lower()}]{run["state"]}[/{run["state"].lower()}]', + run["create_ts"], + run["start_ts"], + run["end_ts"]) + Console(theme=theme).print(table) + #print(datetime.fromtimestamp(run['create_time']).isoformat(' ')) if host_id: print('Host data for %s:' % options.host) @@ -7838,6 +7880,7 @@ def handle_scheduler_logs(goptions, session, args): logs = session.scheduler.getLogs(**kwargs) + """ mask = ("%(task_id)s\t%(host_name)s\t%(msg_time)s\t%(logger_name)s" "\t%(level)s\t%(location)s\t%(msg)s") if not goptions.quiet: @@ -7856,3 +7899,33 @@ def handle_scheduler_logs(goptions, session, args): for log in logs: print(mask % log) + """ + table = Table( + "Time", + Column("Task", justify="right"), + "Host", + "Logger", + "Level", + "Location", + "Message", + ) + for log in logs: + if log['task_id']: + log['task_id'] = str(log['task_id']) + else: + log['task_id'] = '' + if log['msg_ts']: + log['msg_ts'] = time.asctime(time.localtime(log['msg_ts'])) + else: + 
log['msg_ts'] = '' + table.add_row( + log['msg_ts'], + str(log['task_id']), + log['host_name'], + log['logger_name'], + log['level'], + log['location'], + log['msg'], + style=f"logging.level.{log['level'].lower()}", + ) + Console().print(table) diff --git a/docs/schema.sql b/docs/schema.sql index 86c70f8..8d8c3ac 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -977,7 +977,7 @@ CREATE TABLE scheduler_task_runs ( task_id INTEGER REFERENCES task (id) NOT NULL, host_id INTEGER REFERENCES host (id) NOT NULL, state INTEGER NOT NULL, - create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + create_time TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), start_time TIMESTAMPTZ, end_time TIMESTAMPTZ ) WITHOUT OIDS; diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 30d489c..1785cf5 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -110,7 +110,19 @@ def get_task_runs(taskID=None, hostID=None, states=None): :returns list[dict]: list of dicts """ - columns = ['id', 'task_id', 'host_id', 'state', 'create_time', 'start_time', 'end_time'] + fields = ( + ('id', 'id'), + ('task_id', 'task_id'), + ('host_id', 'host_id'), + ('state', 'state'), + ('create_time', 'create_time'), + ("date_part('epoch', create_time)", 'create_ts'), + ('start_time', 'start_time'), + ("date_part('epoch', start_time)", 'start_ts'), + ('end_time', 'end_time'), + ("date_part('epoch', end_time)", 'end_ts'), + ) + columns, aliases = zip(*fields) clauses = [] if taskID is not None: clauses.append('task_id = %(taskID)i') @@ -120,7 +132,7 @@ def get_task_runs(taskID=None, hostID=None, states=None): clauses.append('state IN %(states)s') query = QueryProcessor( - tables=['scheduler_task_runs'], columns=columns, + tables=['scheduler_task_runs'], columns=columns, aliases=aliases, clauses=clauses, values=locals(), opts={'order': 'id'}, ) return query.execute() @@ -286,6 +298,7 @@ class SchedulerExports(): ('task_id', 'task_id'), ('host_id', 'host_id'), ('msg_time', 'msg_time'), + 
("date_part('epoch', msg_time)", 'msg_ts'), ('logger_name', 'logger_name'), ('level', 'level'), ('location', 'location'), From d7bda8af46e37de3702cd0986f89f1d26311bfc9 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 09:34:21 +0000 Subject: [PATCH 12/20] fix query --- diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 1785cf5..aa4b33e 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -308,10 +308,10 @@ class SchedulerExports(): clauses = [] values = {} if taskID is not None: - clauses.append("taskID = %(taskID)") + clauses.append("task_id = %(taskID)i") values['taskID'] = taskID if hostID is not None: - clauses.append("hostID = %(hostID)") + clauses.append("host_id = %(hostID)i") values['hostID'] = hostID if level is not None: clauses.append("level = %(level)s") From a26360bab6c91fbe977fef8a73c6cea4abed90fe Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 09:34:53 +0000 Subject: [PATCH 13/20] rich print for CLI --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index 468e3f6..375dc1f 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -26,6 +26,7 @@ from rich.console import Console from rich.style import Style from rich.table import Table, Column from rich.theme import Theme +from rich import print import koji from koji.util import base64encode, md5_constructor, to_list From 2849ecec5d71087f25460161dde6acff4d1fec0a Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 09:35:21 +0000 Subject: [PATCH 14/20] filter hosts by methods --- diff --git a/koji/daemon.py b/koji/daemon.py index 84fb781..7a8bdcd 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -1020,10 +1020,13 @@ class TaskManager(object): def setHostData(self): """Report all runtime data to scheduler""" + methods = {} + for method in self.handlers: + methods[method] = self.handlers[method]._taskWeight hostdata = { 'task_load': self.task_load, 'ready': self.ready, - 'methods': list(self.handlers.keys()), + 
'methods': methods, 'maxjobs': self.options.maxjobs, # kernel # cpu_total diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index aa4b33e..5ddcd21 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -20,6 +20,7 @@ class HostHashTable(object): def __init__(self, hosts=None): self.arches = {} self.channels = {} + self.methods = {} self.hosts = {} self.host_ids = set() if hosts is None: @@ -36,6 +37,7 @@ class HostHashTable(object): # TODO: better heuristic? hostinfo['priority'] += 100 # TODO: one query for all hosts + # TODO: add only hosts which checked in in last X minutes query = QueryProcessor(tables=['task'], clauses=['host_id = %(host_id)i'], values={'host_id': host_id}, opts={'countOnly': True}) hostinfo['tasks'] = query.executeOne() @@ -46,27 +48,53 @@ class HostHashTable(object): self.arches.setdefault(arch, set()).add(host_id) for channel in hostinfo['channels']: self.channels.setdefault(channel, set()).add(host_id) + for method in hostinfo['data']['methods']: + self.methods.setdefault(method, set()).add(host_id) + + # know about refused tasks + query = QueryProcessor(tables=['scheduler_task_runs'], columns=['task_id'], + clauses=['host_id = %(host_id)i', 'state = %(state)i'], + values={'host_id': host_id, 'state': koji.TASK_STATES['REFUSED']}, + opts={'asList': True}) + self.hosts[host_id]['refused_tasks'] = set(query.execute()) def get(self, task): # filter by requirements host_ids = set(self.host_ids) + # assigned task + if task['host_id']: + host_ids &= {task['host_id']} + # filter by architecture if task.get('arch') is not None: host_ids &= self.arches[task['arch']] + # filter by method + host_ids &= self.methods[task['method']] + # filter by channel if task.get('channel_id') is not None: host_ids &= self.channels[task['channel_id']] - # select best from filtered - hosts = [self.hosts[host_id] for host_id in host_ids] + # select best from filtered and remove hosts which already refused this task + hosts = [] + for host_id in 
host_ids: + hostinfo = self.hosts[host_id] + if task['id'] in hostinfo['refused_tasks']: + dblogger.debug("Task already refused", task_id=task['id'], host_id=host_id) + continue + task_weight = hostinfo['data']['methods'][task['method']] + if task_weight > hostinfo['capacity'] - hostinfo['task_load']: + dblogger.debug( + f"Higher weight {task_weight} than available capacity {hostinfo['capacity']}", + task_id=task['id'], host_id=host_id) + continue + hosts.append(hostinfo) + hosts = sorted(hosts, key=lambda x: -x['priority']) if not hosts: return None host = hosts[0] - # TODO: lower capacity by some heuritics # TODO: reduce resources (reserved memory, cpus) - # TODO: reduce capacity by ?, placeholder 1.5 as for buildArch, - # otherwise it is high chance that it could be overcomitted - self.hosts[host['id']]['task_load'] += 1.5 + self.hosts[host['id']]['task_load'] += self.hosts[host['id']]['data']['methods'][task['method']] return host @@ -95,7 +123,7 @@ def get_host_data(hostID=None): clauses=clauses, columns=columns, values=locals(), - opts={'order': 'id'} + opts={'order': 'host_id'} ) return query.execute() @@ -231,6 +259,10 @@ def clean_scheduler_queue(): def schedule(task_id=None): """Run scheduler""" + # TODO: locking so, only one scheduler runs in a time + # TODO: don't run it too often (configurable) + # TODO: run only reasonably, now we trigger it on every updateHost + makeTask + # stupid for now, just add new task to random builder logger.error("SCHEDULER RUN") hosts = HostHashTable() @@ -239,7 +271,7 @@ def schedule(task_id=None): return # find unscheduled tasks - columns = ['id', 'arch', 'method', 'channel_id', 'priority'] + columns = ['id', 'arch', 'method', 'channel_id', 'priority', 'host_id'] if not task_id: clean_scheduler_queue() query = QueryProcessor( From 5ba4d4374b854e6742e35daa4f740f400eefafea Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 10:06:02 +0000 Subject: [PATCH 15/20] add method to log print --- diff --git 
a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index 375dc1f..d1053c4 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7904,24 +7904,36 @@ def handle_scheduler_logs(goptions, session, args): table = Table( "Time", Column("Task", justify="right"), + "Method", "Host", "Logger", "Level", "Location", "Message", ) + result = [] + task_ids = set([log['task_id'] for log in logs if log['task_id']]) + with session.multicall() as m: + for task_id in task_ids: + result.append(m.getTaskInfo(task_id)) + tasks = {} + for task in result: + tinfo = task.result + tasks[str(tinfo['id'])] = tinfo + for log in logs: if log['task_id']: - log['task_id'] = str(log['task_id']) + task = str(log['task_id']) else: - log['task_id'] = '' + task = '' if log['msg_ts']: - log['msg_ts'] = time.asctime(time.localtime(log['msg_ts'])) + msg_ts = time.asctime(time.localtime(log['msg_ts'])) else: - log['msg_ts'] = '' + msg_ts = '' table.add_row( - log['msg_ts'], - str(log['task_id']), + msg_ts, + task, + tasks.get(task, {}).get('method', ''), log['host_name'], log['logger_name'], log['level'], From 45f18957310d94b84c779738b84823c401d49332 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 10:14:14 +0000 Subject: [PATCH 16/20] limit tasks by maxjobs --- diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 5ddcd21..7685ade 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -86,6 +86,9 @@ class HostHashTable(object): f"Higher weight {task_weight} than available capacity {hostinfo['capacity']}", task_id=task['id'], host_id=host_id) continue + if hostinfo['data']['maxjobs'] < 1: + dblogger.debug("Host has no free job slot", task_id=task['id'], host_id=host_id) + continue hosts.append(hostinfo) hosts = sorted(hosts, key=lambda x: -x['priority']) @@ -94,7 +97,8 @@ class HostHashTable(object): host = hosts[0] # TODO: reduce resources (reserved memory, cpus) - self.hosts[host['id']]['task_load'] += 
self.hosts[host['id']]['data']['methods'][task['method']] + host['task_load'] += host['data']['methods'][task['method']] + host['data']['maxjobs'] -= 1 return host From f5fe8a117e6ca2d8544f5b66839257d85e2a69bf Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 10:18:10 +0000 Subject: [PATCH 17/20] fix query --- diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 7685ade..e06abf1 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -56,7 +56,7 @@ class HostHashTable(object): clauses=['host_id = %(host_id)i', 'state = %(state)i'], values={'host_id': host_id, 'state': koji.TASK_STATES['REFUSED']}, opts={'asList': True}) - self.hosts[host_id]['refused_tasks'] = set(query.execute()) + self.hosts[host_id]['refused_tasks'] = set([x[0] for x in query.execute()]) def get(self, task): # filter by requirements @@ -96,7 +96,7 @@ class HostHashTable(object): return None host = hosts[0] - # TODO: reduce resources (reserved memory, cpus) + # reduce resources (reserved memory, cpus) host['task_load'] += host['data']['methods'][task['method']] host['data']['maxjobs'] -= 1 return host From eefd11fb1b0ca3637489a7f820f73b2183dfc580 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 11:46:36 +0000 Subject: [PATCH 18/20] backward compatibility --- diff --git a/koji/daemon.py b/koji/daemon.py index 7a8bdcd..e162e9f 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -1024,8 +1024,8 @@ class TaskManager(object): for method in self.handlers: methods[method] = self.handlers[method]._taskWeight hostdata = { - 'task_load': self.task_load, - 'ready': self.ready, + # 'task_load': self.task_load, + # 'ready': self.ready, 'methods': methods, 'maxjobs': self.options.maxjobs, # kernel diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 373469a..6440e5b 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -14057,15 +14057,24 @@ class Host(object): def getLoadData(self): """Get load balancing data - This data is relatively small and the 
necessary load analysis is - relatively complex, so we let the host machines crunch it.""" + Compatibility version for older builders. + """ host = get_host(self.id) query = QueryProcessor(tables=['host_channels'], columns=['channel_id'], clauses=['host_id = %(id)s', 'active IS TRUE'], values={'id': self.id}, opts={'asList': True}) host['channels'] = [x[0] for x in query.execute()] - tasks = scheduler.get_task_runs(hostID=self.id) + task_runs = scheduler.get_task_runs(hostID=self.id, states=[koji.TASK_STATES['ASSIGNED'], + koji.TASK_STATES['SCHEDULED']]) + if task_runs: + query = QueryProcessor(tables=['task'], clauses=['id = %(id)s'], + columns=['id', 'state', 'channel_id', 'host_id', 'arch', + 'method', 'priority', 'create_time'], + values={'id': task_runs[0]['task_id']}) + tasks = query.execute() + else: + tasks = [] return [[host], tasks] def getTask(self): diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index e06abf1..17f6be6 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -23,6 +23,7 @@ class HostHashTable(object): self.methods = {} self.hosts = {} self.host_ids = set() + self.old_hosts = set() if hosts is None: hosts = get_ready_hosts() for hostinfo in hosts: @@ -48,8 +49,11 @@ class HostHashTable(object): self.arches.setdefault(arch, set()).add(host_id) for channel in hostinfo['channels']: self.channels.setdefault(channel, set()).add(host_id) - for method in hostinfo['data']['methods']: - self.methods.setdefault(method, set()).add(host_id) + if not hostinfo['data']: + self.old_hosts.add(host_id) + else: + for method in hostinfo['data']['methods']: + self.methods.setdefault(method, set()).add(host_id) # know about refused tasks query = QueryProcessor(tables=['scheduler_task_runs'], columns=['task_id'], @@ -66,27 +70,30 @@ class HostHashTable(object): host_ids &= {task['host_id']} # filter by architecture if task.get('arch') is not None: - host_ids &= self.arches[task['arch']] - # filter by method - host_ids &= 
self.methods[task['method']] + host_ids &= self.arches.get(task['arch'], set()) + # filter by method (unknown for old builders) + host_ids &= self.methods.get(task['method'], set()) | self.old_hosts # filter by channel if task.get('channel_id') is not None: - host_ids &= self.channels[task['channel_id']] + host_ids &= self.channels.get(task['channel_id'], set()) # select best from filtered and remove hosts which already refused this task hosts = [] + # for old builder just heurstic of 1.5 + task_weight = 1.5 for host_id in host_ids: hostinfo = self.hosts[host_id] if task['id'] in hostinfo['refused_tasks']: dblogger.debug("Task already refused", task_id=task['id'], host_id=host_id) continue - task_weight = hostinfo['data']['methods'][task['method']] + if host_id not in self.old_hosts: + task_weight = hostinfo['data']['methods'][task['method']] if task_weight > hostinfo['capacity'] - hostinfo['task_load']: dblogger.debug( f"Higher weight {task_weight} than available capacity {hostinfo['capacity']}", task_id=task['id'], host_id=host_id) continue - if hostinfo['data']['maxjobs'] < 1: + if host_id not in self.old_hosts and hostinfo['data']['maxjobs'] < 1: dblogger.debug("Host has no free job slot", task_id=task['id'], host_id=host_id) continue hosts.append(hostinfo) @@ -97,8 +104,9 @@ class HostHashTable(object): host = hosts[0] # reduce resources (reserved memory, cpus) - host['task_load'] += host['data']['methods'][task['method']] - host['data']['maxjobs'] -= 1 + host['task_load'] += task_weight + if host['id'] not in self.old_hosts: + host['data']['maxjobs'] -= 1 return host @@ -202,7 +210,7 @@ def get_ready_hosts(): joins=[ 'sessions USING (user_id)', 'host_config ON host.id = host_config.host_id', - 'scheduler_host_data ON host.id = scheduler_host_data.host_id', + 'LEFT JOIN scheduler_host_data ON host.id = scheduler_host_data.host_id', ] ) hosts = query.execute() @@ -272,6 +280,7 @@ def schedule(task_id=None): hosts = HostHashTable() if not hosts.hosts: # early 
fail if there is nothing available + dblogger.debug("Hosts not found") return # find unscheduled tasks @@ -365,7 +374,7 @@ class SchedulerExports(): columns, aliases = zip(*fields) query = QueryProcessor(tables=['scheduler_log_messages'], columns=columns, aliases=aliases, - joins=['host ON host_id = host.id'], + joins=['LEFT JOIN host ON host_id = host.id'], clauses=clauses, values=values, opts={'order': 'msg_time'}) return query.execute() From 5e7d75c7af45347519f8a8b4471434c74f728a89 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 12:03:21 +0000 Subject: [PATCH 19/20] fix task closing --- diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index 6440e5b..a699877 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -404,8 +404,11 @@ class Task(object): data={'result': info['result'], 'state': state}, rawdata={'completion_time': 'NOW()'}) update.execute() - update = UpdateProcessor('scheduler_task_runs', - clauses=['task_id = %(task_id)i', 'host_id = %(host_id)i'], + update = UpdateProcessor(table='scheduler_task_runs', + clauses=[ + 'id = (SELECT MAX(id) FROM scheduler_task_runs ' + 'WHERE task_id = %(task_id)i AND host_id = %(host_id)i)', + ], data={'state': state}, rawdata={'end_time': 'NOW()'}, values={'task_id': self.id, 'host_id': info['host_id']}) From f017161bb472dfe8b64121d8cb894418c95ca9b0 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Jan 02 2023 12:52:08 +0000 Subject: [PATCH 20/20] fix query --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index d1053c4..184413b 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7760,6 +7760,8 @@ def anon_handle_scheduler_info(goptions, session, args): help="Limit data to given builder (name/id)") parser.add_option("--state", action="store", type='str', default=None, help="Limit data to task state") + parser.add_option("-c", "--children", action="store_true", default=False, + help="If --task is given, also display its children") (options, args) = 
parser.parse_args(args) if len(args) > 0: parser.error("This command takes no arguments") @@ -7778,8 +7780,19 @@ def anon_handle_scheduler_info(goptions, session, args): else: states = None + if options.children: + if not options.task: + parser.error("--children makes sense only with --task") + tasks = sorted([int(x) for x in session.getTaskDescendents(options.task).keys()]) + else: + tasks = [options.task] + # get the data - runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, states=states) + result = [] + with session.multicall() as m: + for task in tasks: + result.append(m.scheduler.getTaskRuns(taskID=task, hostID=host_id, states=states)) + runs = itertools.chain(*[x.result for x in result]) ''' mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s' if not goptions.quiet: @@ -7815,9 +7828,19 @@ def anon_handle_scheduler_info(goptions, session, args): "Created", "Started", "Ended", + "Waited", ) for run in runs: run['state'] = koji.TASK_STATES[run['state']] + if run['start_ts']: + diff = run['start_ts'] - run['create_ts'] + s = diff % 60 + diff = (diff - s) / 60 + m = diff % 60 + h = (diff - s) / 60 + run['waited'] = '%02d:%02d:%02d' % (h, m, s) + else: + run['waited'] = '' for ts in ('create_ts', 'start_ts', 'end_ts'): if run[ts] is not None: run[ts] = time.asctime(time.localtime(run[ts])) @@ -7829,7 +7852,9 @@ def anon_handle_scheduler_info(goptions, session, args): f'[{run["state"].lower()}]{run["state"]}[/{run["state"].lower()}]', run["create_ts"], run["start_ts"], - run["end_ts"]) + run["end_ts"], + str(run["waited"]), + ) Console(theme=theme).print(table) #print(datetime.fromtimestamp(run['create_time']).isoformat(' ')) diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 17f6be6..99047ae 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -249,7 +249,7 @@ def clean_scheduler_queue(): tables=['scheduler_task_runs'], columns=['id'], clauses=[ - "create_time < NOW() + '5 
minutes'::interval", + "create_time < NOW() - '5 minutes'::interval", "state = %(state)i", ], values={'state': koji.TASK_STATES['SCHEDULED']},