#177 allow tasks to fail on some arches for images/lives/appliances
Merged 7 years ago by mikem. Opened 7 years ago by tkopecek.
tkopecek/koji task-fail  into  master

file modified
+25 -2
@@ -57,7 +57,6 @@ 

  from fnmatch import fnmatch

  from gzip import GzipFile

  from optparse import OptionParser, SUPPRESS_HELP

- from StringIO import StringIO

  from yum import repoMDObject


  #imports for LiveCD, LiveMedia, and Appliance handler
@@ -2406,18 +2405,38 @@ 

                  bld_info = self.initImageBuild(name, version, release,

                      target_info, opts)

              subtasks = {}

+             canfail = []

              for arch in arches:

                  subtasks[arch] = self.subtask('createLiveMedia',

                      [name, version, release, arch, target_info, build_tag,

                          repo_info, ksfile, opts],

                      label='livemedia %s' % arch, arch=arch)

+                 if arch in opts.get('optional_arches', []):

+                     canfail.append(subtasks[arch])



              self.logger.debug("Got image subtasks: %r", subtasks)

              self.logger.debug("Waiting on livemedia subtasks...")

-             results = self.wait(subtasks.values(), all=True, failany=True)

+             results = self.wait(subtasks.values(), all=True, failany=True, canfail=canfail)


+             # if everything failed, fail even if all subtasks are in canfail

              self.logger.debug('subtask results: %r', results)

+             all_failed = True

+             for result in results.values():

+                 if not isinstance(result, dict) or 'faultCode' not in result:

+                     all_failed = False

+                     break

+             if all_failed:

+                 raise koji.GenericError("all subtasks failed")


+             # determine ignored arch failures

+             ignored_arches = set()

+             for arch in arches:

+                 if arch in opts.get('optional_arches', []):

+                     task_id = subtasks[arch]

+                     result = results[task_id]

+                     if isinstance(result, dict) and 'faultCode' in result:

+                         ignored_arches.add(arch)


              # wrap each image an RPM if needed

              spec_url = opts.get('specfile')
@@ -2427,6 +2446,8 @@ 

                      subtask_id = subtasks[arch]

                      result = results[subtask_id]

                      tinfo = self.session.getTaskInfo(subtask_id)

+                     if arch in ignored_arches:

+                         continue

                      arglist = [spec_url, target_info, bld_info, tinfo,

                                  {'repo_id': repo_info['id']}]

                      wrapper_tasks[arch] = self.subtask('wrapperRPM', arglist,
@@ -2437,6 +2458,8 @@ 


                  # add wrapper rpm results into main results

                  for arch in arches:

+                     if arch in ignored_arches:

+                         continue

                      result = results[subtasks[arch]]

                      result2 = results2[wrapper_tasks[arch]]

                      result['rpmresults'] = result2

file modified
+13 -6
@@ -5466,12 +5466,15 @@ 

          help=_("SCM URL to spec file fragment to use to generate wrapper RPMs"))

      parser.add_option("--skip-tag", action="store_true",

                        help=_("Do not attempt to tag package"))

+     parser.add_option("--can-fail", action="store", dest="optional_arches",

+         metavar="ARCH1,ARCH2,...", default="",

+         help=_("List of archs which are not blocking for build (separated by commas."))

      (task_options, args) = parser.parse_args(args)


      # Make sure the target and kickstart is specified.

      if len(args) != 5:

-         parser.error(_("Five arguments are required: a name, a version, an" +

-                        " architecture, a build target, and a relative path to" +

+         parser.error(_("Five arguments are required: a name, a version, a" +

+                        " build target, an architecture, and a relative path to" +

                         " a kickstart file."))

          assert False  # pragma: no cover

      _build_image(options, task_options, session, args, 'livemedia')
@@ -5732,6 +5735,9 @@ 

          help=_("Create a scratch image"))

      parser.add_option("--skip-tag", action="store_true",

                        help=_("Do not attempt to tag package"))

+     parser.add_option("--can-fail", action="store", dest="optional_arches",

+         metavar="ARCH1,ARCH2,...", default="",

+         help=_("List of archs which are not blocking for build (separated by commas."))

      parser.add_option("--specfile", metavar="URL",

          help=_("SCM URL to spec file fragment to use to generate wrapper RPMs"))

      parser.add_option("--wait", action="store_true",
@@ -5842,12 +5848,13 @@ 

          ksfile = os.path.join(serverdir, os.path.basename(ksfile))



+     hub_opts = {}

+     hub_opts['optional_arches'] = task_opts.optional_arches.split(',')

      passthru_opts = [

-         'isoname', 'ksurl', 'ksversion', 'scratch', 'repo',

-         'release', 'skip_tag', 'vmem', 'vcpu', 'format', 'specfile',

-         'title', 'install_tree_url',

+         'format', 'install_tree_url', 'isoname', 'ksurl',

+         'ksversion', 'release', 'repo', 'scratch', 'skip_tag',

+         'specfile', 'title', 'vcpu', 'vmem',


-     hub_opts = {}

      for opt in passthru_opts:

          val = getattr(task_opts, opt, None)

          if val is not None:

file modified
+30 -19
@@ -400,7 +400,7 @@ 

          params, method = xmlrpclib.loads(xml_request)

          return params


-     def getResult(self):

+     def getResult(self, raise_fault=True):

          query = """SELECT state,result FROM task WHERE id = %(id)i"""

          r = _fetchSingle(query, vars(self))

          if not r:
@@ -410,17 +410,20 @@ 

              raise koji.GenericError, "Task %i is canceled" % self.id

          elif koji.TASK_STATES[state] not in ['CLOSED', 'FAILED']:

              raise koji.GenericError, "Task %i is not finished" % self.id

-         # If the result is a Fault, then loads will raise it

-         # This is probably what we want to happen.

-         # Note that you can't really 'return' a fault over xmlrpc, you

-         # can only 'raise' them.

-         # If you try to return a fault as a value, it gets reduced to

-         # a mere struct.

-         # f = Fault(1,"hello"); print dumps((f,))

          if xml_result.find('<?xml', 0, 10) == -1:

              #handle older base64 encoded data

              xml_result = base64.decodestring(xml_result)

-         result, method = xmlrpclib.loads(xml_result)

+         try:

+             # If the result is a Fault, then loads will raise it

+             # This is normally what we want to happen

+             result, method = xmlrpclib.loads(xml_result)

+         except xmlrpclib.Fault, fault:

+             if raise_fault:

+                 raise

+             # Note that you can't really return a fault over xmlrpc, except by

+             # raising it. We return a dictionary in the same format that

+             # multiCall does.

+             return {'faultCode': fault.faultCode, 'faultString': fault.faultString}

          return result[0]


      def getInfo(self, strict=True, request=False):
@@ -10066,9 +10069,9 @@ 

          task = Task(taskId)

          return task.getRequest()


-     def getTaskResult(self, taskId):

+     def getTaskResult(self, taskId, raise_fault=True):

          task = Task(taskId)

-         return task.getResult()

+         return task.getResult(raise_fault=raise_fault)


      def getTaskInfo(self, task_id, request=False):

          """Get information about a task"""
@@ -11094,7 +11097,8 @@ 

                  c.execute(q, locals())

          return [finished, unfinished]


-     def taskWaitResults(self, parent, tasks):

+     def taskWaitResults(self, parent, tasks, canfail=None):

+         results = {}

          # If we're getting results, we're done waiting


          c = context.cnx.cursor()
@@ -11108,16 +11112,17 @@ 

              # Query all subtasks

              tasks = []

              c.execute(q, locals())

-             for id, state in c.fetchall():

+             for task_id, state in c.fetchall():

                  if state == canceled:

                      raise koji.GenericError, "Subtask canceled"

                  elif state in (closed, failed):

-                     tasks.append(id)

+                     tasks.append(task_id)

          # Would use a dict, but xmlrpc requires the keys to be strings

          results = []

-         for id in tasks:

-             task = Task(id)

-             results.append([id, task.getResult()])

+         for task_id in tasks:

+             task = Task(task_id)

+             raise_fault = (task in canfail)

+             results.append([task_id, task.getResult(raise_fault=raise_fault)])

          return results


      def getHostTasks(self):
@@ -11301,10 +11306,10 @@ 


          return host.taskWait(parent)


-     def taskWaitResults(self, parent, tasks):

+     def taskWaitResults(self, parent, tasks, canfail=None):

          host = Host()


-         return host.taskWaitResults(parent, tasks)

+         return host.taskWaitResults(parent, tasks, canfail)


      def subtask(self, method, arglist, parent, **opts):

          host = Host()
@@ -11445,6 +11450,9 @@ 


          logger.debug('scratch image results: %s' % results)

          for sub_results in results.values():

+             if 'task_id' not in sub_results:

+                 logger.warning('Task %s failed, no image available' % task_id)

+                 continue

              workdir = koji.pathinfo.task(sub_results['task_id'])

              scratchdir = koji.pathinfo.scratch()

              username = get_user(task.getOwner())['name']
@@ -11825,6 +11833,9 @@ 

          moving the image to its final location.


          for sub_results in results.values():

+             if 'task_id' not in sub_results:

+                 logger.warning('Task %s failed, no image available' % task_id)

+                 continue

              importImageInternal(task_id, build_id, sub_results)

              if sub_results.has_key('rpmresults'):

                  rpm_results = sub_results['rpmresults']

file modified
+30 -12
@@ -187,15 +187,20 @@ 

          safe_rmtree(self.workdir, unmount=False, strict=True)

          #os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir])


-     def wait(self, subtasks=None, all=False, failany=False):

+     def wait(self, subtasks=None, all=False, failany=False, canfail=None):

          """Wait on subtasks


          subtasks is a list of integers (or an integer). If more than one subtask

          is specified, then the default behavior is to return when any of those

          tasks complete. However, if all is set to True, then it waits for all of

-         them to complete.  If all and failany are both set to True, then each

-         finished task will be checked for failure, and a failure will cause all

-         of the unfinished tasks to be cancelled.

+         them to complete.


+         If all and failany are both set to True, then each finished task will

+         be checked for failure, and a failure will cause all of the unfinished

+         tasks to be cancelled.


+         If canfail is given a list of task ids, then those tasks can fail

+         without affecting the other tasks.


          special values:

              subtasks = None     specify all subtasks
@@ -206,6 +211,9 @@ 

              the database and will send the subprocess corresponding to the

              subtask a SIGUSR2 to wake it up when subtasks complete.



+         if canfail is None:

+             canfail = []

          if isinstance(subtasks, int):

              # allow single integer w/o enclosing list

              subtasks = [subtasks]
@@ -221,6 +229,9 @@ 

                      if failany:

                          failed = False

                          for task in finished:

+                             if task in canfail:

+                                 # no point in checking

+                                 continue



                              except (koji.GenericError, xmlrpclib.Fault), task_error:
@@ -243,9 +254,10 @@ 

              self.logger.debug("...waking up")

          self.logger.debug("Finished waiting")

          if all:

-             return dict(self.session.host.taskWaitResults(self.id, subtasks))

-         else:

-             return dict(self.session.host.taskWaitResults(self.id, finished))

+             finished = subtasks

+         return dict(self.session.host.taskWaitResults(self.id, finished,

+                                                     canfail=canfail))



      def getUploadDir(self):

          return koji.pathinfo.taskrelpath(self.id)
@@ -390,17 +402,23 @@ 

              os.spawnvp(os.P_NOWAIT, 'sleep', ['sleep', str(m)])


  class WaitTestTask(BaseTaskHandler):

+     """

+     Tests self.wait()


+     Starts few tasks which just sleeps. One of them will fail due to bad

+     arguments. As it is listed as 'canfail' it shouldn't affect overall

+     CLOSED status.

+     """

      Methods = ['waittest']

      _taskWeight = 0.1

      def handler(self, count, seconds=10):

          tasks = []

          for i in xrange(count):

-             task_id = self.session.host.subtask(method='sleep',

-                                                 arglist=[seconds],

-                                                 label=str(i),

-                                                 parent=self.id)

+             task_id = self.subtask(method='sleep', arglist=[seconds], label=str(i), parent=self.id)


-         results = self.wait(all=True)

+         bad_task = self.subtask('sleep', ['BAD_ARG'], label='bad')

+         tasks.append(bad_task)

+         results = self.wait(subtasks=tasks, all=True, failany=True, canfail=[bad_task])




file modified
+56 -15
@@ -1,14 +1,16 @@ 

  import random

+ from io import StringIO

+ from os import path, makedirs

+ from shutil import rmtree

+ from tempfile import gettempdir

  from unittest import TestCase

  from mock import patch, Mock, call

- from tempfile import gettempdir

- from shutil import rmtree

- from os import path, makedirs

- from io import StringIO


  import koji

- from koji.tasks import scan_mounts, umount_all, safe_rmtree, BaseTaskHandler, FakeTask, SleepTask, ForkTask

- from koji import BuildError, GenericError

+ from koji.tasks import BaseTaskHandler, FakeTask, ForkTask, SleepTask, \

+                        WaitTestTask, scan_mounts, umount_all, \

+                        safe_rmtree



  def get_fake_mounts_file():

      """ Returns contents of /prc/mounts in a file-like object
@@ -271,7 +273,7 @@ 

          obj.session.host.taskWaitResults.return_value = taskWaitResults

          self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))

          obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])

-         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234])

+         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])


      def test_BaseTaskHandler_wait_some_not_done(self):

          """ Tests that the wait function returns the one finished subtask results of
@@ -296,7 +298,7 @@ 

          obj.session.host.taskWaitResults.return_value = taskWaitResults

          self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))

          obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])

-         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234])

+         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234], canfail=[])


      @patch('signal.pause', return_value=None)

      def test_BaseTaskHandler_wait_some_not_done_all_set(self, mock_signal_pause):
@@ -336,7 +338,7 @@ 

          obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])

          obj.session.host.taskWait.assert_has_calls([call(12345678), call(12345678)])


-         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234])

+         obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])


      def test_BaseTaskHandler_wait_some_not_done_all_set_failany_set_failed_task(self):

          """ Tests that the wait function raises an exception when one of the subtask fails when the failany flag is set
@@ -348,11 +350,11 @@ 

          obj.session = Mock()

          obj.session.host.taskSetWait.return_value = None

          obj.session.host.taskWait.side_effect = [[[1551234], [1591234]], [[1551234, 1591234], []]]

-         obj.session.getTaskResult.side_effect = GenericError('Uh oh, we\'ve got a problem here!')

+         obj.session.getTaskResult.side_effect = koji.GenericError('Uh oh, we\'ve got a problem here!')


              obj.wait([1551234, 1591234], all=True, failany=True)

              raise Exception('A GeneralError was not raised.')

-         except GenericError as e:

+         except koji.GenericError as e:

              self.assertEquals(e.message, 'Uh oh, we\'ve got a problem here!')

              obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])

@@ -509,7 +511,7 @@ 


              obj.find_arch('noarch', host, None)

              raise Exception('The BuildError Exception was not raised')

-         except BuildError as e:

+         except koji.BuildError as e:

              self.assertEquals(e.message, 'No arch list for this host: test.domain.local')


      def test_BaseTaskHandler_find_arch_noarch_bad_tag(self):
@@ -524,7 +526,7 @@ 


              obj.find_arch('noarch', host, tag)

              raise Exception('The BuildError Exception was not raised')

-         except BuildError as e:

+         except koji.BuildError as e:

              self.assertEquals(e.message, 'No arch list for tag: some_package-1.2-build')


      def test_BaseTaskHandler_find_arch_noarch(self):
@@ -550,7 +552,7 @@ 


              obj.find_arch('noarch', host, tag)

              raise Exception('The BuildError Exception was not raised')

-         except BuildError as e:

+         except koji.BuildError as e:

              self.assertEquals(e.message, ('host test.domain.local (i386) does not support '

                                            'any arches of tag some_package-1.2-build (aarch64, x86_64)'))

@@ -646,7 +648,7 @@ 



              raise Exception('The BuildError Exception was not raised')

-         except BuildError as e:

+         except koji.BuildError as e:


              self.assertEquals(e.message, 'no repo (and no target) for tag rhel-7.3-build')

@@ -671,3 +673,42 @@ 

          obj = ForkTask(123, 'fork', [1, 20], None, None, (get_tmp_dir_path('ForkTask')))


          mock_spawnvp.assert_called_once_with(1, 'sleep', ['sleep', '20'])


+     @patch('signal.pause', return_value=None)

+     @patch('time.sleep')

+     def test_WaitTestTask_handler(self, mock_sleep, mock_signal_pause):

+         """ Tests that the WaitTestTask handler can be instantiated and runs appropriately based on the input

+             Specifically, that forking works and canfail behaves correctly.

+         """

+         self.mock_subtask_id = 1

+         def mock_subtask(method, arglist, id, **opts):

+             self.assertEqual(method, 'sleep')

+             task_id = self.mock_subtask_id

+             self.mock_subtask_id += 1

+             obj = SleepTask(task_id, 'sleep', arglist, None, None, (get_tmp_dir_path('SleepTask')))

+             obj.run()

+             return task_id


+         mock_taskWait = [

+             [[], [1, 2, 3, 4]],

+             [[3, 4], [1, 2]],

+             [[1, 2, 3, 4], []],

+         ]

+         def mock_getTaskResult(task_id):

+             if task_id == 4:

+                 raise koji.GenericError()



+         obj = WaitTestTask(123, 'waittest', [3], None, None, (get_tmp_dir_path('WaitTestTask')))

+         obj.session = Mock()

+         obj.session.host.subtask.side_effect = mock_subtask

+         obj.session.getTaskResult.side_effect = mock_getTaskResult

+         obj.session.host.taskWait.side_effect = mock_taskWait

+         obj.session.host.taskWaitResults.return_value = [ ['1', {}], ['2', {}], ['3', {}], ['4', {}], ]

+         obj.run()

+         #self.assertEqual(mock_sleep.call_count, 4)

+         obj.session.host.taskSetWait.assert_called_once()

+         obj.session.host.taskWait.assert_has_calls([call(123), call(123), call(123)])

+         # getTaskResult should be called in 2nd round only for task 3, as 4

+         # will be skipped as 'canfail'

+         obj.session.getTaskResult.assert_has_calls([call(3)])


7 years ago


7 years ago


7 years ago

the cli options patch needs rebased


7 years ago

If the --can-fail option is not used, this line will crash with AttributeError: 'NoneType' object has no attribute 'split'.

I have tested this in staging koji a little.

The code path that does not use this feature is working fine (example task).

When running a spin-livemedia task that should fail on i386 and succeed on x86_64, it all looked fine until the very end, when the toplevel task was marked as failed with result <type 'exceptions.KeyError'>: 'task_id'.


For the record the full command I used to start that is:

koji spin-livemedia Fedora-Scientific_KDE-Live Rawhide f25 i386,x86_64 fedora-live-scientific_kde.ks '--install-tree=http://kojipkgs.fedoraproject.org/compose/rawhide/Fedora-Rawhide-20161110.n.0/compose/Everything/$basearch/os' '--ksurl=git+https://pagure.io/fedora-kickstarts.git?#881933b138c981abe5a1145412e9c9ecc94e930f' --release=20161110.n.2 '--repo=http://kojipkgs.fedoraproject.org/compose/rawhide/Fedora-Rawhide-20161110.n.0/compose/Everything/$basearch/os' --can-fail=i386

1 new commit added

  • fix default value for --can-fail
7 years ago

Fixed first one, now looking to second problem.

1 new commit added

  • don't try to upload image if task failed
7 years ago


7 years ago

I'll add special check that if all subtasks fails (and they are listed as can-fail) the parent task will fail. It makes not much sense to successfully close it.


7 years ago

Rebased to current master and added commit for failing parent tasks if all subtasks failed.

This exact line seems to be here twice.

1 new commit added

  • typo
7 years ago


7 years ago

can we please have this rebased

It rebases fine. I have done so here, and added a needed fix:


If the above looks fine to everyone, I can just merge it


7 years ago

Commit 12c7f89 fixes this pull-request

Pull-Request has been merged by mikem@redhat.com

7 years ago