#900 enable batch multiCall in clone-tag
Merged 5 years ago by mikem. Opened 6 years ago by julian8628.
julian8628/koji batch-multicall  into  master

file modified
+17 -12
@@ -3234,6 +3234,8 @@ 

      parser.add_option("-f","--force", action="store_true",

              help=_("override tag locks if necessary"))

      parser.add_option("-n","--test", action="store_true", help=_("test mode"))

+     parser.add_option("--batch", type='int', default=1000, metavar='SIZE',

+             help=_("batch size of multicalls [0 to disable, default: %default]"))

      (options, args) = parser.parse_args(args)

  

      if len(args) != 2:
@@ -3249,6 +3251,9 @@ 

          sys.stdout.write('Source and destination tags must be different.\n')

          return

  

+     if options.batch < 0:

+         parser.error(_("batch size must be bigger than zero"))

+ 

      if options.all:

          options.config = options.groups = options.pkgs = options.builds = True

  
@@ -3300,7 +3305,7 @@ 

                                             owner=pkgs['owner_name'],block=pkgs['blocked'],

                                             extra_arches=pkgs['extra_arches'])

              if not options.test:

-                 session.multiCall()

+                 session.multiCall(batch=options.batch)

          if options.builds:

              # get --all latest builds from src tag

              builds = session.listTagged(srctag['id'], event=event.get('id'),
@@ -3317,7 +3322,7 @@ 

                  if not options.test:

                      session.tagBuildBypass(newtag['name'], build, force=options.force)

              if not options.test:

-                 session.multiCall()

+                 session.multiCall(batch=options.batch)

          if options.groups:

              # Copy the group data

              srcgroups = session.getTagGroups(srctag['name'], event=event.get('id'))
@@ -3332,7 +3337,7 @@ 

                                                      pkg['package'], block=pkg['blocked'])

                      chggrplist.append(('[new]', pkg['package'], group['name']))

              if not options.test:

-                 session.multiCall()

+                 session.multiCall(batch=options.batch)

      # case of existing dst-tag.

      if dsttag:

          # get fresh list of packages & builds into maps.
@@ -3423,7 +3428,7 @@ 

                                         block=pkg['blocked'],

                                         extra_arches=pkg['extra_arches'])

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # ADD builds.

          if not options.test:

              session.multicall = True
@@ -3436,7 +3441,7 @@ 

              if not options.test:

                  session.tagBuildBypass(dsttag['name'], build, force=options.force)

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # ADD groups.

          if not options.test:

              session.multicall = True
@@ -3448,7 +3453,7 @@ 

                      session.groupPackageListAdd(dsttag['name'], group['name'], pkg['package'], force=options.force)

                  chggrplist.append(('[new]', pkg['package'], group['name']))

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # ADD group pkgs.

          if not options.test:

              session.multicall = True
@@ -3458,7 +3463,7 @@ 

                  if not options.test:

                      session.groupPackageListAdd(dsttag['name'], group, pkg, force=options.force)

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # DEL builds.

          if not options.test:

              session.multicall = True
@@ -3473,7 +3478,7 @@ 

                  if not options.test:

                      session.untagBuildBypass(dsttag['name'], build, force=options.force)

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # DEL packages.

          ninhrtpdellist = []

          inhrtpdellist = []
@@ -3487,7 +3492,7 @@ 

          for pkg in ninhrtpdellist:

              # check if package have owned builds inside.

              session.listTagged(dsttag['name'], package=pkg['package_name'], inherit=False)

-         bump_builds = session.multiCall()

+         bump_builds = session.multiCall(batch=options.batch)

          if not options.test:

              session.multicall = True

          for pkg, [builds] in zip(ninhrtpdellist, bump_builds):
@@ -3512,7 +3517,7 @@ 

              if not options.test:

                  session.packageListBlock(dsttag['name'], pkg['package_name'])

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # DEL groups.

          if not options.test:

              session.multicall = True
@@ -3530,7 +3535,7 @@ 

                  for pkg in group['packagelist']:

                      chggrplist.append(('[blk]', pkg['package'], group['name']))

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

          # DEL group pkgs.

          if not options.test:

              session.multicall = True
@@ -3546,7 +3551,7 @@ 

                      if not options.test:

                          session.groupPackageListBlock(dsttag['name'], group, pkg)

          if not options.test:

-             session.multiCall()

+             session.multiCall(batch=options.batch)

      # print final list of actions.

      if options.verbose:

          pfmt='    %-7s %-28s %-10s %-10s %-10s\n'

file modified
+44 -14
@@ -2521,19 +2521,41 @@ 

                      time.sleep(interval)

              #not reached

  

-     def multiCall(self, strict=False):

-         """Execute a multicall (multiple function calls passed to the server

-         and executed at the same time, with results being returned in a batch).

-         Before calling this method, the self.multicall field must have

-         been set to True, and then one or more methods must have been called on

-         the current session (those method calls will return None).  On executing

-         the multicall, the self.multicall field will be reset to False

-         (so subsequent method calls will be executed immediately)

-         and results will be returned in a list.  The list will contain one element

-         for each method added to the multicall, in the order it was added to the multicall.

-         Each element of the list will be either a one-element list containing the result of the

-         method call, or a map containing "faultCode" and "faultString" keys, describing the

-         error that occurred during the method call."""

+     def multiCall(self, strict=False, batch=None):

+         """Execute a prepared multicall

+ 

+         In a multicall, a number of calls are combined into a single RPC call

+         and handled by the server in a batch. This can improve throughput.

+ 

+         The server handles a multicall as a single database transaction (though

+         see the note about the batch option below).

+ 

+         To prepare a multicall:

+           1. set the multicall attribute to True

+           2. issue one or more calls in the normal fashion

+ 

+         When multicall is True, the call parameters are stored rather than

+         passed to the server. Each call will return the special value

+         MultiCallInProgress, since the return is not yet known.

+ 

+         This method executes the prepared multicall, resets the multicall

+         attribute to False (so subsequent calls will work normally), and

+         returns the results of the calls as a list.

+ 

+         The result list will contain one element for each call added to the

+         multicall, in the order it was added. Each element will be either:

+           - a one-element list containing the result of the method call

+           - a map containing "faultCode" and "faultString" keys, describing

+             the error that occurred during the call.

+ 

+         If the strict option is set to True, then this call will raise the

+         first error it encounters, if any.

+ 

+         If the batch option is set to a number greater than zero, the calls

+         will be spread across multiple multicall batches of at most this

+         number. Note that each such batch will be a separate database

+         transaction.

+         """

          if not self.multicall:

              raise GenericError('ClientSession.multicall must be set to True before calling multiCall()')

          self.multicall = False
@@ -2542,7 +2564,15 @@ 

  

          calls = self._calls

          self._calls = []

-         ret = self._callMethod('multiCall', (calls,), {})

+         if batch is not None and batch > 0:

+             ret = []

+             callgrp = (calls[i:i + batch] for i in range(0, len(calls), batch))

+             self.logger.debug("MultiCall with batch size %i, calls/groups(%i/%i)",

+                               batch, len(calls), round(len(calls) / batch))

+             for c in callgrp:

+                 ret.extend(self._callMethod('multiCall', (c,), {}))

+         else:

+             ret = self._callMethod('multiCall', (calls,), {})

          if strict:

              #check for faults and raise first one

              for entry in ret:

@@ -2,8 +2,10 @@ 

  import mock

  import unittest

  import six

+ import logging

  

  import koji

+ from koji.xmlrpcplus import Fault

  

  

  class TestClientSession(unittest.TestCase):
@@ -167,3 +169,59 @@ 

              kwargs = call[0][2]

              self.assertTrue('volume' in kwargs)

              self.assertEqual(kwargs['volume'], 'foobar')

+ 

+ 

+ class TestMultiCall(unittest.TestCase):

+ 

+     def setUp(self):

+         self.ksession = koji.ClientSession('http://koji.example.com/kojihub')

+         # mocks

+         self.ksession._sendCall = mock.MagicMock()

+ 

+     def tearDown(self):

+         del self.ksession

+ 

+     def test_multiCall_disable(self):

+         with self.assertRaises(koji.GenericError) as cm:

+             self.ksession.multiCall()

+         self.assertEqual(cm.exception.args[0],

+                          "ClientSession.multicall must be set to True"

+                          " before calling multiCall()")

+ 

+     def test_multiCall_empty(self):

+         self.ksession.multicall = True

+         ret = self.ksession.multiCall()

+         self.assertEqual([], ret)

+         self.ksession._sendCall.assert_not_called()

+ 

+     def test_multiCall_strict(self):

+         self.ksession._sendCall.return_value = [[], {'faultCode': 1000,

+                                                'faultString': 'msg'}]

+         self.ksession.multicall = True

+         self.ksession.methodA('a', 'b', c='c')

+         self.ksession.methodB(1, 2, 3)

+         with self.assertRaises(koji.GenericError):

+             self.ksession.multiCall(strict=True)

+ 

+     def test_multiCall_not_strict(self):

+         self.ksession._sendCall.return_value = [[], {'faultCode': 1000,

+                                                      'faultString': 'msg'}]

+         self.ksession.multicall = True

+         self.ksession.methodA('a', 'b', c='c')

+         self.ksession.methodB(1, 2, 3)

+         ret = self.ksession.multiCall()

+         self.assertFalse(self.ksession.multicall)

+         self.assertEqual([[], {'faultCode': 1000, 'faultString': 'msg'}], ret)

+ 

+     def test_multiCall_batch(self):

+         self.ksession._sendCall.side_effect = [[['a', 'b', 'c']],

+                                                [{'faultCode': 1000,

+                                                  'faultString': 'msg'}]]

+         self.ksession.multicall = True

+         self.ksession.methodA('a', 'b', c='c')

+         self.ksession.methodB(1, 2, 3)

+         ret = self.ksession.multiCall(batch=1)

+         self.assertFalse(self.ksession.multicall)

+         self.assertEqual(2, self.ksession._sendCall.call_count)

+         self.assertEqual([['a', 'b', 'c'],

+                           {'faultCode': 1000, 'faultString': 'msg'}], ret)

fixes #885

  • add batch option in ClientSession.multiCall()
  • enable this batch option in clone-tag command

Didn't make batch affecting globally yet.
I guess we could move batch into koji.conf and ClientSession options next like
https://pagure.io/fork/julian8628/koji/commits/global-batch

I don't like global approach. There is many cases, where it makes sense to uses batches, on the other hand a lot of other usecases should be done in one transaction or not at all. Enabling batch for all multicall calls seems to me too dangerous. So, I would rather merge this request and use batch explicitly, where we are sure that it is safe to.

I don't like global approach. There is many cases, where it makes sense to uses batches, on the other hand a lot of other usecases should be done in one transaction or not at all. Enabling batch for all multicall calls seems to me too dangerous. So, I would rather merge this request and use batch explicitly, where we are sure that it is safe to.

Yes, agree.

Maybe batch=None? It seems to me like it is more evident that batch mode will not be used, but maybe it is only personal taste.

As we are already doing this for large number of calls, generator could save us some memory. (switch from [ to () It should work even on py2.5

rebased onto 51a2d8f9b80b777929b58bb86d592bfb83607518

5 years ago

I would add more explicit warning as users needn't to notice danger of separate transactions.

rebased onto 58f312023ab913d01b162849314a7610198f5b5c

5 years ago

1 new commit added

  • add warning for batch usage in clone-tag
5 years ago

@tkopecek
I added one warning msg in clone-tag and updated docstr for multiCall()

I don't think we need the warnings here. They're going to scare users needlessly. This command has never been a single transaction (different aspects are changed in different multicalls, or in individual calls), so there has always been a risk that an interrupted run would leave things in a broken state or that other users might see see partial work.

That multiCall() docstring has gotten unwieldy. I've rewritten it here:

https://github.com/mikem23/koji-playground/commits/pagure/pr/900

Do we want to default to batch=1000? How common is #885?

3 new commits added

  • give user option to use avoid using batches
  • clean up docstring for multiCall()
  • drop unecessary warning
5 years ago

That multiCall() docstring has gotten unwieldy. I've rewritten it here:
https://github.com/mikem23/koji-playground/commits/pagure/pr/900

+1, and rebased those commits

Do we want to default to batch=1000? How common is #885?

Do you mean to using batch=1000 for multiCall globally?
It looks we have some other calls which might be too long as well, like davidlt tense, and CGImport, but they are not related to multiCall
in CLI, multiCall is used in

  • anon_handle_list_channels
  • anon_handle_list_hosts
  • handle_set_pkg_arches
  • handle_set_pkg_owner
  • handle_unblock_pkg
  • handle_edit_host
  • handle_add_pkg
  • handle_block_pkg
  • handle_remove_pkg
  • handle_disable_host
  • handle_enable_host
  • handle_prune_signed_copies

But, they are barely able to reach the size limit even 1000.
and it's also invoked in koji-gc, koji-shadow, kojira, kojiweb.
it looks the call in kojira could be very big by chance.

Do you mean to using batch=1000 for multiCall globally?

I mean in this particular command. Currently the --batch option has a default value of 1000.

It's not clear to me whether we want that default to stay as is (work around #885 by default at the cost of extra transactions), or leave batching off by default (the original behavior). That's why I'm wondering how frequent #885 is.

It looks we have some other calls which might be too long as well, like davidlt tense, and CGImport, but they are not related to multiCall

Clients can avoid this in CGImport by uploading the metadata as a file rather than passing it in as an arg.

I'm ok with this command defaulting to batch=1000, I just want to be sure we're thinking about the reasons and risks.

If we're all happy with the current changes, I should be able to merge them on Monday.

1000 can cover the requirement of RCM without adding --batch param on this command, and won't break other common using.
I would +1 on default 1000 here.

rebased onto 0ed860d

5 years ago

Commit 967704b fixes this pull-request

Pull-Request has been merged by mikem

5 years ago