diff --git a/src/ckpool.c b/src/ckpool.c index ad5b0ba7..84639d9e 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -74,6 +74,62 @@ void logmsg(int loglevel, const char *fmt, ...) { } } +/* Generic function for creating a message queue receiving and parsing thread */ +static void *ckmsg_queue(void *arg) +{ + ckmsgq_t *ckmsgq = (ckmsgq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckmsg_t *msg; + + mutex_lock(&ckmsgq->lock); + if (!ckmsgq->msgs) + pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + msg = ckmsgq->msgs; + if (likely(msg)) + DL_DELETE(ckmsgq->msgs, msg); + mutex_unlock(&ckmsgq->lock); + + if (unlikely(!msg)) + continue; + ckmsgq->func(ckp, msg->data); + free(msg); + } + return NULL; +} + +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); + + strncpy(ckmsgq->name, name, 15); + ckmsgq->func = func; + ckmsgq->ckp = ckp; + mutex_init(&ckmsgq->lock); + cond_init(&ckmsgq->cond); + create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); + + return ckmsgq; +} + +/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq + * parsing thread to wake up and process it. */ +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +{ + ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); + + msg->data = data; + + mutex_lock(&ckmsgq->lock); + DL_APPEND(ckmsgq->msgs, msg); + pthread_cond_signal(&ckmsgq->cond); + mutex_unlock(&ckmsgq->lock); +} + /* Listen for incoming global requests. Always returns a response if possible */ static void *listener(void *arg) { diff --git a/src/ckpool.h b/src/ckpool.h index 804095a0..f74481a0 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -126,6 +126,28 @@ struct ckpool_instance { char **proxypass; }; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(ckpool_t *, void *); +}; + +typedef struct ckmsgq ckmsgq_t; +ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); + ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); diff --git a/src/generator.c b/src/generator.c index cee2ecb9..af2fa6aa 100644 --- a/src/generator.c +++ b/src/generator.c @@ -18,7 +18,6 @@ #include "libckpool.h" #include "generator.h" #include "bitcoin.h" -#include "stratifier.h" #include "uthash.h" #include "utlist.h" @@ -54,6 +53,16 @@ struct share_msg { typedef struct share_msg share_msg_t; +struct stratum_msg { + struct stratum_msg *next; + struct stratum_msg *prev; + + json_t *json_msg; + int client_id; +}; + +typedef struct stratum_msg stratum_msg_t; + /* Per proxied pool instance data */ struct proxy_instance { ckpool_t *ckp; diff --git a/src/libckpool.c b/src/libckpool.c index 3fc28643..604c8ec0 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -32,6 +32,7 @@ #include "ckpool.h" #include "libckpool.h" #include "sha2.h" +#include "utlist.h" #ifndef UNIX_PATH_MAX #define UNIX_PATH_MAX 108 @@ -75,7 +76,6 @@ void join_pthread(pthread_t thread) pthread_join(thread, NULL); } - /* Place holders for when we add lock debugging */ #define GETLOCK(_lock, _file, _func, _line) #define GOTLOCK(_lock, _file, _func, _line) diff --git a/src/libckpool.h b/src/libckpool.h index 8672d943..888bacdb 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -32,6 +32,8 @@ # include #endif +#include "utlist.h" + #ifndef bswap_16 #define bswap_16 __builtin_bswap16 #define bswap_32 __builtin_bswap32 diff --git a/src/stratifier.c b/src/stratifier.c index 5327963f..49bbe00e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -157,28 +157,7 @@ static int64_t workbase_id; static int64_t blockchange_id; static char lasthash[68]; -/* For protecting the stratum msg data */ -static pthread_mutex_t stratum_recv_lock; -static pthread_mutex_t stratum_send_lock; -static pthread_mutex_t sshare_lock; -static pthread_mutex_t sauth_lock; -static pthread_mutex_t ckdbq_lock; - -/* For signalling the threads to wake up and do work */ -static pthread_cond_t stratum_recv_cond; -static pthread_cond_t stratum_send_cond; -static pthread_cond_t sshare_cond; -static pthread_cond_t sauth_cond; -static pthread_cond_t ckdbq_cond; - -/* For the linked list of all queued messages */ -static stratum_msg_t *stratum_recvs; -static stratum_msg_t *stratum_sends; - struct json_params { - struct json_params *next; - struct json_params *prev; - json_t *params; json_t *id_val; int client_id; @@ -186,20 +165,26 @@ struct json_params { typedef struct json_params json_params_t; -static json_params_t *sshares; -static json_params_t *sauths; - struct ckdb_msg { - struct ckdb_msg *next; - struct ckdb_msg *prev; - json_t *val; int idtype; }; typedef struct ckdb_msg ckdb_msg_t; -static ckdb_msg_t *ckdb_msgs; +/* Stratum json messages with their associated client id */ +struct smsg { + json_t *json_msg; + int client_id; +}; + +typedef struct smsg smsg_t; + +static ckmsgq_t *ssends; // Stratum sends +static ckmsgq_t *srecvs; // Stratum receives +static ckmsgq_t *ckdbq; // ckdb +static ckmsgq_t *sshareq; // Stratum share sends +static ckmsgq_t *sauthq; // Stratum authorisations static int user_instance_id; @@ -427,11 +412,7 @@ static void ckdbq_add(const int idtype, json_t *val) msg->val = val; msg->idtype = idtype; - - mutex_lock(&ckdbq_lock); - DL_APPEND(ckdb_msgs, msg); - pthread_cond_signal(&ckdbq_cond); - mutex_unlock(&ckdbq_lock); + ckmsgq_add(ckdbq, msg); } static void send_workinfo(ckpool_t *ckp, workbase_t *wb) @@ -796,15 +777,11 @@ out: static void stratum_add_recvd(json_t *val) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; - - mutex_lock(&stratum_recv_lock); - DL_APPEND(stratum_recvs, msg); - pthread_cond_signal(&stratum_recv_cond); - mutex_unlock(&stratum_recv_lock); + ckmsgq_add(srecvs, msg); } /* For creating a list of sends without locking that can then be concatenated @@ -813,7 +790,7 @@ static void stratum_add_recvd(json_t *val) static void stratum_broadcast(json_t *val) { stratum_instance_t *instance, *tmp; - stratum_msg_t *bulk_send = NULL; + ckmsg_t *bulk_send = NULL; if (unlikely(!val)) { LOGERR("Sent null json to stratum_broadcast"); @@ -822,14 +799,17 @@ static void stratum_broadcast(json_t *val) ck_rlock(&instance_lock); HASH_ITER(hh, stratum_instances, instance, tmp) { - stratum_msg_t *msg; + ckmsg_t *client_msg; + smsg_t *msg; if (!instance->authorised) continue; - msg = ckzalloc(sizeof(stratum_msg_t)); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = json_deep_copy(val); msg->client_id = instance->id; - DL_APPEND(bulk_send, msg); + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); } ck_runlock(&instance_lock); @@ -838,27 +818,23 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&stratum_send_lock); - if (stratum_sends) - DL_CONCAT(stratum_sends, bulk_send); + mutex_lock(&ssends->lock); + if (ssends->msgs) + DL_CONCAT(ssends->msgs, bulk_send); else - stratum_sends = bulk_send; - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ssends->msgs = bulk_send; + pthread_cond_signal(&ssends->cond); + mutex_unlock(&ssends->lock); } static void stratum_add_send(json_t *val, int client_id) { - stratum_msg_t *msg; + smsg_t *msg; - msg = ckzalloc(sizeof(stratum_msg_t)); + msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; msg->client_id = client_id; - - mutex_lock(&stratum_send_lock); - DL_APPEND(stratum_sends, msg); - pthread_cond_signal(&stratum_send_cond); - mutex_unlock(&stratum_send_lock); + ckmsgq_add(ssends, msg); } static void drop_client(int id) @@ -1835,11 +1811,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.auth", 11)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sauth_lock); - DL_APPEND(sauths, jp); - pthread_cond_signal(&sauth_cond); - mutex_unlock(&sauth_lock); - + ckmsgq_add(sauthq, jp); return; } @@ -1859,18 +1831,14 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.submit", 13)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sshare_lock); - DL_APPEND(sshares, jp); - pthread_cond_signal(&sshare_cond); - mutex_unlock(&sshare_lock); - + ckmsgq_add(sshareq, jp); return; } /* Unhandled message here */ } -static void parse_instance_msg(stratum_msg_t *msg) +static void parse_instance_msg(smsg_t *msg) { json_t *val = msg->json_msg, *id_val, *method, *params; int client_id = msg->client_id; @@ -1898,98 +1866,52 @@ out: free(msg); } -static void *stratum_receiver(void *arg) +static void srecv_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg; - - pthread_detach(pthread_self()); - rename_proc("sreceiver"); - - while (42) { - stratum_instance_t *instance; - - /* Pop the head off the list if it exists or wait for a conditional - * signal telling us there is work */ - mutex_lock(&stratum_recv_lock); - if (!stratum_recvs) - pthread_cond_wait(&stratum_recv_cond, &stratum_recv_lock); - msg = stratum_recvs; - if (likely(msg)) - DL_DELETE(stratum_recvs, msg); - mutex_unlock(&stratum_recv_lock); - - if (unlikely(!msg)) - continue; + stratum_instance_t *instance; - msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); - json_object_del(msg->json_msg, "client_id"); - - /* Parse the message here */ - ck_ilock(&instance_lock); - instance = __instance_by_id(msg->client_id); - if (!instance) { - /* client_id instance doesn't exist yet, create one */ - ck_ulock(&instance_lock); - instance = __stratum_add_instance(ckp, msg->client_id); - ck_dwilock(&instance_lock); - } - ck_uilock(&instance_lock); + msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); + json_object_del(msg->json_msg, "client_id"); - parse_instance_msg(msg); + /* Parse the message here */ + ck_ilock(&instance_lock); + instance = __instance_by_id(msg->client_id); + if (!instance) { + /* client_id instance doesn't exist yet, create one */ + ck_ulock(&instance_lock); + instance = __stratum_add_instance(ckp, msg->client_id); + ck_dwilock(&instance_lock); } + ck_uilock(&instance_lock); + + parse_instance_msg(msg); - return NULL; } -static void discard_stratum_msg(stratum_msg_t **msg) +static void discard_stratum_msg(smsg_t **msg) { json_decref((*msg)->json_msg); free(*msg); *msg = NULL; } -static void *stratum_sender(void *arg) +static void ssend_process(ckpool_t *ckp, smsg_t *msg) { - ckpool_t *ckp = (ckpool_t *)arg; - stratum_msg_t *msg = NULL; - - pthread_detach(pthread_self()); - rename_proc("ssender"); - - while (42) { - char *s; - - if (msg) - discard_stratum_msg(&msg); - - mutex_lock(&stratum_send_lock); - if (!stratum_sends) - pthread_cond_wait(&stratum_send_cond, &stratum_send_lock); - msg = stratum_sends; - if (likely(msg)) - DL_DELETE(stratum_sends, msg); - mutex_unlock(&stratum_send_lock); - - if (unlikely(!msg)) - continue; - - if (unlikely(!msg->json_msg)) { - LOGERR("Sent null json msg to stratum_sender"); - continue; - } - - /* Add client_id to the json message and send it to the - * connector process to be delivered */ - json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); - s = json_dumps(msg->json_msg, 0); - send_proc(ckp->connector, s); - free(s); + char *s; - discard_stratum_msg(&msg); + if (unlikely(!msg->json_msg)) { + LOGERR("Sent null json msg to stratum_sender"); + free(msg); + return; } - return NULL; + /* Add client_id to the json message and send it to the + * connector process to be delivered */ + json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); + s = json_dumps(msg->json_msg, 0); + send_proc(ckp->connector, s); + free(s); + discard_stratum_msg(&msg); } static void discard_json_params(json_params_t **jp) @@ -2000,144 +1922,82 @@ static void discard_json_params(json_params_t **jp) *jp = NULL; } -static void *share_processor(void *arg) +static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) { - ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("sprocessor"); - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sshare_lock); - if (!sshares) - pthread_cond_wait(&sshare_cond, &sshare_lock); - jp = sshares; - if (likely(jp)) - DL_DELETE(sshares, jp); - mutex_unlock(&sshare_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); - continue; - } - json_msg = json_object(); - result_val = parse_submit(client, json_msg, jp->params, &err_val); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); + goto out; } - - return NULL; + json_msg = json_object(); + result_val = parse_submit(client, json_msg, jp->params, &err_val); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); } -static void *authoriser(void *arg) +static void sauth_process(ckpool_t *ckp, json_params_t *jp) { - ckpool_t *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("authoriser"); - - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - char buf[256]; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sauth_lock); - if (!sauths) - pthread_cond_wait(&sauth_cond, &sauth_lock); - jp = sauths; - if (likely(jp)) - DL_DELETE(sauths, jp); - mutex_unlock(&sauth_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); - continue; - } - result_val = parse_authorise(client, jp->params, &err_val); - if (json_is_true(result_val)) { - snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, - client->user_instance->username); - stratum_send_message(client, buf); - } else - stratum_send_message(client, "Failed authorisation :("); - json_msg = json_object(); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); + goto out; } + result_val = parse_authorise(client, jp->params, &err_val); + if (json_is_true(result_val)) { + char *buf; + + ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, + client->user_instance->username); + stratum_send_message(client, buf); + } else + stratum_send_message(client, "Failed authorisation :("); + json_msg = json_object(); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); - return NULL; } -static void *ckdbqueue(void *arg) +static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) { - ckpool_t *ckp = (ckpool_t *)arg; - - pthread_detach(pthread_self()); - rename_proc("ckdbqueue"); - - while (42) { - bool logged = false; - char *buf = NULL; - ckdb_msg_t *msg; - - mutex_lock(&ckdbq_lock); - if (!ckdb_msgs) - pthread_cond_wait(&ckdbq_cond, &ckdbq_lock); - msg = ckdb_msgs; - if (likely(msg)) - DL_DELETE(ckdb_msgs, msg); - mutex_unlock(&ckdbq_lock); - - if (unlikely(!msg)) - continue; + bool logged = false; + char *buf = NULL; - while (!buf) { - buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val, logged); - if (unlikely(!buf)) { - LOGWARNING("Failed to talk to ckdb, queueing messages"); - sleep(5); - } - logged = true; + while (!buf) { + buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); + if (unlikely(!buf)) { + LOGWARNING("Failed to talk to ckdb, queueing messages"); + sleep(5); } - LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); - free(buf); + logged = true; } - - return NULL; + LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); + free(buf); } static const double nonces = 4294967296; @@ -2386,9 +2246,7 @@ static void *statsupdate(void *arg) int stratifier(proc_instance_t *pi) { - pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; - pthread_t pth_ckdbqueue; + pthread_t pth_blockupdate, pth_statsupdate; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2414,25 +2272,11 @@ int stratifier(proc_instance_t *pi) cklock_init(&instance_lock); - mutex_init(&stratum_send_lock); - cond_init(&stratum_send_cond); - create_pthread(&pth_stratum_sender, stratum_sender, ckp); - - mutex_init(&stratum_recv_lock); - cond_init(&stratum_recv_cond); - create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - - mutex_init(&sshare_lock); - cond_init(&sshare_cond); - create_pthread(&pth_share_processer, share_processor, ckp); - - mutex_init(&sauth_lock); - cond_init(&sauth_cond); - create_pthread(&pth_authoriser, authoriser, ckp); - - mutex_init(&ckdbq_lock); - cond_init(&ckdbq_cond); - create_pthread(&pth_ckdbqueue, ckdbqueue, ckp); + ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); + sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); + sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock); if (!ckp->proxy) diff --git a/src/stratifier.h b/src/stratifier.h index e6855878..0f41b91c 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,16 +10,6 @@ #ifndef STRATIFIER_H #define STRATIFIER_H -struct stratum_msg { - struct stratum_msg *next; - struct stratum_msg *prev; - - json_t *json_msg; - int client_id; -}; - -typedef struct stratum_msg stratum_msg_t; - int stratifier(proc_instance_t *pi); #endif /* STRATIFIER_H */