Browse Source

Move json decoding to threaded client_message_processor

master
Con Kolivas 9 years ago
parent
commit
0b0dfe4f83
  1. 25
      src/connector.c

25
src/connector.c

@ -1226,11 +1226,17 @@ out:
return ret; return ret;
} }
static void client_message_processor(ckpool_t *ckp, json_t *json_msg) static void client_message_processor(ckpool_t *ckp, char *buf)
{ {
json_t *json_msg = json_loads(buf, 0, NULL);
int64_t client_id; int64_t client_id;
char *msg; char *msg;
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
goto out;
}
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
@ -1242,18 +1248,8 @@ static void client_message_processor(ckpool_t *ckp, json_t *json_msg)
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(ckp->data, client_id, msg); send_client(ckp->data, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
} out:
free(buf);
static void process_client_msg(cdata_t *cdata, const char *buf)
{
json_t *json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
ckmsgq_add(cdata->cmpq, json_msg);
} }
/* Send the passthrough the terminate node.method */ /* Send the passthrough the terminate node.method */
@ -1365,7 +1361,8 @@ retry:
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf); ckmsgq_add(cdata->cmpq, buf);
umsg->buf = NULL;
} else if (cmdmatch(buf, "upstream=")) { } else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); char *msg = strdup(buf + 9);

Loading…
Cancel
Save