Browse Source

Look for json messages for clients and process them first in the connector

master
Con Kolivas 10 years ago
parent
commit
65a6f4be5f
  1. 92
      src/connector.c

92
src/connector.c

@ -596,6 +596,35 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }
static void process_client_msg(cdata_t *cdata, const char *buf)
{
int64_t client_id64, client_id;
json_t *json_msg;
char *msg;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf);
return;
}
/* Extract the client id from the json message and remove its entry */
client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
} else
client_id = client_id64;
msg = json_dumps(json_msg, 0);
realloc_strcat(&msg, "\n");
send_client(cdata, client_id, msg);
json_decref(json_msg);
}
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{ {
int sockd = -1, ret = 0, selret; int sockd = -1, ret = 0, selret;
@ -603,7 +632,6 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf = NULL; char *buf = NULL;
json_t *json_msg;
do { do {
selret = wait_read_select(us->sockd, 5); selret = wait_read_select(us->sockd, 5);
@ -642,30 +670,26 @@ retry:
LOGWARNING("Failed to get message in connector_loop"); LOGWARNING("Failed to get message in connector_loop");
goto retry; goto retry;
} }
LOGDEBUG("Connector received message: %s", buf); LOGDEBUG("Connector received message: %s", buf);
if (cmdmatch(buf, "ping")) { /* The bulk of the messages will be json messages to send to clients
* so look for them first. */
if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf);
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Connector received ping request"); LOGDEBUG("Connector received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
goto retry; } else if (cmdmatch(buf, "accept")) {
}
if (cmdmatch(buf, "accept")) {
LOGDEBUG("Connector received accept signal"); LOGDEBUG("Connector received accept signal");
cdata->accept = true; cdata->accept = true;
goto retry; } else if (cmdmatch(buf, "reject")) {
}
if (cmdmatch(buf, "reject")) {
LOGDEBUG("Connector received reject signal"); LOGDEBUG("Connector received reject signal");
cdata->accept = false; cdata->accept = false;
goto retry; } else if (cmdmatch(buf, "loglevel")) {
}
if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
goto retry; } else if (cmdmatch(buf, "shutdown")) {
}
if (cmdmatch(buf, "shutdown"))
goto out; goto out;
if (cmdmatch(buf, "dropclient")) { } else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client; client_instance_t *client;
ret = sscanf(buf, "dropclient=%ld", &client_id64); ret = sscanf(buf, "dropclient=%ld", &client_id64);
@ -683,9 +707,7 @@ retry:
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
if (ret >= 0) if (ret >= 0)
LOGINFO("Connector dropped client id: %ld", client_id); LOGINFO("Connector dropped client id: %ld", client_id);
goto retry; } else if (cmdmatch(buf, "passthrough")) {
}
if (cmdmatch(buf, "passthrough")) {
client_instance_t *client; client_instance_t *client;
ret = sscanf(buf, "passthrough=%ld", &client_id); ret = sscanf(buf, "passthrough=%ld", &client_id);
@ -700,38 +722,10 @@ retry:
} }
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
goto retry; } else if (cmdmatch(buf, "getfd")) {
}
if (cmdmatch(buf, "getfd")) {
send_fd(cdata->serverfd[0], sockd); send_fd(cdata->serverfd[0], sockd);
goto retry;
}
/* Anything else should be a json message to send to a client */
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf);
goto retry;
}
/* Extract the client id from the json message and remove its entry */
client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
} else } else
client_id = client_id64; LOGWARNING("Unhandled connector message: %s", buf);
dealloc(buf);
buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n");
send_client(cdata, client_id, buf);
json_decref(json_msg);
buf = NULL;
goto retry; goto retry;
out: out:
Close(sockd); Close(sockd);

Loading…
Cancel
Save