From 6986ba95fc58de4ac11d8f6d124896f83509ef0a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 09:34:05 +1100 Subject: [PATCH] Make client message processing a ckmsq to minimise connector_loop holdup --- src/connector.c | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/connector.c b/src/connector.c index c93f6ffe..25000977 100644 --- a/src/connector.c +++ b/src/connector.c @@ -135,6 +135,9 @@ struct connector_data { int64_t client_id; + /* client message process queue */ + ckmsgq_t *cmpq; + /* For the linked list of pending sends */ sender_send_t *sender_sends; @@ -1200,18 +1203,11 @@ out: return ret; } -static void process_client_msg(cdata_t *cdata, const char *buf) +static void client_message_processor(ckpool_t *ckp, json_t *json_msg) { int64_t client_id; - json_t *json_msg; char *msg; - json_msg = json_loads(buf, 0, NULL); - if (unlikely(!json_msg)) { - LOGWARNING("Invalid json message in process_client_msg: %s", buf); - return; - } - /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); @@ -1221,10 +1217,22 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); - send_client(cdata, client_id, msg); + send_client(ckp->data, client_id, msg); json_decref(json_msg); } +static void process_client_msg(cdata_t *cdata, const char *buf) +{ + json_t *json_msg = json_loads(buf, 0, NULL); + + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message in process_client_msg: %s", buf); + return; + } + + ckmsgq_add(cdata->cmpq, json_msg); +} + /* Send the passthrough the terminate node.method */ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) { @@ -1539,6 +1547,8 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); + cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); + if (ckp->remote && !setup_upstream(ckp, cdata)) { ret = 1; goto out;