Browse Source

Make client message processing a ckmsq to minimise connector_loop holdup

master
Con Kolivas 9 years ago
parent
commit
6986ba95fc
  1. 28
      src/connector.c

28
src/connector.c

@ -135,6 +135,9 @@ struct connector_data {
int64_t client_id;
/* client message process queue */
ckmsgq_t *cmpq;
/* For the linked list of pending sends */
sender_send_t *sender_sends;
@ -1200,18 +1203,11 @@ out:
return ret;
}
static void process_client_msg(cdata_t *cdata, const char *buf)
static void client_message_processor(ckpool_t *ckp, json_t *json_msg)
{
int64_t client_id;
json_t *json_msg;
char *msg;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
/* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
@ -1221,10 +1217,22 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(cdata, client_id, msg);
send_client(ckp->data, client_id, msg);
json_decref(json_msg);
}
static void process_client_msg(cdata_t *cdata, const char *buf)
{
json_t *json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
ckmsgq_add(cdata->cmpq, json_msg);
}
/* Send the passthrough the terminate node.method */
static void drop_passthrough_client(cdata_t *cdata, const int64_t id)
{
@ -1539,6 +1547,8 @@ int connector(proc_instance_t *pi)
if (tries)
LOGWARNING("Connector successfully bound to socket");
cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor);
if (ckp->remote && !setup_upstream(ckp, cdata)) {
ret = 1;
goto out;

Loading…
Cancel
Save