From fa7c20767fabe148b0155f528a0a52b7bbb1192f Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Sep 30 2019 13:48:08 +0000 Subject: PR#1173: hub: [groupListRemove] raise Error when no group for tag Merges #1173 https://pagure.io/koji/pull-request/1173 --- diff --git a/hub/kojihub.py b/hub/kojihub.py index 3c50e06..6a9711d 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -1724,6 +1724,16 @@ def _grplist_add(taginfo, grpinfo, block, force, **opts): def grplist_remove(taginfo, grpinfo, force=False): """Remove group from the list for tag + Permission required: admin + + :param taginfo: tag id or name which group is removed from + :type taginfo: int or str + :param grpinfo: group id or name which is removed + :type grpinfo: int or str + :param bool force: If False(default), GenericException will be raised when + no group found in the list for tag. If True, revoking + will be force to execute, no matter if the relationship + exists. Really this shouldn't be used except in special cases Most of the time you really want to use the block or unblock functions @@ -1733,13 +1743,22 @@ def grplist_remove(taginfo, grpinfo, force=False): _grplist_remove(taginfo, grpinfo, force) -def _grplist_remove(taginfo, grpinfo, force): +def _grplist_remove(taginfo, grpinfo, force=False): """grplist_remove without permission check""" tag = get_tag(taginfo, strict=True) group = lookup_group(grpinfo, strict=True) tag_id = tag['id'] grp_id = group['id'] clauses = ['group_id=%(grp_id)s', 'tag_id=%(tag_id)s'] + if not force: + query = QueryProcessor(columns=['group_id', 'tag_id', 'active'], + tables=['group_config'], + values=locals(), + clauses=clauses + [eventCondition(None)]) + old_grp_conf = query.executeOne() + if not old_grp_conf: + raise koji.GenericError("No group: %s found for tag: %s" + % (tag['name'], group['name'])) update = UpdateProcessor('group_config', values=locals(), clauses=clauses) update.make_revoke() update.execute() diff --git a/tests/test_hub/test_group_operations.py b/tests/test_hub/test_group_operations.py index e53101b..2f06d95 100644 --- a/tests/test_hub/test_group_operations.py +++ b/tests/test_hub/test_group_operations.py @@ -18,6 +18,13 @@ class TestGrouplist(unittest.TestCase): self.queries.append(query) return query + def getEmptyQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = mock.MagicMock() + query.execute.return_value = None + self.queries.append(query) + return query + def getInsert(self, *args, **kwargs): insert = IP(*args, **kwargs) insert.execute = mock.MagicMock() @@ -30,6 +37,11 @@ class TestGrouplist(unittest.TestCase): self.updates.append(update) return update + def reset_db_processors(self): + self.queries = [] + self.updates = [] + self.inserts = [] + def setUp(self): self.context = mock.patch('kojihub.context').start() self.get_tag = mock.patch('kojihub.get_tag').start() @@ -171,13 +183,42 @@ class TestGrouplist(unittest.TestCase): self.lookup_group.assert_called_once_with(group, strict=True) # db + self.assertEqual(len(self.queries), 1) self.assertEqual(len(self.inserts), 0) self.assertEqual(len(self.updates), 1) + query = self.queries[0] + self.assertEqual(' '.join(str(query).split()), + 'SELECT active, group_id, tag_id FROM group_config' + ' WHERE ((active = TRUE))' + ' AND (group_id=%(grp_id)s)' + ' AND (tag_id=%(tag_id)s)') update = self.updates[0] self.assertEqual(update.table, 'group_config') self.assertEqual(update.data, {'revoke_event': 42, 'revoker_id': 24}) self.assertEqual(update.rawdata, {'active': 'NULL'}) + # no group for tag found + self.reset_db_processors() + with mock.patch('kojihub.QueryProcessor', side_effect=self.getEmptyQuery): + with self.assertRaises(koji.GenericError) as cm: + kojihub.grplist_remove(tag, group) + + self.assertEqual(len(self.queries), 1) + self.assertEqual(len(self.inserts), 0) + self.assertEqual(len(self.updates), 0) + self.assertEqual(cm.exception.args[0], + 'No group: tag found for tag: group') + + # force = True + self.reset_db_processors() + with mock.patch('kojihub.QueryProcessor', + side_effect=self.getEmptyQuery): + kojihub.grplist_remove(tag, group, force=True) + + self.assertEqual(len(self.queries), 0) + self.assertEqual(len(self.inserts), 0) + self.assertEqual(len(self.updates), 1) + def test_grplist_unblock(self): # identical with test_grplist_add except blocked=True tag = 'tag'