| |
@@ -207,12 +207,10 @@
|
| |
free(w);
|
| |
}
|
| |
|
| |
- static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
|
| |
+ static struct gp_thread *gp_get_free_thread(struct gp_workers *w)
|
| |
{
|
| |
struct gp_thread *t = NULL;
|
| |
|
| |
- /* then either find a free thread or queue in the wait list */
|
| |
-
|
| |
/* ======> POOL LOCK */
|
| |
pthread_mutex_lock(&w->lock);
|
| |
if (w->free_list) {
|
| |
@@ -223,18 +221,32 @@
|
| |
/* <====== POOL LOCK */
|
| |
pthread_mutex_unlock(&w->lock);
|
| |
|
| |
- if (t) {
|
| |
- /* found free thread, assign work */
|
| |
+ return t;
|
| |
+ }
|
| |
|
| |
- /* ======> COND_MUTEX */
|
| |
- pthread_mutex_lock(&t->cond_mutex);
|
| |
+ static void gp_send_to_thread(struct gp_thread *t, struct gp_query *q)
|
| |
+ {
|
| |
+ /* ======> COND_MUTEX */
|
| |
+ pthread_mutex_lock(&t->cond_mutex);
|
| |
|
| |
- /* hand over the query */
|
| |
- t->query = q;
|
| |
- pthread_cond_signal(&t->cond_wakeup);
|
| |
+ /* hand over the query */
|
| |
+ t->query = q;
|
| |
+ pthread_cond_signal(&t->cond_wakeup);
|
| |
|
| |
- /* <====== COND_MUTEX */
|
| |
- pthread_mutex_unlock(&t->cond_mutex);
|
| |
+ /* <====== COND_MUTEX */
|
| |
+ pthread_mutex_unlock(&t->cond_mutex);
|
| |
+ }
|
| |
+
|
| |
+ static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
|
| |
+ {
|
| |
+ struct gp_thread *t = NULL;
|
| |
+
|
| |
+ /* then either find a free thread or queue in the wait list */
|
| |
+ t = gp_get_free_thread(w);
|
| |
+
|
| |
+ if (t) {
|
| |
+ /* found free thread, assign work */
|
| |
+ gp_send_to_thread(t, q);
|
| |
|
| |
} else {
|
| |
|
| |
@@ -247,6 +259,27 @@
|
| |
}
|
| |
}
|
| |
|
| |
+ static void gp_assign_queued(struct gp_workers *w)
|
| |
+ {
|
| |
+ struct gp_thread *t;
|
| |
+ struct gp_query *q;
|
| |
+
|
| |
+ /* only the dispatcher handles wait_list
|
| |
+ * so we do not need to lock around it */
|
| |
+ while (w->wait_list) {
|
| |
+ t = gp_get_free_thread(w);
|
| |
+
|
| |
+ /* return if no more free threads */
|
| |
+ if (t == NULL) return;
|
| |
+
|
| |
+ q = w->wait_list;
|
| |
+ w->wait_list = q->next;
|
| |
+ q->next = NULL;
|
| |
+
|
| |
+ gp_send_to_thread(t, q);
|
| |
+ }
|
| |
+ }
|
| |
+
|
| |
static void gp_query_free(struct gp_query *q, bool free_buffer)
|
| |
{
|
| |
if (!q) {
|
| |
@@ -283,67 +316,53 @@
|
| |
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
|
| |
{
|
| |
struct gp_workers *w;
|
| |
- struct gp_query *q = NULL;
|
| |
+ struct gp_query *q;
|
| |
char dummy;
|
| |
- int ret;
|
| |
|
| |
w = verto_get_private(ev);
|
| |
|
| |
- /* first read out the dummy so the pipe doesn't get clogged */
|
| |
- ret = read(w->sig_pipe[0], &dummy, 1);
|
| |
- if (ret) {
|
| |
- /* ignore errors */
|
| |
- }
|
| |
+ /* grab and dispatch all query replies, if any */
|
| |
+ do {
|
| |
+ q = NULL;
|
| |
|
| |
- /* grab a query reply if any */
|
| |
- if (w->reply_list) {
|
| |
/* ======> POOL LOCK */
|
| |
pthread_mutex_lock(&w->lock);
|
| |
|
| |
- if (w->reply_list != NULL) {
|
| |
+ if (w->reply_list) {
|
| |
q = w->reply_list;
|
| |
w->reply_list = q->next;
|
| |
}
|
| |
|
| |
/* <====== POOL LOCK */
|
| |
pthread_mutex_unlock(&w->lock);
|
| |
- }
|
| |
|
| |
- if (q) {
|
| |
- switch (q->status) {
|
| |
- case GP_QUERY_IN:
|
| |
- /* ?! fallback and kill client conn */
|
| |
- case GP_QUERY_ERR:
|
| |
- gp_conn_free(q->conn);
|
| |
- gp_query_free(q, true);
|
| |
- break;
|
| |
-
|
| |
- case GP_QUERY_OUT:
|
| |
- GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);
|
| |
- gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
|
| |
- gp_query_free(q, false);
|
| |
- break;
|
| |
+ if (q) {
|
| |
+ /* read out the dummy so the pipe doesn't get clogged */
|
| |
+ (void)read(w->sig_pipe[0], &dummy, 1);
|
| |
+ /* ignore errors */
|
| |
+
|
| |
+ switch (q->status) {
|
| |
+ case GP_QUERY_IN:
|
| |
+ /* ?! fallback and kill client conn */
|
| |
+ case GP_QUERY_ERR:
|
| |
+ gp_conn_free(q->conn);
|
| |
+ gp_query_free(q, true);
|
| |
+ break;
|
| |
+
|
| |
+ case GP_QUERY_OUT:
|
| |
+ GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n",
|
| |
+ q->buffer, q->buflen);
|
| |
+ gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
|
| |
+ gp_query_free(q, false);
|
| |
+ break;
|
| |
+ }
|
| |
}
|
| |
- }
|
| |
-
|
| |
- /* while we are at it, check if there is anything in the wait list
|
| |
- * we need to process, as one thread just got free :-) */
|
| |
-
|
| |
- q = NULL;
|
| |
|
| |
- if (w->wait_list) {
|
| |
- /* only the dispatcher handles wait_list
|
| |
- * so we do not need to lock around it */
|
| |
- if (w->wait_list) {
|
| |
- q = w->wait_list;
|
| |
- w->wait_list = q->next;
|
| |
- q->next = NULL;
|
| |
- }
|
| |
- }
|
| |
+ } while (q != NULL);
|
| |
|
| |
- if (q) {
|
| |
- gp_query_assign(w, q);
|
| |
- }
|
| |
+ /* while we are at it, check if there is anything in the wait list
|
| |
+ * we need to process, as some threads may have just got free */
|
| |
+ gp_assign_queued(w);
|
| |
}
|
| |
|
| |
|
| |
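Should this use LIST_DEL instead of unlinking the entry by hand? We never use the prev pointer for anything, but leaving it stale would be a nasty surprise if someone starts relying on it later.
Likewise, LIST_ADD should probably be used anywhere the wait_list or the other worker lists are manipulated.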