From 690870bc92afd7341afc168170ca6b5280397bad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 20 Apr 2015 16:08:43 +1000 Subject: [PATCH] Convert the proxy_loop to use unix receive queues --- src/generator.c | 47 +++++++++++++++++------------------------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/src/generator.c b/src/generator.c index 1a06206c..3a3989fe 100644 --- a/src/generator.c +++ b/src/generator.c @@ -407,7 +407,6 @@ retry: out: kill_server(si); - dealloc(buf); return ret; } @@ -2515,14 +2514,14 @@ out: static int proxy_loop(proc_instance_t *pi) { proxy_instance_t *proxi = NULL, *cproxy; - int sockd = -1, ret = 0, selret; - unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; gdata_t *gdata = ckp->data; + unix_msg_t *umsg = NULL; char *buf = NULL; + int ret = 0; reconnect: - Close(sockd); + clear_unix_msg(&umsg); /* This does not necessarily mean we reconnect, but a change has * occurred and we need to reexamine the proxies. */ cproxy = wait_best_proxy(ckp, gdata); @@ -2536,28 +2535,17 @@ reconnect: } } retry: - Close(sockd); + clear_unix_msg(&umsg); do { - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { + umsg = get_unix_msg(pi); + if (unlikely(!umsg &&!ping_main(ckp))) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; goto out; } - } while (selret < 1); + } while (!umsg); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGEMERG("Failed to accept on proxy socket"); - ret = 1; - goto out; - } - dealloc(buf); - buf = recv_unix_msg(sockd); - if (!buf) { - LOGWARNING("Failed to get message in proxy_loop"); - goto retry; - } + buf = umsg->buf; LOGDEBUG("Proxy received request: %s", buf); if (likely(buf[0] == '{')) { if (ckp->passthrough) @@ -2572,21 +2560,21 @@ retry: submit_share(gdata, val); } } else if (cmdmatch(buf, "stats")) { - send_stats(gdata, sockd); + send_stats(gdata, umsg->sockd); } else if (cmdmatch(buf, "list")) { - send_list(gdata, sockd); + send_list(gdata, umsg->sockd); } else if (cmdmatch(buf, "sublist")) { - send_sublist(gdata, sockd, buf + 8); + send_sublist(gdata, umsg->sockd, buf + 8); } else if (cmdmatch(buf, "addproxy")) { - parse_addproxy(ckp, gdata, sockd, buf + 9); + parse_addproxy(ckp, gdata, umsg->sockd, buf + 9); } else if (cmdmatch(buf, "delproxy")) { - parse_delproxy(ckp, gdata, sockd, buf + 9); + parse_delproxy(ckp, gdata, umsg->sockd, buf + 9); } else if (cmdmatch(buf, "enableproxy")) { - parse_ableproxy(gdata, sockd, buf + 12, false); + parse_ableproxy(gdata, umsg->sockd, buf + 12, false); } else if (cmdmatch(buf, "disableproxy")) { - parse_ableproxy(gdata, sockd, buf + 13, true); + parse_ableproxy(gdata, umsg->sockd, buf + 13, true); } else if (cmdmatch(buf, "proxystats")) { - parse_proxystats(gdata, sockd, buf + 11); + parse_proxystats(gdata, umsg->sockd, buf + 11); } else if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; @@ -2598,7 +2586,7 @@ retry: sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Proxy received ping request"); - send_unix_msg(sockd, "pong"); + send_unix_msg(umsg->sockd, "pong"); } else if (cmdmatch(buf, "recruit")) { recruit_subproxy(gdata, buf); } else if (cmdmatch(buf, "dropproxy")) { @@ -2608,7 +2596,6 @@ retry: } goto retry; out: - Close(sockd); return ret; }