From 2c702c4f44512e8731c3d0a026cbc8838517345f Mon Sep 17 00:00:00 2001 From: Simon Pichugin Date: Oct 13 2015 12:10:59 +0000 Subject: Ticket 47957 - Add replication test suite for a wait async feature Description: Test new attribute "nsDS5ReplicaWaitForAsyncResults". After setting the attribute, supplier will sleep established amount of millisecond if it finds the response from consumer is not ready. Tests: - not integer value; - multi value; - check that value has been set correctly [None, 2000, 0, -5]; - replication behavior with valid values [None, 2000, 0, -5]. https://fedorahosted.org/389/ticket/47957 Reviewed by: nhosoi@redhat.com (Thanks, Noriko!) --- diff --git a/dirsrvtests/suites/replication/wait_for_async_feature_test.py b/dirsrvtests/suites/replication/wait_for_async_feature_test.py new file mode 100644 index 0000000..4905088 --- /dev/null +++ b/dirsrvtests/suites/replication/wait_for_async_feature_test.py @@ -0,0 +1,280 @@ +import os +import sys +import time +import ldap +import logging +import pytest +from lib389 import DirSrv, Entry, tools, tasks +from lib389.tools import DirSrvTools +from lib389._constants import * +from lib389.properties import * +from lib389.tasks import * +from lib389.utils import * +from collections import Counter + +logging.getLogger(__name__).setLevel(logging.DEBUG) +log = logging.getLogger(__name__) + +installation1_prefix = None + +WAITFOR_ASYNC_ATTR = "nsDS5ReplicaWaitForAsyncResults" + +class TopologyReplication(object): + def __init__(self, master1, master2, m1_m2_agmt, m2_m1_agmt): + master1.open() + master2.open() + self.masters = ((master1, m1_m2_agmt), + (master2, m2_m1_agmt)) + + +@pytest.fixture(scope="module") +def topology(request): + global installation1_prefix + if installation1_prefix: + args_instance[SER_DEPLOYED_DIR] = installation1_prefix + + # Creating master 1... + master1 = DirSrv(verbose=False) + args_instance[SER_HOST] = HOST_MASTER_1 + args_instance[SER_PORT] = PORT_MASTER_1 + args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1 + args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX + args_master = args_instance.copy() + master1.allocate(args_master) + instance_master1 = master1.exists() + if instance_master1: + master1.delete() + master1.create() + master1.open() + master1.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_1) + + # Creating master 2... + master2 = DirSrv(verbose=False) + args_instance[SER_HOST] = HOST_MASTER_2 + args_instance[SER_PORT] = PORT_MASTER_2 + args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2 + args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX + args_master = args_instance.copy() + master2.allocate(args_master) + instance_master2 = master2.exists() + if instance_master2: + master2.delete() + master2.create() + master2.open() + master2.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER, replicaId=REPLICAID_MASTER_2) + + # + # Create all the agreements + # + # Creating agreement from master 1 to master 2 + properties = {RA_NAME: r'meTo_$host:$port', + RA_BINDDN: defaultProperties[REPLICATION_BIND_DN], + RA_BINDPW: defaultProperties[REPLICATION_BIND_PW], + RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD], + RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]} + m1_m2_agmt = master1.agreement.create(suffix=SUFFIX, host=master2.host, port=master2.port, properties=properties) + if not m1_m2_agmt: + log.fatal("Fail to create a master -> master replica agreement") + sys.exit(1) + log.debug("%s created" % m1_m2_agmt) + + # Creating agreement from master 2 to master 1 + properties = {RA_NAME: r'meTo_$host:$port', + RA_BINDDN: defaultProperties[REPLICATION_BIND_DN], + RA_BINDPW: defaultProperties[REPLICATION_BIND_PW], + RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD], + RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]} + m2_m1_agmt = master2.agreement.create(suffix=SUFFIX, host=master1.host, port=master1.port, properties=properties) + if not m2_m1_agmt: + log.fatal("Fail to create a master -> master replica agreement") + sys.exit(1) + log.debug("%s created" % m2_m1_agmt) + + # Allow the replicas to get situated with the new agreements... + time.sleep(5) + + # + # Initialize all the agreements + # + master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2) + master1.waitForReplInit(m1_m2_agmt) + master2.agreement.init(SUFFIX, HOST_MASTER_1, PORT_MASTER_1) + master2.waitForReplInit(m2_m1_agmt) + + # Check replication is working... + if master1.testReplication(DEFAULT_SUFFIX, master2): + log.info('Replication is working.') + else: + log.fatal('Replication is not working.') + assert False + + log.info("Set Replication Debugging loglevel for the errorlog") + master1.setLogLevel(lib389.LOG_REPLICA) + master2.setLogLevel(lib389.LOG_REPLICA) + + # Delete each instance in the end + def fin(): + master1.delete() + master2.delete() + request.addfinalizer(fin) + + # Clear out the tmp dir + master1.clearTmpDir(__file__) + + return TopologyReplication(master1, master2, m1_m2_agmt, m2_m1_agmt) + + +@pytest.fixture(params=[(None, (4, 10)), + ('2000', (0, 1)), + ('0', (4, 10)), + ('-5', (4, 10))]) +def waitfor_async_attr(topology, request): + """Sets attribute on all replicas""" + + attr_value = request.param[0] + expected_result = request.param[1] + + # Run through all masters + for master in topology.masters: + agmt = master[1] + try: + if attr_value: + log.info("Set %s: %s on %s" % ( + WAITFOR_ASYNC_ATTR, attr_value, master[0].serverid)) + mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, attr_value)] + else: + log.info("Delete %s from %s" % ( + WAITFOR_ASYNC_ATTR, master[0].serverid)) + mod = [(ldap.MOD_DELETE, WAITFOR_ASYNC_ATTR, None)] + master[0].modify_s(agmt, mod) + except ldap.LDAPError as e: + log.error('Failed to set or delete %s attribute: (%s)' % ( + WAITFOR_ASYNC_ATTR, e.message['desc'])) + + return (attr_value, expected_result) + + +@pytest.fixture +def entries(topology, request): + """Adds entries to the master1""" + + master1 = topology.masters[0][0] + + TEST_OU = "test" + test_dn = SUFFIX + test_list = [] + + log.info("Add 100 nested entries under replicated suffix on %s" % master1.serverid) + for i in xrange(100): + test_dn = 'ou=%s%s,%s' % (TEST_OU, i, test_dn) + test_list.insert(0, test_dn) + try: + master1.add_s(Entry((test_dn, + {'objectclass': 'top', + 'objectclass': 'organizationalUnit', + 'ou': TEST_OU}))) + except ldap.LDAPError as e: + log.error('Failed to add entry (%s): error (%s)' % (test_dn, + e.message['desc'])) + assert False + + log.info("Delete created entries") + for test_dn in test_list: + try: + master1.delete_s(test_dn) + except ldap.LDAPError, e: + log.error('Failed to delete entry (%s): error (%s)' % (test_dn, + e.message['desc'])) + assert False + + def fin(): + log.info("Clear the errors log in the end of the test case") + with open(master1.errlog, 'w') as errlog: + errlog.writelines("") + request.addfinalizer(fin) + + +def test_not_int_value(topology): + """Tests not integer value""" + + master1 = topology.masters[0][0] + agmt = topology.masters[0][1] + + log.info("Try to set %s: wv1" % WAITFOR_ASYNC_ATTR) + try: + mod = [(ldap.MOD_REPLACE, WAITFOR_ASYNC_ATTR, "wv1")] + master1.modify_s(agmt, mod) + except ldap.LDAPError as e: + assert e.message['desc'] == 'Invalid syntax' + + +def test_multi_value(topology): + """Tests multi value""" + + master1 = topology.masters[0][0] + agmt = topology.masters[0][1] + log.info("agmt: %s" % agmt) + + log.info("Try to set %s: 100 and 101 in the same time (multi value test)" % ( + WAITFOR_ASYNC_ATTR)) + try: + mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "100")] + master1.modify_s(agmt, mod) + mod = [(ldap.MOD_ADD, WAITFOR_ASYNC_ATTR, "101")] + master1.modify_s(agmt, mod) + except ldap.LDAPError as e: + assert e.message['desc'] == 'Object class violation' + + +def test_value_check(topology, waitfor_async_attr): + """Checks that value has been set correctly""" + + attr_value = waitfor_async_attr[0] + + for master in topology.masters: + agmt = master[1] + + log.info("Check attr %s on %s" % (WAITFOR_ASYNC_ATTR, master[0].serverid)) + try: + if attr_value: + entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=%s" % ( + WAITFOR_ASYNC_ATTR, attr_value)) + assert entry + else: + entry = master[0].search_s(agmt, ldap.SCOPE_BASE, "%s=*" % WAITFOR_ASYNC_ATTR) + assert not entry + except ldap.LDAPError as e: + log.fatal('Search failed, error: ' + e.message['desc']) + assert False + + +def test_behavior_with_value(topology, waitfor_async_attr, entries): + """Tests replication behavior with valid + nsDS5ReplicaWaitForAsyncResults attribute values + """ + + master1 = topology.masters[0][0] + sync_dict = Counter() + min_ap = waitfor_async_attr[1][0] + max_ap = waitfor_async_attr[1][1] + + log.info("Gather all sync attempts within Counter dict, group by timestamp") + with open(master1.errlog, 'r') as errlog: + errlog_filtered = filter(lambda x: "waitfor_async_results" in x, errlog) + for line in errlog_filtered: + # Watch only over unsuccessful sync attempts + if line.split()[4] != line.split()[5]: + timestamp = line.split(']')[0] + sync_dict[timestamp] += 1 + + log.info("Take the most common timestamp and assert it has appeared " \ + "in the range from %s to %s times" % (min_ap, max_ap)) + most_common_val = sync_dict.most_common(1)[0][1] + assert min_ap <= most_common_val <= max_ap + + +if __name__ == '__main__': + # Run isolated + # -s for DEBUG mode + CURRENT_FILE = os.path.realpath(__file__) + pytest.main("-s %s" % CURRENT_FILE)