#50307 Ticket 50305 - Revise CleanAllRUV task restart process
Closed 3 years ago by spichugi. Opened 5 years ago by mreynolds.
mreynolds/389-ds-base ticket50305  into  master

@@ -104,7 +104,11 @@ 

      while not done and count < timeout:

          try:

              entry = topology_m4.ms["master1"].getEntry(task_dn, attrlist=attrlist)

-             if not entry or entry.nsTaskExitCode:

+             if entry is not None:

+                 if entry.hasAttr('nsTaskExitCode'):

+                     done = True

+                     break

+             else:

                  done = True

                  break

          except ldap.NO_SUCH_OBJECT:
@@ -142,7 +146,7 @@ 

  

  @pytest.fixture()

  def m4rid(request, topology_m4):

-     log.debug("Wait a bit before the reset - it is required fot the slow machines")

+     log.debug("Wait a bit before the reset - it is required for the slow machines")

      time.sleep(5)

      log.debug("-------------- BEGIN RESET of m4 -----------------")

      repl = ReplicationManager(DEFAULT_SUFFIX)
@@ -468,6 +472,9 @@ 

      # Start master 3

      topology_m4.ms["master3"].start()

  

+     # Need to wait 5 seconds before server processes any leftover tasks

+     time.sleep(6)

+ 

      # Check master 1 tried to run abort task.  We expect the abort task to be aborted.

      if not topology_m4.ms["master1"].searchErrorsLog('Aborting abort task'):

          log.fatal('test_abort_restart: Abort task did not restart')

@@ -728,7 +728,7 @@ 

  Slapi_Entry *get_in_memory_ruv(Slapi_DN *suffix_sdn);

  int replica_write_ruv(Replica *r);

  char *replica_get_dn(Replica *r);

- void replica_check_for_tasks(Replica *r, Slapi_Entry *e);

+ void replica_check_for_tasks(time_t when, void *arg);

  void replica_update_state(time_t when, void *arg);

  void replica_reset_csn_pl(Replica *r);

  uint64_t replica_get_protocol_timeout(Replica *r);
@@ -813,7 +813,7 @@ 

  void replica_config_destroy(void);

  int get_replica_type(Replica *r);

  int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid);

- void add_cleaned_rid(cleanruv_data *data, char *maxcsn);

+ void add_cleaned_rid(cleanruv_data *data);

  int is_cleaned_rid(ReplicaId rid);

  int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg);

  void replica_cleanallruv_thread_ext(void *arg);
@@ -825,9 +825,9 @@ 

  int decode_cleanruv_payload(struct berval *extop_value, char **payload);

  struct berval *create_cleanruv_payload(char *value);

  void ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids);

- void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root);

+ void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task);

  int is_task_aborted(ReplicaId rid);

- void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, int skip);

+ void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip);

  int is_pre_cleaned_rid(ReplicaId rid);

  void set_cleaned_rid(ReplicaId rid);

  void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...);

@@ -101,6 +101,8 @@ 

                      ext->replica = NULL;

                  }

              }

+             /* Wait a few seconds for everything to startup before resuming any replication tasks */

+             slapi_eq_once(replica_check_for_tasks, (void *)replica_get_root(r), time(NULL) + 5);

          }

      }

  }

@@ -236,8 +236,6 @@ 

                                             1000 * r->tombstone_reap_interval);

      }

  

-     replica_check_for_tasks(r, e);

- 

  done:

      if (rc != 0 && r) {

          replica_destroy((void **)&r);
@@ -2101,12 +2099,55 @@ 

      return (_replica_check_validity(r));

  }

  

+ static void

+ replica_delete_task_config(Slapi_Entry *e, char *attr, char *value)

+ {

+     Slapi_PBlock *modpb;

+     struct berval *vals[2];

+     struct berval val[1];

+     LDAPMod *mods[2];

+     LDAPMod mod;

+     int32_t rc = 0;

+ 

+     val[0].bv_len = strlen(value);

+     val[0].bv_val = value;

+     vals[0] = &val[0];

+     vals[1] = NULL;

+ 

+     mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;

+     mod.mod_type = attr;

+     mod.mod_bvalues = vals;

+     mods[0] = &mod;

+     mods[1] = NULL;

+ 

+     modpb = slapi_pblock_new();

+     slapi_modify_internal_set_pb(modpb, slapi_entry_get_dn(e), mods, NULL, NULL,

+             repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);

+     slapi_modify_internal_pb(modpb);

+     slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc);

+     slapi_pblock_destroy(modpb);

+ 

+     if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) {

+         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,

+                 "delete_cleaned_rid_config - Failed to remove task data from (%s) error (%d)\n",

+                 slapi_entry_get_dn(e), rc);

+     }

+ }

+ 

  void

- replica_check_for_tasks(Replica *r, Slapi_Entry *e)

+ replica_check_for_tasks(time_t when __attribute__((unused)), void *arg)

why do you need the unused param

  {

+     const Slapi_DN *repl_root = (Slapi_DN *)arg;

+     Slapi_Entry *e = NULL;

+     Replica *r = NULL;

+     Object *repl_obj = NULL;;

      char **clean_vals;

  

-     if (e == NULL || ldif_dump_is_running() == PR_TRUE) {

+     e = _replica_get_config_entry(repl_root, NULL);

+     repl_obj = replica_get_replica_from_dn(repl_root);

+     r = (Replica *)object_get_data(repl_obj);

+ 

+     if (e == NULL || r == NULL || ldif_dump_is_running() == PR_TRUE) {

          /* If db2ldif is being run, do not check if there are incomplete tasks */

          return;

      }
@@ -2115,218 +2156,218 @@ 

       *  if so set the cleaned rid, and fire off the thread

       */

      if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaCleanRUV)) != NULL) {

-         PRThread *thread = NULL;

-         struct berval *payload = NULL;

-         CSN *maxcsn = NULL;

-         ReplicaId rid;

-         char csnstr[CSN_STRSIZE];

-         char *token = NULL;

-         char *forcing;

-         PRBool original_task;

-         char *csnpart;

-         char *ridstr;

-         char *iter = NULL;

-         int i;

- 

-         for (i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++) {

-             cleanruv_data *data = NULL;

+         for (size_t i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++) {

+             struct timespec ts = slapi_current_rel_time_hr();

+             PRBool original_task = PR_TRUE;

+             Slapi_Entry *task_entry = NULL;

+             Slapi_PBlock *add_pb = NULL;

+             int32_t result = 0;

+             ReplicaId rid;

+             char *token = NULL;

+             char *forcing;

+             char *iter = NULL;

+             char *repl_root = NULL;

+             char *ridstr = NULL;

+             char *rdn = NULL;

+             char *dn = NULL;

+             char *orig_val = slapi_ch_strdup(clean_vals[i]);

  

              /*

-              *  Set the cleanruv data, and add the cleaned rid

+              *  Get all the needed from

from what

               */

              token = ldap_utf8strtok_r(clean_vals[i], ":", &iter);

              if (token) {

                  rid = atoi(token);

                  if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) {

-                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Invalid replica id(%d) "

-                                                                    "aborting task.\n",

-                                   rid);

+                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "

+                             "Invalid replica id(%d) aborting task.  Aborting cleaning task!\n", rid);

+                     replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);

                      goto done;

                  }

              } else {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Unable to parse cleanallruv "

-                                                                "data (%s), aborting task.\n",

-                               clean_vals[i]);

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "

+                         "Unable to parse cleanallruv data (%s), missing rid, aborting task.  Aborting cleaning task!\n",

+                         clean_vals[i]);

+                 replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);

                  goto done;

              }

-             csnpart = ldap_utf8strtok_r(iter, ":", &iter);

-             maxcsn = csn_new();

-             csn_init_by_string(maxcsn, csnpart);

-             csn_as_string(maxcsn, PR_FALSE, csnstr);

+ 

+             /* Get forcing */

              forcing = ldap_utf8strtok_r(iter, ":", &iter);

-             original_task = PR_TRUE;

-             if (forcing == NULL) {

-                 forcing = "no";

-             } else if (!strcasecmp(forcing, "yes") || !strcasecmp(forcing, "no")) {

-                 /* forcing was correctly set, lets try to read the original task flag */

-                 token = ldap_utf8strtok_r(iter, ":", &iter);

-                 if (token && !atoi(token)) {

-                     original_task = PR_FALSE;

-                 }

+             if (forcing == NULL || strlen(forcing) > 3) {

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "

+                         "Unable to parse cleanallruv data (%s), missing/invalid force option (%s).  Aborting cleaning task!\n",

+                         clean_vals[i], forcing ? forcing : "missing force option");

+                 replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);

+                 goto done;

              }

  

-             slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "CleanAllRUV Task - cleanAllRUV task found, "

-                                                               "resuming the cleaning of rid(%d)...\n",

-                           rid);

-             /*

-              *  Create payload

-              */

-             ridstr = slapi_ch_smprintf("%d:%s:%s:%s", rid, slapi_sdn_get_dn(replica_get_root(r)), csnstr, forcing);

-             payload = create_cleanruv_payload(ridstr);

-             slapi_ch_free_string(&ridstr);

+             /* Get original task flag */

+             token = ldap_utf8strtok_r(iter, ":", &iter);

+             if (token) {

+                 if (!atoi(token)) {

+                      original_task = PR_FALSE;

+                 }

+             } else {

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "

+                         "Unable to parse cleanallruv data (%s), missing original task flag.  Aborting cleaning task!\n",

+                         clean_vals[i]);

+                 replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);

+                 goto done;

+             }

  

-             if (payload == NULL) {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Startup: Failed to "

-                                                                "create extended op payload, aborting task");

-                 csn_free(&maxcsn);

+             /* Get repl root */

+             token = ldap_utf8strtok_r(iter, ":", &iter);

+             if (token) {

+                 repl_root = token;

+             } else {

+                 /* no repl root, have to void task */

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - "

+                         "Unable to parse cleanallruv data (%s), missing replication root aborting task.  Aborting cleaning task!\n",

+                         clean_vals[i]);

+                 replica_delete_task_config(e, (char *)type_replicaCleanRUV, orig_val);

                  goto done;

              }

+ 

              /*

-              *  Setup the data struct, and fire off the thread.

+              * We have all our data, now add the task....

               */

-             data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));

-             if (data == NULL) {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Failed to allocate cleanruv_data.\n");

-                 csn_free(&maxcsn);

-             } else {

-                 /* setup our data */

-                 data->repl_obj = NULL;

-                 data->replica = NULL;

-                 data->rid = rid;

-                 data->task = NULL;

-                 data->maxcsn = maxcsn;

-                 data->payload = payload;

-                 data->sdn = slapi_sdn_dup(r->repl_root);

-                 data->force = slapi_ch_strdup(forcing);

-                 data->repl_root = NULL;

- 

-                 /* This is a corner case, a cleanAllRuv task was interrupted by a shutdown or a crash

-                  * We retrieved from type_replicaCleanRUV if the cleanAllRuv request

-                  * was received from a direct task ADD or if was received via

-                  * the cleanAllRuv extop.

-                  */

-                 data->original_task = original_task;

- 

-                 thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread_ext,

-                                          (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,

-                                          PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);

-                 if (thread == NULL) {

-                     /* log an error and free everything */

-                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "CleanAllRUV Task - Unable to create cleanAllRUV "

-                                                                    "thread for rid(%d)\n",

-                                   (int)data->rid);

-                     csn_free(&maxcsn);

-                     slapi_sdn_free(&data->sdn);

-                     ber_bvfree(data->payload);

-                     slapi_ch_free_string(&data->force);

-                     slapi_ch_free((void **)&data);

-                 }

+             slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "CleanAllRUV Task - "

+                     "CleanAllRUV task found, resuming the cleaning of rid(%d)...\n", rid);

+ 

+             add_pb = slapi_pblock_new();

+             task_entry = slapi_entry_alloc();

+             rdn = slapi_ch_smprintf("restarted-%ld", ts.tv_sec);

+             dn = slapi_create_dn_string("cn=%s,cn=cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec);

+             slapi_entry_init(task_entry, dn, NULL);

+ 

+             ridstr = slapi_ch_smprintf("%d", rid);

+             slapi_entry_add_string(task_entry, "objectclass", "top");

+             slapi_entry_add_string(task_entry, "objectclass", "extensibleObject");

+             slapi_entry_add_string(task_entry, "cn", rdn);

+             slapi_entry_add_string(task_entry, "replica-base-dn", repl_root);

+             slapi_entry_add_string(task_entry, "replica-id", ridstr);

+             slapi_entry_add_string(task_entry, "replica-force-cleaning", forcing);

+             slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0");

+ 

+             slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL,

+                     repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);

+             slapi_add_internal_pb(add_pb);

+             slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result);

+             slapi_pblock_destroy(add_pb);

+             if (result != LDAP_SUCCESS) {

+                slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,

+                        "replica_check_for_tasks - failed to add cleanallruv task entry: %s\n",

+                        ldap_err2string(result));

              }

-         }

  

-     done:

+         done:

+             slapi_ch_free_string(&orig_val);

+             slapi_ch_free_string(&ridstr);

+             slapi_ch_free_string(&rdn);

+         }

          slapi_ch_array_free(clean_vals);

      }

  

      if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaAbortCleanRUV)) != NULL) {

-         PRThread *thread = NULL;

-         struct berval *payload;

-         ReplicaId rid;

-         char *certify = NULL;

-         char *ridstr = NULL;

-         char *token = NULL;

-         char *repl_root;

-         char *iter = NULL;

-         int i;

- 

-         for (i = 0; clean_vals[i]; i++) {

-             cleanruv_data *data = NULL;

+         for (size_t i = 0; clean_vals[i]; i++) {

+             struct timespec ts = slapi_current_rel_time_hr();

+             PRBool original_task = PR_TRUE;

+             Slapi_Entry *task_entry = NULL;

+             Slapi_PBlock *add_pb = NULL;

+             ReplicaId rid;

+             char *certify = NULL;

+             char *ridstr = NULL;

+             char *token = NULL;

+             char *repl_root;

+             char *iter = NULL;

+             char *rdn = NULL;

+             char *dn = NULL;

+             char *orig_val = slapi_ch_strdup(clean_vals[i]);

+             int32_t result = 0;

  

              token = ldap_utf8strtok_r(clean_vals[i], ":", &iter);

              if (token) {

                  rid = atoi(token);

                  if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID) {

-                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Invalid replica id(%d) "

-                                                                    "aborting abort task.\n",

-                                   rid);

+                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - "

+                             "Invalid replica id(%d) aborting abort task.\n", rid);

+                     replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);

                      goto done2;

                  }

              } else {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Unable to parse cleanallruv "

-                                                                "data (%s), aborting abort task.\n",

-                               clean_vals[i]);

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - "

+                         "Unable to parse cleanallruv data (%s), aborting abort task.\n", clean_vals[i]);

+                 replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);

                  goto done2;

              }

  

              repl_root = ldap_utf8strtok_r(iter, ":", &iter);

              certify = ldap_utf8strtok_r(iter, ":", &iter);

  

+             /* Get original task flag */

+             token = ldap_utf8strtok_r(iter, ":", &iter);

+             if (token) {

+                 if (!atoi(token)) {

+                      original_task = PR_FALSE;

+                 }

+             } else {

+                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,

+                         "Abort CleanAllRUV Task - Unable to parse cleanallruv data (%s), "

+                         "missing original task flag.  Aborting abort task!\n",

+                         clean_vals[i]);

+                 replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);

+                 goto done;

+             }

+ 

              if (!is_cleaned_rid(rid)) {

-                 slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - Replica id(%d) is not "

-                                                                   "being cleaned, nothing to abort.  Aborting abort task.\n",

-                               rid);

-                 delete_aborted_rid(r, rid, repl_root, 0);

+                 slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - "

+                         "Replica id(%d) is not being cleaned, nothing to abort.  Aborting abort task.\n", rid);

+                 replica_delete_task_config(e, (char *)type_replicaAbortCleanRUV, orig_val);

                  goto done2;

              }

  

-             add_aborted_rid(rid, r, repl_root);

+             add_aborted_rid(rid, r, repl_root, certify, original_task);

              stop_ruv_cleaning();

  

-             slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - Abort task found, "

-                                                               "resuming abort of rid(%d).\n",

-                           rid);

-             /*

-              *  Setup the data struct, and fire off the abort thread.

-              */

-             data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));

-             if (data == NULL) {

-                 slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Failed to allocate cleanruv_data.\n");

-             } else {

-                 ridstr = slapi_ch_smprintf("%d:%s:%s", rid, repl_root, certify);

-                 payload = create_cleanruv_payload(ridstr);

-                 slapi_ch_free_string(&ridstr);

- 

-                 if (payload == NULL) {

-                     slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Failed to create extended "

-                                                                    "op payload\n");

-                     slapi_ch_free((void **)&data);

-                 } else {

-                     /* setup the data */

-                     data->repl_obj = NULL;

-                     data->replica = NULL;

-                     data->rid = rid;

-                     data->task = NULL;

-                     data->payload = payload;

-                     data->repl_root = slapi_ch_strdup(repl_root);

-                     data->sdn = slapi_sdn_dup(r->repl_root);

-                     data->certify = slapi_ch_strdup(certify);

- 

-                     /* This is a corner case, a cleanAllRuv task was interrupted by a shutdown or a crash

-                      * Let's assum this replica was the original receiver of the task.

-                      * This flag has no impact on Abort cleanAllRuv

-                      */

-                     data->original_task = PR_TRUE;

- 

-                     thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread,

-                                              (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,

-                                              PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);

-                     if (thread == NULL) {

-                         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "Abort CleanAllRUV Task - Unable to create abort cleanAllRUV "

-                                                                        "thread for rid(%d)\n",

-                                       (int)data->rid);

-                         slapi_sdn_free(&data->sdn);

-                         ber_bvfree(data->payload);

-                         slapi_ch_free_string(&data->repl_root);

-                         slapi_ch_free_string(&data->certify);

-                         slapi_ch_free((void **)&data);

-                     }

-                 }

+             slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "Abort CleanAllRUV Task - "

+                     "Abort task found, resuming abort of rid(%d).\n", rid);

+ 

+             add_pb = slapi_pblock_new();

+             task_entry = slapi_entry_alloc();

+             rdn = slapi_ch_smprintf("restarted-abort-%ld", ts.tv_sec);

+             dn = slapi_create_dn_string("cn=%s,cn=abort cleanallruv, cn=tasks, cn=config", rdn, ts.tv_sec);

+             slapi_entry_init(task_entry, dn, NULL);

+ 

+             ridstr = slapi_ch_smprintf("%d", rid);

+             slapi_entry_add_string(task_entry, "objectclass", "top");

+             slapi_entry_add_string(task_entry, "objectclass", "extensibleObject");

+             slapi_entry_add_string(task_entry, "cn", rdn);

+             slapi_entry_add_string(task_entry, "replica-base-dn", repl_root);

+             slapi_entry_add_string(task_entry, "replica-id", ridstr);

+             slapi_entry_add_string(task_entry, "replica-certify-all", certify);

+             slapi_entry_add_string(task_entry, "replica-original-task", original_task ? "1" : "0");

+ 

+             slapi_add_entry_internal_set_pb(add_pb, task_entry, NULL,

+                     repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);

+             slapi_add_internal_pb(add_pb);

+             slapi_pblock_get(add_pb, SLAPI_PLUGIN_INTOP_RESULT, &result);

+             slapi_pblock_destroy(add_pb);

+             if (result != LDAP_SUCCESS) {

+                slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,

+                        "replica_check_for_tasks - failed to add cleanallruv abort task entry: %s\n",

+                        ldap_err2string(result));

              }

-         }

+         done2:

+             slapi_ch_free_string(&orig_val);

+             slapi_ch_free_string(&ridstr);

+             slapi_ch_free_string(&rdn);

  

-     done2:

+         }

          slapi_ch_array_free(clean_vals);

      }

+     object_release(repl_obj);

+     slapi_entry_free(e);

  }

  

  /* This function updates the entry to contain information generated

@@ -57,7 +57,7 @@ 

  static int replica_execute_cl2ldif_task(Object *r, char *returntext);

  static int replica_execute_ldif2cl_task(Object *r, char *returntext);

  static int replica_execute_cleanruv_task(Object *r, ReplicaId rid, char *returntext);

- static int replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, char *returntext);

+ static int replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, PRBool original_task, char *returntext);

  static void replica_cleanallruv_thread(void *arg);

  static void replica_send_cleanruv_task(Repl_Agmt *agmt, cleanruv_data *clean_data);

  static int check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task);
@@ -216,7 +216,7 @@ 

      /* check rdn is "cn=replica" */

      replicardn = slapi_rdn_new_sdn(slapi_entry_get_sdn(e));

      if (replicardn) {

-           char *nrdn = slapi_rdn_get_nrdn(replicardn);

+           const char *nrdn = slapi_rdn_get_nrdn(replicardn);

            if (nrdn == NULL) {

                if (errortext != NULL) {

                   strcpy(errortext, MSG_NOREPLICANORMRDN);
@@ -1078,7 +1078,7 @@ 

          }

          if (apply_mods) {

              Slapi_Task *empty_task = NULL;

-             return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, empty_task, returntext, "no");

+             return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, empty_task, "no", PR_TRUE, returntext);

          } else

              return LDAP_SUCCESS;

      } else {
@@ -1409,6 +1409,8 @@ 

      const char *force_cleaning;

      const char *base_dn;

      const char *rid_str;

+     const char *orig_val = NULL;

+     PRBool original_task = PR_TRUE;

      int rc = SLAPI_DSE_CALLBACK_OK;

  

      /* allocate new task now */
@@ -1454,6 +1456,11 @@ 

      } else {

          force_cleaning = "no";

      }

+     if ((orig_val = slapi_fetch_attr(e, "replica-original-task", 0)) != NULL) {

+         if (!strcasecmp(orig_val, "0")) {

+             original_task = PR_FALSE;

+         }

+     }

      /*

       *  Check the rid

       */
@@ -1486,7 +1493,7 @@ 

      }

  

      /* clean the RUV's */

-     rc = replica_execute_cleanall_ruv_task(r, rid, task, force_cleaning, returntext);

+     rc = replica_execute_cleanall_ruv_task(r, rid, task, force_cleaning, original_task, returntext);

  

  out:

      if (rc) {
@@ -1509,7 +1516,7 @@ 

   *

   */

  static int

- replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, char *returntext __attribute__((unused)))

+ replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, PRBool original_task, char *returntext __attribute__((unused)))

  {

      struct berval *payload = NULL;

      Slapi_Task *pre_task = NULL; /* this is supposed to be null for logging */
@@ -1603,7 +1610,7 @@ 

      /* It is either a consequence of a direct ADD cleanAllRuv task

       * or modify of the replica to add nsds5task: cleanAllRuv

       */

-     data->original_task = PR_TRUE;

+     data->original_task = original_task;

  

      thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread,

                               (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
@@ -1734,7 +1741,7 @@ 

      /*

       *  Add the cleanallruv task to the repl config - so we can handle restarts

       */

-     add_cleaned_rid(data, csnstr); /* marks config that we started cleaning a rid */

+     add_cleaned_rid(data); /* marks config that we started cleaning a rid */

      cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Cleaning rid (%d)...", data->rid);

      /*

       *  First, wait for the maxcsn to be covered
@@ -2067,7 +2074,7 @@ 

                   "Waiting for all the replicas to finish cleaning...");

  

      csn_as_string(data->maxcsn, PR_FALSE, csnstr);

-     filter = PR_smprintf("(%s=%d:%s:%s:%d)", type_replicaCleanRUV, (int)data->rid, csnstr, data->force, data->original_task ? 1 : 0);

+     filter = PR_smprintf("(%s=%d:%s:%s:%d:%s)", type_replicaCleanRUV, (int)data->rid, csnstr, data->force, data->original_task ? 1 : 0, data->repl_root);

      while (not_all_cleaned && !is_task_aborted(data->rid) && !slapi_is_shutting_down()) {

          agmt_obj = agmtlist_get_first_agreement_for_replica(data->replica);

          if (agmt_obj == NULL) {
@@ -2540,15 +2547,15 @@ 

   *  Add the rid and maxcsn to the repl config (so we can resume after a server restart)

   */

  void

- add_cleaned_rid(cleanruv_data *cleanruv_data, char *maxcsn)

+ add_cleaned_rid(cleanruv_data *cleanruv_data)

  {

      Slapi_PBlock *pb;

      struct berval *vals[2];

      struct berval val;

      LDAPMod *mods[2];

      LDAPMod mod;

-     char data[CSN_STRSIZE + 10];

-     char *dn;

+     char *data = NULL;

+     char *dn = NULL;

      int rc;

      ReplicaId rid;

      Replica *r;
@@ -2558,13 +2565,15 @@ 

      r = cleanruv_data->replica;

      forcing = cleanruv_data->force;

  

-     if (r == NULL || maxcsn == NULL) {

+     if (r == NULL) {

          return;

      }

      /*

       *  Write the rid & maxcsn to the config entry

       */

-     val.bv_len = PR_snprintf(data, sizeof(data), "%d:%s:%s:%d", rid, maxcsn, forcing, cleanruv_data->original_task ? 1 : 0);

+     data = slapi_ch_smprintf("%d:%s:%d:%s",

+             rid, forcing, cleanruv_data->original_task ? 1 : 0,

+             cleanruv_data->repl_root);

      dn = replica_get_dn(r);

      pb = slapi_pblock_new();

      mod.mod_op = LDAP_MOD_ADD | LDAP_MOD_BVALUES;
@@ -2572,6 +2581,7 @@ 

      mod.mod_bvalues = vals;

      vals[0] = &val;

      vals[1] = NULL;

+     val.bv_len = strlen(data);

      val.bv_val = data;

      mods[0] = &mod;

      mods[1] = NULL;
@@ -2585,6 +2595,7 @@ 

                                                         "Failed to update replica config (%d), rid (%d)\n",

                        rc, rid);

      }

+     slapi_ch_free_string(&data);

      slapi_ch_free_string(&dn);

      slapi_pblock_destroy(pb);

  }
@@ -2593,7 +2604,7 @@ 

   *  Add aborted rid and repl root to config in case of a server restart

   */

  void

- add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root)

+ add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task)

  {

      Slapi_PBlock *pb;

      struct berval *vals[2];
@@ -2619,7 +2630,7 @@ 

       */

      dn = replica_get_dn(r);

      pb = slapi_pblock_new();

-     data = PR_smprintf("%d:%s", rid, repl_root);

+     data = PR_smprintf("%d:%s:%s:%d", rid, repl_root, certify_all, original_task ? 1 : 0);

      mod.mod_op = LDAP_MOD_ADD | LDAP_MOD_BVALUES;

      mod.mod_type = (char *)type_replicaAbortCleanRUV;

      mod.mod_bvalues = vals;
@@ -2646,7 +2657,7 @@ 

  }

  

  void

- delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, int skip)

+ delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip)

  {

      Slapi_PBlock *pb;

      LDAPMod *mods[2];
@@ -2674,7 +2685,7 @@ 

      } else {

          /* only remove the config, leave the in-memory rid */

          dn = replica_get_dn(r);

-         data = PR_smprintf("%d:%s", (int)rid, repl_root);

+         data = PR_smprintf("%d:%s:%s:%d", (int)rid, repl_root, certify_all, original_task ? 1 : 0);

  

          mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;

          mod.mod_type = (char *)type_replicaAbortCleanRUV;
@@ -2711,8 +2722,6 @@ 

      Slapi_Entry **entries = NULL;

      LDAPMod *mods[2];

      LDAPMod mod;

-     struct berval *vals[5] = {0, 0, 0, 0, 0}; /* maximum of 4 tasks */

-     struct berval val[5];

      char *iter = NULL;

      char *dn = NULL;

      int i, ii;
@@ -2759,7 +2768,6 @@ 

              for (i = 0; entries[i] != NULL; i++) {

                  char **attr_val = slapi_entry_attr_get_charray(entries[i], type_replicaCleanRUV);

                  char *edn = slapi_entry_get_dn(entries[i]);

-                 int count = 0;

  

                  for (ii = 0; attr_val && attr_val[ii] && i < 5; ii++) {

                      /* make a copy to retain the full value after toking */
@@ -2767,45 +2775,35 @@ 

  

                      rid = atoi(ldap_utf8strtok_r(attr_val[ii], ":", &iter));

                      if (rid == clean_data->rid) {

-                         val[count].bv_len = strlen(aval);

-                         val[count].bv_val = aval;

-                         vals[count] = &val[count];

-                         count++;

-                     } else {

-                         slapi_ch_free_string(&aval);

+                         struct berval *vals[2];

+                         struct berval val[1];

+                         val[0].bv_len = strlen(aval);

+                         val[0].bv_val = aval;

+                         vals[0] = &val[0];

+                         vals[1] = NULL;

+ 

+                         mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;

+                         mod.mod_type = (char *)type_replicaCleanRUV;

+                         mod.mod_bvalues = vals;

+                         mods[0] = &mod;

+                         mods[1] = NULL;

+ 

+                         modpb = slapi_pblock_new();

+                         slapi_modify_internal_set_pb(modpb, edn, mods, NULL, NULL,

+                                                      repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);

+                         slapi_modify_internal_pb(modpb);

+                         slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc);

+                         slapi_pblock_destroy(modpb);

+                         if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) {

+                             cleanruv_log(clean_data->task, clean_data->rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

+                                     "delete_cleaned_rid_config - Failed to remove task data from (%s) error (%d), rid (%d)",

+                                     edn, rc, clean_data->rid);

+                             goto bail;

+                         }

                      }

+                     slapi_ch_free_string(&aval);

                  }

                  slapi_ch_array_free(attr_val);

- 

-                 /*

-                  *  Now delete the attribute

-                  */

-                 vals[4] = NULL;

-                 mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;

-                 mod.mod_type = (char *)type_replicaCleanRUV;

-                 mod.mod_bvalues = vals;

-                 mods[0] = &mod;

-                 mods[1] = NULL;

- 

-                 modpb = slapi_pblock_new();

-                 slapi_modify_internal_set_pb(modpb, edn, mods, NULL, NULL,

-                                              repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);

-                 slapi_modify_internal_pb(modpb);

-                 slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &rc);

-                 slapi_pblock_destroy(modpb);

- 

-                 /* free the attr vals */

-                 for (ii = 0; ii < count; ii++) {

-                     slapi_ch_free_string(&val[ii].bv_val);

-                 }

- 

-                 if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_OBJECT) {

-                     cleanruv_log(clean_data->task, clean_data->rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

-                                  "delete_cleaned_rid_config - Failed to remove task data "

-                                  "from (%s) error (%d), rid (%d)",

-                                  edn, rc, clean_data->rid);

-                     goto bail;

-                 }

              }

          }

      }
@@ -2870,7 +2868,9 @@ 

      Replica *replica;

      ReplicaId rid = -1;

      Object *r;

+     PRBool original_task = PR_TRUE;

      const char *certify_all;

+     const char *orig_val;

      const char *base_dn;

      const char *rid_str;

      char *ridstr = NULL;
@@ -2986,8 +2986,9 @@ 

       *  Stop the cleaning, and delete the rid

       */

      replica = (Replica *)object_get_data(r);

-     add_aborted_rid(rid, replica, (char *)base_dn);

+     add_aborted_rid(rid, replica, (char *)base_dn, (char *)certify_all, original_task);

      stop_ruv_cleaning();

+ 

      /*

       *  Prepare the abort struct and fire off the thread

       */
@@ -2998,6 +2999,11 @@ 

          rc = SLAPI_DSE_CALLBACK_ERROR;

          goto out;

      }

+     if ((orig_val = slapi_fetch_attr(e, "replica-original-task", 0)) != NULL) {

+         if (!strcasecmp(orig_val, "0")) {

+             original_task = PR_FALSE;

+         }

+     }

      data->repl_obj = r; /* released in replica_abort_task_thread() */

      data->replica = replica;

      data->task = task;
@@ -3006,7 +3012,7 @@ 

      data->repl_root = slapi_ch_strdup(base_dn);

      data->sdn = NULL;

      data->certify = slapi_ch_strdup(certify_all);

-     data->original_task = PR_TRUE;

+     data->original_task = original_task;

  

      thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread,

                               (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
@@ -3069,8 +3075,8 @@ 

          }

          if (data->replica == NULL && data->repl_obj) {

              data->replica = (Replica *)object_get_data(data->repl_obj);

+             release_it = 1;

          }

-         release_it = 1;

      }

  

      /*
@@ -3148,11 +3154,11 @@ 

          /*

           *  Clean up the config

           */

-         delete_aborted_rid(data->replica, data->rid, data->repl_root, 1); /* delete just the config, leave rid in memory */

+         delete_aborted_rid(data->replica, data->rid, data->repl_root, data->certify, data->original_task, 1); /* delete just the config, leave rid in memory */

          if (strcasecmp(data->certify, "yes") == 0) {

              check_replicas_are_done_aborting(data);

          }

-         delete_aborted_rid(data->replica, data->rid, data->repl_root, 0); /* remove the in-memory aborted rid */

+         delete_aborted_rid(data->replica, data->rid, data->repl_root, data->certify, data->original_task, 0); /* remove the in-memory aborted rid */

          if (rc == 0) {

              cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully aborted task for rid(%d)", data->rid);

          } else {

@@ -1416,7 +1416,7 @@ 

      /*

       *  Set the aborted rid and stop the cleaning

       */

-     add_aborted_rid(rid, r, repl_root);

+     add_aborted_rid(rid, r, repl_root, data->certify, data->original_task);

      stop_ruv_cleaning();

      /*

       *  Send out the extended ops to the replicas

file modified
+9 -11
@@ -379,17 +379,15 @@ 

          int ttl;

          time_t expire;

  

-         e = get_internal_entry(pb, task->task_dn);

-         if (e == NULL)

-             return;

-         ttl = atoi(slapi_fetch_attr(e, "ttl", DEFAULT_TTL));

-         if (ttl > (24*3600))

-             ttl = (24*3600); /* be reasonable, allow to check task status not longer than one day  */

-         expire = time(NULL) + ttl;

-         task->task_flags |= SLAPI_TASK_DESTROYING;

-         /* queue an event to destroy the state info */

-         slapi_eq_once(destroy_task, (void *)task, expire);

- 

+         if ((e = get_internal_entry(pb, task->task_dn))) {

+             ttl = atoi(slapi_fetch_attr(e, "ttl", DEFAULT_TTL));

+             if (ttl > (24*3600))

+                 ttl = (24*3600); /* be reasonable, allow to check task status not longer than one day  */

+             expire = time(NULL) + ttl;

+             task->task_flags |= SLAPI_TASK_DESTROYING;

+             /* queue an event to destroy the state info */

+             slapi_eq_once(destroy_task, (void *)task, expire);

+         }

          slapi_free_search_results_internal(pb);

          slapi_pblock_destroy(pb);

      }

Bug Description:
If the server was stopped while a CleanAllRUV task was
running the task gets marked in the replica config entry
so it knowns to resume the task at server startup. The
problem is that when it resumed it just fires off the
task thread, and did not create a new Slapi_Task entry.
This makes it impossible to track these tasks that got
resumed.

Fix Description:

              There were a few things wrong with the resume process,
              including it was harded coded to only handle a maximum
              of 4 tasks.  We also were not recording all the required
              information needed to resume the task.

              Now "resume" process can handle an infinite number of
              tasks, and it creates fresh Slapi_Task entries so the
              tasks can be tracked.

CI tested & ASAN approved

https://pagure.io/389-ds-base/issue/50305

rebased onto f632ada822c7ffa6dba630a5c502dd7bdedc2f1f

5 years ago

why do you need the unused param

@lkrispen, this is there because that function is now passed to slapi_eq_once(). Since it unused I guess it could removed from the API, but that would be a different issue.

@lkrispen made this suggestion:

Just thinking about it, wouldn't it be an option that the task entry survives until it is completed and written to the dse.ldif at shutdown, so at startup tasks could be resumed from the task entry not from params in the repl entry ?

I thought this would work and would be a great idea, but sadly we can not use it. As the cleanallruv task is propagated among the replicas, there is only one "task". That task sends special extended operations to all the replicas (not tasks). So if a replica is stopped that only received the extended op (cleanAllRUV op) , then there is no task entry. So there is nothing to resume at start up. So for now we need to keep the current design.

rebased onto d08f7eb

4 years ago

Pull-Request has been merged by mreynolds

4 years ago

389-ds-base is moving from Pagure to Github. This means that new issues and pull requests
will be accepted only in 389-ds-base's github repository.

This pull request has been cloned to Github as issue and is available here:
- https://github.com/389ds/389-ds-base/issues/3366

If you want to continue to work on the PR, please navigate to the github issue,
download the patch from the attachments and file a new pull request.

Thank you for understanding. We apologize for all inconvenience.

Pull-Request has been closed by spichugi

3 years ago