From b1ac83aaf445b177fb0aa05e3cfa203c551051c5 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Oct 31 2019 10:33:27 +0000 Subject: [PATCH 1/4] use BulkInsertProcessor for hub mass inserts Fixes: https://pagure.io/koji/issue/1712 --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 7e16b87..5856a46 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -6434,16 +6434,16 @@ class CG_Importer(object): rpmlist = fileinfo['hub.rpmlist'] archives = fileinfo['hub.archives'] - insert = InsertProcessor('archive_rpm_components') - insert.set(archive_id=archive_id) - for rpminfo in rpmlist: - insert.set(rpm_id=rpminfo['id']) + if rpmlist: + insert = BulkInsertProcessor('archive_rpm_components') + for rpminfo in rpmlist: + insert.set(archive_id=archive_id, rpm_id=rpminfo['id']) insert.execute() - insert = InsertProcessor('archive_components') - insert.set(archive_id=archive_id) - for archiveinfo in archives: - insert.set(component_id=archiveinfo['id']) + if archives: + insert = BulkInsertProcessor('archive_components') + for archiveinfo in archives: + insert.set(archive_id=archive_id, component_id=archiveinfo['id']) insert.execute() @@ -8305,6 +8305,74 @@ def _fix_extra_field(row): return row +class BulkInsertProcessor(object): + def __init__(self, table, data=None, columns=None, strict=True): + """Do bulk inserts - it has some limitations compared to + InsertProcessor (no rawset, dup_check). + + set() is replaced with set_record() to avoid confusion + + table - name of the table + data - list of dict per record + columns - list/set of names of used columns - makes sense + mainly with strict=True + strict - all records must contain values for all columns, if + it is False, missing values will be inserted as NULLs + """ + + self.table = table + self.data = [] + self.prepared_data = {} + if columns is None: + self.columns = set() + else: + self.columns = set(columns) + if data is not None: + self.data = data + for row in data: + self.columns |= set(row.keys()) + self.strict = strict + + def __str__(self): + if not self.data: + return "-- incomplete update: no assigns" + parts = ['INSERT INTO %s ' % self.table] + columns = sorted(self.columns) + parts.append("(%s) " % ', '.join(columns)) + + self.prepared_data = {} + values = [] + i = 0 + for row in self.data: + row_values = [] + for key in columns: + if key in row: + row_key = '%s%d' % (key, i) + row_values.append("%%(%s)s" % row_key) + self.prepared_data[row_key] = row[key] + elif self.strict: + raise koji.GenericError("Missing value %s in BulkInsert" % key) + else: + row_values.append("NULL") + values.append("(%s)" % ', '.join(row_values)) + i += 1 + parts.append("VALUES %s" % ', '.join(values)) + return ''.join(parts) + + def __repr__(self): + return "" % vars(self) + + def set_record(self, **kwargs): + """Set whole record via keyword args""" + if not kwargs: + raise koji.GenericError("Missing values in BulkInsert.set_record") + self.data.append(kwargs) + self.columns |= set(kwargs.keys()) + + def execute(self): + return _dml(str(self), self.prepared_data) + + class InsertProcessor(object): """Build an insert statement @@ -9392,16 +9460,16 @@ def importImageInternal(task_id, build_id, imgdata): rpm_ids.append(data['id']) # associate those RPMs with the image - insert = InsertProcessor('archive_rpm_components') + insert = BulkInsertProcessor('archive_rpm_components') for archive in archives: logger.info('working on archive %s', archive) if archive['filename'].endswith('xml'): continue - insert.set(archive_id = archive['id']) logger.info('associating installed rpms with %s', archive['id']) for rpm_id in rpm_ids: - insert.set(rpm_id = rpm_id) - insert.execute() + insert.set_record(archive_id=archive['id'], rpm_id=rpm_id) + if insert.data: + insert.execute() koji.plugin.run_callbacks('postImport', type='image', image=imgdata, build=build_info, fullpath=fullpath) @@ -12373,6 +12441,7 @@ class BuildRoot(object): def _setList(self, rpmlist, update=False): """Set or update the list of rpms in a buildroot""" + update = bool(update) if self.id is None: raise koji.GenericError("buildroot not specified") if update: @@ -12393,11 +12462,11 @@ class BuildRoot(object): #we sort to try to avoid deadlock issues rpm_ids.sort() - # actually do the inserts - insert = InsertProcessor('buildroot_listing') - insert.set(buildroot_id=self.id, is_update=bool(update)) - for rpm_id in rpm_ids: - insert.set(rpm_id=rpm_id) + # actually do the inserts (in bulk) + if rpm_ids: + insert = BulkInsertProcessor(table='buildroot_listing') + for rpm_id in rpm_ids: + insert.set_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update) insert.execute() def setList(self, rpmlist): @@ -12442,6 +12511,7 @@ class BuildRoot(object): If False, they dependencies required to setup the build environment. """ + project = bool(project) if self.is_standard: if not (context.opts.get('EnableMaven') or context.opts.get('EnableWin')): raise koji.GenericError("non-rpm support is not enabled") @@ -12450,21 +12520,25 @@ class BuildRoot(object): archives = set([r['id'] for r in archives]) current = set([r['id'] for r in self.getArchiveList()]) new_archives = archives.difference(current) - insert = InsertProcessor('buildroot_archives') - insert.set(buildroot_id=self.id, project_dep=bool(project)) - for archive_id in sorted(new_archives): - insert.set(archive_id=archive_id) + + if new_archives: + insert = BulkInsertProcessor('buildroot_archives') + for archive_id in sorted(new_archives): + insert.set_record(buildroot_id=self.id, + project_dep=project, + archive_id=archive_id) insert.execute() def setTools(self, tools): """Set tools info for buildroot""" - insert = InsertProcessor('buildroot_tools_info') - insert.set(buildroot_id=self.id) + if not tools: + return + + insert = BulkInsertProcessor('buildroot_tools_info') for tool in tools: - insert.set(tool=tool['name']) - insert.set(version=tool['version']) - insert.execute() + insert.set_record(buildroot_id=self.id, tool=tool['name'], version=tool['version']) + insert.execute() class Host(object): diff --git a/tests/test_hub/data/image/import_1/db.json b/tests/test_hub/data/image/import_1/db.json index ca9f44e..1cf84da 100644 --- a/tests/test_hub/data/image/import_1/db.json +++ b/tests/test_hub/data/image/import_1/db.json @@ -121,76 +121,25 @@ {} ], [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1002, - "rpm_id": 1002 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1003, - "rpm_id": 1002 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1000 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1001 - }, - {} - ], - [ - "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)", - { - "archive_id": 1005, - "rpm_id": 1002 - }, - {} + "INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id0)s, %(rpm_id0)s), (%(archive_id1)s, %(rpm_id1)s), (%(archive_id2)s, %(rpm_id2)s), (%(archive_id3)s, %(rpm_id3)s), (%(archive_id4)s, %(rpm_id4)s), (%(archive_id5)s, %(rpm_id5)s), (%(archive_id6)s, %(rpm_id6)s), (%(archive_id7)s, %(rpm_id7)s), (%(archive_id8)s, %(rpm_id8)s)", + {"archive_id0": 1002, + "archive_id1": 1002, + "archive_id2": 1002, + "archive_id3": 1003, + "archive_id4": 1003, + "archive_id5": 1003, + "archive_id6": 1005, + "archive_id7": 1005, + "archive_id8": 1005, + "rpm_id0": 1000, + "rpm_id1": 1001, + "rpm_id2": 1002, + "rpm_id3": 1000, + "rpm_id4": 1001, + "rpm_id5": 1002, + "rpm_id6": 1000, + "rpm_id7": 1001, + "rpm_id8": 1002} ] ], "updates": [ diff --git a/tests/test_hub/test_complete_image_build.py b/tests/test_hub/test_complete_image_build.py index aa3015b..cf9f515 100644 --- a/tests/test_hub/test_complete_image_build.py +++ b/tests/test_hub/test_complete_image_build.py @@ -28,6 +28,16 @@ def make_insert_grabber(test): return grab_insert +def make_bulk_insert_grabber(test): + # test is the test class instance + def grab_insert(insert): + # insert is self for the InsertProcessor instance + # we are replacing execute() + info = [str(insert), copy.copy(insert.prepared_data)] + test.inserts.append(info) + return grab_insert + + def make_update_grabber(test): # test is the test class instance def grab_update(update): @@ -68,6 +78,8 @@ class TestCompleteImageBuild(unittest.TestCase): self.updates = [] mock.patch.object(kojihub.InsertProcessor, 'execute', new=make_insert_grabber(self)).start() + mock.patch.object(kojihub.BulkInsertProcessor, 'execute', + new=make_bulk_insert_grabber(self)).start() mock.patch.object(kojihub.UpdateProcessor, 'execute', new=make_update_grabber(self)).start() mock.patch('kojihub.nextval', new=self.my_nextval).start() diff --git a/tests/test_hub/test_import_image_internal.py b/tests/test_hub/test_import_image_internal.py index 5de7f9f..c96555c 100644 --- a/tests/test_hub/test_import_image_internal.py +++ b/tests/test_hub/test_import_image_internal.py @@ -110,6 +110,6 @@ class TestImportImageInternal(unittest.TestCase): expression, kwargs = cursor.execute.mock_calls[0][1] expression = " ".join(expression.split()) expected = 'INSERT INTO archive_rpm_components (archive_id, rpm_id) ' + \ - 'VALUES (%(archive_id)s, %(rpm_id)s)' + 'VALUES (%(archive_id0)s, %(rpm_id0)s)' self.assertEquals(expression, expected) - self.assertEquals(kwargs, {'archive_id': 9, 'rpm_id': 6}) + self.assertEquals(kwargs, {'archive_id0': 9, 'rpm_id0': 6}) diff --git a/tests/test_hub/test_insert_processor.py b/tests/test_hub/test_insert_processor.py index bd0a5ee..8ef3171 100644 --- a/tests/test_hub/test_insert_processor.py +++ b/tests/test_hub/test_insert_processor.py @@ -5,6 +5,7 @@ try: except ImportError: import unittest +import koji import kojihub @@ -91,3 +92,80 @@ class TestInsertProcessor(unittest.TestCase): actual = str(proc) expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data self.assertEquals(actual, expected) + + +class TestBulkInsertProcessor(unittest.TestCase): + def test_basic_instantiation(self): + proc = kojihub.BulkInsertProcessor('sometable') + actual = str(proc) + expected = '-- incomplete update: no assigns' + self.assertEquals(actual, expected) + + def test_to_string_with_single_row(self): + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + actual = str(proc) + expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)' + self.assertEquals(actual, expected) + + proc = kojihub.BulkInsertProcessor('sometable') + proc.set_record(foo='bar') + actual = str(proc) + self.assertEquals(actual, expected) + + @mock.patch('kojihub.context') + def test_simple_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + ) + + cursor.reset_mock() + proc = kojihub.BulkInsertProcessor('sometable') + proc.set_record(foo='bar') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + ) + + @mock.patch('kojihub.context') + def test_bulk_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}]) + proc.set_record(foo='bar2') + proc.set_record(foo='bar3') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, + ) + + def test_missing_values(self): + proc = kojihub.BulkInsertProcessor('sometable') + proc.set_record(foo='bar') + proc.set_record(foo2='bar2') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + + def test_missing_values_nostrict(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=False) + proc.set_record(foo='bar') + proc.set_record(foo2='bar2') + actual = str(proc) + expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)' + self.assertEquals(actual, expected) + + def test_missing_values_explicit_columns(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2']) + proc.set_record(foo='bar') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + From e46f19f9fd6985db1fb464fc48e9d96f3c0da400 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Nov 08 2019 19:27:46 +0000 Subject: [PATCH 2/4] rename set_record to add_record --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 5856a46..3380a08 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -8310,7 +8310,7 @@ class BulkInsertProcessor(object): """Do bulk inserts - it has some limitations compared to InsertProcessor (no rawset, dup_check). - set() is replaced with set_record() to avoid confusion + set() is replaced with add_record() to avoid confusion table - name of the table data - list of dict per record @@ -8362,10 +8362,10 @@ class BulkInsertProcessor(object): def __repr__(self): return "" % vars(self) - def set_record(self, **kwargs): + def add_record(self, **kwargs): """Set whole record via keyword args""" if not kwargs: - raise koji.GenericError("Missing values in BulkInsert.set_record") + raise koji.GenericError("Missing values in BulkInsert.add_record") self.data.append(kwargs) self.columns |= set(kwargs.keys()) @@ -9467,7 +9467,7 @@ def importImageInternal(task_id, build_id, imgdata): continue logger.info('associating installed rpms with %s', archive['id']) for rpm_id in rpm_ids: - insert.set_record(archive_id=archive['id'], rpm_id=rpm_id) + insert.add_record(archive_id=archive['id'], rpm_id=rpm_id) if insert.data: insert.execute() @@ -12466,7 +12466,7 @@ class BuildRoot(object): if rpm_ids: insert = BulkInsertProcessor(table='buildroot_listing') for rpm_id in rpm_ids: - insert.set_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update) + insert.add_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update) insert.execute() def setList(self, rpmlist): @@ -12524,7 +12524,7 @@ class BuildRoot(object): if new_archives: insert = BulkInsertProcessor('buildroot_archives') for archive_id in sorted(new_archives): - insert.set_record(buildroot_id=self.id, + insert.add_record(buildroot_id=self.id, project_dep=project, archive_id=archive_id) insert.execute() @@ -12537,7 +12537,7 @@ class BuildRoot(object): insert = BulkInsertProcessor('buildroot_tools_info') for tool in tools: - insert.set_record(buildroot_id=self.id, tool=tool['name'], version=tool['version']) + insert.add_record(buildroot_id=self.id, tool=tool['name'], version=tool['version']) insert.execute() diff --git a/tests/test_hub/test_insert_processor.py b/tests/test_hub/test_insert_processor.py index 8ef3171..eb3b8d2 100644 --- a/tests/test_hub/test_insert_processor.py +++ b/tests/test_hub/test_insert_processor.py @@ -108,7 +108,7 @@ class TestBulkInsertProcessor(unittest.TestCase): self.assertEquals(actual, expected) proc = kojihub.BulkInsertProcessor('sometable') - proc.set_record(foo='bar') + proc.add_record(foo='bar') actual = str(proc) self.assertEquals(actual, expected) @@ -125,7 +125,7 @@ class TestBulkInsertProcessor(unittest.TestCase): cursor.reset_mock() proc = kojihub.BulkInsertProcessor('sometable') - proc.set_record(foo='bar') + proc.add_record(foo='bar') proc.execute() cursor.execute.assert_called_once_with( 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', @@ -138,8 +138,8 @@ class TestBulkInsertProcessor(unittest.TestCase): context.cnx.cursor.return_value = cursor proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}]) - proc.set_record(foo='bar2') - proc.set_record(foo='bar3') + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') proc.execute() cursor.execute.assert_called_once_with( 'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', @@ -148,23 +148,23 @@ class TestBulkInsertProcessor(unittest.TestCase): def test_missing_values(self): proc = kojihub.BulkInsertProcessor('sometable') - proc.set_record(foo='bar') - proc.set_record(foo2='bar2') + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') with self.assertRaises(koji.GenericError) as cm: str(proc) self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') def test_missing_values_nostrict(self): proc = kojihub.BulkInsertProcessor('sometable', strict=False) - proc.set_record(foo='bar') - proc.set_record(foo2='bar2') + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') actual = str(proc) expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)' self.assertEquals(actual, expected) def test_missing_values_explicit_columns(self): proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2']) - proc.set_record(foo='bar') + proc.add_record(foo='bar') with self.assertRaises(koji.GenericError) as cm: str(proc) self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') From 2677e9c598293010e8563d133b8d0c98c152bde1 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Nov 13 2019 10:14:58 +0000 Subject: [PATCH 3/4] support batch operation in BulkInsertProcessor --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 3380a08..9def1c3 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -8306,7 +8306,7 @@ def _fix_extra_field(row): class BulkInsertProcessor(object): - def __init__(self, table, data=None, columns=None, strict=True): + def __init__(self, table, data=None, columns=None, strict=True, batch=1000): """Do bulk inserts - it has some limitations compared to InsertProcessor (no rawset, dup_check). @@ -8316,13 +8316,13 @@ class BulkInsertProcessor(object): data - list of dict per record columns - list/set of names of used columns - makes sense mainly with strict=True - strict - all records must contain values for all columns, if - it is False, missing values will be inserted as NULLs + strict - if True, all records must contain values for all columns. + if False, missing values will be inserted as NULLs + batch - batch size for inserts (one statement per batch) """ self.table = table self.data = [] - self.prepared_data = {} if columns is None: self.columns = set() else: @@ -8332,24 +8332,39 @@ class BulkInsertProcessor(object): for row in data: self.columns |= set(row.keys()) self.strict = strict + self.batch = batch def __str__(self): if not self.data: - return "-- incomplete update: no assigns" + return "-- incomplete insert: no data" + query, params = self._get_insert(self.data) + return query + + def _get_insert(self, data): + """ + Generate one insert statement for the given data + + :param list data: list of rows (dict format) to insert + :returns: (query, params) + """ + + if not data: + # should not happen + raise ValueError('no data for insert') parts = ['INSERT INTO %s ' % self.table] columns = sorted(self.columns) parts.append("(%s) " % ', '.join(columns)) - self.prepared_data = {} + prepared_data = {} values = [] i = 0 - for row in self.data: + for row in data: row_values = [] for key in columns: if key in row: row_key = '%s%d' % (key, i) row_values.append("%%(%s)s" % row_key) - self.prepared_data[row_key] = row[key] + prepared_data[row_key] = row[key] elif self.strict: raise koji.GenericError("Missing value %s in BulkInsert" % key) else: @@ -8357,7 +8372,7 @@ class BulkInsertProcessor(object): values.append("(%s)" % ', '.join(row_values)) i += 1 parts.append("VALUES %s" % ', '.join(values)) - return ''.join(parts) + return ''.join(parts), prepared_data def __repr__(self): return "" % vars(self) @@ -8370,7 +8385,16 @@ class BulkInsertProcessor(object): self.columns |= set(kwargs.keys()) def execute(self): - return _dml(str(self), self.prepared_data) + if not self.batch: + self.__one_insert(self.data) + else: + for i in range(0, len(self.data), self.batch): + data = self.data[i:i+self.batch] + self._one_insert(data) + + def _one_insert(self, data): + query, params = self._get_insert(data) + _dml(query, params) class InsertProcessor(object): diff --git a/tests/test_hub/test_complete_image_build.py b/tests/test_hub/test_complete_image_build.py index cf9f515..0b0d354 100644 --- a/tests/test_hub/test_complete_image_build.py +++ b/tests/test_hub/test_complete_image_build.py @@ -30,10 +30,11 @@ def make_insert_grabber(test): def make_bulk_insert_grabber(test): # test is the test class instance - def grab_insert(insert): - # insert is self for the InsertProcessor instance - # we are replacing execute() - info = [str(insert), copy.copy(insert.prepared_data)] + def grab_insert(insert, data): + # insert is self for the BulkInsertProcessor instance + # we are replacing _one_insert() + query, params = insert._get_insert(data) + info = [query, copy.copy(params)] test.inserts.append(info) return grab_insert @@ -78,7 +79,7 @@ class TestCompleteImageBuild(unittest.TestCase): self.updates = [] mock.patch.object(kojihub.InsertProcessor, 'execute', new=make_insert_grabber(self)).start() - mock.patch.object(kojihub.BulkInsertProcessor, 'execute', + mock.patch.object(kojihub.BulkInsertProcessor, '_one_insert', new=make_bulk_insert_grabber(self)).start() mock.patch.object(kojihub.UpdateProcessor, 'execute', new=make_update_grabber(self)).start() diff --git a/tests/test_hub/test_insert_processor.py b/tests/test_hub/test_insert_processor.py index eb3b8d2..17601ed 100644 --- a/tests/test_hub/test_insert_processor.py +++ b/tests/test_hub/test_insert_processor.py @@ -98,7 +98,7 @@ class TestBulkInsertProcessor(unittest.TestCase): def test_basic_instantiation(self): proc = kojihub.BulkInsertProcessor('sometable') actual = str(proc) - expected = '-- incomplete update: no assigns' + expected = '-- incomplete insert: no data' self.assertEquals(actual, expected) def test_to_string_with_single_row(self): @@ -169,3 +169,23 @@ class TestBulkInsertProcessor(unittest.TestCase): str(proc) self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + @mock.patch('kojihub.context') + def test_batch_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEquals(len(calls), 2) + self.assertEquals( + calls[0][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)', + {'foo0': 'bar1', 'foo1': 'bar2'})) + self.assertEquals( + calls[1][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar3'})) From 8308ac762026b2479470e2a5c036c886e3d1d854 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Nov 14 2019 08:43:50 +0000 Subject: [PATCH 4/4] fix typo and extend unit test to cover where it occurred --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 9def1c3..74c90da 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -8386,7 +8386,7 @@ class BulkInsertProcessor(object): def execute(self): if not self.batch: - self.__one_insert(self.data) + self._one_insert(self.data) else: for i in range(0, len(self.data), self.batch): data = self.data[i:i+self.batch] diff --git a/tests/test_hub/test_insert_processor.py b/tests/test_hub/test_insert_processor.py index 17601ed..1feaccd 100644 --- a/tests/test_hub/test_insert_processor.py +++ b/tests/test_hub/test_insert_processor.py @@ -189,3 +189,20 @@ class TestBulkInsertProcessor(unittest.TestCase): calls[1][1], ('INSERT INTO sometable (foo) VALUES (%(foo0)s)', {'foo0': 'bar3'})) + + @mock.patch('kojihub.context') + def test_no_batch_execution(self, context): + cursor = mock.MagicMock() + context.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=None) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEquals(len(calls), 1) + self.assertEquals( + calls[0][1], + ('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}))