From 7055f3b5d597d5fd15b283a858683fbfbda6f0c6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 13 Jun 2014 15:18:30 +1000 Subject: [PATCH] Create separate threads for processing share and authorisation submissions since they will be waiting for dtabase responses --- src/stratifier.c | 294 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 281 insertions(+), 13 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 031be764..717d280e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -164,15 +164,33 @@ static char lasthash[68]; /* For protecting the stratum msg data */ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; +static pthread_mutex_t sshare_lock; +static pthread_mutex_t sauth_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; +static pthread_cond_t sshare_cond; +static pthread_cond_t sauth_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_sends; +struct json_params { + struct json_params *next; + struct json_params *prev; + + json_t *params; + json_t *id_val; + int client_id; +}; + +typedef struct json_params json_params_t; + +static json_params_t *sshares; +static json_params_t *sauths; + static int user_instance_id; struct user_instance { @@ -1677,6 +1695,7 @@ out: return json_boolean(result); } +#if 0 /* We should have already determined all the values passed to this are valid * by now. Set update if we should also send the latest stratum parameters */ static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_val, @@ -1731,6 +1750,7 @@ static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_v out: return ret; } +#endif /* Must enter with workbase_lock held */ static json_t *__stratum_notify(bool clean) @@ -1776,6 +1796,7 @@ static void stratum_send_update(int client_id, bool clean) stratum_add_send(json_msg, client_id); } +#if 0 static void parse_instance_msg(int client_id, json_t *msg) { json_t *result_val = NULL, *err_val = NULL, *id_val = NULL; @@ -1838,6 +1859,133 @@ out: stratum_send_diff(client); } } +#endif + +static void send_json_err(int client_id, json_t *id_val, const char *err_msg) +{ + json_t *val; + + val = json_pack("{soss}", "id", json_copy(id_val), "error", err_msg); + stratum_add_send(val, client_id); +} + +static void update_client(const int client_id) +{ + stratum_instance_t *client; + + stratum_send_update(client_id, true); + + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); + + if (likely(client)) + stratum_send_diff(client); +} + +static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val) +{ + json_params_t *jp = ckalloc(sizeof(json_params_t)); + + jp->params = json_deep_copy(params); + jp->id_val = json_deep_copy(id_val); + jp->client_id = client_id; + return jp; +} + +static void parse_method(const int client_id, json_t *id_val, json_t *method_val, + json_t *params_val) +{ + stratum_instance_t *client; + const char *method; + json_t *val; + + method = json_string_value(method_val); + if (!strncasecmp(method, "mining.subscribe", 16)) { + val = parse_subscribe(client_id, params_val); + if (!val) + return; + json_object_set(val, "id", id_val); + json_object_set(val, "error", json_null()); + stratum_add_send(val, client_id); + update_client(client_id); + return; + } + + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); + + if (unlikely(!client)) { + LOGINFO("Failed to find client id %d in hashtable!", client_id); + return; + } + + if (!strncasecmp(method, "mining.auth", 11)) { + json_params_t *jp = create_json_params(client_id, params_val, id_val); + + mutex_lock(&sauth_lock); + DL_APPEND(sauths, jp); + pthread_cond_signal(&sauth_cond); + mutex_unlock(&sauth_lock); + + return; + } + + /* We should only accept authorised requests from here on */ + if (!client->authorised) { + char buf[256]; + + /* Dropping unauthorised clients here also allows the + * stratifier process to restart since it will have lost all + * the stratum instance data. Clients will just reconnect. */ + LOGINFO("Dropping unauthorised client %d", client->id); + snprintf(buf, 255, "dropclient=%d", client->id); + send_proc(client->ckp->connector, buf); + return; + } + + if (!strncasecmp(method, "mining.submit", 13)) { + json_params_t *jp = create_json_params(client_id, params_val, id_val); + + mutex_lock(&sshare_lock); + DL_APPEND(sshares, jp); + pthread_cond_signal(&sshare_cond); + mutex_unlock(&sshare_lock); + + return; + } + + /* Unhandled message here */ +} + +static void parse_instance_msg(stratum_msg_t *msg) +{ + json_t *val = msg->json_msg, *id_val, *method, *params; + int client_id = msg->client_id; + + /* Return back the same id_val even if it's null or not existent. */ + id_val = json_object_get(val, "id"); + + method = json_object_get(val, "method"); + if (unlikely(!method)) { + send_json_err(client_id, id_val, "-3:method not found"); + goto out; + } + if (unlikely(!json_is_string(method))) { + send_json_err(client_id, id_val, "-1:method is not string"); + goto out; + } + params = json_object_get(val, "params"); + if (unlikely(!params)) { + send_json_err(client_id, id_val, "-1:params not found"); + goto out; + } + parse_method(client_id, id_val, method, params); +out: + json_decref(val); + free(msg); +} static void *stratum_receiver(void *arg) { @@ -1846,6 +1994,7 @@ static void *stratum_receiver(void *arg) rename_proc("sreceiver"); + sleep(1); while (42) { stratum_instance_t *instance; @@ -1876,25 +2025,32 @@ static void *stratum_receiver(void *arg) } ck_uilock(&instance_lock); - parse_instance_msg(msg->client_id, msg->json_msg); - - json_decref(msg->json_msg); - free(msg); + parse_instance_msg(msg); } return NULL; } +static void discard_stratum_msg(stratum_msg_t **msg) +{ + json_decref((*msg)->json_msg); + free(*msg); + *msg = NULL; +} + static void *stratum_sender(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + stratum_msg_t *msg = NULL; rename_proc("ssender"); while (42) { - stratum_msg_t *msg; char *s; + if (msg) + discard_stratum_msg(&msg); + mutex_lock(&stratum_send_lock); if (!stratum_sends) pthread_cond_wait(&stratum_send_cond, &stratum_send_lock); @@ -1908,7 +2064,6 @@ static void *stratum_sender(void *arg) if (unlikely(!msg->json_msg)) { LOGERR("Sent null json msg to stratum_sender"); - free(msg); continue; } @@ -1919,8 +2074,111 @@ static void *stratum_sender(void *arg) send_proc(ckp->connector, s); free(s); - json_decref(msg->json_msg); - free(msg); + discard_stratum_msg(&msg); + } + + return NULL; +} + +static void discard_json_params(json_params_t **jp) +{ + json_decref((*jp)->params); + json_decref((*jp)->id_val); + free(*jp); + *jp = NULL; +} + +static void *share_processor(void *arg) +{ + ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; + json_params_t *jp = NULL; + + rename_proc("sprocessor"); + while (42) { + json_t *result_val, *json_msg, *err_val; + stratum_instance_t *client; + int client_id; + + if (jp) + discard_json_params(&jp); + + mutex_lock(&sauth_lock); + if (!sshares) + pthread_cond_wait(&sshare_cond, &sshare_lock); + jp = sshares; + mutex_unlock(&sshare_lock); + + if (unlikely(!jp)) + continue; + + client_id = jp->client_id; + + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); + + if (unlikely(!client)) { + LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); + continue; + } + json_msg = json_object(); + result_val = parse_submit(client, json_msg, jp->params, &err_val); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); + } + + return NULL; +} + +static void *authoriser(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + json_params_t *jp = NULL; + + rename_proc("authoriser"); + + while (42) { + json_t *result_val, *json_msg, *err_val; + stratum_instance_t *client; + int client_id; + char buf[256]; + + if (jp) + discard_json_params(&jp); + + mutex_lock(&sauth_lock); + if (!sauths) + pthread_cond_wait(&sauth_cond, &sauth_lock); + jp = sauths; + mutex_unlock(&sauth_lock); + + if (unlikely(!jp)) + continue; + + client_id = jp->client_id; + + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); + + if (unlikely(!client)) { + LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); + continue; + } + result_val = parse_authorise(client, jp->params, &err_val); + if (json_is_true(result_val)) { + snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, + client->user_instance->username); + stratum_send_message(client, buf); + } else + stratum_send_message(client, "Failed authorisation :("); + json_msg = json_object(); + json_object_set_new_nocheck(json_msg, "result", result_val); + json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); + json_object_set_nocheck(json_msg, "id", jp->id_val); + stratum_add_send(json_msg, client_id); } return NULL; @@ -2148,7 +2406,7 @@ static void load_users(ckpool_t *ckp) int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; - pthread_t pth_statsupdate; + pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2174,14 +2432,18 @@ int stratifier(proc_instance_t *pi) cklock_init(&instance_lock); - mutex_init(&stratum_recv_lock); - cond_init(&stratum_recv_cond); - create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - mutex_init(&stratum_send_lock); cond_init(&stratum_send_cond); create_pthread(&pth_stratum_sender, stratum_sender, ckp); + mutex_init(&sshare_lock); + cond_init(&sshare_cond); + create_pthread(&pth_share_processer, share_processor, ckp); + + mutex_init(&sauth_lock); + cond_init(&sauth_cond); + create_pthread(&pth_authoriser, authoriser, ckp); + cklock_init(&workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp); @@ -2193,6 +2455,12 @@ int stratifier(proc_instance_t *pi) load_users(ckp); + /* Start the receiver last to only process requests once all structures + * are set up. */ + mutex_init(&stratum_recv_lock); + cond_init(&stratum_recv_cond); + create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); + ret = stratum_loop(ckp, pi); out: return process_exit(ckp, pi, ret);