From 93ecc3a5c82c1496c18720212bc2f6370cff7d06 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Jun 2014 21:29:52 +1000 Subject: [PATCH] Convert the authoriser and share processor to using the generic ckmsg queues and fix the data passed to the queue function --- src/ckpool.c | 2 +- src/stratifier.c | 169 +++++++++++++++-------------------------------- 2 files changed, 54 insertions(+), 117 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 98203208..84639d9e 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -96,7 +96,7 @@ static void *ckmsg_queue(void *arg) if (unlikely(!msg)) continue; - ckmsgq->func(ckp, msg); + ckmsgq->func(ckp, msg->data); free(msg); } return NULL; diff --git a/src/stratifier.c b/src/stratifier.c index 749bd880..c992c8f3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -160,23 +160,16 @@ static char lasthash[68]; /* For protecting the stratum msg data */ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; -static pthread_mutex_t sshare_lock; -static pthread_mutex_t sauth_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; -static pthread_cond_t sshare_cond; -static pthread_cond_t sauth_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_sends; struct json_params { - struct json_params *next; - struct json_params *prev; - json_t *params; json_t *id_val; int client_id; @@ -184,9 +177,6 @@ struct json_params { typedef struct json_params json_params_t; -static json_params_t *sshares; -static json_params_t *sauths; - struct ckdb_msg { json_t *val; int idtype; @@ -195,6 +185,8 @@ struct ckdb_msg { typedef struct ckdb_msg ckdb_msg_t; static ckmsgq_t *ckdbq; +static ckmsgq_t *sshareq; +static ckmsgq_t *sauthq; static int user_instance_id; @@ -1826,11 +1818,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.auth", 11)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sauth_lock); - DL_APPEND(sauths, jp); - pthread_cond_signal(&sauth_cond); - mutex_unlock(&sauth_lock); - + ckmsgq_add(sauthq, jp); return; } @@ -1850,11 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (!strncasecmp(method, "mining.submit", 13)) { json_params_t *jp = create_json_params(client_id, params_val, id_val); - mutex_lock(&sshare_lock); - DL_APPEND(sshares, jp); - pthread_cond_signal(&sshare_cond); - mutex_unlock(&sshare_lock); - + ckmsgq_add(sshareq, jp); return; } @@ -1991,106 +1975,65 @@ static void discard_json_params(json_params_t **jp) *jp = NULL; } -static void *share_processor(void *arg) +static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) { - ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("sprocessor"); - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sshare_lock); - if (!sshares) - pthread_cond_wait(&sshare_cond, &sshare_lock); - jp = sshares; - if (likely(jp)) - DL_DELETE(sshares, jp); - mutex_unlock(&sshare_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); - continue; - } - json_msg = json_object(); - result_val = parse_submit(client, json_msg, jp->params, &err_val); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); + goto out; } - - return NULL; + json_msg = json_object(); + result_val = parse_submit(client, json_msg, jp->params, &err_val); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); } -static void *authoriser(void *arg) +static void sauth_process(ckpool_t *ckp, json_params_t *jp) { - ckpool_t *ckp = (ckpool_t *)arg; - json_params_t *jp = NULL; - - pthread_detach(pthread_self()); - rename_proc("authoriser"); - - while (42) { - json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; - int client_id; - char buf[256]; - - if (jp) - discard_json_params(&jp); - - mutex_lock(&sauth_lock); - if (!sauths) - pthread_cond_wait(&sauth_cond, &sauth_lock); - jp = sauths; - if (likely(jp)) - DL_DELETE(sauths, jp); - mutex_unlock(&sauth_lock); - - if (unlikely(!jp)) - continue; + json_t *result_val, *json_msg, *err_val = NULL; + stratum_instance_t *client; + int client_id; - client_id = jp->client_id; + client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); - if (unlikely(!client)) { - LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); - continue; - } - result_val = parse_authorise(client, jp->params, &err_val); - if (json_is_true(result_val)) { - snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, - client->user_instance->username); - stratum_send_message(client, buf); - } else - stratum_send_message(client, "Failed authorisation :("); - json_msg = json_object(); - json_object_set_new_nocheck(json_msg, "result", result_val); - json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + if (unlikely(!client)) { + LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); + goto out; } + result_val = parse_authorise(client, jp->params, &err_val); + if (json_is_true(result_val)) { + char *buf; + + ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, + client->user_instance->username); + stratum_send_message(client, buf); + } else + stratum_send_message(client, "Failed authorisation :("); + json_msg = json_object(); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); +out: + discard_json_params(&jp); - return NULL; } static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) @@ -2357,7 +2300,7 @@ static void *statsupdate(void *arg) int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; + pthread_t pth_statsupdate; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2391,14 +2334,8 @@ int stratifier(proc_instance_t *pi) cond_init(&stratum_recv_cond); create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - mutex_init(&sshare_lock); - cond_init(&sshare_cond); - create_pthread(&pth_share_processer, share_processor, ckp); - - mutex_init(&sauth_lock); - cond_init(&sauth_cond); - create_pthread(&pth_authoriser, authoriser, ckp); - + sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); + sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); cklock_init(&workbase_lock);