From ad3e726a0361e1745dc2fc9c9d1fd542211e0c69 Mon Sep 17 00:00:00 2001 From: Simon Pichugin Date: Oct 11 2017 15:24:23 +0000 Subject: Issue 27 - Add a module for working with dse.ldif file Description: For some tests we need a way to parse and edit dse.ldif file. For starters, it will be nice to have next operations support: - get - Return attribute values under a given entry; - add - Add an attribute under a given entry; - delete - Delete singlevalued or multivalued attributes under a given entry; - replace - Replace attribute values with a new one under a given entry. Add tests to lib389/tests/dseldif_test.py https://pagure.io/lib389/issue/27 Reviewed by: wibrown (Thanks!) --- diff --git a/src/lib389/lib389/_constants.py b/src/lib389/lib389/_constants.py index 96e5517..1aff17d 100644 --- a/src/lib389/lib389/_constants.py +++ b/src/lib389/lib389/_constants.py @@ -69,6 +69,8 @@ DN_DM = "cn=Directory Manager" PW_DM = "password" DN_CONFIG = "cn=config" DN_LDBM = "cn=ldbm database,cn=plugins,cn=config" +DN_CONFIG_LDBM = "cn=config,cn=ldbm database,cn=plugins,cn=config" +DN_USERROOT_LDBM = "cn=userRoot,cn=ldbm database,cn=plugins,cn=config" DN_SCHEMA = "cn=schema" DN_MONITOR = "cn=monitor" DN_MONITOR_SNMP = "cn=snmp,cn=monitor" diff --git a/src/lib389/lib389/dseldif.py b/src/lib389/lib389/dseldif.py new file mode 100644 index 0000000..39e35b2 --- /dev/null +++ b/src/lib389/lib389/dseldif.py @@ -0,0 +1,95 @@ +# --- BEGIN COPYRIGHT BLOCK --- +# Copyright (C) 2017 Red Hat, Inc. +# All rights reserved. +# +# License: GPL (version 3 or any later version). +# See LICENSE for details. +# --- END COPYRIGHT BLOCK --- +# +import os +from lib389.paths import Paths + + +class DSEldif(object): + """A class for working with dse.ldif file""" + + def __init__(self, instance): + self._instance = instance + + ds_paths = Paths(self._instance.serverid, self._instance) + self.path = os.path.join(ds_paths.config_dir, 'dse.ldif') + + with open(self.path, 'r') as file_dse: + self._contents = file_dse.readlines() + + def _update(self): + """Update the dse.ldif with a new contents""" + + with open(self.path, "w") as file_dse: + file_dse.write("".join(self._contents)) + + def _find_attr(self, entry_dn, attr): + """Find all attribute values and indexes under a given entry + + Returns entry dn index and attribute data dict: + relative attribute indexes and the attribute value + """ + + entry_dn_i = self._contents.index("dn: {}\n".format(entry_dn)) + attr_data = {} + + # Find where the entry ends + try: + dn_end_i = self._contents[entry_dn_i:].index("\n") + except ValueError: + # We are in the end of the list + dn_end_i = len(self._contents) + + entry_slice = self._contents[entry_dn_i:entry_dn_i + dn_end_i] + + # Find the attribute + for line in entry_slice: + if line.startswith(attr): + attr_value = line.split(" ", 1)[1][:-1] + attr_data.update({entry_slice.index(line): attr_value}) + + if not attr_data: + raise ValueError("Attribute {} wasn't found under dn: {}".format(attr, entry_dn)) + + return entry_dn_i, attr_data + + def get(self, entry_dn, attr): + """Return attribute values under a given entry""" + + _, attr_data = self._find_attr(entry_dn, attr) + + return attr_data.values() + + def add(self, entry_dn, attr, value): + """Add an attribute under a given entry""" + + entry_dn_i = self._contents.index("dn: {}\n".format(entry_dn)) + self._contents.insert(entry_dn_i+1, "{}: {}\n".format(attr, value)) + self._update() + + def delete(self, entry_dn, attr, value=None): + """Delete attributes under a given entry""" + + entry_dn_i, attr_data = self._find_attr(entry_dn, attr) + + if value is not None: + for attr_i, attr_value in attr_data.items(): + if attr_value == value: + del self._contents[entry_dn_i + attr_i] + else: + for attr_i in sorted(attr_data.keys(), reverse=True): + del self._contents[entry_dn_i + attr_i] + self._update() + + def replace(self, entry_dn, attr, value): + """Replace attribute values with a new one under a given entry""" + + self.delete(entry_dn, attr) + self.add(entry_dn, attr, value) + self._update() + diff --git a/src/lib389/lib389/tests/dseldif_test.py b/src/lib389/lib389/tests/dseldif_test.py new file mode 100644 index 0000000..25eef77 --- /dev/null +++ b/src/lib389/lib389/tests/dseldif_test.py @@ -0,0 +1,129 @@ +# --- BEGIN COPYRIGHT BLOCK --- +# Copyright (C) 2017 Red Hat, Inc. +# All rights reserved. +# +# License: GPL (version 3 or any later version). +# See LICENSE for details. +# --- END COPYRIGHT BLOCK --- +# +import logging +import pytest + +from lib389._constants import * +from lib389.dseldif import DSEldif +from lib389.topologies import topology_st as topo + +DEBUGGING = os.getenv('DEBUGGING', False) + +if DEBUGGING: + logging.getLogger(__name__).setLevel(logging.DEBUG) +else: + logging.getLogger(__name__).setLevel(logging.INFO) + +log = logging.getLogger(__name__) + + +@pytest.mark.parametrize("entry_dn", (DN_CONFIG, + DN_CONFIG_LDBM)) +def test_get_singlevalue(topo, entry_dn): + """Check that we can get an attribute value under different suffixes""" + + dse_ldif = DSEldif(topo.standalone) + + log.info("Get 'cn' attr from {}".format(entry_dn)) + attr_values = dse_ldif.get(entry_dn, "cn") + assert attr_values == ["config"] + + +def test_get_multivalue(topo): + """Check that we can get attribute values""" + + dse_ldif = DSEldif(topo.standalone) + + log.info("Get objectClass from {}".format(DN_CONFIG)) + attr_values = dse_ldif.get(DN_CONFIG, "objectClass") + assert len(attr_values) == 3 + assert "top" in attr_values + assert "extensibleObject" in attr_values + assert "nsslapdConfig" in attr_values + + +@pytest.mark.parametrize("fake_attr_value", ("fake value", + "fakevalue")) +def test_add(topo, fake_attr_value): + """Check that we can add an attribute to a given suffix""" + + dse_ldif = DSEldif(topo.standalone) + fake_attr = "fakeAttr" + + log.info("Add {} to {}".format(fake_attr, DN_CONFIG)) + dse_ldif.add(DN_CONFIG, fake_attr, fake_attr_value) + attr_values = dse_ldif.get(DN_CONFIG, fake_attr) + assert attr_values == [fake_attr_value] + + log.info("Clean up") + dse_ldif.delete(DN_CONFIG, fake_attr) + with pytest.raises(ValueError): + dse_ldif.get(DN_CONFIG, fake_attr) + + +def test_replace(topo): + """Check that we can replace an attribute to a given suffix""" + + dse_ldif = DSEldif(topo.standalone) + port_attr = "nsslapd-port" + port_value = "390" + + log.info("Get default value of {}".format(port_attr)) + default_value = dse_ldif.get(DN_CONFIG, port_attr)[0] + + log.info("Replace {} with {}".format(port_attr, port_value)) + dse_ldif.replace(DN_CONFIG, port_attr, port_value) + attr_values = dse_ldif.get(DN_CONFIG, port_attr) + assert attr_values == [port_value] + + log.info("Restore default value") + dse_ldif.replace(DN_CONFIG, port_attr, default_value) + + +def test_delete_singlevalue(topo): + """Check that we can delete an attribute from a given suffix""" + + dse_ldif = DSEldif(topo.standalone) + fake_attr = "fakeAttr" + fake_attr_values = ["fake1", "fake2", "fake3"] + + log.info("Add multivalued {} to {}".format(fake_attr, DN_CONFIG)) + for value in fake_attr_values: + dse_ldif.add(DN_CONFIG, fake_attr, value) + + log.info("Delete {}".format(fake_attr_values[0])) + dse_ldif.delete(DN_CONFIG, fake_attr, fake_attr_values[0]) + attr_values = dse_ldif.get(DN_CONFIG, fake_attr) + assert len(attr_values) == 2 + assert fake_attr_values[0] not in attr_values + assert fake_attr_values[1] in attr_values + assert fake_attr_values[2] in attr_values + + log.info("Clean up") + dse_ldif.delete(DN_CONFIG, fake_attr) + with pytest.raises(ValueError): + dse_ldif.get(DN_CONFIG, fake_attr) + + +def test_delete_multivalue(topo): + """Check that we can delete attributes from a given suffix""" + + dse_ldif = DSEldif(topo.standalone) + fake_attr = "fakeAttr" + fake_attr_values = ["fake1", "fake2", "fake3"] + + log.info("Add multivalued {} to {}".format(fake_attr, DN_CONFIG)) + for value in fake_attr_values: + dse_ldif.add(DN_CONFIG, fake_attr, value) + + log.info("Delete all values of {}".format(fake_attr)) + dse_ldif.delete(DN_CONFIG, fake_attr) + with pytest.raises(ValueError): + dse_ldif.get(DN_CONFIG, fake_attr) +