#50539 Issue 50538 - cleanAllRUV task limit is not enforced for replicated tasks
Merged 4 months ago by mreynolds. Opened 4 months ago by mreynolds.
mreynolds/389-ds-base issue50538  into  master

@@ -1,5 +1,5 @@ 

  # --- BEGIN COPYRIGHT BLOCK ---

- # Copyright (C) 2016 Red Hat, Inc.

+ # Copyright (C) 2019 Red Hat, Inc.

  # All rights reserved.

  #

  # License: GPL (version 3 or any later version).

@@ -7,7 +7,6 @@ 

  # --- END COPYRIGHT BLOCK ---

  #

  import threading

- 

  import pytest

  import random

  from lib389 import DirSrv

@@ -821,6 +820,49 @@ 

      assert not m1.detectDisorderlyShutdown()

  

  

+ def test_max_tasks(topology_m4):

+     """Test we can not create more than 64 cleaning tasks

+ 

+     :id: c34d0b40-3c3e-4f53-8656-5e4c2a310a1f

+     :setup: Replication setup with four masters

+     :steps:

+         1. Stop masters 3 & 4

+         2. Create over 64 tasks between m1 and m2

+         3. Check logs to see if (>65) tasks were rejected

+ 

+     :expectedresults:

+         1. Success

+         2. Success

+         3. Success

+     """

+ 

+     # Stop masters 3 & 4

+     m1 = topology_m4.ms["master1"]

+     m2 = topology_m4.ms["master2"]

+     m3 = topology_m4.ms["master3"]

+     m4 = topology_m4.ms["master4"]

+     m3.stop()

+     m4.stop()

+ 

+     # Add over 64 tasks between master1 & 2 to try to exceed the 64 task limit

+     for i in range(1, 64):

+         cruv_task = CleanAllRUVTask(m1)

+         cruv_task.create(properties={

+             'replica-id': str(i),

+             'replica-base-dn': DEFAULT_SUFFIX,

+             'replica-force-cleaning': 'no',  # This forces these tasks to stick around

+         })

+         cruv_task = CleanAllRUVTask(m2)

+         cruv_task.create(properties={

+             'replica-id': "10" + str(i),

+             'replica-base-dn': DEFAULT_SUFFIX,

+             'replica-force-cleaning': 'yes',  # This allows the tasks to propagate

+         })

+ 

+     # Check the errors log for our error message in master 1

+     assert m1.searchErrorsLog('Exceeded maximum number of active CLEANALLRUV tasks')

shouldn't the message be on M2 as well, could you query for active tasks and count them ?

+ 

+ 

  if __name__ == '__main__':

      # Run isolated

      # -s for DEBUG mode

@@ -80,6 +80,8 @@ 

  #define CLEANRUV_FINISHED  "finished"

  #define CLEANRUV_CLEANING  "cleaning"

  #define CLEANRUV_NO_MAXCSN "no maxcsn"

+ #define CLEANALLRUV_ID "CleanAllRUV Task"

+ #define ABORT_CLEANALLRUV_ID "Abort CleanAllRUV Task"

  

  /* DS 5.0 replication protocol error codes */

  #define NSDS50_REPL_REPLICA_READY             0x00  /* Replica ready, go ahead */

@@ -784,6 +786,7 @@ 

  void multimaster_be_state_change(void *handle, char *be_name, int old_be_state, int new_be_state);

  

  #define CLEANRIDSIZ 64 /* maximum number for concurrent CLEANALLRUV tasks */

+ #define CLEANRID_BUFSIZ 128

  

  typedef struct _cleanruv_data

  {

@@ -815,6 +818,8 @@ 

  int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid);

  void add_cleaned_rid(cleanruv_data *data);

  int is_cleaned_rid(ReplicaId rid);

+ int32_t check_and_set_cleanruv_task_count(ReplicaId rid);

+ int32_t check_and_set_abort_cleanruv_task_count(void);

  int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg);

  void replica_cleanallruv_thread_ext(void *arg);

  void stop_ruv_cleaning(void);

@@ -833,8 +838,6 @@ 

  void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...);

  char *replica_cleanallruv_get_local_maxcsn(ReplicaId rid, char *base_dn);

  

- 

- 

  /* replutil.c */

  LDAPControl *create_managedsait_control(void);

  LDAPControl *create_backend_control(Slapi_DN *sdn);

@@ -30,17 +30,18 @@ 

  #define CLEANALLRUV "CLEANALLRUV"

  #define CLEANALLRUVLEN 11

  #define REPLICA_RDN "cn=replica"

- #define CLEANALLRUV_ID "CleanAllRUV Task"

- #define ABORT_CLEANALLRUV_ID "Abort CleanAllRUV Task"

  

  int slapi_log_urp = SLAPI_LOG_REPL;

- static ReplicaId cleaned_rids[CLEANRIDSIZ + 1] = {0};

- static ReplicaId pre_cleaned_rids[CLEANRIDSIZ + 1] = {0};

- static ReplicaId aborted_rids[CLEANRIDSIZ + 1] = {0};

- static Slapi_RWLock *rid_lock = NULL;

- static Slapi_RWLock *abort_rid_lock = NULL;

+ static ReplicaId cleaned_rids[CLEANRID_BUFSIZ] = {0};

+ static ReplicaId pre_cleaned_rids[CLEANRID_BUFSIZ] = {0};

+ static ReplicaId aborted_rids[CLEANRID_BUFSIZ] = {0};

+ static PRLock *rid_lock = NULL;

+ static PRLock *abort_rid_lock = NULL;

  static PRLock *notify_lock = NULL;

  static PRCondVar *notify_cvar = NULL;

+ static PRLock *task_count_lock = NULL;

+ static int32_t clean_task_count = 0;

+ static int32_t abort_task_count = 0;

  

  /* Forward Declartions */

  static int replica_config_add(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *entryAfter, int *returncode, char *returntext, void *arg);

@@ -67,8 +68,6 @@ 

  static int replica_cleanallruv_check_maxcsn(Repl_Agmt *agmt, char *basedn, char *rid_text, char *maxcsn, Slapi_Task *task);

  static int replica_cleanallruv_replica_alive(Repl_Agmt *agmt);

  static int replica_cleanallruv_check_ruv(char *repl_root, Repl_Agmt *ra, char *rid_text, Slapi_Task *task, char *force);

- static int get_cleanruv_task_count(void);

- static int get_abort_cleanruv_task_count(void);

  static int replica_cleanup_task(Object *r, const char *task_name, char *returntext, int apply_mods);

  static int replica_task_done(Replica *replica);

  static void delete_cleaned_rid_config(cleanruv_data *data);

@@ -114,20 +113,27 @@ 

                        PR_GetError());

          return -1;

      }

-     rid_lock = slapi_new_rwlock();

+     rid_lock = PR_NewLock();

      if (rid_lock == NULL) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - "

                                                         "Failed to create rid_lock; NSPR error - %d\n",

                        PR_GetError());

          return -1;

      }

-     abort_rid_lock = slapi_new_rwlock();

+     abort_rid_lock = PR_NewLock();

      if (abort_rid_lock == NULL) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - "

                                                         "Failed to create abort_rid_lock; NSPR error - %d\n",

                        PR_GetError());

          return -1;

      }

+     task_count_lock = PR_NewLock();

+     if (task_count_lock == NULL) {

+         slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - "

+                                                        "Failed to create task_count_lock; NSPR error - %d\n",

+                       PR_GetError());

+         return -1;

+     }

      if ((notify_lock = PR_NewLock()) == NULL) {

          slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - "

                                                         "Failed to create notify lock; NSPR error - %d\n",

@@ -1522,12 +1528,6 @@ 

  

      cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Initiating CleanAllRUV Task...");

  

-     if (get_cleanruv_task_count() >= CLEANRIDSIZ) {

-         /* we are already running the maximum number of tasks */

-         cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

-                      "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ);

-         return LDAP_UNWILLING_TO_PERFORM;

-     }

      /*

       *  Grab the replica

       */

@@ -1579,6 +1579,13 @@ 

          goto fail;

      }

  

+     if (check_and_set_cleanruv_task_count(rid) != LDAP_SUCCESS) {

+         cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

+                      "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ);

+         rc = LDAP_UNWILLING_TO_PERFORM;

+         goto fail;

+     }

+ 

      /*

       *  Launch the cleanallruv thread.  Once all the replicas are cleaned it will release the rid

       */

@@ -1586,6 +1593,9 @@ 

      if (data == NULL) {

          cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, "Failed to allocate cleanruv_data.  Aborting task.");

          rc = -1;

+         PR_Lock(task_count_lock);

+         clean_task_count--;

+         PR_Unlock(task_count_lock);

          goto fail;

      }

      data->repl_obj = r;

@@ -1668,13 +1678,13 @@ 

      int aborted = 0;

      int rc = 0;

  

-     if (!data || slapi_is_shutting_down()) {

-         return; /* no data */

-     }

- 

      /* Increase active thread count to prevent a race condition at server shutdown */

      g_incr_active_threadcnt();

  

+     if (!data || slapi_is_shutting_down()) {

+         goto done;

+     }

+ 

      if (data->task) {

          slapi_task_inc_refcount(data->task);

          slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name,

@@ -1721,16 +1731,13 @@ 

          slapi_task_begin(data->task, 1);

      }

      /*

-      *  Presetting the rid prevents duplicate thread creation, but allows the db and changelog to still

-      *  process updates from the rid.

-      *  set_cleaned_rid() blocks updates, so we don't want to do that... yet unless we are in force mode.

-      *  If we are forcing a clean independent of state of other servers for this RID we can set_cleaned_rid()

+      *  We have already preset this rid, but if we are forcing a clean independent of state

+      *  of other servers for this RID we can set_cleaned_rid()

       */

      if (data->force) {

          set_cleaned_rid(data->rid);

-     } else {

-         preset_cleaned_rid(data->rid);

      }

+ 

      rid_text = slapi_ch_smprintf("%d", data->rid);

      csn_as_string(data->maxcsn, PR_FALSE, csnstr);

      /*

@@ -1900,6 +1907,9 @@ 

      /*

       *  If the replicas are cleaned, release the rid

       */

+     if (slapi_is_shutting_down()) {

+         stop_ruv_cleaning();

+     }

      if (!aborted && !slapi_is_shutting_down()) {

          /*

           * Success - the rid has been cleaned!

@@ -1918,10 +1928,9 @@ 

          } else {

              cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Propagated task does not delete Keep alive entry (%d).", data->rid);

          }

- 

          clean_agmts(data);

          remove_cleaned_rid(data->rid);

-         cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully cleaned rid(%d).", data->rid);

+         cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully cleaned rid(%d)", data->rid);

      } else {

          /*

           *  Shutdown or abort

@@ -1954,6 +1963,10 @@ 

      slapi_ch_free_string(&data->force);

      slapi_ch_free_string(&rid_text);

      slapi_ch_free((void **)&data);

+     /* decrement task count */

+     PR_Lock(task_count_lock);

+     clean_task_count--;

+     PR_Unlock(task_count_lock);

      g_decr_active_threadcnt();

  }

  

@@ -2451,16 +2464,14 @@ 

  int

  is_cleaned_rid(ReplicaId rid)

  {

-     int i;

- 

-     slapi_rwlock_rdlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != 0; i++) {

+     PR_Lock(rid_lock);

+     for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) {

          if (rid == cleaned_rids[i]) {

-             slapi_rwlock_unlock(rid_lock);

+             PR_Unlock(rid_lock);

              return 1;

          }

      }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(rid_lock);

  

      return 0;

  }

@@ -2468,16 +2479,14 @@ 

  int

  is_pre_cleaned_rid(ReplicaId rid)

  {

-     int i;

- 

-     slapi_rwlock_rdlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ && pre_cleaned_rids[i] != 0; i++) {

+     PR_Lock(rid_lock);

+     for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) {

          if (rid == pre_cleaned_rids[i]) {

-             slapi_rwlock_unlock(rid_lock);

+             PR_Unlock(rid_lock);

              return 1;

          }

      }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(rid_lock);

  

      return 0;

  }

@@ -2490,14 +2499,14 @@ 

      if (rid == 0) {

          return 0;

      }

-     slapi_rwlock_rdlock(abort_rid_lock);

-     for (i = 0; i < CLEANRIDSIZ && aborted_rids[i] != 0; i++) {

+     PR_Lock(abort_rid_lock);

+     for (i = 0; i < CLEANRID_BUFSIZ && aborted_rids[i] != 0; i++) {

          if (rid == aborted_rids[i]) {

-             slapi_rwlock_unlock(abort_rid_lock);

+             PR_Unlock(abort_rid_lock);

              return 1;

          }

      }

-     slapi_rwlock_unlock(abort_rid_lock);

+     PR_Unlock(abort_rid_lock);

      return 0;

  }

  

@@ -2506,15 +2515,14 @@ 

  {

      int i;

  

-     slapi_rwlock_wrlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ; i++) {

+     PR_Lock(rid_lock);

+     for (i = 0; i < CLEANRID_BUFSIZ && pre_cleaned_rids[i] != rid; i++) {

          if (pre_cleaned_rids[i] == 0) {

              pre_cleaned_rids[i] = rid;

-             pre_cleaned_rids[i + 1] = 0;

              break;

          }

      }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(rid_lock);

  }

  

  /*

@@ -2527,14 +2535,13 @@ 

  {

      int i;

  

-     slapi_rwlock_wrlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ; i++) {

+     PR_Lock(rid_lock);

+     for (i = 0; i < CLEANRID_BUFSIZ && cleaned_rids[i] != rid; i++) {

          if (cleaned_rids[i] == 0) {

              cleaned_rids[i] = rid;

-             cleaned_rids[i + 1] = 0;

          }

      }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(rid_lock);

  }

  

  /*

@@ -2610,15 +2617,14 @@ 

      int rc;

      int i;

  

-     slapi_rwlock_wrlock(abort_rid_lock);

-     for (i = 0; i < CLEANRIDSIZ; i++) {

+     PR_Lock(abort_rid_lock);

+     for (i = 0; i < CLEANRID_BUFSIZ; i++) {

          if (aborted_rids[i] == 0) {

              aborted_rids[i] = rid;

-             aborted_rids[i + 1] = 0;

              break;

          }

      }

-     slapi_rwlock_unlock(abort_rid_lock);

+     PR_Unlock(abort_rid_lock);

      /*

       *  Write the rid to the config entry

       */

@@ -2661,21 +2667,24 @@ 

      char *data;

      char *dn;

      int rc;

-     int i;

  

      if (r == NULL)

          return;

  

      if (skip) {

          /* skip the deleting of the config, and just remove the in memory rid */

-         slapi_rwlock_wrlock(abort_rid_lock);

-         for (i = 0; i < CLEANRIDSIZ && aborted_rids[i] != rid; i++)

-             ; /* found rid, stop */

-         for (; i < CLEANRIDSIZ; i++) {

-             /* rewrite entire array */

-             aborted_rids[i] = aborted_rids[i + 1];

-         }

-         slapi_rwlock_unlock(abort_rid_lock);

+         ReplicaId new_abort_rids[CLEANRID_BUFSIZ] = {0};

+         int32_t idx = 0;

+ 

+         PR_Lock(abort_rid_lock);

+         for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) {

+             if (aborted_rids[i] != rid) {

+                 new_abort_rids[idx] = aborted_rids[i];

+                 idx++;

+             }

+         }

+         memcpy(aborted_rids, new_abort_rids, sizeof(new_abort_rids));

+         PR_Unlock(abort_rid_lock);

      } else {

          /* only remove the config, leave the in-memory rid */

          dn = replica_get_dn(r);

@@ -2821,27 +2830,31 @@ 

  void

  remove_cleaned_rid(ReplicaId rid)

  {

-     int i;

-     /*

-      *  Remove this rid, and optimize the array

-      */

-     slapi_rwlock_wrlock(rid_lock);

+     ReplicaId new_cleaned_rids[CLEANRID_BUFSIZ] = {0};

+     ReplicaId new_pre_cleaned_rids[CLEANRID_BUFSIZ] = {0};

+     size_t idx = 0;

+ 

+     PR_Lock(rid_lock);

  

-     for (i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != rid; i++)

-         ; /* found rid, stop */

-     for (; i < CLEANRIDSIZ; i++) {

-         /* rewrite entire array */

-         cleaned_rids[i] = cleaned_rids[i + 1];

+     for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) {

+         if (cleaned_rids[i] != rid) {

+             new_cleaned_rids[idx] = cleaned_rids[i];

+             idx++;

+         }

      }

+     memcpy(cleaned_rids, new_cleaned_rids, sizeof(new_cleaned_rids));

+ 

      /* now do the preset cleaned rids */

-     for (i = 0; i < CLEANRIDSIZ && pre_cleaned_rids[i] != rid; i++)

-         ; /* found rid, stop */

-     for (; i < CLEANRIDSIZ; i++) {

-         /* rewrite entire array */

-         pre_cleaned_rids[i] = pre_cleaned_rids[i + 1];

+     idx = 0;

+     for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) {

+         if (pre_cleaned_rids[i] != rid) {

+             new_pre_cleaned_rids[idx] = pre_cleaned_rids[i];

+             idx++;

+         }

      }

+     memcpy(pre_cleaned_rids, new_pre_cleaned_rids, sizeof(new_pre_cleaned_rids));

  

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(rid_lock);

  }

  

  /*

@@ -2871,16 +2884,6 @@ 

      char *ridstr = NULL;

      int rc = SLAPI_DSE_CALLBACK_OK;

  

-     if (get_abort_cleanruv_task_count() >= CLEANRIDSIZ) {

-         /* we are already running the maximum number of tasks */

-         PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE,

-                     "Exceeded maximum number of active ABORT CLEANALLRUV tasks(%d)",

-                     CLEANRIDSIZ);

-         cleanruv_log(task, -1, ABORT_CLEANALLRUV_ID, SLAPI_LOG_ERR, "%s", returntext);

-         *returncode = LDAP_OPERATIONS_ERROR;

-         return SLAPI_DSE_CALLBACK_ERROR;

-     }

- 

      /* allocate new task now */

      task = slapi_new_task(slapi_entry_get_ndn(e));

  

@@ -2965,6 +2968,16 @@ 

           */

          certify_all = "no";

      }

+ 

+     if (check_and_set_abort_cleanruv_task_count() != LDAP_SUCCESS) {

+         /* we are already running the maximum number of tasks */

+         PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE,

+                     "Exceeded maximum number of active ABORT CLEANALLRUV tasks(%d)",

+                     CLEANRIDSIZ);

+         cleanruv_log(task, -1, ABORT_CLEANALLRUV_ID, SLAPI_LOG_ERR, "%s", returntext);

+         *returncode = LDAP_UNWILLING_TO_PERFORM;

+         goto out;

+     }

      /*

       *  Create payload

       */

@@ -3179,6 +3192,9 @@ 

      slapi_ch_free_string(&data->certify);

      slapi_sdn_free(&data->sdn);

      slapi_ch_free((void **)&data);

+     PR_Lock(task_count_lock);

+     abort_task_count--;

+     PR_Unlock(task_count_lock);

      g_decr_active_threadcnt();

  }

  

@@ -3530,36 +3546,43 @@ 

      return rc;

  }

  

- static int

- get_cleanruv_task_count(void)

+ /*

+  * Before starting a cleanAllRUV task make sure there are not

+  * too many task threads already running.  If everything is okay

+  * also pre-set the RID now so rebounding extended ops do not

+  * try to clean it over and over.

+  */

+ int32_t

+ check_and_set_cleanruv_task_count(ReplicaId rid)

  {

-     int i, count = 0;

+     int32_t rc = 0;

  

-     slapi_rwlock_wrlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ; i++) {

-         if (pre_cleaned_rids[i] != 0) {

-             count++;

-         }

+     PR_Lock(task_count_lock);

+     if (clean_task_count >= CLEANRIDSIZ) {

+         rc = -1;

+     } else {

+         clean_task_count++;

+         preset_cleaned_rid(rid);

      }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(task_count_lock);

  

-     return count;

+     return rc;

  }

  

- static int

- get_abort_cleanruv_task_count(void)

+ int32_t

+ check_and_set_abort_cleanruv_task_count(void)

  {

-     int i, count = 0;

+     int32_t rc = 0;

  

-     slapi_rwlock_wrlock(rid_lock);

-     for (i = 0; i < CLEANRIDSIZ; i++) {

-         if (aborted_rids[i] != 0) {

-             count++;

+     PR_Lock(task_count_lock);

+     if (abort_task_count > CLEANRIDSIZ) {

+             rc = -1;

+         } else {

+             abort_task_count++;

          }

-     }

-     slapi_rwlock_unlock(rid_lock);

+     PR_Unlock(task_count_lock);

  

-     return count;

+     return rc;

  }

  

  /*

@@ -1393,6 +1393,12 @@ 

          rc = LDAP_OPERATIONS_ERROR;

          goto out;

      }

+     if (check_and_set_abort_cleanruv_task_count() != LDAP_SUCCESS) {

+         cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

+                      "Exceeded maximum number of active abort CLEANALLRUV tasks(%d)", CLEANRIDSIZ);

+         rc = LDAP_UNWILLING_TO_PERFORM;

+         goto out;

+     }

      /*

       *  Prepare the abort data

       */

@@ -1499,6 +1505,7 @@ 

      if (force == NULL) {

          force = "no";

      }

+ 

      maxcsn = csn_new();

      csn_init_by_string(maxcsn, csnstr);

      /*

@@ -1535,13 +1542,21 @@ 

          goto free_and_return;

      }

  

+     if (check_and_set_cleanruv_task_count((ReplicaId)rid) != LDAP_SUCCESS) {

+         cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,

+                      "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ);

+         rc = LDAP_UNWILLING_TO_PERFORM;

+         goto free_and_return;

+     }

+ 

      if (replica_get_type(r) != REPLICA_TYPE_READONLY) {

          /*

           *  Launch the cleanruv monitoring thread.  Once all the replicas are cleaned it will release the rid

           *

           *  This will also release mtnode_ext->replica

           */

-         slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multimaster_extop_cleanruv - CleanAllRUV Task - Launching cleanAllRUV thread...\n");

+ 

+         cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, "Launching cleanAllRUV thread...\n");

          data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));

          if (data == NULL) {

              slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multimaster_extop_cleanruv - CleanAllRUV Task - Failed to allocate "

@@ -1635,7 +1650,7 @@ 

          ber_printf(resp_bere, "{s}", CLEANRUV_ACCEPTED);

          ber_flatten(resp_bere, &resp_bval);

          slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);

-         slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL);

+         slapi_send_ldap_result(pb, rc, NULL, NULL, 0, NULL);

          /* resp_bere */

          if (NULL != resp_bere) {

              ber_free(resp_bere, 1);

Bug Description:

There is a hard limit of 64 concurrent cleanAllRUV tasks, but this limit is only enforced when creating "new" tasks. It was not enforced when a task was received via an extended operation. There were also race conditions in the existing logic that allowed the array of cleaned rids to get corrupted . This allowed for a very large number of task threads to be created.

Fix Description:

Maintain a new counter to keep track of the number of clean and abort threads to make sure it never over runs the rid array buffers.

relates: https://pagure.io/389-ds-base/issue/50538

rebased onto 425ef509dae561a0cb20d663fb9a2f2c6f45f629

4 months ago

rebased onto 8272d387dd2469ee9db8b7c7752ff616bbcb9a7e

4 months ago

The patch looks good, I'm just a bit confused by the test case: the issue is about not limiting the number of tasks when received via replication ext ops, but if I read the test correctly it is testing the limit when directly creating tasks. Would we need to add eg 50 tasks to two masters and then see what happens ?

The patch looks good, I'm just a bit confused by the test case: the issue is about not limiting the number of tasks when received via replication ext ops, but if I read the test correctly it is testing the limit when directly creating tasks. Would we need to add eg 50 tasks to two masters and then see what happens ?

Yeah the testcase is not reproducing the exact issue I originally reported. That's because I found more general problems with the entire process (and it was after midnight when I was wrapping it up and got tired). I'll change the test case shortly....

ok, you have my ack for the patch

rebased onto 3575730

4 months ago

Fixed another race condition, and improved CI test, please review...

shouldn't the message be on M2 as well, could you query for active tasks and count them ?

shouldn't the message be on M2 as well, could you query for active tasks and count them ?

No, and in fact it was really hard to reproduce this in the first place because the tasks would normally finish too fast. So the tasks I create on M1 are not "force cleaning". So m3 and m4 are stopped. In this case m1 will not send out the cleanAllRUV extended op. So they just hang around doing nothing. On M2 the tasks are set to "force clean" so this allows M2's cleanAllRUV extended op to be sent to other replicas regardless if m3 or m4 are stopped. So only m1 will actually receive more than 64 tasks at one time

thanks for the explanation

thanks for the explanation

Thanks for the review!

Pull-Request has been merged by mreynolds

4 months ago