| |
@@ -11,7 +11,6 @@
|
| |
#
|
| |
# This program requires koji and sigul installed, as well as configured.
|
| |
|
| |
- from cStringIO import StringIO
|
| |
import os
|
| |
import optparse
|
| |
import sys
|
| |
@@ -22,8 +21,6 @@
|
| |
import tempfile
|
| |
import logging
|
| |
|
| |
- import gpg
|
| |
-
|
| |
errors = {}
|
| |
|
| |
status = 0
|
| |
@@ -72,76 +69,6 @@
|
| |
}
|
| |
|
| |
|
| |
- class GPGContext(object):
|
| |
- """ Use it like:
|
| |
-
|
| |
- with GPGContext() as ctx:
|
| |
- #do_stuff(ctx)
|
| |
- """
|
| |
- config = """no-auto-check-trustdb
|
| |
- trust-model direct
|
| |
- no-expensive-trust-checks
|
| |
- no-use-agent
|
| |
- cipher-algo AES256
|
| |
- digest-algo SHA512
|
| |
- s2k-digest-algo SHA512
|
| |
- cipher-algo AES256
|
| |
- compress-algo zlib
|
| |
- """
|
| |
-
|
| |
- def __init__(self, create_homedir=True):
|
| |
- self.tempdir = tempfile.gettempdir()
|
| |
- self.create_homedir = create_homedir
|
| |
-
|
| |
- ctx = gpg.Context()
|
| |
- gpg_homedir = None
|
| |
- if create_homedir:
|
| |
- gpg_homedir = tempfile.mkdtemp(prefix="temporary_gpg_homedir_")
|
| |
- # FIXME: Hardcoded gpg path
|
| |
- ctx.set_engine_info(gpg.constants.PROTOCOL_OpenPGP,
|
| |
- "/usr/bin/gpg", gpg_homedir)
|
| |
-
|
| |
- with open(os.path.join(gpg_homedir, 'gpg.conf'),
|
| |
- 'wb', 0755) as gpg_config_fp:
|
| |
- gpg_config_fp.write(self.config)
|
| |
-
|
| |
- self.ctx = ctx
|
| |
- self.gpg_homedir = gpg_homedir
|
| |
-
|
| |
- def __enter__(self):
|
| |
- return self.ctx
|
| |
-
|
| |
- def __exit__(self, type_, value, traceback):
|
| |
- gpg_homedir = self.gpg_homedir
|
| |
- if self.create_homedir and \
|
| |
- os.path.abspath(gpg_homedir).startswith(self.tempdir):
|
| |
- shutil.rmtree(gpg_homedir, ignore_errors=True)
|
| |
-
|
| |
-
|
| |
- def get_key_info(source, filename=False):
|
| |
- with GPGContext() as ctx:
|
| |
- if filename is not None:
|
| |
- with open(source, 'r') as ifile:
|
| |
- source = ifile.read()
|
| |
- ctx.op_import_source)
|
| |
- import_result = ctx.op_import_result()
|
| |
-
|
| |
- if import_result.imported != 1:
|
| |
- raise ValueError(
|
| |
- "{0} does not contain exactly one GPG key".format(filename))
|
| |
-
|
| |
- imported_fpr = import_result.imports[0].fpr
|
| |
- key = ctx.get_key(imported_fpr)
|
| |
- first_subkey = key.subkeys[0]
|
| |
- keyid = first_subkey.keyid[-8:].lower()
|
| |
- if first_subkey.pubkey_algo == gpg.constants.PK_DSA:
|
| |
- v3 = False
|
| |
- else:
|
| |
- v3 = True
|
| |
-
|
| |
- return keyid, v3
|
| |
-
|
| |
-
|
| |
def get_gpg_agent_passphrase(cache_id, ask=False, error_message="X",
|
| |
prompt="X", description=None):
|
| |
gpg_agent = subprocess.Popen(
|
| |
@@ -362,7 +289,8 @@
|
| |
ret, pubkey, stderr = self.get_public_key()
|
| |
if ret != 0:
|
| |
raise ValueError("Invalid key or password: " + stderr)
|
| |
- self.keyid, self.v3 = get_key_info(pubkey)
|
| |
+ self.keyid = KEYS[key]['id']
|
| |
+ self.v3 = KEYS[key]['v3']
|
| |
|
| |
def get_public_key(self):
|
| |
command = self.build_cmdline('get-public-key', self.key)
|
| |
Signed-off-by: Patrick Uiterwijk <puiterwijk@redhat.com>