@@ -256,7 +256,12 @@
         self.redis = redis_connection
         self.max_workers = max_workers
         self.frontend_client = frontend_client
-        self._tracked_workers = set()
+        # We have to frequently ask for the actually tracked list of workers —
+        # therefore we keep it here to not re-query the list from Redis all the
+        # time. We have to load the list from Redis initially, when the process
+        # starts (Manager/Dispatcher class is loaded) because we want the logic
+        # to survive server restarts (we adopt the old background workers).
+        self._tracked_workers = set(self.worker_ids())
         self._limits = limits or []

     def start_task(self, worker_id, task):
@@ -340,10 +345,7 @@
         assert prefix == self.worker_prefix
         return task_id

-    def _start_tracking_worker(self, worker_id, task):
-        if worker_id in self._tracked_workers:
-            return
-        self._tracked_workers.add(worker_id)
+    def _calculate_limits_for_task(self, worker_id, task):
         for limit in self._limits:
             limit.worker_added(worker_id, task)

@@ -358,8 +360,9 @@
         worker_id = self.get_worker_id(task_id)

         if worker_id in self._tracked_workers:
-            # No need to re-add this to queue.
-            self._start_tracking_worker(worker_id, task)
+            # No need to re-add this to queue, but we need to calculate
+            # it into the limits.
+            self._calculate_limits_for_task(worker_id, task)
             self.log.debug("Task %s already has a worker process", task_id)
             return

@@ -450,9 +453,10 @@
     def _start_worker(self, task, time_now):
         worker_id = self.get_worker_id(repr(task))
         self.redis.hset(worker_id, 'allocated', time_now)
+        self._tracked_workers.add(worker_id)
         self.log.info("Starting worker %s, task.priority=%s", worker_id,
                       task.priority)
-        self._start_tracking_worker(worker_id, task)
+        self._calculate_limits_for_task(worker_id, task)
         self.start_task(worker_id, task)

     def clean_tasks(self):
@@ -462,7 +466,6 @@
         self.tasks = JobQueue()
         for limit in self._limits:
             limit.clear()
-        self._tracked_workers = set()

     def _delete_worker(self, worker_id):
         self.redis.delete(worker_id)
The _tracked_workers set is now kept precisely in sync with the Redis
DB state. Previously we cleared the _tracked_workers set for each
WorkerManager.run() call, so it got de-synced from Redis; as a result,
every run() started a brand new worker for the same (already running)
task in the queue, aka a fork bomb.
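
To illustrate the old failure mode, here is a minimal standalone sketch
(hypothetical names, not the actual copr-backend code):

    # Sketch of the old bug: the cache of tracked workers is reset on
    # every run(), so an already-running task looks brand new each time.
    class BuggyManager:
        def __init__(self):
            self._tracked_workers = set()

        def run(self, queued_task_ids):
            self._tracked_workers = set()  # the bug: cache dropped each run()
            for task_id in queued_task_ids:
                if task_id in self._tracked_workers:
                    continue  # never true across run() calls, due to the reset
                self._tracked_workers.add(task_id)
                print("forking a new worker for", task_id)

    manager = BuggyManager()
    manager.run(["task-1"])  # forks a worker for task-1
    manager.run(["task-1"])  # forks ANOTHER worker for the same task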
Since we no longer clear the _tracked_workers set, we have to rework
the _start_tracking_worker() method a bit. It had two purposes
(calculating the limits, and adding the worker to _tracked_workers).
Those two purposes are now separated, so the method is renamed to
_calculate_limits_for_task() and the call sites update the set
directly.
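
For context, _calculate_limits_for_task() only feeds the limit objects.
The limit classes themselves are not part of this diff; a hypothetical
implementation matching the worker_added()/clear() calls seen above
(task.group and check() are assumptions here) could look like:

    # Hypothetical limit sketch; only the worker_added() and clear()
    # method names are taken from the diff, the rest is illustrative.
    class GroupWorkerLimit:
        """Cap how many workers may run concurrently per task group."""

        def __init__(self, max_per_group):
            self.max_per_group = max_per_group
            self._groups = {}  # group name -> set of worker IDs

        def worker_added(self, worker_id, task):
            # Called once for every worker counted into the limits.
            self._groups.setdefault(task.group, set()).add(worker_id)

        def check(self, task):
            # True while another worker may be started for this group.
            return len(self._groups.get(task.group, set())) < self.max_per_group

        def clear(self):
            # Called from clean_tasks() before limits are recalculated.
            self._groups.clear()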
We also have to load the initial state of _tracked_workers from the
Redis DB, otherwise we would be unable to adopt the workers started by
a previous dispatcher process (before systemctl restart
copr-backend.target).
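
The worker_ids() method called when the manager is constructed is not
shown in this diff. Assuming worker hashes live under keys starting
with self.worker_prefix (as the worker_prefix assertion in the diff
suggests), a rough sketch might be:

    def worker_ids(self):
        """Return IDs of all workers currently recorded in Redis.

        A sketch only; the real method and the exact key separator are
        outside this diff.  scan_iter() enumerates matching keys
        incrementally instead of blocking Redis the way KEYS would.
        """
        return list(self.redis.scan_iter(match=self.worker_prefix + ':*'))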