diff --git a/src/ckpool.c b/src/ckpool.c index a1c4a514..6b8fa480 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg) return NULL; } +/* Generic workqueue function and message receiving and parsing thread */ +static void *ckwq_queue(void *arg) +{ + ckwq_t *ckmsgq = (ckwq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckwqmsg_t *wqmsg; + tv_t now; + ts_t abs; + + mutex_lock(ckmsgq->lock); + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec++; + if (!ckmsgq->wqmsgs) + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + wqmsg = ckmsgq->wqmsgs; + if (wqmsg) + DL_DELETE(ckmsgq->wqmsgs, wqmsg); + mutex_unlock(ckmsgq->lock); + + if (!wqmsg) + continue; + wqmsg->func(ckp, wqmsg->data); + free(wqmsg); + } + return NULL; +} + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons return ckmsgq; } +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) +{ + ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); + mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); + ckwq[i].ckp = ckp; + ckwq[i].lock = lock; + ckwq[i].cond = cond; + create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); + } + + return ckwq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(ckmsgq->cond); + pthread_cond_broadcast(ckmsgq->cond); mutex_unlock(ckmsgq->lock); } +void ckwq_add(ckwq_t *ckwq, const void *func, void *data) +{ + ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); + + wqmsg->func = func; + wqmsg->data = data; + + mutex_lock(ckwq->lock); + ckwq->messages++; + DL_APPEND(ckwq->wqmsgs, wqmsg); + pthread_cond_broadcast(ckwq->cond); + mutex_unlock(ckwq->lock); +} + /* Return whether there are any messages queued in the ckmsgq linked list. */ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { diff --git a/src/ckpool.h b/src/ckpool.h index 771ac692..eaf166fd 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -22,13 +22,22 @@ struct ckpool_instance; typedef struct ckpool_instance ckpool_t; +typedef struct ckmsg ckmsg_t; + struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; + ckmsg_t *next; + ckmsg_t *prev; void *data; }; -typedef struct ckmsg ckmsg_t; +typedef struct ckwqmsg ckwqmsg_t; + +struct ckwqmsg { + ckwqmsg_t *next; + ckwqmsg_t *prev; + void *data; + void (*func)(ckpool_t *, void *); +}; struct ckmsgq { ckpool_t *ckp; @@ -43,6 +52,18 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; +struct ckwq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + mutex_t *lock; + pthread_cond_t *cond; + ckwqmsg_t *wqmsgs; + int64_t messages; +}; + +typedef struct ckwq ckwq_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -204,7 +225,9 @@ struct ckpool_instance { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); +void ckwq_add(ckwq_t *ckwq, const void *func, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); ckpool_t *global_ckp;