#49636 Ticket 49569 - ns deadlock with connection management in ds
Closed 3 years ago by spichugi. Opened 6 years ago by firstyear.
firstyear/389-ds-base 49569-ns-deadlock  into  master

@@ -958,9 +958,6 @@ 

  {

      int rc = LDAP_PROTOCOL_ERROR;

  

-     slapi_log_err(SLAPI_LOG_TRACE, "attr_get_value_cmp_fn",

-                   "=> attr_get_value_cmp_fn\n");

- 

      *compare_fn = NULL;

  

      if (attr == NULL) {
@@ -1001,7 +998,6 @@ 

      rc = LDAP_SUCCESS;

  

  done:

-     slapi_log_err(SLAPI_LOG_TRACE, "attr_get_value_cmp_fn", "<= attr_get_value_cmp_fn \n");

      return rc;

  }

  

file modified
+383 -173
@@ -1,6 +1,7 @@ 

  /** BEGIN COPYRIGHT BLOCK

   * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.

   * Copyright (C) 2005 Red Hat, Inc.

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * All rights reserved.

   *

   * License: GPL (version 3 or any later version).
@@ -38,17 +39,8 @@ 

                                    ber_len_t ber_len,

                                    ber_len_t maxbersize);

  

- static PRStack *op_stack;     /* stack of Slapi_Operation * objects so we don't have to malloc/free every time */

- static PRInt32 op_stack_size; /* size of op_stack */

- 

- struct Slapi_op_stack

- {

-     PRStackElem stackelem; /* must be first in struct for PRStack to work */

-     Slapi_Operation *op;

- };

- 

- static void add_work_q(work_q_item *, struct Slapi_op_stack *);

- static work_q_item *get_work_q(struct Slapi_op_stack **);

+ static void add_work_q(work_q_item *, Operation *);

+ static work_q_item *get_work_q(Operation **);

  

  /*

   * We maintain a global work queue of items that have not yet
@@ -58,7 +50,7 @@ 

  {

      PRStackElem stackelem; /* must be first in struct for PRStack to work */

      work_q_item *work_item;

-     struct Slapi_op_stack *op_stack_obj;

+     Operation *op;

      struct Slapi_work_q *next_work_item;

  };

  
@@ -92,7 +84,7 @@ 

  destroy_work_q(struct Slapi_work_q **work_q)

  {

      if (work_q && *work_q) {

-         (*work_q)->op_stack_obj = NULL;

+         (*work_q)->op = NULL;

          (*work_q)->work_item = NULL;

          PR_StackPush(work_q_stack, (PRStackElem *)*work_q);

          PR_AtomicIncrement(&work_q_stack_size);
@@ -102,33 +94,17 @@ 

      }

  }

  

- static struct Slapi_op_stack *

+ static Operation *

  connection_get_operation(void)

  {

-     struct Slapi_op_stack *stack_obj = (struct Slapi_op_stack *)PR_StackPop(op_stack);

-     if (!stack_obj) {

-         stack_obj = (struct Slapi_op_stack *)slapi_ch_calloc(1, sizeof(struct Slapi_op_stack));

-         stack_obj->op = operation_new(plugin_build_operation_action_bitmap(0,

-                                                                            plugin_get_server_plg()));

-     } else {

-         PR_AtomicDecrement(&op_stack_size);

-         if (!stack_obj->op) {

-             stack_obj->op = operation_new(plugin_build_operation_action_bitmap(0,

-                                                                                plugin_get_server_plg()));

-         } else {

-             operation_init(stack_obj->op,

-                            plugin_build_operation_action_bitmap(0, plugin_get_server_plg()));

-         }

-     }

-     return stack_obj;

+     return operation_new(plugin_build_operation_action_bitmap(0, plugin_get_server_plg()));

+ 

  }

  

  static void

- connection_done_operation(Connection *conn, struct Slapi_op_stack *stack_obj)

+ connection_done_operation(Connection *conn, Operation *op)

  {

-     operation_done(&(stack_obj->op), conn);

-     PR_StackPush(op_stack, (PRStackElem *)stack_obj);

-     PR_AtomicIncrement(&op_stack_size);

+     operation_free(&op, conn);

  }

  

  /*
@@ -169,6 +145,9 @@ 

       * soon anyway, so please be patient while I undertake this!

       *

       * - wibrown December 2016.

+      *

+      * It's happening!!!!

+      * William Sep 2018

       */

  }

  
@@ -280,7 +259,7 @@ 

      char *str_unknown = "unknown";

      int in_referral_mode = config_check_referral_mode();

  

-     slapi_log_err(SLAPI_LOG_CONNS, "connection_reset", "new %sconnection on %d\n", pTmp, conn->c_sd);

+     slapi_log_err(SLAPI_LOG_CONNS, "connection_reset", "new %sconnection on fd=%d\n", pTmp, conn->c_sd);

  

      /* bump our count of connections and update SNMP stats */

      conn->c_connid = slapi_counter_increment(num_conns);
@@ -431,7 +410,6 @@ 

  void

  init_op_threads()

  {

-     int i;

      PRErrorCode errorCode;

      int max_threads = config_get_threadnumber();

      /* Initialize the locks and cv */
@@ -453,10 +431,8 @@ 

  

      work_q_stack = PR_CreateStack("connection_work_q");

  

-     op_stack = PR_CreateStack("connection_operation");

- 

      /* start the operation threads */

-     for (i = 0; i < max_threads; i++) {

+     for (size_t i = 0; i < max_threads; i++) {

          PR_SetConcurrency(4);

          if (PR_CreateThread(PR_USER_THREAD,

                              (VFP)(void *)connection_threadmain, NULL,
@@ -709,7 +685,8 @@ 

          if (!release_only && (conn->c_refcnt == 1) && (conn->c_flags & CONN_FLAG_CLOSING)) {

              /* if refcnt == 1 usually means only the active connection list has a ref */

              /* refcnt == 0 means conntable just dropped the last ref */

-             ns_connection_post_io_or_closing(conn);

+             /* ns_connection_post_io_or_closing(conn); */

+             ns_handle_closure_conn_nomutex(conn);

          }

  

          return 0;
@@ -724,12 +701,12 @@ 

  

  /* this function should be called under c_mutex */

  int

- connection_acquire_nolock_ext(Connection *conn, int allow_when_closing)

+ connection_acquire_nolock(Connection *conn)

  {

      /* connection in the closing state can't be acquired */

-     if (!allow_when_closing && (conn->c_flags & CONN_FLAG_CLOSING)) {

+     if (conn->c_flags & CONN_FLAG_CLOSING) {

          /* This may happen while other threads are still working on this connection */

-         slapi_log_err(SLAPI_LOG_ERR, "connection_acquire_nolock_ext",

+         slapi_log_err(SLAPI_LOG_ERR, "connection_acquire_nolock",

                        "conn=%" PRIu64 " fd=%d Attempt to acquire connection in the closing state\n",

                        conn->c_connid, conn->c_sd);

          return -1;
@@ -739,12 +716,6 @@ 

      }

  }

  

- int

- connection_acquire_nolock(Connection *conn)

- {

-     return connection_acquire_nolock_ext(conn, 0);

- }

- 

  /* returns non-0 if connection can be reused and 0 otherwise */

  int

  connection_is_free(Connection *conn, int use_lock)
@@ -945,17 +916,16 @@ 

  void

  connection_make_new_pb(Slapi_PBlock *pb, Connection *conn)

  {

-     struct Slapi_op_stack *stack_obj = NULL;

+     Operation *op = NULL;

      /* we used to malloc/free the pb for each operation - now, just use a local stack pb

       * in connection_threadmain, and just clear it out

       */

      /* *ppb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); */

      /* *ppb = slapi_pblock_new(); */

      slapi_pblock_set(pb, SLAPI_CONNECTION, conn);

-     stack_obj = connection_get_operation();

-     slapi_pblock_set(pb, SLAPI_OPERATION, stack_obj->op);

-     slapi_pblock_set_op_stack_elem(pb, stack_obj);

-     connection_add_operation(conn, stack_obj->op);

+     op = connection_get_operation();

+     slapi_pblock_set(pb, SLAPI_OPERATION, op);

+     connection_add_operation(conn, op);

  }

  

  int
@@ -963,7 +933,7 @@ 

  {

      int ret = CONN_FOUND_WORK_TO_DO;

      work_q_item *wqitem = NULL;

-     struct Slapi_op_stack *op_stack_obj = NULL;

+     Operation *op = NULL;

  

      PR_Lock(work_q_lock);

  
@@ -974,15 +944,14 @@ 

      if (op_shutdown) {

          slapi_log_err(SLAPI_LOG_TRACE, "connection_wait_for_new_work", "shutdown\n");

          ret = CONN_SHUTDOWN;

-     } else if (NULL == (wqitem = get_work_q(&op_stack_obj))) {

+     } else if (NULL == (wqitem = get_work_q(&op))) {

          /* not sure how this can happen */

          slapi_log_err(SLAPI_LOG_TRACE, "connection_wait_for_new_work", "no work to do\n");

          ret = CONN_NOWORK;

      } else {

          /* make new pb */

          slapi_pblock_set(pb, SLAPI_CONNECTION, wqitem);

-         slapi_pblock_set_op_stack_elem(pb, op_stack_obj);

-         slapi_pblock_set(pb, SLAPI_OPERATION, op_stack_obj->op);

+         slapi_pblock_set(pb, SLAPI_OPERATION, op);

      }

  

      PR_Unlock(work_q_lock);
@@ -1118,7 +1087,7 @@ 

  

   */

  int

- connection_read_operation(Connection *conn, Operation *op, ber_tag_t *tag, int *remaining_data)

+ connection_read_operation_nolock(Connection *conn, Operation *op, ber_tag_t *tag, int *remaining_data)

  {

      ber_len_t len = 0;

      int ret = 0;
@@ -1131,7 +1100,6 @@ 

      size_t buffer_data_avail;

      int conn_closed = 0;

  

-     pthread_mutex_lock(&(conn->c_mutex));

      /*

       * if the socket is still valid, get the ber element

       * waiting for us on this connection. timeout is handled
@@ -1139,8 +1107,7 @@ 

       */

      if ((conn->c_sd == SLAPD_INVALID_SOCKET) ||

          (conn->c_flags & CONN_FLAG_CLOSING)) {

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      }

  

      *tag = LBER_DEFAULT;
@@ -1150,8 +1117,7 @@ 

          if (0 != get_next_from_buffer(buffer + conn->c_private->c_buffer_offset,

                                        buffer_data_avail,

                                        &len, tag, op->o_ber, conn)) {

-             ret = CONN_DONE;

-             goto done;

+             return CONN_DONE;

          }

          new_operation = 0;

      }
@@ -1166,8 +1132,7 @@ 

          } else {

              ret = get_next_from_buffer(NULL, 0, &len, tag, op->o_ber, conn);

              if (ret == -1) {

-                 ret = CONN_DONE;

-                 goto done; /* get_next_from_buffer does the disconnect stuff */

+                 return CONN_DONE;

              } else if (ret == 0) {

                  ret = len;

              }
@@ -1179,14 +1144,12 @@ 

                  disconnect_server_nomutex(conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, 0);

                  conn->c_gettingber = 0;

                  signal_listner();

-                 ret = CONN_DONE;

-                 goto done;

+                 return CONN_DONE;

              }

              /* err = PR_GetError(); */

              /* If we would block, we need to poll for a while */

              syserr = PR_GetOSError();

-             if (SLAPD_PR_WOULD_BLOCK_ERROR(err) ||

-                 SLAPD_SYSTEM_WOULD_BLOCK_ERROR(syserr)) {

+             if (SLAPD_PR_WOULD_BLOCK_ERROR(err) || SLAPD_SYSTEM_WOULD_BLOCK_ERROR(syserr)) {

                  struct PRPollDesc pr_pd;

                  PRIntervalTime timeout = PR_MillisecondsToInterval(CONN_TURBO_TIMEOUT_INTERVAL);

                  pr_pd.fd = (PRFileDesc *)conn->c_prfd;
@@ -1200,14 +1163,12 @@ 

                  if (0 == ret) {

                      /* We timed out, should the server shutdown ? */

                      if (op_shutdown) {

-                         ret = CONN_SHUTDOWN;

-                         goto done;

+                         return CONN_SHUTDOWN;

                      }

                      /* We timed out, is this the first read in a PDU ? */

                      if (new_operation) {

                          /* If so, we return */

-                         ret = CONN_TIMEDOUT;

-                         goto done;

+                         return CONN_TIMEDOUT;

                      } else {

                          /* Otherwise we loop, unless we exceeded the ioblock timeout */

                          if (waits_done > ioblocktimeout_waits) {
@@ -1215,8 +1176,7 @@ 

                                            "ioblocktimeout expired on connection %" PRIu64 "\n", conn->c_connid);

                              disconnect_server_nomutex(conn, conn->c_connid, -1,

                                                        SLAPD_DISCONNECT_IO_TIMEOUT, 0);

-                             ret = CONN_DONE;

-                             goto done;

+                             return CONN_DONE;

                          } else {

  

                              /* The turbo mode may cause threads starvation.
@@ -1236,8 +1196,7 @@ 

                                    "PR_Poll for connection %" PRIu64 " returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror(err));

                      /* If this happens we should close the connection */

                      disconnect_server_nomutex(conn, conn->c_connid, -1, err, syserr);

-                     ret = CONN_DONE;

-                     goto done;

+                     return CONN_DONE;

                  }

                  slapi_log_err(SLAPI_LOG_CONNS,

                                "connection_read_operation", "connection %" PRIu64 " waited %d times for read to be ready\n", conn->c_connid, waits_done);
@@ -1248,8 +1207,7 @@ 

                                "PR_Recv for connection %" PRIu64 " returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror(err));

                  /* If this happens we should close the connection */

                  disconnect_server_nomutex(conn, conn->c_connid, -1, err, syserr);

-                 ret = CONN_DONE;

-                 goto done;

+                 return CONN_DONE;

              }

          } else {

              /* We read some data off the network, do something with it */
@@ -1260,8 +1218,7 @@ 

                  if (get_next_from_buffer(buffer,

                                           conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset,

                                           &len, tag, op->o_ber, conn) != 0) {

-                     ret = CONN_DONE;

-                     goto done;

+                     return CONN_DONE;

                  }

              }

              slapi_log_err(SLAPI_LOG_CONNS,
@@ -1277,8 +1234,7 @@ 

          *remaining_data = 1;

      } else if (conn_closed) {

          /* connection closed */

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      }

  

      if (*tag != LDAP_TAG_MESSAGE) {
@@ -1290,8 +1246,7 @@ 

                        conn->c_connid, *tag, LDAP_TAG_MESSAGE);

          disconnect_server_nomutex(conn, conn->c_connid, -1,

                                    SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO);

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      }

  

      if ((*tag = ber_get_int(op->o_ber, &msgid)) != LDAP_TAG_MSGID) {
@@ -1299,13 +1254,11 @@ 

          slapi_log_err(SLAPI_LOG_ERR,

                        "connection_read_operation", "conn=%" PRIu64 " unable to read tag for incoming request\n", conn->c_connid);

          disconnect_server_nomutex(conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO);

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      }

      if (is_ber_too_big(conn, len)) {

          disconnect_server_nomutex(conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_TOO_BIG, 0);

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      }

      op->o_msgid = msgid;

  
@@ -1317,36 +1270,32 @@ 

          slapi_log_err(SLAPI_LOG_ERR,

                        "connection_read_operation", "conn=%" PRIu64 " ber_peek_tag returns 0x%lx\n", conn->c_connid, *tag);

          disconnect_server_nomutex(conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_PEEK, EPROTO);

-         ret = CONN_DONE;

-         goto done;

+         return CONN_DONE;

      default:

          break;

      }

      op->o_tag = *tag;

  done:

-     pthread_mutex_unlock(&(conn->c_mutex));

      return ret;

  }

  

- void

- connection_make_readable(Connection *conn)

- {

+ int

+ connection_read_operation(Connection *conn, Operation *op, ber_tag_t *tag, int *remaining_data) {

+     int ret = 0;

      pthread_mutex_lock(&(conn->c_mutex));

-     conn->c_gettingber = 0;

+     ret = connection_read_operation_nolock(conn, op, tag, remaining_data);

      pthread_mutex_unlock(&(conn->c_mutex));

-     signal_listner();

+     return ret;

  }

  

- void

+ static void

  connection_make_readable_nolock(Connection *conn)

  {

      conn->c_gettingber = 0;

-     slapi_log_err(SLAPI_LOG_CONNS, "connection_make_readable_nolock", "making readable conn %" PRIu64 " fd=%d\n",

-                   conn->c_connid, conn->c_sd);

-     if (!(conn->c_flags & CONN_FLAG_CLOSING)) {

-         /* if the connection is closing, try the close in connection_release_nolock */

-         ns_connection_post_io_or_closing(conn);

-     }

+     slapi_log_err(SLAPI_LOG_CONNS, "connection_make_readable_nolock",

+         "making readable conn %" PRIu64 " fd=%d\n",

+         conn->c_connid, conn->c_sd);

+     signal_listner();

  }

  

  /*
@@ -1810,9 +1759,9 @@ 

                  pthread_mutex_unlock(&(conn->c_mutex));

              }

              /* ps_add makes a shallow copy of the pb - so we

-                  * can't free it or init it here - just set operation to NULL.

-                  * ps_send_results will call connection_remove_operation_ext to free it

-                  */

+              * can't free it or init it here - just set operation to NULL.

+              * ps_send_results will call connection_remove_operation_ext to free it

+              */

              slapi_pblock_set(pb, SLAPI_OPERATION, NULL);

              slapi_pblock_init(pb);

          } else {
@@ -1835,15 +1784,17 @@ 

                  more_data = conn_buffered_data_avail_nolock(conn, &conn_closed) ? 1 : 0;

              }

              if (!more_data) {

+                 /*

+                  * If is_timedout is 1, then thread_turbo_flag is set to 0, so we

+                  * move the check for timeout to the outer and then continue. This resolves

+                  * a coverity issue for dead code detection.

+                  */

+                 if (replication_connection || (is_timedout == 1)) {

+                     connection_make_readable_nolock(conn);

+                     need_wakeup = 1;

+                 }

+ 

                  if (!thread_turbo_flag) {

-                     /*

-                      * Don't release the connection now.

-                      * But note down what to do.

-                      */

-                     if (replication_connection || (1 == is_timedout)) {

-                         connection_make_readable_nolock(conn);

-                         need_wakeup = 1;

-                     }

                      if (!need_wakeup) {

                          if (conn->c_threadnumber == maxthreads) {

                              need_wakeup = 1;
@@ -1864,10 +1815,6 @@ 

                      if (need_wakeup) {

                          signal_listner();

                      }

-                 } else if (1 == is_timedout) {

-                     /* covscan reports this code is unreachable  (2019/6/4) */

-                     connection_make_readable_nolock(conn);

-                     signal_listner();

                  }

              }

              pthread_mutex_unlock(&(conn->c_mutex));
@@ -1875,11 +1822,188 @@ 

      } /* while (1) */

  }

  

+ void

+ ns_connection_cleanup_nolock(Connection *c, Operation *op, Slapi_PBlock *pb) {

+     /* Destroy the pblock (with conditions if psearch) */

+     if (op->o_flags & OP_FLAG_PS) {

+         /*

+          * If this operation begins a psearch, then we detach the operation

+          * from the pblock. When we call pb_destroy, this prevents the op being

+          * destroyed. If we don't we trigger use-after-free!

+          *

+          * ps_send_results will call connection_remove_operation_ext to free it

+          * later for us.

+          */

+         slapi_pblock_set(pb, SLAPI_OPERATION, NULL);

+     } else {

+         connection_remove_operation_ext(pb, c, op);

+     }

+ 

+     /* Remove the refcnt from ns_connection_activity */

+     connection_release_nolock_ext(c, 1);

+     /* Finally kill the pb */

+     slapi_pblock_destroy(pb);

+ }

+ 

+ void

+ ns_connection_do_work(Connection *conn, Operation *op, Slapi_PBlock *pb) {

+ 

+     /* are we in referral-only mode? */

+     if (config_check_referral_mode() && op->o_tag != LDAP_REQ_UNBIND) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_do_work",

+                       "conn %" PRIu64 " for fd=%d - sending referral\n",

+                       conn->c_connid, conn->c_sd);

+         referral_mode_reply(pb);

+     } else if (connection_need_new_password(conn, op, pb)) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_do_work",

+                       "conn %" PRIu64 " for fd=%d - new password required\n",

+                       conn->c_connid, conn->c_sd);

+ 

+         /* check if new password is required */

+         return;

+     } else if (conn->c_flags & CONN_FLAG_IMPORT) {

+         /* if this is a bulk import, only "add" and "import done"

+          * are allowed */

+         if ((op->o_tag != LDAP_REQ_ADD) && (op->o_tag != LDAP_REQ_EXTENDED)) {

+             /* no cookie for you. */

+             slapi_log_err(SLAPI_LOG_ERR, "ns_connection_do_work",

+                           "conn %" PRIu64 " for fd=%d - attempted operation %lu from within bulk import\n",

+                           conn->c_connid, conn->c_sd, op->o_tag);

+             slapi_send_ldap_result(pb, LDAP_PROTOCOL_ERROR, NULL, NULL, 0, NULL);

+         }

+     } else {

+         /*

+          * Call the do_<operation> function to process this request.

+          */

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_do_work",

+                       "conn %" PRIu64 " for fd=%d - connection_dispatch_operation\n",

+                       conn->c_connid, conn->c_sd);

+         connection_dispatch_operation(conn, op, pb);

+     }

+ }

+ 

+ void

+ ns_handle_work_done(struct ns_job_t *job) {

+     Slapi_PBlock *pb = (Slapi_PBlock *)ns_job_get_data(job);

+     PR_ASSERT(pb);

+     Connection *c = NULL;

+     Operation *op = NULL;

+     slapi_pblock_get(pb, SLAPI_CONNECTION, &c);

+     slapi_pblock_get(pb, SLAPI_OPERATION, &op);

+     PR_ASSERT(c);

+     PR_ASSERT(op);

+ 

+     pthread_mutex_lock(&(c->c_mutex));

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_work_done",

+                   "conn %" PRIu64 " for fd=%d - removing async operation references\n",

+                   c->c_connid, c->c_sd);

+     ns_connection_cleanup_nolock(c, op, pb);

+     pthread_mutex_unlock(&(c->c_mutex));

+ }

+ 

+ void

+ ns_handle_work(struct ns_job_t *job) {

+     /* Actually do something with the operation! */

+     /* All the IO should be done, just do the op, then write the result. */

+     Slapi_PBlock *pb = (Slapi_PBlock *)ns_job_get_data(job);

+     PR_ASSERT(pb);

+     Connection *c = NULL;

+     Operation *op = NULL;

+     slapi_pblock_get(pb, SLAPI_CONNECTION, &c);

+     slapi_pblock_get(pb, SLAPI_OPERATION, &op);

+     PR_ASSERT(c);

+     PR_ASSERT(op);

+     /*

+      * Do we have the resources for the operation?

+      */

+ 

+     pthread_mutex_lock(&(c->c_mutex));

+     int32_t maxthreads = config_get_maxthreadsperconn();

+     if (c->c_threadnumber >= maxthreads) {

+         c->c_maxthreadsblocked++;

+         /* Requeue for a future attempt */

+         ns_job_rearm(job);

+         slapi_log_err(SLAPI_LOG_WARNING, "ns_handle_work",

+                       "conn %" PRIu64 " for fd=%d - maxthreads %"PRId32" of %"PRId32" per conn limit reached, requeuing\n",

+                       c->c_connid, c->c_sd, c->c_threadnumber, maxthreads);

+         pthread_mutex_unlock(&(c->c_mutex));

+         return;

+     } else {

+         c->c_threadnumber += 1;

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_work",

+                       "conn %" PRIu64 " for fd=%d - maxthreads %"PRId32" of %"PRId32" per conn, starting operation ...\n",

+                       c->c_connid, c->c_sd, c->c_threadnumber, maxthreads);

+ 

+         /* Some stolen code for stats of thread counting */

+         if (c->c_threadnumber == maxthreads) {

+             c->c_maxthreadscount++;

+             slapi_counter_increment(max_threads_count);

+             slapi_counter_increment(conns_in_maxthreads);

+             slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsConnectionsInMaxThreads);

+         }

+ 

+         pthread_mutex_unlock(&(c->c_mutex));

+     }

+ 

+     /*

+      ********************

+      * DO THE OPERATION *

+      ********************

+      */

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_work",

+                   "conn %" PRIu64 " for fd=%d - started operation\n",

+                   c->c_connid, c->c_sd);

+     ns_connection_do_work(c, op, pb);

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_work",

+                   "conn %" PRIu64 " for fd=%d - ended operation\n",

+                   c->c_connid, c->c_sd);

+ 

+     /*

+      * WARNING!!! It is UNSAFE to call the direct ns close functions

+      * for the connection here. Use disconnect_server_nomutex instead.

+      */

+ 

+     /* The thread is complete */

+     pthread_mutex_lock(&(c->c_mutex));

+     c->c_threadnumber -= 1;

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_work",

+                   "conn %" PRIu64 " for fd=%d - maxthreads %"PRId32" of %"PRId32" per conn, at end of operation\n",

+                   c->c_connid, c->c_sd, c->c_threadnumber, maxthreads);

+     pthread_mutex_unlock(&(c->c_mutex));

+ 

+     /* Destroy this worker at last - this frees everything! */

+     ns_job_done(job);

+ }

+ 

+ /*

+  * The caller here still has c mutex held.

+  */

+ void

+ ns_handle_add_work(Connection *c, Slapi_PBlock *pb) {

+     /* Add a job to async handle the operation itself. */

+     ns_result_t job_result = ns_add_job(c->c_tp, NS_JOB_THREAD, ns_handle_work, ns_handle_work_done, pb, NULL);

+ 

+     if (job_result != NS_SUCCESS) {

+         if (job_result == NS_SHUTDOWN) {

+             slapi_log_err(SLAPI_LOG_ERR, "ns_handle_add_work",

+                           "conn %" PRIu64 " for fd=%d - async operation failed to be added to queue as server is shutting down\n",

+                           c->c_connid, c->c_sd);

+         } else {

+             slapi_log_err(SLAPI_LOG_ERR, "ns_handle_add_work",

+                           "conn %" PRIu64 " for fd=%d - async operation failed to be added to queue error %d\n",

+                           c->c_connid, c->c_sd, job_result);

+         }

+     } else {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_add_work",

+                       "conn %" PRIu64 " for fd=%d - async operation added to work queue\n",

+                       c->c_connid, c->c_sd);

+     }

+ }

+ 

  /* thread need to hold conn->c_mutex before calling this function */

- int

- connection_activity(Connection *conn, int maxthreads)

- {

-     struct Slapi_op_stack *op_stack_obj;

+ static Operation *

+ connection_activity_ext(Connection *conn, int maxthreads) {

+     Operation *op = NULL;

  

      if (connection_acquire_nolock(conn) == -1) {

          slapi_log_err(SLAPI_LOG_CONNS,
@@ -1887,12 +2011,19 @@ 

                        conn->c_connid, conn->c_sd);

          /* XXX how to handle this error? */

          /* MAB: 25 Jan 01: let's return on error and pray this won't leak */

-         return (-1);

+         return NULL;

      }

  

      /* set these here so setup_pr_read_pds will not add this conn back to the poll array */

      conn->c_gettingber = 1;

-     conn->c_threadnumber++;

+     /*

+      * With nunc stans we count threads differently. As a result

+      * If we see maxthreads == 0 here, we skip the ++. Remove this hack job

+      * when we gut the old conn code.

+      */

+     if (maxthreads > 0) {

+         conn->c_threadnumber++;

+     }

      if (conn->c_threadnumber == maxthreads) {

          conn->c_flags |= CONN_FLAG_MAX_THREADS;

          conn->c_maxthreadscount++;
@@ -1901,24 +2032,128 @@ 

          slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsConnectionsInMaxThreads);

          slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsMaxThreadsHits);

      }

-     op_stack_obj = connection_get_operation();

-     connection_add_operation(conn, op_stack_obj->op);

-     /* Add conn to the end of the work queue.  */

-     /* have to do this last - add_work_q will signal waiters in connection_wait_for_new_work */

-     add_work_q((work_q_item *)conn, op_stack_obj);

+ 

+     op = connection_get_operation();

+     connection_add_operation(conn, op);

  

      if (!config_check_referral_mode()) {

          slapi_counter_increment(ops_initiated);

          slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps);

      }

-     return 0;

+     return op;

+ }

+ 

+ /*

+  * Called as from the ns_handle_pr_read_ready callback. That function

+  * holds the NS job lock then the conn lock, so we can just use nomutex

+  * functions.

+  *

+  * This guarantees we are the only IO on the conn at the time.

+  *

+  * It also helpfully separates an IO event thread from a worker doing

+  * a search. This just sets up the worker ready to go. This means most

+  * connections can have 1:N IO:Workers in operation, without relying on

+  * weird locking tricks.

+  */

+ int32_t

+ ns_connection_activity(Connection *conn, int32_t maxthreads) {

+     /* THIS ACQUIRES THE CONNECTION */

+     /* We added one to the refcnt!!! Remember to remove it! */

+     Operation *op = connection_activity_ext(conn, maxthreads);

+     if (op == NULL) {

+         return -1;

+     }

+ 

+     ber_tag_t tag = 0;

+     int32_t more_data = 0;

+     int32_t result = 0;

+     /* Actually read a PDU here and get it ready .... */

+     int cret = connection_read_operation_nolock(conn, op, &tag, &more_data);

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_activity",

+                   "conn %" PRIu64 " - cret %d\n", conn->c_connid, cret);

+     switch (cret) {

+     case CONN_FOUND_WORK_TO_DO:

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_activity",

+                       "conn %" PRIu64 " read operation successfully - more_data %d "

+                       "ops_initiated %d refcnt %d flags %d\n",

+                       conn->c_connid, more_data,

+                       conn->c_opsinitiated, conn->c_refcnt, conn->c_flags);

+         /* Create the pb and set items into it. */

+         Slapi_PBlock *pb = slapi_pblock_new();

+         slapi_pblock_set(pb, SLAPI_CONNECTION, conn);

+         slapi_pblock_set(pb, SLAPI_OPERATION, op);

+         /* IF REPLICATION || UNBIND RUN IT NOW IN THIS THREAD */

+         if (tag == LDAP_REQ_UNBIND || conn->c_isreplication_session) {

+             /*

+              * This won't deadlock anything because this won't spawn

+              * new threads, and we hold job lock AND the conn lock.

+              * As both are monitors, we are pretty safe here.

+              */

+             ns_connection_do_work(conn, op, pb);

+             /* Now clean up after ourselves. */

+             ns_connection_cleanup_nolock(conn, op, pb);

+             /* Tell our caller not to rearm after an unbind */

+             if (tag == LDAP_REQ_UNBIND) {

+                 result = -4;

+             }

+ 

+         } else {

+             /*

+              * schedule for async dispatch instead.

+              * We don't need to worry about max threads here, because the

+              * worker will just reset the job if it's too busy.

+              */

+             ns_handle_add_work(conn, pb);

+         }

+         break;

+     case CONN_DONE:

+     case CONN_TIMEDOUT:

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_activity",

+                       "conn %" PRIu64 " read not ready due to %d - more_data %d "

+                       "ops_initiated %d refcnt %d flags %d\n",

+                       conn->c_connid, cret, more_data,

+                       conn->c_opsinitiated, conn->c_refcnt, conn->c_flags);

+         connection_release_nolock_ext(conn, 1);

+         /* Free the allocated operation. */

+         connection_remove_operation(conn, op);

+         connection_done_operation(conn, op);

+         result = -2;

+         break;

+     default:

+         slapi_log_err(SLAPI_LOG_ERR, "ns_connection_activity",

+                       "conn %" PRIu64 " read operation failed UNKNOWN %d - more_data %d "

+                       "ops_initiated %d refcnt %d flags %d\n",

+                       conn->c_connid, cret, more_data,

+                       conn->c_opsinitiated, conn->c_refcnt, conn->c_flags);

+         connection_release_nolock_ext(conn, 1);

+         /* Free the allocated operation. */

+         connection_remove_operation(conn, op);

+         connection_done_operation(conn, op);

+         result = -3;

+         break;

+     }

+ 

+     return result;

+ }

+ 

+ int32_t

+ connection_activity(Connection *conn, int maxthreads)

+ {

+     Operation *op = connection_activity_ext(conn, maxthreads);

+     /* Add conn to the end of the work queue.  */

+     /* have to do this last - add_work_q will signal waiters in connection_wait_for_new_work */

+     if (op != NULL) {

+         add_work_q((work_q_item *)conn, op);

+         return 0;

+     }

+     return -1;

  }

  

  /* add_work_q():  will add a work_q_item to the end of the global work queue. The work queue

      is implemented as a single link list. */

  

  static void

- add_work_q(work_q_item *wqitem, struct Slapi_op_stack *op_stack_obj)

+ add_work_q(work_q_item *wqitem, Operation *op)

  {

      struct Slapi_work_q *new_work_q = NULL;

  
@@ -1926,7 +2161,7 @@ 

  

      new_work_q = create_work_q();

      new_work_q->work_item = wqitem;

-     new_work_q->op_stack_obj = op_stack_obj;

+     new_work_q->op = op;

      new_work_q->next_work_item = NULL;

  

      PR_Lock(work_q_lock);
@@ -1950,7 +2185,7 @@ 

      with the work_q_lock held */

  

  static work_q_item *

- get_work_q(struct Slapi_op_stack **op_stack_obj)

+ get_work_q(Operation **op)

  {

      struct Slapi_work_q *tmp = NULL;

      work_q_item *wqitem;
@@ -1968,7 +2203,7 @@ 

      head_work_q = tmp->next_work_item;

  

      wqitem = tmp->work_item;

-     *op_stack_obj = tmp->op_stack_obj;

+     *op = tmp->op;

      PR_AtomicDecrement(&work_q_size); /* decrement q size */

      /* Free the memory used by the item found. */

      destroy_work_q(&tmp);
@@ -1986,8 +2221,8 @@ 

  op_thread_cleanup()

  {

      slapi_log_err(SLAPI_LOG_INFO, "op_thread_cleanup",

-                   "slapd shutting down - signaling operation threads - op stack size %d max work q size %d max work q stack size %d\n",

-                   op_stack_size, work_q_size_max, work_q_stack_size_max);

+                   "slapd shutting down - signaling operation threads - max work q size %d max work q stack size %d\n",

+                   work_q_size_max, work_q_stack_size_max);

  

      PR_AtomicIncrement(&op_shutdown);

      PR_Lock(work_q_lock);
@@ -1999,32 +2234,23 @@ 

  void

  connection_post_shutdown_cleanup()

  {

-     struct Slapi_op_stack *stack_obj;

+     Operation *op = NULL;

      int stack_cnt = 0;

      struct Slapi_work_q *work_q;

      int work_cnt = 0;

  

      while ((work_q = (struct Slapi_work_q *)PR_StackPop(work_q_stack))) {

          Connection *conn = (Connection *)work_q->work_item;

-         stack_obj = work_q->op_stack_obj;

-         if (stack_obj) {

-             if (conn) {

-                 connection_remove_operation(conn, stack_obj->op);

-             }

-             connection_done_operation(conn, stack_obj);

+         op = work_q->op;

+         if (op && conn) {

+             connection_remove_operation(conn, op);

+             connection_done_operation(conn, op);

          }

          slapi_ch_free((void **)&work_q);

          work_cnt++;

      }

      PR_DestroyStack(work_q_stack);

      work_q_stack = NULL;

-     while ((stack_obj = (struct Slapi_op_stack *)PR_StackPop(op_stack))) {

-         operation_free(&stack_obj->op, NULL);

-         slapi_ch_free((void **)&stack_obj);

-         stack_cnt++;

-     }

-     PR_DestroyStack(op_stack);

-     op_stack = NULL;

      slapi_log_err(SLAPI_LOG_INFO, "connection_post_shutdown_cleanup",

                    "slapd shutting down - freed %d work q stack objects - freed %d op stack objects\n",

                    work_cnt, stack_cnt);
@@ -2075,8 +2301,7 @@ 

  connection_remove_operation_ext(Slapi_PBlock *pb, Connection *conn, Operation *op)

  {

      connection_remove_operation(conn, op);

-     void *op_stack_elem = slapi_pblock_get_op_stack_elem(pb);

-     connection_done_operation(conn, op_stack_elem);

+     connection_done_operation(conn, op);

      slapi_pblock_set(pb, SLAPI_OPERATION, NULL);

      slapi_pblock_init(pb);

  }
@@ -2239,7 +2464,7 @@ 

   */

  

  void

- disconnect_server_nomutex_ext(Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error, int schedule_closure_job)

+ disconnect_server_nomutex(Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error)

  {

      if ((conn->c_sd != SLAPD_INVALID_SOCKET &&

           conn->c_connid == opconnid) &&
@@ -2247,15 +2472,6 @@ 

          slapi_log_err(SLAPI_LOG_CONNS, "disconnect_server_nomutex_ext", "Setting conn %" PRIu64 " fd=%d "

                                                                          "to be disconnected: reason %d\n",

                        conn->c_connid, conn->c_sd, reason);

-         /*

-          * PR_Close must be called before anything else is done because

-          * of NSPR problem on NT which requires that the socket on which

-          * I/O timed out is closed before any other I/O operation is

-          * attempted by the thread.

-          * WARNING :  As of today the current code does not fulfill the

-          * requirements above.

-          */

- 

          /* Mark that the socket should be closed on this connection.

           * We don't want to actually close the socket here, because

           * the listener thread could be PR_Polling over it right now.
@@ -2316,9 +2532,9 @@ 

                  }

              }

          }

-         if (schedule_closure_job) {

-             ns_connection_post_io_or_closing(conn); /* make sure event loop wakes up and closes this conn */

-         }

+ 

+         /* This checks NS enable for us :) */

+         ns_handle_closure_conn_nomutex(conn);

  

      } else {

          slapi_log_err(SLAPI_LOG_CONNS, "disconnect_server_nomutex_ext", "Not setting conn %d to be disconnected: %s\n",
@@ -2328,12 +2544,6 @@ 

  }

  

  void

- disconnect_server_nomutex(Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error)

- {

-     disconnect_server_nomutex_ext(conn, opconnid, opid, reason, error, 1);

- }

- 

- void

  connection_abandon_operations(Connection *c)

  {

      Operation *op;

file modified
+204 -247
@@ -1,6 +1,7 @@ 

  /** BEGIN COPYRIGHT BLOCK

   * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.

   * Copyright (C) 2005 Red Hat, Inc.

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * All rights reserved.

   *

   * License: GPL (version 3 or any later version).
@@ -159,14 +160,6 @@ 

  static int write_pid_file(void);

  static int init_shutdown_detect(void);

  

- #define NS_HANDLER_NEW_CONNECTION   0

- #define NS_HANDLER_READ_CONNECTION  1

- #define NS_HANDLER_CLOSE_CONNECTION 2

- static ns_job_func_t ns_handlers[] = {

-     ns_handle_new_connection,

-     ns_handle_pr_read_ready,

-     ns_handle_closure

- };

  /* Globals which are used to store the sockets between

   * calls to daemon_pre_setuid_init() and the daemon thread

   * creation. */
@@ -903,7 +896,9 @@ 

          exit(1);

      }

  

-     init_op_threads();

+     if (enable_nunc_stans != 1) {

+         init_op_threads();

+     }

  

      /*

       *  If we are monitoring disk space, then create the mutex, the cvar,
@@ -1005,7 +1000,7 @@ 

          for (size_t ii = 0; ii < listeners; ++ii) {

              listener_idxs[ii].ct = the_connection_table; /* to pass to handle_new_connection */

              ns_result_t result = ns_add_io_job(tp, listener_idxs[ii].listenfd, NS_JOB_ACCEPT | NS_JOB_PERSIST | NS_JOB_PRESERVE_FD,

-                                                ns_handlers[NS_HANDLER_NEW_CONNECTION], &listener_idxs[ii], &(listener_idxs[ii].ns_job));

+                                                ns_handle_new_connection, &listener_idxs[ii], &(listener_idxs[ii].ns_job));

              if (result != NS_SUCCESS) {

                  slapi_log_err(SLAPI_LOG_CRIT, "slapd_daemon", "ns_add_io_job failed to create add acceptor %d\n", result);

              }
@@ -1025,11 +1020,14 @@ 

  #endif

  

      if (enable_nunc_stans) {

+         /* We have to indicate we have active threads here */

+         g_incr_active_threadcnt();

          if (ns_thrpool_wait(tp)) {

              slapi_log_err(SLAPI_LOG_ERR,

                            "slapd-daemon", "ns_thrpool_wait failed errno %d (%s)\n", errno,

                            slapd_system_strerror(errno));

          }

+         g_decr_active_threadcnt();

  

          /* we have exited from ns_thrpool_wait. This means we are shutting down! */

          /* Please see https://firstyear.fedorapeople.org/nunc-stans/md_docs_job-safety.html */
@@ -1125,7 +1123,9 @@ 

  #endif

      }

  

-     op_thread_cleanup();

+     if (!enable_nunc_stans) {

+         op_thread_cleanup();

+     }

      housekeeping_stop(); /* Run this after op_thread_cleanup() logged sth */

      disk_monitoring_stop();

  
@@ -1139,7 +1139,13 @@ 

       * NOTE: We do this after we stop psearch, because there could

       * be a race between flagging the psearch done, and users still

       * try to send on the connection. Similar with op_threads.

+      *

+      * With full NS integration, this queues all the disconnect jobs

+      * and because of the design of NS shutdown they ALL will be

+      * serviced before the thread shutdown jobs are. How good is

+      * that!

       */

+ 

      connection_table_disconnect_all(the_connection_table);

  

      /*
@@ -1241,7 +1247,15 @@ 

  

      be_cleanupall();

      plugin_dependency_freeall();

-     connection_post_shutdown_cleanup();

+ 

+     /*

+      * With nunc-stans this isn't needed, because with NS

+      * we close down everything and free it all during

+      * ct_disconnect_all. As a result, this isn't needed.

+      */

+     if (!enable_nunc_stans) {

+         connection_post_shutdown_cleanup();

+     }

      slapi_log_err(SLAPI_LOG_TRACE, "slapd_daemon", "slapd shutting down - backends closed down\n");

      referrals_free();

      schema_destroy_dse_lock();
@@ -1577,25 +1591,25 @@ 

      }

  }

  

- #define CONN_NEEDS_CLOSING(c) (c->c_flags & CONN_FLAG_CLOSING) || (c->c_sd == SLAPD_INVALID_SOCKET)

- /* Used internally by ns_handle_closure and ns_handle_pr_read_ready.

-  * Returns 0 if the connection was successfully closed, or 1 otherwise.

-  * Must be called with the c->c_mutex locked.

-  */

- static int

- ns_handle_closure_nomutex(Connection *c)

- {

-     int rc = 0;

-     PR_ASSERT(c->c_refcnt > 0); /* one for the conn active list, plus possible other threads */

-     PR_ASSERT(CONN_NEEDS_CLOSING(c));

-     if (connection_table_move_connection_out_of_active_list(c->c_ct, c)) {

-         /* not closed - another thread still has a ref */

-         rc = 1;

-         /* reschedule closure job */

-         ns_connection_post_io_or_closing(c);

-     }

-     return rc;

+ static void

+ ns_handle_conn_done(struct ns_job_t *job) {

+     /* This comes FROM c_job, so no need to guard. */

+     Connection *c = (Connection *)ns_job_get_data(job);

+     pthread_mutex_lock(&(c->c_mutex));

+     /* Is this assert valid? During shutdown it may not be ... */

+     PR_ASSERT(c->c_refcnt == 2);

+     /* Forcefully set the rc to 1 */

+     c->c_refcnt = 1;

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_conn_done",

+                   "conn %" PRIu64 " for fd=%d - destroying connection\n",

+                   c->c_connid, c->c_sd);

+     pthread_mutex_unlock(&(c->c_mutex));

+ 

+     int32_t ct_move_res = connection_table_move_connection_out_of_active_list(c->c_ct, c);

+     PR_ASSERT(ct_move_res == 0);

  }

+ 

+ 

  /* This function is called when the connection has been marked

   * as closing and needs to be cleaned up.  It will keep trying

   * and re-arming itself until there are no references.
@@ -1603,192 +1617,115 @@ 

  static void

  ns_handle_closure(struct ns_job_t *job)

  {

+     /*

+      * This is not the job from the connection, but an

+      * async shut down job! The job you need to mark as

+      * done is separate!

+      */

      Connection *c = (Connection *)ns_job_get_data(job);

-     int do_yield = 0;

  

+     /* take a guard here ... */

+     struct ns_job_guard_t *guard = ns_job_lock(job);

      pthread_mutex_lock(&(c->c_mutex));

-     /* Assert we really have the right job state. */

-     PR_ASSERT(job == c->c_job);

  

-     connection_release_nolock_ext(c, 1); /* release ref acquired for event framework */

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_closure",

+                   "conn %" PRIu64 " for fd=%d - start async close task\n",

+                   c->c_connid, c->c_sd);

+ 

      PR_ASSERT(c->c_ns_close_jobs == 1);  /* should be exactly 1 active close job - this one */

-     c->c_ns_close_jobs--;                /* this job is processing closure */

-     /* Because handle closure will add a new job, we need to detach our current one. */

-     c->c_job = NULL;

-     do_yield = ns_handle_closure_nomutex(c);

-     pthread_mutex_unlock(&(c->c_mutex));

-     /* Remove this task now. */

-     ns_job_done(job);

-     if (do_yield) {

-         /* closure not done - another reference still outstanding */

-         /* yield thread after unlocking conn mutex */

-         PR_Sleep(PR_INTERVAL_NO_WAIT); /* yield to allow other thread to release conn */

+ 

+     /* Should be 1 rc from event fw, one from closure */

+     PR_ASSERT(c->c_refcnt > 1);

+ 

+     if (c->c_refcnt == 2) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_closure",

+                       "conn %" PRIu64 " for fd=%d - async close task success\n",

+                       c->c_connid, c->c_sd);

+         /* Queue the conn job for destruction ... */

+         ns_job_unlock(guard);

+         ns_job_done(c->c_job);

+         /* We are done, lets escape! */

+         ns_job_done(job);

+     } else {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_closure",

+                       "conn %" PRIu64 " for fd=%d - connection still in use - waiting ...\n",

+                       c->c_connid, c->c_sd);

+         /* Try waiting again for RC to drop .... */

+         ns_job_rearm(job);

+ 

+         ns_job_unlock(guard);

      }

-     return;

+ 

+     /* All good! Finally remove the job and our disarm. */

+     pthread_mutex_unlock(&(c->c_mutex));

  }

  

- /**

-  * Schedule more I/O for this connection, or make sure that it

-  * is closed in the event loop.

-  * 

-  * caller must hold c_mutex

+ /* Used internally by ns_handle_closure and ns_handle_pr_read_ready.

+  * Returns 0 if the connection was successfully closed, or 1 otherwise.

+  * Must be called with the c->c_mutex locked.

   */

  void

- ns_connection_post_io_or_closing(Connection *conn)

+ ns_handle_closure_conn_nomutex(Connection *c)

  {

-     struct timeval tv;

- 

-     if (!enable_nunc_stans) {

+     if (enable_nunc_stans == 0) {

          return;

      }

  

-     /*

-      * A job was already scheduled.

-      * Check if it is the appropriate one

-      */

-     if (conn->c_job != NULL) {

-         if (connection_is_free(conn, 0)) {

-             PRStatus shutdown_status;

- 

-             /* The connection being freed,

-              * It means that ns_handle_closure already completed and the connection

-              * is no longer on the active list.

-              * The scheduled job is useless and scheduling a new one as well

-              */

-             shutdown_status = ns_job_done(conn->c_job);

-             if (shutdown_status != PR_SUCCESS) {

-                 slapi_log_err(SLAPI_LOG_CRIT, "ns_connection_post_io_or_closing", "Failed cancel a job on a freed connection %d !\n", conn->c_sd);

-             }

-             conn->c_job = NULL;

-             return;

-         }

-         if (CONN_NEEDS_CLOSING(conn)) {

-             if (ns_job_is_func(conn->c_job, ns_handlers[NS_HANDLER_CLOSE_CONNECTION])) {

-                 /* Due to the closing state we would schedule a ns_handle_closure

-                  * but one is already registered.

-                  * Just return;

-                  */

-                 slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_post_io_or_closing", "Already ns_handle_closure "

-                         "job in progress on conn %" PRIu64 " for fd=%d\n",

-                         conn->c_connid, conn->c_sd);

-                 return;

-             } else {

-                 /* Due to the closing state we would schedule a ns_handle_closure

-                  * but a different handler is registered. Stop it and schedule (below) ns_handle_closure

-                  */

-                 ns_job_done(conn->c_job);

-                 conn->c_job = NULL;

-             }

-         } else {

-             /* Here the connection is still active => ignore the call and return */

-             if (ns_job_is_func(conn->c_job, ns_handlers[NS_HANDLER_READ_CONNECTION])) {

-                 /* Connection is still active and a read_ready is already scheduled

-                  * Likely a consequence of async operations

-                  * Just let the current read_ready do its job

-                  */

-                 slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_post_io_or_closing", "Already ns_handle_pr_read_ready "

-                                                                                "job in progress on conn %" PRIu64 " for fd=%d\n",

-                           conn->c_connid, conn->c_sd);

-             } else {

-                 /* Weird situation where the connection is not flagged closing but ns_handle_closure

-                  * is scheduled.

-                  * We should not try to read it anymore

-                  */

-                 PR_ASSERT(ns_job_is_func(conn->c_job, ns_handlers[NS_HANDLER_CLOSE_CONNECTION]));

-             }

-             return;

-         }

-     }

- 

-     /* At this point conn->c_job is NULL

-      * Either it was null when the function was called

-      * Or we cleared it (+ns_job_done) if the wrong (according

-      * to the connection state) handler was scheduled

-      *

-      * Now we need to determine which handler to schedule

-      */

+     /* Check if a close job already exists? */

+     if (c->c_ns_close_jobs == 0) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_closure_conn_nomutex",

+                       "conn %" PRIu64 " for fd=%d - creating async close task\n",

+                       c->c_connid, c->c_sd);

+         /*

+          * This is our first loop around, lets free the rc

+          * and then go from there! Remember this is the rc

+          * from the event framework! Workers must clear their

+          * own refcounts!

+          */

+         c->c_ns_close_jobs = 1;

+         /*

+          * Before we mark closing we TAKE a reference. We are going

+          * to aim to have TWO references so that ns_handle_closure

+          * will spin on waiting for c_refcnt == 2, then we can call

+          * conn_release, and do ns_job_done on the c_job. Then the

+          * c_job will be guaranteed to be the only referent, we can

+          * force RC to 1, then trigger move from ct.

+          */

+         c->c_refcnt++;

+         /*

+          * Guarantee the closing flag is here, this stops new refcounts

+          * being added.

+          */

+         c->c_flags |= CONN_FLAG_CLOSING;

  

-     if (CONN_NEEDS_CLOSING(conn)) {

-         /* there should only ever be 0 or 1 active closure jobs */

-         PR_ASSERT((conn->c_ns_close_jobs == 0) || (conn->c_ns_close_jobs == 1));

-         if (conn->c_ns_close_jobs) {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_post_io_or_closing", "Already a close "

-                                                                                "job in progress on conn %" PRIu64 " for fd=%d\n",

-                           conn->c_connid, conn->c_sd);

-             return;

-         } else {

-             conn->c_ns_close_jobs++;                                                      /* now 1 active closure job */

-             connection_acquire_nolock_ext(conn, 1 /* allow acquire even when closing */); /* event framework now has a reference */

-             /* Close the job asynchronously. Why? */

-             ns_result_t job_result = ns_add_job(conn->c_tp, NS_JOB_TIMER, ns_handlers[NS_HANDLER_CLOSE_CONNECTION], conn, &(conn->c_job));

-             if (job_result != NS_SUCCESS) {

-                 if (job_result == NS_SHUTDOWN) {

-                     slapi_log_err(SLAPI_LOG_INFO, "ns_connection_post_io_or_closing", "post closure job "

-                                                                                       "for conn %" PRIu64 " for fd=%d failed to be added to event queue as server is shutting down\n",

-                                   conn->c_connid, conn->c_sd);

-                 } else {

-                     slapi_log_err(SLAPI_LOG_ERR, "ns_connection_post_io_or_closing", "post closure job "

-                                                                                      "for conn %" PRIu64 " for fd=%d failed to be added to event queue %d\n",

-                                   conn->c_connid, conn->c_sd, job_result);

-                 }

-             } else {

-                 slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_post_io_or_closing", "post closure job "

-                                                                                    "for conn %" PRIu64 " for fd=%d\n",

-                               conn->c_connid, conn->c_sd);

-             }

-         }

-     } else {

-         /* process event normally - wait for I/O until idletimeout */

-         /* With nunc-stans there is a quirk. When we have idleTimeout of -1

-          * which is set on some IPA bind dns for infinite, this causes libevent

-          * to *instantly* timeout. So if we detect < 0, we set 0 to this timeout, to

-          * catch all possible times that an admin could set.

+         /*

+          * Making this a thread job means anyone can close it, not just event thread. Prevent

+          * a deadlock with this one weird trick!

           */

-         if (conn->c_idletimeout < 0) {

-             tv.tv_sec = 0;

-         } else {

-             tv.tv_sec = conn->c_idletimeout;

-         }

-         tv.tv_usec = 0;

- #ifdef DEBUG

-         PR_ASSERT(0 == connection_acquire_nolock(conn));

- #else

-         if (connection_acquire_nolock(conn) != 0) { /* event framework now has a reference */

+         ns_result_t job_result = ns_add_job(c->c_tp, NS_JOB_THREAD, ns_handle_closure, NULL, c, NULL);

+         if (job_result == NS_SHUTDOWN) {

              /*

-              * This has already been logged as an error in ./ldap/servers/slapd/connection.c

-              * The error occurs when we get a connection in a closing state.

-              * For now we return, but there is probably a better way to handle the error case.

+              * Okay, we are shutting down. That means no more events

+              * AND we won't process any jobs. BUT ns_job_done can now

+              * destroy connections that are ARMED. So let's fake our

+              * conn environment and just destroy this.

+              *

+              * In theory because shutdown drains all the jobs, we should be

+              * at rc 2, but we can't rely on that.

               */

-             return;

-         }

- #endif

-         ns_result_t job_result = ns_add_io_timeout_job(conn->c_tp, conn->c_prfd, &tv,

-                                                        NS_JOB_READ | NS_JOB_PRESERVE_FD,

-                                                        ns_handlers[NS_HANDLER_READ_CONNECTION], conn, &(conn->c_job));

-         if (job_result != NS_SUCCESS) {

-             if (job_result == NS_SHUTDOWN) {

-                 slapi_log_err(SLAPI_LOG_INFO, "ns_connection_post_io_or_closing", "post I/O job for "

-                                                                                   "conn %" PRIu64 " for fd=%d failed to be added to event queue as server is shutting down\n",

-                               conn->c_connid, conn->c_sd);

-             } else {

-                 slapi_log_err(SLAPI_LOG_ERR, "ns_connection_post_io_or_closing", "post I/O job for "

-                                                                                  "conn %" PRIu64 " for fd=%d failed to be added to event queue %d\n",

-                               conn->c_connid, conn->c_sd, job_result);

-             }

-         } else {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_connection_post_io_or_closing", "post I/O job for "

-                                                                                "conn %" PRIu64 " for fd=%d added to event queue\n",

-                           conn->c_connid, conn->c_sd);

+             PR_ASSERT(c->c_refcnt == 2);

+             ns_job_done(c->c_job);

+             /* WARNING. DO NOT ACCESS c-> after this point!!!  */

          }

+     } else {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_closure_conn_nomutex",

+                       "conn %" PRIu64 " for fd=%d - async close task already created\n",

+                       c->c_connid, c->c_sd);

      }

      return;

  }

  

- /* This function must be called without the thread flag, in the

-  * event loop.  This function may free the connection.  This can

-  * only be done in the event loop thread.

-  */

- void

+ static void

  ns_handle_pr_read_ready(struct ns_job_t *job)

  {

      Connection *c = (Connection *)ns_job_get_data(job);
@@ -1797,70 +1734,63 @@ 

      /* Assert we really have the right job state. */

      PR_ASSERT(job == c->c_job);

  

-     /* On all code paths we remove the job, so set it null now */

-     c->c_job = NULL;

- 

-     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "activity on conn %" PRIu64 " for fd=%d\n",

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "conn %" PRIu64 " for fd=%d - activity found\n",

                    c->c_connid, c->c_sd);

-     /* if we were called due to some i/o event, see what the state of the socket is */

-     if (slapi_is_loglevel_set(SLAPI_LOG_CONNS) && !NS_JOB_IS_TIMER(ns_job_get_output_type(job)) && c && c->c_sd) {

-         /* check socket state */

-         char buf[1];

-         ssize_t rc = recv(c->c_sd, buf, sizeof(buf), MSG_PEEK);

-         if (!rc) {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "socket is closed conn"

-                                                                       " %" PRIu64 " for fd=%d\n",

-                           c->c_connid, c->c_sd);

-         } else if (rc > 0) {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "socket read data available"

-                                                                       " for conn %" PRIu64 " for fd=%d\n",

-                           c->c_connid, c->c_sd);

-         } else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "socket has no data available"

-                                                                       " conn %" PRIu64 " for fd=%d\n",

-                           c->c_connid, c->c_sd);

-         } else {

-             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "socket has error [%d] "

-                                                                       "conn %" PRIu64 " for fd=%d\n",

-                           errno, c->c_connid, c->c_sd);

-         }

-     }

-     connection_release_nolock_ext(c, 1); /* release ref acquired when job was added */

-     if (CONN_NEEDS_CLOSING(c)) {

-         ns_handle_closure_nomutex(c);

+     /* Now work out what to do with our event. */

+     if ((c->c_flags & CONN_FLAG_CLOSING) || (c->c_sd == SLAPD_INVALID_SOCKET)) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready",

+                       "conn %" PRIu64 " for fd=%d - flag closing or invalid socket\n",

+                       c->c_connid, c->c_sd);

+         /* We need to close. Don't rearm ourself! */

+         ns_handle_closure_conn_nomutex(c);

+     } else if (NS_JOB_IS_TIMER(ns_job_get_output_type(job))) {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready",

+                       "conn %" PRIu64 " for fd=%d - socket timedout due to inactivity\n",

+                       c->c_connid, c->c_sd);

          /* We shouldn't need the c_idletimeout check here because of how libevent works.

           * consider testing this and removing it oneday.

           */

-     } else if (NS_JOB_IS_TIMER(ns_job_get_output_type(job))) {

          if (c->c_idletimeout > 0) {

              /* idle timeout */

-             disconnect_server_nomutex_ext(c, c->c_connid, -1,

-                                           SLAPD_DISCONNECT_IDLE_TIMEOUT, EAGAIN,

-                                           0 /* do not schedule closure, do it next */);

+             disconnect_server_nomutex(c, c->c_connid, -1, SLAPD_DISCONNECT_IDLE_TIMEOUT, EAGAIN);

          } else {

              slapi_log_err(SLAPI_LOG_WARNING, "ns_handle_pr_read_ready", "Received idletime out with c->c_idletimeout as 0. Ignoring.\n");

          }

-         ns_handle_closure_nomutex(c);

-     } else if ((connection_activity(c, c->c_max_threads_per_conn)) == -1) {

-         /* This might happen as a result of

-          * trying to acquire a closing connection

-          */

-         slapi_log_err(SLAPI_LOG_ERR, "ns_handle_pr_read_ready", "connection_activity: abandoning"

-                                                                 " conn %" PRIu64 " as fd=%d is already closing\n",

-                       c->c_connid, c->c_sd);

-         /* The call disconnect_server should do nothing,

-          * as the connection c should be already set to CLOSING */

-         disconnect_server_nomutex_ext(c, c->c_connid, -1,

-                                       SLAPD_DISCONNECT_POLL, EPIPE,

-                                       0 /* do not schedule closure, do it next */);

-         ns_handle_closure_nomutex(c);

      } else {

-         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready", "queued conn %" PRIu64 " for fd=%d\n",

-                       c->c_connid, c->c_sd);

+         /*

+          * Connection_activity actually works out if we have work to do.

+          * It will perform the read from the socket (if possible), and then

+          * if there is a complete and available PDU queues the work. Then if we

+          * succeed, we just rearm the IO event (with a new timeout).

+          */

+         /*

+          * We pass 0 here to ns_connection_activity to prevent it from

+          * taking the maxthreads path, as we count maxthreads differently

+          * with nunc-stans.

+          */

+         int32_t act_res = ns_connection_activity(c, 0);

+         if (act_res == 0) {

+             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_pr_read_ready",

+                           "conn %" PRIu64 " for fd=%d - socket requeued for new events\n",

+                           c->c_connid, c->c_sd);

+ 

+             /* Now reset it for reading (or timeout ....). */

+             ns_job_rearm(c->c_job);

+         } else {

+             /*

+              * This might happen as a result of

+              * trying to acquire a closing connection

+              */

+             slapi_log_err(SLAPI_LOG_WARNING, "ns_handle_pr_read_ready",

+                           "conn %" PRIu64 " for fd=%d - socket is already closing\n",

+                           c->c_connid, c->c_sd);

+             /* The call disconnect_server should do nothing,

+              * as the connection c should be already set to CLOSING */

+             disconnect_server_nomutex(c, c->c_connid, -1, SLAPD_DISCONNECT_POLL, EPIPE);

+         }

      }

      /* Since we call done on the job, we need to remove it here. */

      pthread_mutex_unlock(&(c->c_mutex));

-     ns_job_done(job);

      return;

  }

  
@@ -2429,6 +2359,7 @@ 

  static void

  ns_handle_new_connection(struct ns_job_t *job)

  {

+     slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_new_connection", "Starting new connection process\n");

      int rc;

      Connection *c = NULL;

      listener_info *li = (listener_info *)ns_job_get_data(job);
@@ -2467,9 +2398,35 @@ 

       * that poll() was avoided, even at the expense of putting this new fd back

       * in nunc-stans to poll for read ready.

       */

-     pthread_mutex_lock(&(c->c_mutex));

-     ns_connection_post_io_or_closing(c);

-     pthread_mutex_unlock(&(c->c_mutex));

+     struct timeval tv = {0};

+     uint64_t c_connid = c->c_connid;

+     int c_sd = c->c_sd;

+ 

+     if (c->c_idletimeout < 0) {

+         tv.tv_sec = 0;

+     } else {

+         tv.tv_sec = c->c_idletimeout;

+     }

+     /* After this point job/conn could be freed! */

+     ns_result_t job_result = ns_add_io_timeout_job(c->c_tp, c->c_prfd, &tv,

+                                                    NS_JOB_READ | NS_JOB_PRESERVE_FD | NS_JOB_THREAD,

+                                                    ns_handle_pr_read_ready, ns_handle_conn_done, c, &(c->c_job));

+     if (job_result != NS_SUCCESS) {

+         if (job_result == NS_SHUTDOWN) {

+             slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_new_connection",

+                 "post I/O job for conn %" PRIu64 " for fd=%d failed to be added to event queue as server is shutting down\n",

+                 c_connid, c_sd);

+         } else {

+             slapi_log_err(SLAPI_LOG_ERR, "ns_handle_new_connection",

+                 "post I/O job for conn %" PRIu64 " for fd=%d failed to be added to event queue %d\n",

+                 c_connid, c_sd, job_result);

+         }

+     } else {

+         slapi_log_err(SLAPI_LOG_CONNS, "ns_handle_new_connection",

+             "conn %" PRIu64 " for fd=%d - added to IO event queue\n",

+             c_connid, c_sd);

+     }

+ 

      return;

  }

  

file modified
+2
@@ -1,6 +1,7 @@ 

  /** BEGIN COPYRIGHT BLOCK

   * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.

   * Copyright (C) 2005 Red Hat, Inc.

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * All rights reserved.

   *

   * License: GPL (version 3 or any later version).
@@ -58,6 +59,7 @@ 

   */

  void connection_abandon_operations(Connection *conn);

  int connection_activity(Connection *conn, int maxthreads);

+ int32_t ns_connection_activity(Connection *conn, int32_t maxthreads);

  void init_op_threads(void);

  int connection_new_private(Connection *conn);

  void connection_remove_operation(Connection *conn, Operation *op);

@@ -1743,7 +1743,11 @@ 

      cfg->maxbersize = SLAPD_DEFAULT_MAXBERSIZE;

      cfg->logging_backend = slapi_ch_strdup(SLAPD_INIT_LOGGING_BACKEND_INTERNAL);

      cfg->rootdn = slapi_ch_strdup(SLAPD_DEFAULT_DIRECTORY_MANAGER);

+ #ifdef DEBUG

+     init_enable_nunc_stans = cfg->enable_nunc_stans = LDAP_ON;

+ #else

      init_enable_nunc_stans = cfg->enable_nunc_stans = LDAP_OFF;

+ #endif

  #if defined(LINUX)

      init_malloc_mxfast = cfg->malloc_mxfast = DEFAULT_MALLOC_UNSET;

      init_malloc_trim_threshold = cfg->malloc_trim_threshold = DEFAULT_MALLOC_UNSET;

@@ -1,6 +1,7 @@ 

  /** BEGIN COPYRIGHT BLOCK

   * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.

   * Copyright (C) 2005 Red Hat, Inc.

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * All rights reserved.

   *

   * License: GPL (version 3 or any later version).
@@ -1019,7 +1020,7 @@ 

  int send_ldapv3_referral(Slapi_PBlock *pb, struct berval **urls);

  int set_db_default_result_handlers(Slapi_PBlock *pb);

  void disconnect_server_nomutex(Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error);

- void disconnect_server_nomutex_ext(Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error, int schedule_closure_job);

+ void ns_handle_closure_conn_nomutex(Connection *c);

  long g_get_current_conn_count(void);

  void g_increment_current_conn_count(void);

  void g_decrement_current_conn_count(void);
@@ -1483,9 +1484,6 @@ 

  #endif

  void slapd_wait4child(int);

  

- void ns_handle_pr_read_ready(struct ns_job_t *job);

- void ns_connection_post_io_or_closing(Connection *conn);

- 

  /*

   * main.c

   */

file modified
+8 -8
@@ -2,6 +2,7 @@ 

   * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.

   * Copyright (C) 2018 Red Hat, Inc.

   * Copyright (C) 2009 Hewlett-Packard Development Company, L.P.

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * All rights reserved.

   *

   * Contributors:
@@ -1927,14 +1928,13 @@ 

          }

      }

  

-     if (g_get_active_threadcnt() == 0) {

-         /*

-          * If the server is starting up the thread count will be zero, so

-          * we should not proceed, because not all the backends have been

-          * initialized yet.

-          */

-         return NULL;

-     }

+     /*

+      * In the past we would check g_get_active_threadcnt() here to see if we

+      * could actually generate and work with a password policy, but there was

+      * no condition where it was actually valid - the pwdpolicy can always

+      * be provided, and used, and the number of active workers has no bearing

+      * on that.

+      */

  

      if (pb) {

          slapi_pblock_get(pb, SLAPI_OPERATION_TYPE, &optype);

@@ -1,5 +1,6 @@ 

  /* --- BEGIN COPYRIGHT BLOCK ---

   * Copyright (C) 2015  Red Hat

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * see files 'COPYING' and 'COPYING.openssl' for use and warranty

   * information

   *
@@ -111,6 +112,17 @@ 

   */

  struct ns_job_t;

  

+ /** \struct ns_job_guard_t;

+  * A mutex guard for a ns_job_t.

+  *

+  * When taking a lock on the ns_job_t, to ensure that we don't leak

+  * the mutex causing a deadlock, we return a guard type. If this type

+  * leaks we can easily identify it with tools like LSAN.

+  *

+  * \sa ns_job_lock(), ns_job_unlock()

+  */

+ struct ns_job_guard_t;

+ 

  /**

   * The job callback function type

   *
@@ -620,6 +632,7 @@ 

                                    struct timeval *tv,

                                    ns_job_type_t job_type,

                                    ns_job_func_t func,

+                                   ns_job_func_t done_func,

                                    void *data,

                                    struct ns_job_t **job);

  
@@ -675,7 +688,12 @@ 

   *          to shutdown.  It will return PR_FAILURE in that case.

   * \sa ns_job_t, ns_job_get_data, NS_JOB_NONE, NS_JOB_THREAD

   */

- ns_result_t ns_add_job(ns_thrpool_t *tp, ns_job_type_t job_type, ns_job_func_t func, void *data, struct ns_job_t **job);

+ ns_result_t ns_add_job(ns_thrpool_t *tp,

+                        ns_job_type_t job_type,

+                        ns_job_func_t func,

+                        ns_job_func_t done_func,

+                        void *data,

+                        struct ns_job_t **job);

  

  /**

   * Allows the callback to access the file descriptor for an I/O job
@@ -959,7 +977,21 @@ 

   */

  ns_result_t ns_job_rearm(struct ns_job_t *job);

  

- int

- ns_job_is_func(struct ns_job_t *job, ns_job_func_t func);

+ /**

+  * Given an NS job, take the lock and return a guard.

+  *

+  * You must unlock the guard with ns_job_unlock.

+  *

+  * \param job The job to re-arm

+  * \retval guard A guard with the lock held.

+  */

+ struct ns_job_guard_t * ns_job_lock(struct ns_job_t *job);

+ 

+ /**

+  * Given a guard, unlock the related job for consumption in other threads.

+  *

+  * \param guard The guard to unlock.

+  */

+ void ns_job_unlock(struct ns_job_guard_t *guard);

  

  #endif /* NS_THRPOOL_H */

@@ -1,5 +1,6 @@ 

  /* --- BEGIN COPYRIGHT BLOCK ---

   * Copyright (C) 2015  Red Hat

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * see files 'COPYING' and 'COPYING.openssl' for use and warranty

   * information

   *
@@ -103,6 +104,11 @@ 

      ns_job_func_t done_cb;

  } ns_job_t;

  

+ typedef struct ns_job_guard_t

+ {

+     pthread_mutex_t *monitor;

+ } ns_job_guard_t;

+ 

  typedef void (*ns_event_fw_accept_cb_t)(PRFileDesc *fd, ns_job_t *job);

  typedef void (*ns_event_fw_read_cb_t)(PRFileDesc *fd, ns_job_t *job);

  typedef void (*ns_event_fw_write_cb_t)(PRFileDesc *fd, ns_job_t *job);

file modified
+118 -42
@@ -1,5 +1,6 @@ 

  /* --- BEGIN COPYRIGHT BLOCK ---

   * Copyright (C) 2015  Red Hat

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * see files 'COPYING' and 'COPYING.openssl' for use and warranty

   * information

   *
@@ -218,6 +219,23 @@ 

  #ifdef DEBUG

      ns_log(LOG_DEBUG, "internal_ns_job_done %x state %d moving to NS_JOB_DELETED\n", job, job->state);

  #endif

+ 

+     /*

+      * Let our call back perform our needed work

+      * This has to be before the state transition

+      * so that ns_job_get_data works.

+      *

+      * One could argue that until this ends, this is

+      * still in the 'needs delete' stage as we are

+      * not done yet.

+      */

+     if (job->done_cb != NULL) {

+ #ifdef DEBUG

+         ns_log(LOG_DEBUG, "internal_ns_job_done %x executing job->done_cb ...\n", job);

+ #endif

+         job->done_cb(job);

+     }

+ 

      /* In theory, due to the monitor placement, this should never be able to be seen by any other thread ... */

      job->state = NS_JOB_DELETED;

  
@@ -235,10 +253,6 @@ 

          PR_Close(job->fd);

      } /* else application is responsible for fd */

  

-     if (job->done_cb != NULL) {

-         job->done_cb(job);

-     }

- 

      pthread_mutex_unlock(&(job->monitor));

      pthread_mutex_destroy(&(job->monitor));

      pthread_cond_destroy(&(job->notify));
@@ -262,12 +276,13 @@ 

  

      if (NS_JOB_IS_IO(job->job_type) || NS_JOB_IS_TIMER(job->job_type) || NS_JOB_IS_SIGNAL(job->job_type)) {

          event_q_notify(job);

+         pthread_mutex_unlock(&(job->monitor));

      } else {

          /* if this is a non event task, just queue it on the work q */

          /* Prevents an un-necessary queue / dequeue to the event_q */

+         pthread_mutex_unlock(&(job->monitor));

          work_q_notify(job);

      }

-     pthread_mutex_unlock(&(job->monitor));

  }

  

  static void
@@ -282,20 +297,27 @@ 

       */

      PR_ASSERT(job);

      pthread_mutex_lock(&(job->monitor));

+     /*

+      * This is how we allow queued jobs to be disarmed!

+      * any other job->state just moves to whatever next

+      * transition is required.

+      */

+     if (job->state == NS_JOB_ARMED) {

  #ifdef DEBUG

-     ns_log(LOG_DEBUG, "work_job_execute %x state %d moving to NS_JOB_RUNNING\n", job, job->state);

+         ns_log(LOG_DEBUG, "work_job_execute %x state %d moving to NS_JOB_RUNNING\n", job, job->state);

  #endif

-     job->state = NS_JOB_RUNNING;

-     /* Do the work. */

-     PR_ASSERT(job->func);

-     job->func(job);

+         job->state = NS_JOB_RUNNING;

+         /* Do the work. */

+         PR_ASSERT(job->func);

+         job->func(job);

  

-     /* Only if !threaded job, and persistent, we automatically tell us to rearm */

-     if (!NS_JOB_IS_THREAD(job->job_type) && NS_JOB_IS_PERSIST(job->job_type) && job->state == NS_JOB_RUNNING) {

+         /* Only if !threaded job, and persistent, we automatically tell us to rearm */

+         if (!NS_JOB_IS_THREAD(job->job_type) && NS_JOB_IS_PERSIST(job->job_type) && job->state == NS_JOB_RUNNING) {

  #ifdef DEBUG

-         ns_log(LOG_DEBUG, "work_job_execute PERSIST and RUNNING, remarking %x as NS_JOB_NEEDS_ARM\n", job);

+             ns_log(LOG_DEBUG, "work_job_execute PERSIST and RUNNING, remarking %x as NS_JOB_NEEDS_ARM\n", job);

  #endif

-         job->state = NS_JOB_NEEDS_ARM;

+             job->state = NS_JOB_NEEDS_ARM;

+         }

      }

  

      if (job->state == NS_JOB_NEEDS_DELETE) {
@@ -690,7 +712,12 @@ 

  }

  

  static ns_job_t *

- new_ns_job(ns_thrpool_t *tp, PRFileDesc *fd, ns_job_type_t job_type, ns_job_func_t func, struct ns_job_data_t *data)

+ new_ns_job(ns_thrpool_t *tp,

+            PRFileDesc *fd,

+            ns_job_type_t job_type,

+            ns_job_func_t func,

+            ns_job_func_t done_func,

+            struct ns_job_data_t *data)

  {

      ns_job_t *job = ns_calloc(1, sizeof(ns_job_t));

  
@@ -701,6 +728,8 @@ 

      assert(pthread_cond_init(&(job->notify), NULL) == 0);

      ns_free(monitor_attr);

  

+     pthread_mutex_lock(&(job->monitor));

+ 

      job->tp = tp;

      /* We have to have this due to our obsession of hiding struct contents ... */

      /* It's only used in tevent anyway .... */
@@ -712,35 +741,51 @@ 

      job->free_event_context = free_event_context;

      job->event_cb = event_cb;

      job->job_type = job_type;

-     job->done_cb = NULL;

+     job->done_cb = done_func;

  #ifdef DEBUG

      ns_log(LOG_DEBUG, "new_ns_job %x initial NS_JOB_WAITING\n", job);

  #endif

      job->state = NS_JOB_WAITING;

+     pthread_mutex_unlock(&(job->monitor));

      return job;

  }

  

  static ns_job_t *

- alloc_io_context(ns_thrpool_t *tp, PRFileDesc *fd, ns_job_type_t job_type, ns_job_func_t func, struct ns_job_data_t *data)

+ alloc_io_context(ns_thrpool_t *tp,

+                  PRFileDesc *fd,

+                  ns_job_type_t job_type,

+                  ns_job_func_t func,

+                  ns_job_func_t done_func,

+                  struct ns_job_data_t *data)

  {

-     ns_job_t *job = new_ns_job(tp, fd, job_type, func, data);

+     ns_job_t *job = new_ns_job(tp, fd, job_type, func, done_func, data);

  

      return job;

  }

  

  static ns_job_t *

- alloc_timeout_context(ns_thrpool_t *tp, struct timeval *tv, ns_job_type_t job_type, ns_job_func_t func, struct ns_job_data_t *data)

- {

-     ns_job_t *job = new_ns_job(tp, NULL, NS_JOB_TIMER | job_type, func, data);

+ alloc_timeout_context(ns_thrpool_t *tp,

+                       struct timeval *tv,

+                       ns_job_type_t job_type,

+                       ns_job_func_t func,

+                       ns_job_func_t done_func,

+                       struct ns_job_data_t *data)

+ {

+     ns_job_t *job = new_ns_job(tp, NULL, NS_JOB_TIMER | job_type, func, done_func, data);

      job->tv = *tv;

  

      return job;

  }

  

  static ns_job_t *

- alloc_signal_context(ns_thrpool_t *tp, PRInt32 signum, ns_job_type_t job_type, ns_job_func_t func, struct ns_job_data_t *data)

- {

-     ns_job_t *job = new_ns_job(tp, NULL, NS_JOB_SIGNAL | job_type, func, data);

+ alloc_signal_context(ns_thrpool_t *tp,

+                      PRInt32 signum,

+                      ns_job_type_t job_type,

+                      ns_job_func_t func,

+                      ns_job_func_t done_func,

+                      struct ns_job_data_t *data)

+ {

+     ns_job_t *job = new_ns_job(tp, NULL, NS_JOB_SIGNAL | job_type, func, done_func, data);

      job->signal = signum;

  

      return job;
@@ -816,7 +861,7 @@ 

          return NS_SHUTDOWN;

      }

  

-     *job = new_ns_job(tp, NULL, job_type, func, NULL);

+     *job = new_ns_job(tp, NULL, job_type, func, NULL, NULL);

      if (*job == NULL) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -855,7 +900,7 @@ 

      }

  

      /* get an event context for an accept */

-     _job = alloc_io_context(tp, fd, job_type, func, data);

+     _job = alloc_io_context(tp, fd, job_type, func, NULL, data);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -895,7 +940,7 @@ 

      }

  

      /* get an event context for a timer job */

-     _job = alloc_timeout_context(tp, tv, job_type, func, data);

+     _job = alloc_timeout_context(tp, tv, job_type, func, NULL, data);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -918,7 +963,14 @@ 

  

  /* queue a file descriptor to listen for and accept new connections */

  ns_result_t

- ns_add_io_timeout_job(ns_thrpool_t *tp, PRFileDesc *fd, struct timeval *tv, ns_job_type_t job_type, ns_job_func_t func, void *data, ns_job_t **job)

+ ns_add_io_timeout_job(ns_thrpool_t *tp,

+                       PRFileDesc *fd,

+                       struct timeval *tv,

+                       ns_job_type_t job_type,

+                       ns_job_func_t func,

+                       ns_job_func_t done_func,

+                       void *data,

+                       ns_job_t **job)

  {

      ns_job_t *_job = NULL;

  
@@ -951,7 +1003,7 @@ 

      }

  

      /* get an event context for an accept */

-     _job = alloc_io_context(tp, fd, job_type | NS_JOB_TIMER, func, data);

+     _job = alloc_io_context(tp, fd, job_type | NS_JOB_TIMER, func, done_func, data);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -988,7 +1040,7 @@ 

      }

  

      /* get an event context for a signal job */

-     _job = alloc_signal_context(tp, signum, job_type, func, data);

+     _job = alloc_signal_context(tp, signum, job_type, func, NULL, data);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -1010,7 +1062,12 @@ 

  }

  

  ns_result_t

- ns_add_job(ns_thrpool_t *tp, ns_job_type_t job_type, ns_job_func_t func, void *data, ns_job_t **job)

+ ns_add_job(ns_thrpool_t *tp,

+            ns_job_type_t job_type,

+            ns_job_func_t func,

+            ns_job_func_t done_func,

+            void *data,

+            ns_job_t **job)

  {

      ns_job_t *_job = NULL;

  
@@ -1023,7 +1080,7 @@ 

          return NS_SHUTDOWN;

      }

  

-     _job = new_ns_job(tp, NULL, job_type, func, data);

+     _job = new_ns_job(tp, NULL, job_type, func, done_func, data);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -1032,10 +1089,12 @@ 

          *job = _job;

      }

  

+     pthread_mutex_lock(&(_job->monitor));

  #ifdef DEBUG

      ns_log(LOG_DEBUG, "ns_add_job %x state %d moving to NS_JOB_ARMED\n", _job, (_job)->state);

  #endif

      _job->state = NS_JOB_NEEDS_ARM;

+     pthread_mutex_unlock(&(_job->monitor));

      internal_ns_job_rearm(_job);

  

      return NS_SUCCESS;
@@ -1045,7 +1104,7 @@ 

  ns_add_shutdown_job(ns_thrpool_t *tp)

  {

      ns_job_t *_job = NULL;

-     _job = new_ns_job(tp, NULL, NS_JOB_SHUTDOWN_WORKER, NULL, NULL);

+     _job = new_ns_job(tp, NULL, NS_JOB_SHUTDOWN_WORKER, NULL, NULL, NULL);

      if (!_job) {

          return NS_ALLOCATION_FAILURE;

      }
@@ -1144,6 +1203,26 @@ 

      }

  }

  

+ struct ns_job_guard_t *

+ ns_job_lock(ns_job_t *job) {

+     PR_ASSERT(job);

+     pthread_mutex_lock(&(job->monitor));

+     /* Now construct the guard, and return it. */

+     ns_job_guard_t *guard = malloc(sizeof(ns_job_guard_t));

+     guard->monitor = &(job->monitor);

+     PR_ASSERT(guard);

+     return guard;

+ }

+ 

+ void

+ ns_job_unlock(struct ns_job_guard_t *guard) {

+     PR_ASSERT(guard);

+     if (guard != NULL) {

+         pthread_mutex_unlock(guard->monitor);

+         free(guard);

+     }

+ }

+ 

  PRFileDesc *

  ns_job_get_fd(ns_job_t *job)

  {
@@ -1210,6 +1289,7 @@ 

      PR_ASSERT(job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING);

  

      if (ns_thrpool_is_shutdown(job->tp)) {

+         pthread_mutex_unlock(&(job->monitor));

          return NS_SHUTDOWN;

      }

  
@@ -1218,12 +1298,12 @@ 

          ns_log(LOG_DEBUG, "ns_rearm_job %x state %d moving to NS_JOB_NEEDS_ARM\n", job, job->state);

  #endif

          job->state = NS_JOB_NEEDS_ARM;

-         internal_ns_job_rearm(job);

          pthread_mutex_unlock(&(job->monitor));

+         internal_ns_job_rearm(job);

          return NS_SUCCESS;

      } else if (!NS_JOB_IS_PERSIST(job->job_type) && job->state == NS_JOB_RUNNING) {

- /* For this to be called, and NS_JOB_RUNNING, we *must* be the callback thread! */

- /* Just mark it (ie do nothing), the work_job_execute function will trigger internal_ns_job_rearm */

+         /* For this to be called, and NS_JOB_RUNNING, we *must* be the callback thread! */

+         /* Just mark it (ie do nothing), the work_job_execute function will trigger internal_ns_job_rearm */

  #ifdef DEBUG

          ns_log(LOG_DEBUG, "ns_rearm_job %x state %d setting NS_JOB_NEEDS_ARM\n", job, job->state);

  #endif
@@ -1235,13 +1315,9 @@ 

          return NS_INVALID_STATE;

      }

      /* Unreachable code .... */

+     PR_ASSERT(0);

      return NS_INVALID_REQUEST;

  }

- int

- ns_job_is_func(struct ns_job_t *job, ns_job_func_t func)

- {

-     return(job && job->func == func);

- }

  

  static void

  ns_thrpool_delete(ns_thrpool_t *tp)
@@ -1290,7 +1366,7 @@ 

      /* wakeup events are processed inside the event loop thread */

      job = alloc_io_context(tp, tp->event_q_wakeup_pipe_read,

                             NS_JOB_READ | NS_JOB_PERSIST | NS_JOB_PRESERVE_FD,

-                            wakeup_cb, NULL);

+                            wakeup_cb, NULL, NULL);

  

      pthread_mutex_lock(&(job->monitor));

  

@@ -1,5 +1,6 @@ 

  /* --- BEGIN COPYRIGHT BLOCK ---

   * Copyright (C) 2016  Red Hat

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * see files 'COPYING' and 'COPYING.openssl' for use and warranty

   * information

   *
@@ -169,7 +170,7 @@ 

  

      pthread_mutex_lock(&cb_lock);

      assert_int_equal(

-         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, NULL, &job),

+         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, NULL, NULL, &job),

          0);

  

      assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0);
@@ -196,7 +197,7 @@ 

  

      pthread_mutex_lock(&cb_lock);

      assert_int_equal(

-         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, data, &job),

+         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, NULL, data, &job),

          NS_SUCCESS);

  

      /* Let the job run */
@@ -365,7 +366,7 @@ 

  

      pthread_mutex_lock(&cb_lock);

      assert_int_equal(

-         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_race_done_job_cb, NULL, &job),

+         ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_race_done_job_cb, NULL, NULL, &job),

          NS_SUCCESS);

  

      assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0);
@@ -415,9 +416,9 @@ 

  

      struct timeval tv = {-1, 0};

  

-     PR_ASSERT(NS_INVALID_REQUEST == ns_add_io_timeout_job(tp, 0, &tv, NS_JOB_THREAD, ns_init_do_nothing_cb, NULL, NULL));

+     assert(NS_INVALID_REQUEST == ns_add_io_timeout_job(tp, 0, &tv, NS_JOB_THREAD, ns_init_do_nothing_cb, NULL, NULL, NULL));

  

-     PR_ASSERT(NS_INVALID_REQUEST == ns_add_timeout_job(tp, &tv, NS_JOB_THREAD, ns_init_do_nothing_cb, NULL, NULL));

+     assert(NS_INVALID_REQUEST == ns_add_timeout_job(tp, &tv, NS_JOB_THREAD, ns_init_do_nothing_cb, NULL, NULL));

  }

  

  /*
@@ -425,9 +426,8 @@ 

   */

  

  static void

- ns_timer_job_cb(struct ns_job_t *job)

+ ns_timer_job_cb(struct ns_job_t *job __attribute__((unused)))

  {

-     ns_job_done(job);

      pthread_mutex_lock(&cb_lock);

      cb_check += 1;

      pthread_cond_signal(&cb_cond);
@@ -451,8 +451,18 @@ 

  

      // pthread_mutex_lock(&cb_lock);

      cond_wait_rel(&cb_cond, &cb_lock, &timeout);

-     pthread_mutex_unlock(&cb_lock);

      assert_int_equal(cb_check, 1);

+ 

+     // Now rearm and check again.

+     ns_job_rearm(job);

+     cond_wait_rel(&cb_cond, &cb_lock, &timeout);

+     assert_int_equal(cb_check, 1);

+ 

+     cond_wait_rel(&cb_cond, &cb_lock, &timeout);

+     assert_int_equal(cb_check, 2);

+     pthread_mutex_unlock(&cb_lock);

+ 

+     ns_job_done(job);

  }

  

  /*
@@ -494,6 +504,41 @@ 

      pthread_mutex_unlock(&cb_lock);

  }

  

+ static void

+ ns_job_lock_guard_test(void **state)

+ {

+     struct ns_thrpool_t *tp = *state;

+     struct ns_job_t *job = NULL;

+     struct timespec timeout = {1, 0};

+ 

+     /* The job is created now. */

+ 

+     assert_int_equal(

+         ns_create_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, &job),

+         0);

+ 

+     /* Lock on the job, and ensure it can't progress. */

+ 

+     struct ns_job_guard_t *guard = ns_job_lock(job);

+ 

+     pthread_mutex_lock(&cb_lock);

+     ns_job_rearm(job);

+     assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) != 0);

+     pthread_mutex_unlock(&cb_lock);

+ 

+     /* Now unlock the guard and let it finish. */

+     pthread_mutex_lock(&cb_lock);

+     ns_job_unlock(guard);

+ 

+     assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0);

+     assert_int_equal(cb_check, 1);

+     pthread_mutex_unlock(&cb_lock);

+ 

+     /* Get rid of the job */

+     ns_job_done(job);

+     /* Finish! */

+ }

+ 

  int

  main(void)

  {
@@ -528,6 +573,9 @@ 

          cmocka_unit_test_setup_teardown(ns_job_timer_persist_test,

                                          ns_test_setup,

                                          ns_test_teardown),

+         cmocka_unit_test_setup_teardown(ns_job_lock_guard_test,

+                                         ns_test_setup,

+                                         ns_test_teardown),

      };

      return cmocka_run_group_tests(tests, NULL, NULL);

  }

@@ -1,5 +1,6 @@ 

  /* --- BEGIN COPYRIGHT BLOCK ---

   * Copyright (C) 2016  Red Hat

+  * Copyright (C) 2018 William Brown <william@blackhats.net.au>

   * see files 'COPYING' and 'COPYING.openssl' for use and warranty

   * information

   *
@@ -396,7 +397,7 @@ 

      clock_gettime(CLOCK_MONOTONIC, &ts);

      printf("BEGIN: %ld.%ld\n", ts.tv_sec, ts.tv_nsec);

      for (int32_t i = 0; i < tparams->jobs; i++) {

-         assert_int_equal(ns_add_job(ns_job_get_tp(job), NS_JOB_NONE | NS_JOB_THREAD, client_initiate_connection_cb, NULL, NULL), 0);

+         assert_int_equal(ns_add_job(ns_job_get_tp(job), NS_JOB_NONE | NS_JOB_THREAD, client_initiate_connection_cb, NULL, NULL, NULL), 0);

      }

      assert_int_equal(ns_job_done(job), 0);

  
@@ -476,7 +477,7 @@ 

  

      /* While true, add connect / write jobs */

      for (PRInt32 i = 0; i < tparams->client_thread_count; i++) {

-         assert_int_equal(ns_add_job(ctp, NS_JOB_NONE | NS_JOB_THREAD, client_create_work, tparams, NULL), 0);

+         assert_int_equal(ns_add_job(ctp, NS_JOB_NONE | NS_JOB_THREAD, client_create_work, NULL, tparams, NULL), 0);

      }

  

      /* Wait for all the clients to be done dispatching jobs to the server */

Bug Description: The integration of NS and DS doesn't account
for thread safety. Adding thread safety to NS, however, didn't
fix DS's broken model. DS makes assumptions about IO ownership
that NS doesn't support nicely. As a result some deadlocks could
occur.

Fix Description: Add a method to allow an external caller to
lock NS jobs. This allows DS to correctly lock in the correct
order.

https://pagure.io/389-ds-base/issue/49569

Author: wibrown

Review by: ???

@tbordaz @mreynolds You may be interested in looking at this patch please :)

rebased onto 6f8ad57e0ebcaa0a0b7d52a0387d7818edaa40f2

5 years ago

Should this be removed since it doesn't do anything, or is it incomplete?

Also, please don't use java style comments

Can this comment above be removed? If you want to record the history of the logic perhaps use a more appropriate comment?

I see a lot of code that looks like it was experimentally commented out. If it's not needed it should be cleaned up, and the java style comments should also not be used.

The rest of the code looks okay, but I want @tbordaz to look it over, and I would really like @vashirov to run some tests on it (maybe even an IPA install).

Yeah, I have done a clean up since. I've run it with tests and they seem to do well. I need to clean up the NS=ON by default again, and probably extend the python tests to support an env/config setting for it? That way we can test against IPA.

I really committed because I wanted the code in git, and not lost to the sands of time on my laptop.

rebased onto 4e0e1d857e9c8c058634ba275f9331da5964b8d0

5 years ago

As promised, cleanup provided - this still enables NS=ON by default in libglobs which is easier for testing, but obviously we don't want to let that slip into master.

@firstyear thank you so much for this patch. I started reviewing it but soon it was clear it requires a deep review. I discussed with you (on private emails) the outcome of the team discussions but forgot to update the PR. Sorry for that.

Before doing any fix/enhancements in NS, we decided to invest in test coverage. The tests aim to measure the benefit of NS and isolate (when it is possible) reproducible test cases of the known failures (connection leak and hang). Then regarding the expected benefits we will be able to justify the need of enabling NS and work (i.e. review) on NS robustness.

@tbordaz Thanks for that. I'd like to just remind you and everyone that I'm not able to access private Red Hat emails any more, so it would be better if these discussions were had on 389-devel .... :(

Anyway, with the patch I think we could merge and trust the feature gating (nunc-stans: off) in cn=config, so that we can continue to test. If it's already disabled and we trust that flag to operate, then merging this prevents further bitrot and divergence between potential fixes. Then we can continue to test and improve, and the discussion is not 'merge this patch' but 're-enable by default'.

Does that sound like a better plan? My concern is this patch has been going for a long time and it's harder and harder to rebase every time....

Hey @firstyear, I apologize for the delay, but I finally started to test this PR.
I've got a hang during suites/paged_results/paged_results_test.py::test_search_multiple_paging, here's the stacktrace. In the errors log I see:

[23/Oct/2018:13:39:10.433425941 -0400] - WARN - ns_handle_pr_read_ready - conn 2 for fd=65 - socket is already closing
[23/Oct/2018:13:39:10.434146277 -0400] - WARN - ns_handle_pr_read_ready - conn 3 for fd=66 - socket is already closing
[23/Oct/2018:13:39:10.434603631 -0400] - WARN - ns_handle_pr_read_ready - conn 4 for fd=67 - socket is already closing
[23/Oct/2018:13:39:47.678581690 -0400] - WARN - ns_handle_pr_read_ready - conn 6 for fd=66 - socket is already closing
[23/Oct/2018:13:39:49.979713098 -0400] - WARN - ns_handle_pr_read_ready - conn 7 for fd=66 - socket is already closing
[23/Oct/2018:13:39:49.980524182 -0400] - WARN - ns_handle_pr_read_ready - conn 8 for fd=67 - socket is already closing
[23/Oct/2018:13:39:52.519567804 -0400] - WARN - ns_handle_pr_read_ready - conn 9 for fd=66 - socket is already closing
[23/Oct/2018:13:39:59.945238135 -0400] - WARN - ns_handle_pr_read_ready - conn 10 for fd=66 - socket is already closing
[23/Oct/2018:13:39:59.945955597 -0400] - WARN - ns_handle_pr_read_ready - conn 11 for fd=67 - socket is already closing
[23/Oct/2018:13:40:31.652500670 -0400] - WARN - ns_handle_pr_read_ready - conn 12 for fd=66 - socket is already closing
[23/Oct/2018:13:40:34.885272354 -0400] - WARN - ns_handle_pr_read_ready - conn 13 for fd=66 - socket is already closing
[23/Oct/2018:13:40:38.085292444 -0400] - WARN - ns_handle_pr_read_ready - conn 5 for fd=65 - socket is already closing
[23/Oct/2018:14:26:09.948360955 -0400] - WARN - ns_handle_pr_read_ready - conn 16 for fd=67 - socket is already closing

It's 100% reproducible on my VM, please let me know if you need more info.

@vashirov Thanks for this! I'll run those tests and try that out, and fix it up.

@tbordaz We said a few things on the ML but didn't come to a conclusion?

PS: @vashirov I probably don't need more info, but I do need more time :)

@firstyear I was waiting for some feedback from others members. I updated the thread on the ML.

Okay, I have rebased and updated this patch - I will begin to look at the issues that @vashirov has identified!

rebased onto 40550f45aafaf944824b82eb86ac5ec353cc4949

4 years ago

Interesting - I see a failure in paged results, but it occurs both ON and OFF with nunc-stans. I'm not seeing a deadlock or a hang as you report. I'll investigate properly to be sure it is not NS related.

The failure appears to be separate from NS, it looks like it's in the design of the test case. @tbordaz and I were discussing today about how to continue testing this patch now to avoid the deadlock case.

I would like to propose though, that we merge this patch "as is" - it is already an improvement from the current state of nunc-stans. We have NS disabled in cn=config, and this patch would leave it disabled. By merging it, we can then start to test it more frequently, and we would not have such a long review hanging for so long.

There are some small patch clean ups still to perform before we merge, but I think if we continue to delay it keeps getting harder to move this forward and test effectively.

Thoughts?

Clean it up and merge it!

Will do! I will ask @tbordaz to have a look today then also :)

rebased onto 274ff87879d65fe719559b962f8d49e416147e39

4 years ago

rebased onto ae11bc9825d5d964545df18a23c9f7339d321d44

4 years ago

This update rebases to master, and defaults to NS off. It changes the model of integration so that jobs are for the lifetime of a connection, instead of "one per event". This reduces allocator load, and makes the whole system deterministic. Additionally, it prevents reuse of the memory structures for operation, removing a possible serialisation point and memory growth point.

It should be safe to merge, and we can test it with nunc-stans enabled via cn=config!

Thanks for getting this done! I'm feeling optimistic!

I'm going to keep my schedule for the next month free to work on this in case there are issues :P

Maybe a comment on why this code block is commented out?

Looks good!!! A few questions but nothing major. No complier warnings either, but I am waiting for covscan to complete... I'll report those results later.

Covscan issues reported:

Error: DEADCODE (CWE-561): [#def228]
389-ds-base-1.4.1.4.20190619git2daf26aa8/ldap/servers/slapd/connection.c:1448: assignment: Assigning: "is_timedout" = "0".
389-ds-base-1.4.1.4.20190619git2daf26aa8/ldap/servers/slapd/connection.c:1814: const: At condition "1 == is_timedout", the value of "is_timedout" must be equal to 0.
389-ds-base-1.4.1.4.20190619git2daf26aa8/ldap/servers/slapd/connection.c:1814: dead_error_condition: The condition "1 == is_timedout" cannot be true.
389-ds-base-1.4.1.4.20190619git2daf26aa8/ldap/servers/slapd/connection.c:1816: dead_error_begin: Execution cannot reach this statement: "connection_make_readable_no...".
# 1814|                   } else if (1 == is_timedout) {
# 1815|                       /* covscan reports this code is unreachable  (2019/6/4) */
# 1816|->                     connection_make_readable_nolock(conn);
# 1817|                       signal_listner();
# 1818|                   }


Error: LOCK (CWE-667): [#def467]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:267: lock: "pthread_mutex_lock" locks "job->monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:278: double_lock: "event_q_notify" locks "job->monitor" while it is locked.
#  276|   
#  277|       if (NS_JOB_IS_IO(job->job_type) || NS_JOB_IS_TIMER(job->job_type) || NS_JOB_IS_SIGNAL(job->job_type)) {
#  278|->         event_q_notify(job);
#  279|       } else {
#  280|           /* if this is a non event task, just queue it on the work q */

Error: LOCK (CWE-667): [#def468]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:267: lock: "pthread_mutex_lock" locks "job->monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:282: double_lock: "work_q_notify" locks "job->monitor" while it is locked.
#  280|           /* if this is a non event task, just queue it on the work q */
#  281|           /* Prevents an un-necessary queue / dequeue to the event_q */
#  282|->         work_q_notify(job);
#  283|       }
#  284|       pthread_mutex_unlock(&(job->monitor));

Error: MISSING_LOCK (CWE-667): [#def469]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:745: missing_lock: Accessing "job->state" without holding lock "ns_job_t.monitor". Elsewhere, "ns_job_t.state" is accessed with "ns_job_t.monitor" held 17 out of 19 times (6 of these accesses strongly imply that it is necessary).
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:218: example_lock: Example 1: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:240: example_access: Example 1 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:267: example_lock: Example 2: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:273: example_access: Example 2 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:801: example_lock: Example 3: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:827: example_access: Example 3 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1282: example_lock: Example 4: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1293: example_access: Example 4 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1572: example_lock: Example 5: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1579: example_access: Example 5 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
#  743|       ns_log(LOG_DEBUG, "new_ns_job %x initial NS_JOB_WAITING\n", job);
#  744|   #endif
#  745|->     job->state = NS_JOB_WAITING;
#  746|       return job;
#  747|   }

Error: MISSING_LOCK (CWE-667): [#def470]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:771: missing_lock: Accessing "job->tv" without holding lock "ns_job_t.monitor". Elsewhere, "ns_job_t.tv" is accessed with "ns_job_t.monitor" held 1 out of 2 times (1 of these accesses strongly imply that it is necessary).
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1006: example_lock: Example 1: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1007: example_access: Example 1 (cont.): "ns_job_t.tv" is accessed with lock "ns_job_t.monitor" held.
#  769|   {
#  770|       ns_job_t *job = new_ns_job(tp, NULL, NS_JOB_TIMER | job_type, func, done_func, data);
#  771|->     job->tv = *tv;
#  772|   
#  773|       return job;

Error: MISSING_LOCK (CWE-667): [#def471]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1091: missing_lock: Accessing "_job->state" without holding lock "ns_job_t.monitor". Elsewhere, "ns_job_t.state" is accessed with "ns_job_t.monitor" held 17 out of 19 times (6 of these accesses strongly imply that it is necessary).
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:218: example_lock: Example 1: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:240: example_access: Example 1 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:267: example_lock: Example 2: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:273: example_access: Example 2 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:801: example_lock: Example 3: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:827: example_access: Example 3 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1282: example_lock: Example 4: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1293: example_access: Example 4 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1572: example_lock: Example 5: Locking "ns_job_t.monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1579: example_access: Example 5 (cont.): "ns_job_t.state" is accessed with lock "ns_job_t.monitor" held.
# 1089|       ns_log(LOG_DEBUG, "ns_add_job %x state %d moving to NS_JOB_ARMED\n", _job, (_job)->state);
# 1090|   #endif
# 1091|->     _job->state = NS_JOB_NEEDS_ARM;
# 1092|       internal_ns_job_rearm(_job);
# 1093|   

Error: LOCK (CWE-667): [#def472]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1282: lock: "pthread_mutex_lock" locks "job->monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1286: missing_unlock: Returning without unlocking "job->monitor".
# 1284|   
# 1285|       if (ns_thrpool_is_shutdown(job->tp)) {
# 1286|->         return NS_SHUTDOWN;
# 1287|       }
# 1288|   

Error: LOCK (CWE-667): [#def473]
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1282: lock: "pthread_mutex_lock" locks "job->monitor".
389-ds-base-1.4.1.4.20190619git2daf26aa8/src/nunc-stans/ns/ns_thrpool.c:1294: double_lock: "internal_ns_job_rearm" locks "job->monitor" while it is locked.
# 1292|   #endif
# 1293|           job->state = NS_JOB_NEEDS_ARM;
# 1294|->         internal_ns_job_rearm(job);
# 1295|           pthread_mutex_unlock(&(job->monitor));
# 1296|           return NS_SUCCESS;

I don't think any of these are new, but worth looking over one more time...

I'm glad you sent me the covscan report - reviewing these, there was an issue in here, which I have resolved (maybe I'm just more patient with coverity today :) )

EDIT: I have also fixed/commented all the locations you mentioned in the review. The date is "correct" because that's when I look at the patch last apparently ;)

rebased onto 80e1de0f154e10ff12161689717dd6fccde07175

4 years ago

The patch is nice but huge, and it is difficult to confirm whether it fixes the deadlock or not. I think I found a scenario of deadlock, but first a general comment.

With your patch we have per opened connection a unique and same ns_job for all the life of the connection. It could be better than the previous implementation where we had continuously changing ns_job. However the scenario of deadlock was because of two locks (c_mutex and job->monitor) taken in the opposite order. The current patch keeps those two locks and I think there is still a risk of taking them in the opposite order.

The following scenario could create deadlock but I do not know if it is realistic:

Thread A, is a ns_thread that detects activity on the connection and call the callback function

worker_thread_func ->
   work_job_execute ->
     lock(job_monitor)
     call ns_handle_work -> 
          lock(c_mutex)

Thread B, is returning data (entries or result) to a search and detecting error condition it disconnect the server

op_shared_search ->
   send_results ->
      do_disconnect ->
          lock(c_mutex)
          disconnect_server ->
                ns_handle_closure_conn_nomutex ->
                       ns_add_job(ns_handle_closure) ->
                           internal_ns_job_rearm ->
                              lock(job_monitor)

If the conn<-->job mapping is unique and stable, do you think it could be possible to use a single lock to protect those two structs?

I think you may have an excellent point here. Perhaps when nunc_stans is true, since there is a conn<-->job as you say, we could exclusively use the job lock in that case. That would allow us to more easily find and see these deadlocks.

I think the scenario you have put here is sound and reasonable, so I'll think about what's right to do here. There is meant to be "protection" inside of the ns_add_job or re-arm to allow self-re-arming if we are on the worker, but I think here, because you correctly state they are on separate threads, it may not come into effect.

Let me have some time to think about the solution, but I certainly think you may be correct here.

After a lot of discussion yesterday with @tbordaz we agreed to use the c_mutex, and to have a method to supply an external monitor into NS at io-event create. I think the only complexity would be that c_mutex is a PR_Monitor, but ns uses a pthread mutex with monitor enabled. So we may need to change c_mutex to be pthread (which is faster anyway ....) so that we can then keep the current ns monitor for its internal usage.

The alternative is that we have to write a lock-wrapper for c_mutex that does "if nunc_stans -> use job monitor, else use pr_monitor c_mutex". But that's also a lot of code to touch and replace.

https://pagure.io/389-ds-base/issue/50459

I think I will implement this, then I can come back and rebase this code onto it? Does that seem reasonable @tbordaz ?

IMHO #50459 (NSPR mutex/monitor -> pthread mutex/monitor) and #49569 (nunc stans deadlock) are independent.
I would vote to first get #49569 pushed into master, then confirm the performance benefit of a pthread monitor on c_mutex with another ticket. Then, when the benefit is proven, apply it to the full server with #50459.

But they aren't - if we are going to make nunc-stans job and conn to use the same lock to avoid the lock pattern you mention, then we need a way to supply the monitor to the ns_job at io event create time. Today NS uses pthread, so the conn monitor has to be pthread also to give that to the ns event to allow the single lock.

That's why I suggested 50459 first.

So if we were to merge 49569 today, yeah, it's a huge improvement over the current NS code, but we still could not "enable" it due to the deadlock concern you raised. It would only be after 50459 that we could really do this to create the single lock to avoid the case.

I'm open to merging this today of course - I think the op_stack clean up is really valuable, and the NS code really already is unsafe, so merging "better but still unsafe" code is not harmful. But again, we really couldn't enable it until after 50459 ....

Arghhh.. sorry for the confusion I did not understand your proposal.

Yes it is fine. Fixing #50459 first then #49569

Great! I'll do that soon then (but there may be a small delay as I have some SUSE responsibilities for the next two -> three weeks which may delay my progress)

rebased onto 0d85f31

4 years ago

Okay, this update rebases to master to allow the pthread_mutex rewrite of c_mutex to be present in preparation to work on the ns-lock sharing.

In general though, I can confirm that this passes a few test suites, so I think this is getting closer to completion.

Saying this, as a reminder, this feature IS feature gated with a config option, defaulting to OFF, so merging "early" is not a dangerous action as it won't be released to actual users until we change the default.

@tbordaz Review in the mind of not fixing deadlock, but fixes the double-handler and connection leak :) Thank you!

Pull-Request has been closed by firstyear

4 years ago

389-ds-base is moving from Pagure to Github. This means that new issues and pull requests
will be accepted only in 389-ds-base's github repository.

This pull request has been cloned to Github as issue and is available here:
- https://github.com/389ds/389-ds-base/issues/2695

If you want to continue to work on the PR, please navigate to the github issue,
download the patch from the attachments and file a new pull request.

Thank you for understanding. We apologize for all inconvenience.

Pull-Request has been closed by spichugi

3 years ago