From fe4a25f861986dfe099572c773ec50b41f07f30c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 9 Feb 2015 15:10:14 +1100 Subject: [PATCH] Create rudimentary subproxy structures, moving proxy receives to epoll in order to receive from multiple connections upstream --- src/generator.c | 94 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/src/generator.c b/src/generator.c index 1fd029f1..acd61fcf 100644 --- a/src/generator.c +++ b/src/generator.c @@ -9,6 +9,7 @@ #include "config.h" +#include #include #include #include @@ -81,7 +82,7 @@ struct proxy_instance { connsock_t *cs; server_instance_t *si; bool passthrough; - int id; /* Proxy server id */ + int id; /* Proxy server id, or subproxy id if this is a subproxy */ const char *auth; const char *pass; @@ -123,6 +124,13 @@ struct proxy_instance { char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ time_t reconnect_time; + + pthread_mutex_t proxy_lock; /* Lock protecting hashlist of proxies */ + int64_t clients_per_proxy; /* How many clients can connect to each subproxy */ + int64_t client_headroom; /* How many more clients can we connect */ + proxy_instance_t *proxy; /* Parent proxy of subproxies */ + proxy_instance_t *subproxies; /* Hashlist of subproxies of this proxy */ + int subproxy_count; /* Number of subproxies */ }; /* Private data for the generator */ @@ -630,15 +638,18 @@ retry: LOGWARNING("Invalid nonce2len %d in parse_subscribe", size); goto out; } - if (size == 3 || (size == 4 && proxi->ckp->clientsvspeed)) - LOGWARNING("Proxy %d:%s Nonce2 length %d means proxied clients can't be >5TH each", - proxi->id, proxi->si->url, size); - else if (size < 3) { + if (size < 3) { LOGWARNING("Proxy %d:%s Nonce2 length %d too small to be able to proxy", proxi->id, proxi->si->url, size); goto out; } proxi->nonce2len = size; + if (!proxi->proxy) { + /* Set the number of clients per proxy on the parent proxy */ + proxi->clients_per_proxy = 1ll << ((size - 3) * 8); + LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url, + proxi->clients_per_proxy); + } LOGINFO("Found notify with enonce %s nonce2len %d", proxi->enonce1, proxi->nonce2len); @@ -1360,8 +1371,9 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) } static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, - connsock_t *cs, bool pinging) + connsock_t *cs, bool pinging, int epfd) { + struct epoll_event event; bool ret = false; /* Has this proxy already been reconnected? */ @@ -1411,6 +1423,11 @@ out: keep_sockalive(cs->fd); if (!ckp->passthrough) send_subscribe(ckp, proxi); + event.events = EPOLLIN; + event.data.ptr = proxi; + /* Add this connsock_t to the epoll list */ + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) + quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive"); } return ret; } @@ -1423,10 +1440,18 @@ static void *passthrough_recv(void *arg) server_instance_t *si = proxi->si; connsock_t *cs = proxi->cs; ckpool_t *ckp = proxi->ckp; + struct epoll_event event; + int epfd; rename_proc("passrecv"); - if (proxy_alive(ckp, si, proxi, cs, false)) { + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0){ + LOGEMERG("FATAL: Failed to create epoll in passrecv"); + return NULL; + } + + if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { proxi->alive = true; send_proc(ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", @@ -1436,7 +1461,7 @@ static void *passthrough_recv(void *arg) while (42) { int ret; - while (!proxy_alive(ckp, si, proxi, cs, true)) { + while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { if (proxi->alive) { proxi->alive = false; send_proc(ckp->generator, "reconnect"); @@ -1448,10 +1473,10 @@ static void *passthrough_recv(void *arg) send_proc(ckp->generator, "reconnect"); } - do { + /* Make sure we receive a line within 90 seconds */ + ret = epoll_wait(epfd, &event, 1, 90000); + if (likely(ret > 0)) ret = read_socket_line(cs, 60); - } while (ret == 0); - if (ret < 1) { LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", proxi->id, proxi->si->url); @@ -1491,10 +1516,18 @@ static void *proxy_recv(void *arg) connsock_t *cs = proxi->cs; ckpool_t *ckp = proxi->ckp; gdata_t *gdata = ckp->data; + struct epoll_event event; + int epfd; rename_proc("proxyrecv"); - if (proxy_alive(ckp, si, proxi, cs, false)) { + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0){ + LOGEMERG("FATAL: Failed to create epoll in proxyrecv"); + return NULL; + } + + if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { proxi->alive = true; send_proc(ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", @@ -1504,10 +1537,10 @@ static void *proxy_recv(void *arg) while (42) { notify_instance_t *ni, *tmp; share_msg_t *share, *tmpshare; - int retries = 0, ret; time_t now; + int ret; - while (!proxy_alive(ckp, si, proxi, cs, true)) { + while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) { if (proxi->alive) { proxi->alive = false; send_proc(ckp->generator, "reconnect"); @@ -1550,17 +1583,12 @@ static void *proxy_recv(void *arg) /* If we don't get an update within 10 minutes the upstream pool * has likely stopped responding. */ - do { - if (cs->fd == -1) { - ret = -1; - break; - } + ret = epoll_wait(epfd, &event, 1, 600000); + if (likely(ret > 0)) ret = read_socket_line(cs, 5); - } while (ret == 0 && ++retries < 120); - if (ret < 1) { if (proxi->alive) { - LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", + LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect", proxi->id, proxi->si->url); } continue; @@ -1589,8 +1617,30 @@ static void *proxy_recv(void *arg) return NULL; } +/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring + * fields we don't use in the subproxy. */ +static proxy_instance_t *create_subproxy(proxy_instance_t *proxi) +{ + proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t)); + + subproxy->ckp = proxi->ckp; + subproxy->cs = ckzalloc(sizeof(connsock_t)); + subproxy->si = proxi->si; + subproxy->id = proxi->subproxy_count++; + subproxy->auth = proxi->auth; + subproxy->pass = proxi->pass; + subproxy->proxy = proxi; + return subproxy; +} + +/* Create a single subproxy instance immediately to be the first used + * by the stratifier. To be used in future code */ static void prepare_proxy(proxy_instance_t *proxi) { + proxy_instance_t *subproxy = create_subproxy(proxi); + + mutex_init(&proxi->proxy_lock); + HASH_ADD_INT(proxi->subproxies, id, subproxy); mutex_init(&proxi->psend_lock); cond_init(&proxi->psend_cond); create_pthread(&proxi->pth_psend, proxy_send, proxi);