From 1b4b9d53d2cfec92802f6fd4fdbc0101e987c6ee Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: May 25 2012 19:27:03 +0000 Subject: Ticket 368 - Make the cleanAllRUV task one step Bug Description: The first version of this fix required a second releaseRUV step. Fix Description: Created a new "monitoring" thread that checks all the replicas RUV's to see if the rid was cleaned. Once they are cleaned, we automatically release the RID for reuse. I also refined the logging so it easy to track the status of the task. https://fedorahosted.org/389/ticket/368 reviewed by: richm (Thanks Rich!) (cherry picked from commit 507b593fa47efeb8a4fc3c2d6b75428e7a69a480) --- diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index d3feeb0..b04d9c4 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -439,6 +439,9 @@ long conn_get_timeout(Repl_Connection *conn); void conn_set_agmt_changed(Repl_Connection *conn); ConnResult conn_read_result(Repl_Connection *conn, int *message_id); ConnResult conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retdatap, LDAPControl ***returned_controls, int send_msgid, int *resp_msgid, int noblock); +LDAP * conn_get_ldap(Repl_Connection *conn); +void conn_lock(Repl_Connection *conn); +void conn_unlock(Repl_Connection *conn); /* In repl5_protocol.c */ typedef struct repl_protocol Repl_Protocol; @@ -598,9 +601,16 @@ void set_released_rid(int rid); int is_released_rid(int rid); int is_already_released_rid(); void delete_released_rid(); +void replica_cleanallruv_monitor_thread(void *arg); #define ALREADY_RELEASED -1 +typedef struct _cleanruv_data +{ + Object *repl_obj; + ReplicaId rid; +} cleanruv_data; + /* replutil.c */ LDAPControl* create_managedsait_control (); LDAPControl* create_backend_control(Slapi_DN *sdn); diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c index a074a9f..5fdc601 100644 --- a/ldap/servers/plugins/replication/repl5_connection.c +++ b/ldap/servers/plugins/replication/repl5_connection.c @@ -1710,6 +1710,16 @@ conn_get_timeout(Repl_Connection *conn) return retval; } +LDAP * +conn_get_ldap(Repl_Connection *conn) +{ + if(conn){ + return conn->ld; + } else { + return NULL; + } +} + void conn_set_agmt_changed(Repl_Connection *conn) { PR_ASSERT(NULL != conn); @@ -1908,3 +1918,19 @@ repl5_debug_timeout_callback(time_t when, void *arg) "repl5_debug_timeout_callback: set debug level to %d at %ld\n", s_debug_level, when); } + +void +conn_lock(Repl_Connection *conn) +{ + if(conn){ + PR_Lock(conn->lock); + } +} + +void +conn_unlock(Repl_Connection *conn) +{ + if(conn){ + PR_Unlock(conn->lock); + } +} diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index 5a3ed07..ca646e9 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -81,12 +81,12 @@ static int replica_execute_cl2ldif_task (Object *r, char *returntext); static int replica_execute_ldif2cl_task (Object *r, char *returntext); static int replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext); static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext); -static int replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext); +static int replica_execute_release_ruv_task(Object *r, ReplicaId rid); static struct berval *create_ruv_payload(char *value); static int replica_cleanup_task (Object *r, const char *task_name, char *returntext, int apply_mods); static int replica_task_done(Replica *replica); - static multimaster_mtnode_extension * _replica_config_get_mtnode_ext (const Slapi_Entry *e); +int g_get_shutdown(); /* * Note: internal add/modify/delete operations should not be run while @@ -860,21 +860,6 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt else return LDAP_SUCCESS; } - else if (strncasecmp (task_name, RELEASERUV, RELEASERUVLEN) == 0) - { - int temprid = atoi(&(task_name[RELEASERUVLEN])); - if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){ - PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id for task - %s", task_name); - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,"replica_execute_task: %s\n", returntext); - return LDAP_OPERATIONS_ERROR; - } - if (apply_mods) - { - return replica_execute_release_ruv_task(r, (ReplicaId)temprid, returntext); - } - else - return LDAP_SUCCESS; - } else { PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "unsupported replica task - %s", task_name); @@ -1177,7 +1162,6 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not * Clean the changelog RUV's, and set the rids */ cl5CleanRUV(rid); - set_cleaned_rid(rid); delete_released_rid(); if (rc != RUV_SUCCESS){ @@ -1191,19 +1175,22 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext) { + PRThread *thread = NULL; Repl_Connection *conn; Replica *replica = (Replica*)object_get_data (r); Object *agmt_obj; Repl_Agmt *agmt; ConnResult crc; + cleanruv_data *data = NULL; const Slapi_DN *dn = NULL; struct berval *payload = NULL; char *ridstr = NULL; int send_msgid = 0; + int agmt_count = 0; int rc = 0; - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: cleaning rid (%d)...\n",(int)rid); - + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: cleaning rid (%d)...\n",(int)rid); + set_cleaned_rid(rid); /* * Create payload */ @@ -1245,8 +1232,9 @@ replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext) slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to send " "cleanruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); } else { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: successfully sent " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: successfully sent " "cleanruv extended op to (%s)\n",slapi_sdn_get_dn(dn)); + agmt_count++; } conn_start_linger(conn); } @@ -1270,17 +1258,218 @@ done: */ replica_execute_cleanruv_task (r, rid, returntext); - if(rc == 0){ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: operation successful\n"); + if(rc == 0 && agmt_count > 0){ /* success, but we need to check our replicas */ + /* + * Launch the cleanruv monitoring thread. Once all the replicas are cleaned it will release the rid + */ + data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); + if (data == NULL) { + slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to allocate " + "cleanruv_data. Aborting task.\n"); + return -1; + } + data->repl_obj = r; + data->rid = rid; + + thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_monitor_thread, + (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); + if (thread == NULL) { + slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: unable to create cleanAllRUV " + "monitoring thread. Aborting task.\n"); + } + + } else if(r == 0){ /* success */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully Finished.\n"); } else { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: operation failed (%d)\n",rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Task failed (%d)\n",rc); } return rc; } +/* + * After all the cleanAllRUV extended ops have been sent, we need to monitoring those replicas' + * RUVs. Once the rid is cleaned, then we need to release it, and push this "release" + * to the other replicas + */ +void +replica_cleanallruv_monitor_thread(void *arg) +{ + Object *agmt_obj; + LDAP *ld; + Repl_Connection *conn; + Repl_Agmt *agmt; + Replica *replica;; + cleanruv_data *data = arg; + LDAPMessage *result, *entry; + BerElement *ber; + time_t start_time; + struct berval **vals; + char *rid_text; + char *attrs[2]; + char *attr; + int replicas_cleaned = 0; + int found = 0; + int crc; + int rc = 0; + int i; + + /* + * Initialize our settings + */ + attrs[0] = "nsds50ruv"; + attrs[1] = NULL; + rid_text = slapi_ch_smprintf("{replica %d ldap", data->rid); + replica = (Replica*)object_get_data (data->repl_obj); + start_time = current_time(); + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Waiting for all the replicas to get cleaned...\n"); + + while(!g_get_shutdown()) + { + DS_Sleep(PR_SecondsToInterval(10)); + found = 0; + agmt_obj = agmtlist_get_first_agreement_for_replica (replica); + while (agmt_obj){ + agmt = (Repl_Agmt*)object_get_data (agmt_obj); + if(!agmt_is_enabled(agmt)){ + agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); + continue; + } + /* + * Get the replication connection + */ + conn = (Repl_Connection *)agmt_get_connection(agmt); + crc = conn_connect(conn); + if (CONN_OPERATION_FAILED == crc ){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to connect " + "to repl agreement connection (%s), error %d\n","", ACQUIRE_TRANSIENT_ERROR); + continue; + } else if (CONN_SSL_NOT_ENABLED == crc){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to acquire " + "repl agmt connection (%s), error %d\n","", ACQUIRE_FATAL_ERROR); + continue; + } + /* + * Get the LDAP connection handle from the conn + */ + ld = conn_get_ldap(conn); + if(ld == NULL){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to get LDAP " + "handle from the replication agmt (%s). Moving on to the next agmt.\n",agmt_get_long_name(agmt)); + continue; + } + /* + * Search this replica for its tombstone/ruv entry + */ + conn_cancel_linger(conn); + conn_lock(conn); + rc = ldap_search_ext_s(ld, slapi_sdn_get_dn(agmt_get_replarea(agmt)), LDAP_SCOPE_SUBTREE, + "(&(nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff)(objectclass=nstombstone))", + attrs, 0, NULL, NULL, NULL, 0, &result); + if(rc != LDAP_SUCCESS){ + /* + * Couldn't contact ldap server, what should we do? + * + * Skip it and move on! It's the admin's job to make sure all the replicas are up and running + */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: monitor thread failed to contact " + "agmt (%s), moving on to the next agmt.\n", agmt_get_long_name(agmt)); + conn_unlock(conn); + conn_start_linger(conn); + continue; + } + /* + * There is only one entry. Check its "nsds50ruv" for our cleaned rid + */ + entry = ldap_first_entry( ld, result ); + if ( entry != NULL ) { + for ( attr = ldap_first_attribute( ld, entry, &ber ); attr != NULL; attr = ldap_next_attribute( ld, entry, ber ) ){ + /* make sure the attribute is nsds50ruv */ + if(strcasecmp(attr,"nsds50ruv") != 0){ + continue; + } + if ((vals = ldap_get_values_len( ld, entry, attr)) != NULL ) { + for ( i = 0; vals[i] && vals[i]->bv_val; i++ ) { + /* look for this replica */ + if(strstr(rid_text, vals[i]->bv_val)){ + /* rid has not been cleaned yet, start over */ + found = 1; + break; + } + } + ldap_value_free_len(vals); + } + ldap_memfree( attr ); + } + if ( ber != NULL ) { + ber_free( ber, 0 ); + } + } + ldap_msgfree( result ); + /* + * Unlock the connection, and start the linger timer + */ + conn_unlock(conn); + conn_start_linger(conn); + + if(found){ + /* + * If we've been trying to clean these replicas for over an hour, just quit + */ + if( (current_time() - start_time) > 3600){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: timed out checking if replicas have " + "been cleaned. The rid has not been released, you need to rerun the task.\n"); + goto done; + } + /* + * a rid has not been cleaned yet, go back to sleep and check them again + */ + break; + } + + agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); + } + if(!found){ + /* + * The replicas have been cleaned! Next, release the rid + */ + replicas_cleaned = 1; + break; + } + } /* while */ + + /* + * If the replicas are cleaned, release the rid + */ + if(replicas_cleaned){ + rc = replica_execute_release_ruv_task(data->repl_obj, data->rid); + if(rc == 0){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully Finished. All active " + "replicas have been cleaned.\n"); + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Failed: Replica ID was not released (%d) " + "You will need to rerun the task.\n", rc); + } + } else { + /* + * Shutdown + */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: slapd shutting down, you will need to rerun the task.\n"); + } + +done: + slapi_ch_free((void **)&rid_text); + slapi_ch_free((void **)&data); +} + +/* + * This function releases the cleaned rid so that it can be reused. + * We send this operation to all the known active replicas. + */ static int -replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext) +replica_execute_release_ruv_task(Object *r, ReplicaId rid) { Repl_Connection *conn; Replica *replica = (Replica*)object_get_data (r); @@ -1293,7 +1482,7 @@ replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext) int send_msgid = 0; int rc = 0; - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: releasing rid (%d)...\n", rid); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: releasing rid (%d)...\n", rid); /* * Set the released rid, and trigger cl trimmming @@ -1339,18 +1528,18 @@ replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext) conn_cancel_linger(conn); crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, payload, NULL, &send_msgid); if (CONN_OPERATION_SUCCESS != crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to send " - "releaseruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to send " + "releaseRUV extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); rc = LDAP_OPERATIONS_ERROR; } else { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: successfully sent " - "extended op to (%s)\n",slapi_sdn_get_dn(dn)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: successfully sent " + "releaseRUV extended op to (%s)\n",slapi_sdn_get_dn(dn)); } conn_start_linger(conn); } if(crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: replica (%s) has not " - "been cleaned. You will need to rerun the RELEASERUV task on this replica\n", + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica (%s) has not " + "been released. You will need to rerun the task\n", slapi_sdn_get_dn(dn)); rc = crc; } @@ -1364,9 +1553,9 @@ done: if(rc == 0){ set_released_rid(ALREADY_RELEASED); delete_cleaned_rid(); - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: Successfully released rid (%d)\n", rid); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully released rid (%d)\n", rid); } else { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: Failed to release rid (%d), error (%d)\n", rid, rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Failed to release rid (%d), error (%d)\n", rid, rc); } if(payload) diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c index e4abde0..f6378d2 100644 --- a/ldap/servers/plugins/replication/repl_extop.c +++ b/ldap/servers/plugins/replication/repl_extop.c @@ -1388,18 +1388,21 @@ free_and_return: int multimaster_extop_cleanruv(Slapi_PBlock *pb){ multimaster_mtnode_extension *mtnode_ext; + PRThread *thread = NULL; Repl_Connection *conn; const Slapi_DN *dn; Replica *r = NULL; Object *agmt_obj; Repl_Agmt *agmt; ConnResult crc; + cleanruv_data *data = NULL; struct berval *extop_value; char *extop_oid; char *repl_root; char *payload = NULL; char *iter; int send_msgid = 0; + int agmt_count = 0; int rid = 0; int rc = 0; @@ -1411,41 +1414,39 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){ /* something is wrong, error out */ return -1; } - /* * Extract the rid and repl_root from the payload */ if(decode_cleanruv_payload(extop_value, &payload)){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to decode payload. Aborting ext op\n"); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to decode payload. Aborting ext op\n"); return -1; } rid = atoi(ldap_utf8strtok_r(payload, ":", &iter)); repl_root = ldap_utf8strtok_r(iter, ":", &iter); - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n",rid); - /* * If we already cleaned this server, just return success */ if(is_cleaned_rid(rid)){ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: rid (%d) has already been cleaned, skipping\n",rid); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_extop: rid (%d) has already been cleaned, skipping\n",rid); return rc; } else { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n", rid); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: cleaning rid (%d)...\n", rid); + set_cleaned_rid(rid); } /* * Get the node, so we can get the replica and its agreements */ if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to get replication node " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to get replication node " "from (%s), aborting operation\n", repl_root); return -1; } if (mtnode_ext->replica) object_acquire (mtnode_ext->replica); if (mtnode_ext->replica == NULL){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica is missing from (%s), " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica is missing from (%s), " "aborting operation\n",repl_root); rc = LDAP_OPERATIONS_ERROR; goto free_and_return; @@ -1466,36 +1467,37 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){ conn = (Repl_Connection *)agmt_get_connection(agmt); if(conn == NULL){ /* no connection for this agreement, move on to the next agmt */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: the replica (%s), is " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: the replica (%s), is " "missing the connection. This replica will not be cleaned.\n", slapi_sdn_get_dn(dn)); agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); continue; } crc = conn_connect(conn); if (CONN_OPERATION_FAILED == crc ){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to connect " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to connect " "to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR); rc = LDAP_OPERATIONS_ERROR; } else if (CONN_SSL_NOT_ENABLED == crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to acquire " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to acquire " "repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR); rc = LDAP_OPERATIONS_ERROR; } else { conn_cancel_linger(conn); crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, extop_value, NULL, &send_msgid); if (CONN_OPERATION_SUCCESS != crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to send " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to send " "clean_ruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); rc = LDAP_OPERATIONS_ERROR; } else { /* success */ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: successfully sent " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: successfully sent " "extended op to (%s)\n",slapi_sdn_get_dn(dn) ); + agmt_count++; } conn_start_linger(conn); } if(crc != CONN_OPERATION_SUCCESS){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica (%s) has not " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica (%s) has not " "been cleaned. You will need to rerun the CLEANALLRUV task on this replica\n", slapi_sdn_get_dn(dn) ); rc = LDAP_OPERATIONS_ERROR; @@ -1508,10 +1510,30 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb){ free_and_return: - if(rc == 0){ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaned rid (%d)\n", rid); + if(rc == 0 && agmt_count > 0){ + /* + * Launch the cleanruv monitoring thread. Once all the replicas are cleaned it will release the rid + */ + data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); + if (data == NULL) { + slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to allocate " + "cleanruv_Data\n"); + return -1; + } + data->repl_obj = mtnode_ext->replica; + data->rid = rid; + + thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_monitor_thread, + (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); + if (thread == NULL) { + slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: unable to create cleanAllRUV " + "monitoring thread. Aborting task.\n"); + } + } else if (rc == 0){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Successfully Finished.\n"); } else { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to clean rid (%d), error (%d)\n",rid, rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanALLRUV_extop: failed to clean rid (%d), error (%d)\n",rid, rc); } if (mtnode_ext->replica) @@ -1558,34 +1580,34 @@ multimaster_extop_releaseruv(Slapi_PBlock *pb){ } if(decode_cleanruv_payload(extop_value, &payload)){ - slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseruv_extop: failed to decode payload, aborting ext op\n"); + slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseRUV_extop: failed to decode payload, aborting ext op.\n"); return -1; } rid = atoi(ldap_utf8strtok_r(payload, ":", &iter)); repl_root = ldap_utf8strtok_r(iter, ":", &iter); - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: releasing rid (%d)...\n",rid); /* * If we already released this ruv, just return. */ if(is_released_rid(rid) || is_already_released_rid()){ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: rid (%d) has already been released, skipping\n",rid); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_extop: rid (%d) has already been released, skipping.\n",rid); return 0; } else { /* set the released rid, and trigger trimming */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: releasing rid (%d)...\n", rid); set_released_rid((int)rid); trigger_cl_trimming(); } if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to get node " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to get node " "from replication root dn(%s), aborting operation.\n", repl_root); return -1; } if (mtnode_ext->replica) object_acquire (mtnode_ext->replica); if (mtnode_ext->replica == NULL){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica is missing from (%s), " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: replica is missing from (%s), " "aborting operation.\n", repl_root); rc = LDAP_OPERATIONS_ERROR; goto free_and_return; @@ -1606,38 +1628,38 @@ multimaster_extop_releaseruv(Slapi_PBlock *pb){ conn = (Repl_Connection *)agmt_get_connection(agmt); if(conn == NULL){ /* no connection for this agreement, log error, and move on */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: the replica (%s), is " - "missing the connection. This replica will not be cleaned.\n", slapi_sdn_get_dn(dn)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: the replica (%s), is " + "missing the connection. This replica will not be released.\n", slapi_sdn_get_dn(dn)); agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); continue; } crc = conn_connect(conn); if (CONN_OPERATION_FAILED == crc ){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to connect " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to connect " "to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR); rc = LDAP_OPERATIONS_ERROR; } else if (CONN_SSL_NOT_ENABLED == crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to acquire " + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_extop: failed to acquire " "repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR); rc = LDAP_OPERATIONS_ERROR; } else { conn_cancel_linger(conn); crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, extop_value, NULL, &send_msgid); if (CONN_OPERATION_SUCCESS != crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to send " - "releaseruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: failed to send " + "releaseRUV extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc); rc = LDAP_OPERATIONS_ERROR; } else { /* success */ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: successfully sent " - "extended op to (%s)\n",slapi_sdn_get_dn(dn) ); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: successfully sent " + "releaseRUV extended op to (%s)\n",slapi_sdn_get_dn(dn) ); rc = 0; } conn_start_linger(conn); } if(crc){ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica (%s) has not " - "been cleaned. You will need to rerun the RELEASERUV task on this replica\n", + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: replica (%s) has not " + "been released. You will need to rerun the task.\n", slapi_sdn_get_dn(dn) ); } agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); @@ -1650,9 +1672,10 @@ free_and_return: if(rc == 0){ set_released_rid(ALREADY_RELEASED); delete_cleaned_rid(); - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: released rid (%d) successfully\n", rid); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Successfully released rid (%d)\n", rid); } else { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to release rid(%d), error (%d), please retry the task\n",rid, rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_extop: Failed to release rid(%d), error (%d), " + "please retry the task.\n",rid, rc); } if(mtnode_ext->replica)