diff --git a/src/connector.c b/src/connector.c index a18ba2e7..0b1f4de7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -225,6 +225,98 @@ out: return NULL; } +static void send_client(conn_instance_t *ci, int id, const char *buf) +{ + int fd = -1, ret, len, ofs = 0; + client_instance_t *client; + + if (unlikely(!buf)) { + LOGWARNING("Connector send_client sent a null buffer"); + return; + } + len = strlen(buf); + if (unlikely(!len)) { + LOGWARNING("Connector send_client sent a zero length buffer"); + return; + } + + ck_rlock(&ci->lock); + HASH_FIND_INT(clients, &id, client); + if (likely(client)) + fd = client->fd; + ck_runlock(&ci->lock); + + if (unlikely(fd == -1)) { + if (client) + LOGWARNING("Client id %d disconnected", id); + else + LOGWARNING("Connector failed to find client id %d", id); + return; + } + + while (len) { + ret = send(fd, buf + ofs, len , 0); + if (unlikely(ret < 0)) { + if (interrupted()) + continue; + LOGWARNING("Client id %d disconnected", id); + invalidate_client(client); + return; + } + ofs += ret; + len -= ret; + } +} + +static int connector_loop(ckpool_t *ckp, proc_instance_t *pi, conn_instance_t *ci) +{ + int sockd, client_id, ret = 0; + unixsock_t *us = &pi->us; + char *buf = NULL; + json_t *json_msg; + +retry: + dealloc(buf); + sockd = accept(us->sockd, NULL, NULL); + if (sockd < 0) { + if (interrupted()) + goto retry; + LOGERR("Failed to accept on connector socket"); + ret = 1; + goto out; + } + + buf = recv_unix_msg(sockd); + if (!buf) { + LOGWARNING("Failed to get message in connector_loop"); + close(sockd); + goto retry; + } + LOGDEBUG("Connector received message: %s", buf); + if (!strncasecmp(buf, "shutdown", 8)) + goto out; + json_msg = json_loads(buf, 0, NULL); + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message: %s", buf); + goto retry; + } + + /* Extract the client id from the json message and remove its entry */ + client_id = json_integer_value(json_object_get(json_msg, "client_id")); + json_object_del(json_msg, "client_id"); + dealloc(buf); + buf = json_dumps(json_msg, 0); + realloc_strcat(&buf, "\n"); + send_client(ci, client_id, buf); + json_decref(json_msg); + close(sockd); + + goto retry; +out: + dealloc(buf); + return ret; +} + int connector(proc_instance_t *pi) { pthread_t pth_acceptor, pth_receiver; @@ -282,8 +374,9 @@ int connector(proc_instance_t *pi) create_pthread(&pth_acceptor, acceptor, &ci); create_pthread(&pth_receiver, receiver, &ci); - join_pthread(pth_acceptor); - ret = 1; + ret = connector_loop(ckp, pi, &ci); + + //join_pthread(pth_acceptor); out: LOGINFO("%s connector exiting with return code %d", ckp->name, ret); if (ret) {