From de695e688e390deef5510dca0daef133f50f490f Mon Sep 17 00:00:00 2001 From: Jan Cholasta Date: Jul 30 2014 14:04:21 +0000 Subject: Add certificate store module ipalib.certstore. Part of https://fedorahosted.org/freeipa/ticket/3259 Part of https://fedorahosted.org/freeipa/ticket/3520 Reviewed-By: Rob Crittenden --- diff --git a/ipalib/certstore.py b/ipalib/certstore.py new file mode 100644 index 0000000..8a9b410 --- /dev/null +++ b/ipalib/certstore.py @@ -0,0 +1,397 @@ +# Authors: +# Jan Cholasta +# +# Copyright (C) 2014 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +LDAP shared certificate store. +""" + +from nss.error import NSPRError +from pyasn1.error import PyAsn1Error + +from ipapython.dn import DN +from ipapython.certdb import get_ca_nickname +from ipalib import errors, x509 + + +def _parse_cert(dercert): + try: + subject = x509.get_subject(dercert, x509.DER) + issuer = x509.get_issuer(dercert, x509.DER) + serial_number = x509.get_serial_number(dercert, x509.DER) + public_key_info = x509.get_der_public_key_info(dercert, x509.DER) + except (NSPRError, PyAsn1Error), e: + raise ValueError("failed to decode certificate: %s" % e) + + subject = str(subject).replace('\\;', '\\3b') + issuer = str(issuer).replace('\\;', '\\3b') + issuer_serial = '%s;%s' % (issuer, serial_number) + + return subject, issuer_serial, public_key_info + + +def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): + """ + Initialize certificate store entry for a CA certificate. + """ + subject, issuer_serial, public_key = _parse_cert(dercert) + + if ext_key_usage is not None: + try: + cert_eku = x509.get_ext_key_usage(dercert, x509.DER) + except NSPRError, e: + raise ValueError("failed to decode certificate: %s" % e) + if cert_eku is not None: + cert_eku -= {x509.EKU_SERVER_AUTH, x509.EKU_CLIENT_AUTH, + x509.EKU_EMAIL_PROTECTION, x509.EKU_CODE_SIGNING, + x509.EKU_ANY, x509.EKU_PLACEHOLDER} + ext_key_usage = ext_key_usage | cert_eku + + entry['objectClass'] = ['ipaCertificate', 'pkiCA', 'ipaKeyPolicy'] + entry['cn'] = [nickname] + + entry['ipaCertSubject'] = [subject] + entry['ipaCertIssuerSerial'] = [issuer_serial] + entry['ipaPublicKey'] = [public_key] + entry['cACertificate;binary'] = [dercert] + + if trusted is not None: + entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted'] + if ext_key_usage is not None: + ext_key_usage = list(ext_key_usage) + if not ext_key_usage: + ext_key_usage.append(x509.EKU_PLACEHOLDER) + entry['ipaKeyExtUsage'] = ext_key_usage + + +def update_compat_ca(ldap, base_dn, dercert): + """ + Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX. + """ + dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn) + try: + entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary']) + entry.single_value['cACertificate;binary'] = dercert + ldap.update_entry(entry) + except errors.NotFound: + entry = ldap.make_entry(dn) + entry['objectClass'] = ['nsContainer', 'pkiCA'] + entry.single_value['cn'] = 'CAcert' + entry.single_value['cACertificate;binary'] = dercert + ldap.add_entry(entry) + except errors.EmptyModlist: + pass + + +def clean_old_config(ldap, base_dn, dn, config_ipa, config_compat): + """ + Remove ipaCA and compatCA flags from their previous carriers. + """ + if not config_ipa and not config_compat: + return + + try: + result, truncated = ldap.find_entries( + base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'), + base_dn), + filter='(|(ipaConfigString=ipaCA)(ipaConfigString=compatCA))', + attrs_list=['ipaConfigString']) + except errors.NotFound: + return + + for entry in result: + if entry.dn == dn: + continue + for config in list(entry['ipaConfigString']): + if config.lower() == 'ipaca' and config_ipa: + entry['ipaConfigString'].remove(config) + elif config.lower() == 'compatca' and config_compat: + entry['ipaConfigString'].remove(config) + try: + ldap.update_entry(entry) + except errors.EmptyModlist: + pass + + +def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, + ext_key_usage=None, config_ipa=False, config_compat=False): + """ + Add new entry for a CA certificate to the certificate store. + """ + container_dn = DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'), + base_dn) + dn = DN(('cn', nickname), container_dn) + entry = ldap.make_entry(dn) + + init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage) + + if config_ipa: + entry.setdefault('ipaConfigString', []).append('ipaCA') + if config_compat: + entry.setdefault('ipaConfigString', []).append('compatCA') + + if config_compat: + update_compat_ca(ldap, base_dn, dercert) + + ldap.add_entry(entry) + clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) + + +def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, + config_ipa=False, config_compat=False): + """ + Update existing entry for a CA certificate in the certificate store. + """ + subject, issuer_serial, public_key = _parse_cert(dercert) + + filter = ldap.make_filter({'ipaCertSubject': subject}) + result, truncated = ldap.find_entries( + base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'), + base_dn), + filter=filter, + attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial', + 'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage', + 'ipaConfigString', 'cACertificate;binary']) + entry = result[0] + dn = entry.dn + + for old_cert in entry['cACertificate;binary']: + # Check if we are adding a new cert + if old_cert == dercert: + break + else: + # We are adding a new cert, validate it + if entry.single_value['ipaCertSubject'].lower() != subject.lower(): + raise ValueError("subject name mismatch") + if entry.single_value['ipaPublicKey'] != public_key: + raise ValueError("subject public key info mismatch") + entry['ipaCertIssuerSerial'].append(issuer_serial) + entry['cACertificate;binary'].append(dercert) + + # Update key trust + if trusted is not None: + old_trust = entry.single_value.get('ipaKeyTrust') + new_trust = 'trusted' if trusted else 'distrusted' + if old_trust is not None and old_trust.lower() != new_trust: + raise ValueError("inconsistent trust") + entry.single_value['ipaKeyTrust'] = new_trust + + # Update extended key usage + if trusted is not False: + if ext_key_usage is not None: + old_eku = set(entry.get('ipaKeyExtUsage', [])) + old_eku.discard(x509.EKU_PLACEHOLDER) + new_eku = old_eku | ext_key_usage + if not new_eku: + new_eku.add(x509.EKU_PLACEHOLDER) + entry['ipaKeyExtUsage'] = list(new_eku) + else: + entry.pop('ipaKeyExtUsage', None) + + # Update configuration flags + is_ipa = False + is_compat = False + for config in entry.get('ipaConfigString', []): + if config.lower() == 'ipaca': + is_ipa = True + elif config.lower() == 'compatca': + is_compat = True + if config_ipa and not is_ipa: + entry.setdefault('ipaConfigString', []).append('ipaCA') + if config_compat and not is_compat: + entry.setdefault('ipaConfigString', []).append('compatCA') + + if is_compat or config_compat: + update_compat_ca(ldap, base_dn, dercert) + + ldap.update_entry(entry) + clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) + + +def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, + ext_key_usage=None, config_ipa=False, config_compat=False): + """ + Add or update entry for a CA certificate in the certificate store. + """ + try: + update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage, + config_ipa=config_ipa, config_compat=config_compat) + except errors.NotFound: + add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + config_ipa=config_ipa, config_compat=config_compat) + except errors.EmptyModlist: + pass + + +def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, + filter_subject=None): + """ + Get CA certificates and associated key policy from the certificate store. + """ + if filter_subject is not None: + if not isinstance(filter_subject, list): + filter_subject = [filter_subject] + filter_subject = [str(subj).replace('\\;', '\\3b') + for subj in filter_subject] + + config_dn = DN(('cn', 'ipa'), ('cn', 'etc'), base_dn) + container_dn = DN(('cn', 'certificates'), config_dn) + try: + # Search the certificate store for CA certificate entries + filters = ['(objectClass=ipaCertificate)', '(objectClass=pkiCA)'] + if filter_subject: + filter = ldap.make_filter({'ipaCertSubject': filter_subject}) + filters.append(filter) + result, truncated = ldap.find_entries( + base_dn=container_dn, + filter=ldap.combine_filters(filters, ldap.MATCH_ALL), + attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial', + 'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage', + 'cACertificate;binary']) + + certs = [] + for entry in result: + nickname = entry.single_value['cn'] + trusted = entry.single_value.get('ipaKeyTrust', 'unknown').lower() + if trusted == 'trusted': + trusted = True + elif trusted == 'distrusted': + trusted = False + else: + trusted = None + ext_key_usage = entry.get('ipaKeyExtUsage') + if ext_key_usage is not None: + ext_key_usage = set(str(p) for p in ext_key_usage) + ext_key_usage.discard(x509.EKU_PLACEHOLDER) + + for cert in entry.get('cACertificate;binary', []): + certs.append((cert, nickname, trusted, ext_key_usage)) + + return certs + except errors.NotFound: + try: + ldap.get_entry(container_dn, ['']) + except errors.NotFound: + pass + else: + return [] + + # Fallback to cn=CAcert,cn=ipa,cn=etc,SUFFIX + dn = DN(('cn', 'CAcert'), config_dn) + entry = ldap.get_entry(dn, ['cACertificate;binary']) + + cert = entry.single_value['cACertificate;binary'] + subject, issuer_serial, public_key_info = _parse_cert(cert) + if filter_subject is not None and subject not in filter_subject: + return [] + + nickname = get_ca_nickname(compat_realm) + ext_key_usage = {x509.EKU_SERVER_AUTH} + if compat_ipa_ca: + ext_key_usage |= {x509.EKU_CLIENT_AUTH, + x509.EKU_EMAIL_PROTECTION, + x509.EKU_CODE_SIGNING} + + return [(cert, nickname, True, ext_key_usage)] + + +def trust_flags_to_key_policy(trust_flags): + """ + Convert certutil trust flags to certificate store key policy. + """ + if 'p' in trust_flags: + if 'C' in trust_flags or 'P' in trust_flags or 'T' in trust_flags: + raise ValueError("cannot be both trusted and not trusted") + return False, None, None + elif 'C' in trust_flags or 'T' in trust_flags: + if 'P' in trust_flags: + raise ValueError("cannot be both CA and not CA") + ca = True + elif 'P' in trust_flags: + ca = False + else: + return None, None, set() + + trust_flags = trust_flags.split(',') + ext_key_usage = set() + for i, kp in enumerate((x509.EKU_SERVER_AUTH, + x509.EKU_EMAIL_PROTECTION, + x509.EKU_CODE_SIGNING)): + if 'C' in trust_flags[i] or 'P' in trust_flags[i]: + ext_key_usage.add(kp) + if 'T' in trust_flags[0]: + ext_key_usage.add(x509.EKU_CLIENT_AUTH) + + return True, ca, ext_key_usage + + +def key_policy_to_trust_flags(trusted, ca, ext_key_usage): + """ + Convert certificate store key policy to certutil trust flags. + """ + if trusted is False: + return 'p,p,p' + elif trusted is None or ca is None: + return ',,' + elif ext_key_usage is None: + if ca: + return 'CT,C,C' + else: + return 'P,P,P' + + trust_flags = ['', '', ''] + for i, kp in enumerate((x509.EKU_SERVER_AUTH, + x509.EKU_EMAIL_PROTECTION, + x509.EKU_CODE_SIGNING)): + if kp in ext_key_usage: + trust_flags[i] += ('C' if ca else 'P') + if ca and x509.EKU_CLIENT_AUTH in ext_key_usage: + trust_flags[0] += 'T' + + trust_flags = ','.join(trust_flags) + return trust_flags + + +def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags, + config_ipa=False, config_compat=False): + """ + Add or update entry for a CA certificate in the certificate store. + """ + trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags) + if ca is False: + raise ValueError("must be CA certificate") + + put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + config_ipa, config_compat) + + +def get_ca_certs_nss(ldap, base_dn, compat_realm, compat_ipa_ca, + filter_subject=None): + """ + Get CA certificates and associated trust flags from the certificate store. + """ + nss_certs = [] + + certs = get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, + filter_subject=filter_subject) + for cert, nickname, trusted, ext_key_usage in certs: + trust_flags = key_policy_to_trust_flags(trusted, True, ext_key_usage) + nss_certs.append((cert, nickname, trust_flags)) + + return nss_certs