diff --git a/src/connector.c b/src/connector.c index 69f0d3a3..a0404e00 100644 --- a/src/connector.c +++ b/src/connector.c @@ -430,7 +430,7 @@ static void drop_all_clients(cdata_t *cdata) ck_wunlock(&cdata->lock); } -static void send_client(cdata_t *cdata, int64_t id, char *buf); +static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, uint32_t len); /* Look for shares being submitted via a redirector and add them to a linked * list for looking up the responses. */ @@ -460,6 +460,23 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) } } +static void send_client_msg(cdata_t *cdata, const int64_t id, char *buf) +{ + uint32_t len; + + if (unlikely(!buf)) { + LOGWARNING("Connector send_client sent a null buffer"); + return; + } + len = strlen(buf); + if (unlikely(!len)) { + LOGWARNING("Connector send_client sent a zero length buffer"); + free(buf); + return; + } + send_client(cdata, id, buf, len, len); +} + /* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { @@ -512,7 +529,7 @@ reparse: char *buf = strdup("Invalid JSON, disconnecting\n"); LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg); - send_client(cdata, client->id, buf); + send_client_msg(cdata, client->id, buf); invalidate_client(ckp, cdata, client); return; } else { @@ -890,25 +907,31 @@ out: /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(cdata_t *cdata, const int64_t id, char *buf) +static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, uint32_t len) { ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; - int len; - - if (unlikely(!buf)) { - LOGWARNING("Connector send_client sent a null buffer"); - return; - } - len = strlen(buf); - if (unlikely(!len)) { - LOGWARNING("Connector send_client sent a zero length buffer"); - free(buf); - return; - } + char *bkey = NULL; + json_t *val; if (unlikely(ckp->node && !id)) { + uint32_t blen = slen - len; + + if (blen > 0) { + bkey = strstr(buf + slen - 4 - 1, "bkey"); + if (likely(bkey)) { + val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); + if (unlikely(!val)) { + LOGWARNING("No json in bkey appended message %s", buf); + goto out; + } + json_append_bkeys(val, bkey, blen); + free(buf); + buf = json_dumps(val, JSON_EOL | JSON_COMPACT); + json_decref(val); + } + } LOGDEBUG("Message for node: %s", buf); send_proc(ckp->stratifier, buf); free(buf); @@ -946,9 +969,9 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) return; } if (ckp->node) { - json_t *val = json_loads(buf, 0, NULL); char *msg; + val = json_loads(buf, 0, NULL); if (!val) // Can happen if client sent invalid json message goto out; json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); @@ -993,7 +1016,7 @@ static void passthrough_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n"); - send_client(cdata, client->id, buf); + send_client_msg(cdata, client->id, buf); if (!ckp->rmem_warn) set_recvbufsize(ckp, client->fd, 1048576); if (!ckp->wmem_warn) @@ -1008,7 +1031,7 @@ static void remote_server(ckpool_t *ckp, cdata_t *cdata, client_instance_t *clie client->id, client->address_name); client->remote = true; ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n"); - send_client(cdata, client->id, buf); + send_client_msg(cdata, client->id, buf); if (!ckp->rmem_warn) set_recvbufsize(ckp, client->fd, 1048576); } @@ -1208,11 +1231,11 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) char *msg, *bkey = NULL; int64_t client_id; json_t *json_msg; - uint32_t len; + uint32_t slen; - len = strlen(buf); - if (likely(len > 4)) { - bkey = strstr(buf + len - 4 - 1, "bkey"); + slen = strlen(buf); + if (likely(slen > 4)) { + bkey = strstr(buf + slen - 4 - 1, "bkey"); if (bkey) LOGDEBUG("Bkey found in process_client_msg"); } @@ -1221,10 +1244,12 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) LOGWARNING("Invalid json message in process_client_msg: %s", buf); return; } - if (unlikely(bkey)) { - len = msglen - (bkey - buf); - json_append_bkeys(json_msg, bkey, len); - } +#if 0 + if (unlikely(bkey)) + msglen = msglen - (bkey - buf); + else + msglen = len; +#endif /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); @@ -1235,7 +1260,7 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); - send_client(cdata, client_id, msg); + send_client(cdata, client_id, msg, slen, msglen); json_decref(json_msg); } @@ -1250,7 +1275,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) /* We have a direct connection to the passthrough's connector so we * can send it any regular commands. */ ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id); - send_client(cdata, id, msg); + send_client_msg(cdata, id, msg); } static char *connector_stats(cdata_t *cdata, const int runtime)