#2155 backend: manager: properly track running workers
Merged 2 years ago by praiskup. Opened 2 years ago by praiskup.
fix-worker-manager into main

@@ -256,7 +256,12 @@
          self.redis = redis_connection
          self.max_workers = max_workers
          self.frontend_client = frontend_client
-         self._tracked_workers = set()
+         # We have to frequently ask for the actually tracked list of workers —
+         # therefore we keep it here to not re-query the list from Redis all the
+         # time.  We have to load the list from Redis initially, when the process
+         # starts (Manager/Dispatcher class is loaded) because we want the logic
+         # to survive server restarts (we adopt the old background workers).
+         self._tracked_workers = set(self.worker_ids())
          self._limits = limits or []
  
      def start_task(self, worker_id, task):
@@ -340,10 +345,7 @@
          assert prefix == self.worker_prefix
          return task_id
  
-     def _start_tracking_worker(self, worker_id, task):
-         if worker_id in self._tracked_workers:
-             return
-         self._tracked_workers.add(worker_id)
+     def _calculate_limits_for_task(self, worker_id, task):
          for limit in self._limits:
              limit.worker_added(worker_id, task)
  
@@ -358,8 +360,9 @@
          worker_id = self.get_worker_id(task_id)
  
          if worker_id in self._tracked_workers:
-             # No need to re-add this to queue.
-             self._start_tracking_worker(worker_id, task)
+             # No need to re-add this to queue, but we need to calculate
+             # it into the limits.
+             self._calculate_limits_for_task(worker_id, task)
              self.log.debug("Task %s already has a worker process", task_id)
              return
  
@@ -450,9 +453,10 @@
      def _start_worker(self, task, time_now):
          worker_id = self.get_worker_id(repr(task))
          self.redis.hset(worker_id, 'allocated', time_now)
+         self._tracked_workers.add(worker_id)
          self.log.info("Starting worker %s, task.priority=%s", worker_id,
                        task.priority)
-         self._start_tracking_worker(worker_id, task)
+         self._calculate_limits_for_task(worker_id, task)
          self.start_task(worker_id, task)
  
      def clean_tasks(self):
@@ -462,7 +466,6 @@
          self.tasks = JobQueue()
          for limit in self._limits:
              limit.clear()
-         self._tracked_workers = set()
  
      def _delete_worker(self, worker_id):
          self.redis.delete(worker_id)

The _tracked_workers set is now kept precisely in sync with the Redis DB
state. Previously we cleared the set for each WorkerManager.run() call,
and the two got out of sync.
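
For illustration, a minimal Python sketch of this caching idea. The diff
calls self.worker_ids(); the prefix-based keys() scan below is an
assumption about how that lookup works, not necessarily the real copr
implementation:

    import redis

    class ManagerSketch:
        worker_prefix = "worker"

        def __init__(self, redis_connection):
            self.redis = redis_connection
            # Load the set from Redis once at startup, then maintain it
            # in-memory instead of re-querying Redis for every check.
            self._tracked_workers = set(self.worker_ids())

        def worker_ids(self):
            # Assumption: one Redis hash per running worker, named by prefix.
            return [k.decode() for k in self.redis.keys(self.worker_prefix + ":*")]

    # usage (assumes a local Redis server): manager = ManagerSketch(redis.Redis())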

Because the set was cleared on every run(), a brand-new Worker was
started for the same (already running) task in the queue on each pass,
effectively a fork bomb.
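
To make the failure mode concrete, here is a hypothetical, heavily
condensed run() loop (not the actual copr code) showing why wiping the
cache defeats the already-running check:

    def run_once(manager):
        # Pre-fix behavior: the cache is wiped on every single run() ...
        manager._tracked_workers = set()
        for task in manager.tasks:
            worker_id = manager.get_worker_id(repr(task))
            # ... so this membership test never succeeds, even though a
            # worker process for the task already exists in the Redis DB,
            if worker_id in manager._tracked_workers:
                continue
            # and another worker gets forked for the same task every pass.
            manager._start_worker(task, 0)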

Now that we no longer clear the _tracked_workers set, the
_start_tracking_worker() method needed some rework. It had two purposes
(calculating the limits, and adding the worker to _tracked_workers).
Those two purposes are now separated, so the method is renamed to
_calculate_limits_for_task().
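
Condensed from the hunks above, the split looks like this:

    # Before: one method with two responsibilities.
    def _start_tracking_worker(self, worker_id, task):
        if worker_id in self._tracked_workers:
            return
        self._tracked_workers.add(worker_id)
        for limit in self._limits:
            limit.worker_added(worker_id, task)

    # After: limit accounting only; the tracking half moved into
    # _start_worker(), right next to the Redis hset() call.
    def _calculate_limits_for_task(self, worker_id, task):
        for limit in self._limits:
            limit.worker_added(worker_id, task)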

We also have to load the initial state of _tracked_workers from the
Redis DB, otherwise we would be unable to adopt the workers started by
the previous dispatcher process (before systemctl restart
copr-backend.target).
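
A self-contained toy run of that adoption logic; FakeRedis below stands
in for the real Redis server (which outlives the dispatcher process
across restarts), and the worker:build:1 key name is purely
illustrative:

    class FakeRedis:
        # Toy in-memory stand-in for the long-lived Redis server.
        def __init__(self):
            self._data = {}

        def hset(self, key, field, value):
            self._data.setdefault(key, {})[field] = value

        def keys(self, pattern):
            prefix = pattern.rstrip("*")
            return [k for k in self._data if k.startswith(prefix)]

    class Dispatcher:
        worker_prefix = "worker"

        def __init__(self, redis_db):
            self.redis = redis_db
            # Adopt whatever workers are already recorded in Redis.
            self._tracked_workers = set(self.redis.keys(self.worker_prefix + ":*"))

    server = FakeRedis()
    server.hset("worker:build:1", "allocated", 123)  # left by the old dispatcher

    dispatcher = Dispatcher(server)     # new process after the restart
    print(dispatcher._tracked_workers)  # {'worker:build:1'} -- adopted, not re-forked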

rebased onto fa8466c

2 years ago

Metadata Update from @praiskup:
- Pull-request tagged with: release-blocker

2 years ago

Build succeeded.

I am testing it now, and it seems OK.

Pull-Request has been merged by praiskup

2 years ago