From bf79e0793d7b72dc35e7aa1e4c9a100c9121e8b7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Mar 2015 21:00:50 +1100 Subject: [PATCH] Recruit extra user subproxies when there aren't enough --- src/generator.c | 17 +++++++++---- src/stratifier.c | 64 ++++++++++++++++++++++++++++++++++++------------ 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/generator.c b/src/generator.c index 5a1fac58..79a02dce 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1779,12 +1779,19 @@ static void recruit_subproxies(proxy_instance_t *proxi, const int recruits) } /* Queue up to the requested amount */ -static void recruit_subproxy(proxy_instance_t *proxi, const char *buf) +static void recruit_subproxy(gdata_t *gdata, const char *buf) { - int recruits = 1; + int recruits = 1, id = 0; + proxy_instance_t *proxy; - sscanf(buf, "recruit=%d", &recruits); - recruit_subproxies(proxi, recruits); + sscanf(buf, "recruit=%d:%d", &id, &recruits); + proxy = proxy_by_id(gdata, id); + if (unlikely(!proxy)) { + LOGNOTICE("Generator failed to find proxy id %d to recruit subproxies", + id); + return; + } + recruit_subproxies(proxy, recruits); } static void *proxy_reconnect(void *arg) @@ -2442,7 +2449,7 @@ retry: LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); } else if (cmdmatch(buf, "recruit")) { - recruit_subproxy(proxi, buf); + recruit_subproxy(gdata, buf); } else if (cmdmatch(buf, "dropproxy")) { drop_proxy(gdata, buf); } else { diff --git a/src/stratifier.c b/src/stratifier.c index a7850c7c..2058ebc1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1295,15 +1295,17 @@ out_unlock: return headroom; } -static int64_t userproxy_headroom(sdata_t *sdata, const int userid) +static int64_t proxy_headroom(sdata_t *sdata, const int userid) { proxy_t *proxy, *subproxy, *tmp, *subtmp; int64_t headroom = 0; mutex_lock(&sdata->proxy_lock); HASH_ITER(hh, sdata->proxies, proxy, tmp) { - if (proxy->userid != userid) + if (proxy->userid < userid) continue; + if (proxy->userid > userid) + break; HASH_ITER(sh, proxy->subproxies, subproxy, subtmp) { if (subproxy->dead) continue; @@ -1317,11 +1319,11 @@ static int64_t userproxy_headroom(sdata_t *sdata, const int userid) static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); -static void generator_recruit(const ckpool_t *ckp, const int recruits) +static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int recruits) { char buf[256]; - sprintf(buf, "recruit=%d", recruits); + sprintf(buf, "recruit=%d:%d", proxyid, recruits); LOGINFO("Stratifer requesting %d more proxies from generator", recruits); send_generator(ckp, buf, GEN_PRIORITY); } @@ -1365,7 +1367,7 @@ static void reconnect_clients(sdata_t *sdata) proxy->id); } if (headroom < 0) - generator_recruit(sdata->ckp, -headroom); + generator_recruit(sdata->ckp, proxy->id, -headroom); } static bool __subproxies_alive(proxy_t *proxy) @@ -1414,8 +1416,8 @@ static void check_bestproxy(sdata_t *sdata) static void dead_proxyid(sdata_t *sdata, const int id, const int subid) { + int reconnects = 0, hard = 0, proxyid = 0; stratum_instance_t *client, *tmp; - int reconnects = 0, hard = 0; int64_t headroom; proxy_t *proxy; @@ -1427,6 +1429,8 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid) } LOGINFO("Stratifier dropping clients from proxy %d:%d", id, subid); headroom = current_headroom(sdata, &proxy); + if (proxy) + proxyid = proxy->id; ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { @@ -1445,8 +1449,10 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid) LOGNOTICE("%d clients flagged to reconnect from dead proxy %d:%d", reconnects, id, subid); } + /* When a proxy dies, recruit more of the global proxies for them to + * fail over to in case user proxies are unavailable. */ if (headroom < 0) - generator_recruit(sdata->ckp, -headroom); + generator_recruit(sdata->ckp, proxyid, -headroom); } static void update_subscribe(ckpool_t *ckp, const char *cmd) @@ -1544,17 +1550,38 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) json_decref(val); } +/* Find the highest priority alive proxy belonging to userid and recruit extra + * subproxies. */ +static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int recruits) +{ + proxy_t *proxy, *subproxy, *tmp, *subtmp, *best = NULL; + + mutex_lock(&sdata->proxy_lock); + HASH_ITER(hh, sdata->proxies, proxy, tmp) { + if (proxy->userid < userid) + continue; + if (proxy->userid > userid) + break; + HASH_ITER(sh, proxy->subproxies, subproxy, subtmp) { + if (subproxy->dead) + continue; + best = proxy; + } + } + mutex_unlock(&sdata->proxy_lock); + + if (best) + generator_recruit(sdata->ckp, proxy->id, recruits); +} + /* Check how much headroom the userid proxies have and reconnect any clients * that are not bound to it */ static void check_userproxies(sdata_t *sdata, const int userid) { - int64_t headroom = userproxy_headroom(sdata, userid); + int64_t headroom = proxy_headroom(sdata, userid); stratum_instance_t *client, *tmpclient; int reconnects = 0, hard = 0; - if (!headroom) - return; - ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { if (client->user_id != userid) @@ -1576,7 +1603,8 @@ static void check_userproxies(sdata_t *sdata, const int userid) LOGNOTICE("%d clients flagged for reconnect to user %d proxies", reconnects, userid); } - /* FIXME: Recruit extra user proxies when headroom < 0 */ + if (headroom < 0) + recruit_best_userproxy(sdata, userid, -headroom); } static void update_notify(ckpool_t *ckp, const char *cmd) @@ -2386,7 +2414,7 @@ static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client) LOGNOTICE("Reconnecting client %"PRId64, client->id); reconnect_client(sdata, client); } else { - generator_recruit(sdata->ckp, -headroom); + generator_recruit(sdata->ckp, proxy->id, -headroom); if (!client->reconnect) { LOGNOTICE("Flagging client %"PRId64, client->id); client->reconnect = true; @@ -2823,8 +2851,12 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int LOGWARNING("Temporarily insufficient subproxies to accept more clients"); return NULL; } - if (best->id != current->id || current->headroom < 2) - generator_recruit(ckp, 1); + if (best->global && (best->id != current->id || current->headroom < 2)) + generator_recruit(ckp, current->id, 1); + else if (userid) { + if (proxy_headroom(ckp_sdata, userid) < 2) + generator_recruit(ckp, best->id, 1); + } return best->sdata; } @@ -3494,7 +3526,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ if (client->sdata && client->sdata->proxy && client->sdata->proxy->global) { sdata_t *ckp_sdata = ckp->data; - if (userproxy_headroom(ckp_sdata, client->user_id)) + if (proxy_headroom(ckp_sdata, client->user_id)) client->reconnect = true; } } else {