From d7e6dc771cc8bee82e140cdd97d02a732662c96d Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Oct 03 2023 13:23:02 +0000 Subject: flake8 --- diff --git a/kojihub/scheduler.py b/kojihub/scheduler.py index 768e27c..df1be0c 100644 --- a/kojihub/scheduler.py +++ b/kojihub/scheduler.py @@ -1,11 +1,9 @@ import logging -import psycopg2 import time import koji from . import kojihub from .db import QueryProcessor, InsertProcessor, UpdateProcessor, db_lock -from koji.context import context convert_value = kojihub.convert_value @@ -16,8 +14,8 @@ logger = logging.getLogger('koji.scheduler') def log_db(msg, task_id=None, host_id=None): insert = InsertProcessor( - 'scheduler_log_messages', - data={'msg': msg, 'task_id': task_id, 'host_id': host_id}, + 'scheduler_log_messages', + data={'msg': msg, 'task_id': task_id, 'host_id': host_id}, ) insert.execute() @@ -88,21 +86,12 @@ def get_task_runs(taskID=None, hostID=None, active=None): query = QueryProcessor( columns=fields, aliases=aliases, tables=['scheduler_task_runs'], - # joins=['host ON host_id=host.id', 'task ON task_id=task.id'], + # joins=['host ON host_id=host.id', 'task ON task_id=task.id'], clauses=clauses, values=locals()) return query.execute() -def scheduler_map_task(taskinfo): - # map which hosts can take this task - # eventually this will involve more complex rules - q = QueryProcessor() - # select hosts matching arch and channel - hosts = q.execute() - u = InsertProcessor() - - class TaskScheduler(object): def __init__(self): @@ -174,7 +163,6 @@ class TaskScheduler(object): host['capacity'] - host['_load'] > min_avail): task['_hosts'].append(host) logger.info(f'Task {task["task_id"]}: {len(task["_hosts"])} options') - #import pdb; pdb.set_trace() for host in task['_hosts']: # demand gives us a rough measure of how much overall load is pending for the host host.setdefault('_demand', 0.0) @@ -193,7 +181,8 @@ class TaskScheduler(object): for task in self.free_tasks: min_avail = task['weight'] - self.capacity_overcommit task['_hosts'].sort(key=lambda h: h['_rank']) - logger.debug('Task %i choices: %s', task['task_id'], [(h['name'], "%(_rank).2f" %h) for h in task['_hosts']]) + logger.debug('Task %i choices: %s', task['task_id'], + [(h['name'], "%(_rank).2f" % h) for h in task['_hosts']]) for host in task['_hosts']: if (host['capacity'] - host['_load'] > min_avail and host['_ntasks'] < self.maxjobs): @@ -265,15 +254,15 @@ class TaskScheduler(object): # end stale runs update = UpdateProcessor( - 'scheduler_task_runs', - data={'active': False}, - clauses=['active = TRUE', - '(SELECT id FROM task WHERE task.id=task_id AND state IN %(states)s) IS NULL'], - values={'states': [koji.TASK_STATES[s] for s in ('OPEN', 'ASSIGNED')]}, + 'scheduler_task_runs', + data={'active': False}, + clauses=['active = TRUE', + '(SELECT id FROM task WHERE task.id=task_id AND ' + 'state IN %(states)s) IS NULL'], + values={'states': [koji.TASK_STATES[s] for s in ('OPEN', 'ASSIGNED')]}, ) update.execute() - def check_hosts(self): # sanity check ready status hosts_to_mark = [] @@ -285,15 +274,14 @@ class TaskScheduler(object): log_both('Marking host not ready', host_id=host['id']) if hosts_to_mark: - update = db.UpdateProcessor( - 'host', - data={'ready': False}, - clauses=['host_id IN %(host_ids)s'], - values={'host_ids': [h['id'] for h in hosts_to_mark]}, + update = UpdateProcessor( + 'host', + data={'ready': False}, + clauses=['host_id IN %(host_ids)s'], + values={'host_ids': [h['id'] for h in hosts_to_mark]}, ) update.execute() - def get_active_runs(self): runs = get_task_runs(active=True) runs_by_task = {} @@ -327,7 +315,7 @@ class TaskScheduler(object): columns=fields, aliases=aliases, tables=['task'], clauses=('task.state IN %(states)s', 'task.host_id IS NOT NULL', # should always be set, but... - ), + ), values=values, ) active_tasks = query.execute() @@ -420,10 +408,10 @@ class TaskScheduler(object): # mark any older runs inactive update = UpdateProcessor( - 'scheduler_task_runs', - data={'active': False}, - clauses=['task_id=%(task_id)s', 'active = TRUE'], - values={'task_id': task['task_id']}, + 'scheduler_task_runs', + data={'active': False}, + clauses=['task_id=%(task_id)s', 'active = TRUE'], + values={'task_id': task['task_id']}, ) update.execute() @@ -434,10 +422,10 @@ class TaskScheduler(object): # mark the task assigned update = UpdateProcessor( - 'task', - data={'host_id': host['id'], 'state': koji.TASK_STATES['ASSIGNED']}, - clauses=['id=%(task_id)s', 'state=%(free)s'], - values={'task_id': task['task_id'], 'free': koji.TASK_STATES['FREE']}, + 'task', + data={'host_id': host['id'], 'state': koji.TASK_STATES['ASSIGNED']}, + clauses=['id=%(task_id)s', 'state=%(free)s'], + values={'task_id': task['task_id'], 'free': koji.TASK_STATES['FREE']}, ) update.execute()