From f0cad50a48a99fbe84d024bb9814fcc97678a8b6 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Aug 02 2021 07:49:52 +0000 Subject: PR#2965: Add remove-sig CLI and deleteRPMSig hub call Merges #2965 https://pagure.io/koji/pull-request/2965 Fixes: #2665 https://pagure.io/koji/issue/2665 API for deleting signature --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index 7375f26..70af3a5 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -1640,6 +1640,37 @@ def handle_import_sig(goptions, session, args): session.writeSignedRPM(rinfo['id'], sigkey) +def handle_remove_sig(goptions, session, args): + "[admin] Remove signed RPMs from db and disk" + usage = _("usage: %prog remove-sig [options] ") + parser = OptionParser(usage=get_usage_str(usage)) + parser.add_option("--sigkey", action="store", default=None, help=_("Specify signature key")) + parser.add_option("--all", action="store_true", default=False, + help=_("Remove all signed copies for specified RPM")) + (options, args) = parser.parse_args(args) + if len(args) < 1: + parser.error(_("Please specify an RPM")) + + if not options.all and not options.sigkey: + error("Either --sigkey or --all options must be given") + + if options.all and options.sigkey: + error("Conflicting options specified") + + activate_session(session, goptions) + rpminfo = args[0] + + try: + session.deleteRPMSig(rpminfo, sigkey=options.sigkey, all_sigs=options.all) + except koji.GenericError as e: + msg = str(e) + if msg.startswith("No such rpm"): + # make this a little more readable than the hub error + error("No such rpm in system: %s" % rpminfo) + else: + error("Signature removal failed: %s" % msg) + + def handle_write_signed_rpm(goptions, session, args): "[admin] Write signed RPMs to disk" usage = _("usage: %prog write-signed-rpm [options] [ ...]") diff --git a/hub/kojihub.py b/hub/kojihub.py index 3b181e8..ca8977a 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -7585,6 +7585,75 @@ def add_rpm_sig(an_rpm, sighdr): sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo) +def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False): + """Delete rpm signature + + :param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch' + string N-V-R.A + int ID + :param str sigkey: Signature key. + :param bool all_sigs: Delete all signed copies for specified RPM. + """ + if all_sigs: + sigkey = None + elif not sigkey: + raise koji.GenericError("No signature specified") + rinfo = get_rpm(rpminfo, strict=True) + if rinfo['external_repo_id']: + raise koji.GenericError("Not an internal rpm: %s (from %s)" + % (rpminfo, rinfo['external_repo_name'])) + + rpm_query_result = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=sigkey) + if not rpm_query_result: + nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo + raise koji.GenericError("%s has no matching signatures to delete" % nvra) + + clauses = ["rpm_id=%(rpm_id)i"] + if sigkey is not None: + clauses.append("sigkey=%(sigkey)s") + clauses_str = " AND ".join(clauses) + delete = """DELETE FROM rpmsigs WHERE %s""" % clauses_str + rpm_id = rinfo['id'] + _dml(delete, locals()) + binfo = get_build(rinfo['build_id']) + builddir = koji.pathinfo.build(binfo) + list_sigcaches = [] + list_sighdrs = [] + for rpmsig in rpm_query_result: + list_sigcaches.append(joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey']))) + list_sighdrs.append(joinpath(builddir, koji.pathinfo.signed(rinfo, rpmsig['sigkey']))) + list_paths = list_sighdrs + list_sigcaches + count = 0 + for file_path in list_paths: + try: + os.remove(file_path) + count += 1 + except FileNotFoundError: + logger.info("File: %s has been deleted", file_path) + except Exception: + logger.error("An error happens when deleting %s, %s deleting are deleted, " + "%s deleting are skipped, the original request is %s rpm " + "and %s sigkey", file_path, + list_paths[:count], list_paths[count:], rpminfo, sigkey, exc_info=True) + raise koji.GenericError("File %s cannot be deleted." % file_path) + + for path in list_paths: + basedir = os.path.dirname(path) + if os.path.isdir(basedir) and not os.listdir(basedir): + try: + os.rmdir(basedir) + except OSError: + logger.warning("An error happens when deleting %s directory", + basedir, exc_info=True) + sigdir = os.path.dirname(basedir) + if os.path.isdir(sigdir) and not os.listdir(sigdir): + try: + os.rmdir(sigdir) + except OSError: + logger.warning("An error happens when deleting %s directory", + sigdir, exc_info=True) + + def _scan_sighdr(sighdr, fn): """Splices sighdr with other headers from fn and queries (no payload)""" # This is hackish, but it works @@ -11941,6 +12010,18 @@ class RootExports(object): context.session.assertPerm('sign') return add_rpm_sig(an_rpm, base64.b64decode(data)) + def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False): + """Delete rpm signature + + :param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch' + string N-V-R.A + int ID + :param str sigkey: Signature key. + :param bool all_sigs: Delete all signed copies for specified RPM. + """ + context.session.assertPerm('admin') + return delete_rpm_sig(rpminfo, sigkey=sigkey, all_sigs=all_sigs) + findBuildID = staticmethod(find_build_id) getTagID = staticmethod(get_tag_id) getTag = staticmethod(get_tag) diff --git a/tests/test_cli/data/list-commands-admin.txt b/tests/test_cli/data/list-commands-admin.txt index 407d93e..58f9b83 100644 --- a/tests/test_cli/data/list-commands-admin.txt +++ b/tests/test_cli/data/list-commands-admin.txt @@ -50,6 +50,7 @@ admin commands: remove-group Remove group from tag remove-host-from-channel Remove a host from a channel remove-pkg Remove a package from the listing for tag + remove-sig Remove signed RPMs from db and disk remove-tag Remove a tag remove-tag-inheritance Remove a tag inheritance link remove-target Remove a build target diff --git a/tests/test_cli/data/list-commands.txt b/tests/test_cli/data/list-commands.txt index e89ad14..9e0e72c 100644 --- a/tests/test_cli/data/list-commands.txt +++ b/tests/test_cli/data/list-commands.txt @@ -50,6 +50,7 @@ admin commands: remove-group Remove group from tag remove-host-from-channel Remove a host from a channel remove-pkg Remove a package from the listing for tag + remove-sig Remove signed RPMs from db and disk remove-tag Remove a tag remove-tag-inheritance Remove a tag inheritance link remove-target Remove a build target diff --git a/tests/test_cli/test_remove_sig.py b/tests/test_cli/test_remove_sig.py new file mode 100644 index 0000000..ac506fa --- /dev/null +++ b/tests/test_cli/test_remove_sig.py @@ -0,0 +1,62 @@ +from __future__ import absolute_import + +import mock +from six.moves import StringIO + +import koji +from koji_cli.commands import handle_remove_sig +from . import utils + + +class TestRemoveSig(utils.CliTestCase): + maxDiff = None + + def setUp(self): + self.options = mock.MagicMock() + self.options.debug = False + self.session = mock.MagicMock() + self.session.getAPIVersion.return_value = koji.API_VERSION + + def test_delete_sig_help(self): + self.assert_help( + handle_remove_sig, + """Usage: %s remove-sig [options] +(Specify the --help global option for a list of other help options) + +Options: + -h, --help show this help message and exit + --sigkey=SIGKEY Specify signature key + --all Remove all signed copies for specified RPM +""" % self.progname) + + @mock.patch('sys.stderr', new_callable=StringIO) + def test_delete_sig_without_option(self, stderr): + expected = "Usage: %s remove-sig [options] \n" \ + "(Specify the --help global option for a list of other help options)\n\n" \ + "%s: error: Please specify an RPM\n" % (self.progname, self.progname) + with self.assertRaises(SystemExit) as ex: + handle_remove_sig(self.options, self.session, []) + self.assertExitCode(ex, 2) + self.assert_console_message(stderr, expected) + + @mock.patch('sys.stdout', new_callable=StringIO) + def test_delete_sig_non_exist_rpm(self, stdout): + rpm = '1234' + expected = "No such rpm in system: %s\n" % rpm + self.session.deleteRPMSig.side_effect = koji.GenericError('No such rpm: DATA') + + self.assert_system_exit( + handle_remove_sig, + self.options, + self.session, + [rpm, '--all'], + stderr=self.format_error_message(expected), + exit_code=1, + activate_session=None) + self.session.deleteRPMSig.assert_called_with('1234', sigkey=None, all_sigs=True) + + def test_delete_sig_valid(self): + rpm = '1' + self.session.deleteRPMSig.return_value = None + handle_remove_sig(self.options, self.session, [rpm, '--sigkey', 'testkey']) + self.session.deleteRPMSig.assert_called_with('1', sigkey='testkey', all_sigs=False) diff --git a/tests/test_hub/test_delete_rpm_sig.py b/tests/test_hub/test_delete_rpm_sig.py new file mode 100644 index 0000000..6234f95 --- /dev/null +++ b/tests/test_hub/test_delete_rpm_sig.py @@ -0,0 +1,161 @@ +import unittest + +import mock + +import koji +import kojihub + +QP = kojihub.QueryProcessor + + +class TestDeleteRPMSig(unittest.TestCase): + + def getQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = mock.MagicMock() + self.queries.append(query) + return query + + def setUp(self): + self.QueryProcessor = mock.patch('kojihub.QueryProcessor', + side_effect=self.getQuery).start() + self.queries = [] + self.get_rpm = mock.patch('kojihub.get_rpm').start() + self.query_rpm_sigs = mock.patch('kojihub.query_rpm_sigs').start() + self.get_build = mock.patch('kojihub.get_build').start() + self.buildinfo = {'build_id': 1, + 'epoch': None, + 'extra': None, + 'id': 1, + 'name': 'fs_mark', + 'nvr': 'fs_mark-3.3-20.el8', + 'owner_id': 1, + 'owner_name': 'kojiadmin', + 'package_id': 1, + 'package_name': 'fs_mark', + 'release': '20.el8', + 'state': 1, + 'task_id': None, + 'version': '3.3'} + self.rinfo = {'arch': 'x86_64', + 'build_id': 1, + 'buildroot_id': None, + 'buildtime': 1564782768, + 'epoch': None, + 'external_repo_id': None, + 'extra': None, + 'id': 2, + 'metadata_only': False, + 'name': 'fs_mark', + 'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a', + 'release': '20.el8', + 'size': 25644, + 'version': '3.3'} + self.queryrpmsigs = [{'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614', + 'sigkey': ''}, + {'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6', + 'sigkey': '2f86d6a1'}] + + def tearDown(self): + mock.patch.stopall() + + @mock.patch('kojihub._dml') + def test_rpm_not_existing(self, dml): + rpm_id = 1234 + expected_msg = 'No such rpm: %s' % rpm_id + self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id) + with self.assertRaises(koji.GenericError) as ex: + kojihub.delete_rpm_sig(rpm_id, all_sigs=True) + self.assertEqual(len(self.queries), 0) + self.assertEqual(ex.exception.args[0], expected_msg) + self.get_rpm.assert_called_once_with(rpm_id, strict=True) + self.query_rpm_sigs.assert_not_called() + dml.assert_not_called() + + @mock.patch('kojihub._dml') + def test_not_all_sig_and_not_sigkey(self, dml): + expected_msg = 'No signature specified' + with self.assertRaises(koji.GenericError) as ex: + kojihub.delete_rpm_sig(1234) + self.assertEqual(len(self.queries), 0) + self.assertEqual(ex.exception.args[0], expected_msg) + self.get_rpm.assert_not_called() + self.query_rpm_sigs.assert_not_called() + dml.assert_not_called() + + @mock.patch('kojihub._dml') + def test_external_repo(self, dml): + rpminfo = 1234 + rinfo = {'external_repo_id': 1, 'external_repo_name': 'INTERNAL'} + self.get_rpm.return_value = rinfo + with self.assertRaises(koji.GenericError) as ex: + kojihub.delete_rpm_sig(rpminfo, all_sigs=True) + self.assertEqual(len(self.queries), 0) + expected_msg = "Not an internal rpm: %s (from %s)" % (rpminfo, rinfo['external_repo_name']) + self.assertEqual(ex.exception.args[0], expected_msg) + self.get_rpm.assert_called_once_with(rpminfo, strict=True) + self.query_rpm_sigs.assert_not_called() + dml.assert_not_called() + + @mock.patch('kojihub._dml') + def test_empty_query_sign(self, dml): + rpminfo = 1234 + nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'], + self.rinfo['arch']) + expected_msg = "%s has no matching signatures to delete" % nvra + self.get_rpm.return_value = self.rinfo + self.query_rpm_sigs.return_value = [] + with self.assertRaises(koji.GenericError) as ex: + kojihub.delete_rpm_sig(rpminfo, all_sigs=True) + self.assertEqual(len(self.queries), 0) + self.assertEqual(ex.exception.args[0], expected_msg) + self.get_rpm.assert_called_once_with(rpminfo, strict=True) + self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None) + dml.assert_not_called() + + @mock.patch('kojihub._dml') + @mock.patch('koji.pathinfo.build', return_value='fakebuildpath') + @mock.patch('os.remove') + def test_file_not_found_error(self, os_remove, pb, dml): + rpminfo = 2 + os_remove.side_effect = FileNotFoundError() + self.get_rpm.return_value = self.rinfo + self.get_build.return_value = self.buildinfo + self.query_rpm_sigs.return_value = self.queryrpmsigs + r = kojihub.delete_rpm_sig(rpminfo, all_sigs=True) + self.assertEqual(r, None) + self.assertEqual(len(self.queries), 0) + self.get_rpm.assert_called_once_with(rpminfo, strict=True) + self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None) + self.get_build.assert_called_once_with(self.rinfo['build_id']) + + @mock.patch('kojihub._dml') + @mock.patch('koji.pathinfo.build', return_value='fakebuildpath') + @mock.patch('os.remove', side_effect=OSError) + def test_not_valid(self, os_remove, pb, dml): + rpminfo = 2 + filepath = 'fakebuildpath/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm' + self.get_rpm.return_value = self.rinfo + self.get_build.return_value = self.buildinfo + self.query_rpm_sigs.return_value = self.queryrpmsigs + expected_msg = "File %s cannot be deleted." % filepath + with self.assertRaises(koji.GenericError) as ex: + kojihub.delete_rpm_sig(rpminfo, all_sigs=True) + self.assertEqual(ex.exception.args[0], expected_msg) + self.assertEqual(len(self.queries), 0) + self.get_rpm.assert_called_once_with(rpminfo, strict=True) + self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None) + self.get_build.assert_called_once_with(self.rinfo['build_id']) + + @mock.patch('kojihub._dml') + @mock.patch('koji.pathinfo.build', return_value='fakebuildpath') + @mock.patch('os.remove') + def test_valid(self, os_remove, pb, dml): + rpminfo = 2 + self.get_rpm.return_value = self.rinfo + self.get_build.return_value = self.buildinfo + self.query_rpm_sigs.return_value = self.queryrpmsigs + kojihub.delete_rpm_sig(rpminfo, all_sigs=True) + self.get_rpm.assert_called_once_with(rpminfo, strict=True) + self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None) + self.get_build.assert_called_once_with(self.rinfo['build_id'])