Browse Source

Implement fallback in proxy mode and keep existing connection when testing if servers or proxies are alive

master
Con Kolivas 10 years ago
parent
commit
6880b9f03d
  1. 72
      src/generator.c

72
src/generator.c

@ -132,6 +132,9 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
gbtbase_t *gbt; gbtbase_t *gbt;
cs = &si->cs; cs = &si->cs;
/* Has this server already been reconnected? */
if (cs->fd > 0)
return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return ret; return ret;
@ -161,18 +164,20 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
LOGINFO("Failed to get test block template from %s:%s!", LOGINFO("Failed to get test block template from %s:%s!",
cs->url, cs->port); cs->url, cs->port);
} }
goto out_close; goto out;
} }
clear_gbtbase(gbt); clear_gbtbase(gbt);
if (!validate_address(cs, ckp->btcaddress)) { if (!validate_address(cs, ckp->btcaddress)) {
LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress);
goto out_close; goto out;
} }
ret = true; ret = true;
out_close: out:
if (!ret) if (!ret) {
/* Close and invalidate the file handle */
close(cs->fd); close(cs->fd);
else cs->fd = -1;
} else
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
return ret; return ret;
} }
@ -348,7 +353,7 @@ retry:
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
else else
send_unix_msg(sockd, "false"); send_unix_msg(sockd, "false");
} else if (cmdmatch(buf, "fallback")) { } else if (cmdmatch(buf, "reconnect")) {
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
@ -388,8 +393,8 @@ static bool connect_proxy(connsock_t *cs)
{ {
cs->fd = connect_socket(cs->url, cs->port); cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect socket to %s:%s in connect_proxy", LOGINFO("Failed to connect socket to %s:%s in connect_proxy",
cs->url, cs->port); cs->url, cs->port);
return false; return false;
} }
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
@ -1283,24 +1288,30 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging) connsock_t *cs, bool pinging)
{ {
bool ret = false;
/* Has this proxy already been reconnected? */
if (cs->fd > 0)
return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return false; return ret;
} }
if (!connect_proxy(cs)) { if (!connect_proxy(cs)) {
if (!pinging) { if (!pinging) {
LOGINFO("Failed to connect to %s:%s in proxy_mode!", LOGINFO("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port); cs->url, cs->port);
} }
return false; return ret;
} }
if (ckp->passthrough) { if (ckp->passthrough) {
if (!passthrough_stratum(cs, proxi)) { if (!passthrough_stratum(cs, proxi)) {
LOGWARNING("Failed initial passthrough to %s:%s !", LOGWARNING("Failed initial passthrough to %s:%s !",
cs->url, cs->port); cs->url, cs->port);
return false; goto out;
} }
return true; ret = true;
goto out;
} }
/* Test we can connect, authorise and get stratum information */ /* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) { if (!subscribe_stratum(cs, proxi)) {
@ -1308,16 +1319,24 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
LOGINFO("Failed initial subscribe to %s:%s !", LOGINFO("Failed initial subscribe to %s:%s !",
cs->url, cs->port); cs->url, cs->port);
} }
return false; goto out;
} }
if (!auth_stratum(cs, proxi)) { if (!auth_stratum(cs, proxi)) {
if (!pinging) { if (!pinging) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass); cs->url, cs->port, si->auth, si->pass);
} }
return false; goto out;
} }
return true; ret = true;
out:
if (!ret) {
/* Close and invalidate the file handle */
close(cs->fd);
cs->fd = -1;
} else
keep_sockalive(cs->fd);
return ret;
} }
/* Cycle through the available proxies and find the first alive one */ /* Cycle through the available proxies and find the first alive one */
@ -1381,6 +1400,7 @@ static void kill_proxy(proxy_instance_t *proxi)
return; return;
cs = proxi->cs; cs = proxi->cs;
close(cs->fd); close(cs->fd);
cs->fd = -1;
/* All our notify data is invalid if we reconnect so discard them */ /* All our notify data is invalid if we reconnect so discard them */
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
@ -1404,8 +1424,10 @@ static int proxy_loop(proc_instance_t *pi)
char *buf = NULL; char *buf = NULL;
reconnect: reconnect:
if (proxi) if (proxi) {
kill_proxy(proxi);
reconnecting = true; reconnecting = true;
}
proxi = live_proxy(ckp); proxi = live_proxy(ckp);
if (!proxi) if (!proxi)
goto out; goto out;
@ -1424,6 +1446,9 @@ reconnect:
proxi->notified = false; proxi->notified = false;
} }
retry:
ckmsgq_add(srvchk, proxi->si);
do { do {
selret = wait_read_select(us->sockd, 5); selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) { if (!selret && !ping_main(ckp)) {
@ -1432,7 +1457,12 @@ reconnect:
goto out; goto out;
} }
} while (selret < 1); } while (selret < 1);
retry:
if (unlikely(proxi->cs->fd < 0)) {
LOGWARNING("Upstream socket invalidated, will attempt failover");
goto reconnect;
}
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGEMERG("Failed to accept on proxy socket"); LOGEMERG("Failed to accept on proxy socket");
@ -1457,9 +1487,6 @@ retry:
} else if (cmdmatch(buf, "getdiff")) { } else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, sockd); send_diff(proxi, sockd);
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
kill_proxy(proxi);
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "submitblock:")) { } else if (cmdmatch(buf, "submitblock:")) {
if (ckp->btcdbackup) { if (ckp->btcdbackup) {
@ -1470,8 +1497,6 @@ retry:
LOGWARNING("Block rejected locally."); LOGWARNING("Block rejected locally.");
} else } else
LOGNOTICE("No backup btcd to send block to ourselves"); LOGNOTICE("No backup btcd to send block to ourselves");
} else if (cmdmatch(buf, "fallback")) {
goto reconnect;
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "ping")) { } else if (cmdmatch(buf, "ping")) {
@ -1542,7 +1567,6 @@ static bool alive_btcd(server_instance_t *si)
LOGWARNING("Failed to create base64 auth from btcd %s", userpass); LOGWARNING("Failed to create base64 auth from btcd %s", userpass);
return false; return false;
} }
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port);
return false; return false;
@ -1667,7 +1691,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "fallback"); send_proc(ckp->generator, "reconnect");
} }
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)

Loading…
Cancel
Save