From a6ac868cd305490f10f982d556f53f72e1b50b5e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 24 Feb 2015 21:16:37 +1100 Subject: [PATCH] Rework subproxy recruitment to allow requests to stack but abandon them if the subproxy is not alive --- src/generator.c | 35 +++++++++++++++++++++++++------- src/stratifier.c | 52 +++++++++--------------------------------------- 2 files changed, 37 insertions(+), 50 deletions(-) diff --git a/src/generator.c b/src/generator.c index 06111319..a873eb08 100644 --- a/src/generator.c +++ b/src/generator.c @@ -108,7 +108,7 @@ struct proxy_instance { bool disabled; /* Subproxy no longer to be used */ bool reconnect; /* We need to drop and reconnect */ bool reconnecting; /* Testing in progress */ - bool recruiting; /* Recruiting in progress */ + int recruit; /* Recruiting in progress */ bool alive; mutex_t notify_lock; @@ -1707,27 +1707,48 @@ static void *proxy_recruit(void *arg) proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg; ckpool_t *ckp = parent->ckp; gdata_t *gdata = ckp->data; + bool recruit, alive; pthread_detach(pthread_self()); +retry: + recruit = false; proxy = create_subproxy(gdata, parent); - if (!proxy_alive(ckp, proxy->si, proxy, proxy->cs, false, parent->epfd)) { + alive = proxy_alive(ckp, proxy->si, proxy, proxy->cs, false, parent->epfd); + if (!alive) { LOGNOTICE("Subproxy failed proxy_alive testing"); store_proxy(gdata, proxy); } else add_subproxy(parent, proxy); - parent->recruiting = false; + + mutex_lock(&parent->proxy_lock); + if (alive) { + if (--parent->recruit > 0) + recruit = true; + } + mutex_unlock(&parent->proxy_lock); + + if (recruit) + goto retry; + return NULL; } +/* Allow up to 42 recruit requests to accumulate */ static void recruit_subproxy(proxy_instance_t *proxi) { + bool recruit = false; pthread_t pth; - if (proxi->recruiting) - return; - proxi->recruiting = true; - create_pthread(&pth, proxy_recruit, proxi); + mutex_lock(&proxi->proxy_lock); + if (!proxi->recruit++ > 0) + recruit = true; + else if (proxi->recruit > 42) + proxi->recruit = 42; + mutex_unlock(&proxi->proxy_lock); + + if (recruit) + create_pthread(&pth, proxy_recruit, proxi); } static void *proxy_reconnect(void *arg) diff --git a/src/stratifier.c b/src/stratifier.c index c154713e..12e1a8a5 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1160,10 +1160,9 @@ static void reconnect_clients(sdata_t *sdata) HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { if (client->proxyid == proxy->id) continue; - if (headroom < 1) - break; - headroom--; reconnects++; + if (headroom-- < 1) + break; if (client->reconnect) reconnect_client(sdata, client); else @@ -1174,9 +1173,9 @@ static void reconnect_clients(sdata_t *sdata) if (reconnects) { LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects, proxy->id); + if (headroom < 42) + generator_recruit(sdata->ckp); } - if (headroom < 42) - generator_recruit(sdata->ckp); } #if 0 @@ -1209,10 +1208,9 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid) HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->proxyid != id || client->subproxyid != subid) continue; - if (headroom < 1) - break; - headroom--; reconnects++; + if (headroom-- < 1) + break; if (client->reconnect) reconnect_client(sdata, client); else @@ -1223,43 +1221,15 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid) if (reconnects) { LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, id, subid); + if (headroom < 42) + generator_recruit(sdata->ckp); } - if (headroom < 42) - generator_recruit(sdata->ckp); -} - -static void reassess_headroom(sdata_t *sdata, const proxy_t *proxy) -{ - stratum_instance_t *client, *tmpclient; - proxy_t *subproxy, *tmp; - int64_t headroom = 0; - - mutex_lock(&sdata->proxy_lock); - HASH_ITER(sh, proxy->subproxies, subproxy, tmp) { - if (subproxy->dead) - continue; - headroom += subproxy->max_clients - subproxy->clients; - } - mutex_unlock(&sdata->proxy_lock); - - ck_rlock(&sdata->instance_lock); - HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { - if (client->dropped) - continue; - if (client->reconnect || client->proxyid != proxy->id) - headroom--; - } - ck_runlock(&sdata->instance_lock); - - if (headroom < 1) - generator_recruit(sdata->ckp); } static void update_subscribe(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->data, *dsdata; proxy_t *proxy, *old = NULL; - bool current = false; const char *buf; int64_t id = 0; int subid = 0; @@ -1326,14 +1296,10 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) /* Is this a replacement proxy for the current one */ mutex_lock(&sdata->proxy_lock); - if (sdata->proxy && sdata->proxy->low_id == proxy->low_id && !proxy->subid) { - current = true; + if (sdata->proxy && sdata->proxy->low_id == proxy->low_id && !proxy->subid) sdata->proxy = proxy; - } mutex_unlock(&sdata->proxy_lock); - if (current) - reassess_headroom(sdata, proxy); if (subid) { LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64, id, subid, proxy->nonce2len, proxy->max_clients);