diff --git a/src/connector.c b/src/connector.c index 576636c1..8616f7b3 100644 --- a/src/connector.c +++ b/src/connector.c @@ -453,7 +453,7 @@ static void drop_all_clients(cdata_t *cdata) ck_wunlock(&cdata->lock); } -static void send_client(cdata_t *cdata, int64_t id, char *buf); +static void send_client(ckpool_t *ckp, cdata_t *cdata, int64_t id, char *buf); /* Look for shares being submitted via a redirector and add them to a linked * list for looking up the responses. */ @@ -532,7 +532,7 @@ reparse: char *buf = strdup("Invalid JSON, disconnecting\n"); LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf); - send_client(cdata, client->id, buf); + send_client(ckp, cdata, client->id, buf); return false; } else { if (client->passthrough) { @@ -953,9 +953,8 @@ out: /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(cdata_t *cdata, const int64_t id, char *buf) +static void send_client(ckpool_t *ckp, cdata_t *cdata, const int64_t id, char *buf) { - ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; bool redirect = false; @@ -1009,16 +1008,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) free(buf); return; } - if (ckp->node) { - json_t *val = json_loads(buf, 0, NULL); - - if (!val) // Can happen if client sent invalid json message - goto out; - json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); - json_object_set_new_nocheck(val, "address", json_string(client->address_name)); - json_object_set_new_nocheck(val, "server", json_integer(client->server)); - stratifier_add_recv(ckp, val); - } if (ckp->redirector && !client->redirected && client->authorised) { /* If clients match the IP of clients that have already * been whitelisted as finding valid shares then @@ -1029,7 +1018,7 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) redirect = test_redirector_shares(cdata, client, buf); } } -out: + sender_send = ckzalloc(sizeof(sender_send_t)); sender_send->client = client; sender_send->buf = buf; @@ -1046,6 +1035,28 @@ out: redirect_client(ckp, client); } +static void send_client_json(ckpool_t *ckp, cdata_t *cdata, int64_t client_id, json_t *json_msg) +{ + client_instance_t *client; + char *msg; + + if (ckp->node && (client = ref_client_by_id(cdata, client_id))) { + json_t *val = json_deep_copy(json_msg); + + json_object_set_new_nocheck(val, "client_id", json_integer(client_id)); + json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "server", json_integer(client->server)); + dec_instance_ref(cdata, client); + stratifier_add_recv(ckp, val); + } + if (ckp->passthrough && client_id) + json_object_del(json_msg, "node.method"); + + msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); + send_client(ckp, cdata, client_id, msg); + json_decref(json_msg); +} + static bool client_exists(cdata_t *cdata, const int64_t id) { client_instance_t *client; @@ -1059,12 +1070,12 @@ static bool client_exists(cdata_t *cdata, const int64_t id) static void passthrough_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - char *buf; + json_t *val; LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true}\n"); - send_client(cdata, client->id, buf); + JSON_CPACK(val, "{sb}", "result", true); + send_client_json(ckp, cdata, client->id, val); if (!ckp->rmem_warn) set_recvbufsize(ckp, client->fd, 1048576); if (!ckp->wmem_warn) @@ -1073,13 +1084,14 @@ static void passthrough_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t static void remote_server(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - char *buf; + json_t *val; LOGWARNING("Connector adding client %"PRId64" %s as remote trusted server", client->id, client->address_name); client->remote = true; - ASPRINTF(&buf, "{\"result\": true, \"ckdb\": %s}\n", CKP_STANDALONE(ckp) ? "false" : "true"); - send_client(cdata, client->id, buf); + JSON_CPACK(val, "{sbsb}", + "result", true, "ckdb", CKP_STANDALONE(ckp) ? false : true); + send_client_json(ckp, cdata, client->id, val); if (!ckp->rmem_warn) set_recvbufsize(ckp, client->fd, 2097152); if (!ckp->wmem_warn) @@ -1280,8 +1292,8 @@ out: static void client_message_processor(ckpool_t *ckp, json_t *json_msg) { cdata_t *cdata = ckp->cdata; + client_instance_t *client; int64_t client_id; - char *msg; /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); @@ -1292,23 +1304,17 @@ static void client_message_processor(ckpool_t *ckp, json_t *json_msg) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); /* Flag redirector clients once they've been authorised */ - if (ckp->redirector) { - client_instance_t *client = ref_client_by_id(cdata, client_id); - - if (likely(client)) { - if (!client->redirected && !client->authorised) { - json_t *method_val = json_object_get(json_msg, "node.method"); - const char *method = json_string_value(method_val); + if (ckp->redirector && (client = ref_client_by_id(cdata, client_id))) { + if (!client->redirected && !client->authorised) { + json_t *method_val = json_object_get(json_msg, "node.method"); + const char *method = json_string_value(method_val); - if (!safecmp(method, stratum_msgs[SM_AUTHRESULT])) - client->authorised = true; - } - dec_instance_ref(cdata, client); + if (!safecmp(method, stratum_msgs[SM_AUTHRESULT])) + client->authorised = true; } + dec_instance_ref(cdata, client); } - msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); - send_client(cdata, client_id, msg); - json_decref(json_msg); + send_client_json(ckp, cdata, client_id, json_msg); } void connector_add_message(ckpool_t *ckp, json_t *val) @@ -1319,7 +1325,7 @@ void connector_add_message(ckpool_t *ckp, json_t *val) } /* Send the passthrough the terminate node.method */ -static void drop_passthrough_client(cdata_t *cdata, const int64_t id) +static void drop_passthrough_client(ckpool_t *ckp, cdata_t *cdata, const int64_t id) { int64_t client_id; char *msg; @@ -1329,7 +1335,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) /* We have a direct connection to the passthrough's connector so we * can send it any regular commands. */ ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id); - send_client(cdata, id, msg); + send_client(ckp, cdata, id, msg); } char *connector_stats(void *data, const int runtime) @@ -1450,7 +1456,7 @@ retry: } /* A passthrough client */ if (client_id > 0xffffffffll) { - drop_passthrough_client(cdata, client_id); + drop_passthrough_client(ckp, cdata, client_id); goto retry; } client = ref_client_by_id(cdata, client_id);