#50258 Ticket 50257 - lib389 - password policy user vs subtree checks are broken
Closed 3 years ago by spichugi. Opened 5 years ago by mreynolds.
mreynolds/389-ds-base ticket50257  into  master

@@ -928,6 +928,25 @@ 

              insts = []

          return insts

  

+     def exists(self, selector=[], dn=None):

+         """Check if a child entry exists

+ 

+         :returns: True if it exists

+         """

+         results = []

+         try:

+             if dn is not None:

+                 results = self._get_dn(dn)

+             else:

+                 results = self._get_selector(selector)

+         except:

+             return False

+ 

+         if len(results) == 1:

+             return True

+         else:

+             return False

+ 

      def get(self, selector=[], dn=None, json=False):

          """Get a child entry (DSLdapObject, Replica, etc.) with dn or selector

          using a base DN and objectClasses of our object (DSLdapObjects, Replicas, etc.)

@@ -271,6 +271,7 @@ 

          # Missing value

          raise ValueError("Missing attribute to delete")

  

+ 

  def _generic_modify_change_to_mod(change):

      values = change.split(":")

      if len(values) <= 2:
@@ -295,6 +296,7 @@ 

      else:

          raise ValueError("Unknown action '%s'. Expected add, delete, replace" % change)

  

+ 

  def _generic_modify(inst, basedn, log, manager_class, selector, args=None):

      # Here, we should have already selected the type etc. mc should be a

      # type of DSLdapObject (singular)

@@ -12,50 +12,8 @@ 

  from lib389.pwpolicy import PwPolicyEntries, PwPolicyManager

  from lib389.idm.account import Account

  

- arg_to_attr = {

-         'pwdlocal': 'nsslapd-pwpolicy-local',

-         'pwdscheme': 'passwordstoragescheme',

-         'pwdchange': 'passwordChange',

-         'pwdmustchange': 'passwordMustChange',

-         'pwdhistory': 'passwordHistory',

-         'pwdhistorycount': 'passwordInHistory',

-         'pwdadmin': 'passwordAdminDN',

-         'pwdtrack': 'passwordTrackUpdateTime',

-         'pwdwarning': 'passwordWarning',

-         'pwdisglobal': 'passwordIsGlobalPolicy',

-         'pwdexpire': 'passwordExp',

-         'pwdmaxage': 'passwordMaxAge',

-         'pwdminage': 'passwordMinAge',

-         'pwdgracelimit': 'passwordGraceLimit',

-         'pwdsendexpiring': 'passwordSendExpiringTime',

-         'pwdlockout': 'passwordLockout',

-         'pwdunlock': 'passwordUnlock',

-         'pwdlockoutduration': 'passwordLockoutDuration',

-         'pwdmaxfailures': 'passwordMaxFailure',

-         'pwdresetfailcount': 'passwordResetFailureCount',

-         'pwdchecksyntax': 'passwordCheckSyntax',

-         'pwdminlen': 'passwordMinLength',

-         'pwdmindigits': 'passwordMinDigits',

-         'pwdminalphas': 'passwordMinAlphas',

-         'pwdminuppers': 'passwordMinUppers',

-         'pwdminlowers': 'passwordMinLowers',

-         'pwdminspecials': 'passwordMinSpecials',

-         'pwdmin8bits': 'passwordMin8bit',

-         'pwdmaxrepeats': 'passwordMaxRepeats',

-         'pwdpalindrome': 'passwordPalindrome',

-         'pwdmaxseq': 'passwordMaxSequence',

-         'pwdmaxseqsets': 'passwordMaxSeqSets',

-         'pwdmaxclasschars': 'passwordMaxClassChars',

-         'pwdmincatagories': 'passwordMinCategories',

-         'pwdmintokenlen': 'passwordMinTokenLength',

-         'pwdbadwords': 'passwordBadWords',

-         'pwduserattrs': 'passwordUserAttributes',

-         'pwddictcheck': 'passwordDictCheck',

-         'pwddictpath': 'passwordDictPath',

-         'pwdallowhash': 'nsslapd-allow-hashed-passwords'

-     }

- 

- def _args_to_attrs(args):

+ 

+ def _args_to_attrs(args, arg_to_attr):

      attrs = {}

      for arg in vars(args):

          val = getattr(args, arg)
@@ -68,50 +26,44 @@ 

      pwp_manager = PwPolicyManager(inst)

      if dn is None:

          return "Global Password Policy"

-     elif pwp_manager.is_user_policy(dn):

-         return "User Policy"

      elif pwp_manager.is_subtree_policy(dn):

          return "Subtree Policy"

      else:

-         raise ValueError("The policy wasn't set up for the target dn entry or it is invalid")

+         return "User Policy"

  

  

  def _get_pw_policy(inst, targetdn, log, use_json=None):

      pwp_manager = PwPolicyManager(inst)

      policy_type = _get_policy_type(inst, targetdn)

- 

+     attr_list = pwp_manager.get_attr_list()

      if "global" in policy_type.lower():

          targetdn = 'cn=config'

-         pwp_manager.pwp_attributes.extend(['passwordIsGlobalPolicy', 'nsslapd-pwpolicy_local'])

+         attr_list.extend(['passwordIsGlobalPolicy', 'nsslapd-pwpolicy_local'])

+         attrs = inst.config.get_attrs_vals_utf8(attr_list)

      else:

-         targetdn = pwp_manager.get_pwpolicy_entry(targetdn).dn

- 

-     entries = inst.search_s(targetdn, ldap.SCOPE_BASE, 'objectclass=*', pwp_manager.pwp_attributes)

-     entry = entries[0]

+         policy = pwp_manager.get_pwpolicy_entry(targetdn)

+         targetdn = policy.dn

+         attrs = policy.get_attrs_vals_utf8(attr_list)

  

      if use_json:

-         str_attrs = {}

-         for k in entry.data:

-             str_attrs[ensure_str(k)] = ensure_list_str(entry.data[k])

- 

-         # ensure all the keys are lowercase

-         str_attrs = dict((k.lower(), v) for k, v in str_attrs.items())

- 

-         print(json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": str_attrs}))

+         print(json.dumps({"type": "entry", "pwp_type": policy_type, "dn": ensure_str(targetdn), "attrs": attrs}))

      else:

          if "global" in policy_type.lower():

              response = "Global Password Policy: cn=config\n------------------------------------\n"

          else:

              response = "Local {} Policy: {}\n------------------------------------\n".format(policy_type, targetdn)

-         for k in entry.data:

-             response += "{}: {}\n".format(k, ensure_str(entry.data[k][0]))

+         for key, value in list(attrs.items()):

+             if len(value) == 0:

+                 value = ""

+             else:

+                 value = value[0]

+             response += "{}: {}\n".format(key, value)

          print(response)

  

  

  def list_policies(inst, basedn, log, args):

      log = log.getChild('list_policies')

      targetdn = args.DN[0]

-     pwp_manager = PwPolicyManager(inst)

  

      if args.json:

          result = {'type': 'list', 'items': []}
@@ -125,26 +77,16 @@ 

  

      # User pwpolicy entry is under the container that is under the parent,

      # so we need to go one level up

-     if pwp_manager.is_user_policy(targetdn):

-         policy_type = _get_policy_type(inst, user_entry.dn)

+     pwp_entries = PwPolicyEntries(inst, targetdn)

+     for pwp_entry in pwp_entries.list():

+         dn_comps = ldap.dn.explode_dn(pwp_entry.get_attr_val_utf8_l('cn'))

+         dn_comps.pop(0)

+         entrydn = ",".join(dn_comps)

+         policy_type = _get_policy_type(inst, entrydn)

          if args.json:

-             result['items'].append([user_entry.dn, policy_type])

+             result['items'].append([entrydn, policy_type])

          else:

-             result += "%s (%s)\n" % (user_entry.dn, policy_type.lower())

-     else:

-         pwp_entries = PwPolicyEntries(inst, targetdn)

-         for pwp_entry in pwp_entries.list():

-             cn = pwp_entry.get_attr_val_utf8_l('cn')

-             if pwp_entry.is_subtree_policy():

-                 entrydn = cn.replace('cn=nspwpolicyentry_subtree,', '')

-             else:

-                 entrydn = cn.replace('cn=nspwpolicyentry_user,', '')

-             policy_type = _get_policy_type(inst, entrydn)

- 

-             if args.json:

-                 result['items'].append([entrydn, policy_type])

-             else:

-                 result += "%s (%s)\n" % (entrydn, policy_type.lower())

+             result += "%s (%s)\n" % (entrydn, policy_type.lower())

  

      if args.json:

          print(json.dumps(result))
@@ -165,8 +107,8 @@ 

  def create_subtree_policy(inst, basedn, log, args):

      log = log.getChild('create_subtree_policy')

      # Gather the attributes

-     attrs = _args_to_attrs(args)

      pwp_manager = PwPolicyManager(inst)

+     attrs = _args_to_attrs(args, pwp_manager.arg_to_attr)

      pwp_manager.create_subtree_policy(args.DN[0], attrs)

  

      print('Successfully created subtree password policy')
@@ -174,9 +116,8 @@ 

  

  def create_user_policy(inst, basedn, log, args):

      log = log.getChild('create_user_policy')

-     # Gather the attributes

-     attrs = _args_to_attrs(args)

      pwp_manager = PwPolicyManager(inst)

+     attrs = _args_to_attrs(args, pwp_manager.arg_to_attr)

      pwp_manager.create_user_policy(args.DN[0], attrs)

  

      print('Successfully created user password policy')
@@ -184,9 +125,8 @@ 

  

  def set_global_policy(inst, basedn, log, args):

      log = log.getChild('set_global_policy')

-     # Gather the attributes

-     attrs = _args_to_attrs(args)

      pwp_manager = PwPolicyManager(inst)

+     attrs = _args_to_attrs(args, pwp_manager.arg_to_attr)

      pwp_manager.set_global_policy(attrs)

  

      print('Successfully updated global password policy')
@@ -195,9 +135,8 @@ 

  def set_local_policy(inst, basedn, log, args):

      log = log.getChild('set_local_policy')

      targetdn = args.DN[0]

-     # Gather the attributes

-     attrs = _args_to_attrs(args)

      pwp_manager = PwPolicyManager(inst)

+     attrs = _args_to_attrs(args, pwp_manager.arg_to_attr)

      pwp_entry = pwp_manager.get_pwpolicy_entry(args.DN[0])

      policy_type = _get_policy_type(inst, targetdn)

  

file modified
+140 -131
@@ -13,9 +13,6 @@ 

  from lib389.idm.nscontainer import nsContainers, nsContainer

  from lib389.cos import CosPointerDefinitions, CosPointerDefinition, CosTemplates

  

- USER_POLICY = 1

- SUBTREE_POLICY = 2

- 

  

  class PwPolicyManager(object):

      """Manages user, subtree and global password policies
@@ -27,88 +24,66 @@ 

      def __init__(self, instance):

          self._instance = instance

          self.log = instance.log

-         self.pwp_attributes = [

-             'passwordstoragescheme',

-             'passwordChange',

-             'passwordMustChange',

-             'passwordHistory',

-             'passwordInHistory',

-             'passwordAdminDN',

-             'passwordTrackUpdateTime',

-             'passwordWarning',

-             'passwordMaxAge',

-             'passwordMinAge',

-             'passwordExp',

-             'passwordGraceLimit',

-             'passwordSendExpiringTime',

-             'passwordLockout',

-             'passwordUnlock',

-             'passwordMaxFailure',

-             'passwordLockoutDuration',

-             'passwordResetFailureCount',

-             'passwordCheckSyntax',

-             'passwordMinLength',

-             'passwordMinDigits',

-             'passwordMinAlphas',

-             'passwordMinUppers',

-             'passwordMinLowers',

-             'passwordMinSpecials',

-             'passwordMaxRepeats',

-             'passwordMin8bit',

-             'passwordMinCategories',

-             'passwordMinTokenLength',

-             'passwordDictPath',

-             'passwordDictCheck',

-             'passwordPalindrome',

-             'passwordMaxSequence',

-             'passwordMaxClassChars',

-             'passwordMaxSeqSets',

-             'passwordBadWords',

-             'passwordUserAttributes',

-             'passwordIsGlobalPolicy',

-             'nsslapd-pwpolicy-local',

-             'nsslapd-allow-hashed-passwords'

-         ]

- 

-     def is_user_policy(self, dn):

-         """Check if the entry has a user password policy

- 

-         :param dn: Entry DN with PwPolicy set up

-         :type dn: str

- 

-         :returns: True if the entry has a user policy, False otherwise

-         """

- 

-         # CoSTemplate entry also can have 'pwdpolicysubentry', so we better validate this part too

-         entry = Account(self._instance, dn)

-         try:

-             if entry.present("objectclass", "costemplate"):

-                 # It is a CoSTemplate entry, not user policy

-                 return False

- 

-             # Check if it's a subtree or a user policy

-             if entry.present("pwdpolicysubentry"):

-                 return True

-             else:

-                 return False

-         except ldap.NO_SUCH_OBJECT:

-             return False

+         self.arg_to_attr = {

+             'pwdlocal': 'nsslapd-pwpolicy-local',

+             'pwdscheme': 'passwordstoragescheme',

+             'pwdchange': 'passwordChange',

+             'pwdmustchange': 'passwordMustChange',

+             'pwdhistory': 'passwordHistory',

+             'pwdhistorycount': 'passwordInHistory',

+             'pwdadmin': 'passwordAdminDN',

+             'pwdtrack': 'passwordTrackUpdateTime',

+             'pwdwarning': 'passwordWarning',

+             'pwdisglobal': 'passwordIsGlobalPolicy',

+             'pwdexpire': 'passwordExp',

+             'pwdmaxage': 'passwordMaxAge',

+             'pwdminage': 'passwordMinAge',

+             'pwdgracelimit': 'passwordGraceLimit',

+             'pwdsendexpiring': 'passwordSendExpiringTime',

+             'pwdlockout': 'passwordLockout',

+             'pwdunlock': 'passwordUnlock',

+             'pwdlockoutduration': 'passwordLockoutDuration',

+             'pwdmaxfailures': 'passwordMaxFailure',

+             'pwdresetfailcount': 'passwordResetFailureCount',

+             'pwdchecksyntax': 'passwordCheckSyntax',

+             'pwdminlen': 'passwordMinLength',

+             'pwdmindigits': 'passwordMinDigits',

+             'pwdminalphas': 'passwordMinAlphas',

+             'pwdminuppers': 'passwordMinUppers',

+             'pwdminlowers': 'passwordMinLowers',

+             'pwdminspecials': 'passwordMinSpecials',

+             'pwdmin8bits': 'passwordMin8bit',

+             'pwdmaxrepeats': 'passwordMaxRepeats',

+             'pwdpalindrome': 'passwordPalindrome',

+             'pwdmaxseq': 'passwordMaxSequence',

+             'pwdmaxseqsets': 'passwordMaxSeqSets',

+             'pwdmaxclasschars': 'passwordMaxClassChars',

+             'pwdmincatagories': 'passwordMinCategories',

+             'pwdmintokenlen': 'passwordMinTokenLength',

+             'pwdbadwords': 'passwordBadWords',

+             'pwduserattrs': 'passwordUserAttributes',

+             'pwddictcheck': 'passwordDictCheck',

+             'pwddictpath': 'passwordDictPath',

+             'pwdallowhash': 'nsslapd-allow-hashed-passwords'

+         }

+ 

+     def get_attr_list(self):

+         attr_list = []

+         for arg, attr in list(self.arg_to_attr.items()):

+             attr_list.append(attr)

+         return attr_list

  

      def is_subtree_policy(self, dn):

-         """Check if the entry has a subtree password policy

+         """Check if the entry has a subtree password policy.  If we can find a

+         template entry it is subtree policy

  

          :param dn: Entry DN with PwPolicy set up

          :type dn: str

  

          :returns: True if the entry has a subtree policy, False otherwise

          """

- 

-         # CoSTemplate entry also can have 'pwdpolicysubentry', so we better validate this part too

-         cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn)

-         if cos_pointer_def.exists():

-             return True

-         else:

-             return False

+         cos_templates = CosTemplates(self._instance, 'cn=nsPwPolicyContainer,{}'.format(dn))

+         return cos_templates.exists('cn=nsPwTemplateEntry,%s' % dn)

  

      def create_user_policy(self, dn, properties):

          """Creates all entries which are needed for the user
@@ -127,9 +102,9 @@ 

          if not user_entry.exists():

              raise ValueError('Can not create user password policy because the target dn does not exist')

  

-         rdns = ldap.dn.explode_dn(user_entry.dn)

-         rdns.pop(0)

-         parentdn = ",".join(rdns)

+         dn_comps = ldap.dn.explode_dn(user_entry.dn)

+         dn_comps.pop(0)

+         parentdn = ",".join(dn_comps)

  

          # Create the pwp container if needed

          pwp_containers = nsContainers(self._instance, basedn=parentdn)
@@ -139,9 +114,13 @@ 

          properties['cn'] = 'cn=nsPwPolicyEntry_user,%s' % dn

          pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

          pwp_entry = pwp_entries.create(properties=properties)

- 

-         # Add policy to the entry

-         user_entry.replace('pwdpolicysubentry', pwp_entry.dn)

+         try:

+             # Add policy to the entry

+             user_entry.replace('pwdpolicysubentry', pwp_entry.dn)

+         except ldap.LDAPError as e:

+             # failure, undo what we have done

+             pwp_entry.delete()

+             raise e

  

          # make sure that local policies are enabled

          self.set_global_policy({'nsslapd-pwpolicy-local': 'on'})
@@ -170,22 +149,31 @@ 

          pwp_container = pwp_containers.ensure_state(properties={'cn': 'nsPwPolicyContainer'})

  

          # Create policy entry

+         pwp_entry = None

          properties['cn'] = 'cn=nsPwPolicyEntry_subtree,%s' % dn

          pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

          pwp_entry = pwp_entries.create(properties=properties)

- 

-         # The CoS template entry (nsPwTemplateEntry)

-         # that has the pwdpolicysubentry value pointing to the above (nsPwPolicyEntry) entry

-         cos_templates = CosTemplates(self._instance, pwp_container.dn)

-         cos_template = cos_templates.create(properties={'cosPriority': '1',

-                                                         'pwdpolicysubentry': pwp_entry.dn,

-                                                         'cn': 'cn=nsPwTemplateEntry,%s' % dn})

- 

-         # The CoS specification entry at the subtree level

-         cos_pointer_defs = CosPointerDefinitions(self._instance, dn)

-         cos_pointer_defs.create(properties={'cosAttribute': 'pwdpolicysubentry default operational',

-                                             'cosTemplateDn': cos_template.dn,

-                                             'cn': 'nsPwPolicy_CoS'})

+         try:

+             # The CoS template entry (nsPwTemplateEntry) that has the pwdpolicysubentry

+             # value pointing to the above (nsPwPolicyEntry) entry

+             cos_template = None

+             cos_templates = CosTemplates(self._instance, pwp_container.dn)

+             cos_template = cos_templates.create(properties={'cosPriority': '1',

+                                                             'pwdpolicysubentry': pwp_entry.dn,

+                                                             'cn': 'cn=nsPwTemplateEntry,%s' % dn})

+ 

+             # The CoS specification entry at the subtree level

+             cos_pointer_defs = CosPointerDefinitions(self._instance, dn)

+             cos_pointer_defs.create(properties={'cosAttribute': 'pwdpolicysubentry default operational',

+                                                 'cosTemplateDn': cos_template.dn,

+                                                 'cn': 'nsPwPolicy_CoS'})

+         except ldap.LDAPError as e:

+             # Something went wrong, remove what we have done

+             if pwp_entry is not None:

+                 pwp_entry.delete()

+             if cos_template is not None:

+                 cos_template.delete()

+             raise e

  

          # make sure that local policies are enabled

          self.set_global_policy({'nsslapd-pwpolicy-local': 'on'})
@@ -201,23 +189,28 @@ 

          :returns: PwPolicyEntry instance

          """

  

-         # Verify target dn exists before getting started

          entry = Account(self._instance, dn)

          if not entry.exists():

              raise ValueError('Can not get the password policy entry because the target dn does not exist')

  

-         # Check if it's a subtree or a user policy

-         if self.is_user_policy(entry.dn):

-             pwp_entry_dn = entry.get_attr_val_utf8("pwdpolicysubentry")

-         elif self.is_subtree_policy(entry.dn):

-             pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % dn)

- 

-             pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

-             pwp_entry_dn = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn).dn

-         else:

-             raise ValueError("The policy wasn't set up for the target dn entry or it is invalid")

- 

-         return PwPolicyEntry(self._instance, pwp_entry_dn)

+         # Get the parent DN

+         dn_comps = ldap.dn.explode_dn(entry.dn)

+         dn_comps.pop(0)

+         parentdn = ",".join(dn_comps)

+ 

+         # Get the parent's policies

+         pwp_entries = PwPolicyEntries(self._instance, parentdn)

+         policies = pwp_entries.list()

+         for policy in policies:

+             dn_comps = ldap.dn.explode_dn(policy.get_attr_val_utf8_l('cn'))

+             dn_comps.pop(0)

+             pwp_dn = ",".join(dn_comps)

+             if pwp_dn == dn.lower():

+                 # This DN does have a policy associated with it

+                 return policy

+ 

+         # Did not find a policy for this entry

+         raise ValueError("No password policy was found for this entry")

  

      def delete_local_policy(self, dn):

          """Delete a local password policy entry
@@ -227,43 +220,59 @@ 

          """

  

          subtree = False

-         # Verify target dn exists before getting started

+ 

+         # Verify target dn exists, and has a policy

          entry = Account(self._instance, dn)

          if not entry.exists():

              raise ValueError('The target entry dn does not exist')

+         pwp_entry = self.get_pwpolicy_entry(entry.dn)

  

+         # Subtree or user policy?

          if self.is_subtree_policy(entry.dn):

              parentdn = dn

              subtree = True

-         elif self.is_user_policy(entry.dn):

-             rdns = ldap.dn.explode_dn(entry.dn)

-             rdns.pop(0)

-             parentdn = ",".join(rdns)

          else:

-             raise ValueError("The policy wasn't set up for the target dn entry or the policy is invalid")

+             dn_comps = ldap.dn.explode_dn(dn)

+             dn_comps.pop(0)

+             parentdn = ",".join(dn_comps)

  

+         # Starting deleting the policy, ignore the parts that might already have been removed

          pwp_container = nsContainer(self._instance, 'cn=nsPwPolicyContainer,%s' % parentdn)

- 

-         pwp_entries = PwPolicyEntries(self._instance, pwp_container.dn)

          if subtree:

-             pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_subtree,%s' % dn)

+             try:

+                 # Delete template

+                 cos_templates = CosTemplates(self._instance, pwp_container.dn)

+                 cos_template = cos_templates.get('cn=nsPwTemplateEntry,%s' % dn)

+                 cos_template.delete()

+             except ldap.NO_SUCH_OBJECT:

+                 # Already deleted

+                 pass

+             try:

+                 # Delete definition

+                 cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn)

+                 cos_pointer_def.delete()

+             except ldap.NO_SUCH_OBJECT:

+                 # Already deleted

+                 pass

          else:

-             pwp_entry = pwp_entries.get('cn=nsPwPolicyEntry_user,%s' % dn)

- 

-         if self.is_subtree_policy(entry.dn):

-             cos_templates = CosTemplates(self._instance, pwp_container.dn)

-             cos_template = cos_templates.get('cn=nsPwTemplateEntry,%s' % dn)

-             cos_template.delete()

- 

-             cos_pointer_def = CosPointerDefinition(self._instance, 'cn=nsPwPolicy_CoS,%s' % dn)

-             cos_pointer_def.delete()

-         else:

-             entry.remove("pwdpolicysubentry", pwp_entry.dn)

+             try:

+                 # Cleanup user entry

+                 entry.remove("pwdpolicysubentry", pwp_entry.dn)

+             except ldap.NO_SUCH_ATTRIBUTE:

+                 # Policy already removed from user

+                 pass

+ 

+         # Remove the local policy

+         try:

+             pwp_entry.delete()

+         except ldap.NO_SUCH_OBJECT:

+             # Already deleted

+             pass

  

-         pwp_entry.delete()

          try:

              pwp_container.delete()

-         except ldap.NOT_ALLOWED_ON_NONLEAF:

+         except (ldap.NOT_ALLOWED_ON_NONLEAF, ldap.NO_SUCH_OBJECT):

+             # There are other policies still using this container, no problem

              pass

  

      def set_global_policy(self, properties):

@@ -78,19 +78,34 @@ 

  

  def check_output(some_string, missing=False, ignorecase=True):

      """Check the output of captured STDOUT.  This assumes "sys.stdout = io.StringIO()"

-     otherwise there would be nothing to read

-     :param some_string - text to search for in output

+     otherwise there would be nothing to read.  Flush IO after performing check.

+     :param some_string - text, or list of strings, to search for in output

      :param missing - test if some_string is NOT present in output

      :param ignorecase - Set whether to ignore the character case in both the output

                          and some_string

      """

      output = sys.stdout.getvalue()

+     is_list = isinstance(some_string, list)

+ 

      if ignorecase:

          output = output.lower()

-         some_string = some_string.lower()

+         if is_list:

+             some_string = [text.lower() for text in some_string]

+         else:

+             some_string = some_string.lower()

+ 

      if missing:

-         assert(some_string not in output)

+         if is_list:

+             for text in some_string:

+                 assert(text not in output)

+         else:

+             assert(some_string not in output)

      else:

-         assert(some_string in output)

-     # clear buffer

+         if is_list:

+             for text in some_string:

+                 assert(text in output)

+         else:

+             assert(some_string in output)

+ 

+     # Clear the buffer

      sys.stdout = io.StringIO()

@@ -0,0 +1,151 @@ 

+ import io

+ import sys

+ import pytest

+ 

+ from lib389.cli_conf.pwpolicy import (create_user_policy, create_subtree_policy,

+                                       list_policies, set_local_policy,

+                                       get_local_policy, del_local_policy,

+                                       get_global_policy, set_global_policy)

+ 

+ from lib389.cli_base import LogCapture, FakeArgs

+ from lib389.tests.cli import check_output

+ from lib389.topologies import topology_st

+ from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES

+ from lib389.idm.organizationalunit import OrganizationalUnits

+ from lib389._constants import (DEFAULT_SUFFIX)

+ from lib389.utils import ds_is_older

+ 

+ pytestmark = pytest.mark.skipif(ds_is_older('1.4.0'), reason="Not implemented")

+ 

+ USER_DN = "uid=testuser,ou=people,{}".format(DEFAULT_SUFFIX)

+ USER_OUTPUT = "{} (user policy)".format(USER_DN)

+ OU_DN = "ou=people,{}".format(DEFAULT_SUFFIX)

+ OU_OUTPUT = "{} (subtree policy)".format(OU_DN)

+ 

+ 

+ @pytest.fixture(scope="function")

+ def test_args(dn):

+     args = FakeArgs()

+     args.suffix = False

+     args.json = False

+     args.verbose = False

+     args.DN = [dn]

+     return args

+ 

+ 

+ @pytest.fixture(scope="function")

+ def do_setup(topology_st, request):

+     """Create a user and make sure ou=pople exists

+     """

+     sys.stdout = io.StringIO()

+ 

+     users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)

+     users.ensure_state(properties=TEST_USER_PROPERTIES)

+ 

+     ou = OrganizationalUnits(topology_st.standalone, DEFAULT_SUFFIX)

+     ou.ensure_state(properties={'ou': 'people'})

+ 

+ 

+ def test_pwp_cli(topology_st, do_setup):

+     """Test creating, listing, getting, and deleting a backend (and subsuffix)

+     :id: 800f432a-52ab-4661-ac66-a2bdd9b984da

+     :setup: Standalone instance

+     :steps:

+         1. Create User policy

+         2. Create Subtree policy

+         3. List policies

+         4. Set user policy

+         5. Get user policy

+         6. Set subtree policy

+         7. Get subtree policy

+         8. Delete user policy

+         9. Delete subtree policy

+         10. List local policies - make sure none are returned

+         11. Get global policy

+         12. Set global policy

+         13. Verify global policy update

+     :expectedresults:

+         1. Success

+         2. Success

+         3. Success

+         4. Success

+         5. Success

+         6. Success

+         7. Success

+         8. Success

+         9. Success

+         10. Success

+         11. Success

+         12. Success

+         13. Success

+     """

+     topology_st.logcap = LogCapture()

+     sys.stdout = io.StringIO()

+ 

+     # Create User Policy

+     args = test_args(USER_DN)

+     args.pwdchange = 'on'

+     create_user_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # Create Subtree Policy

+     args = test_args(OU_DN)

+     args.pwdchange = 'off'

+     create_subtree_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # List policies

+     args = test_args(DEFAULT_SUFFIX)

+     list_policies(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output([USER_OUTPUT, OU_OUTPUT])

+ 

+     # Set User Policy

+     args = test_args(USER_DN)

+     args.pwdhistory = 'on'

+     set_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # Get User Policy

+     args = test_args(USER_DN)

+     get_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output("passwordHistory: on")

+ 

+     # Set Subtree Policy

+     args = test_args(OU_DN)

+     args.pwdexpire = 'on'

+     set_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # Get Subtree Policy

+     args = test_args(OU_DN)

+     get_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output("passwordExp: on")

+ 

+     # Delete User Policy (and verify)

+     args = test_args(USER_DN)

+     del_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     with pytest.raises(ValueError):

+         get_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # Delete Subtree Policy (and verify)

+     args = test_args(OU_DN)

+     del_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     with pytest.raises(ValueError):

+         get_local_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # List policies (or lack there of)

+     args = test_args(DEFAULT_SUFFIX)

+     list_policies(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output([USER_OUTPUT, OU_OUTPUT], missing=True)

+ 

+     # Get global policy

+     args = test_args(DEFAULT_SUFFIX)

+     get_global_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output('passwordLockout: off')

+ 

+     # Set global policy

+     args = test_args(DEFAULT_SUFFIX)

+     args.pwdlockout = "on"

+     set_global_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+ 

+     # Check update was applied

+     get_global_policy(topology_st.standalone, None, topology_st.logcap.log, args)

+     check_output('passwordLockout: on')

Description:

We were not properly checking for user versus subtree policies. This patch cleans up a lot of flawed code, and properly uses DSLdapObjects to find policies and process them.

https://pagure.io/389-ds-base/issue/50257

rebased onto 0f6e4277e9f50b5af61df6180a74d74c55bbe634

5 years ago

rebased onto 14b7721e2ce86e13e852f5cafc6a2c5b18875075

5 years ago

rebased onto a6b3bd87db4605bf7c461bbe1643ac0e9eb90aca

5 years ago

Added CLI test for local and global password policy CLI commands...

rebased onto 45df44e34bb1ea45303c5eb10fea2fef18e02a46

5 years ago

rebased onto fc1c1c15502f4cc48932cd759cce7423fa67a54a

5 years ago

With this search, is there a way you could use the DSLdapObjects to achieve this? It'll trigger the _s usage warning.

If this is absolutely required, is it possible to have it use the server_controls and the escapehatch flags?

For my own curiosity, why do we remap these attribute names to different values?

For my own curiosity, why do we remap these attribute names to different values?

They are actually the alias names of the attributes in the schema, but the idea was: less to type, slightly easier to read.

Isn't there a .exists you could use instead?

Could reset be a separate function instead of an argument here? I think that's better design and clearer about when and how reset is applied relative to the check.

What's this for?

This is required for LogCapture, and things like check_output()

Isn't there a .exists you could use instead?

There is not, not for DSLdapObjects, so I added one...

rebased onto 3f42ff7655024fbc5e563ba8067b3d5cb2af053a

5 years ago

All changes applied, please review...

Weird question, but is this function idempotent? I.e., given a "full policy state", "partial state", or "removed", does it "always succeed"? I think that would be good in the case of a partial error: we can re-run and have it still succeed. This will be important for the command line later ....

I think this isn't quite what I had in mind. I was thinking more:

blah = logs.check_output()
logs.reset_output()

I think this isn't quite what I had in mind. I was thinking more:

Well, you clearly stated to create a new function that does not have that option :-p

blah = logs.check_output()
logs.reset_output()

So "check_output" actually has nothing to do with LogCapture. I thought they were related with regard to the io.StringIO stuff, but they are not (my bad). So it doesn't make sense to move it to LogCapture.

The most common use case is a one time read of the output, only in special situations do you need to check for multiple values. So I think I will just change the function to accept a list of values to check (and then always clear the buffer). So no new args, and no new functions.

Weird question, but is this function idempotent? I.e., given a "full policy state", "partial state", or "removed", does it "always succeed"? I think that would be good in the case of a partial error: we can re-run and have it still succeed. This will be important for the command line later ....

Well, it always succeeds assuming everything is already correctly set up. For existing deployments this is not a guarantee. So I will make sure it will work on partial setups...

rebased onto 4613ab5be2684372b2b19ebfe9d175599ff1f91c

5 years ago

rebased onto 36989662bca6ec8bafa590727bc4804ccbdce1b3

5 years ago

@firstyear, okay, I rewrote check_output to optionally accept a list or string, and I made the create user/subtree policy & delete local policy functions idempotent.

Niiccee! Looks great, ack from me :)

(I will leave you the happy satisfaction of clicking merge though)

rebased onto 0ad1dd2

5 years ago

Pull-Request has been merged by mreynolds

5 years ago

389-ds-base is moving from Pagure to GitHub. This means that new issues and pull requests
will be accepted only in 389-ds-base's GitHub repository.

This pull request has been cloned to GitHub as an issue and is available here:
- https://github.com/389ds/389-ds-base/issues/3317

If you want to continue to work on the PR, please navigate to the GitHub issue,
download the patch from the attachments and file a new pull request.

Thank you for understanding. We apologize for any inconvenience.

Pull-Request has been closed by spichugi

3 years ago