diff --git a/src/generator.c b/src/generator.c index 73503436..867bf5d2 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1701,6 +1701,31 @@ static void recruit_subproxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_recruit, proxi); } +static void *proxy_reconnect(void *arg) +{ + proxy_instance_t *proxy = (proxy_instance_t *)arg; + server_instance_t *si = proxy->si; + connsock_t *cs = proxy->cs; + ckpool_t *ckp = proxy->ckp; + + pthread_detach(pthread_self()); + proxy_alive(ckp, si, proxy, cs, true, proxy->epfd); + return NULL; +} + +/* For reconnecting the parent proxy instance async */ +static void reconnect_proxy(proxy_instance_t *proxi) +{ + pthread_t pth; + + create_pthread(&pth, proxy_reconnect, proxi); +} + +static void reconnect_generator(const ckpool_t *ckp) +{ + send_proc(ckp->generator, "reconnect"); +} + /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -1722,7 +1747,7 @@ static void *passthrough_recv(void *arg) } if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1734,12 +1759,12 @@ static void *passthrough_recv(void *arg) while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { if (alive) { alive = false; - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); } sleep(5); } if (!alive) - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); /* Make sure we receive a line within 90 seconds */ ret = epoll_wait(epfd, &event, 1, 90000); @@ -1749,7 +1774,7 @@ static void *passthrough_recv(void *arg) LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", proxi->id, proxi->si->url); alive = proxi->alive = false; - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); continue; } /* Simply forward the message on, as is, to the connector to @@ -1771,6 +1796,23 @@ static proxy_instance_t *current_proxy(gdata_t *gdata) return ret; } +static bool subproxies_alive(proxy_instance_t *proxy) +{ + proxy_instance_t *subproxy, *tmp; + bool ret = false; + + mutex_lock(&proxy->proxy_lock); + HASH_ITER(sh, proxy->subproxies, subproxy, tmp) { + if (subproxy->alive) { + ret = true; + break; + } + } + mutex_unlock(&proxy->proxy_lock); + + return ret; +} + /* For receiving messages from the upstream proxy, also responsible for setting * up the connection and testing it's alive. */ static void *proxy_recv(void *arg) @@ -1793,7 +1835,7 @@ static void *proxy_recv(void *arg) } if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1806,20 +1848,25 @@ static void *proxy_recv(void *arg) time_t now; int ret; - while (!proxy_alive(ckp, si, proxi, proxi->cs, true, epfd)) { - if (alive) { - alive = false; - send_proc(ckp->generator, "reconnect"); + if (!proxi->alive) { + while (!subproxies_alive(proxi)) { + reconnect_proxy(proxi); + if (alive) { + LOGWARNING("Proxy %d:%s failed, attempting reconnect", + proxi->id, proxi->si->url); + alive = false; + reconnect_generator(ckp); + } + sleep(5); + proxi->reconnect_time = time(NULL); } - sleep(5); - proxi->reconnect_time = time(NULL); } /* Wait 30 seconds before declaring this upstream pool alive * to prevent switching to unstable pools. */ if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); proxi->reconnect_time = 0; - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); alive = true; } @@ -1855,13 +1902,11 @@ static void *proxy_recv(void *arg) ret = read_socket_line(cs, 5); } if (ret < 1) { - if (parent_proxy(subproxy)) { - proxi->reconnect_time = time(NULL); - alive = false; - LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", - subproxy->id, subproxy->si->url); - send_proc(ckp->generator, "reconnect"); - } + subproxy->alive = false; + if (!parent_proxy(subproxy)) + recruit_subproxy(proxi); + LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", + proxi->id, subproxy->subid, subproxy->si->url); continue; } if (parse_method(ckp, subproxy, cs->buf)) { @@ -1872,7 +1917,7 @@ static void *proxy_recv(void *arg) subproxy->alive = false; disable_subproxy(gdata, proxi, subproxy); if (parent_proxy(subproxy)) { - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", subproxy->id, subproxy->si->url); break; @@ -1933,7 +1978,7 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { - if (proxi->alive) { + if (proxi->alive || subproxies_alive(proxi)) { if (!ret || proxi->id < ret->id) ret = proxi; } @@ -1989,14 +2034,6 @@ retry: } } while (selret < 1); - if (unlikely(proxi->cs->fd < 0)) { - LOGWARNING("Upstream proxy %d:%s socket invalidated, will attempt failover", - proxi->id, proxi->cs->url); - proxi->alive = false; - proxi = NULL; - goto reconnect; - } - sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on proxy socket"); @@ -2160,7 +2197,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "reconnect"); + reconnect_generator(ckp); } int generator(proc_instance_t *pi)