From b5732efda6d27f588adbc3f41259bc4511716f43 Mon Sep 17 00:00:00 2001 From: Stanislav Laznicka Date: Jul 27 2017 08:28:58 +0000 Subject: x509: Make certificates represented as objects https://pagure.io/freeipa/issue/4985 Reviewed-By: Fraser Tweedale Reviewed-By: Rob Crittenden Reviewed-By: Martin Basti --- diff --git a/install/certmonger/dogtag-ipa-ca-renew-agent-submit b/install/certmonger/dogtag-ipa-ca-renew-agent-submit index 3d3e791..7d7ff9a 100755 --- a/install/certmonger/dogtag-ipa-ca-renew-agent-submit +++ b/install/certmonger/dogtag-ipa-ca-renew-agent-submit @@ -31,7 +31,6 @@ import syslog import traceback import tempfile import shutil -import base64 import contextlib import json @@ -266,8 +265,8 @@ def store_cert(**kwargs): cert = os.environ.get('CERTMONGER_CERTIFICATE') if not cert: return (REJECTED, "New certificate requests not supported") - - dercert = x509.normalize_certificate(cert) + cert = x509.load_pem_x509_certificate(cert) + dercert = cert.public_bytes(x509.Encoding.DER) dn = DN(('cn', nickname), ('cn', 'ca_renewal'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn) @@ -365,7 +364,8 @@ def retrieve_or_reuse_cert(**kwargs): if not nickname: return (REJECTED, "Nickname could not be determined") - cert = os.environ.get('CERTMONGER_CERTIFICATE') + cert = x509.load_pem_x509_certificate( + fix_pem(os.environ.get('CERTMONGER_CERTIFICATE'))) # TODO: the fix_pem somehow got there early, so making this comment way too long to get rid of it later if not cert: return (REJECTED, "New certificate requests not supported") @@ -378,11 +378,10 @@ def retrieve_or_reuse_cert(**kwargs): except errors.NotFound: pass else: - cert = entry.single_value['usercertificate'] - cert = base64.b64encode(cert) - cert = x509.make_pem(cert) + cert = x509.load_der_x509_certificate( + entry.single_value['usercertificate']) - return (ISSUED, cert) + return (ISSUED, cert.public_bytes(x509.Encoding.PEM)) def retrieve_cert_continuous(reuse_existing, **kwargs): @@ -392,7 +391,7 @@ def retrieve_cert_continuous(reuse_existing, **kwargs): """ old_cert = os.environ.get('CERTMONGER_CERTIFICATE') if old_cert: - old_cert = x509.normalize_certificate(old_cert) + old_cert = x509.load_pem_x509_certificate(old_cert) result = call_handler(retrieve_or_reuse_cert, reuse_existing=reuse_existing, @@ -400,7 +399,7 @@ def retrieve_cert_continuous(reuse_existing, **kwargs): if result[0] != ISSUED or reuse_existing: return result - new_cert = x509.normalize_certificate(result[1]) + new_cert = x509.load_pem_x509_certificate(result[1]) if new_cert == old_cert: syslog.syslog(syslog.LOG_INFO, "Updated certificate not available") # No cert available yet, tell certmonger to wait another 8 hours @@ -431,7 +430,8 @@ def renew_ca_cert(reuse_existing, **kwargs): cert = os.environ.get('CERTMONGER_CERTIFICATE') if not cert: return (REJECTED, "New certificate requests not supported") - is_self_signed = x509.is_self_signed(cert) + cert = x509.load_pem_x509_certificate(cert) + is_self_signed = cert.is_self_signed() operation = os.environ.get('CERTMONGER_OPERATION') if operation == 'SUBMIT': diff --git a/install/restart_scripts/renew_ca_cert b/install/restart_scripts/renew_ca_cert index bb31def..3071653 100644 --- a/install/restart_scripts/renew_ca_cert +++ b/install/restart_scripts/renew_ca_cert @@ -29,7 +29,7 @@ import traceback from ipalib.install import certstore from ipapython import ipautil -from ipalib import api, errors, x509 +from ipalib import api, errors from ipalib.install.kinit import kinit_keytab from ipaserver.install import certs, cainstance, installutils from ipaserver.plugins.ldap2 import ldap2 @@ -64,7 +64,7 @@ def _main(): # Fetch the new certificate db = certs.CertDB(api.env.realm, nssdir=paths.PKI_TOMCAT_ALIAS_DIR) - cert = db.get_cert_from_db(nickname, pem=False) + cert = db.get_cert_from_db(nickname) if not cert: syslog.syslog(syslog.LOG_ERR, 'No certificate %s found.' % nickname) sys.exit(1) @@ -106,7 +106,7 @@ def _main(): cfg_path, 'subsystem.select', '=') if config == 'New': syslog.syslog(syslog.LOG_NOTICE, "Updating CS.cfg") - if x509.is_self_signed(cert, x509.DER): + if cert.is_self_signed(): installutils.set_directive( cfg_path, 'hierarchy.select', 'Root', quotes=False, separator='=') diff --git a/install/tools/ipa-replica-conncheck b/install/tools/ipa-replica-conncheck index 1a50c4b..cb08ad3 100755 --- a/install/tools/ipa-replica-conncheck +++ b/install/tools/ipa-replica-conncheck @@ -47,7 +47,6 @@ from socket import SOCK_STREAM, SOCK_DGRAM import distutils.spawn from ipaplatform.paths import paths import gssapi -from cryptography.hazmat.primitives import serialization logger = logging.getLogger(os.path.basename(__file__)) @@ -544,10 +543,8 @@ def main(): ca_certs = x509.load_certificate_list_from_file( options.ca_cert_file) for ca_cert in ca_certs: - data = ca_cert.public_bytes( - serialization.Encoding.DER) nss_db.add_cert( - data, + ca_cert, str(DN(ca_cert.subject)), certdb.EXTERNAL_CA_TRUST_FLAGS) diff --git a/ipaclient/install/client.py b/ipaclient/install/client.py index 8566346..b7293bb 100644 --- a/ipaclient/install/client.py +++ b/ipaclient/install/client.py @@ -27,7 +27,6 @@ import tempfile import time import traceback -from cryptography.hazmat.primitives import serialization # pylint: disable=import-error from six.moves.configparser import RawConfigParser from six.moves.urllib.parse import urlparse, urlunparse @@ -1647,9 +1646,7 @@ def get_ca_certs_from_ldap(server, basedn, realm): logger.debug("get_ca_certs_from_ldap() error: %s", e) raise - certs = [x509.load_der_x509_certificate(c[0]) for c in certs - if c[2] is not False] - + certs = [c[0] for c in certs if c[2] is not False] return certs @@ -1830,10 +1827,6 @@ def get_ca_certs(fstore, options, server, basedn, realm): if ca_certs is not None: try: - ca_certs = [ - cert.public_bytes(serialization.Encoding.DER) - for cert in ca_certs - ] x509.write_certificate_list(ca_certs, ca_file) except Exception as e: if os.path.exists(ca_file): @@ -2680,10 +2673,6 @@ def _install(options): # Add CA certs to a temporary NSS database ca_certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT) - ca_certs = [ - cert.public_bytes(serialization.Encoding.DER) - for cert in ca_certs - ] try: tmp_db.create_db() diff --git a/ipaclient/plugins/ca.py b/ipaclient/plugins/ca.py index fe9c55f..c48f76e 100644 --- a/ipaclient/plugins/ca.py +++ b/ipaclient/plugins/ca.py @@ -2,7 +2,6 @@ # Copyright (C) 2016 FreeIPA Contributors see COPYING for license # -import base64 from ipaclient.frontend import MethodOverride from ipalib import errors, util, x509, Str from ipalib.plugable import Registry @@ -34,15 +33,15 @@ class WithCertOutArgs(MethodOverride): result = super(WithCertOutArgs, self).forward(*keys, **options) if filename: - def to_pem(x): - return x509.make_pem(x) if options.get('chain', False): - ders = result['result']['certificate_chain'] - data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders) + certs = (x509.load_der_x509_certificate(c) + for c in result['result']['certificate_chain']) else: - data = to_pem(result['result']['certificate']) - with open(filename, 'wb') as f: - f.write(data) + certs = [ + x509.load_der_x509_certificate( + result['result']['certificate']) + ] + x509.write_certificate_list(certs, filename) return result diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py index e318c55..860f925 100644 --- a/ipaclient/plugins/cert.py +++ b/ipaclient/plugins/cert.py @@ -66,10 +66,9 @@ class CertRetrieveOverride(MethodOverride): certs = result['result']['certificate_chain'] else: certs = [result['result']['certificate']] - certs = (x509.ensure_der_format(cert) for cert in certs) - certs = (x509.make_pem(base64.b64encode(cert)) for cert in certs) - with open(certificate_out, 'w') as f: - f.write('\n'.join(certs)) + certs = (x509.load_der_x509_certificate( + x509.ensure_der_format(cert)) for cert in certs) + x509.write_certificate_list(certs, certificate_out) return result diff --git a/ipaclient/plugins/host.py b/ipaclient/plugins/host.py index 7d8b92d..e3ae7fa 100644 --- a/ipaclient/plugins/host.py +++ b/ipaclient/plugins/host.py @@ -34,10 +34,9 @@ class host_show(MethodOverride): util.check_writable_file(options['out']) result = super(host_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - x509.write_certificate_list( - result['result']['usercertificate'], - options['out'] - ) + certs = (x509.load_der_x509_certificate(c) + for c in result['result']['usercertificate']) + x509.write_certificate_list(certs, options['out']) result['summary'] = ( _('Certificate(s) stored in file \'%(file)s\'') % dict(file=options['out']) diff --git a/ipaclient/plugins/service.py b/ipaclient/plugins/service.py index c45a2f2..a48a76d 100644 --- a/ipaclient/plugins/service.py +++ b/ipaclient/plugins/service.py @@ -36,10 +36,9 @@ class service_show(MethodOverride): util.check_writable_file(options['out']) result = super(service_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - x509.write_certificate_list( - result['result']['usercertificate'], - options['out'] - ) + certs = (x509.load_der_x509_certificate(c) + for c in result['result']['usercertificate']) + x509.write_certificate_list(certs, options['out']) result['summary'] = ( _('Certificate(s) stored in file \'%(file)s\'') % dict(file=options['out']) diff --git a/ipaclient/plugins/user.py b/ipaclient/plugins/user.py index 19eecac..5af73b5 100644 --- a/ipaclient/plugins/user.py +++ b/ipaclient/plugins/user.py @@ -67,10 +67,9 @@ class user_show(MethodOverride): util.check_writable_file(options['out']) result = super(user_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - x509.write_certificate_list( - result['result']['usercertificate'], - options['out'] - ) + certs = (x509.load_der_x509_certificate(c) + for c in result['result']['usercertificate']) + x509.write_certificate_list(certs, options['out']) result['summary'] = ( _('Certificate(s) stored in file \'%(file)s\'') % dict(file=options['out']) diff --git a/ipaclient/plugins/vault.py b/ipaclient/plugins/vault.py index 022c5b7..398e401 100644 --- a/ipaclient/plugins/vault.py +++ b/ipaclient/plugins/vault.py @@ -624,15 +624,15 @@ class vaultconfig_show(MethodOverride): response = super(vaultconfig_show, self).forward(*args, **options) # cache transport certificate - transport_cert = x509.load_certificate( - response['result']['transport_cert'], x509.DER) + transport_cert = x509.load_der_x509_certificate( + response['result']['transport_cert']) _transport_cert_cache.store_cert( self.api.env.domain, transport_cert ) if file: - with open(file, 'w') as f: + with open(file, 'wb') as f: f.write(response['result']['transport_cert']) return response diff --git a/ipalib/install/certstore.py b/ipalib/install/certstore.py index 0d0902f..89302f6 100644 --- a/ipalib/install/certstore.py +++ b/ipalib/install/certstore.py @@ -28,13 +28,13 @@ from ipapython.dn import DN from ipapython.certdb import get_ca_nickname, TrustFlags from ipalib import errors, x509 -def _parse_cert(dercert): + +def _parse_cert(cert): try: - cert = x509.load_der_x509_certificate(dercert) subject = DN(cert.subject) issuer = DN(cert.issuer) serial_number = cert.serial_number - public_key_info = x509.get_der_public_key_info(dercert, x509.DER) + public_key_info = cert.public_key_info_bytes except (ValueError, PyAsn1Error) as e: raise ValueError("failed to decode certificate: %s" % e) @@ -45,15 +45,15 @@ def _parse_cert(dercert): return subject, issuer_serial, public_key_info -def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): +def init_ca_entry(entry, cert, nickname, trusted, ext_key_usage): """ Initialize certificate store entry for a CA certificate. """ - subject, issuer_serial, public_key = _parse_cert(dercert) + subject, issuer_serial, public_key = _parse_cert(cert) if ext_key_usage is not None: try: - cert_eku = x509.get_ext_key_usage(dercert, x509.DER) + cert_eku = cert.extended_key_usage except ValueError as e: raise ValueError("failed to decode certificate: %s" % e) if cert_eku is not None: @@ -68,7 +68,7 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): entry['ipaCertSubject'] = [subject] entry['ipaCertIssuerSerial'] = [issuer_serial] entry['ipaPublicKey'] = [public_key] - entry['cACertificate;binary'] = [dercert] + entry['cACertificate;binary'] = [cert.public_bytes(x509.Encoding.DER)] if trusted is not None: entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted'] @@ -79,11 +79,12 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): entry['ipaKeyExtUsage'] = ext_key_usage -def update_compat_ca(ldap, base_dn, dercert): +def update_compat_ca(ldap, base_dn, cert): """ Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX. """ dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn) + dercert = cert.public_bytes(x509.Encoding.DER) try: entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary']) entry.single_value['cACertificate;binary'] = dercert @@ -152,12 +153,12 @@ def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) -def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, +def update_ca_cert(ldap, base_dn, cert, trusted=None, ext_key_usage=None, config_ipa=False, config_compat=False): """ Update existing entry for a CA certificate in the certificate store. """ - subject, issuer_serial, public_key = _parse_cert(dercert) + subject, issuer_serial, public_key = _parse_cert(cert) filter = ldap.make_filter({'ipaCertSubject': subject}) result, _truncated = ldap.find_entries( @@ -172,7 +173,7 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, for old_cert in entry['cACertificate;binary']: # Check if we are adding a new cert - if old_cert == dercert: + if old_cert == cert: break else: # We are adding a new cert, validate it @@ -181,7 +182,8 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, if entry.single_value['ipaPublicKey'] != public_key: raise ValueError("subject public key info mismatch") entry['ipaCertIssuerSerial'].append(issuer_serial) - entry['cACertificate;binary'].append(dercert) + entry['cACertificate;binary'].append( + cert.public_bytes(x509.Encoding.DER)) # Update key trust if trusted is not None: @@ -217,22 +219,24 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, entry.setdefault('ipaConfigString', []).append('compatCA') if is_compat or config_compat: - update_compat_ca(ldap, base_dn, dercert) + update_compat_ca(ldap, base_dn, cert) ldap.update_entry(entry) clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) -def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, +def put_ca_cert(ldap, base_dn, cert, nickname, trusted=None, ext_key_usage=None, config_ipa=False, config_compat=False): """ Add or update entry for a CA certificate in the certificate store. + + :param cert: IPACertificate """ try: - update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage, + update_ca_cert(ldap, base_dn, cert, trusted, ext_key_usage, config_ipa=config_ipa, config_compat=config_compat) except errors.NotFound: - add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + add_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage, config_ipa=config_ipa, config_compat=config_compat) except errors.EmptyModlist: pass @@ -306,11 +310,12 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, for cert in entry.get('cACertificate;binary', []): try: - _parse_cert(cert) + cert_obj = x509.load_der_x509_certificate(cert) + _parse_cert(cert_obj) except ValueError: certs = [] break - certs.append((cert, nickname, trusted, ext_key_usage)) + certs.append((cert_obj, nickname, trusted, ext_key_usage)) except errors.NotFound: try: ldap.get_entry(container_dn, ['']) @@ -319,7 +324,8 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, dn = DN(('cn', 'CAcert'), config_dn) entry = ldap.get_entry(dn, ['cACertificate;binary']) - cert = entry.single_value['cACertificate;binary'] + cert = x509.load_der_x509_certificate( + entry.single_value['cACertificate;binary']) try: subject, _issuer_serial, _public_key_info = _parse_cert(cert) except ValueError: @@ -354,16 +360,18 @@ def key_policy_to_trust_flags(trusted, ca, ext_key_usage): return TrustFlags(False, trusted, ca, ext_key_usage) -def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags, +def put_ca_cert_nss(ldap, base_dn, cert, nickname, trust_flags, config_ipa=False, config_compat=False): """ Add or update entry for a CA certificate in the certificate store. + + :param cert: IPACertificate """ trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags) if ca is False: raise ValueError("must be CA certificate") - put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + put_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage, config_ipa, config_compat) diff --git a/ipalib/x509.py b/ipalib/x509.py index 5bec5ae..dc7a2eb 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -39,10 +39,15 @@ import ssl import base64 import re +from cryptography import x509 as crypto_x509 +from cryptography import utils as crypto_utils from cryptography.hazmat.backends import default_backend -import cryptography.x509 +from cryptography.hazmat.primitives.serialization import ( + Encoding, PublicFormat +) from pyasn1.type import univ, char, namedtype, tag from pyasn1.codec.der import decoder, encoder +# from pyasn1.codec.native import decoder, encoder from pyasn1_modules import rfc2315, rfc2459 import six @@ -101,26 +106,319 @@ def strip_header(pem): return pem +@crypto_utils.register_interface(crypto_x509.Certificate) +class IPACertificate(object): + """ + A proxy class wrapping a python-cryptography certificate representation for + FreeIPA purposes + """ + def __init__(self, cert, backend=None): + """ + :param cert: A python-cryptography Certificate object + :param backend: A python-cryptography Backend object + """ + self._cert = cert + self.backend = default_backend() if backend is None else backend() + + # initialize the certificate fields + # we have to do it this way so that some systems don't explode since + # some field types encode-decoding is not strongly defined + self._subject = self.__get_der_field('subject') + self._issuer = self.__get_der_field('issuer') + + def __getstate__(self): + state = { + '_cert': self.public_bytes(Encoding.DER), + '_subject': self.subject_bytes, + '_issuer': self.issuer_bytes, + } + return state + + def __setstate__(self, state): + self._subject = state['_subject'] + self._issuer = state['_issuer'] + self._cert = crypto_x509.load_der_x509_certificate( + state['_cert'], backend=default_backend()) + + def __eq__(self, other): + """ + Checks equality. + + :param other: either cryptography.Certificate or IPACertificate or + bytes representing a DER-formatted certificate + """ + if (isinstance(other, (crypto_x509.Certificate, IPACertificate))): + return (self.public_bytes(Encoding.DER) == + other.public_bytes(Encoding.DER)) + elif isinstance(other, bytes): + return self.public_bytes(Encoding.DER) == other + else: + return False + + def __ne__(self, other): + """ + Checks not equal. + """ + return not self.__eq__(other) + + def __hash__(self): + """ + Computes a hash of the wrapped cryptography.Certificate. + """ + return hash(self._cert) + + def __encode_extension(self, oid, critical, value): + # TODO: have another proxy for crypto_x509.Extension which would + # provide public_bytes on the top of what python-cryptography has + ext = rfc2459.Extension() + # TODO: this does not have to be so weird, pyasn1 now has codecs + # which are capable of providing python-native types + ext['extnID'] = univ.ObjectIdentifier(oid) + ext['critical'] = univ.Boolean(critical) + ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value))) + ext = encoder.encode(ext) + return ext + + def __get_pyasn1_field(self, field): + """ + :returns: a field of the certificate in pyasn1 representation + """ + cert_bytes = self.tbs_certificate_bytes + cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0] + field = cert[field] + return field + + def __get_der_field(self, field): + """ + :field: the name of the field of the certificate + :returns: bytes representing the value of a certificate field + """ + return encoder.encode(self.__get_pyasn1_field(field)) + + def public_bytes(self, encoding): + """ + Serializes the certificate to PEM or DER format. + """ + return self._cert.public_bytes(encoding) + + def is_self_signed(self): + """ + :returns: True if this certificate is self-signed, False otherwise + """ + return self._cert.issuer == self._cert.subject + + def fingerprint(self, algorithm): + """ + Counts fingerprint of the wrapped cryptography.Certificate + """ + return self._cert.fingerprint(algorithm) + + @property + def serial_number(self): + return self._cert.serial_number + + @property + def version(self): + return self._cert.version + + @property + def subject(self): + return self._cert.subject + + @property + def subject_bytes(self): + return self._subject + + @property + def signature_hash_algorithm(self): + """ + Returns a HashAlgorithm corresponding to the type of the digest signed + in the certificate. + """ + return self._cert.signature_hash_algorithm + + @property + def signature_algorithm_oid(self): + """ + Returns the ObjectIdentifier of the signature algorithm. + """ + return self._cert.signature_algorithm_oid + + @property + def signature(self): + """ + Returns the signature bytes. + """ + return self._cert.signature + + @property + def issuer(self): + return self._cert.issuer + + @property + def issuer_bytes(self): + return self._issuer + + @property + def not_valid_before(self): + return self._cert.not_valid_before + + @property + def not_valid_after(self): + return self._cert.not_valid_after + + @property + def tbs_certificate_bytes(self): + return self._cert.tbs_certificate_bytes + + @property + def extensions(self): + # TODO: own Extension and Extensions classes proxying + # python-cryptography + return self._cert.extensions + + def public_key(self): + return self._cert.public_key() + + @property + def public_key_info_bytes(self): + return self._cert.public_key().public_bytes( + encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) + + @property + def extended_key_usage(self): + try: + ext_key_usage = self._cert.extensions.get_extension_for_oid( + crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value + except crypto_x509.ExtensionNotFound: + return None + + return set(oid.dotted_string for oid in ext_key_usage) + + @property + def extended_key_usage_bytes(self): + ekurfc = rfc2459.ExtKeyUsageSyntax() + eku = self.extended_key_usage or {EKU_PLACEHOLDER} + for i, oid in enumerate(eku): + ekurfc[i] = univ.ObjectIdentifier(oid) + ekurfc = encoder.encode(ekurfc) + return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc) + + @property + def san_general_names(self): + """ + Return SAN general names from a python-cryptography + certificate object. If the SAN extension is not present, + return an empty sequence. + + Because python-cryptography does not yet provide a way to + handle unrecognised critical extensions (which may occur), + we must parse the certificate and extract the General Names. + For uniformity with other code, we manually construct values + of python-crytography GeneralName subtypes. + + python-cryptography does not yet provide types for + ediPartyName or x400Address, so we drop these name types. + + otherNames are NOT instantiated to more specific types where + the type is known. Use ``process_othernames`` to do that. + + When python-cryptography can handle certs with unrecognised + critical extensions and implements ediPartyName and + x400Address, this function (and helpers) will be redundant + and should go away. + + """ + gns = self.__pyasn1_get_san_general_names() + + GENERAL_NAME_CONSTRUCTORS = { + 'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)), + 'dNSName': lambda x: crypto_x509.DNSName(unicode(x)), + 'directoryName': _pyasn1_to_cryptography_directoryname, + 'registeredID': _pyasn1_to_cryptography_registeredid, + 'iPAddress': _pyasn1_to_cryptography_ipaddress, + 'uniformResourceIdentifier': + lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)), + 'otherName': _pyasn1_to_cryptography_othername, + } + + result = [] + + for gn in gns: + gn_type = gn.getName() + if gn_type in GENERAL_NAME_CONSTRUCTORS: + result.append( + GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent())) + + return result + + def __pyasn1_get_san_general_names(self): + # pyasn1 returns None when the key is not present in the certificate + # but we need an iterable + extensions = self.__get_pyasn1_field('extensions') or [] + OID_SAN = univ.ObjectIdentifier('2.5.29.17') + gns = [] + for ext in extensions: + if ext['extnID'] == OID_SAN: + der = decoder.decode( + ext['extnValue'], asn1Spec=univ.OctetString())[0] + gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0] + break + return gns + + @property + def san_a_label_dns_names(self): + gns = self.__pyasn1_get_san_general_names() + result = [] + + for gn in gns: + if gn.getName() == 'dNSName': + result.append(unicode(gn.getComponent())) + + return result + + def match_hostname(self, hostname): + match_cert = {} + + match_cert['subject'] = match_subject = [] + for rdn in self._cert.subject.rdns: + match_rdn = [] + for ava in rdn: + if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME: + match_rdn.append(('commonName', ava.value)) + match_subject.append(match_rdn) + + values = self.san_a_label_dns_names + if values: + match_cert['subjectAltName'] = match_san = [] + for value in values: + match_san.append(('DNS', value)) + + ssl.match_hostname(match_cert, DNSName(hostname).ToASCII()) + + def load_pem_x509_certificate(data): """ Load an X.509 certificate in PEM format. - :returns: a python-cryptography ``Certificate`` object. + :returns: a ``IPACertificate`` object. :raises: ``ValueError`` if unable to load the certificate. """ - return crypto_x509.load_pem_x509_certificate(data, - backend=default_backend()) + return IPACertificate( + crypto_x509.load_pem_x509_certificate(data, backend=default_backend()) + ) def load_der_x509_certificate(data): """ Load an X.509 certificate in DER format. - :returns: a python-cryptography ``Certificate`` object. + :returns: a ``IPACertificate`` object. :raises: ``ValueError`` if unable to load the certificate. """ - return crypto_x509.load_der_x509_certificate(data, - backend=default_backend()) + return IPACertificate( + crypto_x509.load_der_x509_certificate(data, backend=default_backend()) + ) def load_certificate_from_file(filename, dbdir=None): @@ -138,7 +436,6 @@ def load_certificate_list(data): Load a certificate list from a sequence of concatenated PEMs. Return a list of python-cryptography ``Certificate`` objects. - """ certs = PEM_REGEX.findall(data) return [load_pem_x509_certificate(cert) for cert in certs] @@ -155,11 +452,11 @@ def load_certificate_list_from_file(filename): return load_certificate_list(f.read()) -def pkcs7_to_pems(data, datatype=PEM): +def pkcs7_to_certs(data, datatype=PEM): """ Extract certificates from a PKCS #7 object. - Return a ``list`` of X.509 PEM strings. + :returns: a ``list`` of ``IPACertificate`` objects. """ if datatype == PEM: match = re.match( @@ -187,62 +484,12 @@ def pkcs7_to_pems(data, datatype=PEM): for certificate in signed_data['certificates']: certificate = encoder.encode(certificate) - certificate = base64.b64encode(certificate) - certificate = make_pem(certificate) + certificate = load_der_x509_certificate(certificate) result.append(certificate) return result -def is_self_signed(certificate, datatype=PEM): - cert = load_certificate(certificate, datatype) - return cert.issuer == cert.subject - - -def _get_der_field(cert, datatype, dbdir, field): - cert = normalize_certificate(cert) - cert = decoder.decode(cert, rfc2459.Certificate())[0] - field = cert['tbsCertificate'][field] - field = encoder.encode(field) - return field - -def get_der_subject(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'subject') - -def get_der_issuer(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'issuer') - -def get_der_serial_number(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'serialNumber') - -def get_der_public_key_info(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo') - - -def get_ext_key_usage(certificate, datatype=PEM): - cert = load_certificate(certificate, datatype) - try: - eku = cert.extensions.get_extension_for_oid( - cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value - except cryptography.x509.ExtensionNotFound: - return None - - return set(oid.dotted_string for oid in eku) - - -def make_pem(data): - """ - Convert a raw base64-encoded blob into something that looks like a PE - file with lines split to 64 characters and proper headers. - """ - if isinstance(data, bytes): - data = data.decode('ascii') - pemcert = '\r\n'.join([data[x:x+64] for x in range(0, len(data), 64)]) - return '-----BEGIN CERTIFICATE-----\n' + \ - pemcert + \ - '\n-----END CERTIFICATE-----' - - def ensure_der_format(rawcert): """ Incoming certificates should be DER-encoded. If not it is converted to @@ -254,8 +501,6 @@ def ensure_der_format(rawcert): if not rawcert: return None - rawcert = strip_header(rawcert) - try: if isinstance(rawcert, bytes): # base64 must work with utf-8, otherwise it is raw bin certificate @@ -299,58 +544,37 @@ def validate_der_x509_certificate(cert): raise errors.CertificateFormatError(error=str(e)) -def write_certificate(rawcert, filename): +def write_certificate(cert, filename): """ Write the certificate to a file in PEM format. The cert value can be either DER or PEM-encoded, it will be normalized to DER regardless, then back out to PEM. """ - dercert = normalize_certificate(rawcert) try: - fp = open(filename, 'w') - fp.write(make_pem(base64.b64encode(dercert))) - fp.close() + with open(filename, 'wb') as fp: + fp.write(cert.public_bytes(Encoding.PEM)) except (IOError, OSError) as e: raise errors.FileError(reason=str(e)) -def write_certificate_list(rawcerts, filename): + +def write_certificate_list(certs, filename): """ Write a list of certificates to a file in PEM format. - The cert values can be either DER or PEM-encoded, they will be normalized - to DER regardless, then back out to PEM. + :param certs: a list of IPACertificate objects to be written to a file + :param filename: a path to the file the certificates should be written into """ - dercerts = [normalize_certificate(rawcert) for rawcert in rawcerts] try: - with open(filename, 'w') as f: - for cert in dercerts: - cert = base64.b64encode(cert) - cert = make_pem(cert) - f.write(cert + '\n') + with open(filename, 'wb') as f: + for cert in certs: + f.write(cert.public_bytes(Encoding.PEM)) except (IOError, OSError) as e: raise errors.FileError(reason=str(e)) -def _encode_extension(oid, critical, value): - ext = rfc2459.Extension() - ext['extnID'] = univ.ObjectIdentifier(oid) - ext['critical'] = univ.Boolean(critical) - ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value))) - ext = encoder.encode(ext) - return ext - - -def encode_ext_key_usage(ext_key_usage): - eku = rfc2459.ExtKeyUsageSyntax() - for i, oid in enumerate(ext_key_usage): - eku[i] = univ.ObjectIdentifier(oid) - eku = encoder.encode(eku) - return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku) - - class _PrincipalName(univ.Sequence): componentType = namedtype.NamedTypes( namedtype.NamedType('name-type', univ.Integer().subtype( @@ -385,13 +609,13 @@ def _decode_krb5principalname(data): return name -class KRB5PrincipalName(cryptography.x509.general_name.OtherName): +class KRB5PrincipalName(crypto_x509.general_name.OtherName): def __init__(self, type_id, value): super(KRB5PrincipalName, self).__init__(type_id, value) self.name = _decode_krb5principalname(value) -class UPN(cryptography.x509.general_name.OtherName): +class UPN(crypto_x509.general_name.OtherName): def __init__(self, type_id, value): super(UPN, self).__init__(type_id, value) self.name = unicode( @@ -411,128 +635,48 @@ def process_othernames(gns): """ for gn in gns: - if isinstance(gn, cryptography.x509.general_name.OtherName): + if isinstance(gn, crypto_x509.general_name.OtherName): cls = OTHERNAME_CLASS_MAP.get( gn.type_id.dotted_string, - cryptography.x509.general_name.OtherName) + crypto_x509.general_name.OtherName) yield cls(gn.type_id, gn.value) else: yield gn -def _pyasn1_get_san_general_names(cert): - tbs = decoder.decode( - cert.tbs_certificate_bytes, - asn1Spec=rfc2459.TBSCertificate() - )[0] - OID_SAN = univ.ObjectIdentifier('2.5.29.17') - # One would expect KeyError or empty iterable when the key ('extensions' - # in this particular case) is not pressent in the certificate but pyasn1 - # returns None here - extensions = tbs['extensions'] or [] - gns = [] - for ext in extensions: - if ext['extnID'] == OID_SAN: - der = decoder.decode( - ext['extnValue'], asn1Spec=univ.OctetString())[0] - gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0] - break - - return gns - - -def get_san_general_names(cert): - """ - Return SAN general names from a python-cryptography - certificate object. If the SAN extension is not present, - return an empty sequence. - - Because python-cryptography does not yet provide a way to - handle unrecognised critical extensions (which may occur), - we must parse the certificate and extract the General Names. - For uniformity with other code, we manually construct values - of python-crytography GeneralName subtypes. - - python-cryptography does not yet provide types for - ediPartyName or x400Address, so we drop these name types. - - otherNames are NOT instantiated to more specific types where - the type is known. Use ``process_othernames`` to do that. - - When python-cryptography can handle certs with unrecognised - critical extensions and implements ediPartyName and - x400Address, this function (and helpers) will be redundant - and should go away. - - """ - gns = _pyasn1_get_san_general_names(cert) - - GENERAL_NAME_CONSTRUCTORS = { - 'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)), - 'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)), - 'directoryName': _pyasn1_to_cryptography_directoryname, - 'registeredID': _pyasn1_to_cryptography_registeredid, - 'iPAddress': _pyasn1_to_cryptography_ipaddress, - 'uniformResourceIdentifier': - lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)), - 'otherName': _pyasn1_to_cryptography_othername, - } - - result = [] - - for gn in gns: - gn_type = gn.getName() - if gn_type in GENERAL_NAME_CONSTRUCTORS: - result.append( - GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent())) - - return result - - def _pyasn1_to_cryptography_directoryname(dn): attrs = [] # Name is CHOICE { RDNSequence } (only one possibility) for rdn in dn.getComponent(): for ava in rdn: - attr = cryptography.x509.NameAttribute( + attr = crypto_x509.NameAttribute( _pyasn1_to_cryptography_oid(ava['type']), unicode(decoder.decode(ava['value'])[0]) ) attrs.append(attr) - return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs)) + return crypto_x509.DirectoryName(crypto_x509.Name(attrs)) def _pyasn1_to_cryptography_registeredid(oid): - return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid)) + return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid)) def _pyasn1_to_cryptography_ipaddress(octet_string): - return cryptography.x509.IPAddress( + return crypto_x509.IPAddress( ipaddress.ip_address(bytes(octet_string))) def _pyasn1_to_cryptography_othername(on): - return cryptography.x509.OtherName( + return crypto_x509.OtherName( _pyasn1_to_cryptography_oid(on['type-id']), bytes(on['value']) ) def _pyasn1_to_cryptography_oid(oid): - return cryptography.x509.ObjectIdentifier(str(oid)) - - -def get_san_a_label_dns_names(cert): - gns = _pyasn1_get_san_general_names(cert) - result = [] - - for gn in gns: - if gn.getName() == 'dNSName': - result.append(unicode(gn.getComponent())) - - return result + return crypto_x509.ObjectIdentifier(str(oid)) def chunk(size, s): @@ -574,23 +718,3 @@ def format_datetime(t): if t.tzinfo is None: t = t.replace(tzinfo=UTC()) return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z")) - - -def match_hostname(cert, hostname): - match_cert = {} - - match_cert['subject'] = match_subject = [] - for rdn in cert.subject.rdns: - match_rdn = [] - for ava in rdn: - if ava.oid == cryptography.x509.oid.NameOID.COMMON_NAME: - match_rdn.append(('commonName', ava.value)) - match_subject.append(match_rdn) - - values = get_san_a_label_dns_names(cert) - if values: - match_cert['subjectAltName'] = match_san = [] - for value in values: - match_san.append(('DNS', value)) - - ssl.match_hostname(match_cert, DNSName(hostname).ToASCII()) diff --git a/ipaplatform/redhat/tasks.py b/ipaplatform/redhat/tasks.py index 3352502..cc52c6c 100644 --- a/ipaplatform/redhat/tasks.py +++ b/ipaplatform/redhat/tasks.py @@ -30,7 +30,6 @@ import os import pwd import shutil import socket -import base64 import traceback import errno @@ -266,10 +265,10 @@ class RedHatTaskNamespace(BaseTaskNamespace): has_eku = set() for cert, nickname, trusted, ext_key_usage in ca_certs: try: - subject = x509.get_der_subject(cert, x509.DER) - issuer = x509.get_der_issuer(cert, x509.DER) - serial_number = x509.get_der_serial_number(cert, x509.DER) - public_key_info = x509.get_der_public_key_info(cert, x509.DER) + subject = cert.subject_bytes + issuer = cert.issuer_bytes + serial_number = cert.serial_number + public_key_info = cert.public_key_info_bytes except (PyAsn1Error, ValueError, CertificateError) as e: logger.warning( "Failed to decode certificate \"%s\": %s", nickname, e) @@ -278,12 +277,9 @@ class RedHatTaskNamespace(BaseTaskNamespace): label = urllib.parse.quote(nickname) subject = urllib.parse.quote(subject) issuer = urllib.parse.quote(issuer) - serial_number = urllib.parse.quote(serial_number) + serial_number = urllib.parse.quote(str(serial_number)) public_key_info = urllib.parse.quote(public_key_info) - cert = base64.b64encode(cert) - cert = x509.make_pem(cert) - obj = ("[p11-kit-object-v1]\n" "class: certificate\n" "certificate-type: x-509\n" @@ -302,14 +298,12 @@ class RedHatTaskNamespace(BaseTaskNamespace): obj += "trusted: true\n" elif trusted is False: obj += "x-distrusted: true\n" - obj += "%s\n\n" % cert + obj += "{pem}\n\n".format(pem=cert.public_bytes(x509.Encoding.PEM)) f.write(obj) if ext_key_usage is not None and public_key_info not in has_eku: - if not ext_key_usage: - ext_key_usage = {x509.EKU_PLACEHOLDER} try: - ext_key_usage = x509.encode_ext_key_usage(ext_key_usage) + ext_key_usage = cert.extended_key_usage_bytes except PyAsn1Error as e: logger.warning( "Failed to encode extended key usage for \"%s\": %s", diff --git a/ipapython/certdb.py b/ipapython/certdb.py index a2dfac0..32f383d 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -27,9 +27,7 @@ import re import tempfile from tempfile import NamedTemporaryFile import shutil -import base64 -from cryptography.hazmat.primitives import serialization import cryptography.x509 from ipapython.dn import DN @@ -91,7 +89,7 @@ def find_cert_from_txt(cert, start=0): trailing text, pull out just the certificate part. This will return the FIRST cert in a stream of data. - Returns a tuple (certificate, last position in cert) + :returns: a tuple (IPACertificate, last position in cert) """ s = cert.find('-----BEGIN CERTIFICATE-----', start) e = cert.find('-----END CERTIFICATE-----', s) @@ -101,7 +99,7 @@ def find_cert_from_txt(cert, start=0): if s < 0 or e < 0: raise RuntimeError("Unable to find certificate") - cert = cert[s:e] + cert = x509.load_pem_x509_certificate(cert[s:e].encode('utf-8')) return (cert, e) @@ -182,14 +180,10 @@ def unparse_trust_flags(trust_flags): def verify_kdc_cert_validity(kdc_cert, ca_certs, realm): - pem_kdc_cert = kdc_cert.public_bytes(serialization.Encoding.PEM) - pem_ca_certs = '\n'.join( - cert.public_bytes(serialization.Encoding.PEM) for cert in ca_certs) - with NamedTemporaryFile() as kdc_file, NamedTemporaryFile() as ca_file: - kdc_file.write(pem_kdc_cert) + kdc_file.write(kdc_cert.public_bytes(x509.Encoding.PEM)) kdc_file.flush() - ca_file.write(pem_ca_certs) + x509.write_certificate_list(ca_certs, ca_file.name) ca_file.flush() try: @@ -209,7 +203,7 @@ def verify_kdc_cert_validity(kdc_cert, ca_certs, realm): raise ValueError("invalid for a KDC") principal = str(Principal(['krbtgt', realm], realm)) - gns = x509.process_othernames(x509.get_san_general_names(kdc_cert)) + gns = x509.process_othernames(kdc_cert.san_general_names) for gn in gns: if isinstance(gn, x509.KRB5PrincipalName) and gn.name == principal: break @@ -458,7 +452,7 @@ class NSSDatabase(object): if label in ('CERTIFICATE', 'X509 CERTIFICATE', 'X.509 CERTIFICATE'): try: - x509.load_pem_x509_certificate(match.group(2)) + cert = x509.load_pem_x509_certificate(body) except ValueError as e: if label != 'CERTIFICATE': logger.warning( @@ -467,13 +461,13 @@ class NSSDatabase(object): filename, line, e) continue else: - extracted_certs.append(body) + extracted_certs.append(cert) loaded = True continue if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): try: - certs = x509.pkcs7_to_pems(body) + certs = x509.pkcs7_to_certs(body) except ipautil.CalledProcessError as e: if label == 'CERTIFICATE': logger.warning( @@ -521,7 +515,7 @@ class NSSDatabase(object): filename, line, e) continue else: - extracted_key = result.output + extracted_key = result.raw_output key_file = filename loaded = True continue @@ -531,12 +525,11 @@ class NSSDatabase(object): # Try to load the file as DER certificate try: - x509.load_der_x509_certificate(data) + cert = x509.load_der_x509_certificate(data) except ValueError: pass else: - data = x509.make_pem(base64.b64encode(data)) - extracted_certs.append(data) + extracted_certs.append(cert) continue # Try to import the file as PKCS#12 file @@ -576,34 +569,34 @@ class NSSDatabase(object): raise RuntimeError( "No server certificates found in %s" % (', '.join(files))) - for cert_pem in extracted_certs: - cert = x509.load_pem_x509_certificate(cert_pem) + for cert in extracted_certs: nickname = str(DN(cert.subject)) - data = cert.public_bytes(serialization.Encoding.DER) - self.add_cert(data, nickname, EMPTY_TRUST_FLAGS) + self.add_cert(cert, nickname, EMPTY_TRUST_FLAGS) if extracted_key: - in_file = ipautil.write_tmp_file( - '\n'.join(extracted_certs) + '\n' + extracted_key) - out_file = tempfile.NamedTemporaryFile() - out_password = ipautil.ipa_generate_password() - out_pwdfile = ipautil.write_tmp_file(out_password) - args = [ - OPENSSL, 'pkcs12', - '-export', - '-in', in_file.name, - '-out', out_file.name, - '-passin', 'file:' + self.pwd_file, - '-passout', 'file:' + out_pwdfile.name, - ] - try: - ipautil.run(args) - except ipautil.CalledProcessError as e: - raise RuntimeError( - "No matching certificate found for private key from %s" % - key_file) + with tempfile.NamedTemporaryFile(), tempfile.NamedTemporaryFile() \ + as (in_file, out_file): + x509.write_certificate_list(extracted_certs, in_file.name) + in_file.write(extracted_key) + in_file.flush() + out_password = ipautil.ipa_generate_password() + out_pwdfile = ipautil.write_tmp_file(out_password) + args = [ + OPENSSL, 'pkcs12', + '-export', + '-in', in_file.name, + '-out', out_file.name, + '-passin', 'file:' + self.pwd_file, + '-passout', 'file:' + out_pwdfile.name, + ] + try: + ipautil.run(args) + except ipautil.CalledProcessError as e: + raise RuntimeError( + "No matching certificate found for private key from " + "%s" % key_file) - self.import_pkcs12(out_file.name, out_password) + self.import_pkcs12(out_file.name, out_password) def trust_root_cert(self, root_nickname, trust_flags): if root_nickname[:7] == "Builtin": @@ -619,17 +612,13 @@ class NSSDatabase(object): raise RuntimeError( "Setting trust on %s failed" % root_nickname) - def get_cert(self, nickname, pem=False): + def get_cert(self, nickname): args = ['-L', '-n', nickname, '-a'] try: result = self.run_certutil(args, capture_output=True) except ipautil.CalledProcessError: raise RuntimeError("Failed to get %s" % nickname) - cert = result.output - if not pem: - cert, _start = find_cert_from_txt(cert, start=0) - cert = x509.strip_header(cert) - cert = base64.b64decode(cert) + cert, _start = find_cert_from_txt(result.output, start=0) return cert def has_nickname(self, nickname): @@ -643,9 +632,9 @@ class NSSDatabase(object): def export_pem_cert(self, nickname, location): """Export the given cert to PEM file in the given location""" - cert = self.get_cert(nickname, pem=True) - with open(location, "w+") as fd: - fd.write(cert) + cert = self.get_cert(nickname) + with open(location, "wb") as fd: + fd.write(cert.public_bytes(x509.Encoding.PEM)) os.chmod(location, 0o444) def import_pem_cert(self, nickname, flags, location): @@ -662,7 +651,7 @@ class NSSDatabase(object): ) cert, st = find_cert_from_txt(certs) - self.add_cert(cert, nickname, flags, pem=True) + self.add_cert(cert, nickname, flags) try: find_cert_from_txt(certs, st) @@ -672,12 +661,10 @@ class NSSDatabase(object): raise ValueError('%s contains more than one certificate' % location) - def add_cert(self, cert, nick, flags, pem=False): + def add_cert(self, cert, nick, flags): flags = unparse_trust_flags(flags) - args = ["-A", "-n", nick, "-t", flags] - if pem: - args.append("-a") - self.run_certutil(args, stdin=cert) + args = ["-A", "-n", nick, "-t", flags, '-a'] + self.run_certutil(args, stdin=cert.public_bytes(x509.Encoding.PEM)) def delete_cert(self, nick): self.run_certutil(["-D", "-n", nick]) @@ -688,7 +675,6 @@ class NSSDatabase(object): Raises a ValueError if the certificate is invalid. """ cert = self.get_cert(nickname) - cert = x509.load_der_x509_certificate(cert) try: self.run_certutil(['-V', '-n', nickname, '-u', 'V'], @@ -699,13 +685,12 @@ class NSSDatabase(object): raise ValueError(e.output) try: - x509.match_hostname(cert, hostname) + cert.match_hostname(hostname) except ValueError: raise ValueError('invalid for server %s' % hostname) def verify_ca_cert_validity(self, nickname): cert = self.get_cert(nickname) - cert = x509.load_der_x509_certificate(cert) if not cert.subject: raise ValueError("has empty subject") @@ -736,6 +721,5 @@ class NSSDatabase(object): def verify_kdc_cert_validity(self, nickname, realm): nicknames = self.get_trust_chain(nickname) certs = [self.get_cert(nickname) for nickname in nicknames] - certs = [x509.load_der_x509_certificate(cert) for cert in certs] verify_kdc_cert_validity(certs[-1], certs[:-1], realm) diff --git a/ipaserver/install/ca.py b/ipaserver/install/ca.py index 5b229c0..0616e75 100644 --- a/ipaserver/install/ca.py +++ b/ipaserver/install/ca.py @@ -320,7 +320,7 @@ def install_step_1(standalone, replica_config, options): subject_base=subject_base) dsdb = certs.CertDB( realm_name, nssdir=dirname, subject_base=subject_base) - cacert = cadb.get_cert_from_db('caSigningCert cert-pki-ca', pem=False) + cacert = cadb.get_cert_from_db('caSigningCert cert-pki-ca') nickname = certdb.get_ca_nickname(realm_name) trust_flags = certdb.IPA_CA_TRUST_FLAGS dsdb.add_cert(cacert, nickname, trust_flags) diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index 2396886..b11f560 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -791,24 +791,19 @@ class CAInstance(DogtagInstance): data = base64.b64decode(chain) # Get list of PEM certificates - certlist = x509.pkcs7_to_pems(data, x509.DER) + certlist = x509.pkcs7_to_certs(data, x509.DER) # We need to append the certs to the existing file, so start by # reading the file if ipautil.file_exists(paths.IPA_CA_CRT): ca_certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT) - ca_certs = [cert.public_bytes(serialization.Encoding.PEM) - for cert in ca_certs] certlist.extend(ca_certs) # We have all the certificates in certlist, write them to a PEM file for path in [paths.IPA_CA_CRT, paths.KDC_CA_BUNDLE_PEM, paths.CA_BUNDLE_PEM]: - with open(path, 'w') as ipaca_pem: - for cert in certlist: - ipaca_pem.write(cert) - ipaca_pem.write('\n') + x509.write_certificate_list(certlist, path) def __request_ra_certificate(self): # create a temp file storing the pwd diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index b2c7a77..210e7d2 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -58,9 +58,7 @@ def get_cert_nickname(cert): representation of the first RDN in the subject and subject_dn is a DN object. """ - cert_obj = x509.load_pem_x509_certificate(cert) - dn = DN(cert_obj.subject) - + dn = DN(cert.subject) return (str(dn[0]), dn) @@ -323,30 +321,20 @@ class CertDB(object): nick = get_ca_nickname(self.realm) else: nick = str(subject_dn) - self.nssdb.add_cert(cert, nick, trust_flags, pem=True) + self.nssdb.add_cert(cert, nick, trust_flags) except RuntimeError: break - def get_cert_from_db(self, nickname, pem=True): + def get_cert_from_db(self, nickname): """ Retrieve a certificate from the current NSS database for nickname. - - pem controls whether the value returned PEM or DER-encoded. The - default is the data straight from certutil -a. """ try: args = ["-L", "-n", nickname, "-a"] result = self.run_certutil(args, capture_output=True) - cert = result.output - if pem: - return cert - else: - cert, _start = find_cert_from_txt(cert, start=0) - cert = x509.strip_header(cert) - dercert = base64.b64decode(cert) - return dercert + return x509.load_pem_x509_certificate(result.raw_output) except ipautil.CalledProcessError: - return '' + return None def track_server_cert(self, nickname, principal, password_file=None, command=None): """ @@ -362,8 +350,7 @@ class CertDB(object): return cert = self.get_cert_from_db(nickname) - cert_obj = x509.load_pem_x509_certificate(cert) - subject = str(DN(cert_obj.subject)) + subject = str(DN(cert.subject)) certmonger.add_principal(request_id, principal) certmonger.add_subject(request_id, subject) @@ -392,8 +379,10 @@ class CertDB(object): try: self.issue_server_cert(self.certreq_fname, self.certder_fname) self.import_cert(self.certder_fname, nickname) + with open(self.certder_fname, "r") as f: dercert = f.read() + return x509.load_der_x509_certificate(dercert) finally: for fname in (self.certreq_fname, self.certder_fname): try: @@ -401,8 +390,6 @@ class CertDB(object): except OSError: pass - return dercert - def request_cert( self, subject, certtype="rsa", keysize="2048", san_dnsnames=None): @@ -519,8 +506,8 @@ class CertDB(object): with open(cert_fname, "w") as f: f.write(cert) - def add_cert(self, cert, nick, flags, pem=False): - self.nssdb.add_cert(cert, nick, flags, pem) + def add_cert(self, cert, nick, flags): + self.nssdb.add_cert(cert, nick, flags) def import_cert(self, cert_fname, nickname): """ @@ -594,8 +581,6 @@ class CertDB(object): newca, _st = find_cert_from_txt(newca) cacert = self.get_cert_from_db(self.cacert_name) - if cacert != '': - cacert, _st = find_cert_from_txt(cacert) if newca == cacert: return @@ -649,7 +634,7 @@ class CertDB(object): cert, st = find_cert_from_txt(certs, st) except RuntimeError: break - self.add_cert(cert, 'CA %s' % num, EMPTY_TRUST_FLAGS, pem=True) + self.add_cert(cert, 'CA %s' % num, EMPTY_TRUST_FLAGS) num += 1 # We only handle one server cert diff --git a/ipaserver/install/dogtaginstance.py b/ipaserver/install/dogtaginstance.py index a582275..e638319 100644 --- a/ipaserver/install/dogtaginstance.py +++ b/ipaserver/install/dogtaginstance.py @@ -29,7 +29,7 @@ import dbus from pki.client import PKIConnection import pki.system -from ipalib import api, errors +from ipalib import api, errors, x509 from ipalib.install import certmonger from ipaplatform import services from ipaplatform.constants import constants @@ -342,7 +342,7 @@ class DogtagInstance(service.Service): needs to get the new certificate as well. ``directive`` is the directive to update in CS.cfg - cert is a DER-encoded certificate. + cert is IPACertificate. cs_cfg is the path to the CS.cfg file """ @@ -350,7 +350,8 @@ class DogtagInstance(service.Service): installutils.set_directive( cs_cfg, directive, - base64.b64encode(cert), + # the cert must be only the base64 string without headers + base64.b64encode(cert.public_bytes(x509.Encoding.DER)), quotes=False, separator='=') diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py index c0ad242..d823635 100644 --- a/ipaserver/install/dsinstance.py +++ b/ipaserver/install/dsinstance.py @@ -234,7 +234,7 @@ class DsInstance(service.Service): self.pkcs12_info = None self.cacert_name = None self.ca_is_configured = True - self.dercert = None + self.cert = None self.idstart = None self.idmax = None self.ca_subject = None @@ -791,7 +791,7 @@ class DsInstance(service.Service): # We only handle one server cert self.nickname = server_certs[0][0] - self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False) + self.cert = dsdb.get_cert_from_db(self.nickname) if self.ca_is_configured: dsdb.track_server_cert( @@ -834,7 +834,7 @@ class DsInstance(service.Service): api.Backend.ldap2.disconnect() api.Backend.ldap2.connect() - self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False) + self.cert = dsdb.get_cert_from_db(self.nickname) if prev_helper is not None: self.add_cert_to_service() @@ -888,12 +888,12 @@ class DsInstance(service.Service): nicknames = dsdb.find_root_cert(self.cacert_name)[:-1] for nickname in nicknames: - cert = dsdb.get_cert_from_db(nickname, pem=False) + cert = dsdb.get_cert_from_db(nickname) certstore.put_ca_cert_nss(conn, self.suffix, cert, nickname, trust_flags[nickname]) nickname = self.cacert_name - cert = dsdb.get_cert_from_db(nickname, pem=False) + cert = dsdb.get_cert_from_db(nickname) cacert_flags = trust_flags[nickname] if self.setup_pkinit: cacert_flags = TrustFlags( diff --git a/ipaserver/install/httpinstance.py b/ipaserver/install/httpinstance.py index 6c56316..0b67d60 100644 --- a/ipaserver/install/httpinstance.py +++ b/ipaserver/install/httpinstance.py @@ -138,7 +138,7 @@ class HTTPInstance(service.Service): self.dm_password = dm_password self.suffix = ipautil.realm_to_suffix(self.realm) self.pkcs12_info = pkcs12_info - self.dercert = None + self.cert = None self.subject_base = subject_base self.sub_dict = dict( REALM=realm, @@ -406,7 +406,7 @@ class HTTPInstance(service.Service): nickname = server_certs[0][0] if nickname == 'ipaCert': nickname = server_certs[1][0] - self.dercert = db.get_cert_from_db(nickname, pem=False) + self.cert = db.get_cert_from_db(nickname) if self.ca_is_configured: db.track_server_cert(nickname, self.principal, db.passwd_fname, 'restart_httpd') @@ -443,7 +443,7 @@ class HTTPInstance(service.Service): if prev_helper is not None: certmonger.modify_ca_helper('IPA', prev_helper) - self.dercert = db.get_cert_from_db(self.cert_nickname, pem=False) + self.cert = db.get_cert_from_db(self.cert_nickname) if prev_helper is not None: self.add_cert_to_service() diff --git a/ipaserver/install/installutils.py b/ipaserver/install/installutils.py index 4d06f50..ff37d84 100644 --- a/ipaserver/install/installutils.py +++ b/ipaserver/install/installutils.py @@ -1054,9 +1054,8 @@ def load_pkcs12(cert_files, key_password, key_nickname, ca_cert_files, if ca_cert is None: ca_cert = cert - cert_obj = x509.load_der_x509_certificate(cert) - subject = DN(cert_obj.subject) - issuer = DN(cert_obj.issuer) + subject = DN(cert.subject) + issuer = DN(cert.issuer) if subject == issuer: break @@ -1183,11 +1182,9 @@ def load_external_cert(files, ca_subject): ca_nickname = None cache = {} for nickname, _trust_flags in nssdb.list_certs(): - cert = nssdb.get_cert(nickname, pem=True) - - cert_obj = x509.load_pem_x509_certificate(cert) - subject = DN(cert_obj.subject) - issuer = DN(cert_obj.issuer) + cert = nssdb.get_cert(nickname) + subject = DN(cert.subject) + issuer = DN(cert.issuer) cache[nickname] = (cert, subject, issuer) if subject == ca_subject: @@ -1220,11 +1217,11 @@ def load_external_cert(files, ca_subject): (subject, ", ".join(files), e)) cert_file = tempfile.NamedTemporaryFile() - cert_file.write(ca_cert_chain[0] + '\n') + cert_file.write(ca_cert_chain[0].public_bytes(x509.Encoding.PEM) + b'\n') cert_file.flush() ca_file = tempfile.NamedTemporaryFile() - ca_file.write('\n'.join(ca_cert_chain[1:]) + '\n') + x509.write_certificate_list(ca_cert_chain[1:], ca_file.name) ca_file.flush() return cert_file, ca_file diff --git a/ipaserver/install/ipa_cacert_manage.py b/ipaserver/install/ipa_cacert_manage.py index 626968d..24ef86f 100644 --- a/ipaserver/install/ipa_cacert_manage.py +++ b/ipaserver/install/ipa_cacert_manage.py @@ -22,7 +22,6 @@ from __future__ import print_function import logging import os from optparse import OptionGroup # pylint: disable=deprecated-module -from cryptography.hazmat.primitives import serialization import gssapi from ipalib.install import certmonger, certstore @@ -161,7 +160,7 @@ class CACertManage(admintool.AdminTool): "Found certmonger request id %r", self.request_id) db = certs.CertDB(api.env.realm, nssdir=paths.PKI_TOMCAT_ALIAS_DIR) - cert = db.get_cert_from_db(self.cert_nickname, pem=False) + cert = db.get_cert_from_db(self.cert_nickname) options = self.options if options.external_cert_files: @@ -170,7 +169,7 @@ class CACertManage(admintool.AdminTool): if options.self_signed is not None: self_signed = options.self_signed else: - self_signed = x509.is_self_signed(cert, x509.DER) + self_signed = cert.is_self_signed() if self_signed: return self.renew_self_signed(ca) @@ -205,38 +204,28 @@ class CACertManage(admintool.AdminTool): "--external-cert-file=/path/to/signed_certificate " "--external-cert-file=/path/to/external_ca_certificate") - def renew_external_step_2(self, ca, old_cert_der): + def renew_external_step_2(self, ca, old_cert): print("Importing the renewed CA certificate, please wait") options = self.options conn = api.Backend.ldap2 - old_cert_obj = x509.load_certificate(old_cert_der, x509.DER) - old_der_subject = x509.get_der_subject(old_cert_der, x509.DER) - old_spki = old_cert_obj.public_key().public_bytes( - serialization.Encoding.DER, - serialization.PublicFormat.SubjectPublicKeyInfo - ) + old_spki = old_cert.public_key_info_bytes cert_file, ca_file = installutils.load_external_cert( - options.external_cert_files, DN(old_cert_obj.subject)) + options.external_cert_files, DN(old_cert.subject)) with open(cert_file.name) as f: new_cert_data = f.read() - new_cert_der = x509.normalize_certificate(new_cert_data) - new_cert_obj = x509.load_certificate(new_cert_der, x509.DER) - new_der_subject = x509.get_der_subject(new_cert_der, x509.DER) - new_spki = new_cert_obj.public_key().public_bytes( - serialization.Encoding.DER, - serialization.PublicFormat.SubjectPublicKeyInfo - ) - - if new_cert_obj.subject != old_cert_obj.subject: + new_cert = x509.load_pem_x509_certificate(new_cert_data) + new_spki = new_cert.public_key_info_bytes + + if new_cert.subject != old_cert.subject: raise admintool.ScriptError( "Subject name mismatch (visit " "http://www.freeipa.org/page/Troubleshooting for " "troubleshooting guide)") - if new_der_subject != old_der_subject: + if new_cert.subject_bytes != old_cert.subject_bytes: raise admintool.ScriptError( "Subject name encoding mismatch (visit " "http://www.freeipa.org/page/Troubleshooting for " @@ -249,19 +238,18 @@ class CACertManage(admintool.AdminTool): with certs.NSSDatabase() as tmpdb: tmpdb.create_db() - tmpdb.add_cert(old_cert_der, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS) + tmpdb.add_cert(old_cert, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS) try: - tmpdb.add_cert(new_cert_der, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS) + tmpdb.add_cert(new_cert, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS) except ipautil.CalledProcessError as e: raise admintool.ScriptError( "Not compatible with the current CA certificate: %s" % e) ca_certs = x509.load_certificate_list_from_file(ca_file.name) for ca_cert in ca_certs: - data = ca_cert.public_bytes(serialization.Encoding.DER) tmpdb.add_cert( - data, str(DN(ca_cert.subject)), EXTERNAL_CA_TRUST_FLAGS) + ca_cert, str(DN(ca_cert.subject)), EXTERNAL_CA_TRUST_FLAGS) try: tmpdb.verify_ca_cert_validity('IPA CA') @@ -286,6 +274,8 @@ class CACertManage(admintool.AdminTool): dn = DN(('cn', self.cert_nickname), ('cn', 'ca_renewal'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn) + + new_cert_der = new_cert.public_bytes(x509.Encoding.DER) try: entry = conn.get_entry(dn, ['usercertificate']) entry['usercertificate'] = [new_cert_der] @@ -338,15 +328,14 @@ class CACertManage(admintool.AdminTool): cert_filename = self.args[1] try: - cert_obj = x509.load_certificate_from_file(cert_filename) + cert = x509.load_certificate_from_file(cert_filename) except IOError as e: raise admintool.ScriptError( "Can't open \"%s\": %s" % (cert_filename, e)) except (TypeError, ValueError) as e: raise admintool.ScriptError("Not a valid certificate: %s" % e) - cert = cert_obj.public_bytes(serialization.Encoding.DER) - nickname = options.nickname or str(DN(cert_obj.subject)) + nickname = options.nickname or str(DN(cert.subject)) ca_certs = certstore.get_ca_certs_nss(api.Backend.ldap2, api.env.basedn, diff --git a/ipaserver/install/ipa_server_certinstall.py b/ipaserver/install/ipa_server_certinstall.py index 9c8f6e8..4ea1217 100644 --- a/ipaserver/install/ipa_server_certinstall.py +++ b/ipaserver/install/ipa_server_certinstall.py @@ -212,8 +212,7 @@ class ServerCertInstall(admintool.AdminTool): # Start tracking only if the cert was issued by IPA CA # Retrieve IPA CA ipa_ca_cert = cdb.get_cert_from_db( - get_ca_nickname(api.env.realm), - pem=False) + get_ca_nickname(api.env.realm)) # And compare with the CA which signed this certificate if ca_cert == ipa_ca_cert: certmonger.start_tracking( @@ -289,8 +288,7 @@ class ServerCertInstall(admintool.AdminTool): # Start tracking only if the cert was issued by IPA CA # Retrieve IPA CA ipa_ca_cert = cdb.get_cert_from_db( - get_ca_nickname(api.env.realm), - pem=False) + get_ca_nickname(api.env.realm)) # And compare with the CA which signed this certificate if ca_cert == ipa_ca_cert: cdb.track_server_cert(server_cert, diff --git a/ipaserver/install/krainstance.py b/ipaserver/install/krainstance.py index 29aaad5..5a780b2 100644 --- a/ipaserver/install/krainstance.py +++ b/ipaserver/install/krainstance.py @@ -27,7 +27,6 @@ import six # pylint: disable=import-error from six.moves.configparser import RawConfigParser # pylint: enable=import-error -from cryptography.hazmat.primitives import serialization from ipalib import api from ipalib import x509 @@ -304,7 +303,7 @@ class KRAInstance(DogtagInstance): # get RA agent certificate cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM) - cert_data = cert.public_bytes(serialization.Encoding.DER) + cert_data = cert.public_bytes(x509.Encoding.DER) # connect to KRA database conn = ldap2.ldap2(api) diff --git a/ipaserver/install/plugins/upload_cacrt.py b/ipaserver/install/plugins/upload_cacrt.py index 7b7e65e..25faa6e 100644 --- a/ipaserver/install/plugins/upload_cacrt.py +++ b/ipaserver/install/plugins/upload_cacrt.py @@ -22,7 +22,7 @@ import logging from ipalib.install import certstore from ipaplatform.paths import paths from ipaserver.install import certs -from ipalib import Registry, errors +from ipalib import Registry, errors, x509 from ipalib import Updater from ipapython import certdb from ipapython.dn import DN @@ -60,7 +60,7 @@ class update_upload_cacrt(Updater): continue if nickname == ca_nickname and ca_enabled: trust_flags = certdb.IPA_CA_TRUST_FLAGS - cert = db.get_cert_from_db(nickname, pem=False) + cert = db.get_cert_from_db(nickname) trust, _ca, eku = certstore.trust_flags_to_key_policy(trust_flags) dn = DN(('cn', nickname), ('cn', 'certificates'), ('cn', 'ipa'), @@ -90,6 +90,7 @@ class update_upload_cacrt(Updater): pass if ca_cert: + dercert = ca_cert.public_bytes(x509.Encoding.DER) dn = DN(('cn', 'CACert'), ('cn', 'ipa'), ('cn','etc'), self.api.env.basedn) try: @@ -98,11 +99,11 @@ class update_upload_cacrt(Updater): entry = ldap.make_entry(dn) entry['objectclass'] = ['nsContainer', 'pkiCA'] entry.single_value['cn'] = 'CAcert' - entry.single_value['cACertificate;binary'] = ca_cert + entry.single_value['cACertificate;binary'] = dercert ldap.add_entry(entry) else: if b'' in entry['cACertificate;binary']: - entry.single_value['cACertificate;binary'] = ca_cert + entry.single_value['cACertificate;binary'] = dercert ldap.update_entry(entry) return False, [] diff --git a/ipaserver/install/server/upgrade.py b/ipaserver/install/server/upgrade.py index 4eb2c9e..5e1d74b 100644 --- a/ipaserver/install/server/upgrade.py +++ b/ipaserver/install/server/upgrade.py @@ -1602,7 +1602,7 @@ def disable_httpd_system_trust(http): db = certs.CertDB(api.env.realm, nssdir=paths.HTTPD_ALIAS_DIR) for nickname, trust_flags in db.list_certs(): if not trust_flags.has_key: - cert = db.get_cert_from_db(nickname, pem=False) + cert = db.get_cert_from_db(nickname) if cert: ca_certs.append((cert, nickname, trust_flags)) diff --git a/ipaserver/install/service.py b/ipaserver/install/service.py index 49cf022..d2c3bbd 100644 --- a/ipaserver/install/service.py +++ b/ipaserver/install/service.py @@ -32,7 +32,7 @@ from ipalib.install import certstore, sysrestore from ipapython import ipautil from ipapython.dn import DN from ipapython import kerberos -from ipalib import api, errors +from ipalib import api, errors, x509 from ipaplatform import services from ipaplatform.paths import paths @@ -245,7 +245,7 @@ class Service(object): self.suffix = DN() self.service_prefix = service_prefix self.keytab = keytab - self.dercert = None + self.cert = None self.api = api self.service_user = service_user self.keytab_user = service_user @@ -370,7 +370,8 @@ class Service(object): dn = DN(('krbprincipalname', self.principal), ('cn', 'services'), ('cn', 'accounts'), self.suffix) entry = api.Backend.ldap2.get_entry(dn) - entry.setdefault('userCertificate', []).append(self.dercert) + entry.setdefault('userCertificate', []).append( + self.cert.public_bytes(x509.Encoding.DER)) try: api.Backend.ldap2.update_entry(entry) except Exception as e: diff --git a/ipaserver/plugins/ca.py b/ipaserver/plugins/ca.py index 8db6ec5..6557ce2 100644 --- a/ipaserver/plugins/ca.py +++ b/ipaserver/plugins/ca.py @@ -181,8 +181,8 @@ def set_certificate_attrs(entry, options, want_cert=True): if want_chain or full: pkcs7_der = ca_api.read_ca_chain(ca_id) - pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER) - ders = [x509.normalize_certificate(pem) for pem in pems] + certs = x509.pkcs7_to_certs(pkcs7_der, x509.DER) + ders = [cert.public_bytes(x509.Encoding.DER) for cert in certs] entry['certificate_chain'] = ders diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index b2c5af8..1b2991b 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -490,7 +490,8 @@ class BaseCertObject(Object): """ if 'certificate' in obj: - cert = x509.load_pem_x509_certificate(obj['certificate']) + cert = x509.load_der_x509_certificate( + base64.b64decode(obj['certificate'])) obj['subject'] = DN(cert.subject) obj['issuer'] = DN(cert.issuer) obj['serial_number'] = cert.serial_number @@ -505,7 +506,7 @@ class BaseCertObject(Object): cert.fingerprint(hashes.SHA256())) general_names = x509.process_othernames( - x509.get_san_general_names(cert)) + cert.san_general_names) for gn in general_names: try: @@ -911,7 +912,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): profile = api.Command['certprofile_show'](profile_id) store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE' if store and 'certificate' in result: - cert = str(result.get('certificate')) + cert = result.get('certificate') kwargs = dict(addattr=u'usercertificate={}'.format(cert)) # note: we call different commands for the different # principal types because handling of 'userCertificate' @@ -927,7 +928,8 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): "used for krbtgt certificates") if 'certificate_chain' in ca_obj: - cert = x509.load_pem_x509_certificate(result['certificate']) + cert = x509.load_der_x509_certificate( + base64.b64decode(result['certificate'])) cert = cert.public_bytes(serialization.Encoding.DER) result['certificate_chain'] = [cert] + ca_obj['certificate_chain'] @@ -1191,7 +1193,8 @@ class cert_show(Retrieve, CertMethod, VirtualCommand): # we don't tell Dogtag the issuer (but we check the cert after). # result = self.Backend.ra.get_certificate(str(serial_number)) - cert = x509.load_pem_x509_certificate(result['certificate']) + cert = x509.load_der_x509_certificate( + base64.b64decode(result['certificate'])) try: self.check_access() diff --git a/ipaserver/plugins/certmap.py b/ipaserver/plugins/certmap.py index ee8938c..f26bf67 100644 --- a/ipaserver/plugins/certmap.py +++ b/ipaserver/plugins/certmap.py @@ -17,7 +17,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import base64 import logging import dbus @@ -439,12 +438,14 @@ class _sssd(object): :raise RemoteRetrieveError: if DBus error occurs """ try: - pem = x509.make_pem(base64.b64encode(cert)) + cert_obj = x509.load_der_x509_certificate(cert) # bug 3306 in sssd returns 0 entry when max_entries = 0 # Temp workaround is to use a non-null value, not too high # to avoid reserving unneeded memory max_entries = dbus.UInt32(100) - user_paths = self._users_iface.ListByCertificate(pem, max_entries) + user_paths = self._users_iface.ListByCertificate( + cert_obj.public_bytes(x509.Encoding.PEM), + max_entries) users = dict() for user_path in user_paths: user_obj = self._bus.get_object(DBUS_SSSD_NAME, user_path) diff --git a/ipatests/test_ipalib/test_x509.py b/ipatests/test_ipalib/test_x509.py index 3325180..904152a 100644 --- a/ipatests/test_ipalib/test_x509.py +++ b/ipatests/test_ipalib/test_x509.py @@ -81,11 +81,11 @@ class test_x509(object): # Load a good cert with headers and leading text newcert = ( - 'leading text\n' + goodcert_headers) + b'leading text\n' + goodcert_headers) x509.load_pem_x509_certificate(newcert) # Load a good cert with bad headers - newcert = '-----BEGIN CERTIFICATE-----' + goodcert_headers + newcert = b'-----BEGIN CERTIFICATE-----' + goodcert_headers with pytest.raises((TypeError, ValueError)): x509.load_pem_x509_certificate(newcert) diff --git a/ipatests/test_ipaserver/test_ldap.py b/ipatests/test_ipaserver/test_ldap.py index a9e4256..88638de 100644 --- a/ipatests/test_ipaserver/test_ldap.py +++ b/ipatests/test_ipaserver/test_ldap.py @@ -78,9 +78,8 @@ class test_ldap(object): self.conn.connect(autobind=AUTOBIND_DISABLED) entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = x509.load_certificate(cert, x509.DER).serial_number - assert serial is not None + cert = x509.load_der_x509_certificate(cert[0]) + assert cert.serial_number is not None def test_simple(self): """ @@ -96,9 +95,8 @@ class test_ldap(object): self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password) entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = x509.load_certificate(cert, x509.DER).serial_number - assert serial is not None + cert = x509.load_der_x509_certificate(cert[0]) + assert cert.serial_number is not None def test_Backend(self): """ @@ -123,9 +121,8 @@ class test_ldap(object): result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,)) entry_attrs = result['result'] cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = x509.load_certificate(cert, x509.DER).serial_number - assert serial is not None + cert = x509.load_der_x509_certificate(cert[0]) + assert cert.serial_number is not None def test_autobind(self): """ @@ -138,9 +135,8 @@ class test_ldap(object): raise nose.SkipTest("Only executed as root") entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = x509.load_certificate(cert, x509.DER).serial_number - assert serial is not None + cert = x509.load_der_x509_certificate(cert[0]) + assert cert.serial_number is not None @pytest.mark.tier0 diff --git a/ipatests/test_xmlrpc/testcert.py b/ipatests/test_xmlrpc/testcert.py index c27ea95..1519191 100644 --- a/ipatests/test_xmlrpc/testcert.py +++ b/ipatests/test_xmlrpc/testcert.py @@ -29,6 +29,7 @@ import os import tempfile import shutil import six +import base64 from ipalib import api, x509 from ipaserver.plugins import rabase @@ -96,4 +97,6 @@ def makecert(reqdir, subject, principal): csr = unicode(generate_csr(reqdir, pwname, str(subject))) res = api.Command['cert_request'](csr, principal=principal, add=True) - return x509.make_pem(res['result']['certificate']) + cert = x509.load_der_x509_certificate( + base64.b64decode(res['result']['certificate'])) + return cert.public_bytes(x509.Encoding.PEM).decode('utf-8')