From 50463445c04865baf75f268a67ef0d8aed92bc26 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 19 Jun 2014 14:16:18 +1000 Subject: [PATCH 1/6] Create a set of generic queue receiving thread and processing functions --- src/libckpool.c | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ src/libckpool.h | 21 ++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/src/libckpool.c b/src/libckpool.c index 3fc28643..77f682b9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -32,6 +32,7 @@ #include "ckpool.h" #include "libckpool.h" #include "sha2.h" +#include "utlist.h" #ifndef UNIX_PATH_MAX #define UNIX_PATH_MAX 108 @@ -75,6 +76,57 @@ void join_pthread(pthread_t thread) pthread_join(thread, NULL); } +/* Generic function for creating a message queue receiving and parsing thread */ +void *ckmsg_queue(void *arg) +{ + ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckmsg_t *msg; + + mutex_lock(&ckmsgq->lock); + if (!ckmsgq->msgs) + pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + msg = ckmsgq->msgs; + if (likely(msg)) + DL_DELETE(ckmsgq->msgs, msg); + mutex_unlock(&ckmsgq->lock); + + if (unlikely(!msg)) + continue; + ckmsgq->func(msg); + free(msg); + } + return NULL; +} + +void create_ckmsgq(const char *name, const void *func) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); + + strncpy(ckmsgq->name, name, 15); + ckmsgq->func = func; + mutex_init(&ckmsgq->lock); + cond_init(&ckmsgq->cond); + create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); +} + +/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq + * parsing thread to wake up and process it. */ +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +{ + ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); + + msg->data = data; + + mutex_lock(&ckmsgq->lock); + DL_APPEND(ckmsgq->msgs, msg); + pthread_cond_signal(&ckmsgq->cond); + mutex_unlock(&ckmsgq->lock); +} /* Place holders for when we add lock debugging */ #define GETLOCK(_lock, _file, _func, _line) diff --git a/src/libckpool.h b/src/libckpool.h index 8672d943..5b709d23 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -32,6 +32,8 @@ # include #endif +#include "utlist.h" + #ifndef bswap_16 #define bswap_16 __builtin_bswap16 #define bswap_32 __builtin_bswap32 @@ -260,6 +262,25 @@ typedef struct unixsock unixsock_t; typedef struct proc_instance proc_instance_t; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(void *); +}; + +typedef struct ckmsgq ckmsgq_t; + /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) { From 997a88191cd69396eb892f4a69c2b8642006f50b Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 19 Jun 2014 14:31:28 +1000 Subject: [PATCH 2/6] Make create_ckmsgq return a pointer to the generated queue and export the ckdbq functions --- src/libckpool.c | 6 ++++-- src/libckpool.h | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 77f682b9..c7af5c13 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -77,7 +77,7 @@ void join_pthread(pthread_t thread) } /* Generic function for creating a message queue receiving and parsing thread */ -void *ckmsg_queue(void *arg) +static void *ckmsg_queue(void *arg) { ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; @@ -103,7 +103,7 @@ void *ckmsg_queue(void *arg) return NULL; } -void create_ckmsgq(const char *name, const void *func) +ckmsgq_t *create_ckmsgq(const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -112,6 +112,8 @@ void create_ckmsgq(const char *name, const void *func) mutex_init(&ckmsgq->lock); cond_init(&ckmsgq->cond); create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); + + return ckmsgq; } /* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq diff --git a/src/libckpool.h b/src/libckpool.h index 5b709d23..6ce914d4 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -280,6 +280,8 @@ struct ckmsgq { }; typedef struct ckmsgq ckmsgq_t; +ckmsgq_t *create_ckmsgq(const char *name, const void *func); +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) From ea27e86a33069b795a11c451ca30d13cdf3d9917 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 19 Jun 2014 14:36:38 +1000 Subject: [PATCH 3/6] Make ckdb messaging use the generic ckmsg queues --- src/stratifier.c | 62 ++++++++++++------------------------------------ 1 file changed, 15 insertions(+), 47 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 5327963f..fff89f9b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -162,14 +162,12 @@ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; static pthread_mutex_t sshare_lock; static pthread_mutex_t sauth_lock; -static pthread_mutex_t ckdbq_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; static pthread_cond_t sshare_cond; static pthread_cond_t sauth_cond; -static pthread_cond_t ckdbq_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; @@ -190,16 +188,14 @@ static json_params_t *sshares; static json_params_t *sauths; struct ckdb_msg { - struct ckdb_msg *next; - struct ckdb_msg *prev; - + ckpool_t *ckp; json_t *val; int idtype; }; typedef struct ckdb_msg ckdb_msg_t; -static ckdb_msg_t *ckdb_msgs; +static ckmsgq_t *ckdbq; static int user_instance_id; @@ -427,11 +423,7 @@ static void ckdbq_add(const int idtype, json_t *val) msg->val = val; msg->idtype = idtype; - - mutex_lock(&ckdbq_lock); - DL_APPEND(ckdb_msgs, msg); - pthread_cond_signal(&ckdbq_cond); - mutex_unlock(&ckdbq_lock); + ckmsgq_add(ckdbq, msg); } static void send_workinfo(ckpool_t *ckp, workbase_t *wb) @@ -2102,42 +2094,21 @@ static void *authoriser(void *arg) return NULL; } -static void *ckdbqueue(void *arg) +static void ckdbq_process(ckdb_msg_t *data) { - ckpool_t *ckp = (ckpool_t *)arg; - - pthread_detach(pthread_self()); - rename_proc("ckdbqueue"); - - while (42) { - bool logged = false; - char *buf = NULL; - ckdb_msg_t *msg; - - mutex_lock(&ckdbq_lock); - if (!ckdb_msgs) - pthread_cond_wait(&ckdbq_cond, &ckdbq_lock); - msg = ckdb_msgs; - if (likely(msg)) - DL_DELETE(ckdb_msgs, msg); - mutex_unlock(&ckdbq_lock); - - if (unlikely(!msg)) - continue; + bool logged = false; + char *buf = NULL; - while (!buf) { - buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val, logged); - if (unlikely(!buf)) { - LOGWARNING("Failed to talk to ckdb, queueing messages"); - sleep(5); - } - logged = true; + while (!buf) { + buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged); + if (unlikely(!buf)) { + LOGWARNING("Failed to talk to ckdb, queueing messages"); + sleep(5); } - LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); - free(buf); + logged = true; } - - return NULL; + LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); + free(buf); } static const double nonces = 4294967296; @@ -2388,7 +2359,6 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; - pthread_t pth_ckdbqueue; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2430,9 +2400,7 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); - mutex_init(&ckdbq_lock); - cond_init(&ckdbq_cond); - create_pthread(&pth_ckdbqueue, ckdbqueue, ckp); + ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); if (!ckp->proxy) From e8c47e29fa39e01bcd8757193df9720b8df34983 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 19 Jun 2014 15:17:04 +1000 Subject: [PATCH 4/6] Move generic ckmsgq functions to ckpool.c to allow ckp to be included --- src/ckpool.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++ src/ckpool.h | 22 +++++++++++++++++++ src/libckpool.c | 54 ---------------------------------------------- src/libckpool.h | 21 ------------------ src/stratifier.c | 7 +++--- 5 files changed, 81 insertions(+), 79 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index ad5b0ba7..98203208 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -74,6 +74,62 @@ void logmsg(int loglevel, const char *fmt, ...) { } } +/* Generic function for creating a message queue receiving and parsing thread */ +static void *ckmsg_queue(void *arg) +{ + ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckmsg_t *msg; + + mutex_lock(&ckmsgq->lock); + if (!ckmsgq->msgs) + pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + msg = ckmsgq->msgs; + if (likely(msg)) + DL_DELETE(ckmsgq->msgs, msg); + mutex_unlock(&ckmsgq->lock); + + if (unlikely(!msg)) + continue; + ckmsgq->func(ckp, msg); + free(msg); + } + return NULL; +} + +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); + + strncpy(ckmsgq->name, name, 15); + ckmsgq->func = func; + ckmsgq->ckp = ckp; + mutex_init(&ckmsgq->lock); + cond_init(&ckmsgq->cond); + create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); + + return ckmsgq; +} + +/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq + * parsing thread to wake up and process it. */ +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +{ + ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); + + msg->data = data; + + mutex_lock(&ckmsgq->lock); + DL_APPEND(ckmsgq->msgs, msg); + pthread_cond_signal(&ckmsgq->cond); + mutex_unlock(&ckmsgq->lock); +} + /* Listen for incoming global requests. Always returns a response if possible */ static void *listener(void *arg) { diff --git a/src/ckpool.h b/src/ckpool.h index 804095a0..f74481a0 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -126,6 +126,28 @@ struct ckpool_instance { char **proxypass; }; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(ckpool_t *, void *); +}; + +typedef struct ckmsgq ckmsgq_t; +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); + ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); diff --git a/src/libckpool.c b/src/libckpool.c index c7af5c13..604c8ec0 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -76,60 +76,6 @@ void join_pthread(pthread_t thread) pthread_join(thread, NULL); } -/* Generic function for creating a message queue receiving and parsing thread */ -static void *ckmsg_queue(void *arg) -{ - ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; - - pthread_detach(pthread_self()); - rename_proc(ckmsgq->name); - - while (42) { - ckmsg_t *msg; - - mutex_lock(&ckmsgq->lock); - if (!ckmsgq->msgs) - pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); - msg = ckmsgq->msgs; - if (likely(msg)) - DL_DELETE(ckmsgq->msgs, msg); - mutex_unlock(&ckmsgq->lock); - - if (unlikely(!msg)) - continue; - ckmsgq->func(msg); - free(msg); - } - return NULL; -} - -ckmsgq_t *create_ckmsgq(const char *name, const void *func) -{ - ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); - - strncpy(ckmsgq->name, name, 15); - ckmsgq->func = func; - mutex_init(&ckmsgq->lock); - cond_init(&ckmsgq->cond); - create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); - - return ckmsgq; -} - -/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq - * parsing thread to wake up and process it. */ -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) -{ - ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); - - msg->data = data; - - mutex_lock(&ckmsgq->lock); - DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(&ckmsgq->cond); - mutex_unlock(&ckmsgq->lock); -} - /* Place holders for when we add lock debugging */ #define GETLOCK(_lock, _file, _func, _line) #define GOTLOCK(_lock, _file, _func, _line) diff --git a/src/libckpool.h b/src/libckpool.h index 6ce914d4..888bacdb 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -262,27 +262,6 @@ typedef struct unixsock unixsock_t; typedef struct proc_instance proc_instance_t; -struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; - void *data; -}; - -typedef struct ckmsg ckmsg_t; - -struct ckmsgq { - char name[16]; - pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; - ckmsg_t *msgs; - void (*func)(void *); -}; - -typedef struct ckmsgq ckmsgq_t; -ckmsgq_t *create_ckmsgq(const char *name, const void *func); -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); - /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) { diff --git a/src/stratifier.c b/src/stratifier.c index fff89f9b..749bd880 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -188,7 +188,6 @@ static json_params_t *sshares; static json_params_t *sauths; struct ckdb_msg { - ckpool_t *ckp; json_t *val; int idtype; }; @@ -2094,13 +2093,13 @@ static void *authoriser(void *arg) return NULL; } -static void ckdbq_process(ckdb_msg_t *data) +static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) { bool logged = false; char *buf = NULL; while (!buf) { - buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged); + buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); if (unlikely(!buf)) { LOGWARNING("Failed to talk to ckdb, queueing messages"); sleep(5); @@ -2400,7 +2399,7 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); - ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process); + ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); if (!ckp->proxy) From 93ecc3a5c82c1496c18720212bc2f6370cff7d06 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Jun 2014 21:29:52 +1000 Subject: [PATCH 5/6] Convert the authoriser and share processor to using the generic ckmsg queues and fix the data passed to the queue function --- src/ckpool.c | 2 +- src/stratifier.c | 169 +++++++++++++++-------------------------------- 2 files changed, 54 insertions(+), 117 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 98203208..84639d9e 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -96,7 +96,7 @@ static void *ckmsg_queue(void *arg) if (unlikely(!msg)) continue; - ckmsgq->func(ckp, msg); + ckmsgq->func(ckp, msg->data); free(msg); } return NULL; diff --git a/src/stratifier.c b/src/stratifier.c index 749bd880..c992c8f3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -160,23 +160,16 @@ static char lasthash[68]; /* For protecting the stratum msg data */ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; -static pthread_mutex_t sshare_lock; -static pthread_mutex_t sauth_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; -static pthread_cond_t sshare_cond; -static pthread_cond_t sauth_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_sends; struct json_params { - struct json_params *next; - struct json_params *prev; - json_t *params; json_t *id_val; int client_id; @@ -184,9 +177,6 @@ struct json_params { typedef struct json_params json_params_t; -static json_params_t *sshares; -static json_params_t *sauths; - struct ckdb_msg { json_t *val; int idtype; @@ -195,6 +185,8 @@ struct ckdb_msg { typedef struct ckdb_msg ckdb_msg_t; static ckmsgq_t *ckdbq; +static ckmsgq_t *sshareq; +static ckmsgq_t *sauthq; static int user_instance_id; @@ -1826,11 +1818,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.auth", 11)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sauth_lock); - DL_APPEND(sauths, jp); - pthread_cond_signal(&sauth_cond); - mutex_unlock(&sauth_lock); - + ckmsgq_add(sauthq, jp); return; } @@ -1850,11 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.submit", 13)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sshare_lock); - DL_APPEND(sshares, jp); - pthread_cond_signal(&sshare_cond); - mutex_unlock(&sshare_lock); - + ckmsgq_add(sshareq, jp); return; } @@ -1991,106 +1975,65 @@ static void discard_json_params(json_params_t **jp) *jp = NULL; } -static void *share_processor(void *arg) +static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) { - ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("sprocessor"); - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sshare_lock); - if (!sshares) - pthread_cond_wait(&sshare_cond, &sshare_lock); - jp = sshares; - if (likely(jp)) - DL_DELETE(sshares, jp); - mutex_unlock(&sshare_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); - continue; - } - json_msg = json_object(); - result_val = parse_submit(client, json_msg, jp->params, &err_val); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); + goto out; } - - return NULL; + json_msg = json_object(); + result_val = parse_submit(client, json_msg, jp->params, &err_val); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); } -static void *authoriser(void *arg) +static void sauth_process(ckpool_t *ckp, json_params_t *jp) { - ckpool_t *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("authoriser"); - - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - char buf[256]; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sauth_lock); - if (!sauths) - pthread_cond_wait(&sauth_cond, &sauth_lock); - jp = sauths; - if (likely(jp)) - DL_DELETE(sauths, jp); - mutex_unlock(&sauth_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); - continue; - } - result_val = parse_authorise(client, jp->params, &err_val); - if (json_is_true(result_val)) { - snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, - client->user_instance->username); - stratum_send_message(client, buf); - } else - stratum_send_message(client, "Failed authorisation :("); - json_msg = json_object(); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); + goto out; } + result_val = parse_authorise(client, jp->params, &err_val); + if (json_is_true(result_val)) { + char *buf; + + ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, + client->user_instance->username); + stratum_send_message(client, buf); + } else + stratum_send_message(client, "Failed authorisation :("); + json_msg = json_object(); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); - return NULL; } static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) @@ -2357,7 +2300,7 @@ static void *statsupdate(void *arg) int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; + pthread_t pth_statsupdate; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2391,14 +2334,8 @@ int stratifier(proc_instance_t *pi) cond_init(&stratum_recv_cond); create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - mutex_init(&sshare_lock); - cond_init(&sshare_cond); - create_pthread(&pth_share_processer, share_processor, ckp); - - mutex_init(&sauth_lock); - cond_init(&sauth_cond); - create_pthread(&pth_authoriser, authoriser, ckp); - + sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); + sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); From 72df09d650ed9c88637f21e4985faca085f5feff Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Jun 2014 22:16:55 +1000 Subject: [PATCH 6/6] Move the stratifier stratum send and receive queues to generic ckmsg queues --- src/generator.c | 11 ++- src/stratifier.c | 190 ++++++++++++++++------------------------------- src/stratifier.h | 10 --- 3 files changed, 75 insertions(+), 136 deletions(-) diff --git a/src/generator.c b/src/generator.c index cee2ecb9..af2fa6aa 100644 --- a/src/generator.c +++ b/src/generator.c @@ -18,7 +18,6 @@ #include "libckpool.h" #include "generator.h" #include "bitcoin.h" -#include "stratifier.h" #include "uthash.h" #include "utlist.h" @@ -54,6 +53,16 @@ struct share_msg { typedef struct share_msg share_msg_t; +struct stratum_msg { + struct stratum_msg *next; + struct stratum_msg *prev; + + json_t *json_msg; + int client_id; +}; + +typedef struct stratum_msg stratum_msg_t; + /* Per proxied pool instance data */ struct proxy_instance { ckpool_t *ckp; diff --git a/src/stratifier.c b/src/stratifier.c index c992c8f3..49bbe00e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -157,18 +157,6 @@ static int64_t workbase_id; static int64_t blockchange_id; static char lasthash[68]; -/* For protecting the stratum msg data */ -static pthread_mutex_t stratum_recv_lock; -static pthread_mutex_t stratum_send_lock; - -/* For signalling the threads to wake up and do work */ -static pthread_cond_t stratum_recv_cond; -static pthread_cond_t stratum_send_cond; - -/* For the linked list of all queued messages */ -static stratum_msg_t *stratum_recvs; -static stratum_msg_t *stratum_sends; - struct json_params { json_t *params; json_t *id_val; @@ -184,9 +172,19 @@ struct ckdb_msg { typedef struct ckdb_msg ckdb_msg_t; -static ckmsgq_t *ckdbq; -static ckmsgq_t *sshareq; -static ckmsgq_t *sauthq; +/* Stratum json messages with their associated client id */ +struct smsg { + json_t *json_msg; + int client_id; +}; + +typedef struct smsg smsg_t; + +static ckmsgq_t *ssends; // Stratum sends +static ckmsgq_t *srecvs; // Stratum receives +static ckmsgq_t *ckdbq; // ckdb +static ckmsgq_t *sshareq; // Stratum share sends +static ckmsgq_t *sauthq; // Stratum authorisations static int user_instance_id; @@ -779,15 +777,11 @@ out: static void stratum_add_recvd(json_t *val) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; - - mutex_lock(&stratum_recv_lock); - DL_APPEND(stratum_recvs, msg); - pthread_cond_signal(&stratum_recv_cond); - mutex_unlock(&stratum_recv_lock); + ckmsgq_add(srecvs, msg); } /* For creating a list of sends without locking that can then be concatenated @@ -796,7 +790,7 @@ static void stratum_add_recvd(json_t *val) static void stratum_broadcast(json_t *val) { stratum_instance_t *instance, *tmp; - stratum_msg_t *bulk_send = NULL; + ckmsg_t *bulk_send = NULL; if (unlikely(!val)) { LOGERR("Sent null json to stratum_broadcast"); @@ -805,14 +799,17 @@ static void stratum_broadcast(json_t *val) ck_rlock(&instance_lock); HASH_ITER(hh, stratum_instances, instance, tmp) { - stratum_msg_t *msg; + ckmsg_t *client_msg; + smsg_t *msg; if (!instance->authorised) continue; - msg = ckzalloc(sizeof(stratum_msg_t)); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = json_deep_copy(val); msg->client_id = instance->id; - DL_APPEND(bulk_send, msg); + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); } ck_runlock(&instance_lock); @@ -821,27 +818,23 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&stratum_send_lock); - if (stratum_sends) - DL_CONCAT(stratum_sends, bulk_send); + mutex_lock(&ssends->lock); + if (ssends->msgs) + DL_CONCAT(ssends->msgs, bulk_send); else - stratum_sends = bulk_send; - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ssends->msgs = bulk_send; + pthread_cond_signal(&ssends->cond); + mutex_unlock(&ssends->lock); } static void stratum_add_send(json_t *val, int client_id) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; msg->client_id = client_id; - - mutex_lock(&stratum_send_lock); - DL_APPEND(stratum_sends, msg); - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ckmsgq_add(ssends, msg); } static void drop_client(int id) @@ -1845,7 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val /* Unhandled message here */ } -static void parse_instance_msg(stratum_msg_t *msg) +static void parse_instance_msg(smsg_t *msg) { json_t *val = msg->json_msg, *id_val, *method, *params; int client_id = msg->client_id; @@ -1873,98 +1866,52 @@ out: free(msg); } -static void *stratum_receiver(void *arg) +static void srecv_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg; - - pthread_detach(pthread_self()); - rename_proc("sreceiver"); - - while (42) { - stratum_instance_t *instance; - - /* Pop the head off the list if it exists or wait for a conditional - * signal telling us there is work */ - mutex_lock(&stratum_recv_lock); - if (!stratum_recvs) - pthread_cond_wait(&stratum_recv_cond, &stratum_recv_lock); - msg = stratum_recvs; - if (likely(msg)) - DL_DELETE(stratum_recvs, msg); - mutex_unlock(&stratum_recv_lock); - - if (unlikely(!msg)) - continue; + stratum_instance_t *instance; - msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); - json_object_del(msg->json_msg, "client_id"); - - /* Parse the message here */ - ck_ilock(&instance_lock); - instance = __instance_by_id(msg->client_id); - if (!instance) { - /* client_id instance doesn't exist yet, create one */ - ck_ulock(&instance_lock); - instance = __stratum_add_instance(ckp, msg->client_id); - ck_dwilock(&instance_lock); - } - ck_uilock(&instance_lock); + msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); + json_object_del(msg->json_msg, "client_id"); - parse_instance_msg(msg); + /* Parse the message here */ + ck_ilock(&instance_lock); + instance = __instance_by_id(msg->client_id); + if (!instance) { + /* client_id instance doesn't exist yet, create one */ + ck_ulock(&instance_lock); + instance = __stratum_add_instance(ckp, msg->client_id); + ck_dwilock(&instance_lock); } + ck_uilock(&instance_lock); + + parse_instance_msg(msg); - return NULL; } -static void discard_stratum_msg(stratum_msg_t **msg) +static void discard_stratum_msg(smsg_t **msg) { json_decref((*msg)->json_msg); free(*msg); *msg = NULL; } -static void *stratum_sender(void *arg) +static void ssend_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg = NULL; - - pthread_detach(pthread_self()); - rename_proc("ssender"); - - while (42) { - char *s; - - if (msg) - discard_stratum_msg(&msg); - - mutex_lock(&stratum_send_lock); - if (!stratum_sends) - pthread_cond_wait(&stratum_send_cond, &stratum_send_lock); - msg = stratum_sends; - if (likely(msg)) - DL_DELETE(stratum_sends, msg); - mutex_unlock(&stratum_send_lock); - - if (unlikely(!msg)) - continue; - - if (unlikely(!msg->json_msg)) { - LOGERR("Sent null json msg to stratum_sender"); - continue; - } - - /* Add client_id to the json message and send it to the - * connector process to be delivered */ - json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); - s = json_dumps(msg->json_msg, 0); - send_proc(ckp->connector, s); - free(s); + char *s; - discard_stratum_msg(&msg); + if (unlikely(!msg->json_msg)) { + LOGERR("Sent null json msg to stratum_sender"); + free(msg); + return; } - return NULL; + /* Add client_id to the json message and send it to the + * connector process to be delivered */ + json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); + s = json_dumps(msg->json_msg, 0); + send_proc(ckp->connector, s); + free(s); + discard_stratum_msg(&msg); } static void discard_json_params(json_params_t **jp) @@ -2299,8 +2246,7 @@ static void *statsupdate(void *arg) int stratifier(proc_instance_t *pi) { - pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate; + pthread_t pth_blockupdate, pth_statsupdate; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2326,14 +2272,8 @@ int stratifier(proc_instance_t *pi) cklock_init(&instance_lock); - mutex_init(&stratum_send_lock); - cond_init(&stratum_send_cond); - create_pthread(&pth_stratum_sender, stratum_sender, ckp); - - mutex_init(&stratum_recv_lock); - cond_init(&stratum_recv_cond); - create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - + ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); diff --git a/src/stratifier.h b/src/stratifier.h index e6855878..0f41b91c 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,16 +10,6 @@ #ifndef STRATIFIER_H #define STRATIFIER_H -struct stratum_msg { - struct stratum_msg *next; - struct stratum_msg *prev; - - json_t *json_msg; - int client_id; -}; - -typedef struct stratum_msg stratum_msg_t; - int stratifier(proc_instance_t *pi); #endif /* STRATIFIER_H */