diff --git a/src/ckpool.c b/src/ckpool.c index ad5b0ba7..98203208 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -74,6 +74,62 @@ void logmsg(int loglevel, const char *fmt, ...) { } } +/* Generic function for creating a message queue receiving and parsing thread */ +static void *ckmsg_queue(void *arg) +{ + ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckmsg_t *msg; + + mutex_lock(&ckmsgq->lock); + if (!ckmsgq->msgs) + pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + msg = ckmsgq->msgs; + if (likely(msg)) + DL_DELETE(ckmsgq->msgs, msg); + mutex_unlock(&ckmsgq->lock); + + if (unlikely(!msg)) + continue; + ckmsgq->func(ckp, msg); + free(msg); + } + return NULL; +} + +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); + + strncpy(ckmsgq->name, name, 15); + ckmsgq->func = func; + ckmsgq->ckp = ckp; + mutex_init(&ckmsgq->lock); + cond_init(&ckmsgq->cond); + create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); + + return ckmsgq; +} + +/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq + * parsing thread to wake up and process it. */ +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +{ + ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); + + msg->data = data; + + mutex_lock(&ckmsgq->lock); + DL_APPEND(ckmsgq->msgs, msg); + pthread_cond_signal(&ckmsgq->cond); + mutex_unlock(&ckmsgq->lock); +} + /* Listen for incoming global requests. Always returns a response if possible */ static void *listener(void *arg) { diff --git a/src/ckpool.h b/src/ckpool.h index 804095a0..f74481a0 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -126,6 +126,28 @@ struct ckpool_instance { char **proxypass; }; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(ckpool_t *, void *); +}; + +typedef struct ckmsgq ckmsgq_t; +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); + ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); diff --git a/src/libckpool.c b/src/libckpool.c index c7af5c13..604c8ec0 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -76,60 +76,6 @@ void join_pthread(pthread_t thread) pthread_join(thread, NULL); } -/* Generic function for creating a message queue receiving and parsing thread */ -static void *ckmsg_queue(void *arg) -{ - ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; - - pthread_detach(pthread_self()); - rename_proc(ckmsgq->name); - - while (42) { - ckmsg_t *msg; - - mutex_lock(&ckmsgq->lock); - if (!ckmsgq->msgs) - pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); - msg = ckmsgq->msgs; - if (likely(msg)) - DL_DELETE(ckmsgq->msgs, msg); - mutex_unlock(&ckmsgq->lock); - - if (unlikely(!msg)) - continue; - ckmsgq->func(msg); - free(msg); - } - return NULL; -} - -ckmsgq_t *create_ckmsgq(const char *name, const void *func) -{ - ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); - - strncpy(ckmsgq->name, name, 15); - ckmsgq->func = func; - mutex_init(&ckmsgq->lock); - cond_init(&ckmsgq->cond); - create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); - - return ckmsgq; -} - -/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq - * parsing thread to wake up and process it. */ -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) -{ - ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); - - msg->data = data; - - mutex_lock(&ckmsgq->lock); - DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(&ckmsgq->cond); - mutex_unlock(&ckmsgq->lock); -} - /* Place holders for when we add lock debugging */ #define GETLOCK(_lock, _file, _func, _line) #define GOTLOCK(_lock, _file, _func, _line) diff --git a/src/libckpool.h b/src/libckpool.h index 6ce914d4..888bacdb 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -262,27 +262,6 @@ typedef struct unixsock unixsock_t; typedef struct proc_instance proc_instance_t; -struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; - void *data; -}; - -typedef struct ckmsg ckmsg_t; - -struct ckmsgq { - char name[16]; - pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; - ckmsg_t *msgs; - void (*func)(void *); -}; - -typedef struct ckmsgq ckmsgq_t; -ckmsgq_t *create_ckmsgq(const char *name, const void *func); -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); - /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) { diff --git a/src/stratifier.c b/src/stratifier.c index fff89f9b..749bd880 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -188,7 +188,6 @@ static json_params_t *sshares; static json_params_t *sauths; struct ckdb_msg { - ckpool_t *ckp; json_t *val; int idtype; }; @@ -2094,13 +2093,13 @@ static void *authoriser(void *arg) return NULL; } -static void ckdbq_process(ckdb_msg_t *data) +static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) { bool logged = false; char *buf = NULL; while (!buf) { - buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged); + buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); if (unlikely(!buf)) { LOGWARNING("Failed to talk to ckdb, queueing messages"); sleep(5); @@ -2400,7 +2399,7 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); - ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process); + ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); if (!ckp->proxy)