Browse Source

Create new instances of proxies on reconnect instead of trying to overwrite old ones on reconnect

master
Con Kolivas 10 years ago
parent
commit
2f2d454763
  1. 66
      src/generator.c
  2. 104
      src/stratifier.c

66
src/generator.c

@ -85,6 +85,7 @@ struct proxy_instance {
server_instance_t *si;
bool passthrough;
int64_t id; /* Proxy server id*/
int low_id; /* Low bits of id */
int subid; /* Subproxy id */
const char *auth;
@ -653,8 +654,8 @@ retry:
}
if (size < 3) {
if (!proxi->subid) {
LOGWARNING("Proxy %ld %s Nonce2 length %d too small for fast miners",
proxi->id, proxi->si->url, size);
LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners",
proxi->low_id, proxi->si->url, size);
} else {
LOGNOTICE("Proxy %ld:%d Nonce2 length %d too small for fast miners",
proxi->id, proxi->subid, size);
@ -973,26 +974,10 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
store_proxy(gdata, subproxy);
}
/* If the parent is no longer in use due to reconnect, we shouldn't use any of
* the child subproxies. */
static void drop_subproxies(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy, *tmp;
mutex_lock(&proxi->proxy_lock);
HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
if (!parent_proxy(subproxy)) {
send_stratifier_deadproxy(proxi->ckp, proxi->id, subproxy->subid);
subproxy->disabled = true;
Close(subproxy->cs->fd);
}
}
mutex_unlock(&proxi->proxy_lock);
}
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{
server_instance_t *newsi, *si = proxi->si;
int64_t high_id, low_id, new_id;
proxy_instance_t *newproxi;
ckpool_t *ckp = proxi->ckp;
gdata_t *gdata = ckp->data;
@ -1049,8 +1034,12 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
newsi = ckzalloc(sizeof(server_instance_t));
mutex_lock(&gdata->lock);
newsi->id = si->id; /* Inherit the old connection's id */
ckp->servers[newsi->id] = newsi;
high_id = proxi->id >> 32; /* Use the high bits for the reconnect id */
high_id++;
high_id <<= 32;
low_id = proxi->id & 0x00000000FFFFFFFFll; /* Use the low bits for the master id */
new_id = high_id | low_id;
ckp->servers[low_id] = newsi;
newsi->url = url;
newsi->auth = strdup(si->auth);
newsi->pass = strdup(si->pass);
@ -1063,14 +1052,14 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
newproxi->ckp = ckp;
newproxi->cs = &newsi->cs;
newproxi->cs->ckp = ckp;
newproxi->id = newsi->id;
newproxi->low_id = low_id;
newproxi->id = new_id;
newproxi->subproxy_count = ++proxi->subproxy_count;
HASH_REPLACE_I64(gdata->proxies, id, newproxi, proxi);
HASH_ADD_I64(gdata->proxies, id, newproxi);
mutex_unlock(&gdata->lock);
/* Old proxy memory is basically lost here */
proxi->disabled = true;
prepare_proxy(newproxi);
drop_subproxies(proxi);
out:
return ret;
}
@ -1247,21 +1236,21 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
val = json_msg_result(buf, &res_val, &err_val);
if (!val) {
LOGWARNING("Proxy %ld:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf);
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
goto out;
}
if (err_val && !json_is_null(err_val)) {
LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum due to err_val, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf);
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
goto out;
}
if (res_val) {
ret = json_is_true(res_val);
if (!ret) {
LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf);
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
goto out;
}
} else {
@ -1764,8 +1753,8 @@ static void *passthrough_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
reconnect_generator(ckp);
LOGWARNING("Proxy %ld:%s connection established",
proxi->id, proxi->si->url);
LOGWARNING("Proxy %d:%s connection established",
proxi->low_id, proxi->si->url);
}
alive = proxi->alive;
@ -1970,7 +1959,7 @@ static void setup_proxies(ckpool_t *ckp, gdata_t *gdata)
si = ckp->servers[i];
proxi = si->data;
proxi->id = i;
proxi->id = proxi->low_id = i;
HASH_ADD_I64(gdata->proxies, id, proxi);
if (ckp->passthrough) {
create_pthread(&proxi->pth_precv, passthrough_recv, proxi);
@ -1991,8 +1980,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->proxies, proxi, tmp) {
if (proxi->disabled)
continue;
if (proxi->alive || subproxies_alive(proxi)) {
if (!ret || proxi->id < ret->id)
if (!ret || proxi->low_id < ret->low_id)
ret = proxi;
}
}
@ -2029,8 +2020,8 @@ reconnect:
proxi = cproxy;
if (!ckp->passthrough) {
connsock_t *cs = proxi->cs;
LOGWARNING("Successfully connected to proxy %ld %s:%s as proxy",
proxi->id, cs->url, cs->port);
LOGWARNING("Successfully connected to proxy %d %s:%s as proxy",
proxi->low_id, cs->url, cs->port);
dealloc(buf);
ASPRINTF(&buf, "proxy=%ld", proxi->id);
async_send_proc(ckp, ckp->stratifier, buf);
@ -2134,7 +2125,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
for (i = 0; i < ckp->proxies; i++) {
ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[i];
si->id = i;
si->url = strdup(ckp->proxyurl[i]);
si->auth = strdup(ckp->proxyauth[i]);
si->pass = strdup(ckp->proxypass[i]);

104
src/stratifier.c

@ -306,6 +306,7 @@ struct proxy_base {
proxy_t *next; /* For retired subproxies */
proxy_t *prev;
int64_t id;
int low_id;
int subid;
double diff;
@ -1027,6 +1028,7 @@ static proxy_t *__generate_proxy(sdata_t *sdata, const int64_t id)
proxy_t *proxy = ckzalloc(sizeof(proxy_t));
proxy->id = id;
proxy->low_id = id & 0xFFFFFFFF;
proxy->sdata = duplicate_sdata(sdata);
proxy->sdata->subproxy = proxy;
proxy->sdata->verbose = true;
@ -1191,20 +1193,22 @@ static proxy_t *current_proxy(sdata_t *sdata)
return proxy;
}
static void dead_parent_proxy(sdata_t *sdata, const int64_t id)
static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
{
stratum_instance_t *client, *tmp;
int reconnects = 0;
int64_t headroom;
proxy_t *proxy;
proxy = existing_subproxy(sdata, id, subid);
if (proxy)
proxy->dead = true;
LOGINFO("Stratifier dropping clients from proxy %ld:%d", id, subid);
headroom = current_headroom(sdata, &proxy);
if (!proxy)
return;
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != id || client->subproxyid)
if (client->proxyid != id || client->subproxyid != subid)
continue;
headroom--;
reconnects++;
@ -1216,55 +1220,18 @@ static void dead_parent_proxy(sdata_t *sdata, const int64_t id)
ck_runlock(&sdata->instance_lock);
if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld",
reconnects, id);
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects,
id, subid);
if (headroom < 42)
generator_recruit(sdata->ckp);
}
}
static void new_proxy(sdata_t *sdata, const int64_t id)
{
proxy_t *proxy, *subproxy, *tmp, *proxy_list = NULL;
bool exists = false, current = false;
mutex_lock(&sdata->proxy_lock);
HASH_FIND_I64(sdata->proxies, &id, proxy);
if (proxy) {
exists = true;
HASH_DEL(sdata->proxies, proxy);
DL_APPEND(sdata->retired_proxies, proxy);
if (proxy == sdata->proxy)
current = true;
proxy_list = proxy->subproxies;
HASH_DELETE(sh, proxy_list, proxy);
proxy->subproxies = NULL;
}
proxy = __generate_proxy(sdata, id);
if (current)
sdata->proxy = proxy;
/* The old proxy had subproxies on its list so steal its list and add
* ourselves to it. */
if (proxy_list) {
HASH_DELETE(sh, proxy->subproxies, proxy);
proxy->subproxies = proxy_list;
HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy);
HASH_ITER(sh, proxy->subproxies, subproxy, tmp)
subproxy->parent = proxy;
}
mutex_unlock(&sdata->proxy_lock);
if (exists) {
LOGNOTICE("Stratifier replaced old proxy %ld", id);
dead_parent_proxy(sdata, id);
}
}
static void update_subscribe(ckpool_t *ckp, const char *cmd)
{
sdata_t *sdata = ckp->data, *dsdata;
proxy_t *proxy, *old = NULL;
const char *buf;
proxy_t *proxy;
int64_t id = 0;
int subid = 0;
json_t *val;
@ -1289,12 +1256,18 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
return;
}
if (!subid) {
new_proxy(sdata, id);
if (!subid)
LOGNOTICE("Got updated subscribe for proxy %ld", id);
} else
else
LOGINFO("Got updated subscribe for proxy %ld:%d", id, subid);
/* Is this a replacement for an existing proxy id? */
old = existing_subproxy(sdata, id, subid);
if (old) {
dead_proxyid(sdata, id, subid);
proxy = old;
proxy->dead = false;
} else
proxy = subproxy_by_id(sdata, id, subid);
dsdata = proxy->sdata;
@ -1323,6 +1296,12 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
proxy->enonce1u.u64 = 0;
ck_wunlock(&dsdata->workbase_lock);
/* Is this a replacement proxy for the current one */
mutex_lock(&sdata->proxy_lock);
if (sdata->proxy && sdata->proxy->low_id == proxy->low_id)
sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock);
if (subid) {
LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64,
id, subid, proxy->nonce2len, proxy->max_clients);
@ -2246,38 +2225,11 @@ static void set_proxy(sdata_t *sdata, const char *buf)
static void dead_proxy(sdata_t *sdata, const char *buf)
{
stratum_instance_t *client, *tmp;
int64_t headroom, id = 0;
int reconnects = 0;
proxy_t *proxy;
int64_t id = 0;
int subid = 0;
sscanf(buf, "deadproxy=%ld:%d", &id, &subid);
proxy = existing_subproxy(sdata, id, subid);
if (proxy)
proxy->dead = true;
LOGNOTICE("Stratifier dropping clients from proxy %ld:%d", id, subid);
headroom = current_headroom(sdata, &proxy);
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != id || client->subproxyid != subid)
continue;
headroom--;
reconnects++;
if (client->reconnect && headroom > 0)
reconnect_client(sdata, client);
else
client->reconnect = true;
}
ck_runlock(&sdata->instance_lock);
if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects,
id, subid);
if (headroom < 42)
generator_recruit(sdata->ckp);
}
dead_proxyid(sdata, id, subid);
}
/* Must hold a reference */

Loading…
Cancel
Save