#49923 Issue 49866 - Refactor PwPolicy lib389/CLI module
Closed 3 years ago by spichugi. Opened 5 years ago by spichugi.
spichugi/389-ds-base pwdpol_obj  into  master

@@ -775,8 +775,8 @@ 

        } else if (ldif_file_import.indexOf(' ') >= 0) {

          popup_msg("Error", "LDIF file can not contain any spaces");

          return;

-       } else if (ldif_file_import.startsWith('/')) {
-         popup_msg("Error", "LDIF file can not start with a forward slash. " +
+       } else if (ldif_file_import.indexOf('/') !== -1) {
+         // indexOf() returns -1 when '/' is absent, so reject only when it is found
+         popup_msg("Error", "LDIF file can not contain a forward slash. " +
                             "LDIF files are written to the server's LDIF directory (nsslapd-ldifdir)");
          return;

        } else {
@@ -818,8 +818,8 @@ 

        if (ldif_file_export.indexOf(' ') >= 0) {

          popup_msg("Error", "LDIF file can not contain any spaces");

          return;

-       } else if (ldif_file_export.startsWith('/')) {
-         popup_msg("Error", "LDIF file can not start with a forward slash. " +
+       } else if (ldif_file_export.indexOf('/') !== -1) {
+         // Validate the export name (not the import one); indexOf() returns -1
+         // when '/' is absent, so reject only when it is found
+         popup_msg("Error", "LDIF file can not contain a forward slash. " +
                             "LDIF files are written to the server's LDIF directory (nsslapd-ldifdir)");
          return;

        } else if (ldif_file_export != ""){

@@ -1217,8 +1217,8 @@ 

          popup_msg("Error", "Backup name can not contain any spaces");

          return;

        }

-       if (backup_name.startsWith('/')) {
-         popup_msg("Error", "Backup name can not start with a forward slash. " +
+       if (backup_name.indexOf('/') !== -1) {
+         // indexOf() returns -1 when '/' is absent, so reject only when it is found
+         popup_msg("Error", "Backup name can not contain a forward slash. " +
                             "Backups are written to the server's backup directory (nsslapd-bakdir)");
          return;
        }

file modified
+13 -8
@@ -313,7 +313,7 @@ 

          from lib389.monitor import Monitor, MonitorLDBM

          from lib389.rootdse import RootDSE

          from lib389.saslmap import SaslMapping, SaslMappings

-         from lib389.pwpolicy import Pwpolicy

+         from lib389.pwpolicy import PwPolicyManager

  

          # Need updating

          self.agreement = Agreement(self)
@@ -327,7 +327,7 @@ 

          self.plugins = Plugins(self)

          self.tasks = Tasks(self)

          self.saslmap = SaslMapping(self)

-         self.pwpolicy = Pwpolicy(self)

+         self.pwpolicy = PwPolicyManager(self)

          # Do we have a certdb path?

          # if MAJOR < 3:

          self.monitor = Monitor(self)
@@ -2846,11 +2846,12 @@ 

          else:
              # No output file specified.  Use the default ldif location/name
              cmd.append('-a')
+             # Default name: <ldif_dir>/<serverid>[-<backend>]-<timestamp>.ldif
+             tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
              if bename:
-                 ldifname = "/" + self.serverid + "-" + bename + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
+                 # Use the local 'bename' and this object's own paths/serverid,
+                 # matching the backup code in this file (self.ds_paths, self.serverid)
+                 ldifname = os.path.join(self.ds_paths.ldif_dir, "%s-%s-%s.ldif" % (self.serverid, bename, tnow))
              else:
-                 ldifname = "/" + self.serverid + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
-             cmd.append(self.get_ldif_dir() + ldifname)
+                 ldifname = os.path.join(self.ds_paths.ldif_dir, "%s-%s.ldif" % (self.serverid, tnow))
+             cmd.append(ldifname)

          try:

              result = subprocess.check_output(cmd, encoding='utf-8')

          except subprocess.CalledProcessError as e:
@@ -2916,7 +2917,8 @@ 

  

          if archive_dir is None:

              # Use the instance name and date/time as the default backup name

-             archive_dir = self.get_bak_dir() + "/" + self.serverid + "-" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

+             tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

+             archive_dir = os.path.join(self.ds_paths.backup_dir, "%s-%s" % (self.serverid, tnow))

          elif not archive_dir.startswith("/"):

              # Relative path, append it to the bak directory

              archive_dir = os.path.join(self.ds_paths.backup_dir, archive_dir)
@@ -3418,8 +3420,11 @@ 

          if archive is None:

              # Use the instance name and date/time as the default backup name

              tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

-             archive = os.path.join(self.ds_paths.backup_dir,

-                                    "%s_%s" % (self.serverid, tnow))

+             if self.serverid is not None:

+                 backup_dir_name = "%s-%s" % (self.serverid, tnow)

+             else:

+                 backup_dir_name = "backup-%s" % tnow

+             archive = os.path.join(self.ds_paths.backup_dir, backup_dir_name)

          elif archive[0] != "/":

              # Relative path, append it to the bak directory

              archive = os.path.join(self.ds_paths.backup_dir, archive)

file modified
+11 -5
@@ -610,8 +610,11 @@ 

          ldif_paths = []

          for ldif in list(ldifs):

              if not ldif.startswith("/"):

-                 ldif = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif)

-                 ldif_paths.append(ldif)

+                 if ldif.endswith(".ldif"):

+                     ldif = os.path.join(self._instance.ds_paths.ldif_dir, ldif)

+                 else:

+                     ldif = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif)

+             ldif_paths.append(ldif)

  

          task = ImportTask(self._instance)

          task_properties = {'nsInstance': be_name,
@@ -643,12 +646,15 @@ 

          task = ExportTask(self._instance)

          task_properties = {'nsInstance': be_names}

          if ldif is not None and not ldif.startswith("/"):

-             task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif)

+             if ldif.endswith(".ldif"):

+                 task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, ldif)

+             else:

+                 task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir, "%s.ldif" % ldif)

          else:

              tnow = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

              task_properties['nsFilename'] = os.path.join(self._instance.ds_paths.ldif_dir,

-                                                          "%s_%s_%s.ldif" % (self._instance.serverid,

-                                                                             "_".join(be_names), tnow))

+                                                          "%s-%s-%s.ldif" % (self._instance.serverid,

+                                                                             "-".join(be_names), tnow))

          if include_suffixes is not None:

              task_properties['nsIncludeSuffix'] = include_suffixes

          if exclude_suffixes is not None:

@@ -6,7 +6,11 @@ 

  # See LICENSE for details.

  # --- END COPYRIGHT BLOCK ---

  

+ import json

  import ldap

+ from lib389.utils import ensure_str, ensure_list_str

+ from lib389.pwpolicy import PwPolicyEntries, PwPolicyManager

+ from lib389.idm.account import Account

  

  arg_to_attr = {

          'pwdlocal': 'nsslapd-pwpolicy-local',
@@ -51,51 +55,170 @@ 

          'pwdallowhash': 'nsslapd-allow-hashed-passwords'

      }

  

+ def _args_to_attrs(args):
+     """Translate parsed CLI arguments into an attribute dictionary.
+ 
+     Only arguments that have a mapping in ``arg_to_attr`` and whose value
+     is not None are kept.
+ 
+     :param args: parsed arguments object (anything ``vars()`` accepts)
+     :returns: dict mapping attribute names to the supplied values
+     """
+     return {arg_to_attr[name]: value
+             for name, value in vars(args).items()
+             if name in arg_to_attr and value is not None}

+ 

+ 

+ def _get_policy_type(inst, dn=None):
+     """Return a human readable name for the kind of policy attached to dn.
+ 
+     :param inst: instance handed to PwPolicyManager
+     :param dn: target entry DN, or None for the global policy
+     :returns: "Global Password Policy", "User Policy" or "Subtree Policy"
+     :raises ValueError: if dn carries neither a user nor a subtree policy
+     """
+     manager = PwPolicyManager(inst)
+     if dn is None:
+         return "Global Password Policy"
+     if manager.is_user_policy(dn):
+         return "User Policy"
+     if manager.is_subtree_policy(dn):
+         return "Subtree Policy"
+     raise ValueError("The policy wasn't set up for the target dn entry or it is invalid")

+ 

+ 

+ def _get_pw_policy(inst, targetdn, log, use_json=None):
+     """Show one password policy, as JSON on stdout or as text through log.
+ 
+     :param inst: instance used for the manager and for the LDAP search
+     :param targetdn: entry DN owning a local policy, or None for the global one
+     :param log: logger that receives the text form
+     :param use_json: when true, print a JSON document instead of logging text
+     :raises ValueError: propagated from _get_policy_type / get_pwpolicy_entry
+     """
+     pwp_manager = PwPolicyManager(inst)
+     policy_type = _get_policy_type(inst, targetdn)
+     is_global = "global" in policy_type.lower()
+ 
+     # Work on a copy so the manager's own attribute list is left untouched
+     search_attrs = list(pwp_manager.pwp_attributes)
+     if is_global:
+         targetdn = 'cn=config'
+         # Spelled with hyphens, exactly as in arg_to_attr['pwdlocal']
+         search_attrs.extend(['passwordIsGlobalPolicy', 'nsslapd-pwpolicy-local'])
+     else:
+         targetdn = pwp_manager.get_pwpolicy_entry(targetdn).dn
+ 
+     entries = inst.search_s(targetdn, ldap.SCOPE_BASE, 'objectclass=*', search_attrs)
+     entry = entries[0]
+ 
+     if use_json:
+         # Keys are emitted in lowercase
+         str_attrs = {}
+         for k in entry.data:
+             str_attrs[ensure_str(k).lower()] = ensure_list_str(entry.data[k])
+ 
+         print(json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": str_attrs}))
+     else:
+         if is_global:
+             response = "Global Password Policy: cn=config\n------------------------------------\n"
+         else:
+             # policy_type already ends with "Policy"; do not repeat the word
+             response = "Local {}: {}\n------------------------------------\n".format(policy_type, targetdn)
+         # Text form shows only the first value of each attribute
+         for k in entry.data:
+             response += "{}: {}\n".format(k, ensure_str(entry.data[k][0]))
+         log.info(response)

+ 

  

  def list_policies(inst, basedn, log, args):
-     print(inst.pwpolicy.list_policies(args.DN[0], use_json=args.json))
+     """List the local password policies found at or below args.DN[0].
+ 
+     With args.json set, a JSON document {'type': 'list', 'items': [[dn, type], ...]}
+     is printed; otherwise one "dn (type)" line per policy is sent to the log.
+ 
+     :raises ValueError: if the target entry does not exist
+     """
+     log = log.getChild('list_policies')
+     targetdn = args.DN[0]
+     manager = PwPolicyManager(inst)
+ 
+     # The target must exist before anything is looked up
+     target = Account(inst, targetdn)
+     if not target.exists():
+         raise ValueError('The target entry dn does not exist')
+ 
+     # Collected (entry dn, policy type) pairs, in discovery order
+     found = []
+     if manager.is_user_policy(targetdn):
+         found.append((target.dn, _get_policy_type(inst, target.dn)))
+     else:
+         for policy in PwPolicyEntries(inst, targetdn).list():
+             # The policy cn embeds the DN it applies to, behind a fixed prefix
+             cn = policy.get_attr_val_utf8_l('cn')
+             if policy.is_subtree_policy():
+                 prefix = 'cn=nspwpolicyentry_subtree,'
+             else:
+                 prefix = 'cn=nspwpolicyentry_user,'
+             entrydn = cn.replace(prefix, '')
+             found.append((entrydn, _get_policy_type(inst, entrydn)))
+ 
+     if args.json:
+         print(json.dumps({'type': 'list', 'items': [[dn, kind] for dn, kind in found]}))
+     else:
+         log.info("".join("%s (%s)\n" % (dn, kind.lower()) for dn, kind in found))

  

  

  def get_local_policy(inst, basedn, log, args):
-     print(inst.pwpolicy.get_pwpolicy(targetdn=args.DN[0], use_json=args.json))
+     """Show the local password policy of the entry args.DN[0].
+ 
+     Delegates to _get_pw_policy; basedn is not used here.
+     """
+     log = log.getChild('get_local_policy')
+     _get_pw_policy(inst, args.DN[0], log, args.json)

  

  

  def get_global_policy(inst, basedn, log, args):
-     print(inst.pwpolicy.get_pwpolicy(targetdn=None, use_json=args.json))
+     """Show the global password policy.
+ 
+     Delegates to _get_pw_policy with a target DN of None, which selects
+     the global policy; basedn is not used here.
+     """
+     log = log.getChild('get_global_policy')
+     _get_pw_policy(inst, None, log, args.json)

  

  

  def create_subtree_policy(inst, basedn, log, args):
-     try:
-         inst.pwpolicy.create_subtree_policy(args.DN[0], args, arg_to_attr)
-         print('Successfully created subtree password policy')
-     except ldap.ALREADY_EXISTS:
-         raise ValueError('There is already a subtree password policy created for this entry')
+     """Create a subtree password policy on args.DN[0] from the CLI arguments.
+ 
+     basedn is not used here.
+     """
+     # NOTE(review): the previous version turned ldap.ALREADY_EXISTS into a
+     # ValueError; nothing is caught here any more - confirm that is intended.
+     log = log.getChild('create_subtree_policy')
+     # Gather the attributes
+     attrs = _args_to_attrs(args)
+     pwp_manager = PwPolicyManager(inst)
+     pwp_manager.create_subtree_policy(args.DN[0], attrs)
+ 
+     log.info('Successfully created subtree password policy')

  

  

  def create_user_policy(inst, basedn, log, args):
-     try:
-         inst.pwpolicy.create_user_policy(args.DN[0], args, arg_to_attr)
-         print('Successfully created user password policy')
-     except ldap.ALREADY_EXISTS:
-         raise ValueError('There is already a user password policy created for this entry')
+     """Create a user password policy on args.DN[0] from the CLI arguments.
+ 
+     basedn is not used here.
+     """
+     # NOTE(review): the previous version turned ldap.ALREADY_EXISTS into a
+     # ValueError; nothing is caught here any more - confirm that is intended.
+     log = log.getChild('create_user_policy')
+     # Gather the attributes
+     attrs = _args_to_attrs(args)
+     pwp_manager = PwPolicyManager(inst)
+     pwp_manager.create_user_policy(args.DN[0], attrs)
+ 
+     log.info('Successfully created user password policy')

  

  

  def set_global_policy(inst, basedn, log, args):
-     for arg in vars(args):
-         val = getattr(args, arg)
-         if arg in arg_to_attr and val is not None:
-             inst.config.set(arg_to_attr[arg], val)
-     print("Successfully updated global policy")
+     """Update the global password policy from the CLI arguments.
+ 
+     The arguments are mapped to attributes by _args_to_attrs and handed to
+     PwPolicyManager.set_global_policy; basedn is not used here.
+     """
+     log = log.getChild('set_global_policy')
+     # Gather the attributes
+     attrs = _args_to_attrs(args)
+     pwp_manager = PwPolicyManager(inst)
+     pwp_manager.set_global_policy(attrs)
+ 
+     log.info('Successfully updated global password policy')

  

  

  def set_local_policy(inst, basedn, log, args):
-     inst.pwpolicy.set_policy(args.DN[0], args, arg_to_attr)
-     print("Successfully updated local policy")
+     """Replace attributes of the local policy attached to args.DN[0].
+ 
+     :raises ValueError: if no policy attribute was supplied, or if the
+                         policy entry/type lookups reject the target
+     """
+     log = log.getChild('set_local_policy')
+     policy_dn = args.DN[0]
+     # (attribute, value) pairs built from the supplied arguments
+     changes = list(_args_to_attrs(args).items())
+     policy_entry = PwPolicyManager(inst).get_pwpolicy_entry(policy_dn)
+     policy_type = _get_policy_type(inst, policy_dn)
+ 
+     if not changes:
+         raise ValueError("There are no password policies to set")
+     policy_entry.replace_many(*changes)
+ 
+     log.info('Successfully updated %s' % policy_type.lower())

  

  

  def del_local_policy(inst, basedn, log, args):
-     inst.pwpolicy.delete_local_policy(args.DN[0])
-     print("Successfully removed local password policy")
+     """Delete the local password policy attached to args.DN[0].
+ 
+     basedn is not used here.
+     """
+     log = log.getChild('del_local_policy')
+     targetdn = args.DN[0]
+     # Resolved before the delete call; it is only used in the final message.
+     # Presumably the lookup would fail once the policy is gone - confirm.
+     policy_type = _get_policy_type(inst, targetdn)
+     pwp_manager = PwPolicyManager(inst)
+     pwp_manager.delete_local_policy(targetdn)
+     log.info('Successfully deleted %s' % policy_type.lower())

  

  

  def create_parser(subparsers):

file modified
+64 -52
@@ -14,15 +14,17 @@ 

  

  from lib389.utils import ensure_str

  

+ 

  class CosTemplate(DSLdapObject):

-     def __init__(self, instance, dn=None):

-         """A Cos Template defining the values to override on a target.

+     """A Cos Template defining the values to override on a target.

+ 

+     :param instance: DirSrv instance

+     :type instance: DirSrv

+     :param dn: The dn of the template

+     :type dn: str

+     """

  

-         :param instance: DirSrv instance

-         :type instance: DirSrv

-         :param dn: The dn of the template

-         :type dn: str

-         """

+     def __init__(self, instance, dn=None):

          super(CosTemplate, self).__init__(instance, dn)

          self._rdn_attribute = 'cn'

          self._must_attributes = ['cn']
@@ -35,18 +37,20 @@ 

          ]

          self._protected = False

  

+ 

  class CosTemplates(DSLdapObjects):

+     """The set of costemplates that exist for direct and indirect

+     implementations.

+ 

+     :param instance: A dirsrv instance

+     :type instance: DirSrv

+     :param basedn: The basedn of the templates

+     :type basedn: str

+     :param rdn: The rdn of the templates

+     :type rdn: str

+     """

+ 

      def __init__(self, instance, basedn, rdn=None):

-         """The set of costemplates that exist for direct and indirect

-         implementations.

- 

-         :param instance: A dirsrv instance

-         :type instance: DirSrv

-         :param basedn: The basedn of the templates

-         :type basedn: str

-         :param rdn: The rdn of the templates

-         :type rdn: str

-         """

          super(CosTemplates, self).__init__(instance)

          self._objectclasses = [

              'cosTemplate'
@@ -59,18 +63,19 @@ 

  

  

  class CosIndirectDefinition(DSLdapObject):

+     """A Cos Indirect Definition associating an attr:value pair as a link

+     attr to a template type.

+ 

+     :param instance: DirSrv instance

+     :type instance: DirSrv

+     :param dn: The dn of the template

+     :type dn: str

+     """

+ 

      def __init__(self, instance, dn=None):

-         """A Cos Indirect Definition associating an attr:value pair as a link

-         attr to a template type.

- 

-         :param instance: DirSrv instance

-         :type instance: DirSrv

-         :param dn: The dn of the template

-         :type dn: str

-         """

          super(CosIndirectDefinition, self).__init__(instance, dn)

          self._rdn_attribute = 'cn'

-         self._must_attributes = ['cn', 'cosIndirectSpecifier', 'cosAttribute']

+         self._must_attributes = ['cn', 'cosIndirectSpecifier', 'cosattribute']

          self._create_objectclasses = [

              'top',

              'cosSuperDefinition',
@@ -78,17 +83,19 @@ 

          ]

          self._protected = False

  

+ 

  class CosIndirectDefinitions(DSLdapObjects):

+     """The set of cos indirect definitions that exist.

+ 

+     :param instance: A dirsrv instance

+     :type instance: DirSrv

+     :param basedn: The basedn of the templates

+     :type basedn: str

+     :param rdn: The rdn of the templates

+     :type rdn: str

+     """

+ 

      def __init__(self, instance, basedn, rdn=None):

-         """The set of cos indirect definitions that exist.

- 

-         :param instance: A dirsrv instance

-         :type instance: DirSrv

-         :param basedn: The basedn of the templates

-         :type basedn: str

-         :param rdn: The rdn of the templates

-         :type rdn: str

-         """

          super(CosIndirectDefinitions, self).__init__(instance)

          self._objectclasses = [

              'cosSuperDefinition',
@@ -102,38 +109,43 @@ 

  

  

  class CosPointerDefinition(DSLdapObject):

+     """A Cos Pointer Definition associating a dn syntax type as a link

+     attr to a template type.

+ 

+     :param instance: DirSrv instance

+     :type instance: DirSrv

+     :param dn: The dn of the template

+     :type dn: str

+     """

+ 

      def __init__(self, instance, dn=None):

-         """A Cos Pointer Definition associating a dn syntax type as a link

-         attr to a template type.

- 

-         :param instance: DirSrv instance

-         :type instance: DirSrv

-         :param dn: The dn of the template

-         :type dn: str

-         """

          super(CosPointerDefinition, self).__init__(instance, dn)

          self._rdn_attribute = 'cn'

          self._must_attributes = ['cn', 'cosTemplateDn', 'cosAttribute']

          self._create_objectclasses = [

              'top',

+             'ldapsubentry',

              'cosSuperDefinition',

              'cosPointerDefinition',

          ]

          self._protected = False

  

+ 

  class CosPointerDefinitions(DSLdapObjects):

+     """The set of cos pointer definitions that exist.

+ 

+     :param instance: A dirsrv instance

+     :type instance: DirSrv

+     :param basedn: The basedn of the templates

+     :type basedn: str

+     :param rdn: The rdn of the templates

+     :type rdn: str

+     """

+ 

      def __init__(self, instance, basedn, rdn=None):

-         """The set of cos pointer definitions that exist.

- 

-         :param instance: A dirsrv instance

-         :type instance: DirSrv

-         :param basedn: The basedn of the templates

-         :type basedn: str

-         :param rdn: The rdn of the templates

-         :type rdn: str

-         """

          super(CosPointerDefinitions, self).__init__(instance)

          self._objectclasses = [

+             'ldapsubentry',

              'cosSuperDefinition',

              'cosPointerDefinition',

          ]

file modified
+242 -249
@@ -9,28 +9,27 @@ 

  import ldap

  import json

  from ldap import modlist

+ from lib389._mapped_object import DSLdapObject, DSLdapObjects

+ from lib389.config import Config

+ from lib389.idm.account import Account, Accounts

+ from lib389.idm.nscontainer import nsContainers, nsContainer

+ from lib389.cos import CosPointerDefinitions, CosPointerDefinition, CosTemplates

  from lib389.utils import ensure_str, ensure_list_str, ensure_bytes

  

  USER_POLICY = 1

  SUBTREE_POLICY = 2

  

  

- class Pwpolicy(object):

-     """A local password policy, either user or subtree

+ class PwPolicyManager(object):

Shouldn't this be a subclass of CosTemplates?

+     """Manages user, subtree and global password policies

  

      :param instance: An instance

      :type instance: lib389.DirSrv

-     :param dn: Entry DN

-     :type dn: str

- 

- 

-     cn=nsPwPolicyEntry,DN_OF_ENTRY,cn=nsPwPolicyContainer,SUFFIX

- 

      """

  

-     def __init__(self, conn):

-         self.conn = conn

-         self.log = conn.log

+     def __init__(self, instance):

+         self._instance = instance

+         self.log = instance.log

          self.pwp_attributes = [

              'passwordstoragescheme',

              'passwordChange',
@@ -74,274 +73,268 @@ 

              'nsslapd-allow-hashed-passwords'

          ]

  

-     def create_pwp_container(self, basedn):

-         attrs = {'objectclass': [b'top', b'nsContainer'],

-                  'cn': [b'nsPwPolicyContainer']}

-         ldif = modlist.addModlist(attrs)

-         try:

-             self.conn.add_ext_s(basedn, ldif)

-         except ldap.ALREADY_EXISTS:

-             # Already exists, no problem

-             pass

+     def is_user_policy(self, dn):
+         """Check if the entry has a user password policy
+ 
+         :param dn: Entry DN with PwPolicy set up
+         :type dn: str
  
-     def create_user_policy(self, targetdn, args, arg_to_attr):
-         """Create a local user password policy entry
+         :returns: True if the entry has a user policy, False otherwise
+                   (also False when the entry does not exist)
          """
  
-         # Verify target dn exists before getting started
+         # CoSTemplate entry also can have 'pwdpolicysubentry', so we better validate this part too
+         entry = Account(self._instance, dn)
          try:
-             self.conn.search_s(targetdn, ldap.SCOPE_BASE, "objectclass=*")
+             if entry.present("objectclass", "costemplate"):
+                 # It is a CoSTemplate entry, not user policy
+                 return False
+ 
+             # A pwdpolicysubentry attribute on a non-template entry is what
+             # this method treats as a user policy
+             if entry.present("pwdpolicysubentry"):
+                 return True
+             else:
+                 return False
          except ldap.NO_SUCH_OBJECT:
+             # A missing entry is reported as "no user policy", not as an error
+             return False

+ 

+     def is_subtree_policy(self, dn):
+         """Tell whether a subtree password policy is defined at the entry
+ 
+         :param dn: DN of the entry to inspect
+         :type dn: str
+ 
+         :returns: True if the entry has a subtree policy, False otherwise
+         """
+ 
+         # A subtree policy is recognised by the CoS pointer definition
+         # cn=nsPwPolicy_CoS located directly under the entry
+         pointer_dn = 'cn=nsPwPolicy_CoS,%s' % dn
+         return True if CosPointerDefinition(self._instance, pointer_dn).exists() else False

+ 

+     def create_user_policy(self, dn, properties):
+         """Creates all entries which are needed for the user
+         password policy
+ 
+         :param dn: Entry DN for the user pwpolicy
+         :type dn: str
+         :param properties: A dict with password policy settings
+                            (not modified by this method)
+         :type properties: dict
+ 
+         :returns: PwPolicyEntry instance
+         :raises ValueError: if the target dn does not exist
+         """
+ 
+         # Verify target dn exists before getting started
+         user_entry = Account(self._instance, dn)
+         if not user_entry.exists():
              raise ValueError('Can not create user password policy because the target dn does not exist')
  
-         rdns = ldap.dn.explode_dn(targetdn)
+         # The policy container lives under the parent of the user entry
+         rdns = ldap.dn.explode_dn(user_entry.dn)
          rdns.pop(0)
          parentdn = ",".join(rdns)
  
          # Create the pwp container if needed
-         self.create_pwp_container("cn=nsPwPolicyContainer,{}".format(parentdn))
- 
-         # Gather the attributes and create policy entry
-         attrs = {}
-         for arg in vars(args):
-             val = getattr(args, arg)
-             if arg in arg_to_attr and val is not None:
-                 attrs[arg_to_attr[arg]] = ensure_bytes(val)
-         attrs['objectclass'] = [b'top', b'ldapsubentry', b'passwordpolicy']
-         ldif = modlist.addModlist(attrs)
-         user_dn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, parentdn)
-         self.conn.add_ext_s(user_dn, ldif)
- 
-         # Add policy to entry
-         self.conn.modify_s(targetdn, [(ldap.MOD_REPLACE, 'pwdpolicysubentry', ensure_bytes(user_dn))])
- 
-     def create_subtree_policy(self, targetdn, args, arg_to_attr):
-         """Create a local subtree password policy entry - requires COS entry
+         pwp_containers = nsContainers(self._instance, basedn=parentdn)
+         pwp_container = pwp_containers.ensure_state(properties={'cn': 'nsPwPolicyContainer'})
+ 
+         # Create policy entry; add the cn to a copy so the caller's dict
+         # is not changed behind its back
+         entry_properties = dict(properties)
+         entry_properties['cn'] = 'cn=nsPwPolicyEntry_user,%s' % dn
+         pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)
+         pwp_entry = pwp_entries.create(properties=entry_properties)
+ 
+         # Add policy to the entry
+         user_entry.replace('pwdpolicysubentry', pwp_entry.dn)
+ 
+         return pwp_entry

+ 

+     def create_subtree_policy(self, dn, properties):
+         """Creates all entries which are needed for the subtree
+         password policy
+ 
+         :param dn: Entry DN for the subtree pwpolicy
+         :type dn: str
+         :param properties: A dict with password policy settings
+                            (not modified by this method)
+         :type properties: dict
+ 
+         :returns: PwPolicyEntry instance
+         :raises ValueError: if the target dn does not exist
          """
  
          # Verify target dn exists before getting started
-         try:
-             self.conn.search_s(targetdn, ldap.SCOPE_BASE, "objectclass=*")
-         except ldap.NO_SUCH_OBJECT:
-             raise ValueError('Can not create user password policy because the target dn does not exist')
+         subtree_entry = Account(self._instance, dn)
+         if not subtree_entry.exists():
+             raise ValueError('Can not create subtree password policy because the target dn does not exist')
  
          # Create the pwp container if needed
-         container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn)
-         self.create_pwp_container(container_dn)
- 
-         # Create COS entries
-         cos_template_entry = 'cn=nsPwTemplateEntry,{}'.format(targetdn)
-         cos_template_dn = 'cn="cn=nsPwTemplateEntry,{}",{}'.format(targetdn, container_dn)
-         cos_subentry_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn)
-         cos_template_attrs = {'cosPriority': b'1', 'pwdpolicysubentry': ensure_bytes(cos_subentry_dn),
-                               'cn': ensure_bytes(cos_template_entry)}
- 
-         cos_template_attrs['objectclass'] = [b'top', b'ldapsubentry', b'extensibleObject', b'costemplate']
-         ldif = modlist.addModlist(cos_template_attrs)
-         self.conn.add_ext_s(cos_template_dn, ldif)
- 
-         cos_def_attrs = {'objectclass': [b'top', b'ldapsubentry', b'extensibleObject',
-                                          b'cosSuperDefinition', b'cosPointerDefinition'],
-                          'cosAttribute': b'pwdpolicysubentry default operational',
-                          'cosTemplateDn': ensure_bytes(cos_template_dn),
-                          'cn': b'nsPwPolicy_CoS'}
-         ldif = modlist.addModlist(cos_def_attrs)
-         self.conn.add_ext_s("cn=nsPwPolicy_CoS,{}".format(targetdn), ldif)
- 
-         # Gather the attributes and create policy sub entry
-         attrs = {}
-         for arg in vars(args):
-             val = getattr(args, arg)
-             if arg in arg_to_attr and val is not None:
-                 attrs[arg_to_attr[arg]] = ensure_bytes(val)
-         attrs['objectclass'] = [b'top', b'ldapsubentry', b'passwordpolicy']
-         ldif = modlist.addModlist(attrs)
-         try:
-             self.conn.add_ext_s(cos_subentry_dn, ldif)
-         except ldap.ALREADY_EXISTS:
-             # Already exists, no problem
-             pass
+         pwp_containers = nsContainers(self._instance, basedn=dn)
+         pwp_container = pwp_containers.ensure_state(properties={'cn': 'nsPwPolicyContainer'})
  
-         # Add policy to entry
-         self.conn.modify_s(targetdn, [(ldap.MOD_REPLACE, 'pwdpolicysubentry', ensure_bytes(cos_subentry_dn))])
+         # Create policy entry; add the cn to a copy so the caller's dict
+         # is not changed behind its back
+         entry_properties = dict(properties)
+         entry_properties['cn'] = 'cn=nsPwPolicyEntry_subtree,%s' % dn
+         pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)
+         pwp_entry = pwp_entries.create(properties=entry_properties)
  
-     def delete_local_policy(self, targetdn):
-         container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn)
+         # The CoS template entry (nsPwTemplateEntry)
+         # that has the pwdpolicysubentry value pointing to the above (nsPwPolicyEntry) entry.
+         # The attribute name must match the cosAttribute of the pointer definition below.
+         cos_templates = CosTemplates(self._instance, pwp_container.dn)
+         cos_template = cos_templates.create(properties={'cosPriority': '1',
+                                                         'pwdpolicysubentry': pwp_entry.dn,
+                                                         'cn': 'cn=nsPwTemplateEntry,%s' % dn})
  
-         # First check that the entry exists
-         try:
-             entries = self.conn.search_s(targetdn, ldap.SCOPE_BASE, 'objectclass=top', ['pwdpolicysubentry'])
-             target_entry = entries[0]
-         except ldap.NO_SUCH_OBJECT:
-             raise ValueError('The entry does not exist, nothing to remove')
+         # The CoS specification entry at the subtree level
+         cos_pointer_defs = CosPointerDefinitions(self._instance, dn)
+         cos_pointer_defs.create(properties={'cosAttribute': 'pwdpolicysubentry default operational',
+                                             'cosTemplateDn': cos_template.dn,
+                                             'cn': 'nsPwPolicy_CoS'})
  
-         # Subtree or local policy?
-         try:
-             cos_def_dn = 'cn=nsPwPolicy_CoS,{}'.format(targetdn)
-             self.conn.search_s(cos_def_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))")
-             found_subtree = True
-         except:
-             found_subtree = False
- 
-         # If subtree policy then remove COS template and definition
-         if found_subtree:
-             container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn)
-             cos_template_dn = 'cn="cn=nsPwTemplateEntry,{}",{}'.format(targetdn, container_dn)
-             policy_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn)
-             self.conn.delete_s(cos_template_dn)
-             self.conn.delete_s(policy_dn)
-             self.conn.delete_s(cos_def_dn)
+         return pwp_entry

+ 

+     def get_pwpolicy_entry(self, dn):

+         """Get a local password policy entry

+ 

+         :param dn: Entry DN for the local pwpolicy

+         :type dn: str

+ 

+         :returns: PwPolicyEntry instance

+         """

+ 

+         # Verify target dn exists before getting started

+         entry = Account(self._instance, dn)

+         if not entry.exists():

+             raise ValueError('Can not get the password policy entry because the target dn does not exist')

+ 

+         # Check if it's a subtree or a user policy

+         if self.is_user_policy(entry.dn):

+             pwp_entry_dn = entry.get_attr_val_utf8("pwdpolicysubentry")

+         elif self.is_subtree_policy(entry.dn):

+             pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % dn)

+ 

+             pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

+             pwp_entry_dn = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn).dn

          else:

-             # Remove password subentry from target DN, then remove the policy entry itself

-             rdns = ldap.dn.explode_dn(targetdn)

+             raise ValueError("The policy wasn't set up for the target dn entry or it is invalid")

+ 

+         return PwPolicyEntry(self._instance, pwp_entry_dn)

+ 

+     def delete_local_policy(self, dn):

+         """Delete a local password policy entry

+ 

+         :param dn: Entry DN for the local pwpolicy

+         :type dn: str

+         """

+ 

+         subtree = False

+         # Verify target dn exists before getting started

+         entry = Account(self._instance, dn)

+         if not entry.exists():

+             raise ValueError('The target entry dn does not exist')

+ 

+         if self.is_subtree_policy(entry.dn):

+             parentdn = dn

+             subtree = True

+         elif self.is_user_policy(entry.dn):

+             rdns = ldap.dn.explode_dn(entry.dn)

              rdns.pop(0)

              parentdn = ",".join(rdns)

-             container_dn = "cn=nsPwPolicyContainer,{}".format(parentdn)

-             policy_dn = target_entry.getValue('pwdpolicysubentry')

-             if policy_dn is None or policy_dn == "":

-                 raise ValueError('There is no local password policy for this entry')

-             self.conn.delete_s(ensure_str(policy_dn))

- 

-         # Remove the passwordPolicySubentry from the target

-         self.conn.modify_s(targetdn, [(ldap.MOD_DELETE, 'pwdpolicysubentry', None)])

- 

-         # if there are no other entries under the container, then remove the container

-         entries = self.conn.search_s(container_dn, ldap.SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=*))")

-         if len(entries) == 1:

-             self.conn.delete_s(container_dn)

- 

-     def get_pwpolicy(self, targetdn=None, use_json=False):

-         """Get the local or global password policy entry"""

-         global_policy = False

-         policy_type = "global"

-         if targetdn is not None:

-             # Local policy listed by name

-             entrydn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, targetdn)

-             pwp_attributes = self.pwp_attributes

          else:

-             # Global policy

-             global_policy = True

-             entrydn = "cn=config"

-             pwp_attributes = self.pwp_attributes

-             pwp_attributes += ['passwordIsGlobalPolicy', 'nsslapd-pwpolicy_local']

+             raise ValueError("The policy wasn't set up for the target dn entry or the policy is invalid")

  

-         try:

-             entries = self.conn.search_s(entrydn,

-                                          ldap.SCOPE_BASE,

-                                          'objectclass=*',

-                                          pwp_attributes)

-             entry = entries[0]

-         except ldap.NO_SUCH_OBJECT:

-             # okay lets see if its auser policy

-             rdns = ldap.dn.explode_dn(targetdn)

-             rdns.pop(0)

-             parentdn = (",").join(rdns)

-             entrydn = 'cn="cn=nsPwPolicyEntry,{}",cn=nsPwPolicyContainer,{}'.format(targetdn, parentdn)

-             try:

-                 entries = self.conn.search_s(entrydn,

-                                              ldap.SCOPE_BASE,

-                                              'objectclass=*',

-                                              pwp_attributes)

-                 entry = entries[0]

-             except ldap.LDAPError as e:

-                 raise ValueError("Could not find password policy for entry: {} Error: {}".format(targetdn, str(e)))

-         except ldap.LDAPError as e:

-             raise ValueError("Could not find password policy for entry: {} Error: {}".format(targetdn, str(e)))

- 

-         if not global_policy:

-             # subtree or user policy?

-             cos_dn = 'cn=nspwpolicy_cos,' + targetdn

-             try:

-                 self.conn.search_s(cos_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))")

-                 policy_type = "Subtree"

-             except:

-                 policy_type = "User"

- 

-         if use_json:

-             str_attrs = {}

-             for k in entry.data:

-                 str_attrs[ensure_str(k)] = ensure_list_str(entry.data[k])

- 

-             # ensure all the keys are lowercase

-             str_attrs = dict((k.lower(), v) for k, v in str_attrs.items())

- 

-             response = json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": str_attrs})

+         pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % parentdn)

+ 

+         pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

+         if subtree:

+             pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn)

          else:

-             if global_policy:

-                 response = "Global Password Policy: cn=config\n------------------------------------\n"

-             else:

-                 response = "Local {} Policy: {}\n------------------------------------\n".format(policy_type, targetdn)

-             for k in entry.data:

-                 response += "{}: {}\n".format(k, ensure_str(entry.data[k][0]))

+             pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_user,%s' % dn)

+ 

+         if self.is_subtree_policy(entry.dn):

+             cos_templates = CosTemplates(self._instance, pwp_container.dn)

+             cos_template = cos_templates.get('cn=nsPwTemplateEntry,%s' % dn)

+             cos_template.delete()

+ 

+             cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn)

+             cos_pointer_def.delete()

+         else:

+             entry.remove("pwdpolicysubentry", pwp_entry.dn)

+ 

+         pwp_entry.delete()

+         try:

+             pwp_container.delete()

+         except ldap.NOT_ALLOWED_ON_NONLEAF:

+             pass

  

-         return response

+     def set_global_policy(self, properties):

+         """Configure global password policy

  

-     def list_policies(self, targetdn, use_json=False):

-         """Return a list of the target DN's of all user policies

+         :param properties: A dictionary with password policy attributes

+         :type properties: dict

          """

  

-         if use_json:

-             result = {'type': 'list', 'items': []}

-         else:

-             result = ""

+         modlist = []

+         for attr, value in properties.items():

+             modlist.append((attr, value))

  

-         # First get all the policies

-         policies = self.conn.search_s(targetdn, ldap.SCOPE_SUBTREE, "(&(objectclass=ldapsubentry)(objectclass=passwordpolicy))", ['cn'])

-         if policies is None or len(policies) == 0:

-             if use_json:

-                 return json.dumps(result)

-             else:

-                 return "No local password polices found"

- 

-         # Determine if the policy is subtree or user (subtree policies have COS entries)

-         for policy in policies:

-             cn = ensure_str(policy.getValue('cn'))

-             entrydn = cn.lower().replace('cn=nspwpolicyentry,', '')   # .lstrip()

-             cos_dn = cn.lower().replace('cn=nspwpolicyentry', 'cn=nspwpolicy_cos')

-             try:

-                 self.conn.search_s(cos_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))")

-                 found_subtree = True

-             except:

-                 found_subtree = False

- 

-             if found_subtree:

-                 # Build subtree policy list

-                 if use_json:

-                     result['items'].append([entrydn, "Subtree Policy"])

-                 else:

-                     result += entrydn + " (subtree policy)\n"

-             else:

-                 # Build user policy list

-                 if use_json:

-                     result['items'].append([entrydn, "User Policy"])

-                 else:

-                     result += entrydn + " (user policy)\n"

- 

-         if use_json:

-             return json.dumps(result)

+         if len(modlist) > 0:

+             config = Config(self._instance)

+             config.replace_many(*modlist)

          else:

-             return result

+             raise ValueError("There are no password policies to set")

+ 

+ 

+ class PwPolicyEntry(DSLdapObject):

+     """A single instance of a password policy entry

+ 

+     :param instance: An instance

+     :type instance: lib389.DirSrv

+     :param dn: Entry DN

+     :type dn: str

+     """

+ 

+     def __init__(self, instance, dn):

+         super(PwPolicyEntry, self).__init__(instance, dn)

+         self._rdn_attribute = 'cn'

+         self._must_attributes = ['cn']

+         self._create_objectclasses = ['top', 'ldapsubentry', 'passwordpolicy']

+         self._protected = False

+ 

+     def is_user_policy(self):

+         """Check if the policy entry is a user password policy"""

+ 

+         pwp_manager = PwPolicyManager(self._instance)

+         cn = self.get_attr_val_utf8_l('cn')

+         entrydn = cn.replace('cn=nspwpolicyentry_user,', '')

+ 

+         return pwp_manager.is_user_policy(entrydn)

+ 

+     def is_subtree_policy(self):

+         """Check if the policy entry is a subtree password policy"""

+ 

+         pwp_manager = PwPolicyManager(self._instance)

+         cn = self.get_attr_val_utf8_l('cn')

+         entrydn = cn.replace('cn=nspwpolicyentry_subtree,', '')

+ 

+         return pwp_manager.is_subtree_policy(entrydn)

+ 

+ 

+ class PwPolicyEntries(DSLdapObjects):

+     """DSLdapObjects that represents all password policy entries in container.

+ 

+     :param instance: An instance

+     :type instance: lib389.DirSrv

+     :param basedn: Suffix DN

+     :type basedn: str

+     :param rdn: The DN that will be combined with basedn

+     :type rdn: str

+     """

+ 

+     def __init__(self, instance, basedn):

+         super(PwPolicyEntries, self).__init__(instance)

+         self._objectclasses = [

+             'top',

+             'ldapsubentry',

+             'passwordpolicy'

+         ]

+         self._filterattrs = ['cn']

+         self._childobject = PwPolicyEntry

+         self._basedn = basedn

  

-     def set_policy(self, targetdn, args, arg_to_attr):

-         '''This could be a user or subtree policy, so find out which one and

-         use the correct container dn'''

-         try:

-             cos_def_dn = 'cn=nsPwPolicy_CoS,{}'.format(targetdn)

-             self.conn.search_s(cos_def_dn, ldap.SCOPE_BASE, "(|(objectclass=ldapsubentry)(objectclass=*))")

-             # This is a subtree policy

-             container_dn = "cn=nsPwPolicyContainer,{}".format(targetdn)

-         except:

-             # This is a user policy

-             rdns = ldap.dn.explode_dn(targetdn)

-             rdns.pop(0)

-             parentdn = ",".join(rdns)

-             container_dn = "cn=nsPwPolicyContainer,{}".format(parentdn)

- 

-         policy_dn = 'cn="cn=nsPwPolicyEntry,{}",{}'.format(targetdn, container_dn)

-         mods = []

-         for arg in vars(args):

-             val = getattr(args, arg)

-             if arg in arg_to_attr and val is not None:

-                 mods.append((ldap.MOD_REPLACE, ensure_str(arg_to_attr[arg]), ensure_bytes(val)))

-         if len(mods) > 0:

-             self.conn.modify_s(policy_dn, mods)

@@ -4,8 +4,10 @@ 

  

  from lib389.cli_conf.backup import backup_create, backup_restore

  from lib389.cli_base import LogCapture, FakeArgs

+ from lib389.idm.user import UserAccounts

  from lib389.topologies import topology_st

  from lib389.utils import ds_is_older

+ from lib389._constants import DEFAULT_SUFFIX

  pytestmark = pytest.mark.skipif(ds_is_older('1.4.0'), reason="Not implemented")

  

  
@@ -13,18 +15,27 @@ 

      BACKUP_DIR = os.path.join(topology_st.standalone.ds_paths.backup_dir, "basic_backup")

      topology_st.logcap = LogCapture()

      args = FakeArgs()

+ 

+     users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)

+     user = users.create_test_user()

+     user.replace("description", "backup_test")

+ 

      # Clean the backup dir first

      if os.path.exists(BACKUP_DIR):

          shutil.rmtree(BACKUP_DIR)

+ 

      # Create the backup

      args.archive = BACKUP_DIR

      args.db_type = None

      backup_create(topology_st.standalone, None, topology_st.logcap.log, args)

      assert os.listdir(BACKUP_DIR)

+ 

      # Restore the backup

      args.archive = topology_st.standalone.ds_paths.backup_dir

      args.db_type = None

      backup_restore(topology_st.standalone, None, topology_st.logcap.log, args)

+     assert user.present("description", "backup_test")

+ 

      # No error has happened! Done!

      # Clean up

      if os.path.exists(BACKUP_DIR):

Description: Refactor Password Policy module and its CLI part.
Add PwPolicyManager object and PwPolicyEntry(DSLdapObject).
Validate LDIF and Backup dir paths. Don't accept a forward slash
because it can lead to a security flaw.
Add an additional assertion to Backup/Restore CLI test suite.

https://pagure.io/389-ds-base/issue/49866

Reviewed by: ?

rebased onto fd793a31adca948671ca5972944a4972d46b41c6

5 years ago

Looks like we aren't handling set correctly. It allows an empty modification to be performed:

[root@localhost cli]# ./dsconf localhost pwpolicy set
Successfully updated global password policy

Audit log:

time: 20180824152612
dn: cn=config
result: 0
changetype: modify
replace: modifiersname
modifiersname: cn=dm
-
replace: modifytimestamp
modifytimestamp: 20180824192612Z
-

We should handle that better and return an error/usage message.

I think the local password policy "list" could be improved:

[root@localhost cli]# ./dsconf localhost localpwp adduser --pwdtrack=on "ou=people,dc=example,dc=com"
Successfully created user password policy
[root@localhost cli]# ./dsconf localhost localpwp list dc=example,dc=com

[root@localhost cli]# ./dsconf localhost localpwp list ou=people,dc=example,dc=com
ou=people,dc=example,dc=com (user policy)

I think using the DN of "dc=example,dc=com" should return all the policies, but it looks like it's doing a BASE scope search, and not a subtree search.

The rest looks good, CLI seems to work fine (except for list), nice job!

rebased onto 4ca007fb9f55f9e2e7978a96c103cdc721e94778

5 years ago

Thanks!
The issues are fixed, please, review.

I found an issue, not yours, but mine from the original commit. This error talks about creating a user policy when I am just trying to list all policies on a DN that does not exist:

# ./dsconf localhost localpwp list ou=TYPO,dc=example,dc=com
Error: Can not create user password policy because the target dn does not exist

So this error should be cleaned up, and maybe check all the error messages for more copy-paste errors. Thanks!

The rest looks good, ACK

rebased onto dcefc0983aff3dc7b938e633d3ae84b52057b496

5 years ago

Nice catch! Fixed.
I cleaned up the errors. I'll give William some time to review it too. I'll merge tomorrow if nothing comes up.

rebased onto d661622

5 years ago

Pull-Request has been merged by spichugi

5 years ago

Shouldn't this be a subclass of CosTemplates?

Hey, aren't pwpolicies subclasses of CosTemplates because they are cosTemplates?

Sorry for late review, I was unwell and travelling

Hey, aren't pwpolicies subclasses of CosTemplates because they are cosTemplates?

As I see the feature, we have one manager class which takes care of all PwPolicies (user, subtree and global). We use it for the CLI (and WebUI), and we can also use it for writing tests and setting up policies with the lib389 API.

If I understood you correctly, the case you mentioned is about subtree policy only. I use the CosTemplate DSLdapObject, but I do it in the PwPolicyManager method create_subtree_policy.

I was referring to this guide (and existing Mark's code)
https://access.redhat.com/documentation/en-us/red_hat_directory_server/10/html-single/administration_guide/index#Configuring_the_Password_Policy-Configuring_SubtreeUser_Password_Policy_Using_the_Command_Line

@spichugi Okay, I'll need to re-read this to review properly. I'll trust you have done it for now, and we can always correct it later if needed.

389-ds-base is moving from Pagure to Github. This means that new issues and pull requests
will be accepted only in 389-ds-base's github repository.

This pull request has been cloned to GitHub as an issue and is available here:
- https://github.com/389ds/389-ds-base/issues/2982

If you want to continue to work on the PR, please navigate to the github issue,
download the patch from the attachments and file a new pull request.

Thank you for understanding. We apologize for any inconvenience.

Pull-Request has been closed by spichugi

3 years ago