From 2f2d454763a77f032fddc0171abe9a9cc74d1a24 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 24 Feb 2015 14:01:37 +1100 Subject: [PATCH] Create new instances of proxies on reconnect instead of trying to overwrite old ones on reconnect --- src/generator.c | 66 +++++++++++++---------------- src/stratifier.c | 106 +++++++++++++---------------------------------- 2 files changed, 57 insertions(+), 115 deletions(-) diff --git a/src/generator.c b/src/generator.c index 008adf16..38e020d7 100644 --- a/src/generator.c +++ b/src/generator.c @@ -85,6 +85,7 @@ struct proxy_instance { server_instance_t *si; bool passthrough; int64_t id; /* Proxy server id*/ + int low_id; /* Low bits of id */ int subid; /* Subproxy id */ const char *auth; @@ -653,8 +654,8 @@ retry: } if (size < 3) { if (!proxi->subid) { - LOGWARNING("Proxy %ld %s Nonce2 length %d too small for fast miners", - proxi->id, proxi->si->url, size); + LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners", + proxi->low_id, proxi->si->url, size); } else { LOGNOTICE("Proxy %ld:%d Nonce2 length %d too small for fast miners", proxi->id, proxi->subid, size); @@ -973,26 +974,10 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst store_proxy(gdata, subproxy); } -/* If the parent is no longer in use due to reconnect, we shouldn't use any of - * the child subproxies. */ -static void drop_subproxies(proxy_instance_t *proxi) -{ - proxy_instance_t *subproxy, *tmp; - - mutex_lock(&proxi->proxy_lock); - HASH_ITER(sh, proxi->subproxies, subproxy, tmp) { - if (!parent_proxy(subproxy)) { - send_stratifier_deadproxy(proxi->ckp, proxi->id, subproxy->subid); - subproxy->disabled = true; - Close(subproxy->cs->fd); - } - } - mutex_unlock(&proxi->proxy_lock); -} - static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; + int64_t high_id, low_id, new_id; proxy_instance_t *newproxi; ckpool_t *ckp = proxi->ckp; gdata_t *gdata = ckp->data; @@ -1049,8 +1034,12 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) newsi = ckzalloc(sizeof(server_instance_t)); mutex_lock(&gdata->lock); - newsi->id = si->id; /* Inherit the old connection's id */ - ckp->servers[newsi->id] = newsi; + high_id = proxi->id >> 32; /* Use the high bits for the reconnect id */ + high_id++; + high_id <<= 32; + low_id = proxi->id & 0x00000000FFFFFFFFll; /* Use the low bits for the master id */ + new_id = high_id | low_id; + ckp->servers[low_id] = newsi; newsi->url = url; newsi->auth = strdup(si->auth); newsi->pass = strdup(si->pass); @@ -1063,14 +1052,14 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) newproxi->ckp = ckp; newproxi->cs = &newsi->cs; newproxi->cs->ckp = ckp; - newproxi->id = newsi->id; + newproxi->low_id = low_id; + newproxi->id = new_id; newproxi->subproxy_count = ++proxi->subproxy_count; - HASH_REPLACE_I64(gdata->proxies, id, newproxi, proxi); + HASH_ADD_I64(gdata->proxies, id, newproxi); mutex_unlock(&gdata->lock); - /* Old proxy memory is basically lost here */ + proxi->disabled = true; prepare_proxy(newproxi); - drop_subproxies(proxi); out: return ret; } @@ -1247,21 +1236,21 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi) val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Proxy %ld:%d %s failed to get a json result in auth_stratum, got: %s", - proxi->id, proxi->subid, proxi->si->url, buf); + LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", + proxi->low_id, proxi->subid, proxi->si->url, buf); goto out; } if (err_val && !json_is_null(err_val)) { - LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum due to err_val, got: %s", - proxi->id, proxi->subid, proxi->si->url, buf); + LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s", + proxi->low_id, proxi->subid, proxi->si->url, buf); goto out; } if (res_val) { ret = json_is_true(res_val); if (!ret) { - LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum, got: %s", - proxi->id, proxi->subid, proxi->si->url, buf); + LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s", + proxi->low_id, proxi->subid, proxi->si->url, buf); goto out; } } else { @@ -1764,8 +1753,8 @@ static void *passthrough_recv(void *arg) if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { reconnect_generator(ckp); - LOGWARNING("Proxy %ld:%s connection established", - proxi->id, proxi->si->url); + LOGWARNING("Proxy %d:%s connection established", + proxi->low_id, proxi->si->url); } alive = proxi->alive; @@ -1970,7 +1959,7 @@ static void setup_proxies(ckpool_t *ckp, gdata_t *gdata) si = ckp->servers[i]; proxi = si->data; - proxi->id = i; + proxi->id = proxi->low_id = i; HASH_ADD_I64(gdata->proxies, id, proxi); if (ckp->passthrough) { create_pthread(&proxi->pth_precv, passthrough_recv, proxi); @@ -1991,8 +1980,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { + if (proxi->disabled) + continue; if (proxi->alive || subproxies_alive(proxi)) { - if (!ret || proxi->id < ret->id) + if (!ret || proxi->low_id < ret->low_id) ret = proxi; } } @@ -2029,8 +2020,8 @@ reconnect: proxi = cproxy; if (!ckp->passthrough) { connsock_t *cs = proxi->cs; - LOGWARNING("Successfully connected to proxy %ld %s:%s as proxy", - proxi->id, cs->url, cs->port); + LOGWARNING("Successfully connected to proxy %d %s:%s as proxy", + proxi->low_id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%ld", proxi->id); async_send_proc(ckp, ckp->stratifier, buf); @@ -2134,7 +2125,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) for (i = 0; i < ckp->proxies; i++) { ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); si = ckp->servers[i]; - si->id = i; si->url = strdup(ckp->proxyurl[i]); si->auth = strdup(ckp->proxyauth[i]); si->pass = strdup(ckp->proxypass[i]); diff --git a/src/stratifier.c b/src/stratifier.c index 172d5d19..2afbd530 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -306,6 +306,7 @@ struct proxy_base { proxy_t *next; /* For retired subproxies */ proxy_t *prev; int64_t id; + int low_id; int subid; double diff; @@ -1027,6 +1028,7 @@ static proxy_t *__generate_proxy(sdata_t *sdata, const int64_t id) proxy_t *proxy = ckzalloc(sizeof(proxy_t)); proxy->id = id; + proxy->low_id = id & 0xFFFFFFFF; proxy->sdata = duplicate_sdata(sdata); proxy->sdata->subproxy = proxy; proxy->sdata->verbose = true; @@ -1191,20 +1193,22 @@ static proxy_t *current_proxy(sdata_t *sdata) return proxy; } -static void dead_parent_proxy(sdata_t *sdata, const int64_t id) +static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid) { stratum_instance_t *client, *tmp; int reconnects = 0; int64_t headroom; proxy_t *proxy; + proxy = existing_subproxy(sdata, id, subid); + if (proxy) + proxy->dead = true; + LOGINFO("Stratifier dropping clients from proxy %ld:%d", id, subid); headroom = current_headroom(sdata, &proxy); - if (!proxy) - return; ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { - if (client->proxyid != id || client->subproxyid) + if (client->proxyid != id || client->subproxyid != subid) continue; headroom--; reconnects++; @@ -1216,55 +1220,18 @@ static void dead_parent_proxy(sdata_t *sdata, const int64_t id) ck_runlock(&sdata->instance_lock); if (reconnects) { - LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld", - reconnects, id); + LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, + id, subid); if (headroom < 42) generator_recruit(sdata->ckp); } } -static void new_proxy(sdata_t *sdata, const int64_t id) -{ - proxy_t *proxy, *subproxy, *tmp, *proxy_list = NULL; - bool exists = false, current = false; - - mutex_lock(&sdata->proxy_lock); - HASH_FIND_I64(sdata->proxies, &id, proxy); - if (proxy) { - exists = true; - HASH_DEL(sdata->proxies, proxy); - DL_APPEND(sdata->retired_proxies, proxy); - if (proxy == sdata->proxy) - current = true; - proxy_list = proxy->subproxies; - HASH_DELETE(sh, proxy_list, proxy); - proxy->subproxies = NULL; - } - proxy = __generate_proxy(sdata, id); - if (current) - sdata->proxy = proxy; - /* The old proxy had subproxies on its list so steal its list and add - * ourselves to it. */ - if (proxy_list) { - HASH_DELETE(sh, proxy->subproxies, proxy); - proxy->subproxies = proxy_list; - HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy); - HASH_ITER(sh, proxy->subproxies, subproxy, tmp) - subproxy->parent = proxy; - } - mutex_unlock(&sdata->proxy_lock); - - if (exists) { - LOGNOTICE("Stratifier replaced old proxy %ld", id); - dead_parent_proxy(sdata, id); - } -} - static void update_subscribe(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->data, *dsdata; + proxy_t *proxy, *old = NULL; const char *buf; - proxy_t *proxy; int64_t id = 0; int subid = 0; json_t *val; @@ -1289,13 +1256,19 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) return; } - if (!subid) { - new_proxy(sdata, id); + if (!subid) LOGNOTICE("Got updated subscribe for proxy %ld", id); - } else + else LOGINFO("Got updated subscribe for proxy %ld:%d", id, subid); - proxy = subproxy_by_id(sdata, id, subid); + /* Is this a replacement for an existing proxy id? */ + old = existing_subproxy(sdata, id, subid); + if (old) { + dead_proxyid(sdata, id, subid); + proxy = old; + proxy->dead = false; + } else + proxy = subproxy_by_id(sdata, id, subid); dsdata = proxy->sdata; ck_wlock(&dsdata->workbase_lock); @@ -1323,6 +1296,12 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) proxy->enonce1u.u64 = 0; ck_wunlock(&dsdata->workbase_lock); + /* Is this a replacement proxy for the current one */ + mutex_lock(&sdata->proxy_lock); + if (sdata->proxy && sdata->proxy->low_id == proxy->low_id) + sdata->proxy = proxy; + mutex_unlock(&sdata->proxy_lock); + if (subid) { LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64, id, subid, proxy->nonce2len, proxy->max_clients); @@ -2246,38 +2225,11 @@ static void set_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf) { - stratum_instance_t *client, *tmp; - int64_t headroom, id = 0; - int reconnects = 0; - proxy_t *proxy; + int64_t id = 0; int subid = 0; sscanf(buf, "deadproxy=%ld:%d", &id, &subid); - proxy = existing_subproxy(sdata, id, subid); - if (proxy) - proxy->dead = true; - LOGNOTICE("Stratifier dropping clients from proxy %ld:%d", id, subid); - headroom = current_headroom(sdata, &proxy); - - ck_rlock(&sdata->instance_lock); - HASH_ITER(hh, sdata->stratum_instances, client, tmp) { - if (client->proxyid != id || client->subproxyid != subid) - continue; - headroom--; - reconnects++; - if (client->reconnect && headroom > 0) - reconnect_client(sdata, client); - else - client->reconnect = true; - } - ck_runlock(&sdata->instance_lock); - - if (reconnects) { - LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, - id, subid); - if (headroom < 42) - generator_recruit(sdata->ckp); - } + dead_proxyid(sdata, id, subid); } /* Must hold a reference */