From c7ea56c049ec8ab1a5500852eca6faf750b1479f Mon Sep 17 00:00:00 2001 From: Fraser Tweedale Date: Dec 12 2016 12:03:15 +0000 Subject: Add function for extracting PEM certs from PKCS #7 Add a single function for extracting X.509 certs in PEM format from a PKCS #7 object. Refactor sites that execute ``openssl pkcs7`` to use the new function. Part of: https://fedorahosted.org/freeipa/ticket/6178 Reviewed-By: Jan Cholasta Reviewed-By: Tomas Krizek --- diff --git a/ipalib/x509.py b/ipalib/x509.py index e1c3867..851af5a 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -49,6 +49,14 @@ from ipalib import api from ipalib import util from ipalib import errors from ipapython.dn import DN +from ipapython import ipautil + +try: + from ipaplatform.paths import paths +except ImportError: + OPENSSL = '/usr/bin/openssl' +else: + OPENSSL = paths.OPENSSL if six.PY3: unicode = str @@ -56,7 +64,9 @@ if six.PY3: PEM = 0 DER = 1 -PEM_REGEX = re.compile(r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)', re.DOTALL) +PEM_REGEX = re.compile( + r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', + re.DOTALL) EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1' EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2' @@ -145,6 +155,23 @@ def load_certificate_list_from_file(filename): return load_certificate_list(f.read()) +def pkcs7_to_pems(data, datatype=PEM): + """ + Extract certificates from a PKCS #7 object. + + Return a ``list`` of X.509 PEM strings. + + May throw ``ipautil.CalledProcessError`` on invalid data. + + """ + cmd = [ + OPENSSL, "pkcs7", "-print_certs", + "-inform", "PEM" if datatype == PEM else "DER", + ] + result = ipautil.run(cmd, stdin=data, capture_output=True) + return PEM_REGEX.findall(result.output) + + def is_self_signed(certificate, datatype=PEM): cert = load_certificate(certificate, datatype) return cert.issuer == cert.subject diff --git a/ipapython/certdb.py b/ipapython/certdb.py index 4e05b78..4fbbbd9 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -239,13 +239,8 @@ class NSSDatabase(object): continue if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): - args = [ - OPENSSL, 'pkcs7', - '-print_certs', - ] try: - result = ipautil.run( - args, stdin=body, capture_output=True) + certs = x509.pkcs7_to_pems(body) except ipautil.CalledProcessError as e: if label == 'CERTIFICATE': root_logger.warning( @@ -257,7 +252,7 @@ class NSSDatabase(object): filename, line, e) continue else: - extracted_certs += result.output + '\n' + extracted_certs += '\n'.join(certs) + '\n' loaded = True continue diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index bf79821..29acd7e 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -749,44 +749,30 @@ class CAInstance(DogtagInstance): # makes openssl throw up. data = base64.b64decode(chain) - result = ipautil.run( - [paths.OPENSSL, - "pkcs7", - "-inform", - "DER", - "-print_certs", - ], stdin=data, capture_output=True) - certlist = result.output + certlist = x509.pkcs7_to_pems(data, x509.DER) # Ok, now we have all the certificates in certs, walk through it # and pull out each certificate and add it to our database - st = 1 - en = 0 - subid = 0 ca_dn = DN(('CN','Certificate Authority'), self.subject_base) - while st > 0: - st = certlist.find('-----BEGIN', en) - en = certlist.find('-----END', en+1) - if st > 0: - try: - (chain_fd, chain_name) = tempfile.mkstemp() - os.write(chain_fd, certlist[st:en+25]) - os.close(chain_fd) - (_rdn, subject_dn) = certs.get_cert_nickname(certlist[st:en+25]) - if subject_dn == ca_dn: - nick = get_ca_nickname(self.realm) - trust_flags = 'CT,C,C' - else: - nick = str(subject_dn) - trust_flags = ',,' - self.__run_certutil( - ['-A', '-t', trust_flags, '-n', nick, '-a', - '-i', chain_name] - ) - finally: - os.remove(chain_name) - subid += 1 + for cert in certlist: + try: + chain_fd, chain_name = tempfile.mkstemp() + os.write(chain_fd, cert) + os.close(chain_fd) + (_rdn, subject_dn) = certs.get_cert_nickname(cert) + if subject_dn == ca_dn: + nick = get_ca_nickname(self.realm) + trust_flags = 'CT,C,C' + else: + nick = str(subject_dn) + trust_flags = ',,' + self.__run_certutil( + ['-A', '-t', trust_flags, '-n', nick, '-a', + '-i', chain_name] + ) + finally: + os.remove(chain_name) # Restore NSS trust flags of all previously existing certificates for nick, trust_flags in cert_backup_list: