diff --git a/src/ckpool.c b/src/ckpool.c index 8c522d46..e1d1186b 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -115,16 +115,16 @@ static void *ckmsg_queue(void *arg) tv_t now; ts_t abs; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs); + pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); msg = ckmsgq->msgs; if (msg) DL_DELETE(ckmsgq->msgs, msg); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); if (!msg) continue; @@ -141,13 +141,39 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) strncpy(ckmsgq->name, name, 15); ckmsgq->func = func; ckmsgq->ckp = ckp; - mutex_init(&ckmsgq->lock); - cond_init(&ckmsgq->cond); + ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); + ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(ckmsgq->lock); + cond_init(ckmsgq->cond); create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); return ckmsgq; } +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); + pthread_mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(pthread_mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckmsgq[i].name, 15, "%.8s%x", name, i); + ckmsgq[i].func = func; + ckmsgq[i].ckp = ckp; + ckmsgq[i].lock = lock; + ckmsgq[i].cond = cond; + create_pthread(&ckmsgq[i].pth, ckmsg_queue, &ckmsgq[i]); + } + + return ckmsgq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -156,10 +182,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) msg->data = data; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(&ckmsgq->cond); - mutex_unlock(&ckmsgq->lock); + pthread_cond_signal(ckmsgq->cond); + mutex_unlock(ckmsgq->lock); } /* Return whether there are any messages queued in the ckmsgq linked list. */ @@ -167,10 +193,10 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { bool ret = true; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); if (ckmsgq->msgs) ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 9284165d..d246e97f 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -34,8 +34,8 @@ struct ckmsgq { ckpool_t *ckp; char name[16]; pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_mutex_t *lock; + pthread_cond_t *cond; ckmsg_t *msgs; void (*func)(ckpool_t *, void *); }; @@ -181,6 +181,7 @@ struct ckpool_instance { #endif ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); diff --git a/src/stratifier.c b/src/stratifier.c index 69499cc9..c79a30a2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1110,13 +1110,13 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&ssends->lock); + mutex_lock(ssends->lock); if (ssends->msgs) DL_CONCAT(ssends->msgs, bulk_send); else ssends->msgs = bulk_send; - pthread_cond_signal(&ssends->cond); - mutex_unlock(&ssends->lock); + pthread_cond_signal(ssends->cond); + mutex_unlock(ssends->lock); } static void stratum_add_send(json_t *val, int64_t client_id)