From d08f7eb688102cd54bbacd009d162f0cc16cd5fe Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: Apr 05 2019 15:13:36 +0000 Subject: Ticket 50305 - Revise CleanAllRUV task restart process Bug Description: If the server was stopped while a CleanAllRUV task was running the task gets marked in the replica config entry so it knowns to resume the task at server startup. The problem is that when it resumed it just fires off the task thread, and did not create a new Slapi_Task entry. This makes it impossible to track these tasks that got resumed. Fix Description: There were a few things wrong with the resume process, including it was harded coded to only handle a maximum of 4 tasks. We also were not recording all the required information needed to resume the task. Now "resume" process can handle an infinite number of tasks, and it creates fresh Slapi_Task entries so the tasks can be tracked. CI tested & ASAN approved https://pagure.io/389-ds-base/issue/50305 Reviewed by: lkrispenz(Thanks!) --- diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_test.py index 74eae91..2b244da 100644 --- a/dirsrvtests/tests/suites/replication/cleanallruv_test.py +++ b/dirsrvtests/tests/suites/replication/cleanallruv_test.py @@ -104,7 +104,11 @@ def task_done(topology_m4, task_dn, timeout=60): while not done and count < timeout: try: entry = topology_m4.ms["master1"].getEntry(task_dn, attrlist=attrlist) - if not entry or entry.nsTaskExitCode: + if entry is not None: + if entry.hasAttr('nsTaskExitCode'): + done = True + break + else: done = True break except ldap.NO_SUCH_OBJECT: @@ -142,7 +146,7 @@ def restore_master4(topology_m4): @pytest.fixture() def m4rid(request, topology_m4): - log.debug("Wait a bit before the reset - it is required fot the slow machines") + log.debug("Wait a bit before the reset - it is required for the slow machines") time.sleep(5) log.debug("-------------- BEGIN RESET of m4 -----------------") repl = ReplicationManager(DEFAULT_SUFFIX) @@ -468,6 +472,9 @@ def test_abort_restart(topology_m4, m4rid): # Start master 3 topology_m4.ms["master3"].start() + # Need to wait 5 seconds before server processes any leftover tasks + time.sleep(6) + # Check master 1 tried to run abort task. We expect the abort task to be aborted. if not topology_m4.ms["master1"].searchErrorsLog('Aborting abort task'): log.fatal('test_abort_restart: Abort task did not restart') diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index d271b87..138578d 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -728,7 +728,7 @@ void replica_update_ruv_consumer(Replica *r, RUV *supplier_ruv); Slapi_Entry *get_in_memory_ruv(Slapi_DN *suffix_sdn); int replica_write_ruv(Replica *r); char *replica_get_dn(Replica *r); -void replica_check_for_tasks(Replica *r, Slapi_Entry *e); +void replica_check_for_tasks(time_t when, void *arg); void replica_update_state(time_t when, void *arg); void replica_reset_csn_pl(Replica *r); uint64_t replica_get_protocol_timeout(Replica *r); @@ -813,7 +813,7 @@ int replica_config_init(void); void replica_config_destroy(void); int get_replica_type(Replica *r); int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid); -void add_cleaned_rid(cleanruv_data *data, char *maxcsn); +void add_cleaned_rid(cleanruv_data *data); int is_cleaned_rid(ReplicaId rid); int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg); void replica_cleanallruv_thread_ext(void *arg); @@ -825,9 +825,9 @@ int process_repl_agmts(Replica *replica, int *agmt_info, char *oid, Slapi_Task * int decode_cleanruv_payload(struct berval *extop_value, char **payload); struct berval *create_cleanruv_payload(char *value); void ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids); -void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root); +void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task); int is_task_aborted(ReplicaId rid); -void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, int skip); +void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip); int is_pre_cleaned_rid(ReplicaId rid); void set_cleaned_rid(ReplicaId rid); void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...); diff --git a/ldap/servers/plugins/replication/repl5_mtnode_ext.c b/ldap/servers/plugins/replication/repl5_mtnode_ext.c index 466943d..745bd51 100644 --- a/ldap/servers/plugins/replication/repl5_mtnode_ext.c +++ b/ldap/servers/plugins/replication/repl5_mtnode_ext.c @@ -101,6 +101,8 @@ multimaster_mtnode_construct_replicas() ext->replica = NULL; } } + /* Wait a few seconds for everything to startup before resuming any replication tasks */ + slapi_eq_once(replica_check_for_tasks, (void *)replica_get_root(r), time(NULL) + 5); } } } diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c index 79c79c2..b3d6198 100644 --- a/ldap/servers/plugins/replication/repl5_replica.c +++ b/ldap/servers/plugins/replication/repl5_replica.c @@ -236,8 +236,6 @@ replica_new_from_entry(Slapi_Entry *e, char *errortext, PRBool is_add_operation) 1000 * r->tombstone_reap_interval); } - replica_check_for_tasks(r, e); - done: if (rc != 0 && r) { replica_destroy((void **)&r); @@ -2101,12 +2099,55 @@ _replica_init_from_config(Replica *r, Slapi_Entry *e, char *errortext) return (_replica_check_validity(r)); } +static void +replica_delete_task_config(Slapi_Entry *e, char *attr, char *value) +{ + Slapi_PBlock *modpb; + struct berval *vals[2]; + struct berval val[1]; + LDAPMod *mods[2]; + LDAPMod mod; + int32_t rc = 0; + + val[0].bv_len = strlen(value); + val[0].bv_val = value; + vals[0] = &val[0]; + vals[1] = NULL; + + mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES; + mod.mod_type = attr; + mod.mod_bvalues = vals; + mods[0] = &mod; + mods[1] = NULL; + + modpb = slapi_pblock_new(); + slapi_modify_internal_set_pb(modpb, slapi_entry_get_dn(e), mods, NULL, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_modify_internal_pb(modpb); + slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + slapi_pblock_destroy(modpb); + + if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, + "delete_cleaned_rid_config - Failed to remove task data from (%s) error (%d)\n", + slapi_entry_get_dn(e), rc); + } +} + void -replica_check_for_tasks(Replica *r, Slapi_Entry *e) +replica_check_for_tasks(time_t when __attribute__((unused)), void *arg) { + const Slapi_DN *repl_root = (Slapi_DN *)arg; + Slapi_Entry *e = NULL; + Replica *r = NULL; + Object *repl_obj = NULL;; char **clean_vals; - if (e == NULL || ldif_dump_is_running() == PR_TRUE) { + e = _replica_get_config_entry(repl_root, NULL); + repl_obj = replica_get_replica_from_dn(repl_root); + r = (Replica *)object_get_data(repl_obj); + + if (e == NULL || r == NULL || ldif_dump_is_running() == PR_TRUE) { /* If db2ldif is being run, do not check if there are incomplete tasks */ return; } @@ -2115,218 +2156,218 @@ replica_check_for_tasks(Replica *r, Slapi_Entry *e) * if so set the cleaned rid, and fire off the thread */ if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaCleanRUV)) != NULL) { - PRThread *thread = NULL; - struct berval *payload = NULL; - CSN *maxcsn = NULL; - ReplicaId rid; - char csnstr[CSN_STRSIZE]; - char *token = NULL; - char *forcing; - PRBool original_task; - char *csnpart; - char *ridstr; - char *iter = NULL; - int i; - - for (i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++) { - cleanruv_data *data = NULL; + for (size_t i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++) { + struct timespec ts = slapi_current_rel_time_hr(); + PRBool original_task = PR_TRUE; + Slapi_Entry *task_entry = NULL; + Slapi_PBlock *add_pb = NULL; + int32_t result = 0; + ReplicaId rid; + char *token = NULL; + char *forcing; + char *iter = NULL; + char *repl_root = NULL; + char *ridstr = NULL; + char *rdn = NULL; + char *dn = NULL; + char *orig_val = slapi_ch_strdup(clean_vals[i]); /* - * Set the cleanruv data, and add the cleaned rid + * Get all the needed from */ token = ldap_utf8strtok_r(clean_vals[i], ":", &iter); if (token) { rid = atoi(token); if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Invalid replica id(%d) " - "aborting task.\n", - rid); + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - " + "Invalid replica id(%d) aborting task. Aborting cleaning task!\n", rid); + replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val); goto done; } } else { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Unable to parse cleanallruv " - "data (%s), aborting task.\n", - clean_vals[i]); + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - " + "Unable to parse cleanallruv data (%s), missing rid, aborting task. Aborting cleaning task!\n", + clean_vals[i]); + replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val); goto done; } - csnpart = ldap_utf8strtok_r(iter, ":", &iter); - maxcsn = csn_new(); - csn_init_by_string(maxcsn, csnpart); - csn_as_string(maxcsn, PR_FALSE, csnstr); + + /* Get forcing */ forcing = ldap_utf8strtok_r(iter, ":", &iter); - original_task = PR_TRUE; - if (forcing == NULL) { - forcing = "no"; - } else if (!strcasecmp(forcing, "yes") || !strcasecmp(forcing, "no")) { - /* forcing was correctly set, lets try to read the original task flag */ - token = ldap_utf8strtok_r(iter, ":", &iter); - if (token && !atoi(token)) { - original_task = PR_FALSE; - } + if (forcing == NULL || strlen(forcing) > 3) { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - " + "Unable to parse cleanallruv data (%s), missing/invalid force option (%s). Aborting cleaning task!\n", + clean_vals[i], forcing ? forcing : "missing force option"); + replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val); + goto done; } - slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "CleanAllRUV Task - cleanAllRUV task found, " - "resuming the cleaning of rid(%d)...\n", - rid); - /* - * Create payload - */ - ridstr = slapi_ch_smprintf("%d:%s:%s:%s", rid, slapi_sdn_get_dn(replica_get_root(r)), csnstr, forcing); - payload = create_cleanruv_payload(ridstr); - slapi_ch_free_string(&ridstr); + /* Get original task flag */ + token = ldap_utf8strtok_r(iter, ":", &iter); + if (token) { + if (!atoi(token)) { + original_task = PR_FALSE; + } + } else { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - " + "Unable to parse cleanallruv data (%s), missing original task flag. Aborting cleaning task!\n", + clean_vals[i]); + replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val); + goto done; + } - if (payload == NULL) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Startup: Failed to " - "create extended op payload, aborting task"); - csn_free(&maxcsn); + /* Get repl root */ + token = ldap_utf8strtok_r(iter, ":", &iter); + if (token) { + repl_root = token; + } else { + /* no repl root, have to void task */ + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - " + "Unable to parse cleanallruv data (%s), missing replication root aborting task. Aborting cleaning task!\n", + clean_vals[i]); + replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val); goto done; } + /* - * Setup the data struct, and fire off the thread. + * We have all our data, now add the task.... */ - data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data)); - if (data == NULL) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Failed to allocate cleanruv_data.\n"); - csn_free(&maxcsn); - } else { - /* setup our data */ - data->repl_obj = NULL; - data->replica = NULL; - data->rid = rid; - data->task = NULL; - data->maxcsn = maxcsn; - data->payload = payload; - data->sdn = slapi_sdn_dup(r->repl_root); - data->force = slapi_ch_strdup(forcing); - data->repl_root = NULL; - - /* This is a corner case, a cleanAllRuv task was interrupted by a shutdown or a crash - * We retrieved from type_replicaCleanRUV if the cleanAllRuv request - * was received from a direct task ADD or if was received via - * the cleanAllRuv extop. - */ - data->original_task = original_task; - - thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread_ext, - (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, - PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); - if (thread == NULL) { - /* log an error and free everything */ - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Unable to create cleanAllRUV " - "thread for rid(%d)\n", - (int)data->rid); - csn_free(&maxcsn); - slapi_sdn_free(&data->sdn); - ber_bvfree(data->payload); - slapi_ch_free_string(&data->force); - slapi_ch_free((void **)&data); - } + slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "CleanAllRUV Task - " + "CleanAllRUV task found, resuming the cleaning of rid(%d)...\n", rid); + + add_pb = slapi_pblock_new(); + task_entry = slapi_entry_alloc(); + rdn = slapi_ch_smprintf("restarted-%ld", ts.tv_sec); + dn = slapi_create_dn_string("cn=%s,cn=cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec); + slapi_entry_init(task_entry, dn, NULL); + + ridstr = slapi_ch_smprintf("%d", rid); + slapi_entry_add_string(task_entry, "objectclass", "top"); + slapi_entry_add_string(task_entry, "objectclass", "extensibleObject"); + slapi_entry_add_string(task_entry, "cn", rdn); + slapi_entry_add_string(task_entry, "replica-base-dn", repl_root); + slapi_entry_add_string(task_entry, "replica-id", ridstr); + slapi_entry_add_string(task_entry, "replica-force-cleaning", forcing); + slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0"); + + slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_add_internal_pb(add_pb); + slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result); + slapi_pblock_destroy(add_pb); + if (result != LDAP_SUCCESS) { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, + "replica_check_for_tasks - failed to add cleanallruv task entry: %s\n", + ldap_err2string(result)); } - } - done: + done: + slapi_ch_free_string(&orig_val); + slapi_ch_free_string(&ridstr); + slapi_ch_free_string(&rdn); + } slapi_ch_array_free(clean_vals); } if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaAbortCleanRUV)) != NULL) { - PRThread *thread = NULL; - struct berval *payload; - ReplicaId rid; - char *certify = NULL; - char *ridstr = NULL; - char *token = NULL; - char *repl_root; - char *iter = NULL; - int i; - - for (i = 0; clean_vals[i]; i++) { - cleanruv_data *data = NULL; + for (size_t i = 0; clean_vals[i]; i++) { + struct timespec ts = slapi_current_rel_time_hr(); + PRBool original_task = PR_TRUE; + Slapi_Entry *task_entry = NULL; + Slapi_PBlock *add_pb = NULL; + ReplicaId rid; + char *certify = NULL; + char *ridstr = NULL; + char *token = NULL; + char *repl_root; + char *iter = NULL; + char *rdn = NULL; + char *dn = NULL; + char *orig_val = slapi_ch_strdup(clean_vals[i]); + int32_t result = 0; token = ldap_utf8strtok_r(clean_vals[i], ":", &iter); if (token) { rid = atoi(token); if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Invalid replica id(%d) " - "aborting abort task.\n", - rid); + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - " + "Invalid replica id(%d) aborting abort task.\n", rid); + replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val); goto done2; } } else { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Unable to parse cleanallruv " - "data (%s), aborting abort task.\n", - clean_vals[i]); + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - " + "Unable to parse cleanallruv data (%s), aborting abort task.\n", clean_vals[i]); + replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val); goto done2; } repl_root = ldap_utf8strtok_r(iter, ":", &iter); certify = ldap_utf8strtok_r(iter, ":", &iter); + /* Get original task flag */ + token = ldap_utf8strtok_r(iter, ":", &iter); + if (token) { + if (!atoi(token)) { + original_task = PR_FALSE; + } + } else { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, + "Abort CleanAllRUV Task - Unable to parse cleanallruv data (%s), " + "missing original task flag. Aborting abort task!\n", + clean_vals[i]); + replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val); + goto done; + } + if (!is_cleaned_rid(rid)) { - slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - Replica id(%d) is not " - "being cleaned, nothing to abort. Aborting abort task.\n", - rid); - delete_aborted_rid(r, rid, repl_root, 0); + slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - " + "Replica id(%d) is not being cleaned, nothing to abort. Aborting abort task.\n", rid); + replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val); goto done2; } - add_aborted_rid(rid, r, repl_root); + add_aborted_rid(rid, r, repl_root, certify, original_task); stop_ruv_cleaning(); - slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - Abort task found, " - "resuming abort of rid(%d).\n", - rid); - /* - * Setup the data struct, and fire off the abort thread. - */ - data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data)); - if (data == NULL) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Failed to allocate cleanruv_data.\n"); - } else { - ridstr = slapi_ch_smprintf("%d:%s:%s", rid, repl_root, certify); - payload = create_cleanruv_payload(ridstr); - slapi_ch_free_string(&ridstr); - - if (payload == NULL) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Failed to create extended " - "op payload\n"); - slapi_ch_free((void **)&data); - } else { - /* setup the data */ - data->repl_obj = NULL; - data->replica = NULL; - data->rid = rid; - data->task = NULL; - data->payload = payload; - data->repl_root = slapi_ch_strdup(repl_root); - data->sdn = slapi_sdn_dup(r->repl_root); - data->certify = slapi_ch_strdup(certify); - - /* This is a corner case, a cleanAllRuv task was interrupted by a shutdown or a crash - * Let's assum this replica was the original receiver of the task. - * This flag has no impact on Abort cleanAllRuv - */ - data->original_task = PR_TRUE; - - thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread, - (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, - PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); - if (thread == NULL) { - slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Unable to create abort cleanAllRUV " - "thread for rid(%d)\n", - (int)data->rid); - slapi_sdn_free(&data->sdn); - ber_bvfree(data->payload); - slapi_ch_free_string(&data->repl_root); - slapi_ch_free_string(&data->certify); - slapi_ch_free((void **)&data); - } - } + slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - " + "Abort task found, resuming abort of rid(%d).\n", rid); + + add_pb = slapi_pblock_new(); + task_entry = slapi_entry_alloc(); + rdn = slapi_ch_smprintf("restarted-abort-%ld", ts.tv_sec); + dn = slapi_create_dn_string("cn=%s,cn=abort cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec); + slapi_entry_init(task_entry, dn, NULL); + + ridstr = slapi_ch_smprintf("%d", rid); + slapi_entry_add_string(task_entry, "objectclass", "top"); + slapi_entry_add_string(task_entry, "objectclass", "extensibleObject"); + slapi_entry_add_string(task_entry, "cn", rdn); + slapi_entry_add_string(task_entry, "replica-base-dn", repl_root); + slapi_entry_add_string(task_entry, "replica-id", ridstr); + slapi_entry_add_string(task_entry, "replica-certify-all", certify); + slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0"); + + slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_add_internal_pb(add_pb); + slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result); + slapi_pblock_destroy(add_pb); + if (result != LDAP_SUCCESS) { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, + "replica_check_for_tasks - failed to add cleanallruv abort task entry: %s\n", + ldap_err2string(result)); } - } + done2: + slapi_ch_free_string(&orig_val); + slapi_ch_free_string(&ridstr); + slapi_ch_free_string(&rdn); - done2: + } slapi_ch_array_free(clean_vals); } + object_release(repl_obj); + slapi_entry_free(e); } /* This function updates the entry to contain information generated diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index 7ac7e04..eb92a9d 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -57,7 +57,7 @@ static int replica_execute_task(Object *r, const char *task_name, char *returnte static int replica_execute_cl2ldif_task(Object *r, char *returntext); static int replica_execute_ldif2cl_task(Object *r, char *returntext); static int replica_execute_cleanruv_task(Object *r, ReplicaId rid, char *returntext); -static int replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, char *returntext); +static int replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, PRBool original_task, char *returntext); static void replica_cleanallruv_thread(void *arg); static void replica_send_cleanruv_task(Repl_Agmt *agmt, cleanruv_data *clean_data); static int check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task); @@ -216,7 +216,7 @@ replica_config_add(Slapi_PBlock *pb __attribute__((unused)), /* check rdn is "cn=replica" */ replicardn = slapi_rdn_new_sdn(slapi_entry_get_sdn(e)); if (replicardn) { - char *nrdn = slapi_rdn_get_nrdn(replicardn); + const char *nrdn = slapi_rdn_get_nrdn(replicardn); if (nrdn == NULL) { if (errortext != NULL) { strcpy(errortext, MSG_NOREPLICANORMRDN); @@ -1078,7 +1078,7 @@ replica_execute_task(Object *r, const char *task_name, char *returntext, int app } if (apply_mods) { Slapi_Task *empty_task = NULL; - return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, empty_task, returntext, "no"); + return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, empty_task, "no", PR_TRUE, returntext); } else return LDAP_SUCCESS; } else { @@ -1409,6 +1409,8 @@ replica_cleanall_ruv_task(Slapi_PBlock *pb __attribute__((unused)), const char *force_cleaning; const char *base_dn; const char *rid_str; + const char *orig_val = NULL; + PRBool original_task = PR_TRUE; int rc = SLAPI_DSE_CALLBACK_OK; /* allocate new task now */ @@ -1454,6 +1456,11 @@ replica_cleanall_ruv_task(Slapi_PBlock *pb __attribute__((unused)), } else { force_cleaning = "no"; } + if ((orig_val = slapi_fetch_attr(e, "replica-original-task", 0)) != NULL) { + if (!strcasecmp(orig_val, "0")) { + original_task = PR_FALSE; + } + } /* * Check the rid */ @@ -1486,7 +1493,7 @@ replica_cleanall_ruv_task(Slapi_PBlock *pb __attribute__((unused)), } /* clean the RUV's */ - rc = replica_execute_cleanall_ruv_task(r, rid, task, force_cleaning, returntext); + rc = replica_execute_cleanall_ruv_task(r, rid, task, force_cleaning, original_task, returntext); out: if (rc) { @@ -1509,7 +1516,7 @@ out: * */ static int -replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, char *returntext __attribute__((unused))) +replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, PRBool original_task, char *returntext __attribute__((unused))) { struct berval *payload = NULL; Slapi_Task *pre_task = NULL; /* this is supposed to be null for logging */ @@ -1603,7 +1610,7 @@ replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, co /* It is either a consequence of a direct ADD cleanAllRuv task * or modify of the replica to add nsds5task: cleanAllRuv */ - data->original_task = PR_TRUE; + data->original_task = original_task; thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread, (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, @@ -1734,7 +1741,7 @@ replica_cleanallruv_thread(void *arg) /* * Add the cleanallruv task to the repl config - so we can handle restarts */ - add_cleaned_rid(data, csnstr); /* marks config that we started cleaning a rid */ + add_cleaned_rid(data); /* marks config that we started cleaning a rid */ cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Cleaning rid (%d)...", data->rid); /* * First, wait for the maxcsn to be covered @@ -2067,7 +2074,7 @@ check_replicas_are_done_cleaning(cleanruv_data *data) "Waiting for all the replicas to finish cleaning..."); csn_as_string(data->maxcsn, PR_FALSE, csnstr); - filter = PR_smprintf("(%s=%d:%s:%s:%d)", type_replicaCleanRUV, (int)data->rid, csnstr, data->force, data->original_task ? 1 : 0); + filter = PR_smprintf("(%s=%d:%s:%s:%d:%s)", type_replicaCleanRUV, (int)data->rid, csnstr, data->force, data->original_task ? 1 : 0, data->repl_root); while (not_all_cleaned && !is_task_aborted(data->rid) && !slapi_is_shutting_down()) { agmt_obj = agmtlist_get_first_agreement_for_replica(data->replica); if (agmt_obj == NULL) { @@ -2540,15 +2547,15 @@ set_cleaned_rid(ReplicaId rid) * Add the rid and maxcsn to the repl config (so we can resume after a server restart) */ void -add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn) +add_cleaned_rid(cleanruv_data *cleanruv_data) { Slapi_PBlock *pb; struct berval *vals[2]; struct berval val; LDAPMod *mods[2]; LDAPMod mod; - char data[CSN_STRSIZE + 10]; - char *dn; + char *data = NULL; + char *dn = NULL; int rc; ReplicaId rid; Replica *r; @@ -2558,13 +2565,15 @@ add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn) r = cleanruv_data->replica; forcing = cleanruv_data->force; - if (r == NULL || maxcsn == NULL) { + if (r == NULL) { return; } /* * Write the rid & maxcsn to the config entry */ - val.bv_len = PR_snprintf(data, sizeof(data), "%d:%s:%s:%d", rid, maxcsn, forcing, cleanruv_data->original_task ? 1 : 0); + data = slapi_ch_smprintf("%d:%s:%d:%s", + rid, forcing, cleanruv_data->original_task ? 1 : 0, + cleanruv_data->repl_root); dn = replica_get_dn(r); pb = slapi_pblock_new(); mod.mod_op = LDAP_MOD_ADD | LDAP_MOD_BVALUES; @@ -2572,6 +2581,7 @@ add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn) mod.mod_bvalues = vals; vals[0] = &val; vals[1] = NULL; + val.bv_len = strlen(data); val.bv_val = data; mods[0] = &mod; mods[1] = NULL; @@ -2585,6 +2595,7 @@ add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn) "Failed to update replica config (%d), rid (%d)\n", rc, rid); } + slapi_ch_free_string(&data); slapi_ch_free_string(&dn); slapi_pblock_destroy(pb); } @@ -2593,7 +2604,7 @@ add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn) * Add aborted rid and repl root to config in case of a server restart */ void -add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root) +add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task) { Slapi_PBlock *pb; struct berval *vals[2]; @@ -2619,7 +2630,7 @@ add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root) */ dn = replica_get_dn(r); pb = slapi_pblock_new(); - data = PR_smprintf("%d:%s", rid, repl_root); + data = PR_smprintf("%d:%s:%s:%d", rid, repl_root, certify_all, original_task ? 1 : 0); mod.mod_op = LDAP_MOD_ADD | LDAP_MOD_BVALUES; mod.mod_type = (char *)type_replicaAbortCleanRUV; mod.mod_bvalues = vals; @@ -2646,7 +2657,7 @@ add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root) } void -delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, int skip) +delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip) { Slapi_PBlock *pb; LDAPMod *mods[2]; @@ -2674,7 +2685,7 @@ delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, int skip) } else { /* only remove the config, leave the in-memory rid */ dn = replica_get_dn(r); - data = PR_smprintf("%d:%s", (int)rid, repl_root); + data = PR_smprintf("%d:%s:%s:%d", (int)rid, repl_root, certify_all, original_task ? 1 : 0); mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES; mod.mod_type = (char *)type_replicaAbortCleanRUV; @@ -2711,8 +2722,6 @@ delete_cleaned_rid_config(cleanruv_data *clean_data) Slapi_Entry **entries = NULL; LDAPMod *mods[2]; LDAPMod mod; - struct berval *vals[5] = {0, 0, 0, 0, 0}; /* maximum of 4 tasks */ - struct berval val[5]; char *iter = NULL; char *dn = NULL; int i, ii; @@ -2759,7 +2768,6 @@ delete_cleaned_rid_config(cleanruv_data *clean_data) for (i = 0; entries[i] != NULL; i++) { char **attr_val = slapi_entry_attr_get_charray(entries[i], type_replicaCleanRUV); char *edn = slapi_entry_get_dn(entries[i]); - int count = 0; for (ii = 0; attr_val && attr_val[ii] && i < 5; ii++) { /* make a copy to retain the full value after toking */ @@ -2767,45 +2775,35 @@ delete_cleaned_rid_config(cleanruv_data *clean_data) rid = atoi(ldap_utf8strtok_r(attr_val[ii], ":", &iter)); if (rid == clean_data->rid) { - val[count].bv_len = strlen(aval); - val[count].bv_val = aval; - vals[count] = &val[count]; - count++; - } else { - slapi_ch_free_string(&aval); + struct berval *vals[2]; + struct berval val[1]; + val[0].bv_len = strlen(aval); + val[0].bv_val = aval; + vals[0] = &val[0]; + vals[1] = NULL; + + mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES; + mod.mod_type = (char *)type_replicaCleanRUV; + mod.mod_bvalues = vals; + mods[0] = &mod; + mods[1] = NULL; + + modpb = slapi_pblock_new(); + slapi_modify_internal_set_pb(modpb, edn, mods, NULL, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_modify_internal_pb(modpb); + slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + slapi_pblock_destroy(modpb); + if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) { + cleanruv_log(clean_data->task, clean_data->rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, + "delete_cleaned_rid_config - Failed to remove task data from (%s) error (%d), rid (%d)", + edn, rc, clean_data->rid); + goto bail; + } } + slapi_ch_free_string(&aval); } slapi_ch_array_free(attr_val); - - /* - * Now delete the attribute - */ - vals[4] = NULL; - mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES; - mod.mod_type = (char *)type_replicaCleanRUV; - mod.mod_bvalues = vals; - mods[0] = &mod; - mods[1] = NULL; - - modpb = slapi_pblock_new(); - slapi_modify_internal_set_pb(modpb, edn, mods, NULL, NULL, - repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); - slapi_modify_internal_pb(modpb); - slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc); - slapi_pblock_destroy(modpb); - - /* free the attr vals */ - for (ii = 0; ii < count; ii++) { - slapi_ch_free_string(&val[ii].bv_val); - } - - if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) { - cleanruv_log(clean_data->task, clean_data->rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, - "delete_cleaned_rid_config - Failed to remove task data " - "from (%s) error (%d), rid (%d)", - edn, rc, clean_data->rid); - goto bail; - } } } } @@ -2870,7 +2868,9 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), Replica *replica; ReplicaId rid = -1; Object *r; + PRBool original_task = PR_TRUE; const char *certify_all; + const char *orig_val; const char *base_dn; const char *rid_str; char *ridstr = NULL; @@ -2986,8 +2986,9 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), * Stop the cleaning, and delete the rid */ replica = (Replica *)object_get_data(r); - add_aborted_rid(rid, replica, (char *)base_dn); + add_aborted_rid(rid, replica, (char *)base_dn, (char *)certify_all, original_task); stop_ruv_cleaning(); + /* * Prepare the abort struct and fire off the thread */ @@ -2998,6 +2999,11 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), rc = SLAPI_DSE_CALLBACK_ERROR; goto out; } + if ((orig_val = slapi_fetch_attr(e, "replica-original-task", 0)) != NULL) { + if (!strcasecmp(orig_val, "0")) { + original_task = PR_FALSE; + } + } data->repl_obj = r; /* released in replica_abort_task_thread() */ data->replica = replica; data->task = task; @@ -3006,7 +3012,7 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), data->repl_root = slapi_ch_strdup(base_dn); data->sdn = NULL; data->certify = slapi_ch_strdup(certify_all); - data->original_task = PR_TRUE; + data->original_task = original_task; thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread, (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, @@ -3069,8 +3075,8 @@ replica_abort_task_thread(void *arg) } if (data->replica == NULL && data->repl_obj) { data->replica = (Replica *)object_get_data(data->repl_obj); + release_it = 1; } - release_it = 1; } /* @@ -3148,11 +3154,11 @@ done: /* * Clean up the config */ - delete_aborted_rid(data->replica, data->rid, data->repl_root, 1); /* delete just the config, leave rid in memory */ + delete_aborted_rid(data->replica, data->rid, data->repl_root, data->certify, data->original_task, 1); /* delete just the config, leave rid in memory */ if (strcasecmp(data->certify, "yes") == 0) { check_replicas_are_done_aborting(data); } - delete_aborted_rid(data->replica, data->rid, data->repl_root, 0); /* remove the in-memory aborted rid */ + delete_aborted_rid(data->replica, data->rid, data->repl_root, data->certify, data->original_task, 0); /* remove the in-memory aborted rid */ if (rc == 0) { cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully aborted task for rid(%d)", data->rid); } else { diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c index 68e2544..b49cb8c 100644 --- a/ldap/servers/plugins/replication/repl_extop.c +++ b/ldap/servers/plugins/replication/repl_extop.c @@ -1416,7 +1416,7 @@ multimaster_extop_abort_cleanruv(Slapi_PBlock *pb) /* * Set the aborted rid and stop the cleaning */ - add_aborted_rid(rid, r, repl_root); + add_aborted_rid(rid, r, repl_root, data->certify, data->original_task); stop_ruv_cleaning(); /* * Send out the extended ops to the replicas diff --git a/ldap/servers/slapd/task.c b/ldap/servers/slapd/task.c index 6e223fe..025ffcb 100644 --- a/ldap/servers/slapd/task.c +++ b/ldap/servers/slapd/task.c @@ -379,17 +379,15 @@ slapi_task_status_changed(Slapi_Task *task) int ttl; time_t expire; - e = get_internal_entry(pb, task->task_dn); - if (e == NULL) - return; - ttl = atoi(slapi_fetch_attr(e, "ttl", DEFAULT_TTL)); - if (ttl > (24*3600)) - ttl = (24*3600); /* be reasonable, allow to check task status not longer than one day */ - expire = time(NULL) + ttl; - task->task_flags |= SLAPI_TASK_DESTROYING; - /* queue an event to destroy the state info */ - slapi_eq_once(destroy_task, (void *)task, expire); - + if ((e = get_internal_entry(pb, task->task_dn))) { + ttl = atoi(slapi_fetch_attr(e, "ttl", DEFAULT_TTL)); + if (ttl > (24*3600)) + ttl = (24*3600); /* be reasonable, allow to check task status not longer than one day */ + expire = time(NULL) + ttl; + task->task_flags |= SLAPI_TASK_DESTROYING; + /* queue an event to destroy the state info */ + slapi_eq_once(destroy_task, (void *)task, expire); + } slapi_free_search_results_internal(pb); slapi_pblock_destroy(pb); }