From cd754e98e46c276686f6f894bd1e5c34e739d7fc Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 20 Apr 2015 14:06:54 +1000 Subject: [PATCH] Move connector to using unix receive queues --- src/connector.c | 49 ++++++++++++++++++------------------------------ src/stratifier.c | 2 +- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/src/connector.c b/src/connector.c index 424483a8..43f268e5 100644 --- a/src/connector.c +++ b/src/connector.c @@ -768,25 +768,22 @@ static char *connector_stats(cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int sockd = -1, ret = 0, selret; int64_t client_id64, client_id; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; uint8_t test_cycle = 0; - char *buf = NULL; - - do { - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { - LOGEMERG("Connector failed to ping main process, exiting"); - ret = 1; - goto out; - } - } while (selret < 1); + char *buf; + int ret = 0; LOGWARNING("%s connector ready", ckp->name); retry: + if (umsg) { + Close(umsg->sockd); + free(umsg->buf); + dealloc(umsg); + } + if (!++test_cycle) { /* Test for pthread join every 256 messages */ if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { @@ -801,21 +798,11 @@ retry: } } - Close(sockd); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGEMERG("Failed to accept on connector socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (!buf) { - LOGWARNING("Failed to get message in connector_loop"); - goto retry; - } + do { + umsg = get_unix_msg(pi); + } while (!umsg); + buf = umsg->buf; LOGDEBUG("Connector received message: %s", buf); /* The bulk of the messages will be json messages to send to clients * so look for them first. */ @@ -841,7 +828,7 @@ retry: LOGINFO("Connector dropped client id: %"PRId64, client_id); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Connector received ping request"); - send_unix_msg(sockd, "pong"); + send_unix_msg(umsg->sockd, "pong"); } else if (cmdmatch(buf, "accept")) { LOGDEBUG("Connector received accept signal"); cdata->accept = true; @@ -853,7 +840,7 @@ retry: LOGDEBUG("Connector received stats request"); msg = connector_stats(cdata); - send_unix_msg(sockd, msg); + send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { @@ -878,13 +865,11 @@ retry: sscanf(buf, "getxfd%d", &fdno); if (fdno > -1 && fdno < ckp->serverurls) - send_fd(cdata->serverfd[fdno], sockd); + send_fd(cdata->serverfd[fdno], umsg->sockd); } else LOGWARNING("Unhandled connector message: %s", buf); goto retry; out: - Close(sockd); - dealloc(buf); return ret; } @@ -996,6 +981,8 @@ int connector(proc_instance_t *pi) create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + create_unix_receiver(pi); + ret = connector_loop(pi, cdata); out: dealloc(ckp->data); diff --git a/src/stratifier.c b/src/stratifier.c index 49a2cbf6..500cb835 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1770,7 +1770,7 @@ retry: umsg = get_unix_msg(pi); if (unlikely(!umsg &&!ping_main(ckp))) { - LOGEMERG("Generator failed to ping main process, exiting"); + LOGEMERG("Stratifier failed to ping main process, exiting"); ret = 1; goto out; }