From 5a44ca638310913ab6b0c239374f4b0ddeeedeb3 Mon Sep 17 00:00:00 2001 From: Stanislav Laznicka Date: Jul 27 2017 08:28:58 +0000 Subject: Create a Certificate parameter Up until now, Bytes parameter was used for certificate parameters throughout the framework. However, the Bytes parameter does nothing special for certificates, like validation, so this had to be done for each of the parameters which were supposed to represent a certificate. This commit introduces a special Certificate parameter which takes care of certificate validation so this does not have to be done separately. It also makes sure that the certificates represented by this parameter are always converted to DER format so that we can work with them in a unified manner throughout the framework. This commit also makes it possible to pass bytes directly during instantiation of the Certificate parameter and they are still represented correctly after their conversion in the _convert_scalar() method. https://pagure.io/freeipa/issue/4985 Reviewed-By: Fraser Tweedale Reviewed-By: Rob Crittenden Reviewed-By: Martin Basti --- diff --git a/API.txt b/API.txt index 08ca919..4b06a96 100644 --- a/API.txt +++ b/API.txt @@ -744,7 +744,7 @@ args: 1,29,4 arg: Str('criteria?') option: Flag('all', autofill=True, cli_name='all', default=False) option: Str('cacn?', cli_name='ca') -option: Bytes('certificate?', autofill=False) +option: Certificate('certificate?', autofill=False) option: Flag('exactly?', autofill=True, default=False) option: Str('host*', cli_name='hosts') option: DateTime('issuedon_from?', autofill=False) @@ -828,7 +828,7 @@ output: Output('summary', type=[, ]) output: PrimaryKey('value') command: certmap_match/1 args: 1,3,4 -arg: Bytes('certificate', cli_name='certificate') +arg: Certificate('certificate', cli_name='certificate') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Str('version?') @@ -2434,7 +2434,7 @@ option: Str('nsosversion?', cli_name='os') option: Flag('random?', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Str('setattr*', cli_name='setattr') -option: Bytes('usercertificate*', cli_name='certificate') +option: Certificate('usercertificate*', cli_name='certificate') option: Str('userclass*', cli_name='class') option: Str('userpassword?', cli_name='password') option: Str('version?') @@ -2447,7 +2447,7 @@ arg: Str('fqdn', cli_name='hostname') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -2580,7 +2580,7 @@ option: Flag('pkey_only?', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Int('sizelimit?', autofill=False) option: Int('timelimit?', autofill=False) -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Str('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -2613,7 +2613,7 @@ option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Flag('rights', autofill=True, default=False) option: Str('setattr*', cli_name='setattr') option: Flag('updatedns?', autofill=True, default=False) -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Str('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -2626,7 +2626,7 @@ arg: Str('fqdn', cli_name='hostname') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -2862,7 +2862,7 @@ option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Str('setattr*', cli_name='setattr') option: Str('uid?', cli_name='login') option: Int('uidnumber?', cli_name='uid') -option: Bytes('usercertificate*', cli_name='certificate') +option: Certificate('usercertificate*', cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -2874,7 +2874,7 @@ arg: Str('ipaanchoruuid', cli_name='anchor') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('fallback_to_ldap?', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -2934,7 +2934,7 @@ option: Flag('rights', autofill=True, default=False) option: Str('setattr*', cli_name='setattr') option: Str('uid?', autofill=False, cli_name='login') option: Int('uidnumber?', autofill=False, cli_name='uid') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -2946,7 +2946,7 @@ arg: Str('ipaanchoruuid', cli_name='anchor') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('fallback_to_ldap?', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4470,7 +4470,7 @@ option: Str('krbprincipalauthind*', cli_name='auth_ind') option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Str('setattr*', cli_name='setattr') -option: Bytes('usercertificate*', cli_name='certificate') +option: Certificate('usercertificate*', cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4481,7 +4481,7 @@ arg: Principal('krbcanonicalname', cli_name='canonical_principal') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4615,7 +4615,7 @@ option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Flag('rights', autofill=True, default=False) option: Str('setattr*', cli_name='setattr') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4626,7 +4626,7 @@ arg: Principal('krbcanonicalname', cli_name='canonical_principal') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4880,7 +4880,7 @@ option: Str('street?', cli_name='street') option: Str('telephonenumber*', cli_name='phone') option: Str('title?') option: Int('uidnumber?', cli_name='uid') -option: Bytes('usercertificate*', cli_name='certificate') +option: Certificate('usercertificate*', cli_name='certificate') option: Str('userclass*', cli_name='class') option: Password('userpassword?', cli_name='password') option: Str('version?') @@ -4893,7 +4893,7 @@ arg: Str('uid', cli_name='login') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -4903,7 +4903,7 @@ args: 2,7,3 arg: Str('uid', cli_name='login') arg: Str('ipacertmapdata*', alwaysask=False, cli_name='certmapdata') option: Flag('all', autofill=True, cli_name='all', default=False) -option: Bytes('certificate*', cli_name='certificate') +option: Certificate('certificate*', cli_name='certificate') option: DNParam('issuer?', cli_name='issuer') option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) @@ -4995,7 +4995,7 @@ option: Int('timelimit?', autofill=False) option: Str('title?', autofill=False) option: Str('uid?', autofill=False, cli_name='login') option: Int('uidnumber?', autofill=False, cli_name='uid') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Password('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -5049,7 +5049,7 @@ option: Str('street?', autofill=False, cli_name='street') option: Str('telephonenumber*', autofill=False, cli_name='phone') option: Str('title?', autofill=False) option: Int('uidnumber?', autofill=False, cli_name='uid') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Password('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -5062,7 +5062,7 @@ arg: Str('uid', cli_name='login') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -5072,7 +5072,7 @@ args: 2,7,3 arg: Str('uid', cli_name='login') arg: Str('ipacertmapdata*', alwaysask=False, cli_name='certmapdata') option: Flag('all', autofill=True, cli_name='all', default=False) -option: Bytes('certificate*', cli_name='certificate') +option: Certificate('certificate*', cli_name='certificate') option: DNParam('issuer?', cli_name='issuer') option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) @@ -5947,7 +5947,7 @@ option: Str('street?', cli_name='street') option: Str('telephonenumber*', cli_name='phone') option: Str('title?') option: Int('uidnumber?', cli_name='uid') -option: Bytes('usercertificate*', cli_name='certificate') +option: Certificate('usercertificate*', cli_name='certificate') option: Str('userclass*', cli_name='class') option: Password('userpassword?', cli_name='password') option: Str('version?') @@ -5960,7 +5960,7 @@ arg: Str('uid', cli_name='login') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -5970,7 +5970,7 @@ args: 2,7,3 arg: Str('uid', cli_name='login') arg: Str('ipacertmapdata*', alwaysask=False, cli_name='certmapdata') option: Flag('all', autofill=True, cli_name='all', default=False) -option: Bytes('certificate*', cli_name='certificate') +option: Certificate('certificate*', cli_name='certificate') option: DNParam('issuer?', cli_name='issuer') option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) @@ -6079,7 +6079,7 @@ option: Int('timelimit?', autofill=False) option: Str('title?', autofill=False) option: Str('uid?', autofill=False, cli_name='login') option: Int('uidnumber?', autofill=False, cli_name='uid') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Password('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -6135,7 +6135,7 @@ option: Str('street?', autofill=False, cli_name='street') option: Str('telephonenumber*', autofill=False, cli_name='phone') option: Str('title?', autofill=False) option: Int('uidnumber?', autofill=False, cli_name='uid') -option: Bytes('usercertificate*', autofill=False, cli_name='certificate') +option: Certificate('usercertificate*', autofill=False, cli_name='certificate') option: Str('userclass*', autofill=False, cli_name='class') option: Password('userpassword?', autofill=False, cli_name='password') option: Str('version?') @@ -6148,7 +6148,7 @@ arg: Str('uid', cli_name='login') option: Flag('all', autofill=True, cli_name='all', default=False) option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) -option: Bytes('usercertificate+', alwaysask=True, cli_name='certificate') +option: Certificate('usercertificate+', alwaysask=True, cli_name='certificate') option: Str('version?') output: Entry('result') output: Output('summary', type=[, ]) @@ -6158,7 +6158,7 @@ args: 2,7,3 arg: Str('uid', cli_name='login') arg: Str('ipacertmapdata*', alwaysask=False, cli_name='certmapdata') option: Flag('all', autofill=True, cli_name='all', default=False) -option: Bytes('certificate*', cli_name='certificate') +option: Certificate('certificate*', cli_name='certificate') option: DNParam('issuer?', cli_name='issuer') option: Flag('no_members', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) diff --git a/VERSION.m4 b/VERSION.m4 index cc308f1..9d50ad9 100644 --- a/VERSION.m4 +++ b/VERSION.m4 @@ -73,8 +73,8 @@ define(IPA_DATA_VERSION, 20100614120000) # # ######################################################## define(IPA_API_VERSION_MAJOR, 2) -define(IPA_API_VERSION_MINOR, 228) -# Last change: Expose ipaNTAdditionalSuffixes in trust-mod +define(IPA_API_VERSION_MINOR, 229) +# Last change: Added the Certificate parameter ######################################################## diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py index 860f925..d5daaf3 100644 --- a/ipaclient/plugins/cert.py +++ b/ipaclient/plugins/cert.py @@ -66,8 +66,8 @@ class CertRetrieveOverride(MethodOverride): certs = result['result']['certificate_chain'] else: certs = [result['result']['certificate']] - certs = (x509.load_der_x509_certificate( - x509.ensure_der_format(cert)) for cert in certs) + certs = (x509.load_der_x509_certificate(base64.b64decode(cert)) + for cert in certs) x509.write_certificate_list(certs, certificate_out) return result diff --git a/ipaclient/remote_plugins/schema.py b/ipaclient/remote_plugins/schema.py index 41cd735..efb4790 100644 --- a/ipaclient/remote_plugins/schema.py +++ b/ipaclient/remote_plugins/schema.py @@ -12,6 +12,8 @@ import tempfile import types import zipfile +from cryptography import x509 as crypto_x509 + import six from ipaclient.frontend import ClientCommand, ClientMethod @@ -44,6 +46,7 @@ _TYPES = { 'list': list, 'tuple': tuple, 'unicode': unicode, + 'Certificate': crypto_x509.Certificate, } _PARAMS = { @@ -57,6 +60,7 @@ _PARAMS = { 'dict': parameters.Dict, 'int': parameters.Int, 'str': parameters.Str, + 'Certificate': parameters.Certificate, } diff --git a/ipalib/install/certstore.py b/ipalib/install/certstore.py index 89302f6..b26b8a7 100644 --- a/ipalib/install/certstore.py +++ b/ipalib/install/certstore.py @@ -310,12 +310,11 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, for cert in entry.get('cACertificate;binary', []): try: - cert_obj = x509.load_der_x509_certificate(cert) - _parse_cert(cert_obj) + _parse_cert(cert) except ValueError: certs = [] break - certs.append((cert_obj, nickname, trusted, ext_key_usage)) + certs.append((cert, nickname, trusted, ext_key_usage)) except errors.NotFound: try: ldap.get_entry(container_dn, ['']) @@ -324,8 +323,7 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, dn = DN(('cn', 'CAcert'), config_dn) entry = ldap.get_entry(dn, ['cACertificate;binary']) - cert = x509.load_der_x509_certificate( - entry.single_value['cACertificate;binary']) + cert = entry.single_value['cACertificate;binary'] try: subject, _issuer_serial, _public_key_info = _parse_cert(cert) except ValueError: diff --git a/ipalib/parameters.py b/ipalib/parameters.py index 7f19642..462e6e3 100644 --- a/ipalib/parameters.py +++ b/ipalib/parameters.py @@ -108,15 +108,19 @@ import six # pylint: disable=import-error from six.moves.xmlrpc_client import MAXINT, MININT # pylint: enable=import-error +from cryptography import x509 as crypto_x509 from ipalib.text import _ as ugettext from ipalib.base import check_name from ipalib.plugable import ReadOnly, lock from ipalib.errors import ConversionError, RequirementError, ValidationError -from ipalib.errors import PasswordMismatch, Base64DecodeError +from ipalib.errors import ( + PasswordMismatch, Base64DecodeError, CertificateFormatError +) from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, LDAP_GENERALIZED_TIME_FORMAT from ipalib.text import Gettext, FixMe from ipalib.util import json_serialize, validate_idna_domain +from ipalib.x509 import load_der_x509_certificate, IPACertificate from ipapython import kerberos from ipapython.dn import DN from ipapython.dnsutil import DNSName @@ -1407,6 +1411,42 @@ class Bytes(Data): return super(Bytes, self)._convert_scalar(value) +class Certificate(Param): + type = crypto_x509.Certificate + type_error = _('must be a certificate') + allowed_types = (IPACertificate, bytes, unicode) + + def _convert_scalar(self, value, index=None): + """ + :param value: either DER certificate or base64 encoded certificate + :returns: bytes representing value converted to DER format + """ + if isinstance(value, bytes): + try: + value = value.decode('ascii') + except UnicodeDecodeError: + # value is possibly a DER-encoded certificate + pass + + if isinstance(value, unicode): + # if we received unicodes right away or we got them after the + # decoding, we will now try to receive DER-certificate + try: + value = base64.b64decode(value) + except (TypeError, ValueError) as e: + raise Base64DecodeError(reason=str(e)) + + if isinstance(value, bytes): + # we now only have either bytes or an IPACertificate object + # if it's bytes, make it an IPACertificate object + try: + value = load_der_x509_certificate(value) + except ValueError as e: + raise CertificateFormatError(error=str(e)) + + return super(Certificate, self)._convert_scalar(value) + + class Str(Data): """ A parameter for Unicode text (stored in the ``unicode`` type). diff --git a/ipalib/rpc.py b/ipalib/rpc.py index 8635894..ffa2b92 100644 --- a/ipalib/rpc.py +++ b/ipalib/rpc.py @@ -42,6 +42,7 @@ import json import re import socket import gzip +from cryptography import x509 as crypto_x509 import gssapi from dns import resolver, rdatatype @@ -56,6 +57,7 @@ from ipalib.errors import (public_errors, UnknownError, NetworkError, XMLRPCMarshallError, JSONError) from ipalib import errors, capabilities from ipalib.request import context, Connection +from ipalib.x509 import Encoding as x509_Encoding from ipapython import ipautil from ipapython import session_storage from ipapython.cookie import Cookie @@ -191,6 +193,10 @@ def xml_wrap(value, version): if isinstance(value, Principal): return unicode(value) + if isinstance(value, crypto_x509.Certificate): + return base64.b64encode( + value.public_bytes(x509_Encoding.DER)).encode('ascii') + assert type(value) in (unicode, float, bool, type(None)) + six.integer_types return value @@ -318,6 +324,7 @@ class _JSONPrimer(dict): list: self._enc_list, tuple: self._enc_list, dict: self._enc_dict, + crypto_x509.Certificate: self._enc_certificate, }) # int, long for t in six.integer_types: @@ -384,6 +391,9 @@ class _JSONPrimer(dict): result[k] = v if func is _identity else func(v) return result + def _enc_certificate(self, val): + return self._enc_bytes(val.public_bytes(x509_Encoding.DER)) + def json_encode_binary(val, version, pretty_print=False): """Serialize a Python object structure to JSON diff --git a/ipalib/util.py b/ipalib/util.py index 880d2bc..cb77104 100644 --- a/ipalib/util.py +++ b/ipalib/util.py @@ -129,7 +129,8 @@ def normalize_name(name): def isvalid_base64(data): """ - Validate the incoming data as valid base64 data or not. + Validate the incoming data as valid base64 data or not. This is only + used in the ipalib.Parameters module which expects ``data`` to be unicode. The character set must only include of a-z, A-Z, 0-9, + or / and be padded with = to be a length divisible by 4 (so only 0-2 =s are diff --git a/ipalib/x509.py b/ipalib/x509.py index 87f0eda..7877df3 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -19,8 +19,7 @@ # Certificates should be stored internally DER-encoded. We can be passed # a certificate several ways: read if from LDAP, read it from a 3rd party -# app (dogtag, candlepin, etc) or as user input. The ensure_der_format() -# function will convert an incoming certificate to DER-encoding. +# app (dogtag, candlepin, etc) or as user input. # Conventions # @@ -51,8 +50,6 @@ from pyasn1.codec.der import decoder, encoder from pyasn1_modules import rfc2315, rfc2459 import six -from ipalib import api -from ipalib import util from ipalib import errors from ipapython.dn import DN from ipapython.dnsutil import DNSName @@ -82,6 +79,7 @@ SAN_KRB5PRINCIPALNAME = '1.3.6.1.5.2.2' _subject_base = None def subject_base(): + from ipalib import api global _subject_base if _subject_base is None: @@ -504,40 +502,6 @@ def pkcs7_to_certs(data, datatype=PEM): return result -def ensure_der_format(rawcert): - """ - Incoming certificates should be DER-encoded. If not it is converted to - DER-format. - - Note that this can't be a normalizer on a Param because only unicode - variables are normalized. - """ - if not rawcert: - return None - - try: - if isinstance(rawcert, bytes): - # base64 must work with utf-8, otherwise it is raw bin certificate - decoded_cert = rawcert.decode('utf-8') - else: - decoded_cert = rawcert - except UnicodeDecodeError: - dercert = rawcert - else: - if util.isvalid_base64(decoded_cert): - try: - dercert = base64.b64decode(decoded_cert) - except Exception as e: - raise errors.Base64DecodeError(reason=str(e)) - else: - dercert = rawcert - - # At this point we should have a DER certificate. - # Attempt to decode it. - validate_der_x509_certificate(dercert) - return dercert - - def validate_pem_x509_certificate(cert): """ Perform cert validation by trying to load it via python-cryptography. diff --git a/ipapython/ipaldap.py b/ipapython/ipaldap.py index 6ef6614..6ed2e9a 100644 --- a/ipapython/ipaldap.py +++ b/ipapython/ipaldap.py @@ -34,6 +34,8 @@ import pwd from six.moves.urllib.parse import urlparse # pylint: enable=import-error +from cryptography import x509 as crypto_x509 + import ldap import ldap.sasl import ldap.filter @@ -41,7 +43,7 @@ from ldap.controls import SimplePagedResultsControl import six # pylint: disable=ipa-forbidden-import -from ipalib import errors, _ +from ipalib import errors, x509, _ from ipalib.constants import LDAP_GENERALIZED_TIME_FORMAT # pylint: enable=ipa-forbidden-import from ipapython.ipautil import format_netloc, CIDict @@ -670,6 +672,10 @@ class LDAPClient(object): 'dnszoneidnsname': DNSName, 'krbcanonicalname': Principal, 'krbprincipalname': Principal, + 'usercertificate': crypto_x509.Certificate, + 'usercertificate;binary': crypto_x509.Certificate, + 'cACertificate': crypto_x509.Certificate, + 'cACertificate;binary': crypto_x509.Certificate, 'nsds5replicalastupdatestart': unicode, 'nsds5replicalastupdateend': unicode, 'nsds5replicalastinitstart': unicode, @@ -842,7 +848,7 @@ class LDAPClient(object): def encode(self, val): """ - Encode attribute value to LDAP representation (str). + Encode attribute value to LDAP representation (str/bytes). """ # Booleans are both an instance of bool and int, therefore # test for bool before int otherwise the int clause will be @@ -869,6 +875,8 @@ class LDAPClient(object): return dct elif isinstance(val, datetime.datetime): return val.strftime(LDAP_GENERALIZED_TIME_FORMAT).encode('utf-8') + elif isinstance(val, crypto_x509.Certificate): + return val.public_bytes(x509.Encoding.DER) elif val is None: return None else: @@ -876,7 +884,7 @@ class LDAPClient(object): def decode(self, val, attr): """ - Decode attribute value from LDAP representation (str). + Decode attribute value from LDAP representation (str/bytes). """ if isinstance(val, bytes): target_type = self.get_attribute_type(attr) @@ -892,6 +900,8 @@ class LDAPClient(object): return DNSName.from_text(val.decode('utf-8')) elif target_type in (DN, Principal): return target_type(val.decode('utf-8')) + elif target_type is crypto_x509.Certificate: + return x509.load_der_x509_certificate(val) else: return target_type(val) except Exception: diff --git a/ipaserver/install/dogtaginstance.py b/ipaserver/install/dogtaginstance.py index e638319..6aa3339 100644 --- a/ipaserver/install/dogtaginstance.py +++ b/ipaserver/install/dogtaginstance.py @@ -376,7 +376,7 @@ class DogtagInstance(service.Service): if conn is not None: conn.unbind() - return base64.b64encode(admin_cert) + return admin_cert def handle_setup_error(self, e): logger.critical("Failed to configure %s instance: %s", diff --git a/ipaserver/install/krainstance.py b/ipaserver/install/krainstance.py index 5a780b2..7f82b75 100644 --- a/ipaserver/install/krainstance.py +++ b/ipaserver/install/krainstance.py @@ -22,6 +22,7 @@ import os import pwd import shutil import tempfile +import base64 import six # pylint: disable=import-error @@ -268,13 +269,15 @@ class KRAInstance(DogtagInstance): "https://%s" % ipautil.format_netloc(self.master_host, 443)) else: # the admin cert file is needed for the first instance of KRA - cert = DogtagInstance.get_admin_cert(self) + cert = self.get_admin_cert() # First make sure that the directory exists parentdir = os.path.dirname(paths.ADMIN_CERT_PATH) if not os.path.exists(parentdir): os.makedirs(parentdir) with open(paths.ADMIN_CERT_PATH, "w") as admin_path: - admin_path.write(cert) + admin_path.write( + base64.b64encode(cert.public_bytes(x509.Encoding.DER)) + ) # Generate configuration file with open(cfg_file, "w") as f: diff --git a/ipaserver/plugins/baseuser.py b/ipaserver/plugins/baseuser.py index 8b902aa..ef55858 100644 --- a/ipaserver/plugins/baseuser.py +++ b/ipaserver/plugins/baseuser.py @@ -19,10 +19,10 @@ import six -from ipalib import api, errors, x509 +from ipalib import api, errors from ipalib import ( - Flag, Int, Password, Str, Bool, StrEnum, DateTime, Bytes, DNParam) -from ipalib.parameters import Principal + Flag, Int, Password, Str, Bool, StrEnum, DateTime, DNParam) +from ipalib.parameters import Principal, Certificate from ipalib.plugable import Registry from .baseldap import ( DN, LDAPObject, LDAPCreate, LDAPUpdate, LDAPSearch, LDAPDelete, @@ -30,8 +30,7 @@ from .baseldap import ( LDAPAddMember, LDAPRemoveMember, LDAPAddAttributeViaOption, LDAPRemoveAttributeViaOption, add_missing_object_class) -from ipaserver.plugins.service import ( - validate_certificate, validate_realm, normalize_principal) +from ipaserver.plugins.service import (validate_realm, normalize_principal) from ipalib.request import context from ipalib import _ from ipalib.constants import PATTERN_GROUPUSER_NAME @@ -363,7 +362,7 @@ class baseuser(LDAPObject): + '(\s*,\s*[a-zA-Z]{1,8}(-[a-zA-Z]{1,8})?(;q\=((0(\.[0-9]{0,3})?)|(1(\.0{0,3})?)))?)*)|(\*))$', pattern_errmsg='must match RFC 2068 - 14.4, e.g., "da, en-gb;q=0.8, en;q=0.7"', ), - Bytes('usercertificate*', validate_certificate, + Certificate('usercertificate*', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded user certificate'), @@ -762,8 +761,8 @@ class ModCertMapData(LDAPModAttribute): doc=_('Subject of the certificate'), flags=['virtual_attribute'] ), - Bytes( - 'certificate*', validate_certificate, + Certificate( + 'certificate*', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded user certificate'), @@ -798,8 +797,7 @@ class ModCertMapData(LDAPModAttribute): if issuer or subject: data.append(cls._build_mapdata(subject, issuer)) - for dercert in certificates: - cert = x509.load_der_x509_certificate(dercert) + for cert in certificates: issuer = DN(cert.issuer) subject = DN(cert.subject) if not subject: diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 1b2991b..ad8bea6 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -39,7 +39,9 @@ from ipalib import ngettext from ipalib.constants import IPA_CA_CN from ipalib.crud import Create, PKQuery, Retrieve, Search from ipalib.frontend import Method, Object -from ipalib.parameters import Bytes, DateTime, DNParam, DNSNameParam, Principal +from ipalib.parameters import ( + Bytes, Certificate, DateTime, DNParam, DNSNameParam, Principal +) from ipalib.plugable import Registry from .virtual import VirtualCommand from .baseldap import pkey_to_value @@ -324,10 +326,6 @@ def ca_kdc_check(api_instance, hostname): % dict(hostname=hostname)) -def validate_certificate(value): - return x509.validate_der_x509_certificate(value) - - def bind_principal_can_manage_cert(cert): """Check that the bind principal can manage the given cert. @@ -362,11 +360,10 @@ class BaseCertObject(Object): doc=_('Name of issuing CA'), flags={'no_create', 'no_update', 'no_search'}, ), - Bytes( - 'certificate', validate_certificate, + Certificate( + 'certificate', label=_("Certificate"), doc=_("Base-64 encoded certificate."), - normalizer=x509.ensure_der_format, flags={'no_create', 'no_update', 'no_search'}, ), Bytes( @@ -1438,17 +1435,7 @@ class cert_find(Search, CertMethod): ) def _get_cert_key(self, cert): - try: - cert_obj = x509.load_der_x509_certificate(cert) - except ValueError as e: - message = messages.SearchResultTruncated( - reason=_("failed to load certificate: %s") % e, - ) - self.add_message(message) - - raise - - return (DN(cert_obj.issuer), cert_obj.serial_number) + return (DN(cert.issuer), cert.serial_number) def _cert_search(self, pkey_only, **options): result = collections.OrderedDict() @@ -1458,16 +1445,12 @@ class cert_find(Search, CertMethod): except KeyError: return result, False, False - try: - issuer, serial_number = self._get_cert_key(cert) - except ValueError: - return result, True, True - - obj = {'serial_number': serial_number} + obj = {'serial_number': cert.serial_number} if not pkey_only: - obj['certificate'] = base64.b64encode(cert).decode('ascii') + obj['certificate'] = base64.b64encode( + cert.public_bytes(x509.Encoding.DER)).decode('ascii') - result[issuer, serial_number] = obj + result[self._get_cert_key(cert)] = obj return result, False, True @@ -1570,7 +1553,8 @@ class cert_find(Search, CertMethod): cert = options.get('certificate') if cert is not None: - filter = ldap.make_filter_from_attr('usercertificate', cert) + filter = ldap.make_filter_from_attr( + 'usercertificate', cert.public_bytes(x509.Encoding.DER)) else: filter = '(usercertificate=*)' filters.append(filter) @@ -1598,20 +1582,18 @@ class cert_find(Search, CertMethod): for entry in entries: for attr in ('usercertificate', 'usercertificate;binary'): for cert in entry.get(attr, []): + cert_key = self._get_cert_key(cert) try: - issuer, serial_number = self._get_cert_key(cert) - except ValueError: - truncated = True - continue - - try: - obj = result[issuer, serial_number] + obj = result[cert_key] except KeyError: - obj = {'serial_number': serial_number} + obj = {'serial_number': cert.serial_number} if not pkey_only and all: obj['certificate'] = ( - base64.b64encode(cert).decode('ascii')) - result[issuer, serial_number] = obj + base64.b64encode( + cert.public_bytes(x509.Encoding.DER)) + .decode('ascii')) + + result[cert_key] = obj if not pkey_only and (all or not no_members): owners = obj.setdefault('owner', []) @@ -1695,10 +1677,7 @@ class cert_find(Search, CertMethod): obj['certificate'].replace('\r\n', '')) if 'certificate_chain' in ca_obj: - cert = x509.load_der_x509_certificate( - obj['certificate']) - cert_der = ( - cert.public_bytes(serialization.Encoding.DER)) + cert_der = base64.b64decode(obj['certificate']) obj['certificate_chain'] = ( [cert_der] + ca_obj['certificate_chain']) diff --git a/ipaserver/plugins/certmap.py b/ipaserver/plugins/certmap.py index f26bf67..6b44d37 100644 --- a/ipaserver/plugins/certmap.py +++ b/ipaserver/plugins/certmap.py @@ -23,10 +23,9 @@ import dbus import six from ipalib import api, errors, x509 -from ipalib import Bytes from ipalib.crud import Search from ipalib.frontend import Object -from ipalib.parameters import Bool, DNSNameParam, Flag, Int, Str +from ipalib.parameters import Bool, DNSNameParam, Flag, Int, Str, Certificate from ipalib.plugable import Registry from .baseldap import ( LDAPCreate, @@ -39,7 +38,6 @@ from .baseldap import ( pkey_to_value) from ipalib import _, ngettext from ipalib import output -from ipaserver.plugins.service import validate_certificate if six.PY3: @@ -521,8 +519,8 @@ class certmap_match(Search): if arg.name == 'criteria': continue yield arg - yield Bytes( - 'certificate', validate_certificate, + yield Certificate( + 'certificate', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded user certificate'), diff --git a/ipaserver/plugins/host.py b/ipaserver/plugins/host.py index d5716a2..672966e 100644 --- a/ipaserver/plugins/host.py +++ b/ipaserver/plugins/host.py @@ -27,9 +27,10 @@ import dns.resolver import six from ipalib import api, errors, util +from ipalib.x509 import Encoding as x509_Encoding from ipalib import messages -from ipalib import Str, Flag, Bytes -from ipalib.parameters import Principal +from ipalib import Str, Flag +from ipalib.parameters import Principal, Certificate from ipalib.plugable import Registry from .baseldap import (LDAPQuery, LDAPObject, LDAPCreate, LDAPDelete, LDAPUpdate, LDAPSearch, @@ -40,7 +41,7 @@ from .baseldap import (LDAPQuery, LDAPObject, LDAPCreate, LDAPAddAttributeViaOption, LDAPRemoveAttributeViaOption) from .service import ( - validate_realm, normalize_principal, validate_certificate, + validate_realm, normalize_principal, set_certificate_attrs, ticket_flags_params, update_krbticketflags, set_kerberos_attrs, rename_ipaallowedtoperform_from_ldap, rename_ipaallowedtoperform_to_ldap, revoke_certs) @@ -48,7 +49,6 @@ from .dns import (dns_container_exists, add_records_for_host_validation, add_records_for_host, get_reverse_zone) from ipalib import _, ngettext -from ipalib import x509 from ipalib import output from ipalib.request import context from ipalib.util import (normalize_sshpubkey, validate_sshpubkey_no_options, @@ -485,7 +485,7 @@ class host(LDAPObject): label=_('Random password'), flags=('no_create', 'no_update', 'no_search', 'virtual_attribute'), ), - Bytes('usercertificate*', validate_certificate, + Certificate('usercertificate*', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded host certificate'), @@ -690,9 +690,8 @@ class host_add(LDAPCreate): entropy_bits=TMP_PWD_ENTROPY_BITS) # save the password so it can be displayed in post_callback setattr(context, 'randompassword', entry_attrs['userpassword']) - certs = options.get('usercertificate', []) - certs_der = [x509.ensure_der_format(c) for c in certs] - entry_attrs['usercertificate'] = certs_der + + entry_attrs['usercertificate'] = options.get('usercertificate', []) entry_attrs['managedby'] = dn entry_attrs['objectclass'].append('ieee802device') entry_attrs['objectclass'].append('ipasshhost') @@ -895,7 +894,6 @@ class host_mod(LDAPUpdate): # verify certificates certs = entry_attrs.get('usercertificate') or [] - certs_der = [x509.ensure_der_format(c) for c in certs] # revoke removed certificates ca_is_enabled = self.api.Command.ca_is_enabled()['result'] @@ -905,14 +903,13 @@ class host_mod(LDAPUpdate): except errors.NotFound: self.obj.handle_not_found(*keys) old_certs = entry_attrs_old.get('usercertificate', []) - old_certs_der = [x509.ensure_der_format(c) for c in old_certs] - removed_certs_der = set(old_certs_der) - set(certs_der) + removed_certs_der = set(old_certs) - set(certs) for der in removed_certs_der: rm_certs = api.Command.cert_find(certificate=der)['result'] revoke_certs(rm_certs) if certs: - entry_attrs['usercertificate'] = certs_der + entry_attrs['usercertificate'] = certs if options.get('random'): entry_attrs['userpassword'] = ipa_generate_password( @@ -1344,7 +1341,8 @@ class host_remove_cert(LDAPRemoveAttributeViaOption): assert isinstance(dn, DN) for cert in options.get('usercertificate', []): - revoke_certs(api.Command.cert_find(certificate=cert)['result']) + revoke_certs(api.Command.cert_find( + certificate=cert.public_bytes(x509_Encoding.DER))['result']) return dn diff --git a/ipaserver/plugins/idviews.py b/ipaserver/plugins/idviews.py index 3cc7762..263a35a 100644 --- a/ipaserver/plugins/idviews.py +++ b/ipaserver/plugins/idviews.py @@ -27,8 +27,10 @@ from .baseldap import (LDAPQuery, LDAPObject, LDAPCreate, LDAPRemoveAttributeViaOption, LDAPRetrieve, global_output_params) from .hostgroup import get_complete_hostgroup_member_list -from .service import validate_certificate -from ipalib import api, Str, Int, Bytes, Flag, _, ngettext, errors, output +from ipalib import ( + api, Str, Int, Flag, _, ngettext, errors, output +) +from ipalib.parameters import Certificate from ipalib.constants import ( IPA_ANCHOR_PREFIX, SID_ANCHOR_PREFIX, @@ -922,7 +924,7 @@ class idoverrideuser(baseidoverride): normalizer=normalize_sshpubkey, flags=['no_search'], ), - Bytes('usercertificate*', validate_certificate, + Certificate('usercertificate*', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded user certificate'), diff --git a/ipaserver/plugins/service.py b/ipaserver/plugins/service.py index a373a9e..0ec71ce 100644 --- a/ipaserver/plugins/service.py +++ b/ipaserver/plugins/service.py @@ -25,8 +25,8 @@ from cryptography.hazmat.primitives import hashes import six from ipalib import api, errors, messages -from ipalib import Bytes, StrEnum, Bool, Str, Flag -from ipalib.parameters import Principal +from ipalib import StrEnum, Bool, Str, Flag +from ipalib.parameters import Principal, Certificate from ipalib.plugable import Registry from .baseldap import ( host_is_master, @@ -215,13 +215,6 @@ def normalize_principal(value): return unicode(principal) -def validate_certificate(ugettext, cert): - """ - Check whether the certificate is properly encoded to DER - """ - if api.env.in_server: - x509.validate_der_x509_certificate(cert) - def revoke_certs(certs): """ @@ -269,8 +262,6 @@ def set_certificate_attrs(entry_attrs): cert = entry_attrs['usercertificate'][0] else: cert = entry_attrs['usercertificate'] - cert = x509.ensure_der_format(cert) - cert = x509.load_der_x509_certificate(cert) entry_attrs['subject'] = unicode(DN(cert.subject)) entry_attrs['serial_number'] = unicode(cert.serial_number) entry_attrs['serial_number_hex'] = u'0x%X' % cert.serial_number @@ -478,7 +469,7 @@ class service(LDAPObject): require_service=True, flags={'no_create'} ), - Bytes('usercertificate*', validate_certificate, + Certificate('usercertificate*', cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded service certificate'), @@ -632,9 +623,7 @@ class service_add(LDAPCreate): self.obj.validate_ipakrbauthzdata(entry_attrs) - certs = options.get('usercertificate', []) - certs_der = [x509.ensure_der_format(c) for c in certs] - entry_attrs['usercertificate'] = certs_der + entry_attrs['usercertificate'] = options.get('usercertificate', []) if not options.get('force', False): # We know the host exists if we've gotten this far but we @@ -705,7 +694,6 @@ class service_mod(LDAPUpdate): # verify certificates certs = entry_attrs.get('usercertificate') or [] - certs_der = [x509.ensure_der_format(c) for c in certs] # revoke removed certificates ca_is_enabled = self.api.Command.ca_is_enabled()['result'] if 'usercertificate' in options and ca_is_enabled: @@ -714,14 +702,14 @@ class service_mod(LDAPUpdate): except errors.NotFound: self.obj.handle_not_found(*keys) old_certs = entry_attrs_old.get('usercertificate', []) - old_certs_der = [x509.ensure_der_format(c) for c in old_certs] - removed_certs_der = set(old_certs_der) - set(certs_der) - for der in removed_certs_der: - rm_certs = api.Command.cert_find(certificate=der)['result'] + removed_certs = set(old_certs) - set(certs) + for cert in removed_certs: + rm_certs = api.Command.cert_find( + certificate=cert.public_bytes(x509.Encoding.DER))['result'] revoke_certs(rm_certs) if certs: - entry_attrs['usercertificate'] = certs_der + entry_attrs['usercertificate'] = certs update_krbticketflags(ldap, entry_attrs, attrs_list, options, True) @@ -997,7 +985,8 @@ class service_remove_cert(LDAPRemoveAttributeViaOption): assert isinstance(dn, DN) for cert in options.get('usercertificate', []): - revoke_certs(api.Command.cert_find(certificate=cert)['result']) + revoke_certs(api.Command.cert_find( + certificate=cert.public_bytes(x509.Encoding.DER))['result']) return dn diff --git a/ipatests/test_ipaserver/test_ldap.py b/ipatests/test_ipaserver/test_ldap.py index 88638de..a0d2a45 100644 --- a/ipatests/test_ipaserver/test_ldap.py +++ b/ipatests/test_ipaserver/test_ldap.py @@ -35,7 +35,7 @@ import six from ipaplatform.paths import paths from ipaserver.plugins.ldap2 import ldap2, AUTOBIND_DISABLED -from ipalib import api, x509, create_api, errors +from ipalib import api, create_api, errors from ipapython import ipautil from ipapython.dn import DN @@ -77,8 +77,7 @@ class test_ldap(object): self.conn = ldap2(api) self.conn.connect(autobind=AUTOBIND_DISABLED) entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = x509.load_der_x509_certificate(cert[0]) + cert = entry_attrs.get('usercertificate')[0] assert cert.serial_number is not None def test_simple(self): @@ -94,8 +93,7 @@ class test_ldap(object): self.conn = ldap2(api) self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password) entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = x509.load_der_x509_certificate(cert[0]) + cert = entry_attrs.get('usercertificate')[0] assert cert.serial_number is not None def test_Backend(self): @@ -120,8 +118,7 @@ class test_ldap(object): result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,)) entry_attrs = result['result'] - cert = entry_attrs.get('usercertificate') - cert = x509.load_der_x509_certificate(cert[0]) + cert = entry_attrs.get('usercertificate')[0] assert cert.serial_number is not None def test_autobind(self): @@ -134,8 +131,7 @@ class test_ldap(object): except errors.ACIError: raise nose.SkipTest("Only executed as root") entry_attrs = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = x509.load_der_x509_certificate(cert[0]) + cert = entry_attrs.get('usercertificate')[0] assert cert.serial_number is not None