From d6616221aab8d889765db566f6b4187e02ea6c32 Mon Sep 17 00:00:00 2001 From: Simon Pichugin Date: Aug 28 2018 13:19:38 +0000 Subject: Issue 49866 - Refactor PwPolicy lib389/CLI module Description: Refactor Password Policy module and its CLI part. Add PwPolicyManager object and PwPolicyEntry(DSLdapObject). Validate LDIF and Backup dir paths. Don't accept a forward slash because it can lead to a security flow. Add an additional assertion to Backup/Restore CLI test suite. https://pagure.io/389-ds-base/issue/49866 Reviewed by: mreynolds (Thanks!) --- diff --git a/src/cockpit/389-console/js/backend.js b/src/cockpit/389-console/js/backend.js index 54aa64f..e1a006e 100644 --- a/src/cockpit/389-console/js/backend.js +++ b/src/cockpit/389-console/js/backend.js @@ -775,8 +775,8 @@ $(document).ready( function() { } else if (ldif_file_import.indexOf(' ') >= 0) { popup_msg("Error", "LDIF file can not contain any spaces"); return; - } else if (ldif_file_import.startsWith('/')) { - popup_msg("Error", "LDIF file can not start with a forward slash. " + + } else if (ldif_file_import.indexOf('/') === -1 ) { + popup_msg("Error", "LDIF file can not contain a forward slash. " + "LDIF files are written to the server's LDIF directory (nsslapd-ldifdir)"); return; } else { @@ -818,8 +818,8 @@ $(document).ready( function() { if (ldif_file_export.indexOf(' ') >= 0) { popup_msg("Error", "LDIF file can not contain any spaces"); return; - } else if (ldif_file_export.startsWith('/')) { - popup_msg("Error", "LDIF file can not start with a forward slash. " + + } else if (ldif_file_import.indexOf('/') === -1 ) { + popup_msg("Error", "LDIF file can not contain a forward slash. " + "LDIF files are written to the server's LDIF directory (nsslapd-ldifdir)"); return; } else if (ldif_file_export != ""){ diff --git a/src/cockpit/389-console/js/servers.js b/src/cockpit/389-console/js/servers.js index 0323d5c..bb29b90 100644 --- a/src/cockpit/389-console/js/servers.js +++ b/src/cockpit/389-console/js/servers.js @@ -1217,8 +1217,8 @@ $(document).ready( function() { popup_msg("Error", "Backup name can not contain any spaces"); return; } - if (backup_name.startsWith('/')) { - popup_msg("Error", "Backup name can not start with a forward slash. " + + if (backup_name.indexOf('/') === -1) { + popup_msg("Error", "Backup name can not contain a forward slash. " + "Backups are written to the server's backup directory (nsslapd-bakdir)"); return; } diff --git a/src/lib389/lib389/__init__.py b/src/lib389/lib389/__init__.py index b603158..47d6a2e 100644 --- a/src/lib389/lib389/__init__.py +++ b/src/lib389/lib389/__init__.py @@ -313,7 +313,7 @@ class DirSrv(SimpleLDAPObject, object): from lib389.monitor import Monitor, MonitorLDBM from lib389.rootdse import RootDSE from lib389.saslmap import SaslMapping, SaslMappings - from lib389.pwpolicy import Pwpolicy + from lib389.pwpolicy import PwPolicyManager # Need updating self.agreement = Agreement(self) @@ -327,7 +327,7 @@ class DirSrv(SimpleLDAPObject, object): self.plugins = Plugins(self) self.tasks = Tasks(self) self.saslmap = SaslMapping(self) - self.pwpolicy = Pwpolicy(self) + self.pwpolicy = PwPolicyManager(self) # Do we have a certdb path? # if MAJOR < 3: self.monitor = Monitor(self) @@ -2846,11 +2846,12 @@ class DirSrv(SimpleLDAPObject, object): else: # No output file specified. Use the default ldif location/name cmd.append('-a') + tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") if bename: - ldifname = "/" + self.serverid + "-" + bename + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ldifname = os.path.join(self._instance.ds_paths.ldif_dir, "%s-%s-%s.ldif" % (self._instance.serverid, be_name, tnow)) else: - ldifname = "/" + self.serverid + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S") - cmd.append(self.get_ldif_dir() + ldifname) + ldifname = os.path.join(self._instance.ds_paths.ldif_dir, "%s-%s.ldif" % (self._instance.serverid, tnow)) + cmd.append(ldifname) try: result = subprocess.check_output(cmd, encoding='utf-8') except subprocess.CalledProcessError as e: @@ -2916,7 +2917,8 @@ class DirSrv(SimpleLDAPObject, object): if archive_dir is None: # Use the instance name and date/time as the default backup name - archive_dir = self.get_bak_dir() + "/" + self.serverid + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + archive_dir = os.path.join(self.ds_paths.backup_dir, "%s-%s" % (self.serverid, tnow)) elif not archive_dir.startswith("/"): # Relative path, append it to the bak directory archive_dir = os.path.join(self.ds_paths.backup_dir, archive_dir) @@ -3418,8 +3420,11 @@ class DirSrv(SimpleLDAPObject, object): if archive is None: # Use the instance name and date/time as the default backup name tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") - archive = os.path.join(self.ds_paths.backup_dir, - "%s_%s" % (self.serverid, tnow)) + if self.serverid is not None: + backup_dir_name = "%s-%s" % (self.serverid, tnow) + else: + backup_dir_name = "backup-%s" % tnow + archive = os.path.join(self.ds_paths.backup_dir, backup_dir_name) elif archive[0] != "/": # Relative path, append it to the bak directory archive = os.path.join(self.ds_paths.backup_dir, archive) diff --git a/src/lib389/lib389/backend.py b/src/lib389/lib389/backend.py index ff4c22c..1fd57ba 100644 --- a/src/lib389/lib389/backend.py +++ b/src/lib389/lib389/backend.py @@ -610,8 +610,11 @@ class Backends(DSLdapObjects): ldif_paths = [] for ldif in list(ldifs): if not ldif.startswith("/"): - ldif = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif) - ldif_paths.append(ldif) + if ldif.endswith(".ldif"): + ldif = os.path.join(self._instance.ds_paths.ldif_dir, ldif) + else: + ldif = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif) + ldif_paths.append(ldif) task = ImportTask(self._instance) task_properties = {'nsInstance': be_name, @@ -643,12 +646,15 @@ class Backends(DSLdapObjects): task = ExportTask(self._instance) task_properties = {'nsInstance': be_names} if ldif is not None and not ldif.startswith("/"): - task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif) + if ldif.endswith(".ldif"): + task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, ldif) + else: + task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif) else: tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, - "%s_%s_%s.ldif" % (self._instance.serverid, - "_".join(be_names), tnow)) + "%s-%s-%s.ldif" % (self._instance.serverid, + "-".join(be_names), tnow)) if include_suffixes is not None: task_properties['nsIncludeSuffix'] = include_suffixes if exclude_suffixes is not None: diff --git a/src/lib389/lib389/cli_conf/pwpolicy.py b/src/lib389/lib389/cli_conf/pwpolicy.py index dc72965..2ec7c98 100644 --- a/src/lib389/lib389/cli_conf/pwpolicy.py +++ b/src/lib389/lib389/cli_conf/pwpolicy.py @@ -6,7 +6,11 @@ # See LICENSE for details. # --- END COPYRIGHT BLOCK --- +import json import ldap +from lib389.utils import ensure_str, ensure_list_str +from lib389.pwpolicy import PwPolicyEntries, PwPolicyManager +from lib389.idm.account import Account arg_to_attr = { 'pwdlocal': 'nsslapd-pwpolicy-local', @@ -51,51 +55,170 @@ arg_to_attr = { 'pwdallowhash': 'nsslapd-allow-hashed-passwords' } +def _args_to_attrs(args): + attrs = {} + for arg in vars(args): + val = getattr(args, arg) + if arg in arg_to_attr and val is not None: + attrs[arg_to_attr[arg]] = val + return attrs + + +def _get_policy_type(inst, dn=None): + pwp_manager = PwPolicyManager(inst) + if dn is None: + return "Global Password Policy" + elif pwp_manager.is_user_policy(dn): + return "User Policy" + elif pwp_manager.is_subtree_policy(dn): + return "Subtree Policy" + else: + raise ValueError("The policy wasn't set up for the target dn entry or it is invalid") + + +def _get_pw_policy(inst, targetdn, log, use_json=None): + pwp_manager = PwPolicyManager(inst) + policy_type = _get_policy_type(inst, targetdn) + + if "global" in policy_type.lower(): + targetdn = 'cn=config' + pwp_manager.pwp_attributes.extend(['passwordIsGlobalPolicy', 'nsslapd-pwpolicy_local']) + else: + targetdn = pwp_manager.get_pwpolicy_entry(targetdn).dn + + entries = inst.search_s(targetdn, ldap.SCOPE_BASE, 'objectclass=*', pwp_manager.pwp_attributes) + entry = entries[0] + + if use_json: + str_attrs = {} + for k in entry.data: + str_attrs[ensure_str(k)] = ensure_list_str(entry.data[k]) + + # ensure all the keys are lowercase + str_attrs = dict((k.lower(), v) for k, v in str_attrs.items()) + + print(json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": str_attrs})) + else: + if "global" in policy_type.lower(): + response = "Global Password Policy: cn=config\n------------------------------------\n" + else: + response = "Local {} Policy: {}\n------------------------------------\n".format(policy_type, targetdn) + for k in entry.data: + response += "{}: {}\n".format(k, ensure_str(entry.data[k][0])) + log.info(response) + def list_policies(inst, basedn, log, args): - print(inst.pwpolicy.list_policies(args.DN[0], use_json=args.json)) + log = log.getChild('list_policies') + targetdn = args.DN[0] + pwp_manager = PwPolicyManager(inst) + + if args.json: + result = {'type': 'list', 'items': []} + else: + result = "" + + # Verify target dn exists before getting started + user_entry = Account(inst, args.DN[0]) + if not user_entry.exists(): + raise ValueError('The target entry dn does not exist') + + # User pwpolicy entry is under the container that is under the parent, + # so we need to go one level up + if pwp_manager.is_user_policy(targetdn): + policy_type = _get_policy_type(inst, user_entry.dn) + if args.json: + result['items'].append([user_entry.dn, policy_type]) + else: + result += "%s (%s)\n" % (user_entry.dn, policy_type.lower()) + else: + pwp_entries = PwPolicyEntries(inst, targetdn) + for pwp_entry in pwp_entries.list(): + cn = pwp_entry.get_attr_val_utf8_l('cn') + if pwp_entry.is_subtree_policy(): + entrydn = cn.replace('cn=nspwpolicyentry_subtree,', '') + else: + entrydn = cn.replace('cn=nspwpolicyentry_user,', '') + policy_type = _get_policy_type(inst, entrydn) + + if args.json: + result['items'].append([entrydn, policy_type]) + else: + result += "%s (%s)\n" % (entrydn, policy_type.lower()) + + if args.json: + return print(json.dumps(result)) + else: + log.info(result) def get_local_policy(inst, basedn, log, args): - print(inst.pwpolicy.get_pwpolicy(targetdn=args.DN[0], use_json=args.json)) + log = log.getChild('get_local_policy') + _get_pw_policy(inst, args.DN[0], log, args.json) def get_global_policy(inst, basedn, log, args): - print(inst.pwpolicy.get_pwpolicy(targetdn=None, use_json=args.json)) + log = log.getChild('get_global_policy') + _get_pw_policy(inst, None, log, args.json) def create_subtree_policy(inst, basedn, log, args): - try: - inst.pwpolicy.create_subtree_policy(args.DN[0], args, arg_to_attr) - print('Successfully created subtree password policy') - except ldap.ALREADY_EXISTS: - raise ValueError('There is already a subtree password policy created for this entry') + log = log.getChild('create_subtree_policy') + # Gather the attributes + attrs = _args_to_attrs(args) + pwp_manager = PwPolicyManager(inst) + pwp_manager.create_subtree_policy(args.DN[0], attrs) + + log.info('Successfully created subtree password policy') def create_user_policy(inst, basedn, log, args): - try: - inst.pwpolicy.create_user_policy(args.DN[0], args, arg_to_attr) - print('Successfully created user password policy') - except ldap.ALREADY_EXISTS: - raise ValueError('There is already a user password policy created for this entry') + log = log.getChild('create_user_policy') + # Gather the attributes + attrs = _args_to_attrs(args) + pwp_manager = PwPolicyManager(inst) + pwp_manager.create_user_policy(args.DN[0], attrs) + + log.info('Successfully created user password policy') def set_global_policy(inst, basedn, log, args): - for arg in vars(args): - val = getattr(args, arg) - if arg in arg_to_attr and val is not None: - inst.config.set(arg_to_attr[arg], val) - print("Successfully updated global policy") + log = log.getChild('set_global_policy') + # Gather the attributes + attrs = _args_to_attrs(args) + pwp_manager = PwPolicyManager(inst) + pwp_manager.set_global_policy(attrs) + + log.info('Successfully updated global password policy') def set_local_policy(inst, basedn, log, args): - inst.pwpolicy.set_policy(args.DN[0], args, arg_to_attr) - print("Successfully updated local policy") + log = log.getChild('set_local_policy') + targetdn = args.DN[0] + # Gather the attributes + attrs = _args_to_attrs(args) + pwp_manager = PwPolicyManager(inst) + pwp_entry = pwp_manager.get_pwpolicy_entry(args.DN[0]) + policy_type = _get_policy_type(inst, targetdn) + + modlist = [] + for attr, value in attrs.items(): + modlist.append((attr, value)) + if len(modlist) > 0: + pwp_entry.replace_many(*modlist) + else: + raise ValueError("There are no password policies to set") + + log.info('Successfully updated %s' % policy_type.lower()) def del_local_policy(inst, basedn, log, args): - inst.pwpolicy.delete_local_policy(args.DN[0]) - print("Successfully removed local password policy") + log = log.getChild('del_local_policy') + targetdn = args.DN[0] + policy_type = _get_policy_type(inst, targetdn) + pwp_manager = PwPolicyManager(inst) + pwp_manager.delete_local_policy(targetdn) + log.info('Successfully deleted %s' % policy_type.lower()) def create_parser(subparsers): diff --git a/src/lib389/lib389/cos.py b/src/lib389/lib389/cos.py index 783e5e5..0e15941 100644 --- a/src/lib389/lib389/cos.py +++ b/src/lib389/lib389/cos.py @@ -14,15 +14,17 @@ from lib389._mapped_object import DSLdapObject, DSLdapObjects from lib389.utils import ensure_str + class CosTemplate(DSLdapObject): - def __init__(self, instance, dn=None): - """A Cos Template defining the values to override on a target. + """A Cos Template defining the values to override on a target. + + :param instance: DirSrv instance + :type instance: DirSrv + :param dn: The dn of the template + :type dn: str + """ - :param instance: DirSrv instance - :type instance: DirSrv - :param dn: The dn of the template - :type dn: str - """ + def __init__(self, instance, dn=None): super(CosTemplate, self).__init__(instance, dn) self._rdn_attribute = 'cn' self._must_attributes = ['cn'] @@ -35,18 +37,20 @@ class CosTemplate(DSLdapObject): ] self._protected = False + class CosTemplates(DSLdapObjects): + """The set of costemplates that exist for direct and indirect + implementations. + + :param instance: A dirsrv instance + :type instance: DirSrv + :param basedn: The basedn of the templates + :type basedn: str + :param rdn: The rdn of the templates + :type rdn: str + """ + def __init__(self, instance, basedn, rdn=None): - """The set of costemplates that exist for direct and indirect - implementations. - - :param instance: A dirsrv instance - :type instance: DirSrv - :param basedn: The basedn of the templates - :type basedn: str - :param rdn: The rdn of the templates - :type rdn: str - """ super(CosTemplates, self).__init__(instance) self._objectclasses = [ 'cosTemplate' @@ -59,18 +63,19 @@ class CosTemplates(DSLdapObjects): class CosIndirectDefinition(DSLdapObject): + """A Cos Indirect Definition associating an attr:value pair as a link + attr to a template type. + + :param instance: DirSrv instance + :type instance: DirSrv + :param dn: The dn of the template + :type dn: str + """ + def __init__(self, instance, dn=None): - """A Cos Indirect Definition associating an attr:value pair as a link - attr to a template type. - - :param instance: DirSrv instance - :type instance: DirSrv - :param dn: The dn of the template - :type dn: str - """ super(CosIndirectDefinition, self).__init__(instance, dn) self._rdn_attribute = 'cn' - self._must_attributes = ['cn', 'cosIndirectSpecifier', 'cosAttribute'] + self._must_attributes = ['cn', 'cosIndirectSpecifier', 'cosattribute'] self._create_objectclasses = [ 'top', 'cosSuperDefinition', @@ -78,17 +83,19 @@ class CosIndirectDefinition(DSLdapObject): ] self._protected = False + class CosIndirectDefinitions(DSLdapObjects): + """The set of cos indirect definitions that exist. + + :param instance: A dirsrv instance + :type instance: DirSrv + :param basedn: The basedn of the templates + :type basedn: str + :param rdn: The rdn of the templates + :type rdn: str + """ + def __init__(self, instance, basedn, rdn=None): - """The set of cos indirect definitions that exist. - - :param instance: A dirsrv instance - :type instance: DirSrv - :param basedn: The basedn of the templates - :type basedn: str - :param rdn: The rdn of the templates - :type rdn: str - """ super(CosIndirectDefinitions, self).__init__(instance) self._objectclasses = [ 'cosSuperDefinition', @@ -102,38 +109,43 @@ class CosIndirectDefinitions(DSLdapObjects): class CosPointerDefinition(DSLdapObject): + """A Cos Pointer Definition associating a dn syntax type as a link + attr to a template type. + + :param instance: DirSrv instance + :type instance: DirSrv + :param dn: The dn of the template + :type dn: str + """ + def __init__(self, instance, dn=None): - """A Cos Pointer Definition associating a dn syntax type as a link - attr to a template type. - - :param instance: DirSrv instance - :type instance: DirSrv - :param dn: The dn of the template - :type dn: str - """ super(CosPointerDefinition, self).__init__(instance, dn) self._rdn_attribute = 'cn' self._must_attributes = ['cn', 'cosTemplateDn', 'cosAttribute'] self._create_objectclasses = [ 'top', + 'ldapsubentry', 'cosSuperDefinition', 'cosPointerDefinition', ] self._protected = False + class CosPointerDefinitions(DSLdapObjects): + """The set of cos pointer definitions that exist. + + :param instance: A dirsrv instance + :type instance: DirSrv + :param basedn: The basedn of the templates + :type basedn: str + :param rdn: The rdn of the templates + :type rdn: str + """ + def __init__(self, instance, basedn, rdn=None): - """The set of cos pointer definitions that exist. - - :param instance: A dirsrv instance - :type instance: DirSrv - :param basedn: The basedn of the templates - :type basedn: str - :param rdn: The rdn of the templates - :type rdn: str - """ super(CosPointerDefinitions, self).__init__(instance) self._objectclasses = [ + 'ldapsubentry', 'cosSuperDefinition', 'cosPointerDefinition', ] diff --git a/src/lib389/lib389/pwpolicy.py b/src/lib389/lib389/pwpolicy.py index cf3fc59..dd4669b 100644 --- a/src/lib389/lib389/pwpolicy.py +++ b/src/lib389/lib389/pwpolicy.py @@ -9,28 +9,27 @@ import ldap import json from ldap import modlist +from lib389._mapped_object import DSLdapObject, DSLdapObjects +from lib389.config import Config +from lib389.idm.account import Account, Accounts +from lib389.idm.nscontainer import nsContainers, nsContainer +from lib389.cos import CosPointerDefinitions, CosPointerDefinition, CosTemplates from lib389.utils import ensure_str, ensure_list_str, ensure_bytes USER_POLICY = 1 SUBTREE_POLICY = 2 -class Pwpolicy(object): - """A local password policy, either user or subtree +class PwPolicyManager(object): + """Manages user, subtree and global password policies :param instance: An instance :type instance: lib389.DirSrv - :param dn: Entry DN - :type dn: str - - - cn=nsPwPolicyEntry,DN_OF_ENTRY,cn=nsPwPolicyContainer,SUFFIX - """ - def __init__(self, conn): - self.conn = conn - self.log = conn.log + def __init__(self, instance): + self._instance = instance + self.log = instance.log self.pwp_attributes = [ 'passwordstoragescheme', 'passwordChange', @@ -74,274 +73,268 @@ class Pwpolicy(object): 'nsslapd-allow-hashed-passwords' ] - def create_pwp_container(self, basedn): - attrs = {'objectclass': [b'top', b'nsContainer'], - 'cn': [b'nsPwPolicyContainer']} - ldif = modlist.addModlist(attrs) - try: - self.conn.add_ext_s(basedn, ldif) - except ldap.ALREADY_EXISTS: - # Already exists, no problem - pass + def is_user_policy(self, dn): + """Check if the entry has a user password policy + + :param dn: Entry DN with PwPolicy set up + :type dn: str - def create_user_policy(self, targetdn, args, arg_to_attr): - """Create a local user password policy entry + :returns: True if the entry has a user policy, False otherwise """ - # Verify target dn exists before getting started + # CoSTemplate entry also can have 'pwdpolicysubentry', so we better validate this part too + entry = Account(self._instance, dn) try: - self.conn.search_s(targetdn, ldap.SCOPE_BASE, "objectclass=*") + if entry.present("objectclass", "costemplate"): + # It is a CoSTemplate entry, not user policy + return False + + # Check if it's a subtree or a user policy + if entry.present("pwdpolicysubentry"): + return True + else: + return False except ldap.NO_SUCH_OBJECT: + return False + + def is_subtree_policy(self, dn): + """Check if the entry has a subtree password policy + + :param dn: Entry DN with PwPolicy set up + :type dn: str + + :returns: True if the entry has a subtree policy, False otherwise + """ + + # CoSTemplate entry also can have 'pwdpolicysubentry', so we better validate this part too + cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn) + if cos_pointer_def.exists(): + return True + else: + return False + + def create_user_policy(self, dn, properties): + """Creates all entries which are needed for the user + password policy + + :param dn: Entry DN for the subtree pwpolicy + :type dn: str + :param properties: A dict with password policy settings + :type properties: dict + + :returns: PwPolicyEntry instance + """ + + # Verify target dn exists before getting started + user_entry = Account(self._instance, dn) + if not user_entry.exists(): raise ValueError('Can not create user password policy because the target dn does not exist') - rdns = ldap.dn.explode_dn(targetdn) + rdns = ldap.dn.explode_dn(user_entry.dn) rdns.pop(0) parentdn = ",".join(rdns) # Create the pwp container if needed - self.create_pwp_container("cn=nsPwPolicyContainer,{}".format(parentdn)) - - # Gather the attributes and create policy entry - attrs = {} - for arg in vars(args): - val = getattr(args, arg) - if arg in arg_to_attr and val is not None: - attrs[arg_to_attr[arg]] = ensure_bytes(val) - attrs['objectclass'] = [b'top', b'ldapsubentry', b'passwordpolicy'] - ldif = modlist.addModlist(attrs) - user_dn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, parentdn) - self.conn.add_ext_s(user_dn, ldif) - - # Add policy to entry - self.conn.modify_s(targetdn, [(ldap.MOD_REPLACE, 'pwdpolicysubentry', ensure_bytes(user_dn))]) - - def create_subtree_policy(self, targetdn, args, arg_to_attr): - """Create a local subtree password policy entry - requires COS entry + pwp_containers = nsContainers(self._instance, basedn=parentdn) + pwp_container = pwp_containers.ensure_state(properties={'cn': 'nsPwPolicyContainer'}) + + # Create policy entry + properties['cn'] = 'cn=nsPwPolicyEntry_user,%s' % dn + pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn) + pwp_entry = pwp_entries.create(properties=properties) + + # Add policy to the entry + user_entry.replace('pwdpolicysubentry', pwp_entry.dn) + + return pwp_entry + + def create_subtree_policy(self, dn, properties): + """Creates all entries which are needed for the subtree + password policy + + :param dn: Entry DN for the subtree pwpolicy + :type dn: str + :param properties: A dict with password policy settings + :type properties: dict + + :returns: PwPolicyEntry instance """ # Verify target dn exists before getting started - try: - self.conn.search_s(targetdn, ldap.SCOPE_BASE, "objectclass=*") - except ldap.NO_SUCH_OBJECT: - raise ValueError('Can not create user password policy because the target dn does not exist') + subtree_entry = Account(self._instance, dn) + if not subtree_entry.exists(): + raise ValueError('Can not create subtree password policy because the target dn does not exist') # Create the pwp container if needed - container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn) - self.create_pwp_container(container_dn) - - # Create COS entries - cos_template_entry = 'cn=nsPwTemplateEntry,{}'.format(targetdn) - cos_template_dn = 'cn="cn=nsPwTemplateEntry,{}",{}'.format(targetdn, container_dn) - cos_subentry_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn) - cos_template_attrs = {'cosPriority': b'1', 'pwdpolicysubentry': ensure_bytes(cos_subentry_dn), - 'cn': ensure_bytes(cos_template_entry)} - - cos_template_attrs['objectclass'] = [b'top', b'ldapsubentry', b'extensibleObject', b'costemplate'] - ldif = modlist.addModlist(cos_template_attrs) - self.conn.add_ext_s(cos_template_dn, ldif) - - cos_def_attrs = {'objectclass': [b'top', b'ldapsubentry', b'extensibleObject', - b'cosSuperDefinition', b'cosPointerDefinition'], - 'cosAttribute': b'pwdpolicysubentry default operational', - 'cosTemplateDn': ensure_bytes(cos_template_dn), - 'cn': b'nsPwPolicy_CoS'} - ldif = modlist.addModlist(cos_def_attrs) - self.conn.add_ext_s("cn=nsPwPolicy_CoS,{}".format(targetdn), ldif) - - # Gather the attributes and create policy sub entry - attrs = {} - for arg in vars(args): - val = getattr(args, arg) - if arg in arg_to_attr and val is not None: - attrs[arg_to_attr[arg]] = ensure_bytes(val) - attrs['objectclass'] = [b'top', b'ldapsubentry', b'passwordpolicy'] - ldif = modlist.addModlist(attrs) - try: - self.conn.add_ext_s(cos_subentry_dn, ldif) - except ldap.ALREADY_EXISTS: - # Already exists, no problem - pass + pwp_containers = nsContainers(self._instance, basedn=dn) + pwp_container = pwp_containers.ensure_state(properties={'cn': 'nsPwPolicyContainer'}) - # Add policy to entry - self.conn.modify_s(targetdn, [(ldap.MOD_REPLACE, 'pwdpolicysubentry', ensure_bytes(cos_subentry_dn))]) + # Create policy entry + properties['cn'] = 'cn=nsPwPolicyEntry_subtree,%s' % dn + pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn) + pwp_entry = pwp_entries.create(properties=properties) - def delete_local_policy(self, targetdn): - container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn) + # The CoS template entry (nsPwTemplateEntry) + # that has the pwdpolicysubentry value pointing to the above (nsPwPolicyEntry) entry + cos_templates = CosTemplates(self._instance, pwp_container.dn) + cos_template = cos_templates.create(properties={'cosPriority': '1', + 'pwpolicysubentry': pwp_entry.dn, + 'cn': 'cn=nsPwTemplateEntry,%s' % dn}) - # First check that the entry exists - try: - entries = self.conn.search_s(targetdn, ldap.SCOPE_BASE, 'objectclass=top', ['pwdpolicysubentry']) - target_entry = entries[0] - except ldap.NO_SUCH_OBJECT: - raise ValueError('The entry does not exist, nothing to remove') + # The CoS specification entry at the subtree level + cos_pointer_defs = CosPointerDefinitions(self._instance, dn) + cos_pointer_defs.create(properties={'cosAttribute': 'pwdpolicysubentry default operational', + 'cosTemplateDn': cos_template.dn, + 'cn': 'nsPwPolicy_CoS'}) - # Subtree or local policy? - try: - cos_def_dn = 'cn=nsPwPolicy_CoS,{}'.format(targetdn) - self.conn.search_s(cos_def_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))") - found_subtree = True - except: - found_subtree = False - - # If subtree policy then remove COS template and definition - if found_subtree: - container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn) - cos_template_dn = 'cn="cn=nsPwTemplateEntry,{}",{}'.format(targetdn, container_dn) - policy_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn) - self.conn.delete_s(cos_template_dn) - self.conn.delete_s(policy_dn) - self.conn.delete_s(cos_def_dn) + return pwp_entry + + def get_pwpolicy_entry(self, dn): + """Get a local password policy entry + + :param dn: Entry DN for the local pwpolicy + :type dn: str + + :returns: PwPolicyEntry instance + """ + + # Verify target dn exists before getting started + entry = Account(self._instance, dn) + if not entry.exists(): + raise ValueError('Can not get the password policy entry because the target dn does not exist') + + # Check if it's a subtree or a user policy + if self.is_user_policy(entry.dn): + pwp_entry_dn = entry.get_attr_val_utf8("pwdpolicysubentry") + elif self.is_subtree_policy(entry.dn): + pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % dn) + + pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn) + pwp_entry_dn = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn).dn else: - # Remove password subentry from target DN, then remove the policy entry itself - rdns = ldap.dn.explode_dn(targetdn) + raise ValueError("The policy wasn't set up for the target dn entry or it is invalid") + + return PwPolicyEntry(self._instance, pwp_entry_dn) + + def delete_local_policy(self, dn): + """Delete a local password policy entry + + :param dn: Entry DN for the local pwpolicy + :type dn: str + """ + + subtree = False + # Verify target dn exists before getting started + entry = Account(self._instance, dn) + if not entry.exists(): + raise ValueError('The target entry dn does not exist') + + if self.is_subtree_policy(entry.dn): + parentdn = dn + subtree = True + elif self.is_user_policy(entry.dn): + rdns = ldap.dn.explode_dn(entry.dn) rdns.pop(0) parentdn = ",".join(rdns) - container_dn = "cn=nsPwPolicyContainer,{}".format(parentdn) - policy_dn = target_entry.getValue('pwdpolicysubentry') - if policy_dn is None or policy_dn == "": - raise ValueError('There is no local password policy for this entry') - self.conn.delete_s(ensure_str(policy_dn)) - - # Remove the passwordPolicySubentry from the target - self.conn.modify_s(targetdn, [(ldap.MOD_DELETE, 'pwdpolicysubentry', None)]) - - # if there are no other entries under the container, then remove the container - entries = self.conn.search_s(container_dn, ldap.SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=*))") - if len(entries) == 1: - self.conn.delete_s(container_dn) - - def get_pwpolicy(self, targetdn=None, use_json=False): - """Get the local or global password policy entry""" - global_policy = False - policy_type = "global" - if targetdn is not None: - # Local policy listed by name - entrydn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, targetdn) - pwp_attributes = self.pwp_attributes else: - # Global policy - global_policy = True - entrydn = "cn=config" - pwp_attributes = self.pwp_attributes - pwp_attributes += ['passwordIsGlobalPolicy', 'nsslapd-pwpolicy_local'] + raise ValueError("The policy wasn't set up for the target dn entry or the policy is invalid") - try: - entries = self.conn.search_s(entrydn, - ldap.SCOPE_BASE, - 'objectclass=*', - pwp_attributes) - entry = entries[0] - except ldap.NO_SUCH_OBJECT: - # okay lets see if its auser policy - rdns = ldap.dn.explode_dn(targetdn) - rdns.pop(0) - parentdn = (",").join(rdns) - entrydn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, parentdn) - try: - entries = self.conn.search_s(entrydn, - ldap.SCOPE_BASE, - 'objectclass=*', - pwp_attributes) - entry = entries[0] - except ldap.LDAPError as e: - raise ValueError("Could not find password policy for entry: {} Error: {}".format(targetdn, str(e))) - except ldap.LDAPError as e: - raise ValueError("Could not find password policy for entry: {} Error: {}".format(targetdn, str(e))) - - if not global_policy: - # subtree or user policy? - cos_dn = 'cn=nspwpolicy_cos,' + targetdn - try: - self.conn.search_s(cos_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))") - policy_type = "Subtree" - except: - policy_type = "User" - - if use_json: - str_attrs = {} - for k in entry.data: - str_attrs[ensure_str(k)] = ensure_list_str(entry.data[k]) - - # ensure all the keys are lowercase - str_attrs = dict((k.lower(), v) for k, v in str_attrs.items()) - - response = json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": str_attrs}) + pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % parentdn) + + pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn) + if subtree: + pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn) else: - if global_policy: - response = "Global Password Policy: cn=config\n------------------------------------\n" - else: - response = "Local {} Policy: {}\n------------------------------------\n".format(policy_type, targetdn) - for k in entry.data: - response += "{}: {}\n".format(k, ensure_str(entry.data[k][0])) + pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_user,%s' % dn) + + if self.is_subtree_policy(entry.dn): + cos_templates = CosTemplates(self._instance, pwp_container.dn) + cos_template = cos_templates.get('cn=nsPwTemplateEntry,%s' % dn) + cos_template.delete() + + cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn) + cos_pointer_def.delete() + else: + entry.remove("pwdpolicysubentry", pwp_entry.dn) + + pwp_entry.delete() + try: + pwp_container.delete() + except ldap.NOT_ALLOWED_ON_NONLEAF: + pass - return response + def set_global_policy(self, properties): + """Configure global password policy - def list_policies(self, targetdn, use_json=False): - """Return a list of the target DN's of all user policies + :param properties: A dictionary with password policy attributes + :type properties: dict """ - if use_json: - result = {'type': 'list', 'items': []} - else: - result = "" + modlist = [] + for attr, value in properties.items(): + modlist.append((attr, value)) - # First get all the policies - policies = self.conn.search_s(targetdn, ldap.SCOPE_SUBTREE, "(&(objectclass=ldapsubentry)(objectclass=passwordpolicy))", ['cn']) - if policies is None or len(policies) == 0: - if use_json: - return json.dumps(result) - else: - return "No local password polices found" - - # Determine if the policy is subtree or user (subtree policies have COS entries) - for policy in policies: - cn = ensure_str(policy.getValue('cn')) - entrydn = cn.lower().replace('cn=nspwpolicyentry,', '') # .lstrip() - cos_dn = cn.lower().replace('cn=nspwpolicyentry', 'cn=nspwpolicy_cos') - try: - self.conn.search_s(cos_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))") - found_subtree = True - except: - found_subtree = False - - if found_subtree: - # Build subtree policy list - if use_json: - result['items'].append([entrydn, "Subtree Policy"]) - else: - result += entrydn + " (subtree policy)\n" - else: - # Build user policy list - if use_json: - result['items'].append([entrydn, "User Policy"]) - else: - result += entrydn + " (user policy)\n" - - if use_json: - return json.dumps(result) + if len(modlist) > 0: + config = Config(self._instance) + config.replace_many(*modlist) else: - return result + raise ValueError("There are no password policies to set") + + +class PwPolicyEntry(DSLdapObject): + """A single instance of a task entry + + :param instance: An instance + :type instance: lib389.DirSrv + :param dn: Entry DN + :type dn: str + """ + + def __init__(self, instance, dn): + super(PwPolicyEntry, self).__init__(instance, dn) + self._rdn_attribute = 'cn' + self._must_attributes = ['cn'] + self._create_objectclasses = ['top', 'ldapsubentry', 'passwordpolicy'] + self._protected = False + + def is_user_policy(self): + """Check if the policy entry is a user password policy""" + + pwp_manager = PwPolicyManager(self._instance) + cn = self.get_attr_val_utf8_l('cn') + entrydn = cn.replace('cn=nspwpolicyentry_user,', '') + + return pwp_manager.is_user_policy(entrydn) + + def is_subtree_policy(self): + """Check if the policy entry is a user password policy""" + + pwp_manager = PwPolicyManager(self._instance) + cn = self.get_attr_val_utf8_l('cn') + entrydn = cn.replace('cn=nspwpolicyentry_subtree,', '') + + return pwp_manager.is_subtree_policy(entrydn) + + +class PwPolicyEntries(DSLdapObjects): + """DSLdapObjects that represents all password policy entries in container. + + :param instance: An instance + :type instance: lib389.DirSrv + :param basedn: Suffix DN + :type basedn: str + :param rdn: The DN that will be combined wit basedn + :type rdn: str + """ + + def __init__(self, instance, basedn): + super(PwPolicyEntries, self).__init__(instance) + self._objectclasses = [ + 'top', + 'ldapsubentry', + 'passwordpolicy' + ] + self._filterattrs = ['cn'] + self._childobject = PwPolicyEntry + self._basedn = basedn - def set_policy(self, targetdn, args, arg_to_attr): - '''This could be a user or subtree policy, so find out which one and - use the correct container dn''' - try: - cos_def_dn = 'cn=nsPwPolicy_CoS,{}'.format(targetdn) - self.conn.search_s(cos_def_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))") - # This is a subtree policy - container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn) - except: - # This is a user policy - rdns = ldap.dn.explode_dn(targetdn) - rdns.pop(0) - parentdn = ",".join(rdns) - container_dn = "cn=nsPwPolicyContainer,{}".format(parentdn) - - policy_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn) - mods = [] - for arg in vars(args): - val = getattr(args, arg) - if arg in arg_to_attr and val is not None: - mods.append((ldap.MOD_REPLACE, ensure_str(arg_to_attr[arg]), ensure_bytes(val))) - if len(mods) > 0: - self.conn.modify_s(policy_dn, mods) diff --git a/src/lib389/lib389/tests/cli/conf_backup_test.py b/src/lib389/lib389/tests/cli/conf_backup_test.py index 1f339c5..9028c94 100644 --- a/src/lib389/lib389/tests/cli/conf_backup_test.py +++ b/src/lib389/lib389/tests/cli/conf_backup_test.py @@ -4,8 +4,10 @@ import shutil from lib389.cli_conf.backup import backup_create, backup_restore from lib389.cli_base import LogCapture, FakeArgs +from lib389.idm.user import UserAccounts from lib389.topologies import topology_st from lib389.utils import ds_is_older +from lib389._constants import DEFAULT_SUFFIX pytestmark = pytest.mark.skipif(ds_is_older('1.4.0'), reason="Not implemented") @@ -13,18 +15,27 @@ def test_basic(topology_st): BACKUP_DIR = os.path.join(topology_st.standalone.ds_paths.backup_dir, "basic_backup") topology_st.logcap = LogCapture() args = FakeArgs() + + users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX) + user = users.create_test_user() + user.replace("description", "backup_test") + # Clean the backup dir first if os.path.exists(BACKUP_DIR): shutil.rmtree(BACKUP_DIR) + # Create the backup args.archive = BACKUP_DIR args.db_type = None backup_create(topology_st.standalone, None, topology_st.logcap.log, args) assert os.listdir(BACKUP_DIR) + # Restore the backup args.archive = topology_st.standalone.ds_paths.backup_dir args.db_type = None backup_restore(topology_st.standalone, None, topology_st.logcap.log, args) + assert user.present("description", "backup_test") + # No error has happened! Done! # Clean up if os.path.exists(BACKUP_DIR):