From a26cf0d7910dd4c0a4da08682b4be8d3d94ba520 Mon Sep 17 00:00:00 2001 From: Ben Lipton Date: Jan 31 2017 09:20:28 +0000 Subject: tests: Add tests for CSR autogeneration This patch also contains some code changes to make the code easier to test and to make the tests pass. https://fedorahosted.org/freeipa/ticket/4899 Reviewed-By: Jan Cholasta --- diff --git a/ipaclient/csrgen.py b/ipaclient/csrgen.py index 0b9472f..96100ae 100644 --- a/ipaclient/csrgen.py +++ b/ipaclient/csrgen.py @@ -13,7 +13,6 @@ import jinja2.ext import jinja2.sandbox import six -from ipalib import api from ipalib import errors from ipalib.text import _ from ipaplatform.paths import paths @@ -83,6 +82,11 @@ class Formatter(object): self.passthrough_globals = {} def _define_passthrough(self, call): + """Some macros are meant to be interpreted during the final render, not + when data rules are interpolated into syntax rules. This method allows + those macros to be registered so that calls to them are passed through + to the prepared rule rather than interpreted. + """ def passthrough(caller): return u'{%% call %s() %%}%s{%% endcall %%}' % (call, caller()) @@ -104,18 +108,20 @@ class Formatter(object): :returns: jinja2.Template that can be rendered to produce the CSR data. """ syntax_rules = [] - for description, syntax_rule, data_rules in rules: + for field_mapping in rules: data_rules_prepared = [ - self._prepare_data_rule(rule) for rule in data_rules] + self._prepare_data_rule(rule) + for rule in field_mapping.data_rules] data_sources = [] - for rule in data_rules: + for rule in field_mapping.data_rules: data_source = rule.options.get('data_source') if data_source: data_sources.append(data_source) syntax_rules.append(self._prepare_syntax_rule( - syntax_rule, data_rules_prepared, description, data_sources)) + field_mapping.syntax_rule, data_rules_prepared, + field_mapping.description, data_sources)) template_params = self._get_template_params(syntax_rules) base_template = self.jinja2.get_template( @@ -160,16 +166,19 @@ class Formatter(object): syntax_rule.template, globals=self.passthrough_globals) is_required = syntax_rule.options.get('required', False) try: - rendered = template.render(datarules=data_rules) + prepared_template = template.render(datarules=data_rules) except jinja2.UndefinedError: logger.debug(traceback.format_exc()) raise errors.CSRTemplateError(reason=_( 'Template error when formatting certificate data')) - combinator = ' %s ' % syntax_rule.options.get( - 'data_source_combinator', 'or') - condition = combinator.join(data_sources) - prepared_template = self._wrap_conditional(rendered, condition) + if data_sources: + combinator = ' %s ' % syntax_rule.options.get( + 'data_source_combinator', 'or') + condition = combinator.join(data_sources) + prepared_template = self._wrap_conditional( + prepared_template, condition) + if is_required: prepared_template = self._wrap_required( prepared_template, description) @@ -198,8 +207,8 @@ class OpenSSLFormatter(Formatter): SyntaxRule = collections.namedtuple( 'SyntaxRule', ['template', 'is_extension']) - def __init__(self): - super(OpenSSLFormatter, self).__init__() + def __init__(self, *args, **kwargs): + super(OpenSSLFormatter, self).__init__(*args, **kwargs) self._define_passthrough('openssl.section') def _get_template_params(self, syntax_rules): @@ -226,17 +235,31 @@ class CertutilFormatter(Formatter): return {'options': syntax_rules} -# FieldMapping - representation of the rules needed to construct a complete -# certificate field. -# - description: str, a name or description of this field, to be used in -# messages -# - syntax_rule: Rule, the rule defining the syntax of this field -# - data_rules: list of Rule, the rules that produce data to be stored in this -# field -FieldMapping = collections.namedtuple( - 'FieldMapping', ['description', 'syntax_rule', 'data_rules']) -Rule = collections.namedtuple( - 'Rule', ['name', 'template', 'options']) +class FieldMapping(object): + """Representation of the rules needed to construct a complete cert field. + + Attributes: + description: str, a name or description of this field, to be used in + messages + syntax_rule: Rule, the rule defining the syntax of this field + data_rules: list of Rule, the rules that produce data to be stored in + this field + """ + __slots__ = ['description', 'syntax_rule', 'data_rules'] + + def __init__(self, description, syntax_rule, data_rules): + self.description = description + self.syntax_rule = syntax_rule + self.data_rules = data_rules + + +class Rule(object): + __slots__ = ['name', 'template', 'options'] + + def __init__(self, name, template, options): + self.name = name + self.template = template + self.options = options class RuleProvider(object): @@ -287,15 +310,22 @@ class FileRuleProvider(RuleProvider): options.update(ruleset['options']) if 'options' in rule: options.update(rule['options']) + self.rules[(rule_name, helper)] = Rule( rule_name, rule['template'], options) + return self.rules[(rule_name, helper)] def rules_for_profile(self, profile_id, helper): profile_path = os.path.join(self.csr_data_dir, 'profiles', '%s.json' % profile_id) - with open(profile_path) as profile_file: - profile = json.load(profile_file) + try: + with open(profile_path) as profile_file: + profile = json.load(profile_file) + except IOError: + raise errors.NotFound( + reason=_('No CSR generation rules are defined for profile' + ' %(profile_id)s') % {'profile_id': profile_id}) field_mappings = [] for field in profile: @@ -315,8 +345,7 @@ class CSRGenerator(object): def __init__(self, rule_provider): self.rule_provider = rule_provider - def csr_script(self, principal, profile_id, helper): - config = api.Command.config_show()['result'] + def csr_script(self, principal, config, profile_id, helper): render_data = {'subject': principal, 'config': config} formatter = self.FORMATTERS[helper]() diff --git a/ipaclient/plugins/csrgen.py b/ipaclient/plugins/csrgen.py index 0ad5fa1..0669a47 100644 --- a/ipaclient/plugins/csrgen.py +++ b/ipaclient/plugins/csrgen.py @@ -96,11 +96,12 @@ class cert_get_requestdata(Local): raise errors.NotFound( reason=_("The principal for this request doesn't exist.")) principal_obj = principal_obj['result'] + config = api.Command.config_show()['result'] generator = CSRGenerator(FileRuleProvider()) script = generator.csr_script( - principal_obj, profile_id, helper) + principal_obj, config, profile_id, helper) result = {} if 'out' in options: diff --git a/ipatests/setup.py b/ipatests/setup.py index 1fb5e92..e46e922 100644 --- a/ipatests/setup.py +++ b/ipatests/setup.py @@ -38,6 +38,7 @@ if __name__ == '__main__': "ipatests.test_cmdline", "ipatests.test_install", "ipatests.test_integration", + "ipatests.test_ipaclient", "ipatests.test_ipalib", "ipatests.test_ipapython", "ipatests.test_ipaserver", @@ -51,6 +52,7 @@ if __name__ == '__main__': package_data={ 'ipatests.test_install': ['*.update'], 'ipatests.test_integration': ['scripts/*'], + 'ipatests.test_ipaclient': ['data/*/*/*'], 'ipatests.test_ipalib': ['data/*'], 'ipatests.test_pkcs10': ['*.csr'], "ipatests.test_ipaserver": ['data/*'], diff --git a/ipatests/test_ipaclient/__init__.py b/ipatests/test_ipaclient/__init__.py new file mode 100644 index 0000000..0c42891 --- /dev/null +++ b/ipatests/test_ipaclient/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# + +""" +Sub-package containing unit tests for `ipaclient` package. +""" diff --git a/ipatests/test_ipaclient/data/test_csrgen/profiles/profile.json b/ipatests/test_ipaclient/data/test_csrgen/profiles/profile.json new file mode 100644 index 0000000..676f91b --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/profiles/profile.json @@ -0,0 +1,8 @@ +[ + { + "syntax": "basic", + "data": [ + "options" + ] + } +] diff --git a/ipatests/test_ipaclient/data/test_csrgen/rules/basic.json b/ipatests/test_ipaclient/data/test_csrgen/rules/basic.json new file mode 100644 index 0000000..feba3e9 --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/rules/basic.json @@ -0,0 +1,12 @@ +{ + "rules": [ + { + "helper": "openssl", + "template": "openssl_rule" + }, + { + "helper": "certutil", + "template": "certutil_rule" + } + ] +} diff --git a/ipatests/test_ipaclient/data/test_csrgen/rules/options.json b/ipatests/test_ipaclient/data/test_csrgen/rules/options.json new file mode 100644 index 0000000..111a6d8 --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/rules/options.json @@ -0,0 +1,18 @@ +{ + "rules": [ + { + "helper": "openssl", + "template": "openssl_rule", + "options": { + "helper_option": true + } + }, + { + "helper": "certutil", + "template": "certutil_rule" + } + ], + "options": { + "global_option": true + } +} diff --git a/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_certutil.sh b/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_certutil.sh new file mode 100644 index 0000000..74a704c --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_certutil.sh @@ -0,0 +1,11 @@ +#!/bin/bash -e + +if [[ $# -lt 1 ]]; then +echo "Usage: $0 [ ]" +echo "Called as: $0 $@" +exit 1 +fi + +CSR="$1" +shift +certutil -R -a -z <(head -c 4096 /dev/urandom) -o "$CSR" -s CN=machine.example.com,O=DOMAIN.EXAMPLE.COM --extSAN dns:machine.example.com "$@" diff --git a/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_openssl.sh b/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_openssl.sh new file mode 100644 index 0000000..c621a69 --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/scripts/caIPAserviceCert_openssl.sh @@ -0,0 +1,33 @@ +#!/bin/bash -e + +if [[ $# -ne 2 ]]; then +echo "Usage: $0 " +echo "Called as: $0 $@" +exit 1 +fi + +CONFIG="$(mktemp)" +CSR="$1" +shift + +echo \ +'[ req ] +prompt = no +encrypt_key = no + +distinguished_name = sec0 +req_extensions = sec2 + +[ sec0 ] +O=DOMAIN.EXAMPLE.COM +CN=machine.example.com + +[ sec1 ] +DNS = machine.example.com + +[ sec2 ] +subjectAltName = @sec1 +' > "$CONFIG" + +openssl req -new -config "$CONFIG" -out "$CSR" -key $1 +rm "$CONFIG" diff --git a/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_certutil.sh b/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_certutil.sh new file mode 100644 index 0000000..4aaeda0 --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_certutil.sh @@ -0,0 +1,11 @@ +#!/bin/bash -e + +if [[ $# -lt 1 ]]; then +echo "Usage: $0 [ ]" +echo "Called as: $0 $@" +exit 1 +fi + +CSR="$1" +shift +certutil -R -a -z <(head -c 4096 /dev/urandom) -o "$CSR" -s CN=testuser,O=DOMAIN.EXAMPLE.COM --extSAN email:testuser@example.com "$@" diff --git a/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_openssl.sh b/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_openssl.sh new file mode 100644 index 0000000..cdbe8a1 --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/scripts/userCert_openssl.sh @@ -0,0 +1,33 @@ +#!/bin/bash -e + +if [[ $# -ne 2 ]]; then +echo "Usage: $0 " +echo "Called as: $0 $@" +exit 1 +fi + +CONFIG="$(mktemp)" +CSR="$1" +shift + +echo \ +'[ req ] +prompt = no +encrypt_key = no + +distinguished_name = sec0 +req_extensions = sec2 + +[ sec0 ] +O=DOMAIN.EXAMPLE.COM +CN=testuser + +[ sec1 ] +email = testuser@example.com + +[ sec2 ] +subjectAltName = @sec1 +' > "$CONFIG" + +openssl req -new -config "$CONFIG" -out "$CSR" -key $1 +rm "$CONFIG" diff --git a/ipatests/test_ipaclient/data/test_csrgen/templates/identity_base.tmpl b/ipatests/test_ipaclient/data/test_csrgen/templates/identity_base.tmpl new file mode 100644 index 0000000..79111ab --- /dev/null +++ b/ipatests/test_ipaclient/data/test_csrgen/templates/identity_base.tmpl @@ -0,0 +1 @@ +{{ options|join(";") }} diff --git a/ipatests/test_ipaclient/test_csrgen.py b/ipatests/test_ipaclient/test_csrgen.py new file mode 100644 index 0000000..556f8e0 --- /dev/null +++ b/ipatests/test_ipaclient/test_csrgen.py @@ -0,0 +1,298 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# + +import os +import pytest + +from ipaclient import csrgen +from ipalib import errors + +BASE_DIR = os.path.dirname(__file__) +CSR_DATA_DIR = os.path.join(BASE_DIR, 'data', 'test_csrgen') + + +@pytest.fixture +def formatter(): + return csrgen.Formatter(csr_data_dir=CSR_DATA_DIR) + + +@pytest.fixture +def rule_provider(): + return csrgen.FileRuleProvider(csr_data_dir=CSR_DATA_DIR) + + +@pytest.fixture +def generator(): + return csrgen.CSRGenerator(csrgen.FileRuleProvider()) + + +class StubRuleProvider(csrgen.RuleProvider): + def __init__(self): + self.syntax_rule = csrgen.Rule( + 'syntax', '{{datarules|join(",")}}', {}) + self.data_rule = csrgen.Rule('data', 'data_template', {}) + self.field_mapping = csrgen.FieldMapping( + 'example', self.syntax_rule, [self.data_rule]) + self.rules = [self.field_mapping] + + def rules_for_profile(self, profile_id, helper): + return self.rules + + +class IdentityFormatter(csrgen.Formatter): + base_template_name = 'identity_base.tmpl' + + def __init__(self): + super(IdentityFormatter, self).__init__(csr_data_dir=CSR_DATA_DIR) + + def _get_template_params(self, syntax_rules): + return {'options': syntax_rules} + + +class IdentityCSRGenerator(csrgen.CSRGenerator): + FORMATTERS = {'identity': IdentityFormatter} + + +class test_Formatter(object): + def test_prepare_data_rule_with_data_source(self, formatter): + data_rule = csrgen.Rule('uid', '{{subject.uid.0}}', + {'data_source': 'subject.uid.0'}) + prepared = formatter._prepare_data_rule(data_rule) + assert prepared == '{% if subject.uid.0 %}{{subject.uid.0}}{% endif %}' + + def test_prepare_data_rule_no_data_source(self, formatter): + """Not a normal case, but we should handle it anyway""" + data_rule = csrgen.Rule('uid', 'static_text', {}) + prepared = formatter._prepare_data_rule(data_rule) + assert prepared == 'static_text' + + def test_prepare_syntax_rule_with_data_sources(self, formatter): + syntax_rule = csrgen.Rule( + 'example', '{{datarules|join(",")}}', {}) + data_rules = ['{{subject.field1}}', '{{subject.field2}}'] + data_sources = ['subject.field1', 'subject.field2'] + prepared = formatter._prepare_syntax_rule( + syntax_rule, data_rules, 'example', data_sources) + + assert prepared == ( + '{% if subject.field1 or subject.field2 %}{{subject.field1}},' + '{{subject.field2}}{% endif %}') + + def test_prepare_syntax_rule_with_combinator(self, formatter): + syntax_rule = csrgen.Rule('example', '{{datarules|join(",")}}', + {'data_source_combinator': 'and'}) + data_rules = ['{{subject.field1}}', '{{subject.field2}}'] + data_sources = ['subject.field1', 'subject.field2'] + prepared = formatter._prepare_syntax_rule( + syntax_rule, data_rules, 'example', data_sources) + + assert prepared == ( + '{% if subject.field1 and subject.field2 %}{{subject.field1}},' + '{{subject.field2}}{% endif %}') + + def test_prepare_syntax_rule_required(self, formatter): + syntax_rule = csrgen.Rule('example', '{{datarules|join(",")}}', + {'required': True}) + data_rules = ['{{subject.field1}}'] + data_sources = ['subject.field1'] + prepared = formatter._prepare_syntax_rule( + syntax_rule, data_rules, 'example', data_sources) + + assert prepared == ( + '{% filter required("example") %}{% if subject.field1 %}' + '{{subject.field1}}{% endif %}{% endfilter %}') + + def test_prepare_syntax_rule_passthrough(self, formatter): + """ + Calls to macros defined as passthrough are still call tags in the final + template. + """ + formatter._define_passthrough('example.macro') + + syntax_rule = csrgen.Rule( + 'example', + '{% call example.macro() %}{{datarules|join(",")}}{% endcall %}', + {}) + data_rules = ['{{subject.field1}}'] + data_sources = ['subject.field1'] + prepared = formatter._prepare_syntax_rule( + syntax_rule, data_rules, 'example', data_sources) + + assert prepared == ( + '{% if subject.field1 %}{% call example.macro() %}' + '{{subject.field1}}{% endcall %}{% endif %}') + + def test_prepare_syntax_rule_no_data_sources(self, formatter): + """Not a normal case, but we should handle it anyway""" + syntax_rule = csrgen.Rule( + 'example', '{{datarules|join(",")}}', {}) + data_rules = ['rule1', 'rule2'] + data_sources = [] + prepared = formatter._prepare_syntax_rule( + syntax_rule, data_rules, 'example', data_sources) + + assert prepared == 'rule1,rule2' + + +class test_FileRuleProvider(object): + def test_rule_basic(self, rule_provider): + rule_name = 'basic' + + rule1 = rule_provider._rule(rule_name, 'openssl') + rule2 = rule_provider._rule(rule_name, 'certutil') + + assert rule1.template == 'openssl_rule' + assert rule2.template == 'certutil_rule' + + def test_rule_global_options(self, rule_provider): + rule_name = 'options' + + rule1 = rule_provider._rule(rule_name, 'openssl') + rule2 = rule_provider._rule(rule_name, 'certutil') + + assert rule1.options['global_option'] is True + assert rule2.options['global_option'] is True + + def test_rule_helper_options(self, rule_provider): + rule_name = 'options' + + rule1 = rule_provider._rule(rule_name, 'openssl') + rule2 = rule_provider._rule(rule_name, 'certutil') + + assert rule1.options['helper_option'] is True + assert 'helper_option' not in rule2.options + + def test_rule_nosuchrule(self, rule_provider): + with pytest.raises(errors.NotFound): + rule_provider._rule('nosuchrule', 'openssl') + + def test_rule_nosuchhelper(self, rule_provider): + with pytest.raises(errors.EmptyResult): + rule_provider._rule('basic', 'nosuchhelper') + + def test_rules_for_profile_success(self, rule_provider): + rules = rule_provider.rules_for_profile('profile', 'certutil') + + assert len(rules) == 1 + field_mapping = rules[0] + assert field_mapping.syntax_rule.name == 'basic' + assert len(field_mapping.data_rules) == 1 + assert field_mapping.data_rules[0].name == 'options' + + def test_rules_for_profile_nosuchprofile(self, rule_provider): + with pytest.raises(errors.NotFound): + rule_provider.rules_for_profile('nosuchprofile', 'certutil') + + +class test_CSRGenerator(object): + def test_userCert_OpenSSL(self, generator): + principal = { + 'uid': ['testuser'], + 'mail': ['testuser@example.com'], + } + config = { + 'ipacertificatesubjectbase': [ + 'O=DOMAIN.EXAMPLE.COM' + ], + } + + script = generator.csr_script(principal, config, 'userCert', 'openssl') + with open(os.path.join( + CSR_DATA_DIR, 'scripts', 'userCert_openssl.sh')) as f: + expected_script = f.read() + assert script == expected_script + + def test_userCert_Certutil(self, generator): + principal = { + 'uid': ['testuser'], + 'mail': ['testuser@example.com'], + } + config = { + 'ipacertificatesubjectbase': [ + 'O=DOMAIN.EXAMPLE.COM' + ], + } + + script = generator.csr_script( + principal, config, 'userCert', 'certutil') + + with open(os.path.join( + CSR_DATA_DIR, 'scripts', 'userCert_certutil.sh')) as f: + expected_script = f.read() + assert script == expected_script + + def test_caIPAserviceCert_OpenSSL(self, generator): + principal = { + 'krbprincipalname': [ + 'HTTP/machine.example.com@DOMAIN.EXAMPLE.COM' + ], + } + config = { + 'ipacertificatesubjectbase': [ + 'O=DOMAIN.EXAMPLE.COM' + ], + } + + script = generator.csr_script( + principal, config, 'caIPAserviceCert', 'openssl') + with open(os.path.join( + CSR_DATA_DIR, 'scripts', 'caIPAserviceCert_openssl.sh')) as f: + expected_script = f.read() + assert script == expected_script + + def test_caIPAserviceCert_Certutil(self, generator): + principal = { + 'krbprincipalname': [ + 'HTTP/machine.example.com@DOMAIN.EXAMPLE.COM' + ], + } + config = { + 'ipacertificatesubjectbase': [ + 'O=DOMAIN.EXAMPLE.COM' + ], + } + + script = generator.csr_script( + principal, config, 'caIPAserviceCert', 'certutil') + with open(os.path.join( + CSR_DATA_DIR, 'scripts', 'caIPAserviceCert_certutil.sh')) as f: + expected_script = f.read() + assert script == expected_script + + +class test_rule_handling(object): + def test_optionalAttributeMissing(self, generator): + principal = {'uid': 'testuser'} + rule_provider = StubRuleProvider() + rule_provider.data_rule.template = '{{subject.mail}}' + rule_provider.data_rule.options = {'data_source': 'subject.mail'} + generator = IdentityCSRGenerator(rule_provider) + + script = generator.csr_script( + principal, {}, 'example', 'identity') + assert script == '\n' + + def test_twoDataRulesOneMissing(self, generator): + principal = {'uid': 'testuser'} + rule_provider = StubRuleProvider() + rule_provider.data_rule.template = '{{subject.mail}}' + rule_provider.data_rule.options = {'data_source': 'subject.mail'} + rule_provider.field_mapping.data_rules.append(csrgen.Rule( + 'data2', '{{subject.uid}}', {'data_source': 'subject.uid'})) + generator = IdentityCSRGenerator(rule_provider) + + script = generator.csr_script(principal, {}, 'example', 'identity') + assert script == ',testuser\n' + + def test_requiredAttributeMissing(self): + principal = {'uid': 'testuser'} + rule_provider = StubRuleProvider() + rule_provider.data_rule.template = '{{subject.mail}}' + rule_provider.data_rule.options = {'data_source': 'subject.mail'} + rule_provider.syntax_rule.options = {'required': True} + generator = IdentityCSRGenerator(rule_provider) + + with pytest.raises(errors.CSRTemplateError): + _script = generator.csr_script( + principal, {}, 'example', 'identity')