From 416b7a02eb33205d3a76b601ca68e39e919e50db Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 16:05:38 +1100 Subject: [PATCH] Handle proxy failover correctly for remainder of upstream pools when a reconnect is issued and don't lose original pool details, demoting it to last instead --- src/ckpool.h | 1 - src/generator.c | 156 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 104 insertions(+), 53 deletions(-) diff --git a/src/ckpool.h b/src/ckpool.h index b73059dc..b2f59333 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -183,7 +183,6 @@ struct ckpool_instance { char **serverurl; // Array of URLs to bind our server/proxy to int serverurls; // Number of server bindings int update_interval; // Seconds between stratum updates - int chosen_server; // Chosen server for next connection /* Proxy options */ int proxies; diff --git a/src/generator.c b/src/generator.c index 00ba3831..911c8f08 100644 --- a/src/generator.c +++ b/src/generator.c @@ -71,12 +71,18 @@ struct pass_msg { typedef struct pass_msg pass_msg_t; +typedef struct proxy_instance proxy_instance_t; + /* Per proxied pool instance data */ struct proxy_instance { + proxy_instance_t *next; + proxy_instance_t *prev; + ckpool_t *ckp; connsock_t *cs; server_instance_t *si; bool passthrough; + int id; /* Proxy server id */ const char *auth; const char *pass; @@ -92,14 +98,13 @@ struct proxy_instance { double diff; tv_t last_share; - int id; /* Message id for sending stratum messages */ + int msg_id; /* Message id for sending stratum messages */ bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ bool notified; /* Received new template for work */ bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ - bool replaced; /* This proxy has issued a reconnect with new data */ pthread_mutex_t notify_lock; notify_instance_t *notify_instances; @@ -121,10 +126,10 @@ struct proxy_instance { char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ }; -typedef struct proxy_instance proxy_instance_t; - /* Private data for the generator */ struct generator_data { + pthread_mutex_t lock; /* Lock protecting linked lists */ + proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -653,19 +658,19 @@ retry: /* Attempt to reconnect if the pool supports resuming */ if (proxi->sessionid) { JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION, proxi->sessionid); /* Then attempt to connect with just the client description */ } else if (!proxi->no_params) { JSON_CPACK(req, "{s:i,s:s,s:[s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION); /* Then try without any parameters */ } else { JSON_CPACK(req, "{s:i,s:s,s:[]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params"); } @@ -875,7 +880,9 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; + proxy_instance_t *newproxi; ckpool_t *ckp = proxi->ckp; + gdata_t *gdata = ckp->data; const char *new_url; bool ret = false; int new_port; @@ -920,27 +927,25 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) ret = true; newsi = ckzalloc(sizeof(server_instance_t)); - newsi->id = ckp->proxies; - ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * (ckp->proxies + 1)); + + mutex_lock(&gdata->lock); + newsi->id = si->id; /* Inherit the old connection's id */ + si->id = ckp->proxies++; /* Give the old connection the lowest id */ + ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies); ckp->servers[newsi->id] = newsi; newsi->url = url; newsi->auth = strdup(si->auth); newsi->pass = strdup(si->pass); proxi->reconnect = true; - proxi->replaced = true; - - /* Reuse variable on a new proxy instance */ - proxi = ckzalloc(sizeof(proxy_instance_t)); - newsi->data = proxi; - proxi->auth = newsi->auth; - proxi->pass = newsi->pass; - proxi->si = newsi; - proxi->ckp = ckp; - proxi->cs = &newsi->cs; - - /* Set chosen server only once all new proxy data exists */ - ckp->proxies++; - ckp->chosen_server = newsi->id; + + newproxi = ckzalloc(sizeof(proxy_instance_t)); + newsi->data = newproxi; + newproxi->auth = newsi->auth; + newproxi->pass = newsi->pass; + newproxi->si = newsi; + newproxi->ckp = ckp; + newproxi->cs = &newsi->cs; + mutex_unlock(&gdata->lock); out: return ret; } @@ -1035,7 +1040,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.authorize", "params", proxi->auth, proxi->pass); ret = send_json_msg(cs, req); @@ -1430,9 +1435,6 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * { bool ret = false; - if (proxi->replaced) - return false; - /* Has this proxy already been reconnected? */ if (cs->fd > 0) return true; @@ -1484,21 +1486,29 @@ out: /* Cycle through the available proxies and find the first alive one */ static proxy_instance_t *live_proxy(ckpool_t *ckp) { - int i, start_from = ckp->chosen_server; proxy_instance_t *alive = NULL; + gdata_t *gdata = ckp->data; connsock_t *cs; + int i, srvs; LOGDEBUG("Attempting to connect to proxy"); retry: if (!ping_main(ckp)) goto out; - for (i = start_from; i < ckp->proxies; i++) { + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + + for (i = 0; i < srvs; i++) { proxy_instance_t *proxi; server_instance_t *si; + mutex_lock(&gdata->lock); si = ckp->servers[i]; proxi = si->data; + mutex_unlock(&gdata->lock); + cs = proxi->cs; if (proxy_alive(ckp, si, proxi, cs, false)) { alive = proxi; @@ -1508,13 +1518,11 @@ retry: if (!alive) { send_proc(ckp->connector, "reject"); send_proc(ckp->stratifier, "dropall"); - if (!start_from) { - LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); - sleep(5); - } + LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); + sleep(5); goto retry; } - start_from = 0; + cs = alive->cs; LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); @@ -1579,10 +1587,10 @@ reconnect: if (!proxi) goto out; if (reconnecting) { + reconnecting = false; connsock_t *cs = proxi->cs; LOGWARNING("Successfully reconnected to %s:%s as proxy", cs->url, cs->port); - reconnecting = false; } /* We've just subscribed and authorised so tell the stratifier to @@ -1690,10 +1698,13 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { + gdata_t *gdata = ckp->data; proxy_instance_t *proxi; server_instance_t *si; int i, ret; + mutex_init(&gdata->lock); + /* Create all our proxy structures and pointers */ ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); for (i = 0; i < ckp->proxies; i++) { @@ -1718,6 +1729,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) ret = proxy_loop(pi); + mutex_lock(&gdata->lock); for (i = 0; i < ckp->proxies; i++) { si = ckp->servers[i]; Close(si->cs.fd); @@ -1735,6 +1747,8 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) dealloc(si->pass); dealloc(si); } + mutex_unlock(&gdata->lock); + dealloc(ckp->servers); return ret; } @@ -1743,6 +1757,40 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) * should check to see if the higher priority servers are alive and fallback */ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) { + static time_t last_t = 0; + bool alive = false; + time_t now_t; + int i; + + /* Rate limit to checking only once every 5 seconds */ + now_t = time(NULL); + if (now_t <= last_t + 5) + return; + + last_t = now_t; + + /* Is this the highest priority server already? */ + if (!cursi->id) + return; + + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + /* Have we reached the current server? */ + if (si == cursi) + return; + + alive = server_alive(ckp, si, true); + if (alive) + break; + } + if (alive) + send_proc(ckp->generator, "reconnect"); +} + +static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) +{ + gdata_t *gdata = ckp->data; static time_t last_t = 0; bool alive = false; time_t now_t; @@ -1756,27 +1804,29 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) last_t = now_t; /* Is this the highest priority server already? */ - if (cursi == ckp->servers[ckp->chosen_server]) + if (!cursi->id) return; - if (ckp->proxy) - srvs = ckp->proxies; - else - srvs = ckp->btcds; + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + for (i = 0; i < srvs; i++) { - server_instance_t *si = ckp->servers[i]; + proxy_instance_t *proxi; + server_instance_t *si; + connsock_t *cs; + + mutex_lock(&gdata->lock); + si = ckp->servers[i]; + proxi = si->data; + mutex_unlock(&gdata->lock); /* Have we reached the current server? */ if (si == cursi) return; - if (ckp->proxy) { - proxy_instance_t *proxi = si->data; - connsock_t *cs = proxi->cs; - - alive = proxy_alive(ckp, si, proxi, cs, true); - } else - alive = server_alive(ckp, si, true); + cs = proxi->cs; + alive = proxy_alive(ckp, si, proxi, cs, true); if (alive) break; } @@ -1784,6 +1834,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) send_proc(ckp->generator, "reconnect"); } + int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -1793,12 +1844,13 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; - gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); - - if (ckp->proxy) + if (ckp->proxy) { + gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); - else + } else { + gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); ret = server_mode(ckp, pi); + } dealloc(ckp->data); return process_exit(ckp, pi, ret);