#2144 backend: fix slow priority queue filling
Merged 2 years ago by praiskup. Opened 2 years ago by frostyx.
copr / frostyx/copr: slow-queue-filling into main

@@ -357,9 +357,7 @@
          task_id = repr(task)
          worker_id = self.get_worker_id(task_id)

-         # TODO: We now track workers in-memory, no need to re-query Redis for
-         # self.worker_ids() all the time (we can cache it for one run)
-         if worker_id in self.worker_ids():
+         if worker_id in self._tracked_workers:
              # No need to re-add this to queue.
              self._start_tracking_worker(worker_id, task)
              self.log.debug("Task %s already has a worker process", task_id)
@@ -414,7 +412,7 @@

              self._cleanup_workers(now)

-             worker_count = len(self.worker_ids())
+             worker_count = len(self._tracked_workers)
              if worker_count >= self.max_workers:
                  time.sleep(1)
                  continue
@@ -468,6 +466,7 @@

      def _delete_worker(self, worker_id):
          self.redis.delete(worker_id)
+         self._tracked_workers.discard(worker_id)

      def _cleanup_workers(self, now):
          for worker_id in self.worker_ids():

@@ -145,6 +145,7 @@
      worker_manager = None

      def setup_method(self, method):
+         log.setLevel(logging.DEBUG)
          self.setup_redis()
          self.setup_worker_manager()
          self.setup_tasks()
@@ -284,6 +285,32 @@
          assert self.redis.hgetall('worker:3') == {}
          assert "cancel_request" in self.redis.hgetall('worker:4')

+     def test_slow_priority_queue_filling(self):
+         """
+         We discovered that adding tasks to a priority queue was a bottleneck
+         when having a large (70k+ builds) queue, see #2095. Make sure this never
+         happen again.
+         """
+         tasks = [ToyQueueTask(i) for i in range(100000)]
+
+         # We need to run this test with logging only INFO, otherwise we waste
+         # around 5 seconds just on running self.log.debug because we need to
+         # connect to redis for each call
+         # The point of this test is to make sure that adding tasks to priority
+         # queue is not a bottleneck on production, and we don't use DEBUG there
+         # anyway.
+         log.setLevel(logging.INFO)
+
+         t1 = time.time()
+         for task in tasks:
+             self.worker_manager.add_task(task)
+         t2 = time.time()
+
+         # It should actually be faster than 1 second but I am adding one to
+         # prevent false alarms in case somebody has a slow machine
+         assert t2 - t1 < 2
+
+
  def wait_pid_exit(pid):
      """ wait till pid stops responding to no-op kill 0 """
      while True:

I am not entirely sure the change is correct, what do you think?
Anyway, it cuts the time for running the test from 10s to 5s.

The other remaining bottleneck is

self.log.debug("Adding task %s to queue, priority %s", task_id,
                task.priority)

It is not any of its arguments that are problematic; the logging call itself is. Even if you do

self.log.debug("Foo")

it is slow. When removing the line entirely, the test finishes in under 1s.

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

I am not entirely sure the change is correct, what do you think?

You are on the right track, I'm sure. Re-querying the workers from Redis for
each added task is certainly the (main) bottleneck here.

Anyway, it cuts the time for running the test from 10s to 5s.

Good. I think this is at least where we want to get!

It is not any of its arguments that are problematic; the logging call itself is. Even if you do
self.log.debug("Foo")
it is slow. When removing the line entirely, the test finishes in under 1s.

Logging goes through Redis as well, so the performance penalty is expected, I would say.
At least I believe that if we increased the level from "debug" to "info", the penalty
would disappear, right? (That is what we have in production.)
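
For illustration, a toy sketch of why raising the level makes the penalty vanish: Python's Logger.debug() checks the effective level before building the record or touching any handler, so with the logger at INFO the slow handler below is never reached. The SlowHandler class here is a hypothetical stand-in that only simulates a round-trip cost; it is not the actual copr Redis handler.

    import logging
    import time

    class SlowHandler(logging.Handler):
        """Stand-in for a Redis-backed handler; every emit() pretends to do a round-trip."""
        def emit(self, record):
            time.sleep(0.001)  # pretend this is a Redis call

    log = logging.getLogger("toy")
    log.addHandler(SlowHandler())

    log.setLevel(logging.DEBUG)
    start = time.time()
    for _ in range(1000):
        log.debug("Foo")      # reaches the handler -> roughly a second in total
    print("DEBUG level:", time.time() - start)

    log.setLevel(logging.INFO)
    start = time.time()
    for _ in range(1000):
        log.debug("Foo")      # dropped by the level check, the handler never runs
    print("INFO level: ", time.time() - start)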

I think we can afford to do this for the add_task() caller, but not for the cancel_task_id() caller and not for the self.run() caller. The thing is that ideally, all the callers should get "up2date" info (not a cached variant).

Yeah, I got here as well over the weekend. IMO we really want to drop
this TODO and start tracking the workers both in Redis and in-memory in
self._worker_ids(). Simply, we should load the worker IDs from Redis once
per cycle ... BUT anytime we add or remove a worker (see e.g.
_start_worker()), we should not only modify the Redis state but also
modify the self._worker_ids set. This will make it safe.

Also see 15dc4a1. What I probably meant is that we should use the
self._tracked_workers set. I think it is the right time to consolidate
these duplicates (self._worker_ids and self._tracked_workers) into one.
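
To make the proposed pattern concrete, here is a minimal sketch of tracking workers both in Redis (authoritative) and in an in-memory set (fast membership tests). The class name, helper names, and Redis keys are simplified assumptions for illustration, not the actual copr WorkerManager code.

    import redis

    class WorkerTrackingSketch:
        """Mirror worker IDs in Redis and in an in-memory set."""

        def __init__(self):
            self.redis = redis.Redis()
            # Membership tests against a Python set are O(1) and avoid a Redis
            # round-trip per added task.
            self._tracked_workers = set()

        def refresh_from_redis(self):
            # Once per dispatcher cycle, reload the authoritative state.
            self._tracked_workers = set(self.worker_ids())

        def worker_ids(self):
            return [key.decode() for key in self.redis.keys("worker:*")]

        def _start_tracking_worker(self, worker_id):
            # Any place that creates a worker updates both Redis and the cache,
            # so the cached view cannot go stale within a cycle.
            self.redis.hset(worker_id, "allocated", 1)
            self._tracked_workers.add(worker_id)

        def _delete_worker(self, worker_id):
            self.redis.delete(worker_id)
            # discard() is a no-op when the ID is not tracked, unlike remove().
            self._tracked_workers.discard(worker_id)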

rebased onto da7b0046767e6aa7501278b101ae2a978bfa7cd4

2 years ago

I think I addressed all the comments?
PTAL

Build succeeded.

Looking at _start_tracking_worker, I am not sure whether we need to add a WorkerLimit.worker_deleted function and undo everything that WorkerLimit.worker_added does. What do you think?

We have not been doing this so far, at least, so I don't think so.

This is not obvious, can you specify why we need this? (I would say we don't but I may be missing something)

This is correct, but I would probably just do self._tracked_workers.discard(worker_id)?

Wow, this helps the situation in #2139! I would say it deserves a separate commit (it sounds a bit off-topic WRT queue filling).

Maybe we don't. My strategy was to find every self.redis statement for deleting workers and call self._stop_tracking_worker(worker_id) at the same place. But if I remove this line, tests still pass, so we can probably drop it.

rebased onto 7e213a2e82f0c654444244c9acf0c6a8564b1be5

2 years ago

Merge Failed.

This change or one of its cross-repo dependencies was unable to be automatically merged with the current state of its repository. Please rebase the change and upload a new patchset.

rebased onto b0e0e39a21756aa3c1451091bf4bfe3d21efbd90

2 years ago

This is not obvious, can you specify why we need this? (I would say we don't but I may be missing something)

So, I dropped it

This is correct, but I would probably just do self._tracked_workers.discard(worker_id)?

TIL :-)
Fixed
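
For context, the suggestion relies on the fact that set.discard() silently ignores a missing element, while set.remove() raises KeyError:

    >>> workers = {"worker:1", "worker:2"}
    >>> workers.discard("worker:3")   # no-op, nothing raised
    >>> workers.remove("worker:3")
    Traceback (most recent call last):
      ...
    KeyError: 'worker:3'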

Wow, you help the situation in #2139! I would say this deserves a separate commit (sounds a bit off-topic WRT queue filling).

Done

Build succeeded.

This call still deserves a comment if it is really needed. I would just drop this call.

Now I realized that I previously commented on a different part of the code, right? But this has the same issue.

My strategy was to find every self.redis statement for deleting workers and call self._stop_tracking_worker(worker_id) at the same place.

That's it ... I think it is the correct strategy, but to my own surprise
self.redis.delete() is called only in one place.

rebased onto 412ea19

2 years ago

This call still deserves a comment if it is really needed. I would just drop this call.

Now I realized that I previously commented on a different part of the code, right? But this has the same issue.

Dropped :-)

Build succeeded.

Pull-Request has been merged by praiskup

2 years ago