| |
@@ -6,6 +6,9 @@
|
| |
* See LICENSE for details.
|
| |
* END COPYRIGHT BLOCK **/
|
| |
|
| |
+ #include <nspr4/prlog.h>
|
| |
+ #include <bits/stdint-intn.h>
|
| |
+
|
| |
#include "sync.h"
|
| |
|
| |
 /* Main list of established persistent synchronization searches */
|
| |
@@ -29,7 +32,7 @@
|
| |
static int sync_add_request(SyncRequest *req);
|
| |
static void sync_remove_request(SyncRequest *req);
|
| |
static SyncRequest *sync_request_alloc(void);
|
| |
- void sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype);
|
| |
+ void sync_queue_change(OPERATION_PL_CTX_T *operation);
|
| |
static void sync_send_results(void *arg);
|
| |
static void sync_request_wakeup_all(void);
|
| |
static void sync_node_free(SyncQueueNode **node);
|
| |
@@ -37,17 +40,244 @@
|
| |
static int sync_acquire_connection(Slapi_Connection *conn);
|
| |
static int sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);
|
| |
|
| |
+ /* This routine appends the operation at the end of the
|
| |
+ * per thread pending list of nested operation..
|
| |
+ * being a betxn_preop the pending list has the same order
|
| |
+ * that the server received the operation
|
| |
+ */
|
| |
+ int
|
| |
+ sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)
|
| |
+ {
|
| |
+ OPERATION_PL_CTX_T *prim_op;
|
| |
+ OPERATION_PL_CTX_T *new_op;
|
| |
+ Slapi_DN *sdn;
|
| |
+
|
| |
+ if (!SYNC_IS_INITIALIZED()) {
|
| |
+ /* not initialized if sync plugin is not started */
|
| |
+ return 0;
|
| |
+ }
|
| |
+
|
| |
+ /* Create a new pending operation node */
|
| |
+ new_op = (OPERATION_PL_CTX_T *)slapi_ch_calloc(1, sizeof(OPERATION_PL_CTX_T));
|
| |
+ new_op->flags = OPERATION_PL_PENDING;
|
| |
+ slapi_pblock_get(pb, SLAPI_OPERATION, &new_op->op);
|
| |
+ slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
|
| |
+
|
| |
+ prim_op = get_thread_primary_op();
|
| |
+ if (prim_op) {
|
| |
+ /* It already exists a primary operation, so the current
|
| |
+ * operation is a nested one that we need to register at the end
|
| |
+ * of the pending nested operations
|
| |
+ */
|
| |
+ OPERATION_PL_CTX_T *current_op;
|
| |
+ for (current_op = prim_op; current_op->next; current_op = current_op->next);
|
| |
+ current_op->next = new_op;
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - nested operation targets "
|
| |
+ "\"%s\" (0x%lx)\n",
|
| |
+ slapi_sdn_get_dn(sdn), (ulong) new_op->op);
|
| |
+ } else {
|
| |
+ /* The current operation is the first/primary one in the txn
|
| |
+ * registers it directly in the thread private data (head)
|
| |
+ */
|
| |
+ set_thread_primary_op(new_op);
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - primary operation targets "
|
| |
+ "\"%s\" (0x%lx)\n",
|
| |
+ slapi_sdn_get_dn(sdn), (ulong) new_op->op);
|
| |
+ }
|
| |
+ return 0;
|
| |
+ }
|
| |
+
|
| |
+ /* This operation can not be proceed by sync_repl listener because
|
| |
+ * of internal problem. For example, POST entry does not exist
|
| |
+ */
|
| |
+ static void
|
| |
+ ignore_op_pl(Operation *op)
|
| |
+ {
|
| |
+ OPERATION_PL_CTX_T *prim_op, *curr_op;
|
| |
+ prim_op = get_thread_primary_op();
|
| |
+
|
| |
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
|
| |
+ if ((curr_op->op == op) &&
|
| |
+ (curr_op->flags == OPERATION_PL_PENDING)) { /* If by any "chance" a same operation structure was reused in consecutive updates
|
| |
+ * we can not only rely on 'op' value
|
| |
+ */
|
| |
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl operation (0x%lx) from the pending list\n",
|
| |
+ (ulong) op);
|
| |
+ curr_op->flags = OPERATION_PL_IGNORED;
|
| |
+ return;
|
| |
+ }
|
| |
+ }
|
| |
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl can not retrieve an operation (0x%lx) in pending list\n",
|
| |
+ (ulong) op);
|
| |
+ }
|
| |
+
|
| |
+ /* This is a generic function that is called by betxn_post of this plugin.
|
| |
+ * For the given operation (pb->pb_op) it sets in the pending list the state
|
| |
+ * of the completed operation.
|
| |
+ * When all operations are completed, if the primary operation is successful it
|
| |
+ * flushes (enqueue) the operations to the sync repl queue(s), else it just free
|
| |
+ * the pending list (skipping enqueue).
|
| |
+ */
|
| |
+ static void
|
| |
+ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t op_tag, char *label)
|
| |
+ {
|
| |
+ OPERATION_PL_CTX_T *prim_op = NULL, *curr_op;
|
| |
+ Operation *pb_op;
|
| |
+ Slapi_DN *sdn;
|
| |
+ int32_t rc;
|
| |
+
|
| |
+ if (!SYNC_IS_INITIALIZED()) {
|
| |
+ /* not initialized if sync plugin is not started */
|
| |
+ return;
|
| |
+ }
|
| |
+ slapi_pblock_get(pb, SLAPI_OPERATION, &pb_op);
|
| |
+ slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
|
| |
+
|
| |
+ if (NULL == e) {
|
| |
+ /* Ignore this operation (for example case of failure of the operation) */
|
| |
+ ignore_op_pl(pb_op);
|
| |
+ return;
|
| |
+ }
|
| |
+
|
| |
+ /* Retrieve the result of the operation */
|
| |
+ if (slapi_op_internal(pb)) {
|
| |
+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
|
| |
+ if (0 != rc) {
|
| |
+ /* The internal operation did not succeed */
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "internal operation Failed (0x%lx) rc=%d\n",
|
| |
+ (ulong) pb_op, rc);
|
| |
+ }
|
| |
+ } else {
|
| |
+ slapi_pblock_get(pb, SLAPI_PLUGIN_OPRETURN, &rc);
|
| |
+ if (0 != rc) {
|
| |
+ /* The operation did not succeed */
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "direct operation Failed (0x%lx) rc=%d\n",
|
| |
+ (ulong) pb_op, rc);
|
| |
+ }
|
| |
+ }
|
| |
+
|
| |
+
|
| |
+ prim_op = get_thread_primary_op();
|
| |
+ PR_ASSERT(prim_op);
|
| |
+ /* First mark the operation as completed/failed
|
| |
+ * the param to be used once the operation will be pushed
|
| |
+ * on the listeners queue
|
| |
+ */
|
| |
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
|
| |
+ if ((curr_op->op == pb_op) &&
|
| |
+ (curr_op->flags == OPERATION_PL_PENDING)) { /* If by any "chance" a same operation structure was reused in consecutive updates
|
| |
+ * we can not only rely on 'op' value
|
| |
+ */
|
| |
+ if (rc == LDAP_SUCCESS) {
|
| |
+ curr_op->flags = OPERATION_PL_SUCCEEDED;
|
| |
+ curr_op->entry = e ? slapi_entry_dup(e) : NULL;
|
| |
+ curr_op->eprev = eprev ? slapi_entry_dup(eprev) : NULL;
|
| |
+ curr_op->chgtype = op_tag;
|
| |
+ } else {
|
| |
+ curr_op->flags = OPERATION_PL_FAILED;
|
| |
+ }
|
| |
+ break;
|
| |
+ }
|
| |
+ }
|
| |
+ if (!curr_op) {
|
| |
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "%s - operation not found on the pendling list\n", label);
|
| |
+ PR_ASSERT(curr_op);
|
| |
+ }
|
| |
+
|
| |
+ #if DEBUG
|
| |
+ /* dump the pending queue */
|
| |
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
|
| |
+ char *flags_str;
|
| |
+ char * entry_str;
|
| |
+
|
| |
+ if (curr_op->entry) {
|
| |
+ entry_str = slapi_entry_get_dn(curr_op->entry);
|
| |
+ } else if (curr_op->eprev){
|
| |
+ entry_str = slapi_entry_get_dn(curr_op->eprev);
|
| |
+ } else {
|
| |
+ entry_str = "unknown";
|
| |
+ }
|
| |
+ switch (curr_op->flags) {
|
| |
+ case OPERATION_PL_SUCCEEDED:
|
| |
+ flags_str = "succeeded";
|
| |
+ break;
|
| |
+ case OPERATION_PL_FAILED:
|
| |
+ flags_str = "failed";
|
| |
+ break;
|
| |
+ case OPERATION_PL_IGNORED:
|
| |
+ flags_str = "ignored";
|
| |
+ break;
|
| |
+ case OPERATION_PL_PENDING:
|
| |
+ flags_str = "pending";
|
| |
+ break;
|
| |
+ default:
|
| |
+ flags_str = "unknown";
|
| |
+ break;
|
| |
+
|
| |
+
|
| |
+ }
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "dump pending list(0x%lx) %s %s\n",
|
| |
+ (ulong) curr_op->op, entry_str, flags_str);
|
| |
+ }
|
| |
+ #endif
|
| |
+
|
| |
+ /* Second check if it remains a pending operation in the pending list */
|
| |
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
|
| |
+ if (curr_op->flags == OPERATION_PL_PENDING) {
|
| |
+ break;
|
| |
+ }
|
| |
+ }
|
| |
+ if (curr_op) {
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "%s - It remains a pending operation (0x%lx)\n", label, (ulong) curr_op->op);
|
| |
+ } else {
|
| |
+ OPERATION_PL_CTX_T *next = NULL;
|
| |
+ PRBool enqueue_it = PR_TRUE;
|
| |
+ /* all operations on the pending list are completed moved them
|
| |
+ * to the listeners queue in the same order as pending list.
|
| |
+ * If the primary operation failed, operation are not moved to
|
| |
+ * the queue
|
| |
+ */
|
| |
+ if (prim_op->flags == OPERATION_PL_FAILED) {
|
| |
+ /* if primary update failed, the txn is aborted and none of
|
| |
+ * the operations were applied. Just forget this pending list
|
| |
+ */
|
| |
+ enqueue_it = PR_FALSE;
|
| |
+ }
|
| |
+ for (curr_op = prim_op; curr_op; curr_op = next) {
|
| |
+ char *entry;
|
| |
+ if (curr_op->entry) {
|
| |
+ entry = slapi_entry_get_dn(curr_op->entry);
|
| |
+ } else if (curr_op->eprev){
|
| |
+ entry = slapi_entry_get_dn(curr_op->eprev);
|
| |
+ } else {
|
| |
+ entry = "unknown";
|
| |
+ }
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "Do %s enqueue (0x%lx) %s\n",
|
| |
+ enqueue_it ? "" : "not", (ulong) curr_op->op, entry);
|
| |
+ if (enqueue_it) {
|
| |
+ sync_queue_change(curr_op);
|
| |
+ }
|
| |
+
|
| |
+ /* now free this pending operation */
|
| |
+ next = curr_op->next;
|
| |
+ slapi_entry_free(curr_op->entry);
|
| |
+ slapi_entry_free(curr_op->eprev);
|
| |
+ slapi_ch_free((void **)&curr_op);
|
| |
+ }
|
| |
+ /* we consumed all the pending operation, free the pending list*/
|
| |
+ set_thread_primary_op(NULL);
|
| |
+ }
|
| |
+ }
|
| |
int
|
| |
sync_add_persist_post_op(Slapi_PBlock *pb)
|
| |
{
|
| |
Slapi_Entry *e;
|
| |
-
|
| |
if (!SYNC_IS_INITIALIZED()) {
|
| |
return (0);
|
| |
}
|
| |
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
|
| |
- sync_queue_change(e, NULL, LDAP_REQ_ADD);
|
| |
+ sync_update_persist_op(pb, e, NULL, LDAP_REQ_ADD, "sync_add_persist_post_op");
|
| |
|
| |
return (0);
|
| |
}
|
| |
@@ -62,7 +292,7 @@
|
| |
}
|
| |
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e);
|
| |
- sync_queue_change(e, NULL, LDAP_REQ_DELETE);
|
| |
+ sync_update_persist_op(pb, e, NULL, LDAP_REQ_DELETE, "sync_del_persist_post_op");
|
| |
|
| |
return (0);
|
| |
}
|
| |
@@ -78,7 +308,7 @@
|
| |
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
|
| |
- sync_queue_change(e, e_prev, LDAP_REQ_MODIFY);
|
| |
+ sync_update_persist_op(pb, e, e_prev, LDAP_REQ_MODIFY, "sync_mod_persist_post_op");
|
| |
|
| |
return (0);
|
| |
}
|
| |
@@ -94,19 +324,22 @@
|
| |
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
|
| |
slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
|
| |
- sync_queue_change(e, e_prev, LDAP_REQ_MODRDN);
|
| |
+ sync_update_persist_op(pb, e, e_prev, LDAP_REQ_MODRDN, "sync_mod_persist_post_op");
|
| |
|
| |
return (0);
|
| |
}
|
| |
|
| |
void
|
| |
- sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype)
|
| |
+ sync_queue_change(OPERATION_PL_CTX_T *operation)
|
| |
{
|
| |
SyncRequest *req = NULL;
|
| |
SyncQueueNode *node = NULL;
|
| |
int matched = 0;
|
| |
int prev_match = 0;
|
| |
int cur_match = 0;
|
| |
+ Slapi_Entry *e = operation->entry;
|
| |
+ Slapi_Entry *eprev = operation->eprev;
|
| |
+ ber_int_t chgtype = operation->chgtype;
|
| |
|
| |
if (!SYNC_IS_INITIALIZED()) {
|
| |
return;
|
| |
@@ -195,16 +428,14 @@
|
| |
} else {
|
| |
pOldtail->sync_next = req->ps_eq_tail;
|
| |
}
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - entry "
|
| |
+ "\"%s\" \n",
|
| |
+ slapi_entry_get_dn_const(node->sync_entry));
|
| |
PR_Unlock(req->req_lock);
|
| |
}
|
| |
}
|
| |
-
|
| |
- SYNC_UNLOCK_READ();
|
| |
-
|
| |
/* Were there any matches? */
|
| |
if (matched) {
|
| |
- /* Notify update threads */
|
| |
- sync_request_wakeup_all();
|
| |
slapi_log_err(SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - enqueued entry "
|
| |
"\"%s\" on %d request listeners\n",
|
| |
slapi_entry_get_dn_const(e), matched);
|
| |
@@ -213,6 +444,13 @@
|
| |
"\"%s\" not enqueued on any request search listeners\n",
|
| |
slapi_entry_get_dn_const(e));
|
| |
}
|
| |
+ SYNC_UNLOCK_READ();
|
| |
+
|
| |
+ /* Were there any matches? */
|
| |
+ if (matched) {
|
| |
+ /* Notify update threads */
|
| |
+ sync_request_wakeup_all();
|
| |
+ }
|
| |
}
|
| |
/*
|
| |
* Initialize the list structure which contains the list
|
| |
@@ -359,11 +597,12 @@
|
| |
}
|
| |
|
| |
/*
|
| |
- * Called when stopping/disabling the plugin
|
| |
+ * Called when stopping/disabling the plugin (like shutdown)
|
| |
*/
|
| |
int
|
| |
sync_persist_terminate_all()
|
| |
{
|
| |
+ SyncRequest *req = NULL, *next;
|
| |
if (SYNC_IS_INITIALIZED()) {
|
| |
/* signal the threads to stop */
|
| |
plugin_closing = 1;
|
| |
@@ -377,6 +616,16 @@
|
| |
slapi_destroy_rwlock(sync_request_list->sync_req_rwlock);
|
| |
PR_DestroyLock(sync_request_list->sync_req_cvarlock);
|
| |
PR_DestroyCondVar(sync_request_list->sync_req_cvar);
|
| |
+
|
| |
+ /* it frees the structures, just in case it remained connected sync_repl client */
|
| |
+ for (req = sync_request_list->sync_req_head; NULL != req; req = next) {
|
| |
+ next = req->req_next;
|
| |
+ slapi_pblock_destroy(req->req_pblock);
|
| |
+ req->req_pblock = NULL;
|
| |
+ PR_DestroyLock(req->req_lock);
|
| |
+ req->req_lock = NULL;
|
| |
+ slapi_ch_free((void **)&req);
|
| |
+ }
|
| |
slapi_ch_free((void **)&sync_request_list);
|
| |
}
|
| |
|
| |
@@ -545,6 +794,8 @@
|
| |
int rc;
|
| |
PRUint64 connid;
|
| |
int opid;
|
| |
+ char **attrs_dup;
|
| |
+ char *strFilter;
|
| |
|
| |
slapi_pblock_get(req->req_pblock, SLAPI_CONN_ID, &connid);
|
| |
slapi_pblock_get(req->req_pblock, SLAPI_OPERATION_ID, &opid);
|
| |
@@ -589,9 +840,12 @@
|
| |
Slapi_Entry *ec;
|
| |
int chg_type = LDAP_SYNC_NONE;
|
| |
|
| |
- /* deque one element */
|
| |
+ /* dequeue one element */
|
| |
PR_Lock(req->req_lock);
|
| |
qnode = req->ps_eq_head;
|
| |
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - dequeue "
|
| |
+ "\"%s\" \n",
|
| |
+ slapi_entry_get_dn_const(qnode->sync_entry));
|
| |
req->ps_eq_head = qnode->sync_next;
|
| |
if (NULL == req->ps_eq_head) {
|
| |
req->ps_eq_tail = NULL;
|
| |
@@ -665,10 +919,22 @@
|
| |
sync_release_connection(req->req_pblock, conn, op, conn_acq_flag == 0);
|
| |
|
| |
done:
|
| |
+ /* This client closed the connection or shutdown, free the req */
|
| |
sync_remove_request(req);
|
| |
PR_DestroyLock(req->req_lock);
|
| |
req->req_lock = NULL;
|
| |
- slapi_ch_free((void **)&req->req_pblock);
|
| |
+
|
| |
+ slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_ATTRS, &attrs_dup);
|
| |
+ slapi_ch_array_free(attrs_dup);
|
| |
+ slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_ATTRS, NULL);
|
| |
+
|
| |
+ slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_STRFILTER, &strFilter);
|
| |
+ slapi_ch_free((void **)&strFilter);
|
| |
+ slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_STRFILTER, NULL);
|
| |
+
|
| |
+ slapi_pblock_destroy(req->req_pblock);
|
| |
+ req->req_pblock = NULL;
|
| |
+
|
| |
slapi_ch_free((void **)&req->req_orig_base);
|
| |
slapi_filter_free(req->req_filter, 1);
|
| |
sync_cookie_free(&req->req_cookie);
|
| |
 These seem like leftovers that you don't need to add here :)