Browse Source

Recruit extra user subproxies when there aren't enough

master
Con Kolivas 10 years ago
parent
commit
bf79e0793d
  1. 17
      src/generator.c
  2. 64
      src/stratifier.c

17
src/generator.c

@ -1779,12 +1779,19 @@ static void recruit_subproxies(proxy_instance_t *proxi, const int recruits)
} }
/* Queue up to the requested amount */ /* Queue up to the requested amount */
static void recruit_subproxy(proxy_instance_t *proxi, const char *buf) static void recruit_subproxy(gdata_t *gdata, const char *buf)
{ {
int recruits = 1; int recruits = 1, id = 0;
proxy_instance_t *proxy;
sscanf(buf, "recruit=%d", &recruits); sscanf(buf, "recruit=%d:%d", &id, &recruits);
recruit_subproxies(proxi, recruits); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) {
LOGNOTICE("Generator failed to find proxy id %d to recruit subproxies",
id);
return;
}
recruit_subproxies(proxy, recruits);
} }
static void *proxy_reconnect(void *arg) static void *proxy_reconnect(void *arg)
@ -2442,7 +2449,7 @@ retry:
LOGDEBUG("Proxy received ping request"); LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
} else if (cmdmatch(buf, "recruit")) { } else if (cmdmatch(buf, "recruit")) {
recruit_subproxy(proxi, buf); recruit_subproxy(gdata, buf);
} else if (cmdmatch(buf, "dropproxy")) { } else if (cmdmatch(buf, "dropproxy")) {
drop_proxy(gdata, buf); drop_proxy(gdata, buf);
} else { } else {

64
src/stratifier.c

@ -1295,15 +1295,17 @@ out_unlock:
return headroom; return headroom;
} }
static int64_t userproxy_headroom(sdata_t *sdata, const int userid) static int64_t proxy_headroom(sdata_t *sdata, const int userid)
{ {
proxy_t *proxy, *subproxy, *tmp, *subtmp; proxy_t *proxy, *subproxy, *tmp, *subtmp;
int64_t headroom = 0; int64_t headroom = 0;
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
HASH_ITER(hh, sdata->proxies, proxy, tmp) { HASH_ITER(hh, sdata->proxies, proxy, tmp) {
if (proxy->userid != userid) if (proxy->userid < userid)
continue; continue;
if (proxy->userid > userid)
break;
HASH_ITER(sh, proxy->subproxies, subproxy, subtmp) { HASH_ITER(sh, proxy->subproxies, subproxy, subtmp) {
if (subproxy->dead) if (subproxy->dead)
continue; continue;
@ -1317,11 +1319,11 @@ static int64_t userproxy_headroom(sdata_t *sdata, const int userid)
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); static void reconnect_client(sdata_t *sdata, stratum_instance_t *client);
static void generator_recruit(const ckpool_t *ckp, const int recruits) static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int recruits)
{ {
char buf[256]; char buf[256];
sprintf(buf, "recruit=%d", recruits); sprintf(buf, "recruit=%d:%d", proxyid, recruits);
LOGINFO("Stratifer requesting %d more proxies from generator", recruits); LOGINFO("Stratifer requesting %d more proxies from generator", recruits);
send_generator(ckp, buf, GEN_PRIORITY); send_generator(ckp, buf, GEN_PRIORITY);
} }
@ -1365,7 +1367,7 @@ static void reconnect_clients(sdata_t *sdata)
proxy->id); proxy->id);
} }
if (headroom < 0) if (headroom < 0)
generator_recruit(sdata->ckp, -headroom); generator_recruit(sdata->ckp, proxy->id, -headroom);
} }
static bool __subproxies_alive(proxy_t *proxy) static bool __subproxies_alive(proxy_t *proxy)
@ -1414,8 +1416,8 @@ static void check_bestproxy(sdata_t *sdata)
static void dead_proxyid(sdata_t *sdata, const int id, const int subid) static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
{ {
int reconnects = 0, hard = 0, proxyid = 0;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int reconnects = 0, hard = 0;
int64_t headroom; int64_t headroom;
proxy_t *proxy; proxy_t *proxy;
@ -1427,6 +1429,8 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
} }
LOGINFO("Stratifier dropping clients from proxy %d:%d", id, subid); LOGINFO("Stratifier dropping clients from proxy %d:%d", id, subid);
headroom = current_headroom(sdata, &proxy); headroom = current_headroom(sdata, &proxy);
if (proxy)
proxyid = proxy->id;
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
@ -1445,8 +1449,10 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
LOGNOTICE("%d clients flagged to reconnect from dead proxy %d:%d", reconnects, LOGNOTICE("%d clients flagged to reconnect from dead proxy %d:%d", reconnects,
id, subid); id, subid);
} }
/* When a proxy dies, recruit more of the global proxies for them to
* fail over to in case user proxies are unavailable. */
if (headroom < 0) if (headroom < 0)
generator_recruit(sdata->ckp, -headroom); generator_recruit(sdata->ckp, proxyid, -headroom);
} }
static void update_subscribe(ckpool_t *ckp, const char *cmd) static void update_subscribe(ckpool_t *ckp, const char *cmd)
@ -1544,17 +1550,38 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
json_decref(val); json_decref(val);
} }
/* Find the highest priority alive proxy belonging to userid and recruit extra
* subproxies. */
static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int recruits)
{
proxy_t *proxy, *subproxy, *tmp, *subtmp, *best = NULL;
mutex_lock(&sdata->proxy_lock);
HASH_ITER(hh, sdata->proxies, proxy, tmp) {
if (proxy->userid < userid)
continue;
if (proxy->userid > userid)
break;
HASH_ITER(sh, proxy->subproxies, subproxy, subtmp) {
if (subproxy->dead)
continue;
best = proxy;
}
}
mutex_unlock(&sdata->proxy_lock);
if (best)
generator_recruit(sdata->ckp, proxy->id, recruits);
}
/* Check how much headroom the userid proxies have and reconnect any clients /* Check how much headroom the userid proxies have and reconnect any clients
* that are not bound to it */ * that are not bound to it */
static void check_userproxies(sdata_t *sdata, const int userid) static void check_userproxies(sdata_t *sdata, const int userid)
{ {
int64_t headroom = userproxy_headroom(sdata, userid); int64_t headroom = proxy_headroom(sdata, userid);
stratum_instance_t *client, *tmpclient; stratum_instance_t *client, *tmpclient;
int reconnects = 0, hard = 0; int reconnects = 0, hard = 0;
if (!headroom)
return;
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) {
if (client->user_id != userid) if (client->user_id != userid)
@ -1576,7 +1603,8 @@ static void check_userproxies(sdata_t *sdata, const int userid)
LOGNOTICE("%d clients flagged for reconnect to user %d proxies", LOGNOTICE("%d clients flagged for reconnect to user %d proxies",
reconnects, userid); reconnects, userid);
} }
/* FIXME: Recruit extra user proxies when headroom < 0 */ if (headroom < 0)
recruit_best_userproxy(sdata, userid, -headroom);
} }
static void update_notify(ckpool_t *ckp, const char *cmd) static void update_notify(ckpool_t *ckp, const char *cmd)
@ -2386,7 +2414,7 @@ static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client)
LOGNOTICE("Reconnecting client %"PRId64, client->id); LOGNOTICE("Reconnecting client %"PRId64, client->id);
reconnect_client(sdata, client); reconnect_client(sdata, client);
} else { } else {
generator_recruit(sdata->ckp, -headroom); generator_recruit(sdata->ckp, proxy->id, -headroom);
if (!client->reconnect) { if (!client->reconnect) {
LOGNOTICE("Flagging client %"PRId64, client->id); LOGNOTICE("Flagging client %"PRId64, client->id);
client->reconnect = true; client->reconnect = true;
@ -2823,8 +2851,12 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int
LOGWARNING("Temporarily insufficient subproxies to accept more clients"); LOGWARNING("Temporarily insufficient subproxies to accept more clients");
return NULL; return NULL;
} }
if (best->id != current->id || current->headroom < 2) if (best->global && (best->id != current->id || current->headroom < 2))
generator_recruit(ckp, 1); generator_recruit(ckp, current->id, 1);
else if (userid) {
if (proxy_headroom(ckp_sdata, userid) < 2)
generator_recruit(ckp, best->id, 1);
}
return best->sdata; return best->sdata;
} }
@ -3494,7 +3526,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
if (client->sdata && client->sdata->proxy && client->sdata->proxy->global) { if (client->sdata && client->sdata->proxy && client->sdata->proxy->global) {
sdata_t *ckp_sdata = ckp->data; sdata_t *ckp_sdata = ckp->data;
if (userproxy_headroom(ckp_sdata, client->user_id)) if (proxy_headroom(ckp_sdata, client->user_id))
client->reconnect = true; client->reconnect = true;
} }
} else { } else {

Loading…
Cancel
Save