Browse Source

Only consider a proxy dead if all subproxy connections are also dead

master
Con Kolivas 10 years ago
parent
commit
0afbf3eff2
  1. 89
      src/generator.c

89
src/generator.c

@ -1701,6 +1701,31 @@ static void recruit_subproxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_recruit, proxi); create_pthread(&pth, proxy_recruit, proxi);
} }
static void *proxy_reconnect(void *arg)
{
proxy_instance_t *proxy = (proxy_instance_t *)arg;
server_instance_t *si = proxy->si;
connsock_t *cs = proxy->cs;
ckpool_t *ckp = proxy->ckp;
pthread_detach(pthread_self());
proxy_alive(ckp, si, proxy, cs, true, proxy->epfd);
return NULL;
}
/* For reconnecting the parent proxy instance async */
static void reconnect_proxy(proxy_instance_t *proxi)
{
pthread_t pth;
create_pthread(&pth, proxy_reconnect, proxi);
}
static void reconnect_generator(const ckpool_t *ckp)
{
send_proc(ckp->generator, "reconnect");
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -1722,7 +1747,7 @@ static void *passthrough_recv(void *arg)
} }
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
@ -1734,12 +1759,12 @@ static void *passthrough_recv(void *arg)
while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) {
if (alive) { if (alive) {
alive = false; alive = false;
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
} }
sleep(5); sleep(5);
} }
if (!alive) if (!alive)
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
/* Make sure we receive a line within 90 seconds */ /* Make sure we receive a line within 90 seconds */
ret = epoll_wait(epfd, &event, 1, 90000); ret = epoll_wait(epfd, &event, 1, 90000);
@ -1749,7 +1774,7 @@ static void *passthrough_recv(void *arg)
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
alive = proxi->alive = false; alive = proxi->alive = false;
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
continue; continue;
} }
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
@ -1771,6 +1796,23 @@ static proxy_instance_t *current_proxy(gdata_t *gdata)
return ret; return ret;
} }
static bool subproxies_alive(proxy_instance_t *proxy)
{
proxy_instance_t *subproxy, *tmp;
bool ret = false;
mutex_lock(&proxy->proxy_lock);
HASH_ITER(sh, proxy->subproxies, subproxy, tmp) {
if (subproxy->alive) {
ret = true;
break;
}
}
mutex_unlock(&proxy->proxy_lock);
return ret;
}
/* For receiving messages from the upstream proxy, also responsible for setting /* For receiving messages from the upstream proxy, also responsible for setting
* up the connection and testing it's alive. */ * up the connection and testing it's alive. */
static void *proxy_recv(void *arg) static void *proxy_recv(void *arg)
@ -1793,7 +1835,7 @@ static void *proxy_recv(void *arg)
} }
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
@ -1806,20 +1848,25 @@ static void *proxy_recv(void *arg)
time_t now; time_t now;
int ret; int ret;
while (!proxy_alive(ckp, si, proxi, proxi->cs, true, epfd)) { if (!proxi->alive) {
while (!subproxies_alive(proxi)) {
reconnect_proxy(proxi);
if (alive) { if (alive) {
LOGWARNING("Proxy %d:%s failed, attempting reconnect",
proxi->id, proxi->si->url);
alive = false; alive = false;
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
} }
sleep(5); sleep(5);
proxi->reconnect_time = time(NULL); proxi->reconnect_time = time(NULL);
} }
}
/* Wait 30 seconds before declaring this upstream pool alive /* Wait 30 seconds before declaring this upstream pool alive
* to prevent switching to unstable pools. */ * to prevent switching to unstable pools. */
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url);
proxi->reconnect_time = 0; proxi->reconnect_time = 0;
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
alive = true; alive = true;
} }
@ -1855,13 +1902,11 @@ static void *proxy_recv(void *arg)
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, 5);
} }
if (ret < 1) { if (ret < 1) {
if (parent_proxy(subproxy)) { subproxy->alive = false;
proxi->reconnect_time = time(NULL); if (!parent_proxy(subproxy))
alive = false; recruit_subproxy(proxi);
LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv, attempting reconnect",
subproxy->id, subproxy->si->url); proxi->id, subproxy->subid, subproxy->si->url);
send_proc(ckp->generator, "reconnect");
}
continue; continue;
} }
if (parse_method(ckp, subproxy, cs->buf)) { if (parse_method(ckp, subproxy, cs->buf)) {
@ -1872,7 +1917,7 @@ static void *proxy_recv(void *arg)
subproxy->alive = false; subproxy->alive = false;
disable_subproxy(gdata, proxi, subproxy); disable_subproxy(gdata, proxi, subproxy);
if (parent_proxy(subproxy)) { if (parent_proxy(subproxy)) {
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
subproxy->id, subproxy->si->url); subproxy->id, subproxy->si->url);
break; break;
@ -1933,7 +1978,7 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->proxies, proxi, tmp) { HASH_ITER(hh, gdata->proxies, proxi, tmp) {
if (proxi->alive) { if (proxi->alive || subproxies_alive(proxi)) {
if (!ret || proxi->id < ret->id) if (!ret || proxi->id < ret->id)
ret = proxi; ret = proxi;
} }
@ -1989,14 +2034,6 @@ retry:
} }
} while (selret < 1); } while (selret < 1);
if (unlikely(proxi->cs->fd < 0)) {
LOGWARNING("Upstream proxy %d:%s socket invalidated, will attempt failover",
proxi->id, proxi->cs->url);
proxi->alive = false;
proxi = NULL;
goto reconnect;
}
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGEMERG("Failed to accept on proxy socket"); LOGEMERG("Failed to accept on proxy socket");
@ -2160,7 +2197,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "reconnect"); reconnect_generator(ckp);
} }
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)

Loading…
Cancel
Save