diff --git a/src/libckpool.c b/src/libckpool.c index 3fc28643..77f682b9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -32,6 +32,7 @@ #include "ckpool.h" #include "libckpool.h" #include "sha2.h" +#include "utlist.h" #ifndef UNIX_PATH_MAX #define UNIX_PATH_MAX 108 @@ -75,6 +76,57 @@ void join_pthread(pthread_t thread) pthread_join(thread, NULL); } +/* Generic function for creating a message queue receiving and parsing thread */ +void *ckmsg_queue(void *arg) +{ + ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckmsg_t *msg; + + mutex_lock(&ckmsgq->lock); + if (!ckmsgq->msgs) + pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + msg = ckmsgq->msgs; + if (likely(msg)) + DL_DELETE(ckmsgq->msgs, msg); + mutex_unlock(&ckmsgq->lock); + + if (unlikely(!msg)) + continue; + ckmsgq->func(msg); + free(msg); + } + return NULL; +} + +void create_ckmsgq(const char *name, const void *func) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); + + strncpy(ckmsgq->name, name, 15); + ckmsgq->func = func; + mutex_init(&ckmsgq->lock); + cond_init(&ckmsgq->cond); + create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); +} + +/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq + * parsing thread to wake up and process it. */ +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +{ + ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); + + msg->data = data; + + mutex_lock(&ckmsgq->lock); + DL_APPEND(ckmsgq->msgs, msg); + pthread_cond_signal(&ckmsgq->cond); + mutex_unlock(&ckmsgq->lock); +} /* Place holders for when we add lock debugging */ #define GETLOCK(_lock, _file, _func, _line) diff --git a/src/libckpool.h b/src/libckpool.h index 8672d943..5b709d23 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -32,6 +32,8 @@ # include #endif +#include "utlist.h" + #ifndef bswap_16 #define bswap_16 __builtin_bswap16 #define bswap_32 __builtin_bswap32 @@ -260,6 +262,25 @@ typedef struct unixsock unixsock_t; typedef struct proc_instance proc_instance_t; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(void *); +}; + +typedef struct ckmsgq ckmsgq_t; + /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) {