From 72df09d650ed9c88637f21e4985faca085f5feff Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Jun 2014 22:16:55 +1000 Subject: [PATCH] Move the stratifier stratum send and receive queues to generic ckmsg queues --- src/generator.c | 11 ++- src/stratifier.c | 190 ++++++++++++++++------------------------------- src/stratifier.h | 10 --- 3 files changed, 75 insertions(+), 136 deletions(-) diff --git a/src/generator.c b/src/generator.c index cee2ecb9..af2fa6aa 100644 --- a/src/generator.c +++ b/src/generator.c @@ -18,7 +18,6 @@ #include "libckpool.h" #include "generator.h" #include "bitcoin.h" -#include "stratifier.h" #include "uthash.h" #include "utlist.h" @@ -54,6 +53,16 @@ struct share_msg { typedef struct share_msg share_msg_t; +struct stratum_msg { + struct stratum_msg *next; + struct stratum_msg *prev; + + json_t *json_msg; + int client_id; +}; + +typedef struct stratum_msg stratum_msg_t; + /* Per proxied pool instance data */ struct proxy_instance { ckpool_t *ckp; diff --git a/src/stratifier.c b/src/stratifier.c index c992c8f3..49bbe00e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -157,18 +157,6 @@ static int64_t workbase_id; static int64_t blockchange_id; static char lasthash[68]; -/* For protecting the stratum msg data */ -static pthread_mutex_t stratum_recv_lock; -static pthread_mutex_t stratum_send_lock; - -/* For signalling the threads to wake up and do work */ -static pthread_cond_t stratum_recv_cond; -static pthread_cond_t stratum_send_cond; - -/* For the linked list of all queued messages */ -static stratum_msg_t *stratum_recvs; -static stratum_msg_t *stratum_sends; - struct json_params { json_t *params; json_t *id_val; @@ -184,9 +172,19 @@ struct ckdb_msg { typedef struct ckdb_msg ckdb_msg_t; -static ckmsgq_t *ckdbq; -static ckmsgq_t *sshareq; -static ckmsgq_t *sauthq; +/* Stratum json messages with their associated client id */ +struct smsg { + json_t *json_msg; + int client_id; +}; + +typedef struct smsg smsg_t; + +static ckmsgq_t *ssends; // Stratum sends +static ckmsgq_t *srecvs; // Stratum receives +static ckmsgq_t *ckdbq; // ckdb +static ckmsgq_t *sshareq; // Stratum share sends +static ckmsgq_t *sauthq; // Stratum authorisations static int user_instance_id; @@ -779,15 +777,11 @@ out: static void stratum_add_recvd(json_t *val) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; - - mutex_lock(&stratum_recv_lock); - DL_APPEND(stratum_recvs, msg); - pthread_cond_signal(&stratum_recv_cond); - mutex_unlock(&stratum_recv_lock); + ckmsgq_add(srecvs, msg); } /* For creating a list of sends without locking that can then be concatenated @@ -796,7 +790,7 @@ static void stratum_add_recvd(json_t *val) static void stratum_broadcast(json_t *val) { stratum_instance_t *instance, *tmp; - stratum_msg_t *bulk_send = NULL; + ckmsg_t *bulk_send = NULL; if (unlikely(!val)) { LOGERR("Sent null json to stratum_broadcast"); @@ -805,14 +799,17 @@ static void stratum_broadcast(json_t *val) ck_rlock(&instance_lock); HASH_ITER(hh, stratum_instances, instance, tmp) { - stratum_msg_t *msg; + ckmsg_t *client_msg; + smsg_t *msg; if (!instance->authorised) continue; - msg = ckzalloc(sizeof(stratum_msg_t)); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = json_deep_copy(val); msg->client_id = instance->id; - DL_APPEND(bulk_send, msg); + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); } ck_runlock(&instance_lock); @@ -821,27 +818,23 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&stratum_send_lock); - if (stratum_sends) - DL_CONCAT(stratum_sends, bulk_send); + mutex_lock(&ssends->lock); + if (ssends->msgs) + DL_CONCAT(ssends->msgs, bulk_send); else - stratum_sends = bulk_send; - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ssends->msgs = bulk_send; + pthread_cond_signal(&ssends->cond); + mutex_unlock(&ssends->lock); } static void stratum_add_send(json_t *val, int client_id) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; msg->client_id = client_id; - - mutex_lock(&stratum_send_lock); - DL_APPEND(stratum_sends, msg); - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ckmsgq_add(ssends, msg); } static void drop_client(int id) @@ -1845,7 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val /* Unhandled message here */ } -static void parse_instance_msg(stratum_msg_t *msg) +static void parse_instance_msg(smsg_t *msg) { json_t *val = msg->json_msg, *id_val, *method, *params; int client_id = msg->client_id; @@ -1873,98 +1866,52 @@ out: free(msg); } -static void *stratum_receiver(void *arg) +static void srecv_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg; - - pthread_detach(pthread_self()); - rename_proc("sreceiver"); - - while (42) { - stratum_instance_t *instance; - - /* Pop the head off the list if it exists or wait for a conditional - * signal telling us there is work */ - mutex_lock(&stratum_recv_lock); - if (!stratum_recvs) - pthread_cond_wait(&stratum_recv_cond, &stratum_recv_lock); - msg = stratum_recvs; - if (likely(msg)) - DL_DELETE(stratum_recvs, msg); - mutex_unlock(&stratum_recv_lock); - - if (unlikely(!msg)) - continue; + stratum_instance_t *instance; - msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); - json_object_del(msg->json_msg, "client_id"); - - /* Parse the message here */ - ck_ilock(&instance_lock); - instance = __instance_by_id(msg->client_id); - if (!instance) { - /* client_id instance doesn't exist yet, create one */ - ck_ulock(&instance_lock); - instance = __stratum_add_instance(ckp, msg->client_id); - ck_dwilock(&instance_lock); - } - ck_uilock(&instance_lock); + msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); + json_object_del(msg->json_msg, "client_id"); - parse_instance_msg(msg); + /* Parse the message here */ + ck_ilock(&instance_lock); + instance = __instance_by_id(msg->client_id); + if (!instance) { + /* client_id instance doesn't exist yet, create one */ + ck_ulock(&instance_lock); + instance = __stratum_add_instance(ckp, msg->client_id); + ck_dwilock(&instance_lock); } + ck_uilock(&instance_lock); + + parse_instance_msg(msg); - return NULL; } -static void discard_stratum_msg(stratum_msg_t **msg) +static void discard_stratum_msg(smsg_t **msg) { json_decref((*msg)->json_msg); free(*msg); *msg = NULL; } -static void *stratum_sender(void *arg) +static void ssend_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg = NULL; - - pthread_detach(pthread_self()); - rename_proc("ssender"); - - while (42) { - char *s; - - if (msg) - discard_stratum_msg(&msg); - - mutex_lock(&stratum_send_lock); - if (!stratum_sends) - pthread_cond_wait(&stratum_send_cond, &stratum_send_lock); - msg = stratum_sends; - if (likely(msg)) - DL_DELETE(stratum_sends, msg); - mutex_unlock(&stratum_send_lock); - - if (unlikely(!msg)) - continue; - - if (unlikely(!msg->json_msg)) { - LOGERR("Sent null json msg to stratum_sender"); - continue; - } - - /* Add client_id to the json message and send it to the - * connector process to be delivered */ - json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); - s = json_dumps(msg->json_msg, 0); - send_proc(ckp->connector, s); - free(s); + char *s; - discard_stratum_msg(&msg); + if (unlikely(!msg->json_msg)) { + LOGERR("Sent null json msg to stratum_sender"); + free(msg); + return; } - return NULL; + /* Add client_id to the json message and send it to the + * connector process to be delivered */ + json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); + s = json_dumps(msg->json_msg, 0); + send_proc(ckp->connector, s); + free(s); + discard_stratum_msg(&msg); } static void discard_json_params(json_params_t **jp) @@ -2299,8 +2246,7 @@ static void *statsupdate(void *arg) int stratifier(proc_instance_t *pi) { - pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate; + pthread_t pth_blockupdate, pth_statsupdate; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2326,14 +2272,8 @@ int stratifier(proc_instance_t *pi) cklock_init(&instance_lock); - mutex_init(&stratum_send_lock); - cond_init(&stratum_send_cond); - create_pthread(&pth_stratum_sender, stratum_sender, ckp); - - mutex_init(&stratum_recv_lock); - cond_init(&stratum_recv_cond); - create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - + ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); diff --git a/src/stratifier.h b/src/stratifier.h index e6855878..0f41b91c 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,16 +10,6 @@ #ifndef STRATIFIER_H #define STRATIFIER_H -struct stratum_msg { - struct stratum_msg *next; - struct stratum_msg *prev; - - json_t *json_msg; - int client_id; -}; - -typedef struct stratum_msg stratum_msg_t; - int stratifier(proc_instance_t *pi); #endif /* STRATIFIER_H */