#1714 use BulkInsertProcessor for hub mass inserts
Merged 4 years ago by mikem. Opened 4 years ago by tkopecek.
tkopecek/koji issue1712  into  master

file modified
+124 -26
@@ -6434,16 +6434,16 @@ 

          rpmlist = fileinfo['hub.rpmlist']

          archives = fileinfo['hub.archives']

  

-         insert = InsertProcessor('archive_rpm_components')

-         insert.set(archive_id=archive_id)

-         for rpminfo in rpmlist:

-             insert.set(rpm_id=rpminfo['id'])

+         if rpmlist:

+             insert = BulkInsertProcessor('archive_rpm_components')

+             for rpminfo in rpmlist:

+                 insert.set(archive_id=archive_id, rpm_id=rpminfo['id'])

              insert.execute()

  

-         insert = InsertProcessor('archive_components')

-         insert.set(archive_id=archive_id)

-         for archiveinfo in archives:

-             insert.set(component_id=archiveinfo['id'])

+         if archives:

+             insert = BulkInsertProcessor('archive_components')

+             for archiveinfo in archives:

+                 insert.set(archive_id=archive_id, component_id=archiveinfo['id'])

              insert.execute()

  

  
@@ -8305,6 +8305,98 @@ 

      return row

  

  

class BulkInsertProcessor(object):
    """Build and execute multi-row INSERT statements for a single table.

    Records are queued via the ``data`` constructor argument or with
    add_record(), and written by execute() as one INSERT statement per
    ``batch`` records. Record values are passed as named query parameters;
    the table and column names are interpolated into the SQL text as-is.
    """

    def __init__(self, table, data=None, columns=None, strict=True, batch=1000):
        """Do bulk inserts - it has some limitations compared to
        InsertProcessor (no rawset, dup_check).

        There is no set(); whole records are added with add_record() to
        avoid confusion with InsertProcessor.set().

        table   - name of the table
        data    - list of dict per record; the list is copied, so later
                  add_record() calls do not modify the caller's list
        columns - list/set of names of used columns - makes sense
                  mainly with strict=True
        strict  - if True, all records must contain values for all columns.
                  if False, missing values will be inserted as NULLs
        batch   - batch size for inserts (one statement per batch); a false
                  value (0 or None) inserts everything in a single statement
        """
        self.table = table
        self.columns = set() if columns is None else set(columns)
        # copy, so add_record() never appends to a list owned by the caller
        self.data = [] if data is None else list(data)
        for row in self.data:
            self.columns |= set(row.keys())
        self.strict = strict
        self.batch = batch

    def __str__(self):
        """Render all queued records as one statement (batching ignored)."""
        if not self.data:
            return "-- incomplete insert: no data"
        query, _params = self._get_insert(self.data)
        return query

    def __repr__(self):
        return "<BulkInsertProcessor: %r>" % vars(self)

    def _get_insert(self, data):
        """
        Generate one insert statement for the given data

        :param list data: list of rows (dict format) to insert
        :returns: (query, params)
        :raises ValueError: if data is empty
        :raises koji.GenericError: if strict and a row lacks a column value
        """
        if not data:
            # should not happen
            raise ValueError('no data for insert')
        columns = sorted(self.columns)
        # Placeholders are normally "<column><row index>" (foo0, foo1, ...).
        # That is ambiguous when one column name equals another plus digits:
        # column "a" at row 11 and column "a1" at row 1 would both be "a11",
        # silently overwriting one value. Only in that case, re-render with
        # a "_" separator; the row index never contains "_", so
        # "<column>_<row>" names are always unique.
        rendered = self._render_values(data, columns, '')
        if rendered is None:
            rendered = self._render_values(data, columns, '_')
        values, params = rendered
        query = 'INSERT INTO %s (%s) VALUES %s' % (
            self.table, ', '.join(columns), values)
        return query, params

    def _render_values(self, data, columns, sep):
        """Render the VALUES tuples and parameter dict for data.

        Returns (values_sql, params), or None if two placeholders would get
        the same name when using separator sep.
        """
        params = {}
        tuples = []
        for idx, row in enumerate(data):
            row_values = []
            for key in columns:
                if key in row:
                    row_key = '%s%s%d' % (key, sep, idx)
                    if row_key in params:
                        # placeholder name collision -> caller must retry
                        return None
                    row_values.append("%%(%s)s" % row_key)
                    params[row_key] = row[key]
                elif self.strict:
                    raise koji.GenericError("Missing value %s in BulkInsert" % key)
                else:
                    row_values.append("NULL")
            tuples.append("(%s)" % ', '.join(row_values))
        return ', '.join(tuples), params

    def add_record(self, **kwargs):
        """Set whole record via keyword args"""
        if not kwargs:
            raise koji.GenericError("Missing values in BulkInsert.add_record")
        self.data.append(kwargs)
        self.columns |= set(kwargs.keys())

    def execute(self):
        """Insert all queued records; does nothing when none are queued."""
        if not self.data:
            # consistent no-op regardless of batch (avoids the ValueError
            # _get_insert() raises for empty input)
            return
        if not self.batch:
            self._one_insert(self.data)
        else:
            for start in range(0, len(self.data), self.batch):
                self._one_insert(self.data[start:start + self.batch])

    def _one_insert(self, data):
        """Render one statement for data and run it through _dml()."""
        query, params = self._get_insert(data)
        _dml(query, params)
+ 

+ 

  class InsertProcessor(object):

      """Build an insert statement

  
@@ -9392,16 +9484,16 @@ 

          rpm_ids.append(data['id'])

  

      # associate those RPMs with the image

-     insert = InsertProcessor('archive_rpm_components')

+     insert = BulkInsertProcessor('archive_rpm_components')

      for archive in archives:

          logger.info('working on archive %s', archive)

          if archive['filename'].endswith('xml'):

              continue

-         insert.set(archive_id = archive['id'])

          logger.info('associating installed rpms with %s', archive['id'])

          for rpm_id in rpm_ids:

-             insert.set(rpm_id = rpm_id)

-             insert.execute()

+             insert.add_record(archive_id=archive['id'], rpm_id=rpm_id)

+     if insert.data:

+         insert.execute()

  

      koji.plugin.run_callbacks('postImport', type='image', image=imgdata,

                                build=build_info, fullpath=fullpath)
@@ -12373,6 +12465,7 @@ 

      def _setList(self, rpmlist, update=False):

          """Set or update the list of rpms in a buildroot"""

  

+         update = bool(update)

          if self.id is None:

              raise koji.GenericError("buildroot not specified")

          if update:
@@ -12393,11 +12486,11 @@ 

          #we sort to try to avoid deadlock issues

          rpm_ids.sort()

  

-         # actually do the inserts

-         insert = InsertProcessor('buildroot_listing')

-         insert.set(buildroot_id=self.id, is_update=bool(update))

-         for rpm_id in rpm_ids:

-             insert.set(rpm_id=rpm_id)

+         # actually do the inserts (in bulk)

+         if rpm_ids:

+             insert = BulkInsertProcessor(table='buildroot_listing')

+             for rpm_id in rpm_ids:

+                 insert.add_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update)

              insert.execute()

  

      def setList(self, rpmlist):
@@ -12442,6 +12535,7 @@ 

          If False, they dependencies required to setup the build environment.

          """

  

+         project = bool(project)

          if self.is_standard:

              if not (context.opts.get('EnableMaven') or context.opts.get('EnableWin')):

                  raise koji.GenericError("non-rpm support is not enabled")
@@ -12450,21 +12544,25 @@ 

          archives = set([r['id'] for r in archives])

          current = set([r['id'] for r in self.getArchiveList()])

          new_archives = archives.difference(current)

-         insert = InsertProcessor('buildroot_archives')

-         insert.set(buildroot_id=self.id, project_dep=bool(project))

-         for archive_id in sorted(new_archives):

-             insert.set(archive_id=archive_id)

+ 

+         if new_archives:

+             insert = BulkInsertProcessor('buildroot_archives')

+             for archive_id in sorted(new_archives):

+                 insert.add_record(buildroot_id=self.id,

+                                   project_dep=project,

+                                   archive_id=archive_id)

              insert.execute()

  

      def setTools(self, tools):

          """Set tools info for buildroot"""

  

-         insert = InsertProcessor('buildroot_tools_info')

-         insert.set(buildroot_id=self.id)

+         if not tools:

+             return

+ 

+         insert = BulkInsertProcessor('buildroot_tools_info')

          for tool in tools:

-             insert.set(tool=tool['name'])

-             insert.set(version=tool['version'])

-             insert.execute()

+             insert.add_record(buildroot_id=self.id, tool=tool['name'], version=tool['version'])

+         insert.execute()

  

  

  class Host(object):

@@ -121,76 +121,25 @@ 

              {}

          ],

          [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1002,

-                 "rpm_id": 1000

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1002,

-                 "rpm_id": 1001

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1002,

-                 "rpm_id": 1002

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1003,

-                 "rpm_id": 1000

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1003,

-                 "rpm_id": 1001

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1003,

-                 "rpm_id": 1002

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1005,

-                 "rpm_id": 1000

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1005,

-                 "rpm_id": 1001

-             },

-             {}

-         ],

-         [

-             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",

-             {

-                 "archive_id": 1005,

-                 "rpm_id": 1002

-             },

-             {}

+             "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id0)s, %(rpm_id0)s), (%(archive_id1)s, %(rpm_id1)s), (%(archive_id2)s, %(rpm_id2)s), (%(archive_id3)s, %(rpm_id3)s), (%(archive_id4)s, %(rpm_id4)s), (%(archive_id5)s, %(rpm_id5)s), (%(archive_id6)s, %(rpm_id6)s), (%(archive_id7)s, %(rpm_id7)s), (%(archive_id8)s, %(rpm_id8)s)",

+               {"archive_id0": 1002,

+                "archive_id1": 1002,

+                "archive_id2": 1002,

+                "archive_id3": 1003,

+                "archive_id4": 1003,

+                "archive_id5": 1003,

+                "archive_id6": 1005,

+                "archive_id7": 1005,

+                "archive_id8": 1005,

+                "rpm_id0": 1000,

+                "rpm_id1": 1001,

+                "rpm_id2": 1002,

+                "rpm_id3": 1000,

+                "rpm_id4": 1001,

+                "rpm_id5": 1002,

+                "rpm_id6": 1000,

+                "rpm_id7": 1001,

+                "rpm_id8": 1002}

          ]

      ],

      "updates": [

@@ -28,6 +28,17 @@ 

      return grab_insert

  

  

def make_bulk_insert_grabber(test):
    """Return a stand-in for BulkInsertProcessor._one_insert().

    Instead of running SQL, the returned function renders the statement for
    the batch it receives and appends ``[query, params]`` to
    ``test.inserts``. The params dict is shallow-copied.

    :param test: the test case instance whose ``inserts`` list collects
                 the rendered statements
    """
    def record_insert(processor, batch):
        # processor takes the place of ``self`` on the patched method
        sql, bound = processor._get_insert(batch)
        test.inserts.append([sql, copy.copy(bound)])
    return record_insert

+ 

+ 

  def make_update_grabber(test):

      # test is the test class instance

      def grab_update(update):
@@ -68,6 +79,8 @@ 

          self.updates = []

          mock.patch.object(kojihub.InsertProcessor, 'execute',

                      new=make_insert_grabber(self)).start()

+         mock.patch.object(kojihub.BulkInsertProcessor, '_one_insert',

+                     new=make_bulk_insert_grabber(self)).start()

          mock.patch.object(kojihub.UpdateProcessor, 'execute',

                      new=make_update_grabber(self)).start()

          mock.patch('kojihub.nextval', new=self.my_nextval).start()

@@ -110,6 +110,6 @@ 

          expression, kwargs = cursor.execute.mock_calls[0][1]

          expression = " ".join(expression.split())

          expected = 'INSERT INTO archive_rpm_components (archive_id, rpm_id) ' + \

-             'VALUES (%(archive_id)s, %(rpm_id)s)'

+             'VALUES (%(archive_id0)s, %(rpm_id0)s)'

          self.assertEquals(expression, expected)

-         self.assertEquals(kwargs, {'archive_id': 9, 'rpm_id': 6})

+         self.assertEquals(kwargs, {'archive_id0': 9, 'rpm_id0': 6})

@@ -5,6 +5,7 @@ 

  except ImportError:

      import unittest

  

+ import koji

  import kojihub

  

  
@@ -91,3 +92,117 @@ 

          actual = str(proc)

          expected = "INSERT INTO sometable (foo) VALUES (('bar'))"  # raw data

          self.assertEquals(actual, expected)

+ 

+ 

class TestBulkInsertProcessor(unittest.TestCase):
    """Tests for kojihub.BulkInsertProcessor rendering and execution.

    Execution tests patch kojihub.context and inspect the cursor returned by
    context.cnx.cursor() to see the statements that were run.
    """

    def test_basic_instantiation(self):
        proc = kojihub.BulkInsertProcessor('sometable')
        actual = str(proc)
        expected = '-- incomplete insert: no data'
        self.assertEqual(actual, expected)

    def test_to_string_with_single_row(self):
        proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
        actual = str(proc)
        expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)'
        self.assertEqual(actual, expected)

        # the same record queued via add_record() must render identically
        proc = kojihub.BulkInsertProcessor('sometable')
        proc.add_record(foo='bar')
        actual = str(proc)
        self.assertEqual(actual, expected)

    @mock.patch('kojihub.context')
    def test_simple_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor
        proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
        proc.execute()
        cursor.execute.assert_called_once_with(
            'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
            {'foo0': 'bar'},
        )

        cursor.reset_mock()
        proc = kojihub.BulkInsertProcessor('sometable')
        proc.add_record(foo='bar')
        proc.execute()
        cursor.execute.assert_called_once_with(
            'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
            {'foo0': 'bar'},
        )

    @mock.patch('kojihub.context')
    def test_bulk_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor

        proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}])
        proc.add_record(foo='bar2')
        proc.add_record(foo='bar3')
        proc.execute()
        cursor.execute.assert_called_once_with(
            'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
            {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'},
        )

    def test_missing_values(self):
        proc = kojihub.BulkInsertProcessor('sometable')
        proc.add_record(foo='bar')
        proc.add_record(foo2='bar2')
        with self.assertRaises(koji.GenericError) as cm:
            str(proc)
        self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert')

    def test_missing_values_nostrict(self):
        proc = kojihub.BulkInsertProcessor('sometable', strict=False)
        proc.add_record(foo='bar')
        proc.add_record(foo2='bar2')
        actual = str(proc)
        expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)'
        self.assertEqual(actual, expected)

    def test_missing_values_explicit_columns(self):
        proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2'])
        proc.add_record(foo='bar')
        with self.assertRaises(koji.GenericError) as cm:
            str(proc)
        self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert')

    @mock.patch('kojihub.context')
    def test_batch_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor

        proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2)
        proc.add_record(foo='bar2')
        proc.add_record(foo='bar3')
        proc.execute()
        calls = cursor.execute.mock_calls
        # list of (name, positional args, keyword args)
        self.assertEqual(len(calls), 2)
        self.assertEqual(
            calls[0][1],
            ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)',
             {'foo0': 'bar1', 'foo1': 'bar2'}))
        # placeholder numbering restarts at 0 for each batch
        self.assertEqual(
            calls[1][1],
            ('INSERT INTO sometable (foo) VALUES (%(foo0)s)',
             {'foo0': 'bar3'}))

    @mock.patch('kojihub.context')
    def test_no_batch_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor

        proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=None)
        proc.add_record(foo='bar2')
        proc.add_record(foo='bar3')
        proc.execute()
        calls = cursor.execute.mock_calls
        # list of (name, positional args, keyword args)
        self.assertEqual(len(calls), 1)
        self.assertEqual(
            calls[0][1],
            ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
             {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}))

rebased onto b1ac83a

4 years ago

is this a debug print?

Yes, fixed.

The part in import_components() is still calling set() for the bulk operation.

I don't think set_record is quite the right name for this. Seems like add_record (or even just add) would be more accurate.

1 new commit added

  • rename set_record to add_record
4 years ago

On Friday we were talking about possibly chunking this for large numbers of inserts, so I took a stab at it here. If we still think this is worthwhile, we can pull it into this PR, or we can split it off into a separate one.

https://github.com/mikem23/koji-playground/commits/pagure/pr/1714

1 new commit added

  • support batch operation in BulkInsertProcessor
4 years ago

Yep, I pulled it into this PR.

Oops, there's a typo in my patch: it should be self._one_insert() in both places in execute().

@@ -8386,7 +8386,7 @@ class BulkInsertProcessor(object):

     def execute(self):
         if not self.batch:
-            self.__one_insert(self.data)
+            self._one_insert(self.data)
         else:
             for i in range(0, len(self.data), self.batch):
                 data = self.data[i:i+self.batch]

1 new commit added

  • fix typo and extend unit test to cover where it occurred
4 years ago

Commit aaaabcc fixes this pull-request

Pull-Request has been merged by mikem

4 years ago

Metadata Update from @jcupova:
- Pull-request tagged with: no_qe

4 years ago