diff --git a/src/generator.c b/src/generator.c index 50757137..b3fdbd36 100644 --- a/src/generator.c +++ b/src/generator.c @@ -104,7 +104,6 @@ struct proxy_instance { pthread_mutex_t notify_lock; notify_instance_t *notify_instances; notify_instance_t *current_notify; - int notify_id; pthread_t pth_precv; pthread_t pth_psend; @@ -122,6 +121,8 @@ struct proxy_instance { typedef struct proxy_instance proxy_instance_t; +static int proxy_notify_id; // Globally increasing notify id + static ckmsgq_t *srvchk; // Server check message queue static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) @@ -132,6 +133,9 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) gbtbase_t *gbt; cs = &si->cs; + /* Has this server already been reconnected? */ + if (cs->fd > 0) + return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); return ret; @@ -161,18 +165,20 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) LOGINFO("Failed to get test block template from %s:%s!", cs->url, cs->port); } - goto out_close; + goto out; } clear_gbtbase(gbt); if (!validate_address(cs, ckp->btcaddress)) { LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); - goto out_close; + goto out; } ret = true; -out_close: - if (!ret) +out: + if (!ret) { + /* Close and invalidate the file handle */ close(cs->fd); - else + cs->fd = -1; + } else keep_sockalive(cs->fd); return ret; } @@ -213,8 +219,10 @@ static void kill_server(server_instance_t *si) { connsock_t *cs; - if (!si) + if (!si) // This shouldn't happen return; + + LOGNOTICE("Killing server"); cs = &si->cs; close(cs->fd); cs->fd = -1; @@ -348,7 +356,7 @@ retry: send_unix_msg(sockd, "true"); else send_unix_msg(sockd, "false"); - } else if (cmdmatch(buf, "fallback")) { + } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -388,8 +396,8 @@ static bool connect_proxy(connsock_t *cs) { cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { - LOGWARNING("Failed to connect socket to %s:%s in connect_proxy", - cs->url, cs->port); + LOGINFO("Failed to connect socket to %s:%s in connect_proxy", + cs->url, cs->port); return false; } keep_sockalive(cs->fd); @@ -482,6 +490,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Failed to receive line in parse_subscribe"); goto out; } + LOGDEBUG("parse_subscribe received %s", cs->buf); val = json_msg_result(cs->buf, &res_val); if (!val) { LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf); @@ -588,14 +597,12 @@ retry: json_decref(req); if (!ret) { LOGWARNING("Failed to send message in subscribe_stratum"); - close(cs->fd); goto out; } ret = parse_subscribe(cs, proxi); if (ret) goto out; - close(cs->fd); if (proxi->no_params) { LOGWARNING("Failed all subscription options in subscribe_stratum"); goto out; @@ -616,10 +623,10 @@ retry: goto retry; out: - /* Only keep any downstream connections if we're successfully resuming - * to the existing stratum sessionid */ - if (!ret || !proxi->sessionid) - send_proc(proxi->ckp->stratifier, "dropall"); + if (!ret) { + close(cs->fd); + cs->fd = -1; + } return ret; } @@ -723,7 +730,7 @@ static bool parse_notify(proxy_instance_t *proxi, json_t *val) ni->notify_time = time(NULL); mutex_lock(&proxi->notify_lock); - ni->id = proxi->notify_id++; + ni->id = proxy_notify_id++; HASH_ADD_INT(proxi->notify_instances, id, ni); proxi->current_notify = ni; mutex_unlock(&proxi->notify_lock); @@ -1283,24 +1290,30 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging) { + bool ret = false; + + /* Has this proxy already been reconnected? */ + if (cs->fd > 0) + return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); - return false; + return ret; } if (!connect_proxy(cs)) { if (!pinging) { LOGINFO("Failed to connect to %s:%s in proxy_mode!", cs->url, cs->port); } - return false; + return ret; } if (ckp->passthrough) { if (!passthrough_stratum(cs, proxi)) { LOGWARNING("Failed initial passthrough to %s:%s !", cs->url, cs->port); - return false; + goto out; } - return true; + ret = true; + goto out; } /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { @@ -1308,16 +1321,24 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * LOGINFO("Failed initial subscribe to %s:%s !", cs->url, cs->port); } - return false; + goto out; } if (!auth_stratum(cs, proxi)) { if (!pinging) { LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", cs->url, cs->port, si->auth, si->pass); } - return false; + goto out; } - return true; + ret = true; +out: + if (!ret) { + /* Close and invalidate the file handle */ + close(cs->fd); + cs->fd = -1; + } else + keep_sockalive(cs->fd); + return ret; } /* Cycle through the available proxies and find the first alive one */ @@ -1345,8 +1366,8 @@ retry: } } if (!alive) { - send_proc(ckp->stratifier, "dropall"); send_proc(ckp->connector, "reject"); + send_proc(ckp->stratifier, "dropall"); if (!ckp->chosen_server) { LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); sleep(5); @@ -1357,7 +1378,6 @@ retry: cs = alive->cs; LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); - mutex_init(&alive->notify_lock); if (ckp->passthrough) { create_pthread(&alive->pth_precv, passthrough_recv, alive); alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); @@ -1372,15 +1392,21 @@ out: return alive; } -static void kill_proxy(proxy_instance_t *proxi) +static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) { notify_instance_t *ni, *tmp; connsock_t *cs; - if (!proxi) + send_proc(ckp->connector, "reject"); + send_proc(ckp->stratifier, "dropall"); + + if (!proxi) // This shouldn't happen return; + + LOGNOTICE("Killing proxy"); cs = proxi->cs; close(cs->fd); + cs->fd = -1; /* All our notify data is invalid if we reconnect so discard them */ mutex_lock(&proxi->notify_lock); @@ -1404,8 +1430,10 @@ static int proxy_loop(proc_instance_t *pi) char *buf = NULL; reconnect: - if (proxi) + if (proxi) { + kill_proxy(ckp, proxi); reconnecting = true; + } proxi = live_proxy(ckp); if (!proxi) goto out; @@ -1424,6 +1452,9 @@ reconnect: proxi->notified = false; } +retry: + ckmsgq_add(srvchk, proxi->si); + do { selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { @@ -1432,7 +1463,12 @@ reconnect: goto out; } } while (selret < 1); -retry: + + if (unlikely(proxi->cs->fd < 0)) { + LOGWARNING("Upstream socket invalidated, will attempt failover"); + goto reconnect; + } + sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on proxy socket"); @@ -1457,9 +1493,6 @@ retry: } else if (cmdmatch(buf, "getdiff")) { send_diff(proxi, sockd); } else if (cmdmatch(buf, "reconnect")) { - kill_proxy(proxi); - pthread_cancel(proxi->pth_precv); - pthread_cancel(proxi->pth_psend); goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { if (ckp->btcdbackup) { @@ -1470,8 +1503,6 @@ retry: LOGWARNING("Block rejected locally."); } else LOGNOTICE("No backup btcd to send block to ourselves"); - } else if (cmdmatch(buf, "fallback")) { - goto reconnect; } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { @@ -1492,7 +1523,7 @@ retry: close(sockd); goto retry; out: - kill_proxy(proxi); + kill_proxy(ckp, proxi); close(sockd); return ret; } @@ -1542,7 +1573,6 @@ static bool alive_btcd(server_instance_t *si) LOGWARNING("Failed to create base64 auth from btcd %s", userpass); return false; } - cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); return false; @@ -1581,6 +1611,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) proxi->si = si; proxi->ckp = ckp; proxi->cs = &si->cs; + mutex_init(&proxi->notify_lock); } if (ckp->btcds) { @@ -1667,7 +1698,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "fallback"); + send_proc(ckp->generator, "reconnect"); } int generator(proc_instance_t *pi) diff --git a/src/stratifier.c b/src/stratifier.c index 67863df0..bc350efa 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -744,8 +744,7 @@ static bool update_subscribe(ckpool_t *ckp) free(buf); ck_wlock(&workbase_lock); - if (!proxy_base.diff) - proxy_base.diff = 1; + proxy_base.diff = ckp->startdiff; /* Length is checked by generator */ strcpy(proxy_base.enonce1, json_string_value(json_object_get(val, "enonce1"))); proxy_base.enonce1constlen = strlen(proxy_base.enonce1) / 2;