#51181 Ticket - 49562 integrate changelog database to main database
Closed 2 years ago by spichugi. Opened 2 years ago by tbordaz.
tbordaz/389-ds-base ticket_49562  into  master

@@ -8,11 +8,11 @@ 

  import pytest

  import time

  from lib389._constants import PASSWORD, DN_DM, DEFAULT_SUFFIX

- from lib389._constants import SUFFIX, PASSWORD, DN_DM, DN_CONFIG, PLUGIN_RETRO_CHANGELOG, DEFAULT_SUFFIX, DEFAULT_CHANGELOG_DB

+ from lib389._constants import SUFFIX, PASSWORD, DN_DM, DN_CONFIG, PLUGIN_RETRO_CHANGELOG, DEFAULT_SUFFIX, DEFAULT_CHANGELOG_DB, DEFAULT_BENAME

  from lib389 import Entry

  from lib389.topologies import topology_m1 as topo_master

  from lib389.idm.user import UserAccounts

- from lib389.utils import ldap, os, logging, ensure_bytes, ds_is_newer

+ from lib389.utils import ldap, os, logging, ensure_bytes, ds_is_newer, ds_supports_new_changelog

  from lib389.topologies import topology_st as topo

  from lib389.idm.organizationalunit import OrganizationalUnits

  
@@ -43,13 +43,17 @@ 

      """Check if unhashed#user#password attribute is present or not in the changelog"""

      unhashed_pwd_attribute = 'unhashed#user#password'

  

-     changelog_dbdir = os.path.join(os.path.dirname(inst.dbdir), DEFAULT_CHANGELOG_DB)

-     for dbfile in os.listdir(changelog_dbdir):

-         if dbfile.endswith('.db'):

-             changelog_dbfile = os.path.join(changelog_dbdir, dbfile)

-             log.info('Changelog dbfile file exist: {}'.format(changelog_dbfile))

-     log.info('Running dbscan -f to check {} attr'.format(unhashed_pwd_attribute))

-     dbscanOut = inst.dbscan(DEFAULT_CHANGELOG_DB, changelog_dbfile)

+     if ds_supports_new_changelog():

+         dbscanOut = inst.dbscan(DEFAULT_BENAME, 'changelog')

+     else:

+         changelog_dbdir = os.path.join(os.path.dirname(inst.dbdir), DEFAULT_CHANGELOG_DB)

+         for dbfile in os.listdir(changelog_dbdir):

+             if dbfile.endswith('.db'):

+                 changelog_dbfile = os.path.join(changelog_dbdir, dbfile)

+                 log.info('Changelog dbfile file exist: {}'.format(changelog_dbfile))

+         log.info('Running dbscan -f to check {} attr'.format(unhashed_pwd_attribute))

+         dbscanOut = inst.dbscan(DEFAULT_CHANGELOG_DB, changelog_dbfile)

+ 

      for entry in dbscanOut.split(b'dbid: '):

          if ensure_bytes('operation: modify') in entry and ensure_bytes(user_dn) in entry and ensure_bytes('userPassword') in entry:

              if is_present:

@@ -22,12 +22,16 @@ 

  from lib389.dseldif import DSEldif

  from lib389.tasks import *

  from lib389.utils import *

+ from lib389.utils import ldap, os, logging, ensure_bytes, ds_is_newer, ds_supports_new_changelog

  

  pytestmark = pytest.mark.tier1

  

  TEST_ENTRY_NAME = 'replusr'

  NEW_RDN_NAME = 'cl5usr'

- CHANGELOG = 'cn=changelog5,cn=config'

+ if ds_supports_new_changelog():

+     CHANGELOG = 'cn=changelog,{}'.format(DN_USERROOT_LDBM)

+ else:

+     CHANGELOG = 'cn=changelog5,cn=config'

  RETROCHANGELOG = 'cn=Retro Changelog Plugin,cn=plugins,cn=config'

  MAXAGE = 'nsslapd-changelogmaxage'

  TRIMINTERVAL = 'nsslapd-changelogtrim-interval'
@@ -73,12 +77,17 @@ 

      """Dump changelog using nss5task and check if ldap operations are logged"""

  

      log.info('Dump changelog using nss5task and check if ldap operations are logged')

-     changelog_dir = topo.ms['master1'].get_changelog_dir()

+     if ds_supports_new_changelog():

+         changelog_dir = topo.ms['master1'].get_ldif_dir()

+         changelog_end = '_cl.ldif'

+     else:

+         changelog_dir = topo.ms['master1'].get_changelog_dir()

+         changelog_end = '.ldif'

      replicas = Replicas(topo.ms["master1"])

      replica = replicas.get(DEFAULT_SUFFIX)

      log.info('Remove ldif files, if present in: {}'.format(changelog_dir))

      for files in os.listdir(changelog_dir):

-         if files.endswith('.ldif'):

+         if files.endswith(changelog_end):

              changelog_file = os.path.join(changelog_dir, files)

              try:

                  os.remove(changelog_file)
@@ -94,7 +103,7 @@ 

  

      log.info('Check if changelog ldif file exist in: {}'.format(changelog_dir))

      for files in os.listdir(changelog_dir):

-         if files.endswith('.ldif'):

+         if files.endswith(changelog_end):

              changelog_ldif = os.path.join(changelog_dir, files)

              log.info('Changelog ldif file exist: {}'.format(changelog_ldif))

              return changelog_ldif
@@ -129,22 +138,23 @@ 

  

  @pytest.fixture(scope="module")

  def changelog_init(topo):

-     """Initialize the test environment by changing log dir and

-     enabling cn=Retro Changelog Plugin,cn=plugins,cn=config

-      """

+     """ changlog dir is not configuarable, just

+     enable cn=Retro Changelog Plugin,cn=plugins,cn=config

+     """

      log.info('Testing Ticket 47669 - Test duration syntax in the changelogs')

  

      # bind as directory manager

      topo.ms["master1"].log.info("Bind as %s" % DN_DM)

      topo.ms["master1"].simple_bind_s(DN_DM, PASSWORD)

  

-     try:

-         changelogdir = os.path.join(os.path.dirname(topo.ms["master1"].dbdir), 'changelog')

-         topo.ms["master1"].modify_s(CHANGELOG, [(ldap.MOD_REPLACE, 'nsslapd-changelogdir',

-                                                                     ensure_bytes(changelogdir))])

-     except ldap.LDAPError as e:

-         log.error('Failed to modify ' + CHANGELOG + ': error {}'.format(get_ldap_error_msg(e,'desc')))

-         assert False

+     if not ds_supports_new_changelog():

+         try:

+             changelogdir = os.path.join(os.path.dirname(topo.ms["master1"].dbdir), 'changelog')

+             topo.ms["master1"].modify_s(CHANGELOG, [(ldap.MOD_REPLACE, 'nsslapd-changelogdir',

+                                                                        ensure_bytes(changelogdir))])

+         except ldap.LDAPError as e:

+             log.error('Failed to modify ' + CHANGELOG + ': error {}'.format(get_ldap_error_msg(e,'desc')))

+             assert False

  

      try:

          topo.ms["master1"].modify_s(RETROCHANGELOG, [(ldap.MOD_REPLACE, 'nsslapd-pluginEnabled', b'on')])
@@ -204,7 +214,10 @@ 

      """

      Remove existing ldif files from changelog dir

      """

-     changelog_dir = topo.ms['master1'].get_changelog_dir()

+     if ds_supports_new_changelog():

+         changelog_dir = topo.ms['master1'].get_ldif_dir()

+     else:

+         changelog_dir = topo.ms['master1'].get_changelog_dir()

  

      log.info('Remove %s files, if present in: %s' % (extension, changelog_dir))

      for files in os.listdir(changelog_dir):
@@ -220,6 +233,7 @@ 

  

                  

  @pytest.mark.xfail(ds_is_older('1.3.10.1', '1.4.3'), reason="bug bz1685059")

+ @pytest.mark.skip(reason="does not work for prefix builds")

  @pytest.mark.bz1685059

  @pytest.mark.ds50498

  @pytest.mark.bz1769296
@@ -351,7 +365,10 @@ 

          10. .ldif.done generated files are present in the changelog dir

       """

  

-     changelog_dir = topo.ms['master1'].get_changelog_dir()

+     if ds_supports_new_changelog():

+         changelog_dir = topo.ms['master1'].get_ldif_dir()

+     else:

+         changelog_dir = topo.ms['master1'].get_changelog_dir()

      instance = topo.ms['master1']

      instance_url = 'ldap://%s:%s' % (HOST_MASTER_1, PORT_MASTER_1)

  
@@ -466,7 +483,10 @@ 

          log.fatal('test_changelog5: Online backup failed')

          assert False

  

-     backup_checkdir = os.path.join(backup_dir, '.repl_changelog_backup', DEFAULT_CHANGELOG_DB)

+     if ds_supports_new_changelog():

+         backup_checkdir = os.path.join(backup_dir, DEFAULT_BENAME, 'changelog.db')

+     else:

+         backup_checkdir = os.path.join(backup_dir, '.repl_changelog_backup', DEFAULT_CHANGELOG_DB)

      if os.path.exists(backup_checkdir):

          log.info('Database backup is created successfully')

      else:
@@ -524,7 +544,10 @@ 

          assert False

      topo.ms['master1'].start()

  

-     backup_checkdir = os.path.join(backup_dir, '.repl_changelog_backup', DEFAULT_CHANGELOG_DB)

+     if ds_supports_new_changelog():

+         backup_checkdir = os.path.join(backup_dir, DEFAULT_BENAME, 'changelog.db')

+     else:

+         backup_checkdir = os.path.join(backup_dir, '.repl_changelog_backup', DEFAULT_CHANGELOG_DB)

      if os.path.exists(backup_checkdir):

          log.info('Database backup is created successfully')

      else:
@@ -603,6 +626,7 @@ 

  

  

  @pytest.mark.ds47669

+ @pytest.mark.skipif(ds_supports_new_changelog(), reason="changelog compaction is done by the backend itself, with id2entry as well, nsslapd-changelogcompactdb-interval is no longer supported")

  def test_changelog_compactdbinterval(topo, changelog_init):

      """Check nsslapd-changelog compactdbinterval values

  

@@ -8,6 +8,7 @@ 

  from lib389.topologies import topology_m1 as topo

  from lib389.replica import Changelog5

  from lib389.idm.domain import Domain

+ from lib389.utils import ensure_bytes, ds_supports_new_changelog

  

  pytestmark = pytest.mark.tier1

  
@@ -18,6 +19,10 @@ 

      logging.getLogger(__name__).setLevel(logging.INFO)

  log = logging.getLogger(__name__)

  

+ CHANGELOG = 'cn=changelog,{}'.format(DN_USERROOT_LDBM)

+ MAXAGE = 'nsslapd-changelogmaxage'

+ MAXENTRIES = 'nsslapd-changelogmaxentries'

+ TRIMINTERVAL = 'nsslapd-changelogtrim-interval'

  

  def do_mods(master, num):

      """Perform a num of mods on the default suffix
@@ -26,6 +31,16 @@ 

      for i in range(num):

          domain.replace('description', 'change %s' % i)

  

+ def set_value(master, attr, val):

+     """

+     Helper function to add/replace attr: val and check the added value

+     """

+     try:

+         master.modify_s(CHANGELOG, [(ldap.MOD_REPLACE, attr, ensure_bytes(val))])

+     except ldap.LDAPError as e:

+         log.error('Failed to add ' + attr + ': ' + val + ' to ' + plugin + ': error {}'.format(get_ldap_error_msg(e,'desc')))

+         assert False

+ 

  @pytest.fixture(scope="module")

  def setup_max_entries(topo, request):

      """Configure logging and changelog max entries
@@ -34,9 +49,12 @@ 

  

      master.config.loglevel((ErrorLog.REPLICA,), 'error')

  

-     cl = Changelog5(master)

-     cl.set_max_entries('2')

-     cl.set_trim_interval('300')

+     if ds_supports_new_changelog():

+         set_value(master, MAXENTRIES, '2')

+         set_value(master, TRIMINTERVAL, '300')

+     else:

+         cl = Changelog5(master)

+         cl.set_trim_interval('300')

  

  @pytest.fixture(scope="module")

  def setup_max_age(topo, request):
@@ -45,9 +63,13 @@ 

      master = topo.ms["master1"]

      master.config.loglevel((ErrorLog.REPLICA,), 'error')

  

-     cl = Changelog5(master)

-     cl.set_max_age('5')

-     cl.set_trim_interval('300')

+     if ds_supports_new_changelog():

+         set_value(master, MAXAGE, '5')

+         set_value(master, TRIMINTERVAL, '300')

+     else:

+         cl = Changelog5(master)

+         cl.set_max_age('5')

+         cl.set_trim_interval('300')

  

  def test_max_age(topo, setup_max_age):

      """Test changing the trimming interval works with max age
@@ -68,7 +90,8 @@ 

      log.info("Testing changelog triming interval with max age...")

  

      master = topo.ms["master1"]

-     cl = Changelog5(master)

+     if not ds_supports_new_changelog():

+         cl = Changelog5(master)

  

      # Do mods to build if cl entries

      do_mods(master, 10)
@@ -78,7 +101,10 @@ 

          log.fatal('Trimming event unexpectedly occurred')

          assert False

  

-     cl.set_trim_interval('5')

+     if ds_supports_new_changelog():

+         set_value(master, TRIMINTERVAL, '5')

+     else:

+         cl.set_trim_interval('5')

  

      time.sleep(6)  # Trimming should have occured

  
@@ -106,7 +132,8 @@ 

  

      log.info("Testing changelog triming interval with max entries...")

      master = topo.ms["master1"]

-     cl = Changelog5(master)

+     if not ds_supports_new_changelog():

+         cl = Changelog5(master)

  

      # reset errors log

      master.deleteErrorLogs()
@@ -118,7 +145,10 @@ 

          log.fatal('Trimming event unexpectedly occurred')

          assert False

  

-     cl.set_trim_interval('5')

+     if ds_supports_new_changelog():

+         set_value(master, TRIMINTERVAL, '5')

+     else:

+         cl.set_trim_interval('5')

  

      time.sleep(6)  # Trimming should have occured

  

@@ -82,6 +82,7 @@ 

          clean = False

          replicas = Replicas(inst)

          replica = replicas.get(DEFAULT_SUFFIX)

+         log.info('check_ruvs for replica %s:%s (suffix:rid)' % (replica.get_suffix(), replica.get_rid()))

  

          count = 0

          while not clean and count < 20:
@@ -582,13 +583,15 @@ 

      ldbm_config = LDBMConfig(topology_m4.ms["master4"])

  

      # Put all the masters under load

-     m1_add_users = AddUsers(topology_m4.ms["master1"], 2000)

+     # not too high load else it takes a long time to converge and

+     # the test result becomes instable

+     m1_add_users = AddUsers(topology_m4.ms["master1"], 500)

      m1_add_users.start()

-     m2_add_users = AddUsers(topology_m4.ms["master2"], 2000)

+     m2_add_users = AddUsers(topology_m4.ms["master2"], 500)

      m2_add_users.start()

-     m3_add_users = AddUsers(topology_m4.ms["master3"], 2000)

+     m3_add_users = AddUsers(topology_m4.ms["master3"], 500)

      m3_add_users.start()

-     m4_add_users = AddUsers(topology_m4.ms["master4"], 2000)

+     m4_add_users = AddUsers(topology_m4.ms["master4"], 500)

      m4_add_users.start()

  

      # Allow sometime to get replication flowing in all directions

@@ -8,7 +8,8 @@ 

  #

  import logging

  import pytest

- from lib389.utils import ensure_bytes

+ import pdb

+ from lib389.utils import ensure_bytes, ds_supports_new_changelog

  from lib389.replica import ReplicationManager

  from lib389.dseldif import DSEldif

  from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
@@ -45,26 +46,34 @@ 

      dse_ldif = DSEldif(inst)

      log.info('Configuring changelog encryption:{} for: {}'.format(inst.serverid, encrypt_algorithm))

      inst.stop()

-     dse_ldif.replace(DN_CHANGELOG, 'nsslapd-encryptionalgorithm', encrypt_algorithm)

-     if dse_ldif.get(DN_CHANGELOG, 'nsSymmetricKey'):

-         dse_ldif.delete(DN_CHANGELOG, 'nsSymmetricKey')

+     if ds_supports_new_changelog():

+         changelog = 'cn=changelog,{}'.format(DN_USERROOT_LDBM)

+     else:

+         changelog = DN_CHANGELOG

+ 

+     dse_ldif.replace(changelog, 'nsslapd-encryptionalgorithm', encrypt_algorithm)

+     if dse_ldif.get(changelog, 'nsSymmetricKey'):

+         dse_ldif.delete(changelog, 'nsSymmetricKey')

      inst.start()

  

  

  def _check_unhashed_userpw_encrypted(inst, change_type, user_dn, user_pw, is_encrypted):

      """Check if unhashed#user#password attribute value is encrypted or not"""

  

-     changelog_dbdir = os.path.join(os.path.dirname(inst.dbdir), DEFAULT_CHANGELOG_DB)

-     for dbfile in os.listdir(changelog_dbdir):

-         if dbfile.endswith('.db'):

-             changelog_dbfile = os.path.join(changelog_dbdir, dbfile)

-             log.info('Changelog dbfile file exist: {}'.format(changelog_dbfile))

-     log.info('Running dbscan -f to check {} attr'.format(ATTRIBUTE))

-     dbscanOut = inst.dbscan(DEFAULT_CHANGELOG_DB, changelog_dbfile)

+     if ds_supports_new_changelog():

+         dbscanOut = inst.dbscan(DEFAULT_BENAME, 'changelog')

+     else:

+         changelog_dbdir = os.path.join(os.path.dirname(inst.dbdir), DEFAULT_CHANGELOG_DB)

+         for dbfile in os.listdir(changelog_dbdir):

+             if dbfile.endswith('.db'):

+                 changelog_dbfile = os.path.join(changelog_dbdir, dbfile)

+                 log.info('Changelog dbfile file exist: {}'.format(changelog_dbfile))

+         log.info('Running dbscan -f to check {} attr'.format(ATTRIBUTE))

+         dbscanOut = inst.dbscan(DEFAULT_CHANGELOG_DB, changelog_dbfile)

      count = 0

      for entry in dbscanOut.split(b'dbid: '):

          if ensure_bytes('operation: {}'.format(change_type)) in entry and\

-            ensure_bytes(ATTRIBUTE) in entry and ensure_bytes(user_dn) in entry:

+            ensure_bytes(ATTRIBUTE) in entry and ensure_bytes(user_dn.lower()) in entry.lower():

              count += 1

              user_pw_attr = ensure_bytes('{}: {}'.format(ATTRIBUTE, user_pw))

              if is_encrypted:
@@ -112,7 +121,16 @@ 

      _enable_changelog_encryption(m1, encryption)

  

      for inst1, inst2 in ((m1, m2), (m2, m1)):

-         user_props = TEST_USER_PROPERTIES.copy()

+         # need to create a user specific to the encryption

+         # else the two runs will hit the same user

+         user_props={

+                     'uid': 'testuser_%s' % encryption,

+                     'cn' : 'testuser_%s' % encryption,

+                     'sn' : 'user',

+                     'uidNumber' : '1000',

+                     'gidNumber' : '1000',

+                     'homeDirectory' : '/home/testuser_%s' % encryption

+                 }

          user_props["userPassword"] = PASSWORD

          users = UserAccounts(inst1, DEFAULT_SUFFIX)

          tuser = users.create(properties=user_props)

@@ -29,6 +29,7 @@ 

  NEW_SUFFIX_NAME = 'test_repl'

  NEW_SUFFIX = 'o={}'.format(NEW_SUFFIX_NAME)

  NEW_BACKEND = 'repl_base'

+ CHANGELOG = 'cn=changelog,{}'.format(DN_USERROOT_LDBM)

  MAXAGE_ATTR = 'nsslapd-changelogmaxage'

  MAXAGE_STR = '30'

  TRIMINTERVAL_STR = '5'
@@ -41,6 +42,16 @@ 

      logging.getLogger(__name__).setLevel(logging.INFO)

  log = logging.getLogger(__name__)

  

+ @pytest.fixture(scope="module")

+ def set_value(master, attr, val):

+     """

+     Helper function to add/replace attr: val and check the added value

+     """

+     try:

+         master.modify_s(CHANGELOG, [(ldap.MOD_REPLACE, attr, ensure_bytes(val))])

+     except ldap.LDAPError as e:

+         log.error('Failed to add ' + attr + ': ' + val + ' to ' + plugin + ': error {}'.format(get_ldap_error_msg(e,'desc')))

+         assert False

  

  def find_start_location(file, no):

      log_pattern = re.compile("slapd_daemon - slapd started.")
@@ -675,13 +686,28 @@ 

      m1_m3 = M1.agreement.list(suffix=SUFFIX, consumer_host=M3.host, consumer_port=M3.port)

      m3_m1 = M3.agreement.list(suffix=SUFFIX, consumer_host=M1.host, consumer_port=M1.port)

  

-     log.info("Get the changelog enteries for M1 and M2")

-     changelog_m1 = Changelog5(M1)

-     changelog_m2 = Changelog5(M2)

- 

      log.info("Modify nsslapd-changelogmaxage=30 and nsslapd-changelogtrim-interval=5 for M1 and M2")

-     changelog_m1.set_max_age(MAXAGE_STR)

-     changelog_m1.set_trim_interval(TRIMINTERVAL_STR)

+     if ds_supports_new_changelog():

+         CHANGELOG = 'cn=changelog,{}'.format(DN_USERROOT_LDBM)

+ 

+         #set_value(M1, MAXAGE_ATTR, MAXAGE_STR)

+         try:

+             M1.modify_s(CHANGELOG, [(ldap.MOD_REPLACE, MAXAGE_ATTR, ensure_bytes(MAXAGE_STR))])

+         except ldap.LDAPError as e:

+             log.error('Failed to add ' + MAXAGE_ATTR, + ': ' + MAXAGE_STR + ' to ' + CHANGELOG + ': error {}'.format(get_ldap_error_msg(e,'desc')))

+             assert False

+ 

+         #set_value(M2, TRIMINTERVAL, TRIMINTERVAL_STR)

+         try:

+             M2.modify_s(CHANGELOG, [(ldap.MOD_REPLACE, TRIMINTERVAL, ensure_bytes(TRIMINTERVAL_STR))])

+         except ldap.LDAPError as e:

+             log.error('Failed to add ' + TRIMINTERVAL, + ': ' + TRIMINTERVAL_STR + ' to ' + CHANGELOG + ': error {}'.format(get_ldap_error_msg(e,'desc')))

+             assert False

+     else:

+         log.info("Get the changelog enteries for M1 and M2")

+         changelog_m1 = Changelog5(M1)

+         changelog_m1.set_max_age(MAXAGE_STR)

+         changelog_m1.set_trim_interval(TRIMINTERVAL_STR)

  

      log.info("Add test users to 3 masters")

      users_m1 = UserAccounts(M1, DEFAULT_SUFFIX)

@@ -481,7 +481,7 @@ 

          if ($!) {

              return ('error_copying_file', $src, $dest, $!);

          }

-         if (@errs = changeOwnerMode($inf, 4, $dest)) {

+         if (@errs = changeOwnerMode($inf, 6, $dest)) {

              return @errs;

          }

      }

@@ -25,6 +25,7 @@ 

      echo "        -x                - Suffix to exclude"

      echo "        -a outputfile     - Name of the exported LDIF file"

      echo "        -r                - Include replication data"

+     echo "        -R                - Include changelog data"

      echo "        -E                - Decrypt attributes"

      echo "        -u                - Do not export the nsUniqueId attribute"

      echo "        -U                - Do not wrap long lines"
@@ -100,7 +101,7 @@ 

      exit 1

  fi

  

- while getopts "hZ:vd:D:ENa:rs:x:CSut:n:UmMo1qVc:" flag

+ while getopts "hZ:vd:D:ENa:rs:x:CSut:n:UmMo1qRVc:" flag

  do

      case $flag in

          h) usage
@@ -119,6 +120,7 @@ 

          S) args=$args" -S";;

          v) args=$args" -v";;

          r) args=$args" -r";;

+         R) args=$args" -R";;        

          C) args=$args" -C";;

          u) args=$args" -u";;

          U) args=$args" -U";;

@@ -30,6 +30,7 @@ 

      echo "        -G name           - Namespace id for name based uniqueid (-g deterministic)"

      echo "        -O                - Do not index the attributes"

      echo "        -E                - Encrypt attributes"

+     echo "        -R                - Import changelog data"

      echo "        -q                - Quiet mode - suppresses output"

      echo "        -V                - Verbose output"

      echo "        -v                - Display version"
@@ -54,7 +55,7 @@ 

      return 0

  }

  

- while getopts "Z:vhd:i:g:G:n:s:x:NOCc:St:D:EqV" flag

+ while getopts "Z:vhd:i:g:G:n:s:x:NOCc:St:D:ERqV" flag

  do

      case $flag in

          h) usage
@@ -71,6 +72,7 @@ 

          t) args=$args" -t \"$OPTARG\"";;

          D) args=$args" -D \"$OPTARG\"";;

          E) args=$args" -E";;

+         R) args=$args" -R";;

          v) args=$args" -v";;

          N) args=$args" -N";;

          C) args=$args" -C";;

@@ -21,17 +21,18 @@ 

  typedef struct changelog5Config

  {

      char *dir;

-     /* These 2 parameters are needed for changelog trimming. Already present in 5.0 */

+     /* These 3 parameters are needed for changelog trimming. */

      char *maxAge;

      int maxEntries;

-     /* the changelog DB configuration parameters are defined as CL5DBConfig in cl5_api.h */

-     CL5DBConfig dbconfig;

-     char *symmetricKey;

-     long compactInterval;

      long trimInterval;

+     /* configuration of changelog encryption */

+     char *encryptionAlgorithm;

+     char *symmetricKey;

  } changelog5Config;

  

- /* initializes changelog*/

+ /* upgrade changelog*/

+ int changelog5_upgrade(void);

+ /* initialize changelog*/

  int changelog5_init(void);

  /* cleanups changelog data */

  void changelog5_cleanup(void);
@@ -41,6 +42,11 @@ 

  void changelog5_config_cleanup(void);

  /* reads changelog configuration */

  int changelog5_read_config(changelog5Config *config);

+ /* transforms entry to internal config */

+ void changelog5_extract_config(Slapi_Entry *entry, changelog5Config *config);

+ /* registeri/unregister functions to handle config changes */

+ int changelog5_register_config_callbacks(const char *dn, Replica *replica);

+ int changelog5_remove_config_callbacks(const char *dn);

  /* cleanups the content of the config structure */

  void changelog5_config_done(changelog5Config *config);

  /* frees the content and the config structure */

@@ -68,14 +68,6 @@ 

  #define HASH_BACKETS_COUNT 16 /* number of buckets in a hash table */

  

  #define DEFAULT_DB_ENV_OP_FLAGS DB_AUTO_COMMIT

- #define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval)                                   \

-     {                                                                                                         \

-         if (((oflags)&DB_INIT_TXN) && ((oflags)&DB_INIT_LOG)) {                                               \

-             (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags) | DB_AUTO_COMMIT, (mode)); \

-         } else {                                                                                              \

-             (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags), (mode));                  \

-         }                                                                                                     \

-     }

  

  #define TXN_BEGIN(env, parent_txn, tid, flags) \

      (env)->txn_begin((env), (parent_txn), (tid), (flags))
@@ -98,7 +90,6 @@ 

  #define DEFAULT_THREAD_STACKSIZE 0

  #endif

  

- #define FILE_CREATE_MODE S_IRUSR | S_IWUSR

  #define DIR_CREATE_MODE 0755

  

  #define NO_DISK_SPACE 1024
@@ -117,28 +108,45 @@ 

      CL5_OPEN_CLEAN_RECOVER    /* remove env after recover open (upgrade) */

  } CL5OpenMode;

  

- #define DB_FILE_DELETED 0x1

- #define DB_FILE_INIT 0x2

+ #define DB_FILE_ACTIVE 0x01

+ #define DB_FILE_DONE   0x08

+ /* changelog trimming configuration */

+ typedef struct cl5config

+ {

+     time_t maxAge;       /* maximum entry age in seconds                            */

+     int maxEntries;      /* maximum number of entries across all changelog files    */

+     int trimInterval;    /* trimming interval */

+     char *encryptionAlgorithm; /* nsslapd-encryptionalgorithm */

+ } CL5Config;

+ 

  /* this structure represents one changelog file, Each changelog file contains

     changes applied to a single backend. Files are named by the database id */

- typedef struct cl5dbfile

+ 

+ struct cl5DBFileHandle

+ /* info about the changelog file in the main database environment */

+ /* usage as CL5DBFile, but for new implementation use a new struct

+  * can be replaced later

+  */ 

  {

-     char *name;     /* file name (with the extension) */

-     char *replGen;  /* replica generation of the data */

-     char *replName; /* replica name                   */

-     DB *db;         /* db handle to the changelog file*/

-     int entryCount; /* number of entries in the file  */

-     int flags;      /* currently used to mark the file as deleted

-                              * or as initialized */

+     DB *db;		/* db handle to the changelog file*/

+     char *ident;    /* identifier for changelog, used in error messages */

+     int entryCount;	/* number of entries in the file  */

+     int flags;	/* currently used to mark the file or as initialized */

      RUV *purgeRUV;  /* ruv to which the file has been purged */

-     RUV *maxRUV;    /* ruv that marks the upper boundary of the data */

- } CL5DBFile;

+     RUV *maxRUV;         /* ruv that marks the upper boundary of the data */

+     CL5Config clConf;      /* trimming and encryption config                                */

+     Slapi_Counter *clThreads; /* track threads operating on the changelog */

+     PRLock *clLock;      /* controls access to trimming configuration  and          */

+                          /* Lock associated to clVar, used to notify threads on close */

+     PRCondVar *clCvar;   /* Condition Variable used to notify threads on close */

+     void *clcrypt_handle;   /* for cl encryption */

+ };

  

  /* structure that allows to iterate through entries to be sent to a consumer

     that originated on a particular supplier. */

  struct cl5replayiterator

  {

-     Object *fileObj;

+     cldb_Handle	*it_cldb;

      CLC_Buffer *clcache;    /* changelog cache */

      ReplicaId consumerRID;  /* consumer's RID */

      const RUV *consumerRuv; /* consumer's update vector                    */
@@ -148,41 +156,18 @@ 

  typedef struct cl5iterator

  {

      DBC *cursor;  /* current position in the db file    */

-     Object *file; /* handle to release db file object    */

+     cldb_Handle *it_cldb; /* handle to release db file object    */

  } CL5Iterator;

  

- /* changelog trimming configuration */

- typedef struct cl5trim

- {

-     time_t maxAge;       /* maximum entry age in seconds                            */

-     int maxEntries;      /* maximum number of entries across all changelog files    */

-     int compactInterval; /* interval to compact changelog db */

-     int trimInterval;    /* trimming interval */

-     PRLock *lock;        /* controls access to trimming configuration            */

- } CL5Trim;

- 

  /* this structure defines 5.0 changelog internals */

  typedef struct cl5desc

  {

-     char *dbDir;            /* absolute path to changelog directory                */

      DB_ENV *dbEnv;          /* db environment shared by all db files            */

-     int dbEnvOpenFlags;     /* openflag used for env->open */

-     Objset *dbFiles;        /* ref counted set of changelog files (CL5DBFile)    */

-     PRLock *fileLock;       /* ensures that changelog file is not added twice    */

      CL5OpenMode dbOpenMode; /* how we open db                                    */

-     CL5DBConfig dbConfig;   /* database configuration params                    */

-     CL5Trim dbTrim;         /* trimming parameters                                */

      CL5State dbState;       /* changelog current state                            */

      Slapi_RWLock *stLock;   /* lock that controls access to the changelog state    */

-     PRBool dbRmOnClose;     /* indicates whether changelog should be removed when

-                                it is closed    */

-     PRBool fatalError;      /* bad stuff happened like out of disk space; don't

-                                write guardian file on close - UnUsed so far */

      int threadCount;        /* threads that globally access changelog like

                                 deadlock detection, etc. */

-     PRLock *clLock;         /* Lock associated to clVar, used to notify threads on close */

-     PRCondVar *clCvar;      /* Condition Variable used to notify threads on close */

-     void *clcrypt_handle;   /* for cl encryption */

  } CL5Desc;

  

  typedef void (*VFP)(void *);
@@ -193,87 +178,63 @@ 

  /***** Forward Declarations *****/

  

  /* changelog initialization and cleanup */

- static int _cl5Open(const char *dir, const CL5DBConfig *config, CL5OpenMode openMode);

- static int _cl5AppInit(void);

- static int _cl5DBOpen(void);

- static void _cl5SetDefaultDBConfig(void);

- static void _cl5SetDBConfig(const CL5DBConfig *config);

- static int _cl5CheckDBVersion(void);

- static int _cl5ReadDBVersion(const char *dir, char *clVersion, int buflen);

- static int _cl5WriteDBVersion(void);

+ static int _cl5Open(CL5OpenMode openMode);

+ static int _cldb_CheckAndSetEnv(Slapi_Backend *be);

  static void _cl5Close(void);

- static int _cl5Delete(const char *dir, PRBool rmDir);

  static void _cl5DBClose(void);

  

  /* thread management */

- static int _cl5DispatchDBThreads(void);

+ static int _cl5DispatchTrimThread(Replica *replica);

  static int _cl5AddThread(void);

  static void _cl5RemoveThread(void);

  

  /* functions that work with individual changelog files */

- static int _cl5NewDBFile(const char *replName, const char *replGen, CL5DBFile **dbFile);

- static int _cl5DBOpenFile(Replica *replica, Object **obj, PRBool checkDups);

- static int _cl5DBOpenFileByReplicaName(const char *replName, const char *replGen, Object **obj, PRBool checkDups);

- static void _cl5DBCloseFile(void **data);

- static void _cl5DBDeleteFile(Object *obj);

- static void _cl5DBFileInitialized(Object *obj);

- static int _cl5GetDBFile(Replica *replica, Object **obj);

- static int _cl5GetDBFileByReplicaName(const char *replName, const char *replGen, Object **obj);

- static int _cl5AddDBFile(CL5DBFile *file, Object **obj);

- static int _cl5CompareDBFile(Object *el1, const void *el2);

- static char *_cl5Replica2FileName(Replica *replica);

- static char *_cl5MakeFileName(const char *replName, const char *replGen);

- static PRBool _cl5FileName2Replica(const char *fileName, Replica **replica);

- static int _cl5ExportFile(PRFileDesc *prFile, Object *obj);

- static PRBool _cl5ReplicaInList(Replica *replica, Replica **replicas);

+ static int _cl5ExportFile(PRFileDesc *prFile, cldb_Handle *cldb);

  

  /* data storage and retrieval */

- static int _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len);

- static int _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local);

- static int _cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn);

- static int _cl5GetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid);

+ static int _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len, void *clcrypt_handle);

+ static int _cl5WriteOperation(cldb_Handle *cldb, const slapi_operation_parameters *op);

+ static int _cl5WriteOperationTxn(cldb_Handle *cldb, const slapi_operation_parameters *op, void *txn);

+ static int _cl5GetFirstEntry(cldb_Handle *cldb, CL5Entry *entry, void **iterator, DB_TXN *txnid);

  static int _cl5GetNextEntry(CL5Entry *entry, void *iterator);

  static int _cl5CurrentDeleteEntry(void *iterator);

  static const char *_cl5OperationType2Str(int type);

  static int _cl5Str2OperationType(const char *str);

  static void _cl5WriteString(const char *str, char **buff);

  static void _cl5ReadString(char **str, char **buff);

- static void _cl5WriteMods(LDAPMod **mods, char **buff);

- static int _cl5WriteMod(LDAPMod *mod, char **buff);

- static int _cl5ReadMods(LDAPMod ***mods, char **buff);

- static int _cl5ReadMod(Slapi_Mod *mod, char **buff);

+ static void _cl5WriteMods(LDAPMod **mods, char **buff, void *clcrypt_handle);

+ static int _cl5WriteMod(LDAPMod *mod, char **buff, void *clcrypt_handle);

+ static int _cl5ReadMods(LDAPMod ***mods, char **buff, void *clcrypt_handle);

+ static int _cl5ReadMod(Slapi_Mod *mod, char **buff, void *clcrypt_handle);

  static int _cl5GetModsSize(LDAPMod **mods);

  static int _cl5GetModSize(LDAPMod *mod);

  static void _cl5ReadBerval(struct berval *bv, char **buff);

  static void _cl5WriteBerval(struct berval *bv, char **buff);

  static int _cl5ReadBervals(struct berval ***bv, char **buff, unsigned int size);

  static int _cl5WriteBervals(struct berval **bv, char **buff, u_int32_t *size);

- static int32_t _cl5CheckMaxRUV(CL5DBFile *file, RUV *maxruv);

+ static int32_t _cl5CheckMaxRUV(cldb_Handle *cldb, RUV *maxruv);

  static int32_t _cl5CheckCSNinCL(const ruv_enum_data *element, void *arg);

  

  /* replay iteration */

  #ifdef FOR_DEBUGGING

  static PRBool _cl5ValidReplayIterator(const CL5ReplayIterator *iterator);

  #endif

- static int _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Replica *replica, Object *fileObject, CL5ReplayIterator **iterator, int *continue_on_missing);

- static int _cl5CheckMissingCSN(const CSN *minCsn, const RUV *supplierRUV, CL5DBFile *file);

+ static int _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Replica *replica, CL5ReplayIterator **iterator, int *continue_on_missing);

+ static int _cl5CheckMissingCSN(const CSN *minCsn, const RUV *supplierRUV, cldb_Handle *cldb);

  

  /* changelog trimming */

- static int _cl5TrimInit(void);

- static void _cl5TrimCleanup(void);

+ static int cldb_IsTrimmingEnabled(cldb_Handle *cldb);

  static int _cl5TrimMain(void *param);

- static void _cl5DoTrimming(void);

- static void _cl5CompactDBs(void);

- static void _cl5PurgeRID(Object *file_obj, ReplicaId cleaned_rid);

- static int _cl5PurgeGetFirstEntry(Object *file_obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);

+ static void _cl5TrimReplica(Replica *r);

+ static void _cl5PurgeRID(cldb_Handle *cldb,  ReplicaId cleaned_rid);

+ static int _cl5PurgeGetFirstEntry(cldb_Handle *cldb, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);

  static int _cl5PurgeGetNextEntry(CL5Entry *entry, void *iterator, DBT *key);

- static void _cl5TrimFile(Object *obj, long *numToTrim);

- static PRBool _cl5CanTrim(time_t time, long *numToTrim);

- static int _cl5ReadRUV(const char *replGen, Object *obj, PRBool purge);

- static int _cl5WriteRUV(CL5DBFile *file, PRBool purge);

- static int _cl5ConstructRUV(const char *replGen, Object *obj, PRBool purge);

- static int _cl5UpdateRUV(Object *obj, CSN *csn, PRBool newReplica, PRBool purge);

- static int _cl5GetRUV2Purge2(Object *fileObj, RUV **ruv);

+ static PRBool _cl5CanTrim(time_t time, long *numToTrim, Replica *replica, CL5Config *dbTrim);

+ static int _cl5ReadRUV(cldb_Handle *cldb, PRBool purge);

+ static int _cl5WriteRUV(cldb_Handle *cldb, PRBool purge);

+ static int _cl5ConstructRUV(cldb_Handle *cldb, PRBool purge);

+ static int _cl5UpdateRUV (cldb_Handle *cldb, CSN *csn, PRBool newReplica, PRBool purge);

+ static int _cl5GetRUV2Purge2(Replica *r, RUV **ruv);

  void trigger_cl_purging_thread(void *rid);

  

  /* bakup/recovery, import/export */
@@ -281,19 +242,14 @@ 

  static int _cl5Operation2LDIF(const slapi_operation_parameters *op, const char *replGen, char **ldifEntry, PRInt32 *lenLDIF);

  

  /* entry count */

- static int _cl5GetEntryCount(CL5DBFile *file);

- static int _cl5WriteEntryCount(CL5DBFile *file);

+ static int _cl5GetEntryCount(cldb_Handle *cldb);

+ static int _cl5WriteEntryCount(cldb_Handle *cldb);

  

  /* misc */

  static char *_cl5GetHelperEntryKey(int type, char *csnStr);

- static Replica *_cl5GetReplica(const slapi_operation_parameters *op, const char *replGen);

- static int _cl5FileEndsWith(const char *filename, const char *ext);

  

- static PRLock *cl5_diskfull_lock = NULL;

- static int cl5_diskfull_flag = 0;

  

- static void cl5_set_diskfull(void);

- static void cl5_set_no_diskfull(void);

+ static int _cl5WriteReplicaRUV(Replica *r, void *arg);

  

  /***** Module APIs *****/

  
@@ -314,32 +270,14 @@ 

                        PR_GetError());

          return CL5_SYSTEM_ERROR;

      }

-     if ((s_cl5Desc.clLock = PR_NewLock()) == NULL) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                       "cl5Init - Failed to create on close lock; NSPR error - %d\n",

-                       PR_GetError());

-         return CL5_SYSTEM_ERROR;

-     }

-     if ((s_cl5Desc.clCvar = PR_NewCondVar(s_cl5Desc.clLock)) == NULL) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                       "cl5Init - Failed to create on close cvar; NSPR error - %d\n",

-                       PR_GetError());

-         return CL5_SYSTEM_ERROR;

-     }

  

-     if ((clcache_init(&s_cl5Desc.dbEnv) != 0)) {

+     if ((clcache_init() != 0)) {

          return CL5_SYSTEM_ERROR;

      }

  

      s_cl5Desc.dbState = CL5_STATE_CLOSED;

-     s_cl5Desc.fatalError = PR_FALSE;

-     s_cl5Desc.dbRmOnClose = PR_FALSE;

      s_cl5Desc.threadCount = 0;

  

-     if (NULL == cl5_diskfull_lock) {

-         cl5_diskfull_lock = PR_NewLock();

-     }

- 

      return CL5_SUCCESS;

  }

  
@@ -361,20 +299,6 @@ 

          slapi_destroy_rwlock(s_cl5Desc.stLock);

      s_cl5Desc.stLock = NULL;

  

-     if (cl5_diskfull_lock) {

-         PR_DestroyLock(cl5_diskfull_lock);

-         cl5_diskfull_lock = NULL;

-     }

-     if (s_cl5Desc.clLock != NULL) {

-         PR_DestroyLock(s_cl5Desc.clLock);

-         s_cl5Desc.clLock = NULL;

-     }

- 

-     if (s_cl5Desc.clCvar != NULL) {

-         PR_DestroyCondVar(s_cl5Desc.clCvar);

-         s_cl5Desc.clCvar = NULL;

-     }

- 

      memset(&s_cl5Desc, 0, sizeof(s_cl5Desc));

  }

  
@@ -382,8 +306,6 @@ 

     Description:    opens changelog; must be called after changelog is

                  initialized using cl5Init. It is thread safe and the second

                  call is ignored.

-    Parameters:  dir - changelog dir

-                 config - db configuration parameters; currently not used

     Return:        CL5_SUCCESS if successfull;

                  CL5_BAD_DATA if invalid directory is passed;

                  CL5_BAD_STATE if changelog is not initialized;
@@ -393,15 +315,10 @@ 

                  CL5_DB_ERROR if db initialization fails.

   */

  int

- cl5Open(const char *dir, const CL5DBConfig *config)

+ cl5Open(void)

  {

      int rc;

  

-     if (dir == NULL) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5Open: null directory\n");

-         return CL5_BAD_DATA;

-     }

- 

      if (s_cl5Desc.dbState == CL5_STATE_NONE) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5Open - Changelog is not initialized\n");
@@ -424,29 +341,27 @@ 

          goto done;

      }

  

-     rc = _cl5Open(dir, config, CL5_OPEN_NORMAL);

+     /* if we are here we know that the database environment is setup

+      * what remains is to setup the config info for all the individual

+      * changelogs.

+      * If we fail set state back to closed.

+      */

+     s_cl5Desc.dbState = CL5_STATE_OPEN;

+     slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

+                   "cl5Open - setting dbState to CL5_STATE_OPEN\n");

+     rc = _cl5Open(CL5_OPEN_NORMAL);

      if (rc != CL5_SUCCESS) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5Open - Failed to open changelog\n");

          goto done;

      }

  

-     /* dispatch global threads like deadlock detection, trimming, etc */

-     rc = _cl5DispatchDBThreads();

-     if (rc != CL5_SUCCESS) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                       "cl5Open - Failed to start database monitoring threads\n");

- 

-         _cl5Close();

-     } else {

-         s_cl5Desc.dbState = CL5_STATE_OPEN;

-         clcache_set_config();

- 

-         /* Set the cl encryption algorithm (if configured) */

-         rc = clcrypt_init(config, &s_cl5Desc.clcrypt_handle);

-     }

+     clcache_set_config();

  

  done:

+     if (rc != CL5_SUCCESS) {

+         s_cl5Desc.dbState = CL5_STATE_CLOSED;

+     }

      slapi_rwlock_unlock(s_cl5Desc.stLock);

  

      return rc;
@@ -489,115 +404,47 @@ 

  

      /* signal changelog closing to all threads */

      s_cl5Desc.dbState = CL5_STATE_CLOSING;

- 

-     PR_Lock(s_cl5Desc.clLock);

-     PR_NotifyCondVar(s_cl5Desc.clCvar);

-     PR_Unlock(s_cl5Desc.clLock);

+     /* replica_enumerate_replicas(cldb_StopTrimming, NULL); */

  

      _cl5Close();

  

      s_cl5Desc.dbState = CL5_STATE_CLOSED;

-     rc = clcrypt_destroy(s_cl5Desc.clcrypt_handle);

+ 

+     s_cl5Desc.dbEnv = NULL;

  

      slapi_rwlock_unlock(s_cl5Desc.stLock);

  

      return rc;

  }

  

- /* Name:        cl5Delete

-    Description:    removes changelog; changelog must be in the closed state.

-    Parameters:  dir - changelog directory

-    Return:        CL5_SUCCESS if successful;

-                 CL5_BAD_STATE if the changelog is not in closed state;

-                 CL5_BAD_DATA if invalid directory supplied

-                 CL5_SYSTEM_ERROR if NSPR call fails

-  */

- int

- cl5Delete(const char *dir)

+ static int

+ _cldb_DeleteDB(Replica *replica)

  {

-     int rc;

- 

-     if (dir == NULL) {

-         slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5Delete - NULL directory\n");

-         return CL5_BAD_DATA;

-     }

- 

-     if (s_cl5Desc.dbState == CL5_STATE_NONE) {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5Delete - Changelog is not initialized\n");

-         return CL5_BAD_STATE;

-     }

+     int rc = 0;

+     cldb_Handle *cldb;

+     Slapi_Backend *be;

  

-     slapi_rwlock_wrlock(s_cl5Desc.stLock);

+     cldb = replica_get_file_info(replica);

+     /* make sure that changelog stays open while operation is in progress */

  

-     if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5Delete - Invalid state - %d\n", s_cl5Desc.dbState);

-         slapi_rwlock_unlock(s_cl5Desc.stLock);

-         return CL5_BAD_STATE;

-     }

+     slapi_counter_increment(cldb->clThreads);

  

-     rc = _cl5Delete(dir, PR_TRUE /* remove changelog dir */);

-     if (rc != CL5_SUCCESS) {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5Delete - Failed to remove changelog\n");

-     }

+     be = slapi_be_select(replica_get_root(replica));

+  

+     slapi_back_ctrl_info(be, BACK_INFO_DBENV_CLDB_REMOVE, (void *)(cldb->db));

+     cldb->db = NULL;

  

-     slapi_rwlock_unlock(s_cl5Desc.stLock);

+     slapi_counter_decrement(cldb->clThreads);

      return rc;

  }

- 

- /* Name:        cl5DeleteDBSync

-    Description: The same as cl5DeleteDB except the function does not return

-                 until the file is removed.

- */

  int

- cl5DeleteDBSync(Replica *replica)

+ cldb_RemoveReplicaDB(Replica *replica)

  {

-     Object *obj;

-     int rc;

-     CL5DBFile *file;

- 

-     if (replica == NULL) {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5DeleteDBSync - invalid database id\n");

-         return CL5_BAD_DATA;

-     }

- 

-     /* changelog is not initialized */

-     if (s_cl5Desc.dbState == CL5_STATE_NONE) {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "

-                                                            "Changelog is not initialized\n");

-         return CL5_BAD_STATE;

-     }

- 

-     /* make sure that changelog stays open while operation is in progress */

-     rc = _cl5AddThread();

-     if (rc != CL5_SUCCESS)

-         return rc;

- 

-     rc = _cl5GetDBFile(replica, &obj);

-     if (rc == CL5_SUCCESS) {

-         char *filename = NULL;

-         file = (CL5DBFile *)object_get_data(obj);

-         PR_ASSERT(file);

-         /* file->name is freed in _cl5DBDeleteFile */

-         filename = slapi_ch_strdup(file->name);

- 

-         _cl5DBDeleteFile(obj);

- 

-         /* wait until the file is gone */

-         while (PR_Access(filename, PR_ACCESS_EXISTS) == PR_SUCCESS) {

-             DS_Sleep(PR_MillisecondsToInterval(100));

-         }

-         slapi_ch_free_string(&filename);

-     } else {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "

-                                                            "File for replica at (%s) not found\n",

-                       slapi_sdn_get_dn(replica_get_root(replica)));

-     }

+     int rc =0;

+     cldb_Handle *cldb = replica_get_file_info(replica);

  

-     _cl5RemoveThread();

+     cldb->flags |= DB_FILE_DONE;

+     rc = cldb_UnSetReplicaDB(replica, NULL);

      return rc;

  }

  
@@ -615,11 +462,8 @@ 

  int

  cl5GetUpperBoundRUV(Replica *r, RUV **ruv)

  {

-     int rc;

-     Object *file_obj;

-     CL5DBFile *file;

-     const char *replName;

-     char *replGen;

+     int rc = CL5_SUCCESS;

+     cldb_Handle *cldb = NULL;

  

      if (r == NULL || ruv == NULL) {

          slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
@@ -634,28 +478,13 @@ 

          return CL5_BAD_STATE;

      }

  

+     cldb = replica_get_file_info(r);

      /* make sure that changelog stays open while operation is in progress */

-     rc = _cl5AddThread();

-     if (rc != CL5_SUCCESS)

-         return rc;

- 

-     replName = replica_get_name(r);

-     replGen = replica_get_generation(r);

-     rc = _cl5GetDBFileByReplicaName(replName, replGen, &file_obj);

-     slapi_ch_free_string(&replGen);

-     if (rc == CL5_SUCCESS) {

-         file = (CL5DBFile *)object_get_data(file_obj);

-         PR_ASSERT(file && file->maxRUV);

- 

-         *ruv = ruv_dup(file->maxRUV);

- 

-         object_release(file_obj);

-     } else {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV - "

-                                                            "Could not find DB object for replica\n");

-     }

- 

-     _cl5RemoveThread();

+     slapi_counter_increment(cldb->clThreads);

+     PR_ASSERT(cldb && cldb->maxRUV);

+     *ruv = ruv_dup(cldb->maxRUV);

+     

+     slapi_counter_decrement(cldb->clThreads);

      return rc;

  }

  
@@ -674,12 +503,10 @@ 

                  CL5_MEMORY_ERROR if memory allocation fials.

   */

  int

- cl5ExportLDIF(const char *ldifFile, Replica **replicas)

+ cl5ExportLDIF(const char *ldifFile, Replica *replica)

  {

-     int i;

-     int rc;

      PRFileDesc *prFile = NULL;

-     Object *file_obj;

+     int rc;

  

      if (ldifFile == NULL) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
@@ -710,28 +537,17 @@ 

      slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,

                    "cl5ExportLDIF: starting changelog export to (%s) ...\n", ldifFile);

  

-     if (replicas) /* export only selected files */

-     {

-         for (i = 0; replicas[i]; i++) {

-             rc = _cl5GetDBFile(replicas[i], &file_obj);

-             if (rc == CL5_SUCCESS) {

-                 rc = _cl5ExportFile(prFile, file_obj);

-                 object_release(file_obj);

-             } else {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5ExportLDIF - "

-                                                                   "Failed to locate changelog file for replica at (%s)\n",

-                               slapi_sdn_get_dn(replica_get_root(replicas[i])));

-             }

-         }

-     } else /* export all files */

+     if (replica) /* export only selected files */

      {

-         for (file_obj = objset_first_obj(s_cl5Desc.dbFiles); file_obj;

-              file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj)) {

-             rc = _cl5ExportFile(prFile, file_obj);

+         cldb_Handle *cldb = replica_get_file_info(replica);

+         rc = _cl5ExportFile (prFile, cldb);

+         if (rc) {

+             slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5ExportLDIF - "

+                           "failed to locate changelog file for replica at (%s)\n",

+                           slapi_sdn_get_dn (replica_get_root (replica)));

          }

      }

  

-     rc = CL5_SUCCESS;

  done:;

  

      _cl5RemoveThread();
@@ -759,7 +575,7 @@ 

                  CL5_MEMORY_ERROR if memory allocation fials.

   */

  int

- cl5ImportLDIF(const char *clDir, const char *ldifFile, Replica **replicas)

+ cl5ImportLDIF(const char *clDir, const char *ldifFile, Replica *replica)

  {

      LDIFFP *file = NULL;

      int buflen = 0;
@@ -767,17 +583,13 @@ 

      int rc;

      char *buff = NULL;

      slapi_operation_parameters op;

-     Replica *prim_replica = NULL;

-     Replica *replica = NULL;

-     Object *file_obj = NULL;

      char *replGen = NULL;

-     CL5DBFile *dbfile = NULL;

-     struct berval **purgevals = NULL;

-     struct berval **maxvals = NULL;

      int purgeidx = 0;

      int maxidx = 0;

      int maxpurgesz = 0;

      int maxmaxsz = 0;

+     struct berval **purgevals = NULL;

+     struct berval **maxvals = NULL;

      int entryCount = 0;

  

      /* validate params */
@@ -793,14 +605,7 @@ 

          return CL5_BAD_STATE;

      }

  

-     if (replicas == NULL) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                       "cl5ImportLDIF - null list of replicas\n");

-         return CL5_BAD_DATA;

-     }

- 

-     prim_replica = replicas[0];

-     if (NULL == prim_replica) {

+     if (NULL == replica) {

          /* Never happens for now. (see replica_execute_ldif2cl_task) */

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5ImportLDIF - empty replica list\n");
@@ -830,21 +635,25 @@ 

      }

  

      /* remove changelog */

+     /* TBD (LK) remove and recreate cl database */

+     /* rc = _cl5Delete(clDir, PR_FALSE);

      rc = _cl5Delete(clDir, PR_FALSE);

      if (rc != CL5_SUCCESS) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5ImportLDIF - Failed to remove changelog\n");

          goto done;

      }

+     */

  

      /* open changelog */

-     rc = _cl5Open(clDir, NULL, CL5_OPEN_LDIF2CL);

+     rc = _cl5Open(CL5_OPEN_LDIF2CL);

      if (rc != CL5_SUCCESS) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5ImportLDIF - Failed to open changelog\n");

          goto done;

      }

      s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */

+     cldb_Handle *cldb = replica_get_file_info(replica);

  

  /* read entries and write them to changelog */

      while (ldif_read_record(file, &lineno, &buff, &buflen))
@@ -908,9 +717,8 @@ 

          }

          slapi_ch_free_string(&buff);

          buflen = 0;

-         /* if we perform selective import, check if the operation should be wriiten to changelog */

-         replica = _cl5GetReplica(&op, replGen);

-         if (replica == NULL) {

+         /* check if the operation should be written to changelog */

+         if (0 == strcmp(replGen, cldb->ident)) {

              /*

               * changetype: delete

               * replgen: 4d13a124000000010000
@@ -918,8 +726,7 @@ 

               * nsuniqueid: 00000000-00000000-00000000-00000000

               * dn: cn=start iteration

               */

-             rc = _cl5WriteOperation(replica_get_name(prim_replica),

-                                     replGen, &op, 1);

+             rc = _cl5WriteOperation (cldb, &op);

              if (rc != CL5_SUCCESS) {

                  slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                                "cl5ImportLDIF - "
@@ -934,52 +741,23 @@ 

              goto next;

          }

  

-         if (!replicas || _cl5ReplicaInList(replica, replicas)) {

-             /* write operation creates the file if it does not exist */

-             rc = _cl5WriteOperation(replica_get_name(replica),

-                                     replGen, &op, 1);

-             if (rc != CL5_SUCCESS) {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                               "cl5ImportLDIF - "

-                               "Failed to write operation to the changelog: "

-                               "type: %lu, dn: %s\n",

-                               op.operation_type, REPL_GET_DN(&op.target_address));

-                 slapi_ch_free_string(&replGen);

-                 operation_parameters_done(&op);

-                 goto done;

-             }

-             entryCount++;

-         }

      next:

          slapi_ch_free_string(&replGen);

          operation_parameters_done(&op);

      }

  

      /* Set RUVs and entry count */

-     file_obj = objset_first_obj(s_cl5Desc.dbFiles);

-     while (file_obj) {

-         dbfile = (CL5DBFile *)object_get_data(file_obj);

-         if (0 == strcasecmp(dbfile->replName, replica_get_name(prim_replica))) {

-             break;

-         }

-         dbfile = NULL;

-         file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);

-     }

- 

-     if (dbfile) {

+     if (cldb) {

          if (purgeidx > 0) {

-             ruv_destroy(&dbfile->purgeRUV);

-             rc = ruv_init_from_bervals(purgevals, &dbfile->purgeRUV);

+             ruv_destroy(&cldb->purgeRUV);

+             rc = ruv_init_from_bervals(purgevals, &cldb->purgeRUV);

          }

          if (maxidx > 0) {

-             ruv_destroy(&dbfile->maxRUV);

-             rc = ruv_init_from_bervals(maxvals, &dbfile->maxRUV);

+             ruv_destroy(&cldb->maxRUV);

+             rc = ruv_init_from_bervals(maxvals, &cldb->maxRUV);

          }

  

-         dbfile->entryCount = entryCount;

-     }

-     if (file_obj) {

-         object_release(file_obj);

+         cldb->entryCount = entryCount;

      }

  

  done:
@@ -1018,60 +796,64 @@ 

     Description:    sets changelog trimming parameters; changelog must be open.

     Parameters:  maxEntries - maximum number of entries in the chnagelog (in all files);

                  maxAge - maximum entry age;

-                 compactInterval - interval to compact changelog db;

                  trimInterval - changelog trimming interval.

     Return:        CL5_SUCCESS if successful;

                  CL5_BAD_STATE if changelog is not open

   */

  int

- cl5ConfigTrimming(int maxEntries, const char *maxAge, int compactInterval, int trimInterval)

+ cl5ConfigTrimming(Replica *replica, int maxEntries, const char *maxAge, int trimInterval)

  {

+     int isTrimmingEnabledBefore = 0;

+     int isTrimmingEnabledAfter = 0;

+     cldb_Handle *cldb = replica_get_file_info(replica);

+ 

      if (s_cl5Desc.dbState == CL5_STATE_NONE) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5ConfigTrimming - Changelog is not initialized\n");

          return CL5_BAD_STATE;

      }

  

-     /* make sure changelog is not closed while trimming configuration

-        is updated.*/

-     if (CL5_SUCCESS != _cl5AddThread()) {

-         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

-                       "cl5ConfigTrimming - Could not start changelog trimming thread\n");

-         return CL5_BAD_STATE;

-     }

+     slapi_counter_increment(cldb->clThreads);

+     /* make sure changelog is not closed while trimming configuration is updated.*/

  

-     PR_Lock(s_cl5Desc.dbTrim.lock);

+     PR_Lock(cldb->clLock); 

+ 

+     isTrimmingEnabledBefore = cldb_IsTrimmingEnabled(cldb);

  

      if (maxAge) {

          /* don't ignore this argument */

          if (strcmp(maxAge, CL5_STR_IGNORE) != 0) {

-             s_cl5Desc.dbTrim.maxAge = slapi_parse_duration(maxAge);

+             cldb->clConf.maxAge = slapi_parse_duration(maxAge);

          }

      } else {

          /* unlimited */

-         s_cl5Desc.dbTrim.maxAge = 0;

+         cldb->clConf.maxAge = 0;

      }

  

      if (maxEntries != CL5_NUM_IGNORE) {

-         s_cl5Desc.dbTrim.maxEntries = maxEntries;

-     }

- 

-     if (compactInterval != CL5_NUM_IGNORE) {

-         s_cl5Desc.dbTrim.compactInterval = compactInterval;

+         cldb->clConf.maxEntries = maxEntries;

      }

  

      if (trimInterval != CL5_NUM_IGNORE) {

-         s_cl5Desc.dbTrim.trimInterval = trimInterval;

+         cldb->clConf.trimInterval = trimInterval;

      }

  

-     /* The config was updated, notify the changelog trimming thread */

-     PR_Lock(s_cl5Desc.clLock);

-     PR_NotifyCondVar(s_cl5Desc.clCvar);

-     PR_Unlock(s_cl5Desc.clLock);

+     isTrimmingEnabledAfter = cldb_IsTrimmingEnabled(cldb);

  

-     PR_Unlock(s_cl5Desc.dbTrim.lock);

+     if (isTrimmingEnabledAfter && !isTrimmingEnabledBefore) {

+         /* start trimming */

+         cldb_StartTrimming(replica);

+     } else if (!isTrimmingEnabledAfter && isTrimmingEnabledBefore) {

+         /* stop trimming */

+         cldb_StopTrimming(replica, NULL);

+     } else {

+         /* The config was updated, notify the changelog trimming thread */

+         PR_NotifyCondVar(cldb->clCvar);

+     }

  

-     _cl5RemoveThread();

+     PR_Unlock(cldb->clLock);

+ 

+     slapi_counter_decrement(cldb->clThreads);

  

      return CL5_SUCCESS;

  }
@@ -1093,8 +875,11 @@ 

      if (it->cursor)

          it->cursor->c_close(it->cursor);

  

+     /* NOTE (LK) locking of CL files  ?*/

+     /*

      if (it->file)

          object_release(it->file);

+     */

  

      slapi_ch_free((void **)&it);

  }
@@ -1107,7 +892,6 @@ 

                     replica object since generation can change while operation

                     is in progress (if the data is reloaded). !!!

                  op - operation to write

-                 local - this is a non-replicated operation

                  txn - the transaction containing this operation

     Return:        CL5_SUCCESS if function is successfull;

                  CL5_BAD_DATA if invalid op is passed;
@@ -1116,7 +900,7 @@ 

                  CL5_DB_ERROR if any other db error occured;

   */

  int

- cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn)

+ cl5WriteOperationTxn(cldb_Handle *cldb, const slapi_operation_parameters *op, void *txn)

  {

      int rc;

  
@@ -1138,23 +922,16 @@ 

      }

  

      /* make sure that changelog is open while operation is in progress */

-     rc = _cl5AddThread();

-     if (rc != CL5_SUCCESS)

-         return rc;

+     slapi_counter_increment(cldb->clThreads);

  

-     rc = _cl5WriteOperationTxn(replName, replGen, op, local, txn);

+     rc = _cl5WriteOperationTxn(cldb, op, txn);

  

      /* update the upper bound ruv vector */

      if (rc == CL5_SUCCESS) {

-         Object *file_obj = NULL;

- 

-         if (_cl5GetDBFileByReplicaName(replName, replGen, &file_obj) == CL5_SUCCESS) {

-             rc = _cl5UpdateRUV(file_obj, op->csn, PR_FALSE, PR_FALSE);

-             object_release(file_obj);

-         }

+         rc = _cl5UpdateRUV(cldb, op->csn, PR_FALSE, PR_FALSE);

      }

  

-     _cl5RemoveThread();

+     slapi_counter_decrement(cldb->clThreads);

  

      return rc;

  }
@@ -1167,7 +944,6 @@ 

                     replica object since generation can change while operation

                     is in progress (if the data is reloaded). !!!

                  op - operation to write

-                 local - this is a non-replicated operation

     Return:        CL5_SUCCESS if function is successfull;

                  CL5_BAD_DATA if invalid op is passed;

                  CL5_BAD_STATE if db has not been initialized;
@@ -1175,9 +951,9 @@ 

                  CL5_DB_ERROR if any other db error occured;

   */

  int

- cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local)

+ cl5WriteOperation(cldb_Handle *cldb, const slapi_operation_parameters *op)

  {

-     return cl5WriteOperationTxn(replName, replGen, op, local, NULL);

+     return cl5WriteOperationTxn(cldb, op, NULL);

  }

  

  /* Name:        cl5CreateReplayIterator
@@ -1218,7 +994,6 @@ 

  {

      int rc;

      Replica *replica;

-     Object *file_obj = NULL;

  

      replica = prp->replica;

      if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
@@ -1240,22 +1015,11 @@ 

      if (rc != CL5_SUCCESS)

          return rc;

  

- 

-     rc = _cl5GetDBFile(replica, &file_obj);

-     if (rc == CL5_SUCCESS && file_obj) {

-         /* iterate through the ruv in csn order to find first master for which

-            we can replay changes */

- 

-         rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, file_obj, iterator, NULL);

-     } else {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5CreateReplayIteratorEx - Could not find DB object for replica\n");

-     }

+     /* iterate through the ruv in csn order to find first master for which 

+        we can replay changes */		    

+     rc = _cl5PositionCursorForReplay (consumerRID, consumerRuv, replica, iterator, NULL);

  

      if (rc != CL5_SUCCESS) {

-         if (file_obj) {

-             object_release(file_obj);

-         }

          /* release the thread */

          _cl5RemoveThread();

      }
@@ -1275,7 +1039,6 @@ 

  

      int rc;

      Replica *replica;

-     Object *file_obj = NULL;

  

      replica = prp->replica;

      if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
@@ -1297,28 +1060,18 @@ 

      if (rc != CL5_SUCCESS)

          return rc;

  

- 

-     rc = _cl5GetDBFile(replica, &file_obj);

-     if (rc == CL5_SUCCESS && file_obj) {

-         /* iterate through the ruv in csn order to find first master for which

-            we can replay changes */

-         ReplicaId consumerRID = agmt_get_consumer_rid(prp->agmt, prp->conn);

-         int continue_on_missing = agmt_get_ignoremissing(prp->agmt);

-         int save_cont_miss = continue_on_missing;

-         rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, file_obj, iterator, &continue_on_missing);

-         if (save_cont_miss == 1 && continue_on_missing == 0) {

-             /* the option to continue once on a missing csn was used, rest */

-             agmt_set_ignoremissing(prp->agmt, 0);

-         }

-     } else {

-         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,

-                       "cl5CreateReplayIterator - Could not find DB object for replica\n");

+     /* iterate through the ruv in csn order to find first master for which

+        we can replay changes */

+     ReplicaId consumerRID = agmt_get_consumer_rid(prp->agmt, prp->conn);

+     int continue_on_missing = agmt_get_ignoremissing(prp->agmt);

+     int save_cont_miss = continue_on_missing;

+     rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, iterator, &continue_on_missing);

+     if (save_cont_miss == 1 && continue_on_missing == 0) {

+         /* the option to continue once on a missing csn was used, rest */

+         agmt_set_ignoremissing(prp->agmt, 0);

      }

  

      if (rc != CL5_SUCCESS) {

-         if (file_obj)

-             object_release(file_obj);

- 

          /* release the thread */

          _cl5RemoveThread();

      }
@@ -1396,7 +1149,7 @@ 

  

      /* there is an entry we should return */

      /* Callers of this function should cl5_operation_parameters_done(op) */

-     if (0 != cl5DBData2Entry(data, datalen, entry)) {

+     if (0 != cl5DBData2Entry(data, datalen, entry, iterator->it_cldb->clcrypt_handle)) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,

                        "cl5GetNextOperationToReplay - %s - Failed to format entry rc=%d\n", agmt_name, rc);

          return rc;
@@ -1421,10 +1174,12 @@ 

  

      clcache_return_buffer(&(*iterator)->clcache);

  

-     if ((*iterator)->fileObj) {

-         object_release((*iterator)->fileObj);

-         (*iterator)->fileObj = NULL;

+     /* TBD (LK) lock/unlock cldb ?

+      if ((*iterator)->it_cldb) {

+         object_release((*iterator)->it_cldb);

+         (*iterator)->it_cldb = NULL;

      }

+     */

  

      /* release supplier's ruv */

      if ((*iterator)->supplierRuvObj) {
@@ -1438,50 +1193,6 @@ <