Browse Source

Recruit extra subproxies to ensure at least 42 client headroom, receiving data from correct socket in proxyrecv

master
Con Kolivas 10 years ago
parent
commit
04d43b3afb
  1. 81
      src/generator.c

81
src/generator.c

@ -644,7 +644,7 @@ retry:
goto out; goto out;
} }
proxi->nonce2len = size; proxi->nonce2len = size;
if (!proxi->proxy) { if (proxi->proxy == proxi) {
/* Set the number of clients per proxy on the parent proxy */ /* Set the number of clients per proxy on the parent proxy */
proxi->clients_per_proxy = 1ll << ((size - 3) * 8); proxi->clients_per_proxy = 1ll << ((size - 3) * 8);
LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url, LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url,
@ -1370,6 +1370,8 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
ckmsgq_add(proxi->passsends, pm); ckmsgq_add(proxi->passsends, pm);
} }
static bool recruit_subproxy(proxy_instance_t *proxi, int epfd);
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging, int epfd) connsock_t *cs, bool pinging, int epfd)
{ {
@ -1428,10 +1430,57 @@ out:
/* Add this connsock_t to the epoll list */ /* Add this connsock_t to the epoll list */
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1))
quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive"); quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive");
if (!ckp->passthrough && proxi->proxy == proxi) {
while (proxi->client_headroom < 42) {
/* Note recursive call of proxy_alive here */
if (!recruit_subproxy(proxi, epfd)) {
LOGWARNING("Unable to recruit extra subproxies after just %"PRId64,
proxi->client_headroom);
break;
}
LOGWARNING("Proxy %d:%s recruited extra subproxy!",
proxi->id, cs->url);
}
}
} }
return ret; return ret;
} }
/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring
* fields we don't use in the subproxy. */
static proxy_instance_t *create_subproxy(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t));
subproxy->ckp = proxi->ckp;
subproxy->cs = ckzalloc(sizeof(connsock_t));
subproxy->si = proxi->si;
subproxy->id = proxi->subproxy_count++;
subproxy->auth = proxi->auth;
subproxy->pass = proxi->pass;
subproxy->proxy = proxi;
return subproxy;
}
static bool recruit_subproxy(proxy_instance_t *proxi, int epfd)
{
proxy_instance_t *subproxy = create_subproxy(proxi);
if (!proxy_alive(subproxy->ckp, subproxy->si, subproxy, subproxy->cs, false, epfd)) {
LOGNOTICE("Subproxy failed proxy_alive testing");
free(subproxy->cs);
free(subproxy);
return false;
}
mutex_lock(&proxi->proxy_lock);
HASH_ADD_INT(proxi->subproxies, id, subproxy);
proxi->client_headroom += proxi->clients_per_proxy;
mutex_unlock(&proxi->proxy_lock);
return true;
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -1537,10 +1586,11 @@ static void *proxy_recv(void *arg)
while (42) { while (42) {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
proxy_instance_t *subproxy;
time_t now; time_t now;
int ret; int ret;
while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { while (!proxy_alive(ckp, si, proxi, proxi->cs, true, epfd)) {
if (proxi->alive) { if (proxi->alive) {
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
@ -1584,8 +1634,11 @@ static void *proxy_recv(void *arg)
/* If we don't get an update within 10 minutes the upstream pool /* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */ * has likely stopped responding. */
ret = epoll_wait(epfd, &event, 1, 600000); ret = epoll_wait(epfd, &event, 1, 600000);
if (likely(ret > 0)) if (likely(ret > 0)) {
subproxy = event.data.ptr;
cs = subproxy->cs;
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, 5);
}
if (ret < 1) { if (ret < 1) {
if (proxi->alive) { if (proxi->alive) {
LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect",
@ -1617,30 +1670,10 @@ static void *proxy_recv(void *arg)
return NULL; return NULL;
} }
/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring
* fields we don't use in the subproxy. */
static proxy_instance_t *create_subproxy(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t));
subproxy->ckp = proxi->ckp;
subproxy->cs = ckzalloc(sizeof(connsock_t));
subproxy->si = proxi->si;
subproxy->id = proxi->subproxy_count++;
subproxy->auth = proxi->auth;
subproxy->pass = proxi->pass;
subproxy->proxy = proxi;
return subproxy;
}
/* Create a single subproxy instance immediately to be the first used
* by the stratifier. To be used in future code */
static void prepare_proxy(proxy_instance_t *proxi) static void prepare_proxy(proxy_instance_t *proxi)
{ {
proxy_instance_t *subproxy = create_subproxy(proxi); proxi->proxy = proxi;
mutex_init(&proxi->proxy_lock); mutex_init(&proxi->proxy_lock);
HASH_ADD_INT(proxi->subproxies, id, subproxy);
mutex_init(&proxi->psend_lock); mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond); cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi); create_pthread(&proxi->pth_psend, proxy_send, proxi);

Loading…
Cancel
Save