| |
@@ -7,22 +7,23 @@
|
| |
# --- END COPYRIGHT BLOCK ---
|
| |
#
|
| |
import pytest
|
| |
- import six
|
| |
import ldap
|
| |
from lib389.tasks import *
|
| |
from lib389.utils import *
|
| |
from lib389.topologies import topology_st
|
| |
-
|
| |
- from lib389.plugins import SevenBitCheckPlugin, AttributeUniquenessPlugin, MemberOfPlugin
|
| |
-
|
| |
+ from lib389.plugins import (SevenBitCheckPlugin, AttributeUniquenessPlugin,
|
| |
+ MemberOfPlugin, ManagedEntriesPlugin,
|
| |
+ ReferentialIntegrityPlugin, MEPTemplates,
|
| |
+ MEPConfigs)
|
| |
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
|
| |
- from lib389.idm.group import Groups
|
| |
-
|
| |
- from lib389._constants import DEFAULT_SUFFIX, PLUGIN_7_BIT_CHECK, PLUGIN_ATTR_UNIQUENESS, PLUGIN_MEMBER_OF
|
| |
+ from lib389.idm.organizationalunit import OrganizationalUnits
|
| |
+ from lib389.idm.group import Groups, Group
|
| |
+ from lib389._constants import DEFAULT_SUFFIX
|
| |
|
| |
logging.getLogger(__name__).setLevel(logging.DEBUG)
|
| |
log = logging.getLogger(__name__)
|
| |
|
| |
+
|
| |
def test_betxt_7bit(topology_st):
|
| |
"""Test that the 7-bit plugin correctly rejects an invalid update
|
| |
|
| |
@@ -52,7 +53,6 @@
|
| |
sevenbc.enable()
|
| |
topology_st.standalone.restart()
|
| |
|
| |
-
|
| |
users = UserAccounts(topology_st.standalone, basedn=DEFAULT_SUFFIX)
|
| |
user = users.create(properties=TEST_USER_PROPERTIES)
|
| |
|
| |
@@ -69,7 +69,7 @@
|
| |
|
| |
user_check = users.get("testuser")
|
| |
|
| |
- assert user_check.dn == user.dn
|
| |
+ assert user_check.dn.lower() == user.dn.lower()
|
| |
|
| |
#
|
| |
# Cleanup - remove the user
|
| |
@@ -100,9 +100,6 @@
|
| |
5. Test user entry should be removed
|
| |
"""
|
| |
|
| |
- USER1_DN = 'uid=test_entry1,' + DEFAULT_SUFFIX
|
| |
- USER2_DN = 'uid=test_entry2,' + DEFAULT_SUFFIX
|
| |
-
|
| |
attruniq = AttributeUniquenessPlugin(topology_st.standalone)
|
| |
attruniq.enable()
|
| |
topology_st.standalone.restart()
|
| |
@@ -110,26 +107,22 @@
|
| |
users = UserAccounts(topology_st.standalone, basedn=DEFAULT_SUFFIX)
|
| |
user1 = users.create(properties={
|
| |
'uid': 'testuser1',
|
| |
- 'cn' : 'testuser1',
|
| |
- 'sn' : 'user1',
|
| |
- 'uidNumber' : '1001',
|
| |
- 'gidNumber' : '2001',
|
| |
- 'homeDirectory' : '/home/testuser1'
|
| |
+ 'cn': 'testuser1',
|
| |
+ 'sn': 'user1',
|
| |
+ 'uidNumber': '1001',
|
| |
+ 'gidNumber': '2001',
|
| |
+ 'homeDirectory': '/home/testuser1'
|
| |
})
|
| |
|
| |
- try:
|
| |
- user2 = users.create(properties={
|
| |
+ with pytest.raises(ldap.LDAPError):
|
| |
+ users.create(properties={
|
| |
'uid': ['testuser2', 'testuser1'],
|
| |
- 'cn' : 'testuser2',
|
| |
- 'sn' : 'user2',
|
| |
- 'uidNumber' : '1002',
|
| |
- 'gidNumber' : '2002',
|
| |
- 'homeDirectory' : '/home/testuser2'
|
| |
+ 'cn': 'testuser2',
|
| |
+ 'sn': 'user2',
|
| |
+ 'uidNumber': '1002',
|
| |
+ 'gidNumber': '2002',
|
| |
+ 'homeDirectory': '/home/testuser2'
|
| |
})
|
| |
- log.fatal('test_betxn_attr_uniqueness: The second entry was incorrectly added.')
|
| |
- assert False
|
| |
- except ldap.LDAPError as e:
|
| |
- log.error('test_betxn_attr_uniqueness: Failed to add test user as expected:')
|
| |
|
| |
user1.delete()
|
| |
|
| |
@@ -191,8 +184,8 @@
|
| |
log.info('test_betxn_memberof: PASSED')
|
| |
|
| |
|
| |
- def test_betxn_modrdn_memberof(topology_st):
|
| |
- """Test modrdn operartions and memberOf
|
| |
+ def test_betxn_modrdn_memberof_cache_corruption(topology_st):
|
| |
+ """Test modrdn operations and memberOf
|
| |
|
| |
:id: 70d0b96e-b693-4bf7-bbf5-102a66ac5994
|
| |
|
| |
@@ -227,18 +220,18 @@
|
| |
|
| |
# Create user and add it to group
|
| |
users = UserAccounts(topology_st.standalone, basedn=DEFAULT_SUFFIX)
|
| |
- user = users.create(properties=TEST_USER_PROPERTIES)
|
| |
+ user = users.ensure_state(properties=TEST_USER_PROPERTIES)
|
| |
if not ds_is_older('1.3.7'):
|
| |
user.remove('objectClass', 'nsMemberOf')
|
| |
|
| |
group.add_member(user.dn)
|
| |
|
| |
# Attempt modrdn that should fail, but the original entry should stay in the cache
|
| |
- with pytest.raises(ldap.OBJECTCLASS_VIOLATION):
|
| |
+ with pytest.raises(ldap.OBJECT_CLASS_VIOLATION):
|
| |
group.rename('cn=group_to_people', newsuperior=peoplebase)
|
| |
|
| |
# Should fail, but not with NO_SUCH_OBJECT as the original entry should still be in the cache
|
| |
- with pytest.raises(ldap.OBJECTCLASS_VIOLATION):
|
| |
+ with pytest.raises(ldap.OBJECT_CLASS_VIOLATION):
|
| |
group.rename('cn=group_to_people', newsuperior=peoplebase)
|
| |
|
| |
#
|
| |
@@ -247,6 +240,108 @@
|
| |
log.info('test_betxn_modrdn_memberof: PASSED')
|
| |
|
| |
|
| |
+ def test_ri_and_mep_cache_corruption(topology_st):
|
| |
+ """Test RI plugin aborts change after MEP plugin fails.
|
| |
+ This is really testing the entry cache for corruption
|
| |
+
|
| |
+ :id: 70d0b96e-b693-4bf7-bbf5-102a66ac5995
|
| |
+
|
| |
+ :setup: Standalone instance
|
| |
+
|
| |
+ :steps: 1. Enable and configure mep and ri plugins
|
| |
+ 2. Add user and add it to a group
|
| |
+ 3. Disable MEP plugin and remove MEP group
|
| |
+ 4. Delete user
|
| |
+ 5. Check that user is still a member of the group
|
| |
+
|
| |
+ :expectedresults:
|
| |
+ 1. Success
|
| |
+ 2. Success
|
| |
+ 3. Success
|
| |
+ 4. It fails with NO_SUCH_OBJECT
|
| |
+ 5. Success
|
| |
+
|
| |
+ """
|
| |
+ # Start plugins
|
| |
+ topology_st.standalone.config.set('nsslapd-dynamic-plugins', 'on')
|
| |
+ mep_plugin = ManagedEntriesPlugin(topology_st.standalone)
|
| |
+ mep_plugin.enable()
|
| |
+ ri_plugin = ReferentialIntegrityPlugin(topology_st.standalone)
|
| |
+ ri_plugin.enable()
|
| |
+
|
| |
+ # Add our org units
|
| |
+ ous = OrganizationalUnits(topology_st.standalone, DEFAULT_SUFFIX)
|
| |
+ ou_people = ous.create(properties={'ou': 'managed_people'})
|
| |
+ ou_groups = ous.create(properties={'ou': 'managed_groups'})
|
| |
+
|
| |
+ # Configure MEP
|
| |
+ mep_templates = MEPTemplates(topology_st.standalone, DEFAULT_SUFFIX)
|
| |
+ mep_template1 = mep_templates.create(properties={
|
| |
+ 'cn': 'MEP template',
|
| |
+ 'mepRDNAttr': 'cn',
|
| |
+ 'mepStaticAttr': 'objectclass: posixGroup|objectclass: extensibleObject'.split('|'),
|
| |
+ 'mepMappedAttr': 'cn: $cn|uid: $cn|gidNumber: $uidNumber'.split('|')
|
| |
+ })
|
| |
+ mep_configs = MEPConfigs(topology_st.standalone)
|
| |
+ mep_configs.create(properties={'cn': 'config',
|
| |
+ 'originScope': ou_people.dn,
|
| |
+ 'originFilter': 'objectclass=posixAccount',
|
| |
+ 'managedBase': ou_groups.dn,
|
| |
+ 'managedTemplate': mep_template1.dn})
|
| |
+
|
| |
+ # Add an entry that meets the MEP scope
|
| |
+ users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX,
|
| |
+ rdn='ou={}'.format(ou_people.rdn))
|
| |
+ user = users.create(properties={
|
| |
+ 'uid': 'test-user1',
|
| |
+ 'cn': 'test-user',
|
| |
+ 'sn': 'test-user',
|
| |
+ 'uidNumber': '10011',
|
| |
+ 'gidNumber': '20011',
|
| |
+ 'homeDirectory': '/home/test-user1'
|
| |
+ })
|
| |
+
|
| |
+ # Add group
|
| |
+ groups = Groups(topology_st.standalone, DEFAULT_SUFFIX)
|
| |
+ user_group = groups.ensure_state(properties={'cn': 'group', 'member': user.dn})
|
| |
+
|
| |
+ # Check if a managed group entry was created
|
| |
+ mep_group = Group(topology_st.standalone, dn='cn={},{}'.format(user.rdn, ou_groups.dn))
|
| |
+ if not mep_group.exists():
|
| |
+ log.fatal("MEP group was not created for the user")
|
| |
+ assert False
|
| |
+
|
| |
+ # Mess with MEP so it fails
|
| |
+ mep_plugin.disable()
|
| |
+ mep_group.delete()
|
| |
+ mep_plugin.enable()
|
| |
+
|
| |
+ # Add another group for verify entry cache is not corrupted
|
| |
+ test_group = groups.create(properties={'cn': 'test_group'})
|
| |
+
|
| |
+ # Delete user, should fail, and user should still be a member
|
| |
+ with pytest.raises(ldap.NO_SUCH_OBJECT):
|
| |
+ user.delete()
|
| |
+
|
| |
+ # Verify membership is intact
|
| |
+ if not user_group.is_member(user.dn):
|
| |
+ log.fatal("Member was incorrectly removed from the group!! Or so it seems")
|
| |
+
|
| |
+ # Restart server and test again in case this was a cache issue
|
| |
+ topology_st.standalone.restart()
|
| |
+ if user_group.is_member(user.dn):
|
| |
+ log.info("The entry cache was corrupted")
|
| |
+ assert False
|
| |
+
|
| |
+ assert False
|
| |
+
|
| |
+ # Verify test group is still found in entry cache by deleting it
|
| |
+ test_group.delete()
|
| |
+
|
| |
+ # Success
|
| |
+ log.info("Test PASSED")
|
| |
+
|
| |
+
|
| |
if __name__ == '__main__':
|
| |
# Run isolated
|
| |
# -s for DEBUG mode
|
| |
I think you should not test INVALID at the point, and just let the entry go in the LRU (lru_add)