Browse Source

Move bkey extraction to send_client WIP

master
Con Kolivas 9 years ago
parent
commit
6be75b4c1b
  1. 81
      src/connector.c

81
src/connector.c

@ -430,7 +430,7 @@ static void drop_all_clients(cdata_t *cdata)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
} }
static void send_client(cdata_t *cdata, int64_t id, char *buf); static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, uint32_t len);
/* Look for shares being submitted via a redirector and add them to a linked /* Look for shares being submitted via a redirector and add them to a linked
* list for looking up the responses. */ * list for looking up the responses. */
@ -460,6 +460,23 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val)
} }
} }
static void send_client_msg(cdata_t *cdata, const int64_t id, char *buf)
{
uint32_t len;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
return;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
free(buf);
return;
}
send_client(cdata, id, buf, len, len);
}
/* Client is holding a reference count from being on the epoll list */ /* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{ {
@ -512,7 +529,7 @@ reparse:
char *buf = strdup("Invalid JSON, disconnecting\n"); char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg); LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg);
send_client(cdata, client->id, buf); send_client_msg(cdata, client->id, buf);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} else { } else {
@ -890,25 +907,31 @@ out:
/* Send a client by id a heap allocated buffer, allowing this function to /* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */ * free the ram. */
static void send_client(cdata_t *cdata, const int64_t id, char *buf) static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, uint32_t len)
{ {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
int len; char *bkey = NULL;
json_t *val;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
return;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
free(buf);
return;
}
if (unlikely(ckp->node && !id)) { if (unlikely(ckp->node && !id)) {
uint32_t blen = slen - len;
if (blen > 0) {
bkey = strstr(buf + slen - 4 - 1, "bkey");
if (likely(bkey)) {
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGWARNING("No json in bkey appended message %s", buf);
goto out;
}
json_append_bkeys(val, bkey, blen);
free(buf);
buf = json_dumps(val, JSON_EOL | JSON_COMPACT);
json_decref(val);
}
}
LOGDEBUG("Message for node: %s", buf); LOGDEBUG("Message for node: %s", buf);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
free(buf); free(buf);
@ -946,9 +969,9 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
return; return;
} }
if (ckp->node) { if (ckp->node) {
json_t *val = json_loads(buf, 0, NULL);
char *msg; char *msg;
val = json_loads(buf, 0, NULL);
if (!val) // Can happen if client sent invalid json message if (!val) // Can happen if client sent invalid json message
goto out; goto out;
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
@ -993,7 +1016,7 @@ static void passthrough_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t
LOGINFO("Connector adding passthrough client %"PRId64, client->id); LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true; client->passthrough = true;
ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n"); ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n");
send_client(cdata, client->id, buf); send_client_msg(cdata, client->id, buf);
if (!ckp->rmem_warn) if (!ckp->rmem_warn)
set_recvbufsize(ckp, client->fd, 1048576); set_recvbufsize(ckp, client->fd, 1048576);
if (!ckp->wmem_warn) if (!ckp->wmem_warn)
@ -1008,7 +1031,7 @@ static void remote_server(ckpool_t *ckp, cdata_t *cdata, client_instance_t *clie
client->id, client->address_name); client->id, client->address_name);
client->remote = true; client->remote = true;
ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n"); ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n");
send_client(cdata, client->id, buf); send_client_msg(cdata, client->id, buf);
if (!ckp->rmem_warn) if (!ckp->rmem_warn)
set_recvbufsize(ckp, client->fd, 1048576); set_recvbufsize(ckp, client->fd, 1048576);
} }
@ -1208,11 +1231,11 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
char *msg, *bkey = NULL; char *msg, *bkey = NULL;
int64_t client_id; int64_t client_id;
json_t *json_msg; json_t *json_msg;
uint32_t len; uint32_t slen;
len = strlen(buf); slen = strlen(buf);
if (likely(len > 4)) { if (likely(slen > 4)) {
bkey = strstr(buf + len - 4 - 1, "bkey"); bkey = strstr(buf + slen - 4 - 1, "bkey");
if (bkey) if (bkey)
LOGDEBUG("Bkey found in process_client_msg"); LOGDEBUG("Bkey found in process_client_msg");
} }
@ -1221,10 +1244,12 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
LOGWARNING("Invalid json message in process_client_msg: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }
if (unlikely(bkey)) { #if 0
len = msglen - (bkey - buf); if (unlikely(bkey))
json_append_bkeys(json_msg, bkey, len); msglen = msglen - (bkey - buf);
} else
msglen = len;
#endif
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
@ -1235,7 +1260,7 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(cdata, client_id, msg); send_client(cdata, client_id, msg, slen, msglen);
json_decref(json_msg); json_decref(json_msg);
} }
@ -1250,7 +1275,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id)
/* We have a direct connection to the passthrough's connector so we /* We have a direct connection to the passthrough's connector so we
* can send it any regular commands. */ * can send it any regular commands. */
ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id); ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id);
send_client(cdata, id, msg); send_client_msg(cdata, id, msg);
} }
static char *connector_stats(cdata_t *cdata, const int runtime) static char *connector_stats(cdata_t *cdata, const int runtime)

Loading…
Cancel
Save