| |
@@ -244,10 +244,17 @@
|
| |
is_worker_alive() method - whether the task is really still doing
|
| |
something in the background or not (== unexpected failure cleanup).
|
| |
Fill float value in seconds.
|
| |
+ :cvar worker_cleanup_period: How often should WorkerManager try to clean up
|
| |
+ workers? (value is a period in seconds)
|
| |
"""
|
| |
+
|
| |
+ # pylint: disable=too-many-instance-attributes
|
| |
+
|
| |
worker_prefix = 'worker' # make sure this is unique in each class
|
| |
worker_timeout_start = 30
|
| |
- worker_timeout_deadcheck = 60
|
| |
+ worker_timeout_deadcheck = 3*60
|
| |
+ worker_cleanup_period = 3.0
|
| |
+
|
| |
|
| |
def __init__(self, redis_connection=None, max_workers=8, log=None,
|
| |
frontend_client=None, limits=None):
|
| |
@@ -263,6 +270,7 @@
|
| |
# to survive server restarts (we adopt the old background workers).
|
| |
self._tracked_workers = set(self.worker_ids())
|
| |
self._limits = limits or []
|
| |
+ self._last_worker_cleanup = None
|
| |
|
| |
def start_task(self, worker_id, task):
|
| |
"""
|
| |
@@ -407,6 +415,12 @@
|
| |
start_time = time.time()
|
| |
self.log.debug("Worker.run() start at time %s", start_time)
|
| |
|
| |
+ # Make sure _cleanup_workers() has some effect during the run() call.
|
| |
+ # This is here mostly for the test-suite, because in the real use-cases
|
| |
+ # the worker_cleanup_period is much shorter than the timeout and
|
| |
+ # the cleanup is done _several_ times during the run() call.
|
| |
+ self._last_worker_cleanup = 0.0
|
| |
+
|
| |
while True:
|
| |
now = start_time if now is None else time.time()
|
| |
|
| |
@@ -417,6 +431,7 @@
|
| |
|
| |
worker_count = len(self._tracked_workers)
|
| |
if worker_count >= self.max_workers:
|
| |
+ self.log.debug("Worker count on a limit %s", worker_count)
|
| |
time.sleep(1)
|
| |
continue
|
| |
|
| |
@@ -427,6 +442,7 @@
|
| |
# Empty queue!
|
| |
if worker_count:
|
| |
# It still makes sense to cycle to finish the workers.
|
| |
+ self.log.debug("No more tasks, waiting for workers")
|
| |
time.sleep(1)
|
| |
continue
|
| |
# Optimization part, nobody is working now, and there's nothing
|
| |
@@ -472,6 +488,22 @@
|
| |
self._tracked_workers.discard(worker_id)
|
| |
|
| |
def _cleanup_workers(self, now):
|
| |
+ """
|
| |
+ Go through all the tracked workers and check if they already finished,
|
| |
+ failed to start or died in the background.
|
| |
+ """
|
| |
+
|
| |
+ # This method is called very frequently (several hundred times per second,
|
| |
+ # for each of the attempts to start a worker in the self.run() method).
|
| |
+ # Because the likelihood that some of the background workers changed
|
| |
+ # state is pretty low, we control the frequency of the cleanup here.
|
| |
+ now = time.time()
|
| |
+ if now - self._last_worker_cleanup < self.worker_cleanup_period:
|
| |
+ return
|
| |
+
|
| |
+ self.log.debug("Trying to clean old workers")
|
| |
+ self._last_worker_cleanup = time.time()
|
| |
+
|
| |
for worker_id in self.worker_ids():
|
| |
info = self.redis.hgetall(worker_id)
|
| |
|
| |
Does this make more sense now?