From ea27e86a33069b795a11c451ca30d13cdf3d9917 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 19 Jun 2014 14:36:38 +1000 Subject: [PATCH] Make ckdb messaging use the generic ckmsg queues --- src/stratifier.c | 62 ++++++++++++------------------------------------ 1 file changed, 15 insertions(+), 47 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 5327963f..fff89f9b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -162,14 +162,12 @@ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; static pthread_mutex_t sshare_lock; static pthread_mutex_t sauth_lock; -static pthread_mutex_t ckdbq_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; static pthread_cond_t sshare_cond; static pthread_cond_t sauth_cond; -static pthread_cond_t ckdbq_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; @@ -190,16 +188,14 @@ static json_params_t *sshares; static json_params_t *sauths; struct ckdb_msg { - struct ckdb_msg *next; - struct ckdb_msg *prev; - + ckpool_t *ckp; json_t *val; int idtype; }; typedef struct ckdb_msg ckdb_msg_t; -static ckdb_msg_t *ckdb_msgs; +static ckmsgq_t *ckdbq; static int user_instance_id; @@ -427,11 +423,7 @@ static void ckdbq_add(const int idtype, json_t *val) msg->val = val; msg->idtype = idtype; - - mutex_lock(&ckdbq_lock); - DL_APPEND(ckdb_msgs, msg); - pthread_cond_signal(&ckdbq_cond); - mutex_unlock(&ckdbq_lock); + ckmsgq_add(ckdbq, msg); } static void send_workinfo(ckpool_t *ckp, workbase_t *wb) @@ -2102,42 +2094,21 @@ static void *authoriser(void *arg) return NULL; } -static void *ckdbqueue(void *arg) +static void ckdbq_process(ckdb_msg_t *data) { - ckpool_t *ckp = (ckpool_t *)arg; - - pthread_detach(pthread_self()); - rename_proc("ckdbqueue"); - - while (42) { - bool logged = false; - char *buf = NULL; - ckdb_msg_t *msg; - - mutex_lock(&ckdbq_lock); - if (!ckdb_msgs) - pthread_cond_wait(&ckdbq_cond, &ckdbq_lock); - msg = ckdb_msgs; - if (likely(msg)) - DL_DELETE(ckdb_msgs, msg); - mutex_unlock(&ckdbq_lock); - - if (unlikely(!msg)) - continue; + bool logged = false; + char *buf = NULL; - while (!buf) { - buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val, logged); - if (unlikely(!buf)) { - LOGWARNING("Failed to talk to ckdb, queueing messages"); - sleep(5); - } - logged = true; + while (!buf) { + buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged); + if (unlikely(!buf)) { + LOGWARNING("Failed to talk to ckdb, queueing messages"); + sleep(5); } - LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); - free(buf); + logged = true; } - - return NULL; + LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); + free(buf); } static const double nonces = 4294967296; @@ -2388,7 +2359,6 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; - pthread_t pth_ckdbqueue; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2430,9 +2400,7 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); - mutex_init(&ckdbq_lock); - cond_init(&ckdbq_cond); - create_pthread(&pth_ckdbqueue, ckdbqueue, ckp); + ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); if (!ckp->proxy)