From 40d8dded7fc1e71621516da9197c736057c0b6e4 Mon Sep 17 00:00:00 2001 From: Martin Babinsky Date: Jun 13 2016 15:50:54 +0000 Subject: Test suite for `serverroles` backend Tests retrieving roles/attributes and setting server attributes in various scenarios. https://fedorahosted.org/freeipa/ticket/5181 Reviewed-By: Jan Cholasta Reviewed-By: Martin Basti Reviewed-By: Pavel Vomacka --- diff --git a/ipatests/test_ipaserver/test_serverroles.py b/ipatests/test_ipaserver/test_serverroles.py new file mode 100644 index 0000000..ef8cca3 --- /dev/null +++ b/ipatests/test_ipaserver/test_serverroles.py @@ -0,0 +1,745 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# + +""" +Tests for the serverroles backend +""" + +from collections import namedtuple + +import ldap +import pytest + +from ipalib import api, create_api, errors +from ipapython.dn import DN +from ipatests.util import MockLDAP + + +def _make_service_entry_mods(enabled=True, other_config=None): + mods = { + b'objectClass': [b'top', b'nsContainer', b'ipaConfigObject'], + } + if enabled: + mods.update({b'ipaConfigString': [b'enabledService']}) + + if other_config is not None: + mods.setdefault(b'ipaConfigString', []) + mods[b'ipaConfigString'].extend(other_config) + + return mods + + +def _make_master_entry_mods(ca=False): + mods = { + b'objectClass': [ + b'top', + b'nsContainer', + b'ipaReplTopoManagedServer', + b'ipaSupportedDomainLevelConfig', + b'ipaConfigObject', + ], + b'ipaMaxDomainLevel': [b'1'], + b'ipaMinDomainLevel': [b'0'], + b'ipaReplTopoManagedsuffix': [str(api.env.basedn)] + } + if ca: + mods[b'ipaReplTopoManagedsuffix'].append(b'o=ipaca') + + return mods + +_adtrust_agents = DN( + ('cn', 'adtrust agents'), + ('cn', 'sysaccounts'), + ('cn', 'etc'), + api.env.basedn +) + + +master_data = { + 'ca-dns-dnssec-keymaster': { + 'services': { + 'CA': { + 'enabled': True, + }, + 'DNS': { + 'enabled': True, + }, + 'DNSKeySync': { + 'enabled': True, + }, + 'DNSSEC': { + 'enabled': True, + 'config': ['DNSSecKeyMaster'] + } + }, + 'expected_roles': { + 'enabled': ['IPA master', 'CA server', 'DNS server'] + }, + 'expected_attributes': {'DNS server': 'dnssec_key_master_server'} + }, + 'ca-kra-renewal-master': { + 'services': { + 'CA': { + 'enabled': True, + 'config': ['caRenewalMaster'] + }, + 'KRA': { + 'enabled': True, + }, + }, + 'expected_roles': { + 'enabled': ['IPA master', 'CA server', 'KRA server'] + }, + 'expected_attributes': {'CA server': 'ca_renewal_master_server'} + }, + 'dns-trust-agent': { + 'services': { + 'DNS': { + 'enabled': True, + }, + 'DNSKeySync': { + 'enabled': True, + } + }, + 'attributes': { + _adtrust_agents: { + 'member': ['host'] + } + }, + 'expected_roles': { + 'enabled': ['IPA master', 'DNS server', 'AD trust agent'] + } + }, + 'trust-agent': { + 'attributes': { + _adtrust_agents: { + 'member': ['host'] + } + }, + 'expected_roles': { + 'enabled': ['IPA master', 'AD trust agent'] + } + }, + 'trust-controller-dns': { + 'services': { + 'ADTRUST': { + 'enabled': True, + }, + 'DNS': { + 'enabled': True, + }, + 'DNSKeySync': { + 'enabled': True, + } + }, + 'attributes': { + _adtrust_agents: { + 'member': ['host', 'cifs'] + } + }, + 'expected_roles': { + 'enabled': ['IPA master', 'AD trust agent', 'AD trust controller', + 'DNS server'] + } + }, + 'trust-controller-ca': { + 'services': { + 'ADTRUST': { + 'enabled': True, + }, + 'CA': { + 'enabled': True, + }, + }, + 'attributes': { + _adtrust_agents: { + 'member': ['host', 'cifs'] + } + }, + 'expected_roles': { + 'enabled': ['IPA master', 'AD trust agent', 'AD trust controller', + 'CA server'] + } + }, + 'configured-ca': { + 'services': { + 'CA': { + 'enabled': False, + }, + }, + 'expected_roles': { + 'enabled': ['IPA master'], + 'configured': ['CA server'] + } + }, + 'configured-dns': { + 'services': { + 'DNS': { + 'enabled': False, + }, + 'DNSKeySync': { + 'enabled': False, + } + }, + 'expected_roles': { + 'enabled': ['IPA master'], + 'configured': ['DNS server'] + } + }, + 'mixed-state-dns': { + 'services': { + 'DNS': { + 'enabled': False + }, + 'DNSKeySync': { + 'enabled': True + } + }, + 'expected_roles': { + 'enabled': ['IPA master'], + 'configured': ['DNS server'] + } + }, +} + + +class MockMasterTopology(object): + """ + object that will set up and tear down entries in LDAP backend to mimic + a presence of real IPA masters with services running on them. + """ + + ipamaster_services = [u'KDC', u'HTTP', u'KPASSWD'] + + def __init__(self, api_instance, domain_data): + self.api = api_instance + + self.domain = self.api.env.domain + self.domain_data = domain_data + self.masters_base = DN( + self.api.env.container_masters, self.api.env.basedn) + + self.test_master_dn = DN( + ('cn', self.api.env.host), self.api.env.container_masters, + self.api.env.basedn) + + self.ldap = MockLDAP() + + self.existing_masters = { + m['cn'][0] for m in self.api.Command.server_find( + u'', sizelimit=0, + pkey_only=True, + no_members=True, + raw=True)['result']} + + self.existing_attributes = self._check_test_host_attributes() + + def iter_domain_data(self): + MasterData = namedtuple('MasterData', + ['dn', 'fqdn', 'services', 'attrs']) + for name in self.domain_data: + fqdn = self.get_fqdn(name) + master_dn = self.get_master_dn(name) + master_services = self.domain_data[name].get('services', {}) + + master_attributes = self.domain_data[name].get('attributes', {}) + + yield MasterData( + dn=master_dn, + fqdn=fqdn, + services=master_services, + attrs=master_attributes + ) + + def get_fqdn(self, name): + return '.'.join([name, self.domain]) + + def get_master_dn(self, name): + return DN(('cn', self.get_fqdn(name)), self.masters_base) + + def get_service_dn(self, name, master_dn): + return DN(('cn', name), master_dn) + + def _add_host_entry(self, fqdn): + self.api.Command.host_add(fqdn, force=True) + self.api.Command.hostgroup_add_member(u'ipaservers', host=fqdn) + + def _del_host_entry(self, fqdn): + try: + self.api.Command.host_del(fqdn) + except errors.NotFound: + pass + + def _add_service_entry(self, service, fqdn): + return self.api.Command.service_add( + '/'.join([service, fqdn]), + force=True + ) + + def _del_service_entry(self, service, fqdn): + try: + self.api.Command.service_del( + '/'.join([service, fqdn]), + ) + except errors.NotFound: + pass + + def _add_svc_entries(self, master_dn, svc_desc): + self._add_ipamaster_services(master_dn) + for name in svc_desc: + svc_dn = self.get_service_dn(name, master_dn) + svc_mods = svc_desc[name] + + self.ldap.add_entry( + str(svc_dn), + _make_service_entry_mods( + enabled=svc_mods['enabled'], + other_config=svc_mods.get('config', None))) + + def _remove_svc_master_entries(self, master_dn): + try: + entries = self.ldap.connection.search_s( + str(master_dn), ldap.SCOPE_SUBTREE + ) + except ldap.NO_SUCH_OBJECT: + return + + if entries: + entries.sort(key=lambda x: len(x[0]), reverse=True) + for entry_dn, attrs in entries: + self.ldap.del_entry(str(entry_dn)) + + def _add_ipamaster_services(self, master_dn): + """ + add all the service entries which are part of the IPA Master role + """ + for svc_name in self.ipamaster_services: + svc_dn = self.get_service_dn(svc_name, master_dn) + self.ldap.add_entry(str(svc_dn), _make_service_entry_mods()) + + def _add_members(self, dn, fqdn, member_attrs): + entry, attrs = self.ldap.connection.search_s( + str(dn), ldap.SCOPE_SUBTREE)[0] + mods = [] + value = attrs.get('member', []) + mod_op = ldap.MOD_REPLACE + if not value: + mod_op = ldap.MOD_ADD + + for a in member_attrs: + + if a == 'host': + value.append( + str(self.api.Object.host.get_dn(fqdn))) + else: + result = self._add_service_entry(a, fqdn)['result'] + value.append(str(result['dn'])) + + mods.append( + (mod_op, 'member', value) + ) + + self.ldap.connection.modify_s(str(dn), mods) + + def _remove_members(self, dn, fqdn, member_attrs): + entry, attrs = self.ldap.connection.search_s( + str(dn), ldap.SCOPE_SUBTREE)[0] + mods = [] + for a in member_attrs: + value = set(attrs.get('member', [])) + if not value: + continue + + if a == 'host': + try: + value.remove( + str(self.api.Object.host.get_dn(fqdn))) + except KeyError: + pass + else: + try: + value.remove( + str(self.api.Object.service.get_dn( + '/'.join([a, fqdn])))) + except KeyError: + pass + self._del_service_entry(a, fqdn) + + mods.append( + (ldap.MOD_REPLACE, 'member', list(value)) + ) + + try: + self.ldap.connection.modify_s(str(dn), mods) + except (ldap.NO_SUCH_OBJECT, ldap.NO_SUCH_ATTRIBUTE): + pass + + def _check_test_host_attributes(self): + existing_attributes = set() + + for service, value, attr_name in ( + ('CA', 'caRenewalMaster', 'ca renewal master'), + ('DNSSEC', 'DNSSecKeyMaster', 'dnssec key master')): + + svc_dn = DN(('cn', service), self.test_master_dn) + try: + svc_entry = self.api.Backend.ldap2.get_entry(svc_dn) + except errors.NotFound: + continue + else: + config_string_val = svc_entry.get('ipaConfigString', []) + + if value in config_string_val: + existing_attributes.add(attr_name) + + return existing_attributes + + def _remove_ca_renewal_master(self): + if 'ca renewal master' not in self.existing_attributes: + return + + ca_dn = DN(('cn', 'CA'), self.test_master_dn) + ca_entry = self.api.Backend.ldap2.get_entry(ca_dn) + + config_string_val = ca_entry.get('ipaConfigString', []) + try: + config_string_val.remove('caRenewalMaster') + except KeyError: + return + + ca_entry.update({'ipaConfigString': config_string_val}) + self.api.Backend.ldap2.update_entry(ca_entry) + + def _restore_ca_renewal_master(self): + if 'ca renewal master' not in self.existing_attributes: + return + + ca_dn = DN(('cn', 'CA'), self.test_master_dn) + ca_entry = self.api.Backend.ldap2.get_entry(ca_dn) + + config_string_val = ca_entry.get('ipaConfigString', []) + config_string_val.append('caRenewalMaster') + + ca_entry.update({'ipaConfigString': config_string_val}) + self.api.Backend.ldap2.update_entry(ca_entry) + + def setup_data(self): + self._remove_ca_renewal_master() + for master_data in self.iter_domain_data(): + # create host + self._add_host_entry(master_data.fqdn) + + # create master + self.ldap.add_entry( + str(master_data.dn), _make_master_entry_mods( + ca='CA' in master_data.services)) + + # now add service entries + self._add_svc_entries(master_data.dn, master_data.services) + + # optionally add some attributes required e.g. by AD trust roles + for entry_dn, attrs in master_data.attrs.items(): + if 'member' in attrs: + self._add_members( + entry_dn, + master_data.fqdn, + attrs['member'] + ) + + def teardown_data(self): + self._restore_ca_renewal_master() + for master_data in self.iter_domain_data(): + # first remove the master entries and service containers + self._remove_svc_master_entries(master_data.dn) + + # optionally clean up leftover attributes + for entry_dn, attrs in master_data.attrs.items(): + if 'member' in attrs: + self._remove_members( + entry_dn, + master_data.fqdn, + attrs['member'], + ) + + # finally remove host entry + self._del_host_entry(master_data.fqdn) + + +@pytest.fixture(scope='module') +def mock_api(request): + test_api = create_api(mode=None) + test_api.bootstrap(in_server=True, in_tree=True) + test_api.finalize() + + if not test_api.Backend.ldap2.isconnected(): + test_api.Backend.ldap2.connect() + + return test_api + + +@pytest.fixture(scope='module') +def mock_masters(request, mock_api): + """ + Populate the LDAP backend with test data + """ + + if not api.Backend.rpcclient.isconnected(): + api.Backend.rpcclient.connect() + + master_topo = MockMasterTopology(mock_api, master_data) + + def finalize(): + master_topo.teardown_data() + if api.Backend.rpcclient.isconnected(): + api.Backend.rpcclient.disconnect() + + request.addfinalizer(finalize) + + master_topo.setup_data() + + return master_topo + + +def enabled_role_iter(master_data): + for m, data in master_data.items(): + for role in data['expected_roles']['enabled']: + yield m, role + + +def provided_role_iter(master_data): + for m, data in master_data.items(): + yield m, data['expected_roles']['enabled'] + + +def configured_role_iter(master_data): + for m, data in master_data.items(): + if 'configured' in data['expected_roles']: + for role in data['expected_roles']['configured']: + yield m, role + + +def role_provider_iter(master_data): + result = {} + for m, data in master_data.items(): + for role in data['expected_roles']['enabled']: + if role not in result: + result[role] = [] + + result[role].append(m) + + for role_name, masters in result.items(): + yield role_name, masters + + +def attribute_masters_iter(master_data): + for m, data in master_data.items(): + if 'expected_attributes' in data: + for assoc_role, attr in data['expected_attributes'].items(): + yield m, assoc_role, attr + + +def dns_servers_iter(master_data): + for m, data in master_data.items(): + if "DNS server" in data['expected_roles']['enabled']: + yield m + + +@pytest.fixture(params=list(enabled_role_iter(master_data)), + ids=['role: {}, master: {}, enabled'.format(role, m) + for m, role in enabled_role_iter(master_data)]) +def enabled_role(request): + return request.param + + +@pytest.fixture(params=list(provided_role_iter(master_data)), + ids=["{}: {}".format(m, ', '.join(roles)) for m, roles in + provided_role_iter(master_data)]) +def provided_roles(request): + return request.param + + +@pytest.fixture(params=list(configured_role_iter(master_data)), + ids=['role: {}, master: {}, configured'.format(role, m) + for m, role in configured_role_iter(master_data)]) +def configured_role(request): + return request.param + + +@pytest.fixture(params=list(role_provider_iter(master_data)), + ids=['{} providers'.format(role_name) + for role_name, m in + role_provider_iter(master_data)]) +def role_providers(request): + return request.param + + +@pytest.fixture(params=list(attribute_masters_iter(master_data)), + ids=['{} of {}: {}'.format(attr, role, m) for m, role, attr in + attribute_masters_iter(master_data)]) +def attribute_providers(request): + return request.param + + +@pytest.fixture(params=list(dns_servers_iter(master_data)), + ids=list(dns_servers_iter(master_data))) +def dns_server(request): + return request.param + + +class TestServerRoleStatusRetrieval(object): + def retrieve_role(self, master, role, mock_api, mock_masters): + fqdn = mock_masters.get_fqdn(master) + return mock_api.Backend.serverroles.server_role_retrieve( + server_server=fqdn, role_servrole=role) + + def find_role(self, role_name, mock_api, mock_masters, master=None): + if master is not None: + hostname = mock_masters.get_fqdn(master) + else: + hostname = None + + result = mock_api.Backend.serverroles.server_role_search( + server_server=hostname, + role_servrole=role_name) + + return [ + r for r in result if r[u'server_server'] not in + mock_masters.existing_masters] + + def get_enabled_roles_on_master(self, master, mock_api, mock_masters): + fqdn = mock_masters.get_fqdn(master) + result = mock_api.Backend.serverroles.server_role_search( + server_server=fqdn, role_servrole=None, status=u'enabled' + ) + return sorted(set(r[u'role_servrole'] for r in result)) + + def get_masters_with_enabled_role(self, role_name, mock_api, mock_masters): + result = mock_api.Backend.serverroles.server_role_search( + server_server=None, role_servrole=role_name) + + return sorted( + r[u'server_server'] for r in result if + r[u'status'] == u'enabled' and r[u'server_server'] not in + mock_masters.existing_masters) + + def test_listing_of_enabled_role( + self, mock_api, mock_masters, enabled_role): + master, role_name = enabled_role + result = self.retrieve_role(master, role_name, mock_api, mock_masters) + + assert result[0][u'status'] == u'enabled' + + def test_listing_of_configured_role( + self, mock_api, mock_masters, configured_role): + master, role_name = configured_role + result = self.retrieve_role(master, role_name, mock_api, mock_masters) + + assert result[0][u'status'] == u'configured' + + def test_role_providers( + self, mock_api, mock_masters, role_providers): + role_name, providers = role_providers + expected_masters = sorted(mock_masters.get_fqdn(m) for m in providers) + + actual_masters = self.get_masters_with_enabled_role( + role_name, mock_api, mock_masters) + + assert expected_masters == actual_masters + + def test_provided_roles_on_master( + self, mock_api, mock_masters, provided_roles): + master, expected_roles = provided_roles + expected_roles.sort() + actual_roles = self.get_enabled_roles_on_master( + master, mock_api, mock_masters) + assert expected_roles == actual_roles + + def test_unknown_role_status_raises_notfound(self, mock_api, mock_masters): + unknown_role = 'IAP maestr' + fqdn = mock_masters.get_fqdn('ca-dns-dnssec-keymaster') + with pytest.raises(errors.NotFound): + mock_api.Backend.serverroles.server_role_retrieve( + fqdn, unknown_role) + + def test_no_servrole_queries_all_roles_on_server(self, mock_api, + mock_masters): + master_name = 'ca-dns-dnssec-keymaster' + enabled_roles = master_data[master_name]['expected_roles']['enabled'] + result = self.find_role(None, mock_api, mock_masters, + master=master_name) + + for r in result: + if r[u'role_servrole'] in enabled_roles: + assert r[u'status'] == u'enabled' + else: + assert r[u'status'] == u'absent' + + def test_invalid_substring_search_returns_nothing(self, mock_api, + mock_masters): + invalid_substr = 'fwfgbb' + + assert (not self.find_role(invalid_substr, mock_api, mock_masters, + 'ca-dns-dnssec-keymaster')) + + +class TestServerAttributes(object): + def config_retrieve(self, assoc_role_name, mock_api): + return mock_api.Backend.serverroles.config_retrieve( + assoc_role_name) + + def config_update(self, mock_api, **attrs_values): + return mock_api.Backend.serverroles.config_update(**attrs_values) + + def test_attribute_master(self, mock_api, mock_masters, + attribute_providers): + master, assoc_role, attr_name = attribute_providers + fqdn = mock_masters.get_fqdn(master) + actual_attr_masters = self.config_retrieve( + assoc_role, mock_api)[attr_name] + + assert actual_attr_masters == fqdn + + def test_set_attribute_on_the_same_provider_raises_emptymodlist( + self, mock_api, mock_masters): + attr_name = "ca_renewal_master_server" + role_name = "CA server" + + existing_renewal_master = self.config_retrieve( + role_name, mock_api)[attr_name] + + with pytest.raises(errors.EmptyModlist): + self.config_update( + mock_api, **{attr_name: existing_renewal_master}) + + def test_set_attribute_on_master_without_assoc_role_raises_validationerror( + self, mock_api, mock_masters): + attr_name = "ca_renewal_master_server" + + non_ca_fqdn = mock_masters.get_fqdn('trust-controller-dns') + + with pytest.raises(errors.ValidationError): + self.config_update(mock_api, **{attr_name: non_ca_fqdn}) + + def test_set_unknown_attribute_on_master_raises_notfound( + self, mock_api, mock_masters): + attr_name = "ca_renuwal_maztah" + fqdn = mock_masters.get_fqdn('trust-controller-ca') + + with pytest.raises(errors.NotFound): + self.config_update(mock_api, **{attr_name: fqdn}) + + def test_set_ca_renewal_master_on_other_ca_and_back(self, mock_api, + mock_masters): + attr_name = "ca_renewal_master_server" + role_name = "CA server" + original_renewal_master = self.config_retrieve( + role_name, mock_api)[attr_name] + + other_ca_server = mock_masters.get_fqdn('trust-controller-ca') + + for host in (other_ca_server, original_renewal_master): + self.config_update(mock_api, **{attr_name: host}) + + assert ( + self.config_retrieve(role_name, mock_api)[attr_name] == host)