From ca176c8e1d3b0919570189cceda1966a8d664f49 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 24 May 2014 17:09:15 +1000 Subject: [PATCH] Fall over to the first working stratum proxy whenever we are disconnected --- src/generator.c | 321 ++++++++++++++++++++--------------------------- src/stratifier.c | 6 + 2 files changed, 145 insertions(+), 182 deletions(-) diff --git a/src/generator.c b/src/generator.c index 63775138..b98818b2 100644 --- a/src/generator.c +++ b/src/generator.c @@ -759,64 +759,6 @@ static void submit_share(proxy_instance_t *proxi, json_t *val) mutex_unlock(&proxi->psend_lock); } -static int proxy_loop(proc_instance_t *pi, proxy_instance_t *proxi) -{ - unixsock_t *us = &pi->us; - ckpool_t *ckp = pi->ckp; - int sockd, ret = 0; - char *buf = NULL; - - /* We're not subscribed and authorised so tell the stratifier to - * retrieve the first subscription. */ - send_proc(ckp->stratifier, "subscribe"); - send_proc(ckp->stratifier, "notify"); - proxi->notified = false; - -retry: - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - if (interrupted()) - goto retry; - LOGERR("Failed to accept on proxy socket"); - ret = 1; - goto out; - } - dealloc(buf); - buf = recv_unix_msg(sockd); - if (!buf) { - LOGWARNING("Failed to get message in proxy_loop"); - close(sockd); - goto retry; - } - LOGDEBUG("Proxy received request: %s", buf); - if (!strncasecmp(buf, "shutdown", 8)) { - ret = 0; - goto out; - } else if (!strncasecmp(buf, "getsubscribe", 12)) { - send_subscribe(proxi, sockd); - } else if (!strncasecmp(buf, "getnotify", 9)) { - send_notify(proxi, sockd); - } else if (!strncasecmp(buf, "getdiff", 7)) { - send_diff(proxi, sockd); - } else if (!strncasecmp(buf, "ping", 4)) { - LOGDEBUG("Proxy received ping request"); - send_unix_msg(sockd, "pong"); - } else { - /* Anything remaining should be share submissions */ - json_t *val = json_loads(buf, 0, NULL); - - if (!val) - LOGWARNING("Received unrecognised message: %s", buf); - else - submit_share(proxi, val); - } - close(sockd); - goto retry; -out: - close(sockd); - return ret; -} - static void clear_notify(notify_instance_t *ni) { free(ni->jobid); @@ -825,35 +767,8 @@ static void clear_notify(notify_instance_t *ni) free(ni); } -static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi) -{ - notify_instance_t *ni, *tmp; - bool ret = true; - - /* All our notify data is invalid if we reconnect so discard them */ - mutex_lock(&proxi->notify_lock); - HASH_ITER(hh, proxi->notify_instances, ni, tmp) { - HASH_DEL(proxi->notify_instances, ni); - clear_notify(ni); - } - mutex_unlock(&proxi->notify_lock); - - do { - if (!ret) - sleep(5); - close(cs->fd); - ret = connect_proxy(cs); - if (!ret) - continue; - ret = subscribe_stratum(cs, proxi); - if (!ret) - continue; - ret = auth_stratum(cs, proxi); - } while (!ret); -} - /* FIXME: Return something useful to the stratifier based on this result */ -static bool parse_share(ckpool_t *ckp, proxy_instance_t *proxi, const char *buf) +static bool parse_share(proxy_instance_t *proxi, const char *buf) { json_t *val = NULL, *idval; share_msg_t *share; @@ -899,6 +814,9 @@ static void *proxy_recv(void *arg) ckpool_t *ckp = proxi->ckp; rename_proc("proxyrecv"); + /* We don't wait for them to pthread_join in case it takes a while, so + * we detach the threads instead to clean up themselves */ + pthread_detach(pthread_self()); while (42) { notify_instance_t *ni, *tmp; @@ -940,11 +858,10 @@ static void *proxy_recv(void *arg) } while (ret == 0 && ++retries < 24); if (ret < 1) { + /* Send ourselves a reconnect message */ LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - reconnect_stratum(cs, proxi); - send_proc(ckp->stratifier, "subscribe"); - send_proc(ckp->stratifier, "notify"); - continue; + send_proc(ckp->generator, "reconnect"); + break; } if (parse_method(proxi, cs->buf)) { if (proxi->notified) { @@ -957,7 +874,7 @@ static void *proxy_recv(void *arg) } continue; } - if (parse_share(ckp, proxi, cs->buf)) { + if (parse_share(proxi, cs->buf)) { continue; } /* If it's not a method it should be a share result */ @@ -973,6 +890,7 @@ static void *proxy_send(void *arg) connsock_t *cs = proxi->cs; rename_proc("proxysend"); + pthread_detach(pthread_self()); while (42) { notify_instance_t *ni; @@ -1024,61 +942,144 @@ static void *proxy_send(void *arg) return NULL; } -#if 0 -static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, - const char *auth, const char *pass) +/* Cycle through the available proxies and find the first alive one */ +static proxy_instance_t *live_proxy(ckpool_t *ckp) { - proxy_instance_t proxi; - int ret = 1; + proxy_instance_t *alive = NULL; + int i; - memset(&proxi, 0, sizeof(proxi)); - proxi.ckp = ckp; - proxi.cs = cs; + LOGDEBUG("Attempting to connect to proxy"); +retry: + for (i = 0; i < ckp->proxies; i++) { + proxy_instance_t *proxi; + server_instance_t *si; + connsock_t *cs; - if (!connect_proxy(cs)) { - LOGWARNING("FATAL: Failed to connect to %s:%s in proxy_mode!", - cs->url, cs->port); - goto out; + si = ckp->servers[i]; + proxi = si->data; + cs = proxi->cs; + if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { + LOGWARNING("Failed to extract address from %s", si->url); + continue; + } + if (!connect_proxy(cs)) { + LOGINFO("Failed to connect to %s:%s in proxy_mode!", + cs->url, cs->port); + continue; + } + /* Test we can connect, authorise and get stratum information */ + if (!subscribe_stratum(cs, proxi)) { + LOGINFO("Failed initial subscribe to %s:%s !", + cs->url, cs->port); + continue; + } + if (!auth_stratum(cs, proxi)) { + LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", + cs->url, cs->port, si->auth, si->pass); + continue; + } + alive = proxi; + break; } - - /* Test we can connect, authorise and get stratum information */ - if (!subscribe_stratum(cs, &proxi)) { - LOGWARNING("FATAL: Failed initial subscribe to %s:%s !", - cs->url, cs->port); - goto out; + if (!alive) { + LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); + sleep(5); + goto retry; } + LOGNOTICE("Connected to upstream server %s:%s as proxy", alive->cs->url, alive->cs->port); + mutex_init(&alive->notify_lock); + create_pthread(&alive->pth_precv, proxy_recv, alive); + mutex_init(&alive->psend_lock); + cond_init(&alive->psend_cond); + create_pthread(&alive->pth_psend, proxy_send, alive); + return alive; +} - proxi.auth = auth; - proxi.pass = pass; - if (!auth_stratum(cs, &proxi)) { - LOGWARNING("FATAL: Failed initial authorise to %s:%s with %s:%s !", - cs->url, cs->port, auth, pass); - goto out; +static void kill_proxy(proxy_instance_t *proxi) +{ + notify_instance_t *ni, *tmp; + connsock_t *cs = proxi->cs; + + close(cs->fd); + + /* All our notify data is invalid if we reconnect so discard them */ + mutex_lock(&proxi->notify_lock); + HASH_ITER(hh, proxi->notify_instances, ni, tmp) { + HASH_DEL(proxi->notify_instances, ni); + clear_notify(ni); } + mutex_unlock(&proxi->notify_lock); - mutex_init(&proxi.notify_lock); - create_pthread(&proxi.pth_precv, proxy_recv, &proxi); - mutex_init(&proxi.psend_lock); - cond_init(&proxi.psend_cond); - create_pthread(&proxi.pth_psend, proxy_send, &proxi); + pthread_cancel(proxi->pth_precv); + pthread_cancel(proxi->pth_psend); +} - ret = proxy_loop(pi, &proxi); +static int proxy_loop(proc_instance_t *pi) +{ + unixsock_t *us = &pi->us; + ckpool_t *ckp = pi->ckp; + proxy_instance_t *proxi; + int sockd, ret = 0; + char *buf = NULL; - /* Return from the proxy loop means we have received a shutdown - * request */ - pthread_cancel(proxi.pth_precv); - pthread_cancel(proxi.pth_psend); - join_pthread(proxi.pth_precv); - join_pthread(proxi.pth_psend); -out: - close(cs->fd); - free(proxi.enonce1); - free(proxi.enonce1bin); - free(proxi.sessionid); +reconnect: + proxi = live_proxy(ckp); + /* We've just subscribed and authorised so tell the stratifier to + * retrieve the first subscription. */ + send_proc(ckp->stratifier, "subscribe"); + send_proc(ckp->stratifier, "notify"); + proxi->notified = false; + +retry: + sockd = accept(us->sockd, NULL, NULL); + if (sockd < 0) { + if (interrupted()) + goto retry; + LOGERR("Failed to accept on proxy socket"); + ret = 1; + goto out; + } + dealloc(buf); + buf = recv_unix_msg(sockd); + if (!buf) { + LOGWARNING("Failed to get message in proxy_loop"); + close(sockd); + goto retry; + } + LOGDEBUG("Proxy received request: %s", buf); + if (!strncasecmp(buf, "shutdown", 8)) { + ret = 0; + goto out; + } else if (!strncasecmp(buf, "getsubscribe", 12)) { + send_subscribe(proxi, sockd); + } else if (!strncasecmp(buf, "getnotify", 9)) { + send_notify(proxi, sockd); + } else if (!strncasecmp(buf, "getdiff", 7)) { + send_diff(proxi, sockd); + } else if (!strncasecmp(buf, "reconnect", 9)) { + kill_proxy(proxi); + pthread_cancel(proxi->pth_precv); + pthread_cancel(proxi->pth_psend); + goto reconnect; + } else if (!strncasecmp(buf, "ping", 4)) { + LOGDEBUG("Proxy received ping request"); + send_unix_msg(sockd, "pong"); + } else { + /* Anything remaining should be share submissions */ + json_t *val = json_loads(buf, 0, NULL); + if (!val) + LOGWARNING("Received unrecognised message: %s", buf); + else + submit_share(proxi, val); + } + close(sockd); + goto retry; +out: + kill_proxy(proxi); + close(sockd); return ret; } -#endif /* FIXME: Make these use multiple BTCDs instead of just first alive. */ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) @@ -1153,74 +1154,30 @@ out: static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { - int i, ret = 1, alive = 0; proxy_instance_t *proxi; server_instance_t *si; - connsock_t *cs; + int i, ret = 1; + /* Create all our proxy structures and pointers */ ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); for (i = 0; i < ckp->proxies; i++) { + ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); si = ckp->servers[i]; - cs = &si->cs; si->url = ckp->proxyurl[i]; si->auth = ckp->proxyauth[i]; si->pass = ckp->proxypass[i]; - if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { - LOGWARNING("Failed to extract address from %s", si->url); - continue; - } - if (!connect_proxy(cs)) { - LOGWARNING("Failed to connect to %s:%s in proxy_mode!", - cs->url, cs->port); - continue; - } proxi = ckzalloc(sizeof(proxy_instance_t)); si->data = proxi; - /* Test we can connect, authorise and get stratum information */ - if (!subscribe_stratum(cs, proxi)) { - LOGWARNING("Failed initial subscribe to %s:%s !", - cs->url, cs->port); - continue; - } proxi->auth = si->auth; proxi->pass = si->pass; - if (!auth_stratum(cs, proxi)) { - LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", - cs->url, cs->port, si->auth, si->pass); - continue; - } proxi->si = si; proxi->ckp = ckp; - proxi->cs = cs; - si->alive = true; - alive++; + proxi->cs = &si->cs; } - if (!alive) { - LOGEMERG("FATAL: No proxied servers active!"); - goto out; - } - for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) { - if (si->alive) - break; - } - proxi = si->data; - - mutex_init(&proxi->notify_lock); - create_pthread(&proxi->pth_precv, proxy_recv, proxi); - mutex_init(&proxi->psend_lock); - cond_init(&proxi->psend_cond); - create_pthread(&proxi->pth_psend, proxy_send, proxi); - ret = proxy_loop(pi, proxi); + ret = proxy_loop(pi); - /* Return from the proxy loop means we have received a shutdown - * request */ - pthread_cancel(proxi->pth_precv); - pthread_cancel(proxi->pth_psend); - join_pthread(proxi->pth_precv); - join_pthread(proxi->pth_psend); -out: for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) { close(si->cs.fd); proxi = si->data; diff --git a/src/stratifier.c b/src/stratifier.c index b5300b41..8a8f0d3d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2043,6 +2043,7 @@ int stratifier(proc_instance_t *pi) pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_statsupdate; ckpool_t *ckp = pi->ckp; + char *buf; int ret; /* Store this for use elsewhere */ @@ -2054,6 +2055,11 @@ int stratifier(proc_instance_t *pi) * id on restarts */ blockchange_id = workbase_id = ((int64_t)time(NULL)) << 32; + /* Wait for the generator to have something for us */ + do { + buf = send_recv_proc(ckp->generator, "ping"); + } while (!buf); + cklock_init(&instance_lock); mutex_init(&stratum_recv_lock);