Browse Source

Rework subproxy recruitment to allow requests to stack but abandon them if the subproxy is not alive

master
Con Kolivas 10 years ago
parent
commit
a6ac868cd3
  1. 33
      src/generator.c
  2. 46
      src/stratifier.c

33
src/generator.c

@ -108,7 +108,7 @@ struct proxy_instance {
bool disabled; /* Subproxy no longer to be used */ bool disabled; /* Subproxy no longer to be used */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool reconnecting; /* Testing in progress */ bool reconnecting; /* Testing in progress */
bool recruiting; /* Recruiting in progress */ int recruit; /* Recruiting in progress */
bool alive; bool alive;
mutex_t notify_lock; mutex_t notify_lock;
@ -1707,26 +1707,47 @@ static void *proxy_recruit(void *arg)
proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg; proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg;
ckpool_t *ckp = parent->ckp; ckpool_t *ckp = parent->ckp;
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
bool recruit, alive;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
retry:
recruit = false;
proxy = create_subproxy(gdata, parent); proxy = create_subproxy(gdata, parent);
if (!proxy_alive(ckp, proxy->si, proxy, proxy->cs, false, parent->epfd)) { alive = proxy_alive(ckp, proxy->si, proxy, proxy->cs, false, parent->epfd);
if (!alive) {
LOGNOTICE("Subproxy failed proxy_alive testing"); LOGNOTICE("Subproxy failed proxy_alive testing");
store_proxy(gdata, proxy); store_proxy(gdata, proxy);
} else } else
add_subproxy(parent, proxy); add_subproxy(parent, proxy);
parent->recruiting = false;
mutex_lock(&parent->proxy_lock);
if (alive) {
if (--parent->recruit > 0)
recruit = true;
}
mutex_unlock(&parent->proxy_lock);
if (recruit)
goto retry;
return NULL; return NULL;
} }
/* Allow up to 42 recruit requests to accumulate */
static void recruit_subproxy(proxy_instance_t *proxi) static void recruit_subproxy(proxy_instance_t *proxi)
{ {
bool recruit = false;
pthread_t pth; pthread_t pth;
if (proxi->recruiting) mutex_lock(&proxi->proxy_lock);
return; if (!proxi->recruit++ > 0)
proxi->recruiting = true; recruit = true;
else if (proxi->recruit > 42)
proxi->recruit = 42;
mutex_unlock(&proxi->proxy_lock);
if (recruit)
create_pthread(&pth, proxy_recruit, proxi); create_pthread(&pth, proxy_recruit, proxi);
} }

46
src/stratifier.c

@ -1160,10 +1160,9 @@ static void reconnect_clients(sdata_t *sdata)
HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) {
if (client->proxyid == proxy->id) if (client->proxyid == proxy->id)
continue; continue;
if (headroom < 1)
break;
headroom--;
reconnects++; reconnects++;
if (headroom-- < 1)
break;
if (client->reconnect) if (client->reconnect)
reconnect_client(sdata, client); reconnect_client(sdata, client);
else else
@ -1174,9 +1173,9 @@ static void reconnect_clients(sdata_t *sdata)
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects, LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects,
proxy->id); proxy->id);
}
if (headroom < 42) if (headroom < 42)
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp);
}
} }
#if 0 #if 0
@ -1209,10 +1208,9 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != id || client->subproxyid != subid) if (client->proxyid != id || client->subproxyid != subid)
continue; continue;
if (headroom < 1)
break;
headroom--;
reconnects++; reconnects++;
if (headroom-- < 1)
break;
if (client->reconnect) if (client->reconnect)
reconnect_client(sdata, client); reconnect_client(sdata, client);
else else
@ -1223,43 +1221,15 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects,
id, subid); id, subid);
}
if (headroom < 42) if (headroom < 42)
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp);
}
static void reassess_headroom(sdata_t *sdata, const proxy_t *proxy)
{
stratum_instance_t *client, *tmpclient;
proxy_t *subproxy, *tmp;
int64_t headroom = 0;
mutex_lock(&sdata->proxy_lock);
HASH_ITER(sh, proxy->subproxies, subproxy, tmp) {
if (subproxy->dead)
continue;
headroom += subproxy->max_clients - subproxy->clients;
}
mutex_unlock(&sdata->proxy_lock);
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) {
if (client->dropped)
continue;
if (client->reconnect || client->proxyid != proxy->id)
headroom--;
} }
ck_runlock(&sdata->instance_lock);
if (headroom < 1)
generator_recruit(sdata->ckp);
} }
static void update_subscribe(ckpool_t *ckp, const char *cmd) static void update_subscribe(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
proxy_t *proxy, *old = NULL; proxy_t *proxy, *old = NULL;
bool current = false;
const char *buf; const char *buf;
int64_t id = 0; int64_t id = 0;
int subid = 0; int subid = 0;
@ -1326,14 +1296,10 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
/* Is this a replacement proxy for the current one */ /* Is this a replacement proxy for the current one */
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
if (sdata->proxy && sdata->proxy->low_id == proxy->low_id && !proxy->subid) { if (sdata->proxy && sdata->proxy->low_id == proxy->low_id && !proxy->subid)
current = true;
sdata->proxy = proxy; sdata->proxy = proxy;
}
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
if (current)
reassess_headroom(sdata, proxy);
if (subid) { if (subid) {
LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64, LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64,
id, subid, proxy->nonce2len, proxy->max_clients); id, subid, proxy->nonce2len, proxy->max_clients);

Loading…
Cancel
Save