From d67e8aea264bc000c41843b88a03654aa675b04f Mon Sep 17 00:00:00 2001 From: William Brown Date: Nov 16 2017 18:33:52 +0000 Subject: Ticket 49435 - Fix NS race condition on loaded test systems Bug Description: During a test run, on a heavily loaded systems some events would time out before they could occur correctly. Fix Description: Change the structure of events to mitigate a deref performance hit, and add a ns_job_wait conditional that allows blocking on a job to complete so that tests do not require time based checks. https://pagure.io/389-ds-base/issue/49435 Author: wibrown Review by: mreynolds (Thanks!) (cherry picked from commit 11974a08f7bb083a48590cdc26652934fa74c0cb) --- diff --git a/src/nunc-stans/include/nunc-stans.h b/src/nunc-stans/include/nunc-stans.h index 386a8d2..192e38e 100644 --- a/src/nunc-stans/include/nunc-stans.h +++ b/src/nunc-stans/include/nunc-stans.h @@ -77,6 +77,10 @@ typedef enum _ns_result_t { * This occurs when a lower level OS issue occurs, generally thread related. */ NS_THREAD_FAILURE = 5, + /** + * The job is being deleted + */ + NS_DELETING = 6, } ns_result_t; /** @@ -837,6 +841,14 @@ ns_job_type_t ns_job_get_output_type(struct ns_job_t *job); ns_result_t ns_job_set_done_cb(struct ns_job_t *job, ns_job_func_t func); /** + * Block until a job is completed. This returns the next state of the job as as a return. + * + * \param job The job to set the callback for. + * \retval ns_job_state_t The next state the job will move to. IE, WAITING, DELETED, ARMED. + */ +ns_result_t ns_job_wait(struct ns_job_t *job); + +/** * Creates a new thread pool * * Must be called with a struct ns_thrpool_config that has been diff --git a/src/nunc-stans/ns/ns_event_fw.h b/src/nunc-stans/ns/ns_event_fw.h index 436b282..88997b2 100644 --- a/src/nunc-stans/ns/ns_event_fw.h +++ b/src/nunc-stans/ns/ns_event_fw.h @@ -80,7 +80,8 @@ typedef enum _ns_job_state { interface between the app/thread pool/event framework */ typedef struct ns_job_t { - pthread_mutex_t *monitor; + pthread_mutex_t monitor; + pthread_cond_t notify; struct ns_thrpool_t *tp; ns_job_func_t func; struct ns_job_data_t *data; diff --git a/src/nunc-stans/ns/ns_thrpool.c b/src/nunc-stans/ns/ns_thrpool.c index 2ad0bd7..1d8bb03 100644 --- a/src/nunc-stans/ns/ns_thrpool.c +++ b/src/nunc-stans/ns/ns_thrpool.c @@ -214,7 +214,7 @@ job_queue_cleanup(void *arg) static void internal_ns_job_done(ns_job_t *job) { - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "internal_ns_job_done %x state %d moving to NS_JOB_DELETED\n", job, job->state); #endif @@ -239,9 +239,9 @@ internal_ns_job_done(ns_job_t *job) job->done_cb(job); } - pthread_mutex_unlock(job->monitor); - pthread_mutex_destroy(job->monitor); - ns_free(job->monitor); + pthread_mutex_unlock(&(job->monitor)); + pthread_mutex_destroy(&(job->monitor)); + pthread_cond_destroy(&(job->notify)); ns_free(job); } @@ -250,7 +250,7 @@ internal_ns_job_done(ns_job_t *job) static void internal_ns_job_rearm(ns_job_t *job) { - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_NEEDS_ARM); /* Don't think I need to check persistence here, it could be the first arm ... */ #ifdef DEBUG @@ -267,7 +267,7 @@ internal_ns_job_rearm(ns_job_t *job) /* Prevents an un-necessary queue / dequeue to the event_q */ work_q_notify(job); } - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); } static void @@ -281,7 +281,7 @@ work_job_execute(ns_job_t *job) * DELETED! Crashes abound, you have been warned ... */ PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "work_job_execute %x state %d moving to NS_JOB_RUNNING\n", job, job->state); #endif @@ -303,7 +303,12 @@ work_job_execute(ns_job_t *job) #ifdef DEBUG ns_log(LOG_DEBUG, "work_job_execute %x state %d job func complete, sending to job_done...\n", job, job->state); #endif - pthread_mutex_unlock(job->monitor); + /* + * Let waiters know we are done, they'll pick up once + * we unlock. + */ + pthread_cond_signal(&(job->notify)); + pthread_mutex_unlock(&(job->monitor)); internal_ns_job_done(job); /* MUST NOT ACCESS JOB AGAIN.*/ } else if (job->state == NS_JOB_NEEDS_ARM) { @@ -311,7 +316,8 @@ work_job_execute(ns_job_t *job) ns_log(LOG_DEBUG, "work_job_execute %x state %d job func complete, sending to rearm...\n", job, job->state); #endif /* Rearm the job! */ - pthread_mutex_unlock(job->monitor); + /* We *don't* notify here because we ARE NOT done! */ + pthread_mutex_unlock(&(job->monitor)); internal_ns_job_rearm(job); } else { #ifdef DEBUG @@ -321,7 +327,12 @@ work_job_execute(ns_job_t *job) PR_ASSERT(!NS_JOB_IS_PERSIST(job->job_type)); /* We are now idle, set waiting. */ job->state = NS_JOB_WAITING; - pthread_mutex_unlock(job->monitor); + /* + * Let waiters know we are done, they'll pick up once + * we unlock. + */ + pthread_cond_signal(&(job->notify)); + pthread_mutex_unlock(&(job->monitor)); } /* MUST NOT ACCESS JOB AGAIN */ } @@ -338,7 +349,7 @@ static void work_q_notify(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "work_q_notify %x state %d\n", job, job->state); #endif @@ -346,12 +357,12 @@ work_q_notify(ns_job_t *job) if (job->state != NS_JOB_ARMED) { /* Maybe we should return some error here? */ ns_log(LOG_ERR, "work_q_notify %x state %d is not ARMED, cannot queue!\n", job, job->state); - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return; } /* MUST NOT ACCESS job after enqueue. So we stash tp.*/ ns_thrpool_t *ltp = job->tp; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); sds_lqueue_enqueue(ltp->work_q, (void *)job); pthread_mutex_lock(&(ltp->work_q_lock)); pthread_cond_signal(&(ltp->work_q_cv)); @@ -411,13 +422,13 @@ static void update_event(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "update_event %x state %d\n", job, job->state); #endif PR_ASSERT(job->state == NS_JOB_NEEDS_DELETE || job->state == NS_JOB_ARMED); if (job->state == NS_JOB_NEEDS_DELETE) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); internal_ns_job_done(job); return; } else if (NS_JOB_IS_IO(job->job_type) || job->ns_event_fw_fd) { @@ -426,7 +437,7 @@ update_event(ns_job_t *job) } else { job->tp->ns_event_fw->ns_event_fw_mod_io(job->tp->ns_event_fw_ctx, job); } - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); /* We need these returns to prevent a race on the next else if condition when we release job->monitor */ return; } else if (NS_JOB_IS_TIMER(job->job_type) || job->ns_event_fw_time) { @@ -435,7 +446,7 @@ update_event(ns_job_t *job) } else { job->tp->ns_event_fw->ns_event_fw_mod_timer(job->tp->ns_event_fw_ctx, job); } - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return; } else if (NS_JOB_IS_SIGNAL(job->job_type) || job->ns_event_fw_sig) { if (!job->ns_event_fw_sig) { @@ -443,15 +454,15 @@ update_event(ns_job_t *job) } else { job->tp->ns_event_fw->ns_event_fw_mod_signal(job->tp->ns_event_fw_ctx, job); } - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return; } else { /* It's a "run now" job. */ if (NS_JOB_IS_THREAD(job->job_type)) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); work_q_notify(job); } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); event_q_notify(job); } } @@ -602,14 +613,14 @@ event_cb(ns_job_t *job) */ /* There is no guarantee this won't be called once we start to enter the shutdown, especially with timers .... */ - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_ARMED || job->state == NS_JOB_NEEDS_DELETE); if (job->state == NS_JOB_ARMED && NS_JOB_IS_THREAD(job->job_type)) { #ifdef DEBUG ns_log(LOG_DEBUG, "event_cb %x state %d threaded, send to work_q\n", job, job->state); #endif - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); work_q_notify(job); } else if (job->state == NS_JOB_NEEDS_DELETE) { #ifdef DEBUG @@ -620,14 +631,14 @@ event_cb(ns_job_t *job) * It's here because it's been QUEUED for deletion and *may* be coming * from the thrpool destroy thread! */ - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); } else { #ifdef DEBUG ns_log(LOG_DEBUG, "event_cb %x state %d non-threaded, execute right meow\n", job, job->state); #endif /* Not threaded, execute now! */ - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); work_job_execute(job); /* MUST NOT ACCESS JOB FROM THIS POINT */ } @@ -682,12 +693,12 @@ static ns_job_t * new_ns_job(ns_thrpool_t *tp, PRFileDesc *fd, ns_job_type_t job_type, ns_job_func_t func, struct ns_job_data_t *data) { ns_job_t *job = ns_calloc(1, sizeof(ns_job_t)); - job->monitor = ns_calloc(1, sizeof(pthread_mutex_t)); pthread_mutexattr_t *monitor_attr = ns_calloc(1, sizeof(pthread_mutexattr_t)); pthread_mutexattr_init(monitor_attr); pthread_mutexattr_settype(monitor_attr, PTHREAD_MUTEX_RECURSIVE); - assert(pthread_mutex_init(job->monitor, monitor_attr) == 0); + assert(pthread_mutex_init(&(job->monitor), monitor_attr) == 0); + assert(pthread_cond_init(&(job->notify), NULL) == 0); ns_free(monitor_attr); job->tp = tp; @@ -746,14 +757,14 @@ ns_job_done(ns_job_t *job) /* Get the shutdown state ONCE at the start, atomically */ int32_t shutdown_state = ns_thrpool_is_shutdown(job->tp); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); if (job->state == NS_JOB_NEEDS_DELETE || job->state == NS_JOB_DELETED) { /* Just return if the job has been marked for deletion */ #ifdef DEBUG ns_log(LOG_DEBUG, "ns_job_done %x tp shutdown -> %x state %d return early\n", job, shutdown_state, job->state); #endif - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_SUCCESS; } @@ -762,7 +773,7 @@ ns_job_done(ns_job_t *job) #ifdef DEBUG ns_log(LOG_DEBUG, "ns_job_done %x tp shutdown -> false state %d failed to mark as done\n", job, job->state); #endif - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_INVALID_STATE; } @@ -773,13 +784,13 @@ ns_job_done(ns_job_t *job) ns_log(LOG_DEBUG, "ns_job_done %x tp shutdown -> false state %d setting to async NS_JOB_NEEDS_DELETE\n", job, job->state); #endif job->state = NS_JOB_NEEDS_DELETE; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); } else if (!shutdown_state) { #ifdef DEBUG ns_log(LOG_DEBUG, "ns_job_done %x tp shutdown -> false state %d setting NS_JOB_NEEDS_DELETE and queuing\n", job, job->state); #endif job->state = NS_JOB_NEEDS_DELETE; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); event_q_notify(job); } else { #ifdef DEBUG @@ -787,7 +798,7 @@ ns_job_done(ns_job_t *job) #endif job->state = NS_JOB_NEEDS_DELETE; /* We are shutting down, just remove it! */ - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); internal_ns_job_done(job); } return NS_SUCCESS; @@ -849,12 +860,12 @@ ns_add_io_job(ns_thrpool_t *tp, PRFileDesc *fd, ns_job_type_t job_type, ns_job_f return NS_ALLOCATION_FAILURE; } - pthread_mutex_lock(_job->monitor); + pthread_mutex_lock(&(_job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "ns_add_io_job state %d moving to NS_JOB_ARMED\n", (_job)->state); #endif _job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(_job->monitor); + pthread_mutex_unlock(&(_job->monitor)); internal_ns_job_rearm(_job); /* fill in a pointer to the job for the caller if requested */ @@ -889,12 +900,12 @@ ns_add_timeout_job(ns_thrpool_t *tp, struct timeval *tv, ns_job_type_t job_type, return NS_ALLOCATION_FAILURE; } - pthread_mutex_lock(_job->monitor); + pthread_mutex_lock(&(_job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "ns_add_timeout_job state %d moving to NS_JOB_ARMED\n", (_job)->state); #endif _job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(_job->monitor); + pthread_mutex_unlock(&(_job->monitor)); internal_ns_job_rearm(_job); /* fill in a pointer to the job for the caller if requested */ @@ -944,14 +955,14 @@ ns_add_io_timeout_job(ns_thrpool_t *tp, PRFileDesc *fd, struct timeval *tv, ns_j if (!_job) { return NS_ALLOCATION_FAILURE; } - pthread_mutex_lock(_job->monitor); + pthread_mutex_lock(&(_job->monitor)); _job->tv = *tv; #ifdef DEBUG ns_log(LOG_DEBUG, "ns_add_io_timeout_job state %d moving to NS_JOB_ARMED\n", (_job)->state); #endif _job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(_job->monitor); + pthread_mutex_unlock(&(_job->monitor)); internal_ns_job_rearm(_job); /* fill in a pointer to the job for the caller if requested */ @@ -982,12 +993,12 @@ ns_add_signal_job(ns_thrpool_t *tp, int32_t signum, ns_job_type_t job_type, ns_j return NS_ALLOCATION_FAILURE; } - pthread_mutex_lock(_job->monitor); + pthread_mutex_lock(&(_job->monitor)); #ifdef DEBUG ns_log(LOG_DEBUG, "ns_add_signal_job state %d moving to NS_JOB_ARMED\n", (_job)->state); #endif _job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(_job->monitor); + pthread_mutex_unlock(&(_job->monitor)); internal_ns_job_rearm(_job); /* fill in a pointer to the job for the caller if requested */ @@ -1038,9 +1049,9 @@ ns_add_shutdown_job(ns_thrpool_t *tp) if (!_job) { return NS_ALLOCATION_FAILURE; } - pthread_mutex_lock(_job->monitor); + pthread_mutex_lock(&(_job->monitor)); _job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(_job->monitor); + pthread_mutex_unlock(&(_job->monitor)); internal_ns_job_rearm(_job); return NS_SUCCESS; } @@ -1061,13 +1072,13 @@ void * ns_job_get_data(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state != NS_JOB_DELETED); if (job->state != NS_JOB_DELETED) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return job->data; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NULL; } } @@ -1076,14 +1087,14 @@ ns_result_t ns_job_set_data(ns_job_t *job, void *data) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING); if (job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING) { job->data = data; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_SUCCESS; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_INVALID_STATE; } } @@ -1092,13 +1103,13 @@ ns_thrpool_t * ns_job_get_tp(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state != NS_JOB_DELETED); if (job->state != NS_JOB_DELETED) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return job->tp; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NULL; } } @@ -1107,13 +1118,13 @@ ns_job_type_t ns_job_get_output_type(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_RUNNING); if (job->state == NS_JOB_RUNNING) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return job->output_job_type; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return 0; } } @@ -1122,13 +1133,13 @@ ns_job_type_t ns_job_get_type(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state != NS_JOB_DELETED); if (job->state != NS_JOB_DELETED) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return job->job_type; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return 0; } } @@ -1137,13 +1148,13 @@ PRFileDesc * ns_job_get_fd(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state != NS_JOB_DELETED); if (job->state != NS_JOB_DELETED) { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return job->fd; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NULL; } } @@ -1152,18 +1163,40 @@ ns_result_t ns_job_set_done_cb(struct ns_job_t *job, ns_job_func_t func) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING); if (job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING) { job->done_cb = func; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_SUCCESS; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_INVALID_STATE; } } +ns_result_t +ns_job_wait(struct ns_job_t *job) { + PR_ASSERT(job); + pthread_mutex_lock(&(job->monitor)); + if (job->state == NS_JOB_WAITING) { + /* It's done */ + pthread_mutex_unlock(&(job->monitor)); + return NS_SUCCESS; + } else { + pthread_cond_wait(&(job->notify), &(job->monitor)); + ns_job_state_t result = job->state; + pthread_mutex_unlock(&(job->monitor)); + if (result == NS_JOB_WAITING) { + return NS_SUCCESS; + } else if (result == NS_JOB_NEEDS_DELETE) { + return NS_DELETING; + } else { + PR_ASSERT(1 == 0); + return NS_INVALID_STATE; + } + } +} /* * This is a convenience function - use if you need to re-arm the same event @@ -1173,7 +1206,7 @@ ns_result_t ns_job_rearm(ns_job_t *job) { PR_ASSERT(job); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); PR_ASSERT(job->state == NS_JOB_WAITING || job->state == NS_JOB_RUNNING); if (ns_thrpool_is_shutdown(job->tp)) { @@ -1186,7 +1219,7 @@ ns_job_rearm(ns_job_t *job) #endif job->state = NS_JOB_NEEDS_ARM; internal_ns_job_rearm(job); - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_SUCCESS; } else if (!NS_JOB_IS_PERSIST(job->job_type) && job->state == NS_JOB_RUNNING) { /* For this to be called, and NS_JOB_RUNNING, we *must* be the callback thread! */ @@ -1195,10 +1228,10 @@ ns_job_rearm(ns_job_t *job) ns_log(LOG_DEBUG, "ns_rearm_job %x state %d setting NS_JOB_NEEDS_ARM\n", job, job->state); #endif job->state = NS_JOB_NEEDS_ARM; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_SUCCESS; } else { - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); return NS_INVALID_STATE; } /* Unreachable code .... */ @@ -1254,7 +1287,7 @@ setup_event_q_wakeup(ns_thrpool_t *tp) NS_JOB_READ | NS_JOB_PERSIST | NS_JOB_PRESERVE_FD, wakeup_cb, NULL); - pthread_mutex_lock(job->monitor); + pthread_mutex_lock(&(job->monitor)); /* The event_queue wakeup is ready, arm it. */ #ifdef DEBUG @@ -1267,7 +1300,7 @@ setup_event_q_wakeup(ns_thrpool_t *tp) /* Stash the wakeup job in tp so we can release it later. */ tp->event_q_wakeup_job = job; - pthread_mutex_unlock(job->monitor); + pthread_mutex_unlock(&(job->monitor)); } /* Initialize the thrpool config */ @@ -1463,7 +1496,7 @@ ns_thrpool_destroy(struct ns_thrpool_t *tp) * and use it to wake up the event loop. */ - pthread_mutex_lock(tp->event_q_wakeup_job->monitor); + pthread_mutex_lock(&(tp->event_q_wakeup_job->monitor)); // tp->event_q_wakeup_job->job_type |= NS_JOB_THREAD; /* This triggers the job to "run", which will cause a shutdown cascade */ @@ -1471,7 +1504,7 @@ ns_thrpool_destroy(struct ns_thrpool_t *tp) ns_log(LOG_DEBUG, "ns_thrpool_destroy %x state %d moving to NS_JOB_NEEDS_DELETE\n", tp->event_q_wakeup_job, tp->event_q_wakeup_job->state); #endif tp->event_q_wakeup_job->state = NS_JOB_NEEDS_DELETE; - pthread_mutex_unlock(tp->event_q_wakeup_job->monitor); + pthread_mutex_unlock(&(tp->event_q_wakeup_job->monitor)); /* Has to be event_q_notify, not internal_job_done */ event_q_notify(tp->event_q_wakeup_job); diff --git a/src/nunc-stans/test/test_nuncstans.c b/src/nunc-stans/test/test_nuncstans.c index 629377a..afe3c02 100644 --- a/src/nunc-stans/test/test_nuncstans.c +++ b/src/nunc-stans/test/test_nuncstans.c @@ -55,14 +55,21 @@ /* We need the internal headers for state checks */ #include "../ns/ns_event_fw.h" +#include + +#include + #ifdef HAVE_STDLIB_H #include #endif static int cb_check = 0; -static PRLock *cb_lock = NULL; -static PRCondVar *cb_cond = NULL; + +static pthread_mutex_t cb_lock; +static pthread_cond_t cb_cond; +// static PRLock *cb_lock = NULL; +// static PRCondVar *cb_cond = NULL; void ns_test_logger(int priority __attribute__((unused)), const char *fmt, va_list varg) @@ -71,6 +78,19 @@ ns_test_logger(int priority __attribute__((unused)), const char *fmt, va_list va vprintf(fmt, varg); } +static int +cond_wait_rel(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict reltime) { + struct timespec now; + struct timespec abswait; + + clock_gettime(CLOCK_REALTIME, &now); + + abswait.tv_sec = now.tv_sec + reltime->tv_sec; + abswait.tv_nsec = now.tv_nsec + reltime->tv_nsec; + + return pthread_cond_timedwait(cond, mutex, &abswait); +} + /* All our other tests will use this in some form. */ static int ns_test_setup(void **state) @@ -81,8 +101,8 @@ ns_test_setup(void **state) /* Reset the callback check */ cb_check = 0; /* Create the cond var the CB check will use. */ - cb_lock = PR_NewLock(); - cb_cond = PR_NewCondVar(cb_lock); + assert(pthread_mutex_init(&cb_lock, NULL) == 0); + assert(pthread_cond_init(&cb_cond, NULL) == 0); ns_thrpool_config_init(&ns_config); @@ -105,8 +125,8 @@ ns_test_teardown(void **state) ns_thrpool_destroy(tp); - PR_DestroyCondVar(cb_cond); - PR_DestroyLock(cb_lock); + pthread_cond_destroy(&cb_cond); + pthread_mutex_destroy(&cb_lock); return 0; } @@ -114,24 +134,23 @@ ns_test_teardown(void **state) static void ns_init_test_job_cb(struct ns_job_t *job __attribute__((unused))) { + pthread_mutex_lock(&cb_lock); cb_check += 1; - PR_Lock(cb_lock); - PR_NotifyCondVar(cb_cond); - PR_Unlock(cb_lock); + pthread_cond_signal(&cb_cond); + pthread_mutex_unlock(&cb_lock); } static void ns_init_disarm_job_cb(struct ns_job_t *job) { if (ns_job_done(job) == NS_SUCCESS) { + pthread_mutex_lock(&cb_lock); cb_check = 1; + pthread_cond_signal(&cb_cond); + pthread_mutex_unlock(&cb_lock); } else { assert_int_equal(1, 0); } - PR_Lock(cb_lock); - PR_NotifyCondVar(cb_cond); - /* Disarm ourselves */ - PR_Unlock(cb_lock); } static void @@ -146,20 +165,20 @@ ns_init_test(void **state) { struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {1, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, NULL, &job), 0); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 1); /* Once the job is done, it's not in the event queue, and it's complete */ - /* We have to stall momentarily to let the work_job_execute release the job to us */ - PR_Sleep(PR_SecondsToInterval(1)); + assert(ns_job_wait(job) == NS_SUCCESS); assert_int_equal(ns_job_done(job), NS_SUCCESS); } @@ -169,19 +188,20 @@ ns_set_data_test(void **state) /* Add a job with data */ struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {1, 0}; char *data = malloc(6); strcpy(data, "first"); - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_test_job_cb, data, &job), NS_SUCCESS); /* Let the job run */ - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); /* Check that the data is correct */ char *retrieved = (char *)ns_job_get_data(job); @@ -193,16 +213,14 @@ ns_set_data_test(void **state) data = malloc(7); strcpy(data, "second"); - while (job->state != NS_JOB_WAITING) { - PR_Sleep(PR_MillisecondsToInterval(50)); - } + assert(ns_job_wait(job) == NS_SUCCESS); ns_job_set_data(job, data); /* Rearm, and let it run again. */ - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); ns_job_rearm(job); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); /* Make sure it's now what we expect */ retrieved = (char *)ns_job_get_data(job); @@ -218,9 +236,7 @@ ns_set_data_test(void **state) * waiting. we might need a load barrier here ... */ - while (job->state != NS_JOB_WAITING) { - PR_Sleep(PR_MillisecondsToInterval(50)); - } + assert(ns_job_wait(job) == NS_SUCCESS); assert_int_equal(ns_job_done(job), NS_SUCCESS); } @@ -230,8 +246,9 @@ ns_job_done_cb_test(void **state) { struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {1, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_create_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_do_nothing_cb, &job), NS_SUCCESS); @@ -240,8 +257,8 @@ ns_job_done_cb_test(void **state) /* Remove it */ assert_int_equal(ns_job_done(job), NS_SUCCESS); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 1); } @@ -250,16 +267,15 @@ static void ns_init_rearm_job_cb(struct ns_job_t *job) { if (ns_job_rearm(job) != NS_SUCCESS) { + pthread_mutex_lock(&cb_lock); cb_check = 1; /* we failed to re-arm as expected, let's go away ... */ assert_int_equal(ns_job_done(job), NS_SUCCESS); + pthread_cond_signal(&cb_cond); + pthread_mutex_unlock(&cb_lock); } else { assert_int_equal(1, 0); } - PR_Lock(cb_lock); - PR_NotifyCondVar(cb_cond); - /* Disarm ourselves */ - PR_Unlock(cb_lock); } static void @@ -268,8 +284,9 @@ ns_job_persist_rearm_ignore_test(void **state) /* Test that rearm ignores the persistent job. */ struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {1, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_create_job(tp, NS_JOB_NONE | NS_JOB_THREAD | NS_JOB_PERSIST, ns_init_rearm_job_cb, &job), NS_SUCCESS); @@ -281,8 +298,8 @@ ns_job_persist_rearm_ignore_test(void **state) * should see only 1 in the cb_check. */ - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); /* If we fail to rearm, this is set to 1 Which is what we want. */ assert_int_equal(cb_check, 1); @@ -294,6 +311,7 @@ ns_job_persist_disarm_test(void **state) /* Make a persistent job */ struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {2, 0}; assert_int_equal( ns_create_job(tp, NS_JOB_NONE | NS_JOB_PERSIST, ns_init_disarm_job_cb, &job), @@ -302,9 +320,9 @@ ns_job_persist_disarm_test(void **state) assert_int_equal(ns_job_rearm(job), NS_SUCCESS); /* In the callback it should disarm */ - PR_Lock(cb_lock); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + pthread_mutex_lock(&cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); /* Make sure it did */ assert_int_equal(cb_check, 1); } @@ -329,14 +347,13 @@ ns_job_persist_disarm_test(void **state) static void ns_init_race_done_job_cb(struct ns_job_t *job) { - cb_check += 1; ns_job_done(job); /* We need to sleep to let the job race happen */ PR_Sleep(PR_SecondsToInterval(2)); - PR_Lock(cb_lock); - PR_NotifyCondVar(cb_cond); - /* Disarm ourselves */ - PR_Unlock(cb_lock); + pthread_mutex_lock(&cb_lock); + cb_check += 1; + pthread_cond_signal(&cb_cond); + pthread_mutex_unlock(&cb_lock); } static void @@ -344,14 +361,15 @@ ns_job_race_done_test(void **state) { struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {5, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_add_job(tp, NS_JOB_NONE | NS_JOB_THREAD, ns_init_race_done_job_cb, NULL, &job), NS_SUCCESS); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(5)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 1); } @@ -365,8 +383,9 @@ ns_job_signal_cb_test(void **state) { struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; + struct timespec timeout = {1, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_int_equal( ns_add_signal_job(tp, SIGUSR1, NS_JOB_SIGNAL, ns_init_test_job_cb, NULL, &job), NS_SUCCESS); @@ -376,8 +395,8 @@ ns_job_signal_cb_test(void **state) /* Send the signal ... */ raise(SIGUSR1); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); - PR_Unlock(cb_lock); + assert(cond_wait_rel(&cb_cond, &cb_lock, &timeout) == 0); + pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 1); @@ -408,12 +427,11 @@ ns_job_neg_timeout_test(void **state) static void ns_timer_job_cb(struct ns_job_t *job) { - cb_check += 1; ns_job_done(job); - PR_Lock(cb_lock); - PR_NotifyCondVar(cb_cond); - /* Disarm ourselves */ - PR_Unlock(cb_lock); + pthread_mutex_lock(&cb_lock); + cb_check += 1; + pthread_cond_signal(&cb_cond); + pthread_mutex_unlock(&cb_lock); } static void @@ -421,16 +439,19 @@ ns_job_timer_test(void **state) { struct ns_thrpool_t *tp = *state; struct ns_job_t *job = NULL; - struct timeval tv = {2, 0}; + struct timeval tv = {3, 0}; + struct timespec timeout = {2, 0}; - PR_Lock(cb_lock); + pthread_mutex_lock(&cb_lock); assert_true(ns_add_timeout_job(tp, &tv, NS_JOB_THREAD, ns_timer_job_cb, NULL, &job) == NS_SUCCESS); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(1)); + cond_wait_rel(&cb_cond, &cb_lock, &timeout); + // pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 0); - PR_WaitCondVar(cb_cond, PR_SecondsToInterval(2)); - PR_Unlock(cb_lock); + // pthread_mutex_lock(&cb_lock); + cond_wait_rel(&cb_cond, &cb_lock, &timeout); + pthread_mutex_unlock(&cb_lock); assert_int_equal(cb_check, 1); } @@ -441,7 +462,9 @@ ns_job_timer_test(void **state) static void ns_timer_persist_job_cb(struct ns_job_t *job) { + pthread_mutex_lock(&cb_lock); cb_check += 1; + pthread_mutex_unlock(&cb_lock); if (cb_check < 10) { ns_job_rearm(job); } else { @@ -456,16 +479,19 @@ ns_job_timer_persist_test(void **state) struct ns_job_t *job = NULL; struct timeval tv = {1, 0}; - PR_Lock(cb_lock); assert_true(ns_add_timeout_job(tp, &tv, NS_JOB_THREAD, ns_timer_persist_job_cb, NULL, &job) == NS_SUCCESS); PR_Sleep(PR_SecondsToInterval(5)); + pthread_mutex_lock(&cb_lock); assert_true(cb_check <= 6); + pthread_mutex_unlock(&cb_lock); PR_Sleep(PR_SecondsToInterval(6)); + pthread_mutex_lock(&cb_lock); assert_int_equal(cb_check, 10); + pthread_mutex_unlock(&cb_lock); } int