Browse Source

Return a json response to any valid json from the stratifier to connector to send to the relevant connected client

master
Con Kolivas 11 years ago
parent
commit
72ebb96ef6
  1. 97
      src/connector.c

97
src/connector.c

@ -225,6 +225,98 @@ out:
return NULL;
}
static void send_client(conn_instance_t *ci, int id, const char *buf)
{
int fd = -1, ret, len, ofs = 0;
client_instance_t *client;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
return;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
return;
}
ck_rlock(&ci->lock);
HASH_FIND_INT(clients, &id, client);
if (likely(client))
fd = client->fd;
ck_runlock(&ci->lock);
if (unlikely(fd == -1)) {
if (client)
LOGWARNING("Client id %d disconnected", id);
else
LOGWARNING("Connector failed to find client id %d", id);
return;
}
while (len) {
ret = send(fd, buf + ofs, len , 0);
if (unlikely(ret < 0)) {
if (interrupted())
continue;
LOGWARNING("Client id %d disconnected", id);
invalidate_client(client);
return;
}
ofs += ret;
len -= ret;
}
}
static int connector_loop(ckpool_t *ckp, proc_instance_t *pi, conn_instance_t *ci)
{
int sockd, client_id, ret = 0;
unixsock_t *us = &pi->us;
char *buf = NULL;
json_t *json_msg;
retry:
dealloc(buf);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
if (interrupted())
goto retry;
LOGERR("Failed to accept on connector socket");
ret = 1;
goto out;
}
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in connector_loop");
close(sockd);
goto retry;
}
LOGDEBUG("Connector received message: %s", buf);
if (!strncasecmp(buf, "shutdown", 8))
goto out;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf);
goto retry;
}
/* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
dealloc(buf);
buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n");
send_client(ci, client_id, buf);
json_decref(json_msg);
close(sockd);
goto retry;
out:
dealloc(buf);
return ret;
}
int connector(proc_instance_t *pi)
{
pthread_t pth_acceptor, pth_receiver;
@ -282,8 +374,9 @@ int connector(proc_instance_t *pi)
create_pthread(&pth_acceptor, acceptor, &ci);
create_pthread(&pth_receiver, receiver, &ci);
join_pthread(pth_acceptor);
ret = 1;
ret = connector_loop(ckp, pi, &ci);
//join_pthread(pth_acceptor);
out:
LOGINFO("%s connector exiting with return code %d", ckp->name, ret);
if (ret) {

Loading…
Cancel
Save