From 7431d5e152edf836130b4155624e6eed367d8c83 Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Nov 17 2022 03:05:34 +0000 Subject: distgit: fair processing of task from multiple sandboxes Even users coming "late" should be able to process at least some importing tasks quickly, and outrun the large queue generated by other users. This is guaranteed by giving the most priority (priority=1) to the first task from each sandbox. Next priority (2) is given to the second task from each sandbox, and so on. This means that no matter how big the queue is, "my first import task" (excluding background jobs!) gets always the highest priority — and will be processed relatively quickly. Merges: #2370 --- diff --git a/dist-git/copr-dist-git.spec b/dist-git/copr-dist-git.spec index 4cd9de3..6f1084b 100644 --- a/dist-git/copr-dist-git.spec +++ b/dist-git/copr-dist-git.spec @@ -23,6 +23,7 @@ BuildRequires: python3-rpkg BuildRequires: python3-pytest BuildRequires: python3-copr-common >= %copr_common_version BuildRequires: python3-oslo-concurrency +BuildRequires: python3-redis BuildRequires: python3-setproctitle Recommends: logrotate diff --git a/dist-git/copr_dist_git/import_dispatcher.py b/dist-git/copr_dist_git/import_dispatcher.py index 028c151..bd31cb5 100644 --- a/dist-git/copr_dist_git/import_dispatcher.py +++ b/dist-git/copr_dist_git/import_dispatcher.py @@ -17,6 +17,20 @@ LIMITS = { } +class _PriorityCounter: + def __init__(self): + self._counter = {} + + def get_priority(self, task): + """ + Calculate the "dynamic" import task priority. + _counter["sandbox"] = value + """ + self._counter.setdefault(task.sandbox, 0) + self._counter[task.sandbox] += 1 + return self._counter[task.sandbox] + + class ImportDispatcher(Dispatcher): """ Kick-off a dispatcher daemon for importing tasks into DistGit. @@ -45,6 +59,9 @@ class ImportDispatcher(Dispatcher): def get_frontend_tasks(self): importer = Importer(self.opts) tasks = importer.try_to_obtain_new_tasks(limit=999999) + counter = _PriorityCounter() + for task in tasks: + task.dispatcher_priority += counter.get_priority(task) return tasks def _create_per_task_logs_directory(self, path): diff --git a/dist-git/copr_dist_git/import_task.py b/dist-git/copr_dist_git/import_task.py index c670eda..0b073f8 100644 --- a/dist-git/copr_dist_git/import_task.py +++ b/dist-git/copr_dist_git/import_task.py @@ -14,6 +14,7 @@ class ImportTask(QueueTask): self.srpm_url = None self.sandbox = None self.background = None + self.dispatcher_priority = 0 @staticmethod def from_dict(task_dict): @@ -39,7 +40,8 @@ class ImportTask(QueueTask): @property def priority(self): - return 100 if self.background else 0 + value = 100 if self.background else 0 + return value + self.dispatcher_priority @property def repo_namespace(self): diff --git a/dist-git/tests/base.py b/dist-git/tests/base.py index 03287a7..34f49ad 100644 --- a/dist-git/tests/base.py +++ b/dist-git/tests/base.py @@ -1,16 +1,18 @@ import os import shutil import tempfile -import munch import time -import json + +import munch from copr_dist_git import importer from copr_dist_git import import_task +from copr_dist_git import import_dispatcher class Base(object): def setup_method(self, method): + # pylint: disable=attribute-defined-outside-init self.tmp_dir_name = self.make_temp_dir() self.lookaside_location = os.path.join(self.tmp_dir_name, "lookaside") self.per_task_location = os.path.join(self.tmp_dir_name, "per-task-logs") @@ -30,6 +32,7 @@ class Base(object): "multiple_threads": True, "git_user_name": "Test user", "git_user_email": "test@test.org", + "max_workers": 10, }) self.importer = importer.Importer(self.opts) @@ -67,6 +70,8 @@ class Base(object): self.url_task = import_task.ImportTask.from_dict(self.url_task_data) self.upload_task = import_task.ImportTask.from_dict(self.upload_task_data) + self.dispatcher = import_dispatcher.ImportDispatcher(self.opts) + self.dispatcher.importer = self.importer def teardown_method(self, method): self.rm_tmp_dir() diff --git a/dist-git/tests/test_importer.py b/dist-git/tests/test_importer.py index 1161c6c..aa67690 100644 --- a/dist-git/tests/test_importer.py +++ b/dist-git/tests/test_importer.py @@ -1,5 +1,6 @@ # coding: utf-8 +from collections import defaultdict import json import os @@ -12,6 +13,7 @@ from base import Base from unittest import mock from unittest.mock import MagicMock +import copr_dist_git.import_task MODULE_REF = 'copr_dist_git.importer' @@ -124,6 +126,29 @@ class TestImporter(Base): 'pkg_version': '1.2', 'git_hash': '124', 'repo_name': 'foo'}) ])) + + @mock.patch("copr_dist_git.import_dispatcher.Importer", return_value=MagicMock()) + def test_priorities(self, importer): + importer.return_value = self.importer + def _shortener(the_dict): + return copr_dist_git.import_task.ImportTask.from_dict( + defaultdict(lambda: "notset", the_dict)) + self.importer.try_to_obtain_new_tasks = MagicMock() + self.importer.try_to_obtain_new_tasks.return_value = [ + _shortener({"build_id": 1, "sandbox": "a", "background": False}), + _shortener({"build_id": 2, "sandbox": "a", "background": False}), + _shortener({"build_id": 3, "sandbox": "b", "background": False}), + _shortener({"build_id": 3, "sandbox": "c", "background": False}), + _shortener({"build_id": 1, "sandbox": "a", "background": True}), + ] + tasks = self.dispatcher.get_frontend_tasks() + assert tasks[0].priority == 1 + assert tasks[1].priority == 2 + assert tasks[2].priority == 1 + assert tasks[3].priority == 1 + assert tasks[4].priority == 103 + + def test_run(self, mc_time, mc_worker): self.importer.try_to_obtain_new_tasks = MagicMock() self.importer.do_import = MagicMock()