From 6880b9f03d4c339d9b2357060c2f2e1255968d31 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 16 Sep 2014 10:21:39 +1000 Subject: [PATCH] Implement fallback in proxy mode and keep existing connection when testing if servers or proxies are alive --- src/generator.c | 72 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/src/generator.c b/src/generator.c index 50757137..23fa3250 100644 --- a/src/generator.c +++ b/src/generator.c @@ -132,6 +132,9 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) gbtbase_t *gbt; cs = &si->cs; + /* Has this server already been reconnected? */ + if (cs->fd > 0) + return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); return ret; @@ -161,18 +164,20 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) LOGINFO("Failed to get test block template from %s:%s!", cs->url, cs->port); } - goto out_close; + goto out; } clear_gbtbase(gbt); if (!validate_address(cs, ckp->btcaddress)) { LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); - goto out_close; + goto out; } ret = true; -out_close: - if (!ret) +out: + if (!ret) { + /* Close and invalidate the file handle */ close(cs->fd); - else + cs->fd = -1; + } else keep_sockalive(cs->fd); return ret; } @@ -348,7 +353,7 @@ retry: send_unix_msg(sockd, "true"); else send_unix_msg(sockd, "false"); - } else if (cmdmatch(buf, "fallback")) { + } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -388,8 +393,8 @@ static bool connect_proxy(connsock_t *cs) { cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { - LOGWARNING("Failed to connect socket to %s:%s in connect_proxy", - cs->url, cs->port); + LOGINFO("Failed to connect socket to %s:%s in connect_proxy", + cs->url, cs->port); return false; } keep_sockalive(cs->fd); @@ -1283,24 +1288,30 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging) { + bool ret = false; + + /* Has this proxy already been reconnected? */ + if (cs->fd > 0) + return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); - return false; + return ret; } if (!connect_proxy(cs)) { if (!pinging) { LOGINFO("Failed to connect to %s:%s in proxy_mode!", cs->url, cs->port); } - return false; + return ret; } if (ckp->passthrough) { if (!passthrough_stratum(cs, proxi)) { LOGWARNING("Failed initial passthrough to %s:%s !", cs->url, cs->port); - return false; + goto out; } - return true; + ret = true; + goto out; } /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { @@ -1308,16 +1319,24 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * LOGINFO("Failed initial subscribe to %s:%s !", cs->url, cs->port); } - return false; + goto out; } if (!auth_stratum(cs, proxi)) { if (!pinging) { LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", cs->url, cs->port, si->auth, si->pass); } - return false; + goto out; } - return true; + ret = true; +out: + if (!ret) { + /* Close and invalidate the file handle */ + close(cs->fd); + cs->fd = -1; + } else + keep_sockalive(cs->fd); + return ret; } /* Cycle through the available proxies and find the first alive one */ @@ -1381,6 +1400,7 @@ static void kill_proxy(proxy_instance_t *proxi) return; cs = proxi->cs; close(cs->fd); + cs->fd = -1; /* All our notify data is invalid if we reconnect so discard them */ mutex_lock(&proxi->notify_lock); @@ -1404,8 +1424,10 @@ static int proxy_loop(proc_instance_t *pi) char *buf = NULL; reconnect: - if (proxi) + if (proxi) { + kill_proxy(proxi); reconnecting = true; + } proxi = live_proxy(ckp); if (!proxi) goto out; @@ -1424,6 +1446,9 @@ reconnect: proxi->notified = false; } +retry: + ckmsgq_add(srvchk, proxi->si); + do { selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { @@ -1432,7 +1457,12 @@ reconnect: goto out; } } while (selret < 1); -retry: + + if (unlikely(proxi->cs->fd < 0)) { + LOGWARNING("Upstream socket invalidated, will attempt failover"); + goto reconnect; + } + sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on proxy socket"); @@ -1457,9 +1487,6 @@ retry: } else if (cmdmatch(buf, "getdiff")) { send_diff(proxi, sockd); } else if (cmdmatch(buf, "reconnect")) { - kill_proxy(proxi); - pthread_cancel(proxi->pth_precv); - pthread_cancel(proxi->pth_psend); goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { if (ckp->btcdbackup) { @@ -1470,8 +1497,6 @@ retry: LOGWARNING("Block rejected locally."); } else LOGNOTICE("No backup btcd to send block to ourselves"); - } else if (cmdmatch(buf, "fallback")) { - goto reconnect; } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { @@ -1542,7 +1567,6 @@ static bool alive_btcd(server_instance_t *si) LOGWARNING("Failed to create base64 auth from btcd %s", userpass); return false; } - cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); return false; @@ -1667,7 +1691,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "fallback"); + send_proc(ckp->generator, "reconnect"); } int generator(proc_instance_t *pi)