diff --git a/src/ckpool.c b/src/ckpool.c index 161ae8e1..abb54e89 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -248,6 +248,30 @@ static void *unix_receiver(void *arg) return NULL; } +/* Get the next message in the receive queue, or wait up to 5 seconds for + * the next message, returning NULL if no message is received in that time. */ +unix_msg_t *get_unix_msg(proc_instance_t *pi) +{ + unix_msg_t *umsg; + + mutex_lock(&pi->rmsg_lock); + if (!pi->unix_msgs) { + tv_t now; + ts_t abs; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += 5; + cond_timedwait(&pi->rmsg_cond, &pi->rmsg_lock, &abs); + } + umsg = pi->unix_msgs; + if (umsg) + DL_DELETE(pi->unix_msgs, umsg); + mutex_unlock(&pi->rmsg_lock); + + return umsg; +} + void create_unix_receiver(proc_instance_t *pi) { pthread_t pth; diff --git a/src/ckpool.h b/src/ckpool.h index 08eed951..09744d88 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -233,6 +233,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); +unix_msg_t *get_unix_msg(proc_instance_t *pi); void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp;