#51214 Ticket 51190 - SyncRepl plugin provides a wrong cookie
Closed 3 years ago by spichugi. Opened 3 years ago by tbordaz.
tbordaz/389-ds-base ticket_51190  into  master

@@ -12,6 +12,9 @@ 

  @author: mreynolds

  '''

  import logging

+ import threading

These seem like left overs that you don't need to add here :)

If I remove any of those 3 lines, the tests are failing :(

+ from ldap.syncrepl import SyncreplConsumer

+ from ldap.ldapobject import ReconnectLDAPObject

  import subprocess

  import pytest

  from lib389.utils import *
@@ -1548,7 +1551,6 @@ 

      log.info('test_referint: PASS\n')

      return

  

- 

  def test_retrocl(topo, args=None):

      """Test Retro Changelog basic functionality

  

@@ -9,14 +9,17 @@ 

  import logging

  import ldap

  import time

+ import threading

  from ldap.syncrepl import SyncreplConsumer

+ from ldap.ldapobject import ReconnectLDAPObject

  import pytest

  from lib389 import DirSrv

  from lib389.idm.user import nsUserAccounts, UserAccounts

+ from lib389.idm.group import Groups

  from lib389.topologies import topology_st as topology

  from lib389.paths import Paths

  from lib389.utils import ds_is_older

- from lib389.plugins import RetroChangelogPlugin, ContentSyncPlugin

+ from lib389.plugins import RetroChangelogPlugin, ContentSyncPlugin, AutoMembershipPlugin, MemberOfPlugin, MemberOfSharedConfig, AutoMembershipDefinitions

  from lib389._constants import *

  

  from . import ISyncRepl, syncstate_assert
@@ -58,3 +61,493 @@ 

      sync = ISyncRepl(st)

      # Run the checks

      syncstate_assert(st, sync)

+ 

+ class TestSyncer(ReconnectLDAPObject, SyncreplConsumer):

+     def __init__(self, *args, **kwargs):

+         self.cookie = None

+         self.cookies = []

+         ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)

+ 

+     def syncrepl_set_cookie(self, cookie):

+         # extract the changenumber from the cookie

+         self.cookie = cookie

+         self.cookies.append(cookie.split('#')[2])

+         log.info("XXXX Set cookie: %s" % cookie)

+ 

+     def syncrepl_get_cookie(self):

+         log.info("XXXX Get cookie: %s" % self.cookie)

+         return self.cookie

+ 

+     def syncrepl_present(self, uuids, refreshDeletes=False):

+         log.info("XXXX syncrepl_present uuids %s %s" % ( uuids, refreshDeletes))

+ 

+     def syncrepl_delete(self, uuids):

+         log.info("XXXX syncrepl_delete uuids %s" % uuids)

+ 

+     def syncrepl_entry(self, dn, attrs, uuid):

+         log.info("XXXX syncrepl_entry dn %s" % dn)

+ 

+     def syncrepl_refreshdone(self):

+         log.info("XXXX syncrepl_refreshdone")

+ 

+     def get_cookies(self):

+         return self.cookies

+ 

+ class Sync_persist(threading.Thread, ReconnectLDAPObject, SyncreplConsumer):

+     # This runs a sync_repl client in the background.

+     # It records a result that contains the list of change numbers (from the cookies)

+     # in the order they are received.

+     def __init__(self, inst):

+         threading.Thread.__init__(self)

+         self.daemon = True

+         self.inst = inst

+         self.cookie = None

+         self.conn = inst.clone({SER_ROOT_DN: 'cn=directory manager', SER_ROOT_PW: 'password'})

+         self.filterstr = '(|(objectClass=groupofnames)(objectClass=person))'

+         self.attrs = [

+             'objectclass',

+             'cn',

+             'displayname',

+             'gidnumber',

+             'givenname',

+             'homedirectory',

+             'mail',

+             'member',

+             'memberof',

+             'sn',

+             'uid',

+             'uidnumber',

+         ]

+         self.conn.open()

+         self.result = []

+ 

+     def get_result(self):

+         # used to return the cookies list to the requestor

+         return self.result

+ 

+     def run(self):

+         """Start a sync repl client"""

+         ldap_connection = TestSyncer(self.inst.toLDAPURL())

+         ldap_connection.simple_bind_s('cn=directory manager', 'password')

+         ldap_search = ldap_connection.syncrepl_search(

+             "dc=example,dc=com",

+             ldap.SCOPE_SUBTREE,

+             mode='refreshAndPersist',

+             attrlist=self.attrs,

+             filterstr=self.filterstr,

+             cookie=None

+         )

+ 

+         try:

+             while ldap_connection.syncrepl_poll(all=1, msgid=ldap_search):

+                 pass

+         except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR) as e:

+             print('syncrepl_poll: LDAP error (%s)' % e)

+         self.result = ldap_connection.get_cookies()

+         log.info("ZZZ result = %s" % self.result)

+         self.conn.unbind()

+ 

+ def test_sync_repl_cookie(topology, request):

+     """Test sync_repl cookie are progressing is an increasing order

+        when there are nested updates

+ 

+     :id: d7fbde25-5702-46ac-b38e-169d7a68e97c

+     :setup: Standalone Instance

+     :steps:

+       1.: enable retroCL

+       2.: configure retroCL to log nsuniqueid as targetUniqueId

+       3.: enable content_sync plugin

+       4.: enable automember

+       5.: create (2) groups. A few groups help to reproduce the concurrent updates problem.

+       6.: configure automember to provision those groups with 'member'

+       7.: enable and configure memberof plugin

+       8.: enable plugin log level

+       9.: restart the server

+       10.: create a thread dedicated to run a sync repl client

+       11.: Create (9) users that will generate nested updates (automember/memberof)

+       12.: stop sync repl client and collect the list of cookie.change_no

+       13.: check that cookies.change_no are in increasing order

+     :expectedresults:

+       1.: succeeds

+       2.: succeeds

+       3.: succeeds

+       4.: succeeds

+       5.: succeeds

+       6.: succeeds

+       7.: succeeds

+       8.: succeeds

+       9.: succeeds

+       10.: succeeds

+       11.: succeeds

+       12.: succeeds

+       13.: succeeds

+     """

+     inst = topology[0]

+ 

+     # Enable/configure retroCL

+     plugin = RetroChangelogPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+     plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')

+ 

+     # Enable sync plugin

+     plugin = ContentSyncPlugin(inst)

+     plugin.enable()

+ 

+     # Enable automember

+     plugin = AutoMembershipPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     # Add the automember group

+     groups = Groups(inst, DEFAULT_SUFFIX)

+     group = []

+     for i in range(1,3):

+         group.append(groups.create(properties={'cn': 'group%d' % i}))

+ 

+     # Add the automember config entry

+     am_configs = AutoMembershipDefinitions(inst)

+     for g in group:

+         am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),

+                                                   'autoMemberScope': DEFAULT_SUFFIX,

+                                                   'autoMemberFilter': 'uid=*',

+                                                   'autoMemberDefaultGroup': g.dn,

+                                                   'autoMemberGroupingAttr': 'member:dn'})

+ 

+     # Enable and configure memberof plugin

+     plugin = MemberOfPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     plugin.replace_groupattr('member')

+ 

+     memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))

+     memberof_config.create(properties={'cn': 'memberOf config',

+                                        'memberOfGroupAttr': 'member',

+                                        'memberOfAttr': 'memberof'})

+     # Enable plugin log level (useful for debugging)

+     inst.setLogLevel(65536)

+     inst.restart()

+ 

+     # create a sync repl client and wait 5 seconds to be sure it is running

+     sync_repl = Sync_persist(inst)

+     sync_repl.start()

+     time.sleep(5)

+ 

+     # create users; automember/memberof will generate nested updates

+     users = UserAccounts(inst, DEFAULT_SUFFIX)

+     users_set = []

+     for i in range(10001, 10010):

+         users_set.append(users.create_test_user(uid=i))

+ 

+     # stop the server to get the sync_repl result set (exit from the while loop).

+     # It is the only way I found to achieve that.

+     # Then wait a bit to give the sync_repl thread time to set its result before fetching it.

+     inst.stop()

+     time.sleep(10)

+     cookies = sync_repl.get_result()

+ 

+     # check that the cookies are increasing and in an acceptable range (0..1000)

+     assert len(cookies) > 0

+     prev = 0

+     for cookie in cookies:

+         log.info('Check cookie %s' % cookie)

+ 

+         assert int(cookie) > 0

+         assert int(cookie) < 1000

+         assert int(cookie) > prev

+         prev = int(cookie)

+     sync_repl.join()

+     log.info('test_sync_repl_cookie: PASS\n')

+ 

+     def fin():

+         inst.restart()

+         for user in users_set:

+             try:

+                 user.delete()

+             except:

+                 pass

+         for g in group:

+             try:

+                 g.delete()

+             except:

+                 pass

+ 

+     request.addfinalizer(fin)

+ 

+     return

+ 

+ def test_sync_repl_cookie_add_del(topology, request):

+     """Test sync_repl cookie are progressing is an increasing order

+        when there add and del

+ 

+     :id: 83e11038-6ed0-4a5b-ac77-e44887ab11e3

+     :setup: Standalone Instance

+     :steps:

+       1.: enable retroCL

+       2.: configure retroCL to log nsuniqueid as targetUniqueId

+       3.: enable content_sync plugin

+       4.: enable automember

+       5.: create (2) groups. A few groups help to reproduce the concurrent updates problem.

+       6.: configure automember to provision those groups with 'member'

+       7.: enable and configure memberof plugin

+       8.: enable plugin log level

+       9.: restart the server

+       10.: create a thread dedicated to run a sync repl client

+       11.: Create (3) users that will generate nested updates (automember/memberof)

+       12.: Delete (3) users

+       13.: stop sync repl client and collect the list of cookie.change_no

+       14.: check that cookies.change_no are in increasing order

+     :expectedresults:

+       1.: succeeds

+       2.: succeeds

+       3.: succeeds

+       4.: succeeds

+       5.: succeeds

+       6.: succeeds

+       7.: succeeds

+       8.: succeeds

+       9.: succeeds

+       10.: succeeds

+       11.: succeeds

+       12.: succeeds

+       13.: succeeds

+       14.: succeeds

+     """

+     inst = topology[0]

+ 

+     # Enable/configure retroCL

+     plugin = RetroChangelogPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+     plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')

+ 

+     # Enable sync plugin

+     plugin = ContentSyncPlugin(inst)

+     plugin.enable()

+ 

+     # Enable automember

+     plugin = AutoMembershipPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     # Add the automember group

+     groups = Groups(inst, DEFAULT_SUFFIX)

+     group = []

+     for i in range(1,3):

+         group.append(groups.create(properties={'cn': 'group%d' % i}))

+ 

+     # Add the automember config entry

+     am_configs = AutoMembershipDefinitions(inst)

+     for g in group:

+         am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),

+                                                   'autoMemberScope': DEFAULT_SUFFIX,

+                                                   'autoMemberFilter': 'uid=*',

+                                                   'autoMemberDefaultGroup': g.dn,

+                                                   'autoMemberGroupingAttr': 'member:dn'})

+ 

+     # Enable and configure memberof plugin

+     plugin = MemberOfPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     plugin.replace_groupattr('member')

+ 

+     memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))

+     memberof_config.create(properties={'cn': 'memberOf config',

+                                        'memberOfGroupAttr': 'member',

+                                        'memberOfAttr': 'memberof'})

+     # Enable plugin log level (useful for debugging)

+     inst.setLogLevel(65536)

+     inst.restart()

+ 

+     # create a sync repl client and wait 5 seconds to be sure it is running

+     sync_repl = Sync_persist(inst)

+     sync_repl.start()

+     time.sleep(5)

+ 

+     # create users; automember/memberof will generate nested updates

+     users = UserAccounts(inst, DEFAULT_SUFFIX)

+     users_set = []

+     for i in range(10001, 10004):

+         users_set.append(users.create_test_user(uid=i))

+ 

+     time.sleep(10)

+     # delete users; automember/memberof will generate nested updates

+     for user in users_set:

+         user.delete()

+     # stop the server to get the sync_repl result set (exit from the while loop).

+     # It is the only way I found to achieve that.

+     # Then wait a bit to give the sync_repl thread time to set its result before fetching it.

+     inst.stop()

+     cookies = sync_repl.get_result()

+ 

+     # check that the cookies are increasing and in an acceptable range (0..1000)

+     assert len(cookies) > 0

+     prev = 0

+     for cookie in cookies:

+         log.info('Check cookie %s' % cookie)

+ 

+         assert int(cookie) > 0

+         assert int(cookie) < 1000

+         assert int(cookie) > prev

+         prev = int(cookie)

+     sync_repl.join()

+     log.info('test_sync_repl_cookie_add_del: PASS\n')

+ 

+     def fin():

+         inst.restart()

+         for g in group:

+             try:

+                 g.delete()

+             except:

+                 pass

+ 

+     request.addfinalizer(fin)

+ 

+     return

+ 

+ def test_sync_repl_cookie_with_failure(topology, request):

+     """Test sync_repl cookie are progressing is the right order

+        when there is a failure in nested updates

+ 

+     :id: e0103448-170e-4080-8f22-c34606447ce2

+     :setup: Standalone Instance

+     :steps:

+       1.: enable retroCL

+       2.: configure retroCL to log nsuniqueid as targetUniqueId

+       3.: enable content_sync plugin

+       4.: enable automember

+       5.: create (4) groups.

+           make group2 a groupOfUniqueNames so that automember

+           will fail to add 'member' (uniqueMember expected)

+       6.: configure automember to provision those groups with 'member'

+       7.: enable and configure memberof plugin

+       8.: enable plugin log level

+       9.: restart the server

+       10.: create a thread dedicated to run a sync repl client

+       11.: Create a group that will be the only update received by sync repl client

+       12.: Create (9) users that will generate nested updates (automember/memberof)

+       13.: stop sync repl client and collect the list of cookie.change_no

+       14.: check that the list of cookie.change_no contains only the group 'step 11'

+     :expectedresults:

+       1.: succeeds

+       2.: succeeds

+       3.: succeeds

+       4.: succeeds

+       5.: succeeds

+       6.: succeeds

+       7.: succeeds

+       8.: succeeds

+       9.: succeeds

+       10.: succeeds

+       11.: succeeds

+       12.: Fails (expected)

+       13.: succeeds

+       14.: succeeds

+     """

+     inst = topology[0]

+ 

+     # Enable/configure retroCL

+     plugin = RetroChangelogPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+     plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')

+ 

+     # Enable sync plugin

+     plugin = ContentSyncPlugin(inst)

+     plugin.enable()

+ 

+     # Enable automember

+     plugin = AutoMembershipPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     # Add the automember group

+     groups = Groups(inst, DEFAULT_SUFFIX)

+     group = []

+     for i in range(1,5):

+         group.append(groups.create(properties={'cn': 'group%d' % i}))

+ 

+     # Set group2 as a groupOfUniqueNames so that automember will fail to update that group

+     # This will trigger a failure in internal MOD and a failure to add member

+     group[1].replace('objectclass', 'groupOfUniqueNames')

+ 

+     # Add the automember config entry

+     am_configs = AutoMembershipDefinitions(inst)

+     for g in group:

+         am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),

+                                                   'autoMemberScope': DEFAULT_SUFFIX,

+                                                   'autoMemberFilter': 'uid=*',

+                                                   'autoMemberDefaultGroup': g.dn,

+                                                   'autoMemberGroupingAttr': 'member:dn'})

+ 

+     # Enable and configure memberof plugin

+     plugin = MemberOfPlugin(inst)

+     plugin.disable()

+     plugin.enable()

+ 

+     plugin.replace_groupattr('member')

+ 

+     memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))

+     memberof_config.create(properties={'cn': 'memberOf config',

+                                        'memberOfGroupAttr': 'member',

+                                        'memberOfAttr': 'memberof'})

+ 

+     # Enable plugin log level (useful for debugging)

+     inst.setLogLevel(65536)

+     inst.restart()

+ 

+     # create a sync repl client and wait 5 seconds to be sure it is running

+     sync_repl = Sync_persist(inst)

+     sync_repl.start()

+     time.sleep(5)

+ 

+     # Add a test group just to check that sync_repl receives only one update

+     group.append(groups.create(properties={'cn': 'group%d' % 10}))

+ 

+     # create users; automember/memberof will generate nested updates

+     users = UserAccounts(inst, DEFAULT_SUFFIX)

+     users_set = []

+     for i in range(1000,1010):

+         try:

+             users_set.append(users.create_test_user(uid=i))

+             # Automember should fail to add the user to group2 (uniqueMember expected)

+             assert False

+         except ldap.UNWILLING_TO_PERFORM:

+             pass

+ 

+     # stop the server to get the sync_repl result set (exit from the while loop).

+     # It is the only way I found to achieve that.

+     # Then wait a bit to give the sync_repl thread time to set its result before fetching it.

+     inst.stop()

+     time.sleep(10)

+     cookies = sync_repl.get_result()

+ 

+     # check that the cookie list contains only one entry

+     assert len(cookies) == 1

+     prev = 0

+     for cookie in cookies:

+         log.info('Check cookie %s' % cookie)

+ 

+         assert int(cookie) > 0

+         assert int(cookie) < 1000

+         assert int(cookie) > prev

+         prev = int(cookie)

+     sync_repl.join()

+     log.info('test_sync_repl_cookie_with_failure: PASS\n')

+ 

+     def fin():

+         inst.restart()

+         for user in users_set:

+             try:

+                 user.delete()

+             except:

+                 pass

+         for g in group:

+             try:

+                 g.delete()

+             except:

+                 pass

+ 

+     request.addfinalizer(fin)

@@ -17,6 +17,7 @@ 

  #include <stdio.h>

  #include <string.h>

  #include <stdbool.h>

+ #include "slap.h"

  #include "slapi-plugin.h"

  #include "slapi-private.h"

  
@@ -26,6 +27,8 @@ 

  #define SYNC_PREOP_DESC       "content-sync-preop-subplugin"

  #define SYNC_POSTOP_DESC      "content-sync-postop-subplugin"

  #define SYNC_INT_POSTOP_DESC  "content-sync-int-postop-subplugin"

+ #define SYNC_BETXN_PREOP_DESC "content-sync-betxn-preop-subplugin"

+ #define SYNC_BE_POSTOP_DESC "content-sync-be-post-subplugin"

  

  #define OP_FLAG_SYNC_PERSIST 0x01

  
@@ -66,6 +69,37 @@ 

      Sync_UpdateNode *cb_updates;

  } Sync_CallBackData;

  

+ /* Pending list flags 

+  * OPERATION_PL_PENDING: operation not yet completed

+  * OPERATION_PL_SUCCEEDED: operation completed successfully

+  * OPERATION_PL_FAILED: operation completed and failed

+  * OPERATION_PL_IGNORED: operation completed but with an undefined status

+  */

+ typedef enum _pl_flags {

+     OPERATION_PL_PENDING = 1,

+     OPERATION_PL_SUCCEEDED = 2,

+     OPERATION_PL_FAILED = 3,

+     OPERATION_PL_IGNORED = 4

+ } pl_flags_t;

+ 

+ /* Pending list operations.

+  * It contains a list ('next') of nested operations, kept in

+  * the same order in which the server applied the operations.

+  * see https://www.port389.org/docs/389ds/design/content-synchronization-plugin.html#queue-and-pending-list

+  */

+ typedef struct OPERATION_PL_CTX

+ {

+     Operation *op;      /* Pending operation, should not be freed as it belongs to the pblock */

+     pl_flags_t flags;  /* state of the operation (updated in the be post op) */

+     Slapi_Entry *entry; /* entry to be stored in the enqueued node. 1st arg of sync_queue_change */

+     Slapi_Entry *eprev; /* pre-entry to be stored in the enqueued node. 2nd arg of sync_queue_change */

+     ber_int_t chgtype;  /* change type to be stored in the enqueued node. 3rd arg of sync_queue_change */

+     struct OPERATION_PL_CTX *next; /* list of nested operations; the head of the list is the primary operation */

+ } OPERATION_PL_CTX_T;

+ 

+ OPERATION_PL_CTX_T * get_thread_primary_op(void);

+ void set_thread_primary_op(OPERATION_PL_CTX_T *op);

+ 

  int sync_register_operation_extension(void);

  int sync_unregister_operation_entension(void);

  
@@ -77,6 +111,7 @@ 

  int sync_mod_persist_post_op(Slapi_PBlock *pb);

  int sync_modrdn_persist_post_op(Slapi_PBlock *pb);

  int sync_add_persist_post_op(Slapi_PBlock *pb);

+ int sync_update_persist_betxn_pre_op(Slapi_PBlock *pb);

  

  int sync_parse_control_value(struct berval *psbvp, ber_int_t *mode, int *reload, char **cookie);

  int sync_create_state_control(Slapi_Entry *e, LDAPControl **ctrlp, int type, Sync_Cookie *cookie);
@@ -183,3 +218,4 @@ 

      Sync_Cookie *cookie; /* cookie to add in control */

      PRThread *tid;       /* thread for persistent phase */

  } SyncOpInfo;

+ 

@@ -14,7 +14,10 @@ 

  static int sync_close(Slapi_PBlock *pb);

  static int sync_preop_init(Slapi_PBlock *pb);

  static int sync_postop_init(Slapi_PBlock *pb);

- static int sync_internal_postop_init(Slapi_PBlock *pb);

+ static int sync_be_postop_init(Slapi_PBlock *pb);

+ static int sync_betxn_preop_init(Slapi_PBlock *pb);

+ 

+ static PRUintn thread_primary_op;

  

  int

  sync_init(Slapi_PBlock *pb)
@@ -80,17 +83,32 @@ 

      }

  

      if (rc == 0) {

-         char *plugin_type = "internalpostoperation";

+         char *plugin_type = "betxnpreoperation";

          /* the config change checking post op */

          if (slapi_register_plugin(plugin_type,

                                    1,                /* Enabled */

                                    "sync_init",      /* this function desc */

-                                   sync_internal_postop_init, /* init func for post op */

-                                   SYNC_INT_POSTOP_DESC, /* plugin desc */

+                                   sync_betxn_preop_init, /* init func for post op */

+                                   SYNC_BETXN_PREOP_DESC, /* plugin desc */

                                    NULL,

                                    plugin_identity)) {

              slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,

-                           "sync_init - Failed to register internal postop plugin\n");

+                           "sync_init - Failed to register be_txn_pre_op plugin\n");

+             rc = 1;

+         }

+     }

+     if (rc == 0) {

+         char *plugin_type = "bepostoperation";

+         /* the be post op callbacks */

+         if (slapi_register_plugin(plugin_type,

+                                   1,                /* Enabled */

+                                   "sync_init",      /* this function desc */

+                                   sync_be_postop_init, /* init func for be_post op */

+                                   SYNC_BE_POSTOP_DESC, /* plugin desc */

+                                   NULL,

+                                   plugin_identity)) {

+             slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,

+                           "sync_init - Failed to register be_txn_pre_op plugin\n");

              rc = 1;

          }

      }
@@ -114,25 +132,31 @@ 

  sync_postop_init(Slapi_PBlock *pb)

  {

      int rc;

-     rc = slapi_pblock_set(pb, SLAPI_PLUGIN_POST_ADD_FN, (void *)sync_add_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_DELETE_FN, (void *)sync_del_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_SEARCH_FN, (void *)sync_srch_refresh_post_search);

+     rc = slapi_pblock_set(pb, SLAPI_PLUGIN_POST_SEARCH_FN, (void *)sync_srch_refresh_post_search);

      return (rc);

  }

  

  static int

- sync_internal_postop_init(Slapi_PBlock *pb)

+ sync_be_postop_init(Slapi_PBlock *pb)

  {

      int rc;

-     rc = slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_ADD_FN, (void *)sync_add_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_DELETE_FN, (void *)sync_del_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);

-     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);

+     rc = slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_ADD_FN, (void *)sync_add_persist_post_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_DELETE_FN, (void *)sync_del_persist_post_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);

      return (rc);

  }

  

+ static int

+ sync_betxn_preop_init(Slapi_PBlock *pb)

+ {

+     int rc;

+     rc = slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_ADD_FN, (void *)sync_update_persist_betxn_pre_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_DELETE_FN, (void *)sync_update_persist_betxn_pre_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_MODIFY_FN, (void *)sync_update_persist_betxn_pre_op);

+     rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_MODRDN_FN, (void *)sync_update_persist_betxn_pre_op);

+     return (rc);

+ }

  /*

      sync_start

      --------------
@@ -156,6 +180,12 @@ 

                        "sync_start - Unable to get arguments\n");

          return (-1);

      }

+     /* Register a per-thread 'thread_primary_op' variable that holds

+      * a list of pending operations. For a simple operation, this list

+      * only contains one operation. For nested operations, it contains

+      * the operations in the order in which they were applied.

+      */

+     PR_NewThreadPrivateIndex(&thread_primary_op, NULL);

      sync_persist_initialize(argc, argv);

  

      return (0);
@@ -174,3 +204,32 @@ 

  

      return (0);

  }

+ 

+ /* Return the head of the operations list

+  * the head is the primary operation.

+  * The list is private to that thread and contains

+  * all nested operations applied by the thread.

+  */

+ OPERATION_PL_CTX_T *

+ get_thread_primary_op(void)

+ {

+     OPERATION_PL_CTX_T *prim_op = NULL;

+     if (thread_primary_op) {

+         prim_op = (OPERATION_PL_CTX_T *)PR_GetThreadPrivate(thread_primary_op);

+     }

+ 

+     return prim_op;

+ }

+ 

+ /* It is set to a non-NULL op when this is a primary operation,

+  * and set back to NULL once the whole pending list has been flushed.

+  * The list is flushed when no more operations (in that list) are

+  * pending (OPERATION_PL_PENDING).

+  */

+ void

+ set_thread_primary_op(OPERATION_PL_CTX_T *op)

+ {

+     if (thread_primary_op) {

+         PR_SetThreadPrivate(thread_primary_op, (void *)op);

+     }

+ } 

\ No newline at end of file

@@ -6,6 +6,9 @@ 

   * See LICENSE for details.

   * END COPYRIGHT BLOCK **/

  

+ #include <nspr4/prlog.h>

+ #include <bits/stdint-intn.h>

+ 

  #include "sync.h"

  

  /* Main list of established persistent synchronizaton searches */
@@ -29,7 +32,7 @@ 

  static int sync_add_request(SyncRequest *req);

  static void sync_remove_request(SyncRequest *req);

  static SyncRequest *sync_request_alloc(void);

- void sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype);

+ void sync_queue_change(OPERATION_PL_CTX_T *operation);

  static void sync_send_results(void *arg);

  static void sync_request_wakeup_all(void);

  static void sync_node_free(SyncQueueNode **node);
@@ -37,17 +40,244 @@ 

  static int sync_acquire_connection(Slapi_Connection *conn);

  static int sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);

  

+ /* This routine appends the operation at the end of the

+  * per-thread pending list of nested operations.

+  * Being a betxn_preop, the pending list keeps the same order

+  * in which the server received the operations.

+  */

+ int

+ sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)

+ {

+     OPERATION_PL_CTX_T *prim_op;

+     OPERATION_PL_CTX_T *new_op;

+     Slapi_DN *sdn;

+ 

+     if (!SYNC_IS_INITIALIZED()) {

+         /* not initialized if sync plugin is not started */

+         return 0;

+     }

+ 

+     /* Create a new pending operation node */

+     new_op = (OPERATION_PL_CTX_T *)slapi_ch_calloc(1, sizeof(OPERATION_PL_CTX_T));

+     new_op->flags = OPERATION_PL_PENDING;

+     slapi_pblock_get(pb, SLAPI_OPERATION, &new_op->op);

+     slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);

+ 

+     prim_op = get_thread_primary_op();

+     if (prim_op) {

+          * A primary operation already exists, so the current

+          * operation is a nested one that we need to register at the end

+          * of the pending nested operations

+          */

+         OPERATION_PL_CTX_T *current_op;

+         for (current_op = prim_op; current_op->next; current_op = current_op->next);

+         current_op->next = new_op;

+         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - nested operation targets "

+                       "\"%s\" (0x%lx)\n",

+                       slapi_sdn_get_dn(sdn), (ulong) new_op->op);

+     } else {

+         /* The current operation is the first/primary one in the txn;

+          * register it directly in the thread private data (head)

+          */

+         set_thread_primary_op(new_op);

+         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - primary operation targets "

+                       "\"%s\" (0x%lx)\n",

+                       slapi_sdn_get_dn(sdn), (ulong) new_op->op);

+     }

+     return 0;

+ }

+ 

+ /* This operation cannot be processed by the sync_repl listener because

+  * of an internal problem; for example, the POST entry does not exist

+  */

+ static void

+ ignore_op_pl(Operation *op)

+ {

+     OPERATION_PL_CTX_T *prim_op, *curr_op;

+     prim_op = get_thread_primary_op();

+ 

+     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {

+         if ((curr_op->op == op) && 

+             (curr_op->flags == OPERATION_PL_PENDING)) {  /* If by any "chance" a same operation structure was reused in consecutive updates

+                                                          * we can not only rely on 'op' value

+                                                          */

+             slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl operation (0x%lx) from the pending list\n",

+                     (ulong) op);

+             curr_op->flags = OPERATION_PL_IGNORED;

+             return;

+         }

+     }

+     slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl can not retrieve an operation (0x%lx) in pending list\n",

+                     (ulong) op);

+ }

+ 

+ /* This is a generic function that is called by the be_post callbacks of this plugin.

+  * For the given operation (pb->pb_op) it records in the pending list the state

+  * of the completed operation.

+  * When all operations are completed: if the primary operation succeeded, it

+  * flushes (enqueues) the operations to the sync repl queue(s); else it just frees

+  * the pending list (skipping the enqueue).

+  */

+ static void

+ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t op_tag, char *label)

+ {

+     OPERATION_PL_CTX_T *prim_op = NULL, *curr_op;

+     Operation *pb_op;

+     Slapi_DN *sdn;

+     int32_t rc;

+ 

+     if (!SYNC_IS_INITIALIZED()) {

+         /* not initialized if sync plugin is not started */

+         return;

+     }

+     slapi_pblock_get(pb, SLAPI_OPERATION, &pb_op);

+     slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);

+ 

+     if (NULL == e) {

+         /* Ignore this operation (for example case of failure of the operation) */

+         ignore_op_pl(pb_op);

+         return;

+     }

+     

+     /* Retrieve the result of the operation */

+     if (slapi_op_internal(pb)) {

+         slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);

+         if (0 != rc) {

+             /* The internal operation did not succeed */

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "internal operation Failed (0x%lx) rc=%d\n",

+                        (ulong) pb_op, rc);

+         }

+     } else {

+         slapi_pblock_get(pb, SLAPI_PLUGIN_OPRETURN, &rc);

+         if (0 != rc) {

+             /* The operation did not succeed */

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "direct operation Failed (0x%lx) rc=%d\n",

+                        (ulong) pb_op, rc);

+         }

+     }

+ 

+ 

+     prim_op = get_thread_primary_op();

+     PR_ASSERT(prim_op);

+     /* First mark the operation as completed/failed and record

+      * the parameters to be used once the operation is pushed

+      * onto the listeners queue

+      */

+     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {

+         if ((curr_op->op == pb_op) &&

+             (curr_op->flags == OPERATION_PL_PENDING)) {  /* If by any "chance" a same operation structure was reused in consecutive updates

+                                                          * we can not only rely on 'op' value

+                                                          */

+             if (rc == LDAP_SUCCESS) {

+                 curr_op->flags = OPERATION_PL_SUCCEEDED;

+                 curr_op->entry = e ? slapi_entry_dup(e) : NULL;

+                 curr_op->eprev = eprev ? slapi_entry_dup(eprev) : NULL;

+                 curr_op->chgtype = op_tag;

+             } else {

+                 curr_op->flags = OPERATION_PL_FAILED;

+             }

+             break;

+         }

+     }

+     if (!curr_op) {

+         slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "%s - operation not found on the pendling list\n", label);

+         PR_ASSERT(curr_op);

+     }

+     

+ #if DEBUG

+     /* dump the pending queue */

+     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {

+         char *flags_str;

+         char *entry_str;

+ 

+         if (curr_op->entry) {

+             entry_str = slapi_entry_get_dn(curr_op->entry);

+         } else if (curr_op->eprev){

+             entry_str = slapi_entry_get_dn(curr_op->eprev);

+         } else {

+             entry_str = "unknown";

+         }

+         switch (curr_op->flags) {

+             case OPERATION_PL_SUCCEEDED:

+                 flags_str = "succeeded";

+                 break;

+             case OPERATION_PL_FAILED:

+                 flags_str = "failed";

+                 break;

+             case OPERATION_PL_IGNORED:

+                 flags_str = "ignored";

+                 break;

+             case OPERATION_PL_PENDING:

+                 flags_str = "pending";

+                 break;

+             default:

+                 flags_str = "unknown";

+                 break;

+         }

+         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "dump pending list(0x%lx) %s %s\n",

+                     (ulong) curr_op->op, entry_str, flags_str);

+     }

+ #endif

+ 

+     /* Second, check whether a pending operation remains in the pending list */

+     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {

+         if (curr_op->flags == OPERATION_PL_PENDING) {

+             break;

+         }

+     }

+     if (curr_op) {

+         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "%s - It remains a pending operation (0x%lx)\n", label, (ulong) curr_op->op);

+     } else {

+         OPERATION_PL_CTX_T *next = NULL;

+         PRBool enqueue_it = PR_TRUE;

+         /* All operations on the pending list are completed; move them

+          * to the listeners queue in the same order as the pending list.

+          * If the primary operation failed, the operations are not moved to

+          * the queue

+          */

+         if (prim_op->flags == OPERATION_PL_FAILED) {

+             /* if the primary update failed, the txn is aborted and none of

+              * the operations were applied. Just forget this pending list

+              */

+             enqueue_it = PR_FALSE;

+         }

+         for (curr_op = prim_op; curr_op; curr_op = next) {

+             char *entry;

+             if (curr_op->entry) {

+                 entry = slapi_entry_get_dn(curr_op->entry);

+             } else if (curr_op->eprev){

+                 entry = slapi_entry_get_dn(curr_op->eprev);

+             } else {

+                 entry = "unknown";

+             }

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "Do %s enqueue (0x%lx) %s\n",

+                     enqueue_it ? "" : "not", (ulong) curr_op->op, entry);

+             if (enqueue_it) {

+                 sync_queue_change(curr_op);

+             }

+             

+             /* now free this pending operation */

+             next = curr_op->next;

+             slapi_entry_free(curr_op->entry);

+             slapi_entry_free(curr_op->eprev);

+             slapi_ch_free((void **)&curr_op);

+         }

+         /* we consumed all the pending operations; free the pending list */

+         set_thread_primary_op(NULL);

+     }

+ }

  int

  sync_add_persist_post_op(Slapi_PBlock *pb)

  {

      Slapi_Entry *e;

- 

      if (!SYNC_IS_INITIALIZED()) {

          return (0);

      }

  

      slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);

-     sync_queue_change(e, NULL, LDAP_REQ_ADD);

+     sync_update_persist_op(pb, e, NULL, LDAP_REQ_ADD, "sync_add_persist_post_op");

  

      return (0);

  }
@@ -62,7 +292,7 @@ 

      }

  

      slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e);

-     sync_queue_change(e, NULL, LDAP_REQ_DELETE);

+     sync_update_persist_op(pb, e, NULL, LDAP_REQ_DELETE, "sync_del_persist_post_op");

  

      return (0);

  }
@@ -78,7 +308,7 @@ 

  

      slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);

      slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);

-     sync_queue_change(e, e_prev, LDAP_REQ_MODIFY);

+     sync_update_persist_op(pb, e, e_prev, LDAP_REQ_MODIFY, "sync_mod_persist_post_op");

  

      return (0);

  }
@@ -94,19 +324,22 @@ 

  

      slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);

      slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);

-     sync_queue_change(e, e_prev, LDAP_REQ_MODRDN);

+     sync_update_persist_op(pb, e, e_prev, LDAP_REQ_MODRDN, "sync_mod_persist_post_op");

  

      return (0);

  }

  

  void

- sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype)

+ sync_queue_change(OPERATION_PL_CTX_T *operation)

  {

      SyncRequest *req = NULL;

      SyncQueueNode *node = NULL;

      int matched = 0;

      int prev_match = 0;

      int cur_match = 0;

+     Slapi_Entry *e = operation->entry;

+     Slapi_Entry *eprev = operation->eprev;

+     ber_int_t chgtype = operation->chgtype;

  

      if (!SYNC_IS_INITIALIZED()) {

          return;
@@ -195,16 +428,14 @@ 

              } else {

                  pOldtail->sync_next = req->ps_eq_tail;

              }

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - entry "

+                                                               "\"%s\" \n",

+                       slapi_entry_get_dn_const(node->sync_entry));

              PR_Unlock(req->req_lock);

          }

      }

- 

-     SYNC_UNLOCK_READ();

- 

      /* Were there any matches? */

      if (matched) {

-         /* Notify update threads */

-         sync_request_wakeup_all();

          slapi_log_err(SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - enqueued entry "

                                                                "\"%s\" on %d request listeners\n",

                        slapi_entry_get_dn_const(e), matched);
@@ -213,6 +444,13 @@ 

                                                                "\"%s\" not enqueued on any request search listeners\n",

                        slapi_entry_get_dn_const(e));

      }

+     SYNC_UNLOCK_READ();

+ 

+     /* Were there any matches? */

+     if (matched) {

+         /* Notify update threads */

+         sync_request_wakeup_all();

+     }

  }

  /*

   * Initialize the list structure which contains the list
@@ -359,11 +597,12 @@ 

  }

  

  /*

-  * Called when stopping/disabling the plugin

+  * Called when stopping/disabling the plugin (e.g. at shutdown)

   */

  int

  sync_persist_terminate_all()

  {

+     SyncRequest *req = NULL, *next;

      if (SYNC_IS_INITIALIZED()) {

          /* signal the threads to stop */

          plugin_closing = 1;
@@ -377,6 +616,16 @@ 

          slapi_destroy_rwlock(sync_request_list->sync_req_rwlock);

          PR_DestroyLock(sync_request_list->sync_req_cvarlock);

          PR_DestroyCondVar(sync_request_list->sync_req_cvar);

+ 

+         /* free the structures, in case sync_repl clients remained connected */

+         for (req = sync_request_list->sync_req_head; NULL != req; req = next) {

+             next = req->req_next;

+             slapi_pblock_destroy(req->req_pblock);

+             req->req_pblock = NULL;

+             PR_DestroyLock(req->req_lock);

+             req->req_lock = NULL;

+             slapi_ch_free((void **)&req);

+         }

          slapi_ch_free((void **)&sync_request_list);

      }

  
@@ -545,6 +794,8 @@ 

      int rc;

      PRUint64 connid;

      int opid;

+     char **attrs_dup;

+     char *strFilter;

  

      slapi_pblock_get(req->req_pblock, SLAPI_CONN_ID, &connid);

      slapi_pblock_get(req->req_pblock, SLAPI_OPERATION_ID, &opid);
@@ -589,9 +840,12 @@ 

              Slapi_Entry *ec;

              int chg_type = LDAP_SYNC_NONE;

  

-             /* deque one element */

+             /* dequeue one element */

              PR_Lock(req->req_lock);

              qnode = req->ps_eq_head;

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - dequeue  "

+                           "\"%s\" \n",

+                           slapi_entry_get_dn_const(qnode->sync_entry));

              req->ps_eq_head = qnode->sync_next;

              if (NULL == req->ps_eq_head) {

                  req->ps_eq_tail = NULL;
@@ -665,10 +919,22 @@ 

      sync_release_connection(req->req_pblock, conn, op, conn_acq_flag == 0);

  

  done:

+     /* This client closed the connection, or we are shutting down; free the req */

      sync_remove_request(req);

      PR_DestroyLock(req->req_lock);

      req->req_lock = NULL;

-     slapi_ch_free((void **)&req->req_pblock);

+ 

+     slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_ATTRS, &attrs_dup);

+     slapi_ch_array_free(attrs_dup);

+     slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_ATTRS, NULL);

+ 

+     slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_STRFILTER, &strFilter);

+     slapi_ch_free((void **)&strFilter);

+     slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_STRFILTER, NULL);

+ 

+     slapi_pblock_destroy(req->req_pblock);

+     req->req_pblock = NULL;

+ 

      slapi_ch_free((void **)&req->req_orig_base);

      slapi_filter_free(req->req_filter, 1);

      sync_cookie_free(&req->req_cookie);

@@ -7,6 +7,7 @@ 

   * END COPYRIGHT BLOCK **/

  

  #include "sync.h"

+ #include "slap.h"  /* for LDAP_TAG_SK_REVERSE */

  

  static struct berval *create_syncinfo_value(int type, const char *cookie, const char **uuids);

  static char *sync_cookie_get_server_info(Slapi_PBlock *pb);
@@ -444,6 +445,37 @@ 

      clientinfo = slapi_ch_smprintf("%s:%s:%s", clientdn, targetdn, strfilter);

      return (clientinfo);

  }

+ 

+ /* This builds a sort control ("changenumber" in reverse order)

+  * to be used with internal searches, so that the entry with the

+  * highest changenumber is returned first

+  */

+ static LDAPControl *

+ sync_build_sort_control(const char *attr)

+ {

+     LDAPControl *ctrl;

+     BerElement *ber;

+     int rc;

+ 

+     ber = ber_alloc();

+     if (NULL == ber)

+         return NULL;

+ 

+     rc = ber_printf(ber, "{{stb}}", attr, LDAP_TAG_SK_REVERSE, 1);

+     if (-1 == rc) {

+         ber_free(ber, 1);

+         return NULL;

+     }

+ 

+     rc = slapi_build_control(LDAP_CONTROL_SORTREQUEST, ber, 1, &ctrl);

+ 

+     ber_free(ber, 1);

+ 

+     if (LDAP_SUCCESS != rc)

+         return NULL;

+ 

+     return ctrl;

+ }

+ 

  static unsigned long

  sync_cookie_get_change_number(int lastnr, const char *uniqueid)

  {
@@ -452,11 +484,15 @@ 

      Slapi_Entry *cl_entry;

      int rv;

      unsigned long newnr = SYNC_INVALID_CHANGENUM;

+     LDAPControl **ctrls = NULL;

+     

+     ctrls = (LDAPControl **)slapi_ch_calloc(2, sizeof(LDAPControl *));

      char *filter = slapi_ch_smprintf("(&(changenumber>=%d)(targetuniqueid=%s))", lastnr, uniqueid);

+     ctrls[0] = sync_build_sort_control("changenumber");

  

      srch_pb = slapi_pblock_new();

      slapi_search_internal_set_pb(srch_pb, CL_SRCH_BASE, LDAP_SCOPE_SUBTREE, filter,

-                                  NULL, 0, NULL, NULL, plugin_get_default_component_id(), 0);

+                                  NULL, 0, ctrls, NULL, plugin_get_default_component_id(), 0);

      slapi_search_internal_pb(srch_pb);

      slapi_pblock_get(srch_pb, SLAPI_PLUGIN_INTOP_RESULT, &rv);

      if (rv == LDAP_SUCCESS) {
@@ -469,6 +505,24 @@ 

              slapi_attr_first_value(attr, &val);

              newnr = sync_number2ulong((char *)slapi_value_get_string(val));

          }

+ #if DEBUG

+         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_cookie_get_change_number looks for \"%s\"\n",

+                     filter);

+         for (size_t i = 0; entries[i]; i++) {

+             Slapi_Attr *attr;

+             Slapi_Value *val;

+             char *entrydn;

+             unsigned long nr;

+             slapi_entry_attr_find(entries[i], CL_ATTR_ENTRYDN, &attr);

+             slapi_attr_first_value(attr, &val);

+             entrydn = (char *)slapi_value_get_string(val);

+             slapi_entry_attr_find(entries[i], CL_ATTR_CHANGENUMBER, &attr);

+             slapi_attr_first_value(attr, &val);

+             nr = sync_number2ulong((char *)slapi_value_get_string(val));

+             slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_cookie_get_change_number after %d: %d %s\n",

+                     lastnr, (int) nr, entrydn);

+         }

+ #endif

      }

  

      slapi_free_search_results_internal(srch_pb);

Bug description:
Sync repl contains post op callbacks that queue updates
to be sent to a sync_repl client.
When an update generates nested updates (automember,
memberof, ...), the order of the updates in the queue does
not follow the order of the applied updates. The consequence
is that the cookie (containing the update number) can be wrong:
it can contain jumps, disorder, and invalid numbers (-1).

Fix description:
The fix implements a pending list of updates (in thread
private space). When all pending list updates are committed
(sync repl post callback), the updates are moved to the queue.
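
For readers unfamiliar with the mechanism, here is a minimal Python model of
the pending-list idea described above (the names and data structures are
illustrative only; the real implementation is the slapi-based C code in this PR):

    import threading

    PENDING, SUCCEEDED, FAILED = 'pending', 'succeeded', 'failed'
    # plays the role of PR_NewThreadPrivateIndex / PR_SetThreadPrivate
    _tls = threading.local()

    class Op:
        def __init__(self, change_no):
            self.change_no = change_no   # stands in for the cookie's change number
            self.state = PENDING

    def betxn_preop(op):
        # Before each (possibly nested) write: append the op to the per-thread
        # pending list, preserving the order in which the server applies updates.
        if getattr(_tls, 'pending', None) is None:
            _tls.pending = [op]          # first op of the txn = primary operation
        else:
            _tls.pending.append(op)      # nested operation

    def be_postop(op, ok, queue):
        # After a write completes: record its outcome; once nothing is still
        # pending, flush the list in order, but only if the primary op succeeded.
        op.state = SUCCEEDED if ok else FAILED
        if any(o.state == PENDING for o in _tls.pending):
            return                       # a nested update is still in flight
        if _tls.pending[0].state == SUCCEEDED:
            queue.extend(o.change_no for o in _tls.pending if o.state == SUCCEEDED)
        _tls.pending = None              # list consumed; ready for the next txn

With this shape, a nested automember/memberof update can complete after the
primary ADD and the change numbers still reach the listeners queue in apply
order, which is what keeps the cookie monotonic.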

https://pagure.io/389-ds-base/issue/51190

Reviewed by: ?

Platforms tested: F31

So you update the queue in pre-op, but what if the operation fails in post-op? Is the "invalid" update still sent to the client?

Looks like debugging leftovers... Also, it makes sense to set a breakpoint like this: import pdb; pdb.set_trace(). So if you import and set_trace in the same place, it would be easier to clean up afterward.

I don't fully understand how you use the new connection...
inst.clone() returns a new connection.
So now you have self.conn - one connection, and self.inst - another connection.

I can't understand how you use this line... pytest, as far as I know, doesn't use it. And I don't see you calling the test anywhere outside of pytest...

Probably, you plan to add it later but I'll leave a comment here as a reminder to myself to check it when the time comes. :)

I see a couple of compile warnings also:

./config.h:367: warning: "LINUX" redefined
  367 | #define LINUX 1
      |
In file included from /usr/include/nspr4/prtypes.h:26,
             from /usr/include/nspr4/prlog.h:9,
             from ldap/servers/plugins/sync/sync_persist.c:9:
/usr/include/nspr4/prcpucfg.h:19: note: this is the location of the previous definition
   19 | #define LINUX
      |
ldap/servers/plugins/sync/sync_persist.c: In function ‘sync_update_persist_betxn_pre_op’:
ldap/servers/plugins/sync/sync_persist.c:51:9: warning: ‘return’ with no value, in function returning non-void
   51 |         return;
      |         ^~~~~~
ldap/servers/plugins/sync/sync_persist.c:43:1: note: declared here
   43 | sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)
      | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ldap/servers/plugins/sync/sync_persist.c: In function ‘sync_update_persist_op’:
ldap/servers/plugins/sync/sync_persist.c:91:9: warning: ‘return’ with no value, in function returning non-void
   91 |         return;
      |         ^~~~~~
ldap/servers/plugins/sync/sync_persist.c:84:1: note: declared here
   84 | sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t op_tag, char *label)
      | ^~~~~~~~~~~~~~~~~~~~~~
ldap/servers/plugins/sync/sync_persist.c:95:9: warning: ‘return’ with no value, in function returning non-void
   95 |         return;
      |         ^~~~~~
ldap/servers/plugins/sync/sync_persist.c:84:1: note: declared here
   84 | sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t op_tag, char *label)

I forgot to mention that the indentation is wrong for most of the logging functions (slapi_log_err) :-)

@mreynolds , @spichugi thanks for your reviews. The question about internal update failures is a difficult one, and the fix is incomplete.
There is a principle that on a successful internal operation, pblock.SLAPI_ENTRY_POST_OP is set. It looks like it is enforced. But I am not sure of the opposite (pblock.SLAPI_ENTRY_POST_OP being set means the operation was successful). I think a safe approach would be to test in the sync_repl POST OP that SLAPI_PLUGIN_OPRETURN and SLAPI_RESULT_CODE are successful.
In addition, in case of failure, the cleanup of the pending operation is not done, and I should revisit this part and... write an additional testcase :(
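
A minimal sketch of that safety check, modeling the pblock as a plain Python
dict (the keys mirror the slapi constants named above; this is not the actual
slapi API):

    def op_succeeded(pb):
        # Trust the result codes rather than the mere presence of the
        # post-op entry before marking the pending operation as succeeded.
        if pb.get('SLAPI_PLUGIN_OPRETURN', 0) != 0:
            return False     # the operation itself reported a failure
        if pb.get('SLAPI_RESULT_CODE', 0) != 0:
            return False     # an error result was sent to the client
        return pb.get('SLAPI_ENTRY_POST_OP') is not None

    # e.g. only enqueue when every indicator agrees:
    assert op_succeeded({'SLAPI_PLUGIN_OPRETURN': 0,
                         'SLAPI_RESULT_CODE': 0,
                         'SLAPI_ENTRY_POST_OP': object()})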

rebased onto 4d163ca

3 years ago

A long time after, here is the revisited PR: adding a pending list for nested updates, handling failures, and preventing concurrent-updates disorder

Next is to update the design to reflect those changes

More indentation issues around the logging functions (not just here but in a lot of other places)

Is this ready to be reviewed yet? I just noticed a lot of "#if 0" code blocks, and some very minor indentation issues :-)

It would also be nice to remove "unsigned long" and use uint32_t, but it's not necessary. And don't forget all the logging function indentation issues! :-)

Besides that, if this patch passes ASAN testing and there are no compiler warnings, then you get my ack!

rebased onto cbb5e61

3 years ago

rebased onto ec2a95e

3 years ago

@mreynolds, I finally updated the design (https://www.port389.org/docs/389ds/design/content-synchronization-plugin.html#queue-and-pending-list) and cleaned up the PR (indentation, remaining dead code, leaks, and compiler warnings). Please have a look

@tbordaz I'm going to read your design shortly. It may be worth your time to also review #51260, given that I have just found a potential issue where the cookie and changelog being sent could accidentally corrupt client state, so perhaps this change may impact your upcoming change with the queue.

There is a suites/sync repl section; this test should be there rather than plugin/acceptance, I think.

Just to be clear, this is to remove a memory leak at shutdown, yes?

Could this be an enum instead of defines?

I think most of this is pretty minor, and most of the patch looks pretty reasonable. Could you comment a bit more in the code about the design and how the thread-private op works, to help future developers understand your thoughts and the interactions that are occurring here? Thanks!

rebased onto bfb38bc

3 years ago

Thanks @firstyear for the review. I updated the PR (moving the test, adding comments, and doing cleanup). The sleep in the sync_repl thread was a leftover from some tests to reproduce the invalid order!! Thanks for having spotted this :)

These seem like left overs that you don't need to add here :)

Okay, besides the last Python comment, I can see what this does and it looks reasonable to me. Thanks @tbordaz :)

Reading it, I think it won't affect/impact #51260 since they are separate issues (but we may also find that #51260 is contributing to this issue in a subtle way ....)

So I think ack from me once the last two minor comments are addressed.

If I remove any of those 3 lines, the tests are failing :(

Oh, in that case, ignore that comment about the 3 lines in the acceptance tests. If you can add the brace in sync_init.c, I think this is acked and you can then merge :)

rebased onto f9638bb

3 years ago

Pull-Request has been merged by tbordaz

3 years ago

389-ds-base is moving from Pagure to Github. This means that new issues and pull requests
will be accepted only in 389-ds-base's github repository.

This pull request has been cloned to Github as issue and is available here:
- https://github.com/389ds/389-ds-base/issues/4267

If you want to continue to work on the PR, please navigate to the github issue,
download the patch from the attachments and file a new pull request.

Thank you for understanding. We apologize for any inconvenience.

Pull-Request has been closed by spichugi

3 years ago