Browse Source

Detect node message type in the generator

master
Con Kolivas 9 years ago
parent
commit
c5718d510e
  1. 6
      src/ckpool.h
  2. 11
      src/connector.c
  3. 25
      src/generator.c
  4. 6
      src/stratifier.c

6
src/ckpool.h

@ -256,7 +256,8 @@ enum stratum_msgtype {
SM_AUTH, SM_AUTH,
SM_AUTHRESULT, SM_AUTHRESULT,
SM_TXNS, SM_TXNS,
SM_TXNSRESULT SM_TXNSRESULT,
SM_NONE
}; };
static const char __maybe_unused *stratum_msgs[] = { static const char __maybe_unused *stratum_msgs[] = {
@ -272,7 +273,8 @@ static const char __maybe_unused *stratum_msgs[] = {
"auth", "auth",
"auth.result", "auth.result",
"txns", "txns",
"txns.result" "txns.result",
""
}; };
#ifdef USE_CKDB #ifdef USE_CKDB

11
src/connector.c

@ -504,10 +504,10 @@ reparse:
* do this unlocked as the occasional false negative can be * do this unlocked as the occasional false negative can be
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(!client->invalid)) { if (likely(!client->invalid)) {
if (ckp->node || !ckp->passthrough)
send_proc(ckp->stratifier, s);
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
else
send_proc(ckp->stratifier, s);
} }
free(s); free(s);
@ -938,9 +938,9 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf) static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf)
{ {
char *msg, *node_msg;
int64_t client_id; int64_t client_id;
json_t *json_msg; json_t *json_msg;
char *msg;
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
@ -953,11 +953,8 @@ static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf)
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
/* Put client_id back in for a passthrough subclient, passing its /* Put client_id back in for a passthrough subclient, passing its
* upstream client_id instead of the passthrough's. */ * upstream client_id instead of the passthrough's. */
if (ckp->node || client_id > 0xffffffffll) { if (client_id > 0xffffffffll)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
if (json_get_string(&node_msg, json_msg, "node.method"))
LOGDEBUG("Got node method %s", node_msg);
}
msg = json_dumps(json_msg, JSON_EOL); msg = json_dumps(json_msg, JSON_EOL);
send_client(cdata, client_id, msg); send_client(cdata, client_id, msg);

25
src/generator.c

@ -1980,6 +1980,27 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi); create_pthread(&pth, proxy_reconnect, proxi);
} }
static int node_msg_type(json_t *val)
{
int i, ret = -1;
char *node_msg;
if (!val)
goto out;
if (!json_get_string(&node_msg, val, "node.method"))
goto out;
for (i = 0; i < SM_NONE; i++) {
if (!strcmp(node_msg, stratum_msgs[i])) {
ret = i;
LOGWARNING("Got node method %d:%s", i, node_msg);
break;
}
}
json_object_del(val, "node.method");
out:
return ret;
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -2022,6 +2043,10 @@ static void *passthrough_recv(void *arg)
Close(cs->fd); Close(cs->fd);
continue; continue;
} }
if (ckp->node) {
json_t *val = json_loads(cs->buf, 0, NULL);
int msg_type = node_msg_type(val);
}
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */

6
src/stratifier.c

@ -6240,10 +6240,14 @@ int stratifier(proc_instance_t *pi)
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */ /* Create 1/4 as many stratum processing threads as there are CPUs */
if (ckp->node)
threads = 1;
else {
threads = threads / 2 ? : 1; threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
}
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save