#217 Fix concurrency bug in sending command replies
Closed 6 years ago by simo. Opened 6 years ago by simo.
simo/gssproxy concufix  into  master

file modified
+74 -55
@@ -207,12 +207,10 @@
      free(w);
  }
  
- static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
+ static struct gp_thread *gp_get_free_thread(struct gp_workers *w)
  {
      struct gp_thread *t = NULL;
  
-     /* then either find a free thread or queue in the wait list */
-
      /* ======> POOL LOCK */
      pthread_mutex_lock(&w->lock);
      if (w->free_list) {
@@ -223,18 +221,32 @@
      /* <====== POOL LOCK */
      pthread_mutex_unlock(&w->lock);
  
-     if (t) {
-         /* found free thread, assign work */
+     return t;
+ }
  
-         /* ======> COND_MUTEX */
-         pthread_mutex_lock(&t->cond_mutex);
+ static void gp_send_to_thread(struct gp_thread *t, struct gp_query *q)
+ {
+     /* ======> COND_MUTEX */
+     pthread_mutex_lock(&t->cond_mutex);
  
-         /* hand over the query */
-         t->query = q;
-         pthread_cond_signal(&t->cond_wakeup);
+     /* hand over the query */
+     t->query = q;
+     pthread_cond_signal(&t->cond_wakeup);
  
-         /* <====== COND_MUTEX */
-         pthread_mutex_unlock(&t->cond_mutex);
+     /* <====== COND_MUTEX */
+     pthread_mutex_unlock(&t->cond_mutex);
+ }
+
+ static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
+ {
+     struct gp_thread *t = NULL;
+
+     /* then either find a free thread or queue in the wait list */
+     t = gp_get_free_thread(w);
+
+     if (t) {
+         /* found free thread, assign work */
+         gp_send_to_thread(t, q);
  
      } else {
  
@@ -247,6 +259,27 @@
      }
  }
  
+ static void gp_assign_queued(struct gp_workers *w)
+ {
+     struct gp_thread *t;
+     struct gp_query *q;
+
+     /* only the dispatcher handles wait_list
+     *  so we do not need to lock around it */
+     while (w->wait_list) {
+         t = gp_get_free_thread(w);
+
+         /* return if no more free threads */
+         if (t == NULL) return;
+
+         q = w->wait_list;
+         w->wait_list = q->next;

LIST_DEL? Not that we ever use the prev pointer for anything, but it would be a nasty surprise...

Probably should be a LIST_ADD anywhere the wait_list or other worker lists are manipulated too.

+         q->next = NULL;
+
+         gp_send_to_thread(t, q);
+     }
+ }
+
  static void gp_query_free(struct gp_query *q, bool free_buffer)
  {
      if (!q) {
@@ -283,67 +316,53 @@
  static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
  {
      struct gp_workers *w;
-     struct gp_query *q = NULL;
+     struct gp_query *q;
      char dummy;
-     int ret;
  
      w = verto_get_private(ev);
  
-     /* first read out the dummy so the pipe doesn't get clogged */
-     ret = read(w->sig_pipe[0], &dummy, 1);
-     if (ret) {
-         /* ignore errors */
-     }
+     /* grab and dispatch all query replies, if any */
+     do {
+         q = NULL;
  
-     /* grab a query reply if any */
-     if (w->reply_list) {
          /* ======> POOL LOCK */
          pthread_mutex_lock(&w->lock);
  
-         if (w->reply_list != NULL) {
+         if (w->reply_list) {
              q = w->reply_list;
              w->reply_list = q->next;
          }
  
          /* <====== POOL LOCK */
          pthread_mutex_unlock(&w->lock);
-     }
  
-     if (q) {
-         switch (q->status) {
-         case GP_QUERY_IN:
-             /* ?! fallback and kill client conn */
-         case GP_QUERY_ERR:
-             gp_conn_free(q->conn);
-             gp_query_free(q, true);
-             break;
-
-         case GP_QUERY_OUT:
-             GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);
-             gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
-             gp_query_free(q, false);
-             break;
+         if (q) {
+             /* read out the dummy so the pipe doesn't get clogged */
+             (void)read(w->sig_pipe[0], &dummy, 1);
+             /* ignore errors */
+
+             switch (q->status) {
+             case GP_QUERY_IN:
+                 /* ?! fallback and kill client conn */
+             case GP_QUERY_ERR:
+                 gp_conn_free(q->conn);
+                 gp_query_free(q, true);
+                 break;
+
+             case GP_QUERY_OUT:
+                 GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n",
+                             q->buffer, q->buflen);
+                 gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
+                 gp_query_free(q, false);
+                 break;
+             }
          }
-     }
-
-     /* while we are at it, check if there is anything in the wait list
-      * we need to process, as one thread just got free :-) */
-
-     q = NULL;
  
-     if (w->wait_list) {
-         /* only the dispatcher handles wait_list
-         *  so we do not need to lock around it */
-         if (w->wait_list) {
-             q = w->wait_list;
-             w->wait_list = q->next;
-             q->next = NULL;
-         }
-     }
+     } while (q != NULL);
  
-     if (q) {
-         gp_query_assign(w, q);
-     }
+     /* while we are at it, check if there is anything in the wait list
+      * we need to process, as some threads may have just got free */
+     gp_assign_queued(w);
  }
  

If multiple threads were ready at the same time, we would process only the
first reply and then leave the rest hanging.
Try to process all the replies in the reply queue, and then also try to
put as many workers as possible back to work on queued requests.
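
For context, here is a minimal standalone sketch of the worker-side handoff that the new do/while loop in gp_handle_reply() drains. The struct layouts and the helper name worker_post_reply are simplified stand-ins modeled on the patch above, not the actual gssproxy sources:

#include <pthread.h>
#include <unistd.h>

struct gp_query {
    struct gp_query *next;
    /* ... request/reply buffers, status, connection ... */
};

struct gp_workers {
    pthread_mutex_t lock;        /* protects reply_list */
    struct gp_query *reply_list; /* finished queries waiting for the dispatcher */
    int sig_pipe[2];             /* worker -> dispatcher wakeup pipe */
};

static void worker_post_reply(struct gp_workers *w, struct gp_query *q)
{
    char dummy = 0;

    /* ======> POOL LOCK */
    pthread_mutex_lock(&w->lock);
    q->next = w->reply_list;
    w->reply_list = q;
    /* <====== POOL LOCK */
    pthread_mutex_unlock(&w->lock);

    /* one byte per finished reply: if several workers finish before the
     * dispatcher runs, several replies are already queued, which is why
     * the dispatcher now loops until the reply list is empty */
    (void)write(w->sig_pipe[1], &dummy, 1);
}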


One comment inline.

The concurrency looks right, so if you're okay with the list macros, I can clean up and merge.
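
If the list macros are adopted, gp_assign_queued() could look roughly like the sketch below, reusing the helpers introduced by this patch. The LIST_ADD/LIST_DEL definitions here are only illustrative of typical doubly-linked-list helpers; the real macros in the gssproxy tree may differ, and gp_query is assumed to carry both next and prev pointers:

#define LIST_ADD(list, elem) do { \
    (elem)->prev = NULL; \
    (elem)->next = (list); \
    if ((list) != NULL) (list)->prev = (elem); \
    (list) = (elem); \
} while (0)

#define LIST_DEL(list, elem) do { \
    if ((elem)->prev != NULL) (elem)->prev->next = (elem)->next; \
    if ((elem)->next != NULL) (elem)->next->prev = (elem)->prev; \
    if ((list) == (elem)) (list) = (elem)->next; \
    (elem)->prev = NULL; \
    (elem)->next = NULL; \
} while (0)

static void gp_assign_queued(struct gp_workers *w)
{
    struct gp_thread *t;
    struct gp_query *q;

    /* only the dispatcher handles wait_list, so no lock is needed */
    while (w->wait_list) {
        t = gp_get_free_thread(w);

        /* return if no more free threads */
        if (t == NULL) return;

        q = w->wait_list;
        LIST_DEL(w->wait_list, q);
        gp_send_to_thread(t, q);
    }
}

A matching LIST_ADD could then be used wherever wait_list or the other worker lists are built up, so the prev pointer never goes stale.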

Closing this one: my root cause analysis was incorrect, and the current code, while less efficient in some corner cases, seems to work correctly.
I'll revive this one in the future if we see the need.

Pull-Request has been closed by simo

6 years ago