From c3b7c815cb412747e5a783b45e53cbcf89f179f3 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Dec 03 2018 20:54:07 +0000 Subject: add a timeout for skipping a task based on bin capacity --- diff --git a/koji/daemon.py b/koji/daemon.py index 8d8c37d..a4ef7ed 100644 --- a/koji/daemon.py +++ b/koji/daemon.py @@ -863,6 +863,7 @@ class TaskManager(object): avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]] avail[bin].sort() avail[bin].reverse() + self.cleanDelayTimes() for task in tasks: # note: tasks are in priority order self.logger.debug("task: %r" % task) @@ -894,8 +895,9 @@ class TaskManager(object): median = bin_avail[(len(bin_avail)-1)//2] self.logger.debug("ours: %.2f, median: %.2f" % (our_avail, median)) if not self.checkRelAvail(bin_avail, our_avail): - #decline for now and give the upper half a chance - return False + if self.checkAvailDelay(task): + #decline for now and give the upper half a chance + return False #otherwise, we attempt to open the task if self.takeTask(task): return True @@ -904,6 +906,36 @@ class TaskManager(object): raise Exception("Invalid task state reported by server") return False + def checkAvailDelay(self, task): + """Check to see if we should still delay taking a task + + Returns True if we are still in the delay period and should skip the + task. Otherwise False (delay has expired). + """ + + now = time.time() + ts = self.skipped_tasks.get(task['id']) + if not ts: + self.skipped_tasks[task['id']] = now + return True + # else + delay = getattr(self.options, 'task_avail_delay', 180) + if now - ts < delay: + del self.skipped_tasks[task['id']] + return True + + def cleanDelayTimes(self): + """Remove old entries from skipped_tasks""" + now = time.time() + delay = getattr(self.options, 'task_avail_delay', 180) + cutoff = now - delay * 10 + # After 10x the delay, we've had plenty of opportunity to take the + # task, so either it has already been taken or we can't take it. + for task_id in self.skipped_tasks: + ts = self.skipped_tasks[task_id] + if ts < cutoff: + del self.skipped_tasks[task_id] + def checkRelAvail(self, bin_avail, avail): """ Check our available capacity against the capacity of other hosts in this bin.