| |
@@ -5,7 +5,6 @@
|
| |
import copy
|
| |
import time
|
| |
import logging
|
| |
- import subprocess
|
| |
from unittest.mock import MagicMock, patch
|
| |
|
| |
import pytest
|
| |
@@ -28,11 +27,15 @@
|
| |
log = logging.getLogger()
|
| |
log.setLevel(logging.DEBUG)
|
| |
|
| |
+ # pylint: disable=too-many-instance-attributes,protected-access
|
| |
+
|
| |
|
| |
class ToyWorkerManager(WorkerManager):
|
| |
- # pylint: disable=abstract-method,too-many-instance-attributes
|
| |
+ # pylint: disable=abstract-method
|
| |
process_counter = 0
|
| |
task_sleep = 0
|
| |
+ started_in_cycle = 0
|
| |
+ expected_terminations_in_cycle = None
|
| |
|
| |
def start_task(self, worker_id, task):
|
| |
self.process_counter += 1
|
| |
@@ -51,7 +54,30 @@
|
| |
if task_env:
|
| |
environ.update(task_env)
|
| |
|
| |
- subprocess.check_call(list(map(str, cmd)), env=environ)
|
| |
+ start = time.time()
|
| |
+ #subprocess.check_call(list(map(str, cmd)), env=environ)
|
| |
+ retyped_cmd = list(map(str, cmd))
|
| |
+ self.start_daemon_on_background(retyped_cmd, env=environ)
|
| |
+ self.log.debug("starting-on-background-took %s (%s)",
|
| |
+ time.time() - start, retyped_cmd)
|
| |
+ self.started_in_cycle += 1
|
| |
+
|
| |
+ def _clean_daemon_processes(self):
|
| |
+ """
|
| |
+ Check that we are not leaving any zombies behind us
|
| |
+ """
|
| |
+ waited = super()._clean_daemon_processes()
|
| |
+ self.log.debug("cleaned up %s, started %s", waited, self.started_in_cycle)
|
| |
+ if waited != self.started_in_cycle:
|
| |
+ if self.expected_terminations_in_cycle is not None:
|
| |
+ assert self.expected_terminations_in_cycle == waited
|
| |
+ return waited
|
| |
+ assert False
|
| |
+ return waited
|
| |
+
|
| |
+ def run(self, *args, **kwargs):
|
| |
+ self.started_in_cycle = 0
|
| |
+ return super().run(*args, **kwargs)
|
| |
|
| |
def finish_task(self, _w_id, _tinfo):
|
| |
pass
|
| |
@@ -232,6 +258,7 @@
|
| |
self.worker_manager._start_worker(task, time.time())
|
| |
worker_id = self.worker_manager.get_worker_id(repr(task))
|
| |
assert len(self.redis.keys(worker_id)) == 1
|
| |
+ self.worker_manager._clean_daemon_processes()
|
| |
|
| |
def test_number_of_tasks(self):
|
| |
assert self.remaining_tasks() == 10
|
| |
@@ -378,6 +405,12 @@
|
| |
assert self.w0 not in keys
|
| |
|
| |
def test_all_passed(self, caplog):
|
| |
+ # It is a lot of fun with Popen(). It seems it has some zombie reaping
|
| |
+ # mechanism. If the calling function objects are destroyed (including
|
| |
+ # the Popen() return value reference), the future call to Popen() seems
|
| |
+ # to just reap the old Popen() processes.
|
| |
+ self.worker_manager.expected_terminations_in_cycle = 5
|
| |
+
|
| |
self.worker_manager.run(timeout=100)
|
| |
for i in range(0, 10):
|
| |
smsg = "Starting worker {}{}, task.priority=0"
|
| |
The process handling in WorkerManager should be robust enough to avoid
checking the BackgroundWorker:process() exit status. We actually never
provided any useful exit status value from there (except for potential
daemon.DaemonContext() failures) — all the application logic info is
propagated through Redis.
So instead of waiting for the exit value in subprocess.check_call(),
do the "startup" of the BackgroundWorker process in background too,
using just Popen(). If the daemonizing logic ever fails, we will retry
the task anyway in the next cycle(s) (Redis "started" field will not be
set).
This change might make it look like a fork-bomb is much easier to trigger,
but we have all the WorkerLimit stuff in place and we don't ever start
more BackgroundWorker processes than allowed. But we'll be doing more
concurrency during the process startup so the overall average startup
time of a process might increase, and the code might be less reliable on
very, very slow backend machines (see the option worker_timeout_start
which is still on 30s by default).
The speedup measured on my (otherwise idling) i7-10850H box was from
0.37s to 0.00065s, IOW more than 500x faster. This means that in the 20s
sleeptime window we should be able to start several thousands of
processes (should be really enough to feed all the allocated workers).
Relates: #2095