From 7763b49c2e9b2edeabfa69318772406e343eeaa5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 13 Feb 2015 10:15:48 +1100 Subject: [PATCH] Allow stratifier to choose the best subproxy to attach new users to and request recruitment of more subproxies when the headroom for more clients is low --- src/generator.c | 15 +++++++++++---- src/stratifier.c | 35 +++++++++++++++++++++++++++++++---- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/generator.c b/src/generator.c index a0a60718..baeae613 100644 --- a/src/generator.c +++ b/src/generator.c @@ -125,6 +125,8 @@ struct proxy_instance { time_t reconnect_time; + int epfd; /* Epoll fd used by the parent proxy */ + pthread_mutex_t proxy_lock; /* Lock protecting hashlist of proxies */ int64_t clients_per_proxy; /* How many clients can connect to each subproxy */ int64_t client_headroom; /* How many more clients can we connect */ @@ -1427,7 +1429,7 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) ckmsgq_add(proxi->passsends, pm); } -static bool recruit_subproxy(proxy_instance_t *proxi, int epfd); +static bool recruit_subproxy(proxy_instance_t *proxi); static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging, int epfd) @@ -1488,9 +1490,11 @@ out: if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive"); if (!ckp->passthrough && proxi->proxy == proxi) { + /* We recruit enough proxies to begin with and then + * recruit extra when asked by the stratifier. */ while (proxi->client_headroom < 42) { /* Note recursive call of proxy_alive here */ - if (!recruit_subproxy(proxi, epfd)) { + if (!recruit_subproxy(proxi)) { LOGWARNING("Unable to recruit extra subproxies after just %"PRId64, proxi->client_headroom); break; @@ -1520,9 +1524,10 @@ static proxy_instance_t *create_subproxy(proxy_instance_t *proxi) return subproxy; } -static bool recruit_subproxy(proxy_instance_t *proxi, int epfd) +static bool recruit_subproxy(proxy_instance_t *proxi) { proxy_instance_t *subproxy = create_subproxy(proxi); + int epfd = proxi->epfd; if (!proxy_alive(subproxy->ckp, subproxy->si, subproxy, subproxy->cs, false, epfd)) { LOGNOTICE("Subproxy failed proxy_alive testing"); @@ -1553,7 +1558,7 @@ static void *passthrough_recv(void *arg) rename_proc("passrecv"); - epfd = epoll_create1(EPOLL_CLOEXEC); + proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0){ LOGEMERG("FATAL: Failed to create epoll in passrecv"); return NULL; @@ -1863,6 +1868,8 @@ retry: } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); + } else if (cmdmatch(buf, "recruit")) { + recruit_subproxy(proxi); } else if (ckp->passthrough) { /* Anything remaining should be stratum messages */ passthrough_add_send(proxi, buf); diff --git a/src/stratifier.c b/src/stratifier.c index 484a6fe0..648755d1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -819,7 +819,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio) return buf; } -static void send_generator(ckpool_t *ckp, const char *msg, const int prio) +static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) { sdata_t *sdata = ckp->data; bool set; @@ -1070,6 +1070,7 @@ static proxy_t *__subproxy_by_id(sdata_t *sdata, proxy_t *proxy, const int id) return subproxy; } +#if 0 static proxy_t *proxy_by_id(sdata_t *sdata, const int id) { proxy_t *proxy; @@ -1080,6 +1081,7 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id) return proxy; } +#endif static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid) { @@ -2184,12 +2186,37 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien * running out of room. */ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata) { + proxy_t *proxy, *subproxy, *best = NULL, *tmp; + int64_t headroom = 0, most_headroom = 0; + if (!ckp->proxy || ckp->passthrough) return ckp_sdata; - if (!ckp_sdata->proxy) + proxy = ckp_sdata->proxy; + if (!proxy) { + LOGWARNING("No proxy available yet to generate subscribes"); return NULL; - /* FIXME: Choose a subproxy, not the parent proxy */ - return ckp_sdata->proxy->sdata; + } + mutex_lock(&ckp_sdata->proxy_lock); + HASH_ITER(hh, proxy->subproxies, subproxy, tmp) { + int64_t subproxy_headroom = subproxy->max_clients - subproxy->clients; + + headroom += subproxy_headroom; + if (subproxy_headroom > most_headroom) { + best = subproxy; + most_headroom = subproxy_headroom; + } + } + mutex_unlock(&ckp_sdata->proxy_lock); + + if (headroom < 42) { + LOGNOTICE("Stratifer requesting more proxies from generator"); + send_generator(ckp, "recruit", GEN_PRIORITY); + } + if (!best) { + LOGWARNING("Insufficient subproxies to accept more clients"); + return NULL; + } + return best->sdata; } /* Extranonce1 must be set here. Needs to be entered with client holding a ref