#2962 Add delete-rpm-sig CLI and deleteRPMSig hub call
Closed 2 years ago by tkopecek. Opened 2 years ago by tkopecek.
tkopecek/koji issue2665  into  master

file modified
+30
@@ -1640,6 +1640,36 @@ 

              session.writeSignedRPM(rinfo['id'], sigkey)

  

  

+ def handle_remove_sig(goptions, session, args):
+     "[admin] Remove signed RPMs from db and disk"
+     # The one-line docstring above is left untouched: the same text shows up
+     # in the command listing of the CLI help fixtures, so it is presumably
+     # user-facing.
+     #
+     # Exactly one of --sigkey / --all has to be given. The RPM is looked up
+     # with getRPM first so that a missing RPM gets a friendly message; the
+     # removal itself is done by the deleteRPMSig hub call.
+     usage = _("usage: %prog remove-sig [options] <rpm-id/n-v-r.a/rpminfo>")
+     parser = OptionParser(usage=get_usage_str(usage))
+     parser.add_option("--sigkey", action="store", default=None, help=_("Specify signature key"))
+     parser.add_option("--all", action="store_true",
+                       help=_("Remove all signed copies for specified RPM"))
+     (options, args) = parser.parse_args(args)
+     if len(args) < 1:
+         parser.error(_("Please specify an RPM"))
+ 
+     if not options.all and not options.sigkey:
+         error("Either --sigkey or --all options must be given")
+ 
+     if options.all and options.sigkey:
+         error("Conflicting options specified")
+ 
+     activate_session(session, goptions)
+     rpminfo = args[0]
+ 
+     rinfo = session.getRPM(rpminfo)
+     if not rinfo:
+         print("No such rpm in system: %s" % rpminfo)
+         return
+ 
+     try:
+         if options.all:
+             # --all must be forwarded explicitly: without it the hub sees
+             # neither a sigkey nor all_sigs and rejects the call with
+             # "No signature specified".
+             session.deleteRPMSig(rpminfo, all_sigs=True)
+         else:
+             session.deleteRPMSig(rpminfo, sigkey=options.sigkey)
+     except koji.GenericError as e:
+         # The hub can refuse for several reasons (no matching signature,
+         # external rpm, a file that cannot be removed, ...), so report its
+         # own message instead of guessing that the signature is missing.
+         # NOTE(review): error() is the CLI helper already used above for
+         # fatal option problems; presumably it exits non-zero - confirm.
+         error("Signature removal failed: %s" % e)

+ 

+ 

  def handle_write_signed_rpm(goptions, session, args):

      "[admin] Write signed RPMs to disk"

      usage = _("usage: %prog write-signed-rpm [options] <signature-key> <n-v-r> [<n-v-r> ...]")

file modified
+81
@@ -7585,6 +7585,75 @@ 

                                sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)

  

  

+ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
+     """Delete rpm signature
+ 
+     Deletes the matching rows from the rpmsigs table, then removes the files
+     located via koji.pathinfo.signed() and koji.pathinfo.sighdr() under the
+     build directory, and finally removes the directories that were left empty.
+ 
+     :param dict/str/id rpminfo: map containing 'name', 'version', 'release', and 'arch'
+                                 string N-V-R.A
+                                 int ID
+     :param str sigkey: Signature key. Ignored when all_sigs is set.
+     :param bool all_sigs: Delete all signed copies for specified RPM.
+     :raises koji.GenericError: if neither sigkey nor all_sigs is given, if the
+                                rpm belongs to an external repo, if there is no
+                                matching signature, or if a file cannot be
+                                deleted. get_rpm() is called with strict=True
+                                and is expected to raise for an unknown rpm.
+     """
+     if all_sigs:
+         # all_sigs wins over any sigkey that was passed along
+         sigkey = None
+     elif not sigkey:
+         raise koji.GenericError("No signature specified")
+     rinfo = get_rpm(rpminfo, strict=True)
+     if rinfo['external_repo_id']:
+         raise koji.GenericError("Not an internal rpm: %s (from %s)"
+                                 % (rpminfo, rinfo['external_repo_name']))
+ 
+     # Fetch the signatures before deleting the rows: their sigkeys are needed
+     # afterwards to build the paths of the files to remove.
+     rpm_query_result = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=sigkey)
+     if not rpm_query_result:
+         nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
+         raise koji.GenericError("%s has no matching signatures to delete" % nvra)
+ 
+     clauses = ["rpm_id=%(rpm_id)i"]
+     if sigkey is not None:
+         clauses.append("sigkey=%(sigkey)s")
+     delete = """DELETE FROM rpmsigs WHERE %s""" % " AND ".join(clauses)
+     # Hand over only the values the statement refers to, not every local.
+     _dml(delete, {'rpm_id': rinfo['id'], 'sigkey': sigkey})
+ 
+     binfo = get_build(rinfo['build_id'])
+     builddir = koji.pathinfo.build(binfo)
+     signed_paths = [joinpath(builddir, koji.pathinfo.signed(rinfo, rpmsig['sigkey']))
+                     for rpmsig in rpm_query_result]
+     sighdr_paths = [joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey']))
+                     for rpmsig in rpm_query_result]
+     # Signed copies first, then the signature headers.
+     list_paths = signed_paths + sighdr_paths
+ 
+     # Track what was really removed: files that are already missing must not
+     # be reported as deleted when a later removal fails.
+     removed = []
+     for idx, file_path in enumerate(list_paths):
+         try:
+             os.remove(file_path)
+             removed.append(file_path)
+         except FileNotFoundError:
+             logger.info("File %s was already missing, nothing to delete", file_path)
+         except Exception:
+             logger.error("Failed to delete %s; deleted so far: %s; not attempted: %s; "
+                          "the original request was rpm %s and sigkey %s",
+                          file_path, removed, list_paths[idx + 1:], rpminfo, sigkey,
+                          exc_info=True)
+             raise koji.GenericError("File %s cannot be deleted." % file_path)
+ 
+     # Best-effort cleanup: for every file, drop its directory and then the
+     # parent of that directory if they ended up empty. Failures are only
+     # logged.
+     for path in list_paths:
+         basedir = os.path.dirname(path)
+         for directory in (basedir, os.path.dirname(basedir)):
+             if os.path.isdir(directory) and not os.listdir(directory):
+                 try:
+                     os.rmdir(directory)
+                 except OSError:
+                     logger.warning("An error happens when deleting %s directory",
+                                    directory, exc_info=True)

+ 

+ 

  def _scan_sighdr(sighdr, fn):

      """Splices sighdr with other headers from fn and queries (no payload)"""

      # This is hackish, but it works
@@ -11941,6 +12010,18 @@ 

          context.session.assertPerm('sign')

          return add_rpm_sig(an_rpm, base64.b64decode(data))

  

+     def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False):
+         """Delete rpm signature
+ 
+         Checks for the 'admin' permission and then delegates to delete_rpm_sig().
+ 
+         :param dict/str/id rpminfo: map containing 'name', 'version', 'release', and 'arch'
+                                     string N-V-R.A
+                                     int ID
+         :param str sigkey: Signature key.
+         :param bool all_sigs: Delete all signed copies for specified RPM.
+         """
+         context.session.assertPerm('admin')
+         return delete_rpm_sig(rpminfo, sigkey=sigkey, all_sigs=all_sigs)

+ 

      findBuildID = staticmethod(find_build_id)

      getTagID = staticmethod(get_tag_id)

      getTag = staticmethod(get_tag)

@@ -50,6 +50,7 @@ 

          remove-group              Remove group from tag

          remove-host-from-channel  Remove a host from a channel

          remove-pkg                Remove a package from the listing for tag

+         remove-sig                Remove signed RPMs from db and disk

          remove-tag                Remove a tag

          remove-tag-inheritance    Remove a tag inheritance link

          remove-target             Remove a build target

@@ -50,6 +50,7 @@ 

          remove-group              Remove group from tag

          remove-host-from-channel  Remove a host from a channel

          remove-pkg                Remove a package from the listing for tag

+         remove-sig                Remove signed RPMs from db and disk

          remove-tag                Remove a tag

          remove-tag-inheritance    Remove a tag inheritance link

          remove-target             Remove a build target

@@ -0,0 +1,73 @@ 

+ from __future__ import absolute_import

+ 

+ import mock

+ from six.moves import StringIO

+ 

+ import koji

+ from koji_cli.commands import handle_remove_sig

+ from . import utils

+ 

+ 

+ class TestRemoveSig(utils.CliTestCase):
+     """Tests for the remove-sig CLI handler, run against a mocked hub session."""
+ 
+     maxDiff = None
+ 
+     def setUp(self):
+         # Both the hub session and the global CLI options are plain mocks.
+         self.session = mock.MagicMock()
+         self.session.getAPIVersion.return_value = koji.API_VERSION
+         self.options = mock.MagicMock()
+         self.options.debug = False
+ 
+     def test_delete_sig_help(self):
+         # Expected --help output of the command.
+         help_text = """Usage: %s remove-sig [options] <rpm-id/n-v-r.a/rpminfo>
+ (Specify the --help global option for a list of other help options)
+ 
+ Options:
+   -h, --help       show this help message and exit
+   --sigkey=SIGKEY  Specify signature key
+   --all            Remove all signed copies for specified RPM
+ """ % self.progname
+         self.assert_help(handle_remove_sig, help_text)
+ 
+     def test_delete_sig_without_option(self):
+         # No arguments at all: the parser reports the missing RPM and exits 2.
+         template = ("Usage: %s remove-sig [options] <rpm-id/n-v-r.a/rpminfo>\n"
+                     "(Specify the --help global option for a list of other help options)\n\n"
+                     "%s: error: Please specify an RPM\n")
+         expected = template % (self.progname, self.progname)
+         with mock.patch('sys.stderr', new_callable=StringIO) as stderr:
+             with self.assertRaises(SystemExit) as ex:
+                 handle_remove_sig(self.options, self.session, [])
+             self.assertExitCode(ex, 2)
+             self.assert_console_message(stderr, expected)
+ 
+     def test_delete_sig_non_exist_rpm(self):
+         # getRPM finds nothing: a message is printed and the hub is not asked
+         # to delete anything.
+         rpm_id = '1234'
+         self.session.getRPM.return_value = None
+         with mock.patch('sys.stdout', new_callable=StringIO) as stdout:
+             handle_remove_sig(self.options, self.session, [rpm_id, '--all'])
+             self.assert_console_message(stdout, "No such rpm in system: %s\n" % rpm_id)
+         self.session.getRPM.assert_called_with('1234')
+         self.session.deleteRPMSig.assert_not_called()
+ 
+     def test_delete_sig_valid(self):
+         # Known RPM plus --sigkey: the key is handed on to deleteRPMSig.
+         rpm_id = '1'
+         rpm_record = {
+             'arch': 'src',
+             'build_id': 10,
+             'buildroot_id': None,
+             'buildtime': 1618361584,
+             'epoch': None,
+             'external_repo_id': 0,
+             'external_repo_name': 'INTERNAL',
+             'extra': None,
+             'id': 1,
+             'metadata_only': False,
+             'name': 'koji',
+             'payloadhash': 'c2b13f978c45e274c856e0a4599842a4',
+             'release': '1.fc34',
+             'size': 1178794,
+             'version': '1.24.1',
+         }
+         self.session.deleteRPMSig.return_value = None
+         self.session.getRPM.return_value = rpm_record
+         handle_remove_sig(self.options, self.session, [rpm_id, '--sigkey', 'testkey'])
+         self.session.getRPM.assert_called_with('1')
+         self.session.deleteRPMSig.assert_called_with('1', sigkey='testkey')

@@ -0,0 +1,161 @@ 

+ import unittest

+ 

+ import mock

+ 

+ import koji

+ import kojihub

+ 

+ QP = kojihub.QueryProcessor

+ 

+ 

+ class TestDeleteRPMSig(unittest.TestCase):
+     """Tests for kojihub.delete_rpm_sig with the hub helpers patched out."""
+ 
+     def getQuery(self, *args, **kwargs):
+         # Record every QueryProcessor created by the code under test;
+         # execute() is stubbed so nothing reaches a database.
+         query = QP(*args, **kwargs)
+         query.execute = mock.MagicMock()
+         self.queries.append(query)
+         return query
+ 
+     def setUp(self):
+         self.QueryProcessor = mock.patch('kojihub.QueryProcessor',
+                                          side_effect=self.getQuery).start()
+         self.queries = []
+         self.get_rpm = mock.patch('kojihub.get_rpm').start()
+         self.query_rpm_sigs = mock.patch('kojihub.query_rpm_sigs').start()
+         self.get_build = mock.patch('kojihub.get_build').start()
+         self.buildinfo = {'build_id': 1,
+                           'epoch': None,
+                           'extra': None,
+                           'id': 1,
+                           'name': 'fs_mark',
+                           'nvr': 'fs_mark-3.3-20.el8',
+                           'owner_id': 1,
+                           'owner_name': 'kojiadmin',
+                           'package_id': 1,
+                           'package_name': 'fs_mark',
+                           'release': '20.el8',
+                           'state': 1,
+                           'task_id': None,
+                           'version': '3.3'}
+         self.rinfo = {'arch': 'x86_64',
+                       'build_id': 1,
+                       'buildroot_id': None,
+                       'buildtime': 1564782768,
+                       'epoch': None,
+                       'external_repo_id': None,
+                       'extra': None,
+                       'id': 2,
+                       'metadata_only': False,
+                       'name': 'fs_mark',
+                       'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a',
+                       'release': '20.el8',
+                       'size': 25644,
+                       'version': '3.3'}
+         # Two signatures for the same rpm; each one maps to two files on disk.
+         self.queryrpmsigs = [{'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614',
+                               'sigkey': ''},
+                              {'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6',
+                               'sigkey': '2f86d6a1'}]
+ 
+     def tearDown(self):
+         mock.patch.stopall()
+ 
+     @mock.patch('kojihub._dml')
+     def test_rpm_not_existing(self, dml):
+         rpm_id = 1234
+         expected_msg = 'No such rpm: %s' % rpm_id
+         self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id)
+         with self.assertRaises(koji.GenericError) as ex:
+             kojihub.delete_rpm_sig(rpm_id, all_sigs=True)
+         self.assertEqual(len(self.queries), 0)
+         self.assertEqual(ex.exception.args[0], expected_msg)
+         self.get_rpm.assert_called_once_with(rpm_id, strict=True)
+         self.query_rpm_sigs.assert_not_called()
+         dml.assert_not_called()
+ 
+     @mock.patch('kojihub._dml')
+     def test_not_all_sig_and_not_sigkey(self, dml):
+         expected_msg = 'No signature specified'
+         with self.assertRaises(koji.GenericError) as ex:
+             kojihub.delete_rpm_sig(1234)
+         self.assertEqual(len(self.queries), 0)
+         self.assertEqual(ex.exception.args[0], expected_msg)
+         self.get_rpm.assert_not_called()
+         self.query_rpm_sigs.assert_not_called()
+         dml.assert_not_called()
+ 
+     @mock.patch('kojihub._dml')
+     def test_external_repo(self, dml):
+         rpminfo = 1234
+         rinfo = {'external_repo_id': 1, 'external_repo_name': 'INTERNAL'}
+         self.get_rpm.return_value = rinfo
+         with self.assertRaises(koji.GenericError) as ex:
+             kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
+         self.assertEqual(len(self.queries), 0)
+         expected_msg = "Not an internal rpm: %s (from %s)" % (rpminfo, rinfo['external_repo_name'])
+         self.assertEqual(ex.exception.args[0], expected_msg)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_not_called()
+         dml.assert_not_called()
+ 
+     @mock.patch('kojihub._dml')
+     def test_empty_query_sign(self, dml):
+         rpminfo = 1234
+         nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],
+                                 self.rinfo['arch'])
+         expected_msg = "%s has no matching signatures to delete" % nvra
+         self.get_rpm.return_value = self.rinfo
+         self.query_rpm_sigs.return_value = []
+         with self.assertRaises(koji.GenericError) as ex:
+             kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
+         self.assertEqual(len(self.queries), 0)
+         self.assertEqual(ex.exception.args[0], expected_msg)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
+         dml.assert_not_called()
+ 
+     @mock.patch('kojihub._dml')
+     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
+     @mock.patch('os.remove')
+     def test_file_not_found_error(self, os_remove, pb, dml):
+         rpminfo = 2
+         os_remove.side_effect = FileNotFoundError()
+         self.get_rpm.return_value = self.rinfo
+         self.get_build.return_value = self.buildinfo
+         self.query_rpm_sigs.return_value = self.queryrpmsigs
+         r = kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
+         self.assertIsNone(r)
+         self.assertEqual(len(self.queries), 0)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
+         self.get_build.assert_called_once_with(self.rinfo['build_id'])
+         # Missing files are tolerated: the rows are still deleted and every
+         # path (two per signature) is still attempted.
+         self.assertEqual(dml.call_count, 1)
+         self.assertEqual(os_remove.call_count, 2 * len(self.queryrpmsigs))
+ 
+     @mock.patch('kojihub._dml')
+     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
+     @mock.patch('os.remove', side_effect=OSError)
+     def test_not_valid(self, os_remove, pb, dml):
+         rpminfo = 2
+         filepath = 'fakebuildpath/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm'
+         self.get_rpm.return_value = self.rinfo
+         self.get_build.return_value = self.buildinfo
+         self.query_rpm_sigs.return_value = self.queryrpmsigs
+         expected_msg = "File %s cannot be deleted." % filepath
+         with self.assertRaises(koji.GenericError) as ex:
+             kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
+         self.assertEqual(ex.exception.args[0], expected_msg)
+         self.assertEqual(len(self.queries), 0)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
+         self.get_build.assert_called_once_with(self.rinfo['build_id'])
+         # The first failing removal aborts the whole operation.
+         self.assertEqual(dml.call_count, 1)
+         os_remove.assert_called_once_with(filepath)
+ 
+     @mock.patch('kojihub._dml')
+     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
+     @mock.patch('os.remove')
+     def test_valid(self, os_remove, pb, dml):
+         rpminfo = 2
+         self.get_rpm.return_value = self.rinfo
+         self.get_build.return_value = self.buildinfo
+         self.query_rpm_sigs.return_value = self.queryrpmsigs
+         r = kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
+         self.assertIsNone(r)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
+         self.get_build.assert_called_once_with(self.rinfo['build_id'])
+         # With all_sigs the delete is restricted by rpm id only.
+         self.assertEqual(dml.call_count, 1)
+         self.assertEqual(dml.call_args[0][0], "DELETE FROM rpmsigs WHERE rpm_id=%(rpm_id)i")
+         self.assertEqual(dml.call_args[0][1]['rpm_id'], self.rinfo['id'])
+         self.assertEqual(os_remove.call_count, 2 * len(self.queryrpmsigs))
+ 
+     @mock.patch('kojihub._dml')
+     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
+     @mock.patch('os.remove')
+     def test_valid_sigkey(self, os_remove, pb, dml):
+         rpminfo = 2
+         sigkey = '2f86d6a1'
+         self.get_rpm.return_value = self.rinfo
+         self.get_build.return_value = self.buildinfo
+         self.query_rpm_sigs.return_value = [self.queryrpmsigs[1]]
+         r = kojihub.delete_rpm_sig(rpminfo, sigkey=sigkey)
+         self.assertIsNone(r)
+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)
+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=sigkey)
+         self.get_build.assert_called_once_with(self.rinfo['build_id'])
+         # A single key narrows both the SQL statement and the removed files.
+         self.assertEqual(dml.call_count, 1)
+         self.assertEqual(dml.call_args[0][0],
+                          "DELETE FROM rpmsigs WHERE rpm_id=%(rpm_id)i AND sigkey=%(sigkey)s")
+         self.assertEqual(dml.call_args[0][1]['rpm_id'], self.rinfo['id'])
+         self.assertEqual(dml.call_args[0][1]['sigkey'], sigkey)
+         self.assertEqual(os_remove.call_count, 2)

Metadata Update from @jobrauer:
- Pull-request tagged with: testing-ready

2 years ago

I might also want to address Jonas's concern about the redundant sanity check, but this should fix the key issues that were mentioned to me.

Metadata Update from @jobrauer:
- Pull-request untagged with: testing-ready

2 years ago

Pull-Request has been closed by tkopecek

2 years ago