| |
@@ -76,6 +76,7 @@
|
| |
columns=fields, aliases=aliases, tables=['task'],
|
| |
clauses=['host_id = %(hostID)s', 'state=%(assigned)s'],
|
| |
values={'hostID': hostID, 'assigned': koji.TASK_STATES['ASSIGNED']},
|
| |
+ opts={'order': 'priority,create_ts'},
|
| |
)
|
| |
|
| |
tasks = query.execute()
|
| |
@@ -325,7 +326,7 @@
|
| |
if (host['capacity'] - host['_load'] > min_avail and
|
| |
host['_ntasks'] < self.maxjobs):
|
| |
# add run entry
|
| |
- self.add_run(task, host)
|
| |
+ self.assign(task, host)
|
| |
# update our totals and rank
|
| |
host['_load'] += task['weight']
|
| |
host['_ntasks'] += 1
|
| |
@@ -357,9 +358,11 @@
|
| |
|
| |
taskruns = runs.get(task['task_id'], [])
|
| |
if not taskruns:
|
| |
- log_both('Assigned task with no active run entry', task_id=task['task_id'],
|
| |
- host_id=host['id'], level=logging.ERROR)
|
| |
- kojihub.Task(task['task_id']).free()
|
| |
+ # a task that is assigned without a run entry is treated as an override
|
| |
+ # we simply leave these alone
|
| |
+ # TODO track overrides more explicitly
|
| |
+ logger.debug('Override task assignment: task %i, host %s',
|
| |
+ task['task_id'], host['name'])
|
| |
continue
|
| |
|
| |
if len(taskruns) > 1:
|
| |
@@ -565,8 +568,17 @@
|
| |
|
| |
return hosts
|
| |
|
| |
- def add_run(self, task, host):
|
| |
- log_both('Assigning task', task_id=task['task_id'], host_id=host['id'])
|
| |
+ def assign(self, task, host, force=False, override=False):
|
| |
+ # mark the task assigned
|
| |
+ success = kojihub.Task(task['task_id']).assign(host['id'], force=force)
|
| |
+ if not success:
|
| |
+ log_both('Assignment failed', task_id=task['task_id'], host_id=host['id'])
|
| |
+ return False
|
| |
+
|
| |
+ if override:
|
| |
+ log_both('Assigning task (override)', task_id=task['task_id'], host_id=host['id'])
|
| |
+ else:
|
| |
+ log_both('Assigning task', task_id=task['task_id'], host_id=host['id'])
|
| |
|
| |
# mark any older runs inactive
|
| |
update = UpdateProcessor(
|
| |
@@ -577,14 +589,31 @@
|
| |
)
|
| |
update.execute()
|
| |
|
| |
- # add the new run
|
| |
- insert = InsertProcessor('scheduler_task_runs')
|
| |
- insert.set(task_id=task['task_id'], host_id=host['id'])
|
| |
- insert.execute()
|
| |
+ if not override:
|
| |
+ # add the new run
|
| |
+ insert = InsertProcessor('scheduler_task_runs')
|
| |
+ insert.set(task_id=task['task_id'], host_id=host['id'])
|
| |
+ insert.execute()
|
| |
+ # In the override case, we omit the run entry
|
| |
|
| |
- # mark the task assigned
|
| |
- task = kojihub.Task(task['task_id'])
|
| |
- task.assign(host['id'])
|
| |
+ return True
|
| |
+
|
| |
+
|
| |
+ # exported as assignTask in kojihub
|
| |
+ def do_assign(task_id, host, force=False, override=False):
|
| |
+ """Assign a task to a host
|
| |
+
|
| |
+ Specify force=True to assign a non-free task
|
| |
+ Specify override=True to prevent the scheduler from reassigning later
|
| |
+ """
|
| |
+ task_id = kojihub.convert_value(task_id, cast=int)
|
| |
+ host = kojihub.get_host(host, strict=True)
|
| |
+ force = kojihub.convert_value(force, cast=bool)
|
| |
+ override = kojihub.convert_value(override, cast=bool)
|
| |
+ context.session.assertPerm('admin')
|
| |
+ task = {'task_id': task_id} # the assign call just needs the task_id field
|
| |
+ db_lock('scheduler', wait=True)
|
| |
+ return TaskScheduler().assign(task, host, force=force, override=override)
|
| |
|
| |
|
| |
class SchedulerExports:
|
| |
This is a minimally invasive, if less than ideal, solution to #3979. If a task is assigned but has no associated run entry, this is treated as an assignment override -- that is, the scheduler will not adjust it.
Fixes https://pagure.io/koji/issue/3979