diff --git a/src/generator.c b/src/generator.c index acd61fcf..8cba9777 100644 --- a/src/generator.c +++ b/src/generator.c @@ -644,7 +644,7 @@ retry: goto out; } proxi->nonce2len = size; - if (!proxi->proxy) { + if (proxi->proxy == proxi) { /* Set the number of clients per proxy on the parent proxy */ proxi->clients_per_proxy = 1ll << ((size - 3) * 8); LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url, @@ -1370,6 +1370,8 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) ckmsgq_add(proxi->passsends, pm); } +static bool recruit_subproxy(proxy_instance_t *proxi, int epfd); + static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging, int epfd) { @@ -1428,10 +1430,57 @@ out: /* Add this connsock_t to the epoll list */ if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive"); + if (!ckp->passthrough && proxi->proxy == proxi) { + while (proxi->client_headroom < 42) { + /* Note recursive call of proxy_alive here */ + if (!recruit_subproxy(proxi, epfd)) { + LOGWARNING("Unable to recruit extra subproxies after just %"PRId64, + proxi->client_headroom); + break; + } + LOGWARNING("Proxy %d:%s recruited extra subproxy!", + proxi->id, cs->url); + } + } } return ret; } +/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring + * fields we don't use in the subproxy. */ +static proxy_instance_t *create_subproxy(proxy_instance_t *proxi) +{ + proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t)); + + subproxy->ckp = proxi->ckp; + subproxy->cs = ckzalloc(sizeof(connsock_t)); + subproxy->si = proxi->si; + subproxy->id = proxi->subproxy_count++; + subproxy->auth = proxi->auth; + subproxy->pass = proxi->pass; + subproxy->proxy = proxi; + return subproxy; +} + +static bool recruit_subproxy(proxy_instance_t *proxi, int epfd) +{ + proxy_instance_t *subproxy = create_subproxy(proxi); + + if (!proxy_alive(subproxy->ckp, subproxy->si, subproxy, subproxy->cs, false, epfd)) { + LOGNOTICE("Subproxy failed proxy_alive testing"); + free(subproxy->cs); + free(subproxy); + return false; + } + + mutex_lock(&proxi->proxy_lock); + HASH_ADD_INT(proxi->subproxies, id, subproxy); + proxi->client_headroom += proxi->clients_per_proxy; + mutex_unlock(&proxi->proxy_lock); + + return true; +} + /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -1537,10 +1586,11 @@ static void *proxy_recv(void *arg) while (42) { notify_instance_t *ni, *tmp; share_msg_t *share, *tmpshare; + proxy_instance_t *subproxy; time_t now; int ret; - while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { + while (!proxy_alive(ckp, si, proxi, proxi->cs, true, epfd)) { if (proxi->alive) { proxi->alive = false; send_proc(ckp->generator, "reconnect"); @@ -1584,8 +1634,11 @@ static void *proxy_recv(void *arg) /* If we don't get an update within 10 minutes the upstream pool * has likely stopped responding. */ ret = epoll_wait(epfd, &event, 1, 600000); - if (likely(ret > 0)) + if (likely(ret > 0)) { + subproxy = event.data.ptr; + cs = subproxy->cs; ret = read_socket_line(cs, 5); + } if (ret < 1) { if (proxi->alive) { LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", @@ -1617,30 +1670,10 @@ static void *proxy_recv(void *arg) return NULL; } -/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring - * fields we don't use in the subproxy. */ -static proxy_instance_t *create_subproxy(proxy_instance_t *proxi) -{ - proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t)); - - subproxy->ckp = proxi->ckp; - subproxy->cs = ckzalloc(sizeof(connsock_t)); - subproxy->si = proxi->si; - subproxy->id = proxi->subproxy_count++; - subproxy->auth = proxi->auth; - subproxy->pass = proxi->pass; - subproxy->proxy = proxi; - return subproxy; -} - -/* Create a single subproxy instance immediately to be the first used - * by the stratifier. To be used in future code */ static void prepare_proxy(proxy_instance_t *proxi) { - proxy_instance_t *subproxy = create_subproxy(proxi); - + proxi->proxy = proxi; mutex_init(&proxi->proxy_lock); - HASH_ADD_INT(proxi->subproxies, id, subproxy); mutex_init(&proxi->psend_lock); cond_init(&proxi->psend_cond); create_pthread(&proxi->pth_psend, proxy_send, proxi);