From d7f3a0b2d31fe3a02a19286d590cf1a343458fd8 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale Date: Jun 30 2020 14:18:21 +0000 Subject: ra.get_certificate: use REST API Update ra.get_certificate to use the Dogtag REST API. This change is being done as part of the Dogtag GSS-API authentication effort because the servlet-based method expects an internal Dogtag user. It is less intrusive to just change FreeIPA to call the REST API instead (which is also part of an existing ticket). Depends on https://pagure.io/dogtagpki/issue/2601 (which was merged and released long ago). Part of: https://pagure.io/freeipa/issue/3473 Part of: https://pagure.io/freeipa/issue/5011 Reviewed-By: Christian Heimes --- diff --git a/ipalib/x509.py b/ipalib/x509.py index 1f612a3..0ee710f 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -63,9 +63,13 @@ if six.PY3: PEM = 0 DER = 1 +# The first group is the whole PEM datum and the second group is +# the base64 content (with newlines). For findall() the result is +# a list of 2-tuples of the PEM and base64 data. PEM_CERT_REGEX = re.compile( - b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', + b'(-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----)', re.DOTALL) + PEM_PRIV_REGEX = re.compile( b'-----BEGIN(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----.*?' b'-----END(?: ENCRYPTED)?(?: (?:RSA|DSA|DH|EC))? PRIVATE KEY-----', @@ -447,7 +451,7 @@ def load_certificate_list(data): Return a list of python-cryptography ``Certificate`` objects. """ certs = PEM_CERT_REGEX.findall(data) - return [load_pem_x509_certificate(cert) for cert in certs] + return [load_pem_x509_certificate(cert[0]) for cert in certs] def load_certificate_list_from_file(filename): diff --git a/ipaserver/plugins/dogtag.py b/ipaserver/plugins/dogtag.py index 2246b26..c5d8aeb 100644 --- a/ipaserver/plugins/dogtag.py +++ b/ipaserver/plugins/dogtag.py @@ -250,7 +250,7 @@ import contextlib import six -from ipalib import Backend, api +from ipalib import Backend, api, x509 from ipapython.dn import DN import ipapython.cookie from ipapython import dogtag, ipautil, certdb @@ -686,118 +686,6 @@ def parse_check_request_result_xml(doc): return response -def parse_display_cert_xml(doc): - ''' - :param doc: The root node of the xml document to parse - :returns: result dict - :except ValueError: - - After parsing the results are returned in a result dict. The following - table illustrates the mapping from the CMS data item to what may be found in - the result dict. If a CMS data item is absent it will also be absent in the - result dict. - - If the requestStatus is not SUCCESS then the response dict will have the - contents described in `parse_error_template_xml`. - - +----------------+---------------+-----------------+---------------+ - |cms name |cms type |result name |result type | - +================+===============+=================+===============+ - |emailCert |Boolean |email_cert |bool | - +----------------+---------------+-----------------+---------------+ - |noCertImport |Boolean |no_cert_import |bool | - +----------------+---------------+-----------------+---------------+ - |revocationReason|int |revocation_reason|int [1]_ | - +----------------+---------------+-----------------+---------------+ - |certPrettyPrint |string |cert_pretty |unicode | - +----------------+---------------+-----------------+---------------+ - |authorityid |string |authority |unicode | - +----------------+---------------+-----------------+---------------+ - |certFingerprint |string |fingerprint |unicode | - +----------------+---------------+-----------------+---------------+ - |certChainBase64 |string |certificate |unicode [2]_ | - +----------------+---------------+-----------------+---------------+ - |serialNumber |string |serial_number |int|long | - +----------------+---------------+-----------------+---------------+ - |pkcs7ChainBase64|string |pkcs7_chain |unicode [2]_ | - +----------------+---------------+-----------------+---------------+ - - .. [1] revocation reason may be one of: - - - 0 = UNSPECIFIED - - 1 = KEY_COMPROMISE - - 2 = CA_COMPROMISE - - 3 = AFFILIATION_CHANGED - - 4 = SUPERSEDED - - 5 = CESSATION_OF_OPERATION - - 6 = CERTIFICATE_HOLD - - 8 = REMOVE_FROM_CRL - - 9 = PRIVILEGE_WITHDRAWN - - 10 = AA_COMPROMISE - - .. [2] Base64 encoded - - ''' - - request_status = get_request_status_xml(doc) - - if request_status != CMS_STATUS_SUCCESS: - response = parse_error_template_xml(doc) - return response - - response = {} - response['request_status'] = request_status - - email_cert = doc.xpath('//xml/header/emailCert[1]') - if len(email_cert) == 1: - parse_and_set_boolean_xml(email_cert[0], response, 'email_cert') - - no_cert_import = doc.xpath('//xml/header/noCertImport[1]') - if len(no_cert_import) == 1: - parse_and_set_boolean_xml(no_cert_import[0], response, 'no_cert_import') - - revocation_reason = doc.xpath('//xml/header/revocationReason[1]') - if len(revocation_reason) == 1: - revocation_reason = int(revocation_reason[0].text) - response['revocation_reason'] = revocation_reason - - cert_pretty = doc.xpath('//xml/header/certPrettyPrint[1]') - if len(cert_pretty) == 1: - cert_pretty = etree.tostring(cert_pretty[0], method='text', - encoding=unicode).strip() - response['cert_pretty'] = cert_pretty - - authority = doc.xpath('//xml/header/authorityid[1]') - if len(authority) == 1: - authority = etree.tostring(authority[0], method='text', - encoding=unicode).strip() - response['authority'] = authority - - fingerprint = doc.xpath('//xml/header/certFingerprint[1]') - if len(fingerprint) == 1: - fingerprint = etree.tostring(fingerprint[0], method='text', - encoding=unicode).strip() - response['fingerprint'] = fingerprint - - certificate = doc.xpath('//xml/header/certChainBase64[1]') - if len(certificate) == 1: - certificate = etree.tostring(certificate[0], method='text', - encoding=unicode).strip() - response['certificate'] = certificate - - serial_number = doc.xpath('//xml/header/serialNumber[1]') - if len(serial_number) == 1: - serial_number = int(serial_number[0].text, 16) # parse as hex - response['serial_number'] = serial_number - response['serial_number_hex'] = u'0x%X' % serial_number - - pkcs7_chain = doc.xpath('//xml/header/pkcs7ChainBase64[1]') - if len(pkcs7_chain) == 1: - pkcs7_chain = etree.tostring(pkcs7_chain[0], method='text', - encoding=unicode).strip() - response['pkcs7_chain'] = pkcs7_chain - - return response def parse_revoke_cert_xml(doc): ''' @@ -1532,13 +1420,10 @@ class ra(rabase.rabase, RestClient): """ Retrieve an existing certificate. - :param serial_number: Certificate serial number. Must be a string value - because serial numbers may be of any magnitude and - XMLRPC cannot handle integers larger than 64-bit. - The string value should be decimal, but may optionally - be prefixed with a hex radix prefix if the integral value - is represented as hexadecimal. If no radix prefix is - supplied the string will be interpreted as decimal. + :param serial_number: Certificate serial number. May be int, + decimal string, or hex string with "0x" + prefix. + The command returns a dict with these possible key/value pairs. Some key/value pairs may be absent. @@ -1575,49 +1460,44 @@ class ra(rabase.rabase, RestClient): """ logger.debug('%s.get_certificate()', type(self).__name__) - # Convert serial number to integral type from string to properly handle - # radix issues. Note: the int object constructor will properly handle large - # magnitude integral values by returning a Python long type when necessary. - serial_number = int(serial_number, 0) - # Call CMS - http_status, _http_headers, http_body = ( - self._sslget('/ca/agent/ca/displayBySerial', - self.env.ca_agent_port, - serialNumber=str(serial_number), - xml='true') + path = 'certs/{}'.format(serial_number) + _http_status, _http_headers, http_body = self._ssldo( + 'GET', path, use_session=False, + headers={ + 'Accept': 'application/json', + }, ) - - # Parse and handle errors - if http_status != 200: - self.raise_certificate_operation_error('get_certificate', - detail=http_status) - - parse_result = self.get_parse_result_xml(http_body, parse_display_cert_xml) - request_status = parse_result['request_status'] - if request_status != CMS_STATUS_SUCCESS: - self.raise_certificate_operation_error('get_certificate', - cms_request_status_to_string(request_status), - parse_result.get('error_string')) + try: + resp = json.loads(ipautil.decode_json(http_body)) + except ValueError: + raise errors.RemoteRetrieveError( + reason=_("Response from CA was not valid JSON")) # Return command result cmd_result = {} - if 'certificate' in parse_result: - cmd_result['certificate'] = parse_result['certificate'] + if 'Encoded' in resp: + s = resp['Encoded'] + # The 'cert' plugin expects the result to be base64-encoded + # X.509 DER. We expect the result to be PEM. We have to + # strip the PEM headers and we use PEM_CERT_REGEX to do it. + match = x509.PEM_CERT_REGEX.search(s.encode('utf-8')) + if match: + s = match.group(2).decode('utf-8') + cmd_result['certificate'] = s.strip() - if 'serial_number' in parse_result: - # see module documentation concerning serial numbers and XMLRPC - cmd_result['serial_number'] = unicode(parse_result['serial_number']) - cmd_result['serial_number_hex'] = u'0x%X' % int(cmd_result['serial_number']) + if 'id' in resp: + serial = int(resp['id'], 0) + cmd_result['serial_number'] = unicode(serial) + cmd_result['serial_number_hex'] = u'0x%X' % serial - if 'revocation_reason' in parse_result: - cmd_result['revocation_reason'] = parse_result['revocation_reason'] + if 'RevocationReason' in resp: + cmd_result['revocation_reason'] = resp['RevocationReason'] return cmd_result - def request_certificate( self, csr, profile_id, ca_id, request_type='pkcs10'): """