From 10c974a6cf183e03cd957324d3422aa3858e0c21 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Jun 27 2017 16:55:24 +0000 Subject: PR#472 New features for restart-hosts command Merges #472 https://pagure.io/koji/pull-request/472 --- diff --git a/cli/koji b/cli/koji index c6c0a6d..fea7448 100755 --- a/cli/koji +++ b/cli/koji @@ -1563,10 +1563,44 @@ def handle_restart_hosts(options, session, args): help=_("Don't wait on task")) parser.add_option("--quiet", action="store_true", help=_("Do not print the task information"), default=options.quiet) + parser.add_option("--force", action="store_true", + help=_("Ignore checks and force operation")) + parser.add_option("--channel", help=_("Only hosts in this channel")) + parser.add_option("--arch", "-a", action="append", default=[], + help=_("Limit to hosts of this architecture (can be " + "given multiple times)")) + parser.add_option("--timeout", metavar='N', type='int', + help=_("Time out after N seconds")) (my_opts, args) = parser.parse_args(args) activate_session(session) - task_id = session.restartHosts() + + # check for existing restart tasks + if not my_opts.force: + query = { + 'method': 'restartHosts', + 'state': + [koji.TASK_STATES[s] for s in ('FREE', 'OPEN', 'ASSIGNED')], + } + others = session.listTasks(query) + if others: + print('Found other restartHosts tasks running.') + print('Task ids: %r' % [t['id'] for t in others]) + print('Use --force to run anyway') + return 1 + + callopts = {} + if my_opts.channel: + callopts['channel'] = my_opts.channel + if my_opts.arch: + callopts['arches'] = my_opts.arch + if my_opts.timeout: + callopts['timeout'] = my_opts.timeout + if callopts: + task_id = session.restartHosts(options=callopts) + else: + # allow default case to work with older hub + task_id = session.restartHosts() if my_opts.wait or (my_opts.wait is None and not _running_in_bg()): session.logout() return watch_tasks(session, [task_id], quiet=my_opts.quiet) diff --git a/koji/tasks.py b/koji/tasks.py index 9591c23..b2258e3 100644 --- a/koji/tasks.py +++ b/koji/tasks.py @@ -191,7 +191,8 @@ class BaseTaskHandler(object): safe_rmtree(self.workdir, unmount=False, strict=True) #os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir]) - def wait(self, subtasks=None, all=False, failany=False, canfail=None): + def wait(self, subtasks=None, all=False, failany=False, canfail=None, + timeout=None): """Wait on subtasks subtasks is a list of integers (or an integer). If more than one subtask @@ -206,6 +207,9 @@ class BaseTaskHandler(object): If canfail is given a list of task ids, then those tasks can fail without affecting the other tasks. + If timeout is specified, then subtasks will be failed and an exception + raised when the timeout is exceeded. + special values: subtasks = None specify all subtasks @@ -223,6 +227,7 @@ class BaseTaskHandler(object): subtasks = [subtasks] self.session.host.taskSetWait(self.id, subtasks) self.logger.debug("Waiting on %r" % subtasks) + start = time.time() while True: finished, unfinished = self.session.host.taskWait(self.id) if len(unfinished) == 0: @@ -251,11 +256,26 @@ class BaseTaskHandler(object): else: # at least one done break - # signal handler set by TaskManager.forkTask - self.logger.debug("Pausing...") - signal.pause() - # main process will wake us up with SIGUSR2 - self.logger.debug("...waking up") + if timeout: + # sleep until timeout is up (or let main process wake us up) + remain = start + timeout - time.time() + if remain > 0: + self.logger.debug("Sleeping for %.1fs", remain) + time.sleep(remain) + # check if we're timed out + duration = time.time() - start + if duration > timeout: + self.logger.info('Subtasks timed out') + self.session.cancelTaskChildren(self.id) + raise koji.GenericError('Subtasks timed out after %.1f ' + 'seconds' % duration) + else: + # signal handler set by TaskManager.forkTask + self.logger.debug("Pausing...") + signal.pause() + # main process will wake us up with SIGUSR2 + self.logger.debug("...waking up") + self.logger.debug("Finished waiting") if all: finished = subtasks @@ -495,36 +515,51 @@ class RestartVerifyTask(BaseTaskHandler): class RestartHostsTask(BaseTaskHandler): - """Gracefully restart the daemon""" + """Gracefully restart the build hosts""" Methods = ['restartHosts'] _taskWeight = 0.1 - def handler(self): - hosts = self.session.listHosts(enabled=True) + def handler(self, options=None): + if options is None: + options = {} + # figure out which hosts we're restarting + hostquery = {'enabled': True} + if 'channel' in options: + chan = self.session.getChannel(options['channel'], strict=True) + hostquery['channelID']= chan['id'] + if 'arches' in options: + hostquery['arches'] = options['arches'] + hosts = self.session.listHosts(**hostquery) if not hosts: - raise koji.GenericError("No hosts enabled") + raise koji.GenericError("No matching hosts") + + timeout = options.get('timeout', 3600*24) + + # fire off the subtasks this_host = self.session.host.getID() subtasks = [] my_tasks = None for host in hosts: - #note: currently task assignments bypass channel restrictions + # note: currently task assignments bypass channel restrictions task1 = self.subtask('restart', [host], assign=host['id'], label="restart %i" % host['id']) task2 = self.subtask('restartVerify', [task1, host], assign=host['id'], label="sleep %i" % host['id']) subtasks.append(task1) subtasks.append(task2) if host['id'] == this_host: my_tasks = [task1, task2] - if not my_tasks: - raise koji.GenericError('This host is not enabled') - self.wait(my_tasks[0]) - #see if we've restarted - if not self.session.taskFinished(my_tasks[1]): - raise ServerRestart - #raising this inside a task handler causes TaskManager.runTask - #to free the task so that it will not block a pending restart + + # if we're being restarted, then we have to take extra steps + if my_tasks: + self.wait(my_tasks[0], timeout=timeout) + # see if we've restarted + if not self.session.taskFinished(my_tasks[1]): + raise ServerRestart + # raising this inside a task handler causes TaskManager.runTask + # to free the task so that it will not block a pending restart + + # at this point the subtasks do the rest if subtasks: - self.wait(subtasks, all=True) - return + self.wait(subtasks, all=True, timeout=timeout) class DependantTask(BaseTaskHandler): diff --git a/tests/test_lib_py2only/test_restart_tasks.py b/tests/test_lib_py2only/test_restart_tasks.py new file mode 100644 index 0000000..7cf56a0 --- /dev/null +++ b/tests/test_lib_py2only/test_restart_tasks.py @@ -0,0 +1,230 @@ +import mock +import unittest + +import koji.tasks + + +class TestRestartTask(unittest.TestCase): + + def setUp(self): + self.session = mock.MagicMock() + self.options = mock.MagicMock() + self.manager = mock.MagicMock() + self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start() + + def tearDown(self): + mock.patch.stopall() + + def get_handler(self, *args, **kwargs): + params = koji.encode_args(*args, **kwargs) + handler = koji.tasks.RestartTask(137, 'restart', params, self.session, + self.options) + # this is a foreground task + handler.setManager(self.manager) + return handler + + def test_restart_task(self): + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "HOST ID" + handler = self.get_handler(host) + self.assertEqual(handler.Foreground, True) + result = handler.run() + + self.assertEqual(self.manager.restart_pending, True) + + def test_restart_wrong_host(self): + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "ANOTHER HOST" + handler = self.get_handler(host) + self.assertEqual(handler.Foreground, True) + with self.assertRaises(koji.GenericError): + result = handler.run() + + +class TestRestartVerifyTask(unittest.TestCase): + + def setUp(self): + self.session = mock.MagicMock() + self.options = mock.MagicMock() + self.manager = mock.MagicMock() + self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start() + + def tearDown(self): + mock.patch.stopall() + + def get_handler(self, *args, **kwargs): + params = koji.encode_args(*args, **kwargs) + handler = koji.tasks.RestartVerifyTask(137, 'restartVerify', params, self.session, + self.options) + # this is a foreground task + handler.setManager(self.manager) + return handler + + def test_restart_verify_task(self): + task1 = { + 'id': 'TASK ID', + 'state': koji.TASK_STATES['CLOSED'], + 'completion_ts': 10, + } + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "HOST ID" + self.session.getTaskInfo.return_value = task1 + handler = self.get_handler(task1['id'], host) + self.manager.start_time = 100 # greater than task1['start_time'] + self.assertEqual(handler.Foreground, True) + result = handler.run() + + def test_restart_verify_not_closed(self): + task1 = { + 'id': 'TASK ID', + 'state': koji.TASK_STATES['OPEN'], + 'completion_ts': 10, + } + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "HOST ID" + self.session.getTaskInfo.return_value = task1 + handler = self.get_handler(task1['id'], host) + try: + result = handler.run() + except koji.GenericError as e: + self.assertEqual(e.args[0], 'Stage one restart task is OPEN') + else: + raise Exception('Error not raised') + + def test_restart_verify_wrong_host(self): + task1 = { + 'id': 'TASK ID', + 'state': koji.TASK_STATES['CLOSED'], + 'completion_ts': 10, + } + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "OTHER HOST" + self.session.getTaskInfo.return_value = task1 + handler = self.get_handler(task1['id'], host) + try: + result = handler.run() + except koji.GenericError as e: + self.assertEqual(e.args[0], 'Host mismatch') + else: + raise Exception('Error not raised') + + def test_restart_verify_wrong_time(self): + task1 = { + 'id': 'TASK ID', + 'state': koji.TASK_STATES['CLOSED'], + 'completion_ts': 10, + } + host = {'id': 'HOST ID'} + self.session.host.getID.return_value = "HOST ID" + self.session.getTaskInfo.return_value = task1 + handler = self.get_handler(task1['id'], host) + self.manager.start_time = 0 # LESS THAN task1['start_time'] + try: + result = handler.run() + except koji.GenericError as e: + self.assertEqual(e.args[0][:30], 'Restart failed - start time is') + else: + raise Exception('Error not raised') + + +class TestRestartHostsTask(unittest.TestCase): + + def setUp(self): + self.session = mock.MagicMock() + self.options = mock.MagicMock() + self.manager = mock.MagicMock() + self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start() + + def tearDown(self): + mock.patch.stopall() + + def get_handler(self, *args, **kwargs): + params = koji.encode_args(*args, **kwargs) + handler = koji.tasks.RestartHostsTask(137, 'restartHosts', params, self.session, + self.options) + handler.wait = mock.MagicMock() + handler.subtask = mock.MagicMock() + return handler + + def test_restart_hosts_task(self): + self.session.host.getID.return_value = "THIS HOST" + host = {'id': 99} + self.session.listHosts.return_value = [host] + handler = self.get_handler({}) + handler.subtask.side_effect = [101, 102] + result = handler.run() + + self.session.listHosts.assert_called_once_with(enabled=True) + self.session.taskFinished.assert_not_called() + handler.wait.assert_called_once_with([101, 102], all=True, timeout=3600*24) + # subtask calls + call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id']) + call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id']) + handler.subtask.assert_has_calls([call1, call2]) + + def test_restart_hosts_no_host(self): + self.session.listHosts.return_value = [] + handler = self.get_handler({}) + try: + result = handler.run() + except koji.GenericError as e: + self.assertEqual(e.args[0], 'No matching hosts') + else: + raise Exception('Error not raised') + + self.session.listHosts.assert_called_once_with(enabled=True) + self.session.taskFinished.assert_not_called() + handler.wait.assert_not_called() + handler.subtask.assert_not_called() + + def test_restart_hosts_with_opts(self): + self.session.host.getID.return_value = "THIS HOST" + host = {'id': 99} + self.session.listHosts.return_value = [host] + self.session.getChannel.return_value = {'id': 1, 'name': 'default'} + handler = self.get_handler({'channel': 'default', 'arches': ['x86_64']}) + handler.subtask.side_effect = [101, 102] + result = handler.run() + + self.session.listHosts.assert_called_once_with(enabled=True, channelID=1, arches=['x86_64']) + self.session.taskFinished.assert_not_called() + handler.wait.assert_called_once_with([101, 102], all=True, timeout=3600*24) + # subtask calls + call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id']) + call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id']) + handler.subtask.assert_has_calls([call1, call2]) + + def test_restart_hosts_self_finished(self): + self.session.host.getID.return_value = 99 + host = {'id': 99} + self.session.listHosts.return_value = [host] + handler = self.get_handler({}) + self.session.taskFinished.return_value = True + handler.subtask.side_effect = [101, 102] + result = handler.run() + + self.session.listHosts.assert_called_once_with(enabled=True) + self.session.taskFinished.assert_called_once() + call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id']) + call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id']) + handler.subtask.assert_has_calls([call1, call2]) + call1 = mock.call(101, timeout=3600*24) + call2 = mock.call([101, 102], all=True, timeout=3600*24) + handler.wait.assert_has_calls([call1, call2]) + + def test_restart_hosts_self_unfinished(self): + self.session.host.getID.return_value = 99 + host = {'id': 99} + self.session.listHosts.return_value = [host] + handler = self.get_handler({}) + self.session.taskFinished.return_value = False + handler.subtask.side_effect = [101, 102] + with self.assertRaises(koji.tasks.ServerRestart): + result = handler.run() + + self.session.listHosts.assert_called_once_with(enabled=True) + self.session.taskFinished.assert_called_once() + call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id']) + call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id']) + handler.subtask.assert_has_calls([call1, call2]) + handler.wait.assert_called_once_with(101, timeout=3600*24) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index c66d5f6..b7e70e7 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -4,7 +4,7 @@ from os import path, makedirs from shutil import rmtree from tempfile import gettempdir from unittest import TestCase -from mock import patch, Mock, call +from mock import patch, MagicMock, Mock, call import koji from koji.tasks import BaseTaskHandler, FakeTask, ForkTask, SleepTask, \ @@ -358,6 +358,51 @@ class TasksTestCase(TestCase): self.assertEquals(e.message, 'Uh oh, we\'ve got a problem here!') obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234]) + @patch('time.time') + @patch('time.sleep') + @patch('signal.pause') + def test_BaseTaskHandler_wait_timeout(self, pause, sleep, time): + """Tests timeout behavior in the wait function""" + temp_path = get_tmp_dir_path('TestTask') + obj = TestTask(95, 'some_method', ['random_arg'], None, None, temp_path) + makedirs(temp_path) + obj.session = MagicMock() + obj.session.host.taskWait.return_value = [[], [99, 100, 101]] + time.side_effect = list(range(0, 4000, 60)) + try: + obj.wait([99, 100, 101], timeout=3600) + raise Exception('A GenericError was not raised.') + except koji.GenericError as e: + self.assertEquals(e.args[0][:24], 'Subtasks timed out after') + obj.session.host.taskSetWait.assert_called_once_with(95, [99, 100, 101]) + obj.session.cancelTaskChildren.assert_called_once_with(95) + obj.session.getTaskResult.assert_not_called() + pause.assert_not_called() + + @patch('time.time') + @patch('time.sleep') + @patch('signal.pause') + def test_BaseTaskHandler_wait_avoid_timeout(self, pause, sleep, time): + """Tests that timeout does not happen if tasks finish in time""" + temp_path = get_tmp_dir_path('TestTask') + obj = TestTask(95, 'some_method', ['random_arg'], None, None, temp_path) + makedirs(temp_path) + obj.session = MagicMock() + time.side_effect = list(range(0, 4000, 20)) + # time ticks every 20s for a little over an "hour" + # code checks time 3x each cycle (twice directly, once via logging) + # so each cycle is a "minute" + # report all unfinished for most of an hour + taskWait_returns = [[[], [99, 100, 101]]] * 50 + # and then report all done + taskWait_returns.append([[99, 100, 101], []]) + obj.session.host.taskWait.side_effect = taskWait_returns + obj.wait([99, 100, 101], timeout=3600) + + obj.session.host.taskSetWait.assert_called_once_with(95, [99, 100, 101]) + obj.session.cancelTaskChildren.assert_not_called() + pause.assert_not_called() + def test_BaseTaskHandler_getUploadDir(self): """ Tests that the getUploadDir function returns the appropriate path based on the id of the handler. """