From aaaabcc99c679f1d945f86e14ca8a5bcb9927a99 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Nov 19 2019 17:01:19 +0000 Subject: PR#1714: use BulkInsertProcessor for hub mass inserts Merges #1714 https://pagure.io/koji/pull-request/1714 Fixes: #1712 https://pagure.io/koji/issue/1712 [RFE] Use bulk inserts in hub --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 84f5452..aae60c5 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -6446,16 +6446,16 @@ class CG_Importer(object): rpmlist = fileinfo['hub.rpmlist'] archives = fileinfo['hub.archives'] - insert = InsertProcessor('archive_rpm_components') - insert.set(archive_id=archive_id) - for rpminfo in rpmlist: - insert.set(rpm_id=rpminfo['id']) + if rpmlist: + insert = BulkInsertProcessor('archive_rpm_components') + for rpminfo in rpmlist: + insert.set(archive_id=archive_id, rpm_id=rpminfo['id']) insert.execute() - insert = InsertProcessor('archive_components') - insert.set(archive_id=archive_id) - for archiveinfo in archives: - insert.set(component_id=archiveinfo['id']) + if archives: + insert = BulkInsertProcessor('archive_components') + for archiveinfo in archives: + insert.set(archive_id=archive_id, component_id=archiveinfo['id']) insert.execute() @@ -8348,6 +8348,98 @@ def _fix_extra_field(row): return row +class BulkInsertProcessor(object): + def __init__(self, table, data=None, columns=None, strict=True, batch=1000): + """Do bulk inserts - it has some limitations compared to + InsertProcessor (no rawset, dup_check). + + set() is replaced with add_record() to avoid confusion + + table - name of the table + data - list of dict per record + columns - list/set of names of used columns - makes sense + mainly with strict=True + strict - if True, all records must contain values for all columns. + if False, missing values will be inserted as NULLs + batch - batch size for inserts (one statement per batch) + """ + + self.table = table + self.data = [] + if columns is None: + self.columns = set() + else: + self.columns = set(columns) + if data is not None: + self.data = data + for row in data: + self.columns |= set(row.keys()) + self.strict = strict + self.batch = batch + + def __str__(self): + if not self.data: + return "-- incomplete insert: no data" + query, params = self._get_insert(self.data) + return query + + def _get_insert(self, data): + """ + Generate one insert statement for the given data + + :param list data: list of rows (dict format) to insert + :returns: (query, params) + """ + + if not data: + # should not happen + raise ValueError('no data for insert') + parts = ['INSERT INTO %s ' % self.table] + columns = sorted(self.columns) + parts.append("(%s) " % ', '.join(columns)) + + prepared_data = {} + values = [] + i = 0 + for row in data: + row_values = [] + for key in columns: + if key in row: + row_key = '%s%d' % (key, i) + row_values.append("%%(%s)s" % row_key) + prepared_data[row_key] = row[key] + elif self.strict: + raise koji.GenericError("Missing value %s in BulkInsert" % key) + else: + row_values.append("NULL") + values.append("(%s)" % ', '.join(row_values)) + i += 1 + parts.append("VALUES %s" % ', '.join(values)) + return ''.join(parts), prepared_data + + def __repr__(self): + return "" % vars(self) + + def add_record(self, **kwargs): + """Set whole record via keyword args""" + if not kwargs: + raise koji.GenericError("Missing values in BulkInsert.add_record") + self.data.append(kwargs) + self.columns |= set(kwargs.keys()) + + def execute(self): + if not self.batch: + self._one_insert(self.data) + else: + for i in range(0, len(self.data), self.batch): + data = self.data[i:i+self.batch] + self._one_insert(data) + + def _one_insert(self, data): + query, params = self._get_insert(data) + _dml(query, params) + + class InsertProcessor(object): """Build an insert statement @@ -9438,16 +9530,16 @@ def importImageInternal(task_id, build_id, imgdata): rpm_ids.append(data['id']) # associate those RPMs with the image - insert = InsertProcessor('archive_rpm_components') + insert = BulkInsertProcessor('archive_rpm_components') for archive in archives: logger.info('working on archive %s', archive) if archive['filename'].endswith('xml'): continue - insert.set(archive_id = archive['id']) logger.info('associating installed rpms with %s', archive['id']) for rpm_id in rpm_ids: - insert.set(rpm_id = rpm_id) - insert.execute() + insert.add_record(archive_id=archive['id'], rpm_id=rpm_id) + if insert.data: + insert.execute() koji.plugin.run_callbacks('postImport', type='image', image=imgdata, build=build_info, fullpath=fullpath) @@ -12421,6 +12513,7 @@ class BuildRoot(object): def _setList(self, rpmlist, update=False): """Set or update the list of rpms in a buildroot""" + update = bool(update) if self.id is None: raise koji.GenericError("buildroot not specified") if update: @@ -12441,11 +12534,11 @@ class BuildRoot(object): #we sort to try to avoid deadlock issues rpm_ids.sort() - # actually do the inserts - insert = InsertProcessor('buildroot_listing') - insert.set(buildroot_id=self.id, is_update=bool(update)) - for rpm_id in rpm_ids: - insert.set(rpm_id=rpm_id) + # actually do the inserts (in bulk) + if rpm_ids: + insert = BulkInsertProcessor(table='buildroot_listing') + for rpm_id in rpm_ids: + insert.add_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update) insert.execute() def setList(self, rpmlist): @@ -12490,6 +12583,7 @@ class BuildRoot(object): If False, they dependencies required to setup the build environment. """ + project = bool(project) if self.is_standard: if not (context.opts.get('EnableMaven') or context.opts.get('EnableWin')): raise koji.GenericError("non-rpm support is not enabled") @@ -12498,21 +12592,25 @@ class BuildRoot(object): archives = set([r['id'] for r in archives]) current = set([r['id'] for r in self.getArchiveList()]) new_archives = archives.difference(current) - insert = InsertProcessor('buildroot_archives') - insert.set(buildroot_id=self.id, project_dep=bool(project)) - for archive_id in sorted(new_archives): - insert.set(archive_id=archive_id) + + if new_archives: + insert = BulkInsertProcessor('buildroot_archives') + for archive_id in sorted(new_archives): + insert.add_record(buildroot_id=self.id, + project_dep=project, + archive_id=archive_id) insert.execute() def setTools(self, tools): """Set tools info for buildroot""" - insert = InsertProcessor('buildroot_tools_info') - insert.set(buildroot_id=self.id) + if not tools: + return + + insert = BulkInsertProcessor('buildroot_tools_info') for tool in tools: - insert.set(tool=tool['name']) - insert.set(version=tool['version']) - insert.execute() + insert.add_record(buildroot_id=self.id, tool=tool['name'], version=tool['version']) + insert.execute() class Host(object): diff --git a/tests/test_hub/data/image/import_1/db.json b/tests/test_hub/data/image/import_1/db.json index ca9f44e..1cf84da 100644 --- a/tests/test_hub/data/image/import_1/db.json +++ b/tests/test_hub/data/image/import_1/db.json @@ -121,76 +121,25 @@ {} ], [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1002 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1002 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1002 - }, - {} + "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id0)s, %(rpm_id0)s), (%(archive_id1)s, %(rpm_id1)s), (%(archive_id2)s, %(rpm_id2)s), (%(archive_id3)s, %(rpm_id3)s), (%(archive_id4)s, %(rpm_id4)s), (%(archive_id5)s, %(rpm_id5)s), (%(archive_id6)s, %(rpm_id6)s), (%(archive_id7)s, %(rpm_id7)s), (%(archive_id8)s, %(rpm_id8)s)", + {"archive_id0": 1002, + "archive_id1": 1002, + "archive_id2": 1002, + "archive_id3": 1003, + "archive_id4": 1003, + "archive_id5": 1003, + "archive_id6": 1005, + "archive_id7": 1005, + "archive_id8": 1005, + "rpm_id0": 1000, + "rpm_id1": 1001, + "rpm_id2": 1002, + "rpm_id3": 1000, + "rpm_id4": 1001, + "rpm_id5": 1002, + "rpm_id6": 1000, + "rpm_id7": 1001, + "rpm_id8": 1002} ] ], "updates": [ diff --git a/tests/test_hub/test_complete_image_build.py b/tests/test_hub/test_complete_image_build.py index aa3015b..0b0d354 100644 --- a/tests/test_hub/test_complete_image_build.py +++ b/tests/test_hub/test_complete_image_build.py @@ -28,6 +28,17 @@ def make_insert_grabber(test): return grab_insert +def make_bulk_insert_grabber(test): + # test is the test class instance + def grab_insert(insert, data): + # insert is self for the BulkInsertProcessor instance + # we are replacing _one_insert() + query, params = insert._get_insert(data) + info = [query, copy.copy(params)] + test.inserts.append(info) + return grab_insert + + def make_update_grabber(test): # test is the test class instance def grab_update(update): @@ -68,6 +79,8 @@ class TestCompleteImageBuild(unittest.TestCase): self.updates = [] mock.patch.object(kojihub.InsertProcessor, 'execute', new=make_insert_grabber(self)).start() + mock.patch.object(kojihub.BulkInsertProcessor, '_one_insert', + new=make_bulk_insert_grabber(self)).start() mock.patch.object(kojihub.UpdateProcessor, 'execute', new=make_update_grabber(self)).start() mock.patch('kojihub.nextval', new=self.my_nextval).start() diff --git a/tests/test_hub/test_import_image_internal.py b/tests/test_hub/test_import_image_internal.py index 5de7f9f..c96555c 100644 --- a/tests/test_hub/test_import_image_internal.py +++ b/tests/test_hub/test_import_image_internal.py @@ -110,6 +110,6 @@ class TestImportImageInternal(unittest.TestCase): expression, kwargs = cursor.execute.mock_calls[0][1] expression = " ".join(expression.split()) expected = 'INSERT INTO archive_rpm_components (archive_id, rpm_id) ' + \ - 'VALUES (%(archive_id)s, %(rpm_id)s)' + 'VALUES (%(archive_id0)s, %(rpm_id0)s)' self.assertEquals(expression, expected) - self.assertEquals(kwargs, {'archive_id': 9, 'rpm_id': 6}) + self.assertEquals(kwargs, {'archive_id0': 9, 'rpm_id0': 6}) diff --git a/tests/test_hub/test_insert_processor.py b/tests/test_hub/test_insert_processor.py index bd0a5ee..1feaccd 100644 --- a/tests/test_hub/test_insert_processor.py +++ b/tests/test_hub/test_insert_processor.py @@ -5,6 +5,7 @@ try: except ImportError: import unittest +import koji import kojihub @@ -91,3 +92,117 @@ class TestInsertProcessor(unittest.TestCase): actual = str(proc) expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data self.assertEquals(actual, expected) + + +class TestBulkInsertProcessor(unittest.TestCase): + def test_basic_instantiation(self): + proc = kojihub.BulkInsertProcessor('sometable') + actual = str(proc) + expected = '-- incomplete insert: no data' + self.assertEquals(actual, expected) + + def test_to_string_with_single_row(self): + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + actual = str(proc) + expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)' + self.assertEquals(actual, expected) + + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + actual = str(proc) + self.assertEquals(actual, expected) + + @mock.patch('kojihub.context') + def test_simple_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + ) + + cursor.reset_mock() + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + ) + + @mock.patch('kojihub.context') + def test_bulk_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}]) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, + ) + + def test_missing_values(self): + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + + def test_missing_values_nostrict(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=False) + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') + actual = str(proc) + expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)' + self.assertEquals(actual, expected) + + def test_missing_values_explicit_columns(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2']) + proc.add_record(foo='bar') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + + @mock.patch('kojihub.context') + def test_batch_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEquals(len(calls), 2) + self.assertEquals( + calls[0][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)', + {'foo0': 'bar1', 'foo1': 'bar2'})) + self.assertEquals( + calls[1][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar3'})) + + @mock.patch('kojihub.context') + def test_no_batch_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=None) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEquals(len(calls), 1) + self.assertEquals( + calls[0][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}))