From b09bb253fc582e75147a81c9dde8b08ad502632f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 29 Dec 2015 15:27:44 +1100 Subject: [PATCH] Act as a passthrough in node mode, passing what type of stratum message we're passing through --- src/ckpool.c | 2 +- src/ckpool.h | 32 ++++++++++++++++++++++++++++ src/connector.c | 16 ++++++++------ src/stratifier.c | 55 +++++++++++++++++++++++++++--------------------- 4 files changed, 74 insertions(+), 31 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 384b8a00..15a9a5bf 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1641,7 +1641,7 @@ int main(int argc, char **argv) case 'N': if (ckp.proxy || ckp.redirector || ckp.userproxy || ckp.passthrough) quit(1, "Cannot set another proxy type or redirector and node mode"); - ckp.standalone = ckp.proxy = ckp.node = true; + ckp.standalone = ckp.proxy = ckp.passthrough = ckp.node = true; break; case 'n': ckp.name = optarg; diff --git a/src/ckpool.h b/src/ckpool.h index cde4f0aa..0a72eb36 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -243,6 +243,38 @@ struct ckpool_instance { void *data; }; +enum stratum_msgtype { + SM_RECONNECT = 0, + SM_DIFF, + SM_MSG, + SM_UPDATE, + SM_ERROR, + SM_SUBSCRIBE, + SM_SUBSCRIBERESULT, + SM_SHARE, + SM_SHARERESULT, + SM_AUTH, + SM_AUTHRESULT, + SM_TXNS, + SM_TXNSRESULT +}; + +static const char __maybe_unused *stratum_msgs[] = { + "reconnect", + "diff", + "message", + "update", + "error", + "subscribe", + "subscribe.result", + "share", + "share.result", + "auth", + "auth.result", + "txns", + "txns.result" +}; + #ifdef USE_CKDB #define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #else diff --git a/src/connector.c b/src/connector.c index d932855d..189bb6da 100644 --- a/src/connector.c +++ b/src/connector.c @@ -504,10 +504,10 @@ reparse: * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ if (likely(!client->invalid)) { + if (ckp->node || !ckp->passthrough) + send_proc(ckp->stratifier, s); if (ckp->passthrough) send_proc(ckp->generator, s); - else - send_proc(ckp->stratifier, s); } free(s); @@ -936,11 +936,11 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) send_client(cdata, client->id, buf); } -static void process_client_msg(cdata_t *cdata, const char *buf) +static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf) { + char *msg, *node_msg; int64_t client_id; json_t *json_msg; - char *msg; json_msg = json_loads(buf, 0, NULL); if (unlikely(!json_msg)) { @@ -953,8 +953,12 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_object_del(json_msg, "client_id"); /* Put client_id back in for a passthrough subclient, passing its * upstream client_id instead of the passthrough's. */ - if (client_id > 0xffffffffll) + if (ckp->node || client_id > 0xffffffffll) { json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); + if (json_get_string(&node_msg, json_msg, "node.method")) + LOGDEBUG("Got node method %s", node_msg); + } + msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); json_decref(json_msg); @@ -1055,7 +1059,7 @@ retry: /* The bulk of the messages will be json messages to send to clients * so look for them first. */ if (likely(buf[0] == '{')) { - process_client_msg(cdata, buf); + process_client_msg(ckp, cdata, buf); } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; diff --git a/src/stratifier.c b/src/stratifier.c index 172f5d57..2180a743 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2192,11 +2192,21 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val) mutex_unlock(ssends->lock); } -static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id) +/* passthrough subclients have client_ids in the high bits */ +static inline bool passthrough_subclient(const int64_t client_id) { - smsg_t *msg; + return (client_id > 0xffffffffll); +} - msg = ckzalloc(sizeof(smsg_t)); +static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id, + const int msg_type) +{ + smsg_t *msg = ckzalloc(sizeof(smsg_t)); + ckpool_t *ckp = sdata->ckp; + + if (ckp->node || passthrough_subclient(client_id)) + json_set_string(val, "node.method", stratum_msgs[msg_type]); + LOGDEBUG("Sending stratum message %s", stratum_msgs[msg_type]); msg->json_msg = val; msg->client_id = client_id; ckmsgq_add(sdata->ssends, msg); @@ -2595,7 +2605,7 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) client->reconnect_request = time(NULL); JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); - stratum_add_send(sdata, json_msg, client->id); + stratum_add_send(sdata, json_msg, client->id, SM_RECONNECT); } static void dead_proxy(sdata_t *sdata, const char *buf) @@ -3520,12 +3530,6 @@ out_unlock: return ret; } -/* passthrough subclients have client_ids in the high bits */ -static inline bool passthrough_subclient(const int64_t client_id) -{ - return (client_id > 0xffffffffll); -} - /* Extranonce1 must be set here. Needs to be entered with client holding a ref * count. */ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_id, const json_t *params_val) @@ -4251,7 +4255,7 @@ static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client) JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(), "method", "mining.set_difficulty"); - stratum_add_send(sdata, json_msg, client->id); + stratum_add_send(sdata, json_msg, client->id, SM_DIFF); } /* Needs to be entered with client holding a ref count. */ @@ -4261,7 +4265,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", "params", msg); - stratum_add_send(sdata, json_msg, client->id); + stratum_add_send(sdata, json_msg, client->id, SM_MSG); } static double time_bias(const double tdiff, const double period) @@ -4897,7 +4901,7 @@ static void stratum_send_update(sdata_t *sdata, const int64_t client_id, const b json_msg = __stratum_notify(sdata->current_workbase, clean); ck_runlock(&sdata->workbase_lock); - stratum_add_send(sdata, json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id, SM_UPDATE); } static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_val, const char *err_msg) @@ -4905,7 +4909,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va json_t *val; JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); - stratum_add_send(sdata, val, client_id); + stratum_add_send(sdata, val, client_id, SM_ERROR); } /* Needs to be entered with client holding a ref count. */ @@ -5028,6 +5032,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie return; } + if (cmdmatch(method, "mining.term")) { + LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + drop_client(ckp, sdata, client_id); + return; + } + + if (ckp->node) + return; + if (cmdmatch(method, "mining.subscribe")) { json_t *val, *result_val; @@ -5046,7 +5059,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie json_object_set_new_nocheck(val, "result", result_val); json_object_set_nocheck(val, "id", id_val); json_object_set_new_nocheck(val, "error", json_null()); - stratum_add_send(sdata, val, client_id); + stratum_add_send(sdata, val, client_id, SM_SUBSCRIBERESULT); if (likely(client->subscribed)) init_client(sdata, client, client_id); return; @@ -5118,12 +5131,6 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie return; } - if (cmdmatch(method, "mining.term")) { - LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); - drop_client(ckp, sdata, client_id); - return; - } - /* Unhandled message here */ LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); return; @@ -5319,7 +5326,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); steal_json_id(json_msg, jp); - stratum_add_send(sdata, json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id, SM_SHARERESULT); out_decref: dec_instance_ref(sdata, client); out: @@ -5381,7 +5388,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); steal_json_id(json_msg, jp); - stratum_add_send(sdata, json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT); if (!json_is_true(result_val) || !client->suggest_diff) goto out; @@ -5596,7 +5603,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) } else json_set_string(val, "error", "Invalid job_id"); out_send: - stratum_add_send(sdata, val, jp->client_id); + stratum_add_send(sdata, val, jp->client_id, SM_TXNSRESULT); out: if (client) dec_instance_ref(sdata, client);