diff --git a/src/ckpool.h b/src/ckpool.h index 0a72eb36..680b65ea 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -256,7 +256,8 @@ enum stratum_msgtype { SM_AUTH, SM_AUTHRESULT, SM_TXNS, - SM_TXNSRESULT + SM_TXNSRESULT, + SM_NONE }; static const char __maybe_unused *stratum_msgs[] = { @@ -272,7 +273,8 @@ static const char __maybe_unused *stratum_msgs[] = { "auth", "auth.result", "txns", - "txns.result" + "txns.result", + "" }; #ifdef USE_CKDB diff --git a/src/connector.c b/src/connector.c index 189bb6da..ea3d84d4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -504,10 +504,10 @@ reparse: * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ if (likely(!client->invalid)) { - if (ckp->node || !ckp->passthrough) - send_proc(ckp->stratifier, s); if (ckp->passthrough) send_proc(ckp->generator, s); + else + send_proc(ckp->stratifier, s); } free(s); @@ -938,9 +938,9 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf) { - char *msg, *node_msg; int64_t client_id; json_t *json_msg; + char *msg; json_msg = json_loads(buf, 0, NULL); if (unlikely(!json_msg)) { @@ -953,11 +953,8 @@ static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf) json_object_del(json_msg, "client_id"); /* Put client_id back in for a passthrough subclient, passing its * upstream client_id instead of the passthrough's. */ - if (ckp->node || client_id > 0xffffffffll) { + if (client_id > 0xffffffffll) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); - if (json_get_string(&node_msg, json_msg, "node.method")) - LOGDEBUG("Got node method %s", node_msg); - } msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); diff --git a/src/generator.c b/src/generator.c index 36cd2647..e46a4746 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1980,6 +1980,27 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } +static int node_msg_type(json_t *val) +{ + int i, ret = -1; + char *node_msg; + + if (!val) + goto out; + if (!json_get_string(&node_msg, val, "node.method")) + goto out; + for (i = 0; i < SM_NONE; i++) { + if (!strcmp(node_msg, stratum_msgs[i])) { + ret = i; + LOGWARNING("Got node method %d:%s", i, node_msg); + break; + } + } + json_object_del(val, "node.method"); +out: + return ret; +} + /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -2022,6 +2043,10 @@ static void *passthrough_recv(void *arg) Close(cs->fd); continue; } + if (ckp->node) { + json_t *val = json_loads(cs->buf, 0, NULL); + int msg_type = node_msg_type(val); + } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ diff --git a/src/stratifier.c b/src/stratifier.c index 2180a743..8a2abab4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -6240,10 +6240,14 @@ int stratifier(proc_instance_t *pi) threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); /* Create 1/4 as many stratum processing threads as there are CPUs */ - threads = threads / 2 ? : 1; + if (ckp->node) + threads = 1; + else { + threads = threads / 2 ? : 1; + sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); + } sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);