From 6343e4cba17802e19daa5c971120fa352ff80ad4 Mon Sep 17 00:00:00 2001 From: Thierry Bordaz Date: Sep 18 2015 10:13:08 +0000 Subject: Ticket 48266: Fractional replication evaluates several times the same CSN Bug Description: In fractional replication if there are only skipped updates and many of them, the supplier acquires the replica for a long time. At the end of the session, RUV is not updated so the next session will restart evaluating the same skipped updates Fix Description: The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX' During an incremental replication session, if the session only contains skipped updates and the number of them exceeds a threshold (100), it triggers an update on that subentry. This update will eventually be replicated, moving forward the RUV https://fedorahosted.org/389/ticket/48266 Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin Platforms tested: Flag Day: no Doc impact: no --- diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index 0b0f26b..17282bb 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -523,6 +523,8 @@ Replica *windows_replica_new(const Slapi_DN *root); during addition of the replica over LDAP */ Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation); void replica_destroy(void **arg); +int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid); +int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid); PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid, const char *locking_purl, char **current_purl); diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c index 216de3c..e0599e5 100644 --- a/ldap/servers/plugins/replication/repl5_inc_protocol.c +++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c @@ -1672,6 +1672,11 @@ 
send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu int finished = 0; ConnResult replay_crc; char csn_str[CSN_STRSIZE]; + PRBool subentry_update_sent = PR_FALSE; + PRBool subentry_update_needed = PR_FALSE; + int skipped_updates = 0; + int fractional_repl; +#define FRACTIONAL_SKIPPED_THRESHOLD 100 /* Start the results reading thread */ rd = repl5_inc_rd_new(prp); @@ -1688,6 +1693,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu memset ( (void*)&op, 0, sizeof (op) ); entry.op = &op; + fractional_repl = agmt_is_fractional(prp->agmt); do { cl5_operation_parameters_done ( entry.op ); memset ( (void*)entry.op, 0, sizeof (op) ); @@ -1783,6 +1789,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu csn_as_string(entry.op->csn, PR_FALSE, csn_str); replica_id = csn_get_replicaid(entry.op->csn); uniqueid = entry.op->target_address.uniqueid; + + if (fractional_repl && message_id) + { + /* This update was sent no need to update the subentry + * and restart counting the skipped updates + */ + subentry_update_needed = PR_FALSE; + skipped_updates = 0; + } if (prp->repl50consumer && message_id) { @@ -1813,6 +1828,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu agmt_get_long_name(prp->agmt), entry.op->target_address.uniqueid, csn_str); agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/); + if (fractional_repl) + { + skipped_updates++; + if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n", + agmt_get_long_name(prp->agmt), skipped_updates); + subentry_update_needed = PR_TRUE; + } + } } } break; @@ -1878,6 +1903,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu PR_Unlock(rd->lock); } while (!finished); + if 
(fractional_repl && subentry_update_needed) + { + Replica *replica; + ReplicaId rid = -1; /* Used to create the replica keep alive subentry */ + replica = (Replica*) object_get_data(prp->replica_object); + if (replica) + { + rid = replica_get_rid(replica); + } + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: skipped updates was definitely too high (%d) update the subentry now\n", + agmt_get_long_name(prp->agmt), skipped_updates); + replica_subentry_update(agmt_get_replarea(prp->agmt), rid); + } /* Terminate the results reading thread */ if (!prp->repl50consumer) { diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c index 92b4e96..6ac28c1 100644 --- a/ldap/servers/plugins/replication/repl5_replica.c +++ b/ldap/servers/plugins/replication/repl5_replica.c @@ -414,6 +414,161 @@ replica_destroy(void **arg) slapi_ch_free((void **)arg); } +#define KEEP_ALIVE_ATTR "keepalivetimestamp" +#define KEEP_ALIVE_ENTRY "repl keep alive" +#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s" + + +static int +replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) +{ + char *entry_string = NULL; + Slapi_Entry *e = NULL; + Slapi_PBlock *pb = NULL; + int return_value; + int rc = 0; + + entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d", + KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid); + if (entry_string == NULL) { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "replica_subentry_create add failed in slapi_ch_smprintf\n"); + rc = -1; + goto done; + } + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string); + e = slapi_str2entry(entry_string, 0); + + /* create the entry */ + pb = slapi_pblock_new(); + + + slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */ + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */); + slapi_add_internal_pb(pb); + 
slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value); + if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to " + "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e), + ldap_err2string(return_value)); + rc = -1; + slapi_entry_free(e); /* The entry was not consumed */ + goto done; + } + +done: + + slapi_pblock_destroy(pb); + slapi_ch_free_string(&entry_string); + return rc; + +} + +int +replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid) +{ + Slapi_PBlock *pb; + char *filter = NULL; + Slapi_Entry **entries = NULL; + int res; + int rc = 0; + + pb = slapi_pblock_new(); + filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid); + slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL, + filter, NULL, 0, NULL, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_search_internal_pb(pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res); + if (res == LDAP_SUCCESS) + { + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries); + if (entries && (entries[0] == NULL)) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); + rc = replica_subentry_create(repl_root, rid); + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); + rc = 0; + } + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n", + KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res); + /* The status of the entry is not clear, do not attempt to create it */ + rc = 1; + } + slapi_free_search_results_internal(pb); + + slapi_pblock_destroy(pb); + slapi_ch_free_string(&filter); + return rc; +} + 
+int +replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) +{ + int ldrc; + int rc = LDAP_SUCCESS; /* Optimistic default */ + LDAPMod * mods[2]; + LDAPMod mod; + struct berval * vals[2]; + char buf[20]; + time_t curtime; + struct tm ltm; + struct berval val; + Slapi_PBlock *modpb = NULL; + char *dn; + + replica_subentry_check(repl_root, rid); + curtime = current_time(); + gmtime_r(&curtime, &ltm); + strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", &ltm); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf); + + + val.bv_val = buf; + val.bv_len = strlen(val.bv_val); + + vals [0] = &val; + vals [1] = NULL; + + mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES; + mod.mod_type = KEEP_ALIVE_ATTR; + mod.mod_bvalues = vals; + + mods[0] = &mod; + mods[1] = NULL; + + modpb = slapi_pblock_new(); + dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); + + slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_modify_internal_pb(modpb); + + slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc); + + if (ldrc != LDAP_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf); + rc = ldrc; + } else { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name, + "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf); + } + + slapi_pblock_destroy(modpb); + slapi_ch_free_string(&dn); + return rc; + +} /* * Attempt to obtain exclusive access to replica (advisory only) * @@ -3816,6 +3971,7 @@ replica_enable_replication (Replica *r) /* What to do ? */ } + replica_subentry_check(r->repl_root, replica_get_rid(r)); /* Replica came back online, Check if the total update was terminated. 
If flag is still set, it was not terminated, therefore the data is very likely to be incorrect, and we should not restart Replication threads... diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c index da73ac4..9059efe 100644 --- a/ldap/servers/plugins/replication/repl5_tot_protocol.c +++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c @@ -320,7 +320,9 @@ repl5_tot_run(Private_Repl_Protocol *prp) int portnum = 0; Slapi_DN *area_sdn = NULL; CSN *remote_schema_csn = NULL; - int init_retry = 0; + int init_retry = 0; + Replica *replica; + ReplicaId rid = 0; /* Used to create the replica keep alive subentry */ PR_ASSERT(NULL != prp); @@ -413,7 +415,15 @@ retry: ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *)); ctrls[0] = create_managedsait_control (); ctrls[1] = create_backend_control(area_sdn); - + + /* Time to make sure it exists a keep alive subentry for that replica */ + replica = (Replica*) object_get_data(prp->replica_object); + if (replica) + { + rid = replica_get_rid(replica); + } + replica_subentry_check(area_sdn, rid); + slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);