| |
@@ -12,18 +12,22 @@
|
| |
|
| |
import os
|
| |
import pytest
|
| |
+ import time
|
| |
+ import re
|
| |
from lib389.topologies import topology_m1 as topo
|
| |
from lib389.idm.organizationalunit import OrganizationalUnits
|
| |
from lib389.idm.domain import Domain
|
| |
from lib389.idm.posixgroup import PosixGroups
|
| |
from lib389.plugins import AutoMembershipPlugin, AutoMembershipDefinitions, \
|
| |
- MemberOfPlugin, AutoMembershipRegexRules, AutoMembershipDefinition
|
| |
+ MemberOfPlugin, AutoMembershipRegexRules, AutoMembershipDefinition, RetroChangelogPlugin
|
| |
from lib389.backend import Backends
|
| |
from lib389.config import Config
|
| |
from lib389._constants import DEFAULT_SUFFIX
|
| |
from lib389.idm.user import UserAccounts
|
| |
from lib389.idm.group import Groups, Group, UniqueGroup, nsAdminGroups, nsAdminGroup
|
| |
+ from lib389.tasks import Tasks, AutomemberRebuildMembershipTask, ExportTask
|
| |
from lib389.utils import ds_is_older
|
| |
+ from lib389.paths import Paths
|
| |
import ldap
|
| |
|
| |
pytestmark = pytest.mark.tier1
|
| |
@@ -32,6 +36,7 @@
|
| |
TEST_BASE = "dc=testAutoMembers,dc=com"
|
| |
BASE_REPL = "dc=replAutoMembers,dc=com"
|
| |
SUBSUFFIX = f'dc=SubSuffix,{BASE_SUFF}'
|
| |
+ PLUGIN_AUTO = "cn=Auto Membership Plugin,cn=plugins,cn=config"
|
| |
REPMANDN = "cn=ReplManager"
|
| |
CACHE_SIZE = '-1'
|
| |
CACHEMEM_SIZE = '10485760'
|
| |
@@ -849,6 +854,332 @@
|
| |
topo.ms["master1"] .restart()
|
| |
|
| |
|
| |
@pytest.fixture(scope="module")
def _startuptask(topo):
    """Reshape the automember configuration for the task test cases.

    Deletes five automember definitions, points the ``userGroups``
    definition's ``autoMemberScope`` at ``ou=TaskEmployees`` and restarts
    master1.
    """
    master = topo.ms["master1"]

    # Deletion order is kept as-is: the entries below cn=subsuffGroups are
    # listed (and therefore removed) before cn=subsuffGroups itself.
    obsolete_definitions = (
        "cn=Managers,cn=subsuffGroups",
        "cn=Contractors,cn=subsuffGroups",
        "cn=testuserGroups",
        "cn=subsuffGroups",
        "cn=hostGroups",
    )
    for definition_rdn in obsolete_definitions:
        AutoMembershipDefinition(master, f'{definition_rdn},{PLUGIN_AUTO}').delete()

    user_groups = AutoMembershipDefinition(master, f'cn=userGroups,{PLUGIN_AUTO}')
    user_groups.replace('autoMemberScope', 'ou=TaskEmployees,dc=autoMembers,dc=com')

    master.restart()
|
| |
+
|
| |
+
|
| |
@pytest.fixture(scope="function")
def _fixture_for_build_task(request, topo):
    """Register a finalizer that empties ``ou=TaskEmployees`` after each test."""

    def _purge_task_employees():
        # Every entry listed under the task scope is deleted so the next
        # test starts without leftovers from this one.
        scope_dn = "ou=TaskEmployees,{}".format(BASE_SUFF)
        entries = nsAdminGroups(topo.ms['master1'], scope_dn, rdn=None).list()
        for entry in entries:
            entry.delete()

    request.addfinalizer(_purge_task_employees)
|
| |
+
|
| |
+
|
| |
def bulk_check_groups(topo, GROUP_DN, MEMBATTR, TOTAL_MEM):
    """Assert that a group carries exactly ``TOTAL_MEM`` values of ``MEMBATTR``.

    :param topo: handed unchanged to ``nsAdminGroup`` as its first argument
                 (the callers in this file pass the master instance)
    :param GROUP_DN: DN of the group entry to inspect
    :param MEMBATTR: name of the membership attribute to count
    :param TOTAL_MEM: expected number of attribute values
    :raises AssertionError: when the number of values differs from TOTAL_MEM
    """
    group = nsAdminGroup(topo, GROUP_DN)
    member_values = group.get_attr_vals_utf8(MEMBATTR)
    assert len(member_values) == TOTAL_MEM
|
| |
+
|
| |
+
|
| |
def test_automemtask_re_build_task(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Rebuild task adds users created while the plug-in was disabled

    :id: 4ff973a8-e7ff-11e8-a89b-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users and enable autoMembers plug-in
        2. Run automembers re-build task to create the member attributes
        3. Search for any error logs
    :expected results:
        1. Success
        2. Success
        3. Success
    """
    master = topo.ms['master1']
    testid = "autoMemTask_01"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    contract_grp = "cn=Contractors,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)
    error_string = "automember_rebuild_task_thread"

    # make sure the retro changelog is disabled
    RetroChangelogPlugin(master).disable()
    AutoMembershipPlugin(master).disable()
    master.restart()

    # Users are added while the plug-in is off, so no membership is expected yet.
    for i in range(10):
        add_user(topo, "{}{}".format(user_rdn, str(i)), auto_mem_scope, str(1188), str(1189), "Manager")
    for grp in (managers_grp, contract_grp):
        with pytest.raises(AssertionError):
            assert check_groups(topo, grp, f'uid={user_rdn}0,{auto_mem_scope}', 'member')

    AutoMembershipPlugin(master).enable()
    master.restart()

    rebuild_task = AutomemberRebuildMembershipTask(master)
    rebuild_task.create(properties={
        'basedn': auto_mem_scope,
        'filter': "objectClass=posixAccount"
    })
    # The task runs asynchronously on the server: wait for it to finish so
    # the log and membership checks below do not race with it.
    rebuild_task.wait()

    # Search for any error logs
    assert not master.searchErrorsLog(error_string)
    for grp in (managers_grp, contract_grp):
        bulk_check_groups(master, grp, "member", 10)
|
| |
+
|
| |
+
|
| |
def ldif_check_groups(USERS_DN, MEMBATTR, TOTAL_MEM, LDIF_FILE):
    """Assert how many ``MEMBATTR: USERS_DN`` occurrences an LDIF file contains.

    :param USERS_DN: DN (or DN prefix) to look for; it is lower-cased before
                     the search, the file content is searched as-is
    :param MEMBATTR: attribute name expected in front of the DN
    :param TOTAL_MEM: expected number of occurrences
    :param LDIF_FILE: path of the LDIF file to scan
    :raises AssertionError: when the occurrence count differs from TOTAL_MEM
    :raises OSError: when the file cannot be opened
    """
    needle = "{}: {}".format(MEMBATTR, USERS_DN.lower())
    # The context manager closes the handle even when the assertion fails.
    with open(LDIF_FILE, 'r') as ldif:
        content = ldif.read()
    # Plain substring counting: DN characters such as '.' or '+' must be
    # matched literally, not interpreted as regular-expression syntax.
    assert content.count(needle) == TOTAL_MEM
|
| |
+
|
| |
+
|
| |
def check_file_exists(export_ldif, retries=3, delay=1):
    """Wait for a file to appear and to become non-empty.

    The file is polled in two phases: first until it exists, then until its
    size is non-zero. Each phase sleeps at most ``retries`` times for
    ``delay`` seconds.

    :param export_ldif: path of the file to wait for
    :param retries: maximum number of sleeps per phase (default 3)
    :param delay: seconds to sleep between polls (default 1)
    :returns: True when the file exists and is non-empty, False otherwise
              (including when the file never shows up)
    """
    def _exists():
        return os.path.exists(export_ldif)

    def _has_content():
        # A missing (or vanished) file counts as "no content" instead of
        # letting os.stat() raise FileNotFoundError.
        try:
            return os.stat(export_ldif).st_size != 0
        except OSError:
            return False

    def _poll(condition):
        for _ in range(retries):
            if condition():
                return True
            time.sleep(delay)
        return condition()

    if not _poll(_exists):
        return False
    return _poll(_has_content)
|
| |
+
|
| |
+
|
| |
def test_automemtask_export_task(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Export task writes the member attributes of matching users to an ldif file

    :id: 4ff98b18-e7ff-11e8-872a-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users and enable autoMembers plug-in
        2. Run automembers export task to create an ldif file with member attributes
    :expected results:
        1. Success
        2. Success
    """
    master = topo.ms['master1']
    p = Paths('master1')
    testid = "autoMemTask_02"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)

    # Disabling plugin
    AutoMembershipPlugin(master).disable()
    master.restart()
    for i in range(10):
        add_user(topo, "{}{}".format(user_rdn, str(i)), auto_mem_scope, str(2788), str(2789), "Manager")
    with pytest.raises(AssertionError):
        bulk_check_groups(master, managers_grp, "member", 10)
    AutoMembershipPlugin(master).enable()
    master.restart()

    export_ldif = os.path.join(p.backup_dir, "Out_Export_02.ldif")
    # Start from a clean slate so a stale file cannot satisfy the checks.
    if os.path.exists(export_ldif):
        os.remove(export_ldif)
    try:
        exp_task = Tasks(master)
        exp_task.automemberExport(suffix=auto_mem_scope, fstr='objectclass=posixAccount', ldif_out=export_ldif)
        # Fail with a clear message if the export never produced any output.
        assert check_file_exists(export_ldif)
        ldif_check_groups("cn={}".format(user_rdn), "member", 10, export_ldif)
    finally:
        # Remove the export even when an assertion above failed.
        if os.path.exists(export_ldif):
            os.remove(export_ldif)
|
| |
+
|
| |
+
|
| |
def test_automemtask_mapping(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Map task turns an exported ldif into an ldif with member attributes

    :id: 4ff9a206-e7ff-11e8-bf59-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users and enable autoMembers plug-in
        2. Run automembers Mapping task with input/output ldif files
    :expected results:
        1. Should success
        2. Should success
    """
    master = topo.ms['master1']
    p = Paths('master1')
    testid = "autoMemTask_02"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)
    export_ldif = os.path.join(p.backup_dir, "Out_Export_02.ldif")
    output_ldif3 = os.path.join(p.backup_dir, "Output_03.ldif")
    ldif_files = [export_ldif, output_ldif3]

    def _remove_ldif_files():
        # Tolerates missing files so it can run before the test and after a failure.
        for ldif_file in ldif_files:
            if os.path.exists(ldif_file):
                os.remove(ldif_file)

    _remove_ldif_files()
    try:
        for i in range(10):
            add_user(topo, "{}{}".format(user_rdn, str(i)), auto_mem_scope, str(2788), str(2789), "Manager")

        # Input for the map task: an export of the whole suffix.
        ExportTask(master).export_suffix_to_ldif(ldiffile=export_ldif, suffix=BASE_SUFF)
        assert check_file_exists(export_ldif)

        map_task = Tasks(master)
        map_task.automemberMap(ldif_in=export_ldif, ldif_out=output_ldif3)
        assert check_file_exists(output_ldif3)
        ldif_check_groups("cn={}".format(user_rdn), "member", 10, output_ldif3)
    finally:
        _remove_ldif_files()
|
| |
+
|
| |
+
|
| |
def test_automemtask_re_build(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Rebuild task with an inetOrgPerson filter must not populate the group

    :id: 4ff9b944-e7ff-11e8-ad35-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users with inetOrgPerson object class
        2. Run automembers re-build task to create the member attributes, exp to FAIL
    :expected results:
        1. Should success
        2. Should not success
    """
    master = topo.ms['master1']
    testid = "autoMemTask_04"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)

    # Disabling plugin
    AutoMembershipPlugin(master).disable()
    master.restart()
    for number in range(10):
        add_user(topo, f'{user_rdn}{number}', auto_mem_scope, str(number), str(number), "Manager")
    with pytest.raises(AssertionError):
        bulk_check_groups(master, managers_grp, "member", 10)

    # Enabling plugin
    AutoMembershipPlugin(master).enable()
    master.restart()

    rebuild_task = AutomemberRebuildMembershipTask(master)
    rebuild_task.create(properties={
        'basedn': auto_mem_scope,
        'filter': "objectClass=inetOrgPerson"
    })
    # Wait for the task to finish; checking earlier would pass trivially
    # simply because the task had not done any work yet.
    rebuild_task.wait()

    with pytest.raises(AssertionError):
        bulk_check_groups(master, managers_grp, "member", 10)
|
| |
+
|
| |
+
|
| |
def test_automemtask_export(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Export task with an inetOrgPerson filter must not list the users as members

    :id: 4ff9cf74-e7ff-11e8-b712-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users with inetOrgPerson objectClass
        2. Run automembers export task to create an ldif file with member attributes, exp to FAIL
    :expected results:
        1. Should success
        2. Should not success
    """
    master = topo.ms['master1']
    p = Paths('master1')
    testid = "autoMemTask_05"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)

    # Disabling plugin
    AutoMembershipPlugin(master).disable()
    master.restart()
    for number in range(10):
        add_user(topo, f'{user_rdn}{number}', auto_mem_scope, str(number), str(number), "Manager")
    with pytest.raises(AssertionError):
        bulk_check_groups(master, managers_grp, "member", 10)

    # Enabling plugin
    AutoMembershipPlugin(master).enable()
    master.restart()

    export_ldif = os.path.join(p.backup_dir, "Out_Export_02.ldif")
    if os.path.exists(export_ldif):
        os.remove(export_ldif)
    try:
        exp_task = Tasks(master)
        exp_task.automemberExport(suffix=auto_mem_scope, fstr='objectclass=inetOrgPerson', ldif_out=export_ldif)
        # Only used to give the server time to write the file; the result is
        # deliberately not asserted.
        # NOTE(review): presumably the export can be empty when nothing
        # matches the filter - confirm before asserting on it.
        check_file_exists(export_ldif)
        with pytest.raises(AssertionError):
            ldif_check_groups("uid={}".format(user_rdn), "member", 10, export_ldif)
    finally:
        # Remove the export even when a check above failed.
        if os.path.exists(export_ldif):
            os.remove(export_ldif)
|
| |
+
|
| |
+
|
| |
def test_automemtask_run_re_build(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Rebuild task populates the group once the definition filter matches the users

    :id: 4ff9e5c2-e7ff-11e8-943e-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users with inetOrgPerson obj class
        2. Change plugin config
        3. Enable plug-in and run re-build task to create the member attributes
    :expected results:
        1. Should success
        2. Should success
        3. Should success
    """
    master = topo.ms['master1']
    testid = "autoMemTask_06"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)
    definition_dn = f'cn=userGroups,{PLUGIN_AUTO}'

    # Disabling plugin
    AutoMembershipPlugin(master).disable()
    master.restart()
    for number in range(10):
        add_user(topo, f'{user_rdn}{number}', auto_mem_scope, '111', '111', "Manager")
    for user in nsAdminGroups(master, auto_mem_scope, rdn=None).list():
        user.add('objectclass', 'inetOrgPerson')

    AutoMembershipDefinition(master, definition_dn).replace('autoMemberFilter',
                                                            "objectclass=inetOrgPerson")
    try:
        master.restart()
        with pytest.raises(AssertionError):
            bulk_check_groups(master, managers_grp, "member", 10)
        AutoMembershipPlugin(master).enable()
        master.restart()

        rebuild_task = AutomemberRebuildMembershipTask(master)
        rebuild_task.create(properties={
            'basedn': auto_mem_scope,
            'filter': "objectClass=inetOrgPerson"})
        # Wait for the task itself rather than sleeping for a fixed time.
        rebuild_task.wait()
        bulk_check_groups(master, managers_grp, "member", 10)
    finally:
        # Put the filter back even when a check failed, so later tests do
        # not inherit the modified definition.
        AutoMembershipDefinition(master, definition_dn).replace('autoMemberFilter',
                                                                "objectclass=posixAccount")
        master.restart()
|
| |
+
|
| |
+
|
| |
def test_automemtask_run_export(topo, _create_all_entries, _startuptask, _fixture_for_build_task):
    """Export task lists the users once the definition filter matches them

    :id: 4ff9fba2-e7ff-11e8-a5ec-8c16451d917b
    :setup: 4 Instances with replication
    :steps:
        1. Add 10 users with inetOrgPerson objectClass
        2. change plugin config
        3. Run export task to create an ldif file with member attributes
    :expected results:
        1. Should success
        2. Should success
        3. Should success
    """
    master = topo.ms['master1']
    p = Paths('master1')
    testid = "autoMemTask_07"
    auto_mem_scope = "ou=TaskEmployees,{}".format(BASE_SUFF)
    managers_grp = "cn=Managers,ou=userGroups,{}".format(BASE_SUFF)
    user_rdn = "User_{}".format(testid)
    definition_dn = f'cn=userGroups,{PLUGIN_AUTO}'
    export_ldif = os.path.join(p.backup_dir, "Out_Export_02.ldif")

    # Disabling plugin
    AutoMembershipPlugin(master).disable()
    master.restart()
    for number in range(10):
        add_user(topo, f'{user_rdn}{number}', auto_mem_scope, '222', '222', "Manager")
    for user in nsAdminGroups(master, auto_mem_scope, rdn=None).list():
        user.add('objectclass', 'inetOrgPerson')

    AutoMembershipDefinition(master, definition_dn).replace('autoMemberFilter',
                                                            "objectclass=inetOrgPerson")
    try:
        master.restart()
        # Enabling plugin
        AutoMembershipPlugin(master).enable()
        master.restart()
        with pytest.raises(AssertionError):
            bulk_check_groups(master, managers_grp, "member", 10)

        # Start from a clean slate so a stale file cannot satisfy the checks.
        if os.path.exists(export_ldif):
            os.remove(export_ldif)
        exp_task = Tasks(master)
        exp_task.automemberExport(suffix=auto_mem_scope, fstr='objectclass=inetOrgPerson', ldif_out=export_ldif)
        # Fail with a clear message if the export never produced any output.
        assert check_file_exists(export_ldif)
        ldif_check_groups("cn={}".format(user_rdn), "member", 10, export_ldif)
    finally:
        # Restore the filter and drop the export even when a check failed.
        AutoMembershipDefinition(master, definition_dn).\
            replace('autoMemberFilter', "objectclass=posixAccount")
        if os.path.exists(export_ldif):
            os.remove(export_ldif)
|
| |
+
|
| |
+
|
| |
if __name__ == "__main__":
    # Allow running this module directly: hand this file to pytest.
    CURRENT_FILE = os.path.realpath(__file__)
    # pytest.main() takes the command line as a list of arguments; a single
    # "-s -v <file>" string is not split into separate options.
    pytest.main(["-s", "-v", CURRENT_FILE])
|
| |
\ No newline at end of file
|
| |
Bug Description: CI test - automember_plugin(part3)
Relates: https://pagure.io/389-ds-base/issue/48055
Author: aborah
Reviewed by: ???