From 42719acdcebd3ef939587d4af4c3c6ad743ec601 Mon Sep 17 00:00:00 2001 From: Martin Basti Date: Jun 03 2016 13:58:21 +0000 Subject: DNS Locations: extend tests with server-* commands https://fedorahosted.org/freeipa/ticket/2008 Reviewed-By: Petr Spacek Reviewed-By: Jan Cholasta --- diff --git a/ipatests/test_xmlrpc/test_location_plugin.py b/ipatests/test_xmlrpc/test_location_plugin.py index 1ca3eac..97e97a2 100644 --- a/ipatests/test_xmlrpc/test_location_plugin.py +++ b/ipatests/test_xmlrpc/test_location_plugin.py @@ -5,12 +5,14 @@ from __future__ import absolute_import import pytest -from ipalib import errors +from ipalib import errors, api from ipatests.test_xmlrpc.tracker.location_plugin import LocationTracker +from ipatests.test_xmlrpc.tracker.server_plugin import ServerTracker from ipatests.test_xmlrpc.xmlrpc_test import ( XMLRPC_test, raises_exact, ) +from ipapython.dnsutil import DNSName @pytest.fixture(scope='class', params=[u'location1', u'sk\xfa\u0161ka.idna']) @@ -31,6 +33,12 @@ def location_absolute(request): return tracker.make_fixture(request) +@pytest.fixture(scope='class') +def server(request): + tracker = ServerTracker(api.env.host) + return tracker + + @pytest.mark.tier1 class TestNonexistentIPALocation(XMLRPC_test): def test_retrieve_nonexistent(self, location): @@ -75,7 +83,7 @@ class TestInvalidIPALocations(XMLRPC_test): class TestCRUD(XMLRPC_test): def test_create_duplicate(self, location): location.ensure_exists() - command = location.make_create_command(force=True) + command = location.make_create_command() with raises_exact(errors.DuplicateEntry( message=u'location with name "%s" already exists' % location.idnsname)): @@ -111,3 +119,82 @@ class TestCRUD(XMLRPC_test): def test_delete_location(self, location): location.delete() + + +@pytest.mark.tier1 +class TestLocationsServer(XMLRPC_test): + + def test_add_nonexistent_location_to_server(self, server): + nonexistent_loc = DNSName(u'nonexistent-location') + command = server.make_update_command( + updates=dict( + ipalocation_location=nonexistent_loc, + ) + ) + with raises_exact(errors.NotFound( + reason=u"{location}: location not found".format( + location=nonexistent_loc + ))): + command() + + def test_add_location_to_server(self, location, server): + location.ensure_exists() + server.update( + dict(ipalocation_location=location.idnsname_obj), + expected_updates=dict( + ipalocation_location=[location.idnsname_obj], + ) + ) + location.add_server_to_location(server.server_name) + location.retrieve() + + def test_retrieve(self, server): + server.retrieve() + + def test_retrieve_all(self, server): + server.retrieve(all=True) + + def test_search_server_with_location(self, location, server): + command = server.make_find_command( + server.server_name, in_location=location.idnsname_obj) + result = command() + server.check_find(result) + + def test_search_server_with_location_with_all(self, location, server): + command = server.make_find_command( + server.server_name, in_location=location.idnsname_obj, all=True) + result = command() + server.check_find(result, all=True) + + def test_search_server_without_location(self, location, server): + command = server.make_find_command( + server.server_name, not_in_location=location.idnsname_obj) + result = command() + server.check_find_nomatch(result) + + def test_add_location_to_server_custom_weight(self, location, server): + location.ensure_exists() + server.update( + dict( + ipalocation_location=location.idnsname_obj, + ipalocationweight=200, + ), + expected_updates=dict( + ipalocation_location=[location.idnsname_obj], + ipalocationweight=[u'200'], + ) + ) + # remove invalid data from the previous test + location.remove_server_from_location(server.server_name) + + location.add_server_to_location(server.server_name, weight=200) + location.retrieve() + + def test_remove_location_from_server(self, location, server): + server.update(dict(ipalocation_location=None)) + location.remove_server_from_location(server.server_name) + location.retrieve() + + def test_remove_location_weight_from_server(self, location, server): + server.update(dict(ipalocationweight=None)) + location.retrieve() diff --git a/ipatests/test_xmlrpc/tracker/base.py b/ipatests/test_xmlrpc/tracker/base.py index acd382d..6a0af51 100644 --- a/ipatests/test_xmlrpc/tracker/base.py +++ b/ipatests/test_xmlrpc/tracker/base.py @@ -281,6 +281,10 @@ class Tracker(object): result = command() self.attrs.update(updates) self.attrs.update(expected_updates) + for key, value in self.attrs.items(): + if value is None: + del self.attrs[key] + self.check_update(result, extra_keys=set(updates.keys()) | set(expected_updates.keys())) diff --git a/ipatests/test_xmlrpc/tracker/location_plugin.py b/ipatests/test_xmlrpc/tracker/location_plugin.py index c7af097..8901b7e 100644 --- a/ipatests/test_xmlrpc/tracker/location_plugin.py +++ b/ipatests/test_xmlrpc/tracker/location_plugin.py @@ -3,19 +3,25 @@ # from __future__ import absolute_import +import six + from ipapython.dn import DN from ipapython.dnsutil import DNSName from ipatests.util import assert_deepequal from ipatests.test_xmlrpc.tracker.base import Tracker +if six.PY3: + unicode = str + + class LocationTracker(Tracker): """Tracker for IPA Location tests""" - retrieve_keys = {'idnsname', 'description', 'dn'} + retrieve_keys = {'idnsname', 'description', 'dn', 'servers_server'} retrieve_all_keys = retrieve_keys | {'objectclass'} - create_keys = retrieve_keys | {'objectclass'} - find_keys = retrieve_keys - find_all_keys = retrieve_all_keys + create_keys = {'idnsname', 'description', 'dn', 'objectclass'} + find_keys = {'idnsname', 'description', 'dn',} + find_all_keys = find_keys | {'objectclass'} update_keys = {'idnsname', 'description'} def __init__(self, name, description=u"Location description"): @@ -34,13 +40,15 @@ class LocationTracker(Tracker): 'cn=etc', self.api.env.basedn ) + self.servers = {} + def make_create_command(self, force=None): """Make function that creates this location using location-add""" return self.make_command( 'location_add', self.idnsname, description=self.description, ) - def make_delete_command(self): + def make_delete_command(self, force=None): """Make function that removes this location using location-del""" return self.make_command('location_del', self.idnsname) @@ -95,6 +103,7 @@ class LocationTracker(Tracker): value=self.idnsname_obj, summary=None, result=expected, + servers=self.servers, ), result) def check_find(self, result, all=False, raw=False): @@ -117,3 +126,26 @@ class LocationTracker(Tracker): summary=u'Modified IPA location "{loc}"'.format(loc=self.idnsname), result=self.filter_attrs(self.update_keys | set(extra_keys)) ), result) + + def add_server_to_location( + self, server_name, weight=100, relative_weight=u"100.0%"): + self.attrs.setdefault('servers_server', []).append(server_name) + self.servers[server_name] = { + 'cn': [server_name], + 'ipalocationweight': [unicode(weight)], + 'location_relative_weight': [relative_weight] + } + + def remove_server_from_location(self, server_name): + if 'servers_server' in self.attrs: + try: + self.attrs['servers_server'].remove(server_name) + except ValueError: + pass + else: + if not self.attrs['servers_server']: + del self.attrs['servers_server'] + try: + del self.servers[server_name] + except KeyError: + pass diff --git a/ipatests/test_xmlrpc/tracker/server_plugin.py b/ipatests/test_xmlrpc/tracker/server_plugin.py new file mode 100644 index 0000000..42e63d7 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/server_plugin.py @@ -0,0 +1,110 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# +from __future__ import absolute_import + +from ipapython.dn import DN +from ipatests.util import assert_deepequal +from ipatests.test_xmlrpc.tracker.base import Tracker + + +class ServerTracker(Tracker): + """Tracker for IPA Location tests""" + retrieve_keys = { + 'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel', + 'iparepltopomanagedsuffix_topologysuffix', 'ipalocation_location', + 'ipalocationweight', + } + retrieve_all_keys = retrieve_keys | {'objectclass'} + create_keys = retrieve_keys | {'objectclass'} + find_keys = { + 'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel', + 'ipalocationweight', + } + find_all_keys = retrieve_all_keys + update_keys = { + 'cn', 'ipamaxdomainlevel', 'ipamindomainlevel', + 'ipalocation_location', 'ipalocationweight', + } + + def __init__(self, name): + super(ServerTracker, self).__init__(default_version=None) + self.server_name = name + self.dn = DN( + ('cn', self.server_name), + 'cn=masters,cn=ipa,cn=etc', + self.api.env.basedn + ) + self.exists = True # we cannot add server manually using server-add + self.attrs = dict( + dn=self.dn, + cn=[self.server_name], + iparepltopomanagedsuffix_topologysuffix=[u'domain', u'ca'], + objectclass=[ + u"ipalocationmember", + u"ipaReplTopoManagedServer", + u"top", + u"ipaConfigObject", + u"nsContainer", + u"ipaSupportedDomainLevelConfig" + ], + ipamaxdomainlevel=[u"1"], + ipamindomainlevel=[u"0"], + ) + self.exists = True + + def make_retrieve_command(self, all=False, raw=False): + """Make function that retrieves this server using server-show""" + return self.make_command( + 'server_show', self.name, all=all, raw=raw + ) + + def make_find_command(self, *args, **kwargs): + """Make function that finds servers using server-find""" + return self.make_command('server_find', *args, **kwargs) + + def make_update_command(self, updates): + """Make function that modifies the server using server-mod""" + return self.make_command('server_mod', self.name, **updates) + + def check_retrieve(self, result, all=False, raw=False): + """Check `server-show` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + assert_deepequal(dict( + value=self.server_name, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """Check `server-find` command result""" + if all: + expected = self.filter_attrs(self.find_all_keys) + else: + expected = self.filter_attrs(self.find_keys) + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 IPA server matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'server-find' command result when no match is expected """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 IPA servers matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """Check `server-update` command result""" + assert_deepequal(dict( + value=self.server_name, + summary=u'Modified IPA server "{server}"'.format(server=self.name), + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result)