Browse Source

Convert the proxy_loop to use unix receive queues

master
Con Kolivas 10 years ago
parent
commit
690870bc92
  1. 47
      src/generator.c

47
src/generator.c

@ -407,7 +407,6 @@ retry:
out:
kill_server(si);
dealloc(buf);
return ret;
}
@ -2515,14 +2514,14 @@ out:
static int proxy_loop(proc_instance_t *pi)
{
proxy_instance_t *proxi = NULL, *cproxy;
int sockd = -1, ret = 0, selret;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data;
unix_msg_t *umsg = NULL;
char *buf = NULL;
int ret = 0;
reconnect:
Close(sockd);
clear_unix_msg(&umsg);
/* This does not necessarily mean we reconnect, but a change has
* occurred and we need to reexamine the proxies. */
cproxy = wait_best_proxy(ckp, gdata);
@ -2536,28 +2535,17 @@ reconnect:
}
}
retry:
Close(sockd);
clear_unix_msg(&umsg);
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
} while (!umsg);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGEMERG("Failed to accept on proxy socket");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in proxy_loop");
goto retry;
}
buf = umsg->buf;
LOGDEBUG("Proxy received request: %s", buf);
if (likely(buf[0] == '{')) {
if (ckp->passthrough)
@ -2572,21 +2560,21 @@ retry:
submit_share(gdata, val);
}
} else if (cmdmatch(buf, "stats")) {
send_stats(gdata, sockd);
send_stats(gdata, umsg->sockd);
} else if (cmdmatch(buf, "list")) {
send_list(gdata, sockd);
send_list(gdata, umsg->sockd);
} else if (cmdmatch(buf, "sublist")) {
send_sublist(gdata, sockd, buf + 8);
send_sublist(gdata, umsg->sockd, buf + 8);
} else if (cmdmatch(buf, "addproxy")) {
parse_addproxy(ckp, gdata, sockd, buf + 9);
parse_addproxy(ckp, gdata, umsg->sockd, buf + 9);
} else if (cmdmatch(buf, "delproxy")) {
parse_delproxy(ckp, gdata, sockd, buf + 9);
parse_delproxy(ckp, gdata, umsg->sockd, buf + 9);
} else if (cmdmatch(buf, "enableproxy")) {
parse_ableproxy(gdata, sockd, buf + 12, false);
parse_ableproxy(gdata, umsg->sockd, buf + 12, false);
} else if (cmdmatch(buf, "disableproxy")) {
parse_ableproxy(gdata, sockd, buf + 13, true);
parse_ableproxy(gdata, umsg->sockd, buf + 13, true);
} else if (cmdmatch(buf, "proxystats")) {
parse_proxystats(gdata, sockd, buf + 11);
parse_proxystats(gdata, umsg->sockd, buf + 11);
} else if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
@ -2598,7 +2586,7 @@ retry:
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
send_unix_msg(umsg->sockd, "pong");
} else if (cmdmatch(buf, "recruit")) {
recruit_subproxy(gdata, buf);
} else if (cmdmatch(buf, "dropproxy")) {
@ -2608,7 +2596,6 @@ retry:
}
goto retry;
out:
Close(sockd);
return ret;
}

Loading…
Cancel
Save