#4044 Update getNextTask for scheduler
Merged a month ago by tkopecek. Opened 2 months ago by mikem.

file modified
+4 -2
@@ -153,10 +153,11 @@ 

      signal.signal(signal.SIGTERM, shutdown)

      signal.signal(signal.SIGUSR1, restart)

      exit_code = 0

+     taken = False

      while True:

          try:

-             taken = False

-             tm.updateBuildroots()

+             if not taken:

+                 tm.updateBuildroots()

              tm.updateTasks()

              taken = tm.getNextTask()

          except (SystemExit, ServerExit, KeyboardInterrupt):
@@ -179,6 +180,7 @@ 

              # XXX - this is a little extreme

              # log the exception and continue

              logger.error(''.join(traceback.format_exception(*sys.exc_info())))

+             taken = False

          try:

              if not taken:

                  # Only sleep if we didn't take a task, otherwise retry immediately.

file modified
+23 -109
@@ -1026,129 +1026,41 @@ 

                      self.logger.info("Lingering task %r (pid %r)" % (id, pid))

  

      def getNextTask(self):

+         """Task the next task

+ 

+         :returns: True if a task was taken, False otherwise

+         """

          self.ready = self.readyForTask()

          self.session.host.updateHost(self.task_load, self.ready)

          if not self.ready:

              self.logger.info("Not ready for task")

              return False

-         hosts, tasks = self.session.host.getLoadData()

-         self.logger.debug("Load Data:")

-         self.logger.debug("  hosts: %r" % hosts)

-         self.logger.debug("  tasks: %r" % tasks)

-         # now we organize this data into channel-arch bins

-         bin_hosts = {}  # hosts indexed by bin

-         bins = {}  # bins for this host

-         our_avail = None

-         for host in hosts:

-             host['bins'] = []

-             if host['id'] == self.host_id:

-                 # note: task_load reported by server might differ from what we

-                 # sent due to precision variation

-                 our_avail = host['capacity'] - host['task_load']

-             for chan in host['channels']:

-                 for arch in host['arches'].split() + ['noarch']:

-                     bin = "%s:%s" % (chan, arch)

-                     bin_hosts.setdefault(bin, []).append(host)

-                     if host['id'] == self.host_id:

-                         bins[bin] = 1

-         self.logger.debug("bins: %r" % bins)

-         if our_avail is None:

-             self.logger.info("Server did not report this host. Are we disabled?")

-             return False

-         elif not bins:

-             self.logger.info("No bins for this host. Missing channel/arch config?")

-             # Note: we may still take an assigned task below

-         # sort available capacities for each of our bins

-         avail = {}

-         for bin in bins:

-             avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]

-             avail[bin].sort()

-             avail[bin].reverse()

-         self.cleanDelayTimes()

+ 

+         # get our assigned tasks

+         tasks = self.session.host.getTasks()

          for task in tasks:

-             # note: tasks are in priority order

              self.logger.debug("task: %r" % task)

-             if task['method'] not in self.handlers:

-                 self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)

-                 continue

              if task['id'] in self.tasks:

                  # we were running this task, but it apparently has been

-                 # freed or reassigned. We can't do anything with it until

+                 # reassigned. We can't do anything with it until

                  # updateTasks notices this and cleans up.

-                 self.logger.debug("Task %(id)s freed or reassigned", task)

+                 self.logger.info("Task %(id)s reassigned", task)

+                 continue

+             if task['state'] != koji.TASK_STATES['ASSIGNED']:

+                 # shouldn't happen

+                 self.logger.error("Recieved task %(id)s is not assigned, state=%(state)s", task)

+                 continue

+             if task['host_id'] != self.host_id:

+                 # shouldn't happen

+                 self.logger.error("Recieved task %(id)s is not ours, host=%(host_id)s", task)

                  continue

-             if task['state'] == koji.TASK_STATES['ASSIGNED']:

-                 self.logger.debug("task is assigned")

-                 if self.host_id == task['host_id']:

-                     # assigned to us, we can take it regardless

-                     if self.takeTask(task):

-                         return True

-             elif task['state'] == koji.TASK_STATES['FREE']:

-                 bin = "%(channel_id)s:%(arch)s" % task

-                 self.logger.debug("task is free, bin=%r" % bin)

-                 if bin not in bins:

-                     continue

-                 # see where our available capacity is compared to other hosts for this bin

-                 # (note: the hosts in this bin are exactly those that could

-                 # accept this task)

-                 bin_avail = avail.get(bin, [0])

-                 if self.checkAvailDelay(task, bin_avail, our_avail):

-                     # decline for now and give the upper half a chance

-                     continue

-                 # otherwise, we attempt to open the task

-                 if self.takeTask(task):

-                     return True

-             else:

-                 # should not happen

-                 raise Exception("Invalid task state reported by server")

-         return False

- 

-     def checkAvailDelay(self, task, bin_avail, our_avail):

-         """Check to see if we should still delay taking a task

- 

-         Returns True if we are still in the delay period and should skip the

-         task. Otherwise False (delay has expired).

-         """

- 

-         now = time.time()

-         ts = self.skipped_tasks.get(task['id'])

-         if not ts:

-             ts = self.skipped_tasks[task['id']] = now

- 

-         # determine our normalized bin rank

-         for pos, cap in enumerate(bin_avail):

-             if our_avail >= cap:

-                 break

-         if len(bin_avail) > 1:

-             rank = float(pos) / (len(bin_avail) - 1)

-         else:

-             rank = 0.0

-         # so, 0.0 for highest available capacity, 1.0 for lowest

  

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         delay *= rank

+             # otherwise attempt to take it

+             if self.takeTask(task):

Maybe we can expand this to take all assigned tasks instead of waiting for next cycle. Builder should be able to accommodate almost all assigned tasks at this point. It could improve throughput especially for cheap/short tasks like tagNotification.

+                 return True

  

-         # return True if we should delay

-         if now - ts < delay:

-             self.logger.debug("skipping task %i, age=%s rank=%s"

-                               % (task['id'], int(now - ts), rank))

-             return True

-         # otherwise

-         del self.skipped_tasks[task['id']]

          return False

  

-     def cleanDelayTimes(self):

-         """Remove old entries from skipped_tasks"""

-         now = time.time()

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         cutoff = now - delay * 10

-         # After 10x the delay, we've had plenty of opportunity to take the

-         # task, so either it has already been taken or we can't take it.

-         for task_id in list(self.skipped_tasks):

-             ts = self.skipped_tasks[task_id]

-             if ts < cutoff:

-                 del self.skipped_tasks[task_id]

- 

      def _waitTask(self, task_id, pid=None):

          """Wait (nohang) on the task, return true if finished"""

          if pid is None:
@@ -1410,7 +1322,9 @@ 

          if method in self.handlers:

              handlerClass = self.handlers[method]

          else:

-             raise koji.GenericError("No handler found for method '%s'" % method)

+             self.logger.warning("Refusing task %(id)s, no handler for %(method)s", task)

+             self.session.host.refuseTask(task['id'], soft=False, msg="no handler for method")

+             return False

          task_info = self.session.getTaskInfo(task['id'], request=True)

          if task_info.get('request') is None:

              self.logger.warning("Task '%s' has no request" % task['id'])

@@ -1,109 +0,0 @@ 

- from __future__ import absolute_import

- import mock

- import unittest

- 

- import koji.daemon

- import koji

- 

- 

- class TestDelayTimes(unittest.TestCase):

- 

-     def setUp(self):

-         self.options = mock.MagicMock()

-         self.session = mock.MagicMock()

-         self.tm = koji.daemon.TaskManager(self.options, self.session)

-         self.time = mock.patch('time.time').start()

- 

-     def tearDown(self):

-         mock.patch.stopall()

- 

-     def test_check_avail_delay(self):

-         self.options.task_avail_delay = 180  # same as default

- 

-         # highest capacity, no skip entry

-         start = 10000

-         task = {'id': 100}

-         self.tm.skipped_tasks = {}

-         self.time.return_value = start

-         bin_avail = [10.0, 9.0, 8.0, 7.0]

-         our_avail = 10.1

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, False)

- 

-         # not highest, no skip entry

-         our_avail = 9.0

-         self.tm.skipped_tasks = {}

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, True)

- 

-         # last, but past full delay

-         self.tm.skipped_tasks = {task['id']: start}

-         our_avail = 7.0

-         self.options.task_avail_delay = 500

-         self.time.return_value = start + 500

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, False)

- 

-         # last, but less than delay

-         self.tm.skipped_tasks = {task['id']: start}

-         our_avail = 7.0

-         self.time.return_value = start + 499

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, True)

- 

-         # median, but less than scaled delay

-         self.tm.skipped_tasks = {task['id']: start}

-         bin_avail = [10.0, 9.0, 8.0, 7.0, 6.0]

-         our_avail = 8.0

-         # rank = 2/4 = 0.5, so adjusted delay is 250

-         self.time.return_value = start + 249

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, True)

- 

-         # median, but past scaled delay

-         self.tm.skipped_tasks = {task['id']: start}

-         bin_avail = [10.0, 9.0, 8.0, 7.0, 6.0]

-         our_avail = 8.0

-         # rank = 3/4 = 0.75, so adjusted delay is 250

-         self.time.return_value = start + 476

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, False)

- 

-         # only one in bin

-         self.tm.skipped_tasks = {}

-         bin_avail = [5.0]

-         our_avail = 5.0

-         self.time.return_value = start

-         chk = self.tm.checkAvailDelay(task, bin_avail, our_avail)

-         self.assertEqual(chk, False)

- 

- 

-     def test_clean_delay_times(self):

-         self.options.task_avail_delay = 180  # same as default

- 

-         # test no skipped entries

-         start = 10000

-         self.time.return_value = start + 100

-         self.tm.skipped_tasks = {}

-         self.tm.cleanDelayTimes()

-         self.assertEqual(self.tm.skipped_tasks, {})

- 

-         # test all skipped entries

-         self.time.return_value = start + 5000

-         skipped = {}

-         for i in range(25):

-             skipped[i] = start + i

-             # all older than 180 in age

-         self.tm.skipped_tasks = skipped

-         self.tm.cleanDelayTimes()

-         self.assertEqual(self.tm.skipped_tasks, {})

- 

-         # test mixed entries

-         skipped = {100: start + 5000}

-         expected = skipped.copy()

-         for i in range(25):

-             skipped[i] = start + i

-             # all older than 180 in age

-         self.tm.skipped_tasks = skipped

-         self.tm.cleanDelayTimes()

-         self.assertEqual(self.tm.skipped_tasks, expected)

@@ -0,0 +1,218 @@ 

+ from __future__ import absolute_import

+ import mock

+ import unittest

+ 

+ import koji.daemon

+ import koji

+ 

+ 

+ class TestGetNextTask(unittest.TestCase):

+ 

+     def setUp(self):

+         self.options = mock.MagicMock()

+         self.session = mock.MagicMock()

+         self.tm = koji.daemon.TaskManager(self.options, self.session)

+         self.tm.readyForTask = mock.MagicMock()

+         self.tm.takeTask = mock.MagicMock()

+         self.time = mock.patch('time.time').start()

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+ 

+     def test_not_ready(self):

+         self.tm.readyForTask.return_value = False

+ 

+         retval = self.tm.getNextTask()

+ 

+         self.assertEqual(retval, False)

+         self.session.host.getTasks.assert_not_called()

+         self.tm.takeTask.assert_not_called()

+ 

+     def test_no_tasks(self):

+         self.tm.host_id = host_id = 999

+         self.tm.readyForTask.return_value = True

+         self.session.host.getTasks.return_value = []

+ 

+         retval = self.tm.getNextTask()

+ 

+         self.assertEqual(retval, False)

+         self.session.host.getTasks.assert_called_once()

+         self.tm.takeTask.assert_not_called()

+ 

+     def test_one_good_task(self):

+         self.tm.host_id = host_id = 999

+         self.tm.readyForTask.return_value = True

+         self.tm.tasks = {3: 'already running'}

+         tasks = [

+             {'id': 1, 'state': koji.TASK_STATES['FREE'], 'host_id': None},  # bad state

+             {'id': 2, 'state': koji.TASK_STATES['ASSIGNED'], 'host_id': 666},  # wrong host

+             {'id': 3, 'state': koji.TASK_STATES['ASSIGNED'], 'host_id': host_id},  # already in tasks

+             {'id': 4, 'state': koji.TASK_STATES['ASSIGNED'], 'host_id': host_id},  # good

+         ]

+         self.session.host.getTasks.return_value = tasks

+ 

+         retval = self.tm.getNextTask()

+ 

+         self.assertEqual(retval, True)

+         self.session.host.getTasks.assert_called_once()

+         self.tm.takeTask.assert_called_once_with(tasks[3])

+ 

+ 

+ class TestTakeTask(unittest.TestCase):

+ 

+     def setUp(self):

+         self.options = mock.MagicMock()

+         self.session = mock.MagicMock()

+         self.tm = koji.daemon.TaskManager(self.options, self.session)

+         self.tm.readyForTask = mock.MagicMock()

+         self.tm.runTask = mock.MagicMock()

+         self.tm.forkTask = mock.MagicMock()

+         self.time = mock.patch('time.time').start()

+         self.handler = mock.MagicMock()

+         self.tm.handlers = {'fake': mock.MagicMock(return_value=self.handler)}

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+ 

+     def test_simple_fork(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+         }

+         self.handler.Foreground = False

+         self.session.host.openTask.return_value = task

+         self.tm.forkTask.return_value = ['PID', 'SESSION']

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.handler.setManager.assert_not_called()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_called_once_with(self.handler)

+         self.assertEqual(self.tm.pids, {4: 'PID'})

+         self.assertEqual(self.tm.subsessions, {4: 'SESSION'})

+         self.assertEqual(retval, True)

+ 

+     def test_simple_foreground(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+         }

+         self.handler.Foreground = True

+         self.session.host.openTask.return_value = task

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.handler.setManager.assert_called_once_with(self.tm)

+         self.tm.runTask.assert_called_once_with(self.handler)

+         self.tm.forkTask.assert_not_called()

+         self.assertEqual(self.tm.pids, {})

+         self.assertEqual(self.tm.subsessions, {})

+         self.assertEqual(retval, True)

+ 

+ 

+     def test_refuse_no_handler(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'missing',

+         }

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.refuseTask.assert_called_once()

+         self.session.getTaskInfo.assert_not_called()

+         self.session.host.openTask.assert_not_called()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

+ 

+     def test_skip_no_request(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+         }

+         self.session.getTaskInfo.return_value = {}

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.openTask.assert_not_called()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

+ 

+     def test_skip_bad_check(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+         }

+         self.handler.checkHost.side_effect = Exception('should refuse')

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.refuseTask.assert_called_once()

+         self.session.host.openTask.assert_not_called()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

+ 

+     def test_open_fails(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+         }

+         self.session.host.openTask.return_value = None

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.openTask.assert_called_once()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

+ 

+     def test_set_weight_fails(self):

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+             'request': '...',

+             'host_id': 999,

+         }

+         self.session.host.openTask.return_value = task

+         self.session.host.setTaskWeight.side_effect = koji.ActionNotAllowed('should skip')

+         task2 = task.copy()

+         task2['host_id'] = 42

+         self.session.getTaskInfo.side_effect = [task, task2]

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.openTask.assert_called_once()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

+ 

+     def test_set_weight_fails_state(self):

+         self.tm.host_id = 999

+         task = {

+             'id': 4,

+             'state': koji.TASK_STATES['ASSIGNED'],

+             'method': 'fake',

+             'request': '...',

+             'host_id': self.tm.host_id,

+         }

+         self.session.host.openTask.return_value = task

+         self.session.host.setTaskWeight.side_effect = koji.ActionNotAllowed('should skip')

+         task2 = task.copy()

+         task2['state'] = koji.TASK_STATES['FREE']

+         self.session.getTaskInfo.side_effect = [task, task2]

+ 

+         retval = self.tm.takeTask(task)

+ 

+         self.assertEqual(retval, False)

+         self.session.host.openTask.assert_called_once()

+         self.tm.runTask.assert_not_called()

+         self.tm.forkTask.assert_not_called()

I had an alternate version of #4029 bundled with some other pending work. I've pulled that out, cleaned it up and added unit tests.

Fixes https://pagure.io/koji/issue/4028

Maybe we can expand this to take all assigned tasks instead of waiting for next cycle. Builder should be able to accommodate almost all assigned tasks at this point. It could improve throughput especially for cheap/short tasks like tagNotification.

But it is probably ok as it is - another run of readyForTask is worth doing.

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

2 months ago

Maybe we can expand this to take all assigned tasks instead of waiting for next cycle

We already skip the sleep if we take one. If I queue up a ton of sleep 0 tasks locally with one builder, I see kojid rapidly fill up to max tasks in less than a second. The limiting factor for me seems to be the scheduler RunInterval setting. If I drop that to 1 sec, kojid seems to grind through these tasks almost as fast as I can make them (in a bash loop of call makeTasks commands).

That said, buildroot cleanup is not happening here and that could slow things down. It would be reasonable to skip this when we take a task.

1 new commit added

  • skip updateBuildroots if we have just taken a task
2 months ago

Metadata Update from @tkopecek:
- Pull-request untagged with: testing-ready
- Pull-request tagged with: no_qe

a month ago

Commit c4b50c6 fixes this pull-request

Pull-Request has been merged by tkopecek

a month ago