Browse Source

Make the stratifier tell the generator precisely how many extra connections we need

master
Con Kolivas 10 years ago
parent
commit
918a6ebe0c
  1. 18
      src/generator.c
  2. 33
      src/stratifier.c

18
src/generator.c

@ -1734,17 +1734,20 @@ retry:
return NULL; return NULL;
} }
/* Allow up to 42 recruit requests to accumulate */ /* Queue up to the requested amount */
static void recruit_subproxy(proxy_instance_t *proxi) static void recruit_subproxy(proxy_instance_t *proxi, const char *buf)
{ {
bool recruit = false; bool recruit = false;
int recruits = 1;
pthread_t pth; pthread_t pth;
sscanf(buf, "recruit=%d", &recruits);
mutex_lock(&proxi->proxy_lock); mutex_lock(&proxi->proxy_lock);
if (!proxi->recruit++ > 0) if (!proxi->recruit)
recruit = true; recruit = true;
else if (proxi->recruit > 42) if (proxi->recruit < recruits)
proxi->recruit = 42; proxi->recruit = recruits;
mutex_unlock(&proxi->proxy_lock); mutex_unlock(&proxi->proxy_lock);
if (recruit) if (recruit)
@ -1972,8 +1975,7 @@ static void *proxy_recv(void *arg)
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
subproxy->low_id, subproxy->si->url); subproxy->low_id, subproxy->si->url);
break; break;
} else }
recruit_subproxy(proxi);
} }
continue; continue;
} }
@ -2115,7 +2117,7 @@ retry:
LOGDEBUG("Proxy received ping request"); LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
} else if (cmdmatch(buf, "recruit")) { } else if (cmdmatch(buf, "recruit")) {
recruit_subproxy(proxi); recruit_subproxy(proxi, buf);
} else if (cmdmatch(buf, "dropproxy")) { } else if (cmdmatch(buf, "dropproxy")) {
drop_proxy(gdata, buf); drop_proxy(gdata, buf);
} else if (ckp->passthrough) { } else if (ckp->passthrough) {

33
src/stratifier.c

@ -1136,10 +1136,13 @@ out_unlock:
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); static void reconnect_client(sdata_t *sdata, stratum_instance_t *client);
static void generator_recruit(ckpool_t *ckp) static void generator_recruit(ckpool_t *ckp, const int recruits)
{ {
LOGINFO("Stratifer requesting more proxies from generator"); char buf[256];
send_generator(ckp, "recruit", GEN_PRIORITY);
sprintf(buf, "recruit=%d", recruits);
LOGINFO("Stratifer requesting %d more proxies from generator", recruits);
send_generator(ckp, buf, GEN_PRIORITY);
} }
/* Find how much headroom we have and connect up to that many clients that are /* Find how much headroom we have and connect up to that many clients that are
@ -1160,9 +1163,9 @@ static void reconnect_clients(sdata_t *sdata)
HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) {
if (client->proxyid == proxy->id) if (client->proxyid == proxy->id)
continue; continue;
reconnects++;
if (headroom-- < 1) if (headroom-- < 1)
break; continue;
reconnects++;
if (client->reconnect) if (client->reconnect)
reconnect_client(sdata, client); reconnect_client(sdata, client);
else else
@ -1173,9 +1176,9 @@ static void reconnect_clients(sdata_t *sdata)
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects, LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects,
proxy->id); proxy->id);
if (headroom < 42)
generator_recruit(sdata->ckp);
} }
if (headroom < 0)
generator_recruit(sdata->ckp, -headroom);
} }
#if 0 #if 0
@ -1208,9 +1211,9 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != id || client->subproxyid != subid) if (client->proxyid != id || client->subproxyid != subid)
continue; continue;
reconnects++;
if (headroom-- < 1) if (headroom-- < 1)
break; continue;
reconnects++;
if (client->reconnect) if (client->reconnect)
reconnect_client(sdata, client); reconnect_client(sdata, client);
else else
@ -1221,9 +1224,9 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects,
id, subid); id, subid);
if (headroom < 42)
generator_recruit(sdata->ckp);
} }
if (headroom < 0)
generator_recruit(sdata->ckp, -headroom);
} }
static void update_subscribe(ckpool_t *ckp, const char *cmd) static void update_subscribe(ckpool_t *ckp, const char *cmd)
@ -2239,11 +2242,11 @@ static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client)
headroom = current_headroom(sdata, &proxy); headroom = current_headroom(sdata, &proxy);
if (!proxy) if (!proxy)
return; return;
if (headroom > 0) { if (headroom-- > 0) {
LOGNOTICE("Reconnecting client %"PRId64, client->id); LOGNOTICE("Reconnecting client %"PRId64, client->id);
reconnect_client(sdata, client); reconnect_client(sdata, client);
} else { } else {
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp, -headroom);
if (!client->reconnect) { if (!client->reconnect) {
LOGNOTICE("Flagging client %"PRId64, client->id); LOGNOTICE("Flagging client %"PRId64, client->id);
client->reconnect = true; client->reconnect = true;
@ -2574,8 +2577,8 @@ static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata)
} }
mutex_unlock(&ckp_sdata->proxy_lock); mutex_unlock(&ckp_sdata->proxy_lock);
if (best_id != current->id || current->headroom < 42) if (best_id != current->id || current->headroom < 2)
generator_recruit(ckp); generator_recruit(ckp, 1);
if (best_id == ckp_sdata->proxy_count) { if (best_id == ckp_sdata->proxy_count) {
LOGNOTICE("Temporarily insufficient subproxies to accept more clients"); LOGNOTICE("Temporarily insufficient subproxies to accept more clients");
return NULL; return NULL;

Loading…
Cancel
Save