diff --git a/src/connector.c b/src/connector.c index 9d7ecfde..6fbe2387 100644 --- a/src/connector.c +++ b/src/connector.c @@ -21,6 +21,7 @@ #include "uthash.h" #include "utlist.h" #include "stratifier.h" +#include "generator.h" #define MAX_MSGSIZE 1024 @@ -363,15 +364,11 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) { json_t *val; - char *s; JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", client->address_name, "server", client->server, "method", "mining.term", "params"); - s = json_dumps(val, JSON_COMPACT); - json_decref(val); - send_proc(ckp->generator, s); - free(s); + generator_add_send(ckp, val); } static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) @@ -509,8 +506,6 @@ reparse: send_client(cdata, client->id, buf); return false; } else { - char *s; - if (client->passthrough) { int64_t passthrough_id; @@ -524,22 +519,18 @@ reparse: json_object_set_new_nocheck(val, "address", json_string(client->address_name)); } json_object_set_new_nocheck(val, "server", json_integer(client->server)); - s = json_dumps(val, JSON_COMPACT); /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ if (likely(!client->invalid)) { - if (!ckp->passthrough || ckp->node) { + if (!ckp->passthrough) stratifier_add_recv(ckp, val); - val = NULL; - } + if (ckp->node) + stratifier_add_recv(ckp, json_deep_copy(val)); if (ckp->passthrough) - send_proc(ckp->generator, s); - } - - free(s); - if (val) + generator_add_send(ckp, val); + } else json_decref(val); } client->bufofs -= buflen; @@ -1357,7 +1348,11 @@ retry: LOGDEBUG("Connector received message: %s", buf); /* The bulk of the messages will be json messages to send to clients * so look for them first. */ - if (cmdmatch(buf, "upstream=")) { + if (likely(buf[0] == '{')) { + json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); + + ckmsgq_add(cdata->cmpq, val); + } else if (cmdmatch(buf, "upstream=")) { char *msg = strdup(buf + 9); LOGDEBUG("Upstreaming %s", msg); diff --git a/src/generator.c b/src/generator.c index d5986a69..47b39325 100644 --- a/src/generator.c +++ b/src/generator.c @@ -163,6 +163,8 @@ struct generator_data { mutex_t share_lock; share_msg_t *shares; int64_t share_id; + + proxy_instance_t *current_proxy; }; typedef struct generator_data gdata_t; @@ -1731,16 +1733,39 @@ out: free(pm); } -static void passthrough_add_send(proxy_instance_t *proxy, const char *msg) +static void passthrough_add_send(proxy_instance_t *proxy, char *msg) { pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); pm->proxy = proxy; pm->cs = &proxy->cs; - ASPRINTF(&pm->msg, "%s\n", msg); + pm->msg = msg; ckmsgq_add(proxy->passsends, pm); } +void generator_add_send(ckpool_t *ckp, json_t *val) +{ + gdata_t *gdata = ckp->gdata; + char *buf; + + if (!ckp->passthrough) { + submit_share(gdata, val); + return; + } + if (unlikely(!gdata->current_proxy)) { + LOGWARNING("No current proxy to send passthrough data to"); + goto out; + } + buf = json_dumps(val, JSON_COMPACT | JSON_EOL); + if (unlikely(!buf)) { + LOGWARNING("Unable to decode json in generator_add_send"); + goto out; + } + passthrough_add_send(gdata->current_proxy, buf); +out: + json_decref(val); +} + static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, bool pinging) { @@ -2692,7 +2717,7 @@ reconnect: if (!cproxy) goto out; if (proxi != cproxy) { - proxi = cproxy; + gdata->current_proxy = proxi = cproxy; LOGWARNING("Successfully connected to pool %d %s as proxy%s", proxi->id, proxi->url, ckp->passthrough ? " in passthrough mode" : ""); } diff --git a/src/generator.h b/src/generator.h index 2e6bdf2a..639af50a 100644 --- a/src/generator.h +++ b/src/generator.h @@ -12,6 +12,7 @@ #include "config.h" +void generator_add_send(ckpool_t *ckp, json_t *val); void *generator(void *arg); #endif /* GENERATOR_H */