diff --git a/src/connector.c b/src/connector.c index 5f1b1f06..d4505347 100644 --- a/src/connector.c +++ b/src/connector.c @@ -20,6 +20,7 @@ #include "libckpool.h" #include "uthash.h" #include "utlist.h" +#include "stratifier.h" #define MAX_MSGSIZE 1024 @@ -529,14 +530,17 @@ reparse: * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ if (likely(!client->invalid)) { - if (!ckp->passthrough || ckp->node) - send_proc(ckp->stratifier, s); + if (!ckp->passthrough || ckp->node) { + stratifier_add_recv(ckp, val); + val = NULL; + } if (ckp->passthrough) send_proc(ckp->generator, s); } free(s); - json_decref(val); + if (val) + json_decref(val); } client->bufofs -= buflen; if (client->bufofs) @@ -965,17 +969,13 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) } if (ckp->node) { json_t *val = json_loads(buf, 0, NULL); - char *msg; if (!val) // Can happen if client sent invalid json message goto out; json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "server", json_integer(client->server)); - msg = json_dumps(val, JSON_COMPACT); - json_decref(val); - send_proc(ckp->stratifier, msg); - free(msg); + stratifier_add_recv(ckp, val); } if (ckp->redirector && !client->redirected) test_redirector_shares(ckp, client, buf); diff --git a/src/stratifier.c b/src/stratifier.c index 10daa208..ab12aff2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3914,7 +3914,7 @@ static void get_poolstats(sdata_t *sdata, int *sockd) _Close(sockd); } -static void srecv_process(ckpool_t *ckp, char *buf); +static void srecv_process(ckpool_t *ckp, json_t *val); /* For emergency use only, flushes all pending ckdbq messages */ static void ckdbq_flush(sdata_t *sdata) @@ -6498,21 +6498,16 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat parse_method(ckp, sdata, client, client_id, id_val, method, params); } -static void srecv_process(ckpool_t *ckp, char *buf) +static void srecv_process(ckpool_t *ckp, json_t *val) { + char address[INET6_ADDRSTRLEN], *buf; bool noid = false, dropped = false; - char address[INET6_ADDRSTRLEN]; sdata_t *sdata = ckp->sdata; stratum_instance_t *client; smsg_t *msg; - json_t *val; int server; - val = json_loads(buf, 0, NULL); - if (unlikely(!val)) { - LOGWARNING("Received unrecognised non-json message: %s", buf); - goto out; - } + buf = json_dumps(val, JSON_COMPACT); msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; val = json_object_get(msg->json_msg, "client_id"); @@ -6575,10 +6570,16 @@ static void srecv_process(ckpool_t *ckp, char *buf) dec_instance_ref(sdata, client); out_freemsg: free_smsg(msg); -out: free(buf); } +void stratifier_add_recv(ckpool_t *ckp, json_t *val) +{ + sdata_t *sdata = ckp->sdata; + + ckmsgq_add(sdata->srecvs, val); +} + static void ssend_process(ckpool_t *ckp, smsg_t *msg) { char *s; diff --git a/src/stratifier.h b/src/stratifier.h index 096614f1..6b66a201 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,6 +10,9 @@ #ifndef STRATIFIER_H #define STRATIFIER_H +#include "ckpool.h" + +void stratifier_add_recv(ckpool_t *ckp, json_t *val); void *stratifier(void *arg); #endif /* STRATIFIER_H */