diff --git a/src/generator.c b/src/generator.c index 77782403..d932826e 100644 --- a/src/generator.c +++ b/src/generator.c @@ -75,8 +75,7 @@ typedef struct proxy_instance proxy_instance_t; /* Per proxied pool instance data */ struct proxy_instance { - proxy_instance_t *next; - proxy_instance_t *prev; + UT_hash_handle hh; ckpool_t *ckp; connsock_t *cs; @@ -104,6 +103,7 @@ struct proxy_instance { bool notified; /* Received new template for work */ bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ + bool alive; pthread_mutex_t notify_lock; notify_instance_t *notify_instances; @@ -128,7 +128,7 @@ struct proxy_instance { /* Private data for the generator */ struct generator_data { pthread_mutex_t lock; /* Lock protecting linked lists */ - proxy_instance_t *proxy_list; /* Linked list of all active proxies */ + proxy_instance_t *proxies; /* Hash list of all proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -878,6 +878,8 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val) return ret; } +static void prepare_proxy(proxy_instance_t *proxi); + static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; @@ -930,6 +932,7 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) newsi = ckzalloc(sizeof(server_instance_t)); mutex_lock(&gdata->lock); + HASH_DEL(gdata->proxies, proxi); newsi->id = si->id; /* Inherit the old connection's id */ si->id = ckp->proxies++; /* Give the old connection the lowest id */ ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies); @@ -946,7 +949,12 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) newproxi->si = newsi; newproxi->ckp = ckp; newproxi->cs = &newsi->cs; + newproxi->id = newsi->id; + HASH_ADD_INT(gdata->proxies, id, proxi); + HASH_ADD_INT(gdata->proxies, id, newproxi); mutex_unlock(&gdata->lock); + + prepare_proxy(newproxi); out: return ret; } @@ -1248,85 +1256,6 @@ out: return ret; } -static void *proxy_recv(void *arg) -{ - proxy_instance_t *proxi = (proxy_instance_t *)arg; - connsock_t *cs = proxi->cs; - ckpool_t *ckp = proxi->ckp; - - rename_proc("proxyrecv"); - - while (42) { - notify_instance_t *ni, *tmp; - share_msg_t *share, *tmpshare; - int retries = 0, ret; - time_t now; - - now = time(NULL); - - /* Age old notifications older than 10 mins old */ - mutex_lock(&proxi->notify_lock); - HASH_ITER(hh, proxi->notify_instances, ni, tmp) { - if (HASH_COUNT(proxi->notify_instances) < 3) - break; - if (ni->notify_time < now - 600) { - HASH_DEL(proxi->notify_instances, ni); - clear_notify(ni); - } - } - mutex_unlock(&proxi->notify_lock); - - /* Similary with shares older than 2 mins without response */ - mutex_lock(&proxi->share_lock); - HASH_ITER(hh, proxi->shares, share, tmpshare) { - if (share->submit_time < now - 120) { - HASH_DEL(proxi->shares, share); - } - } - mutex_unlock(&proxi->share_lock); - - /* If we don't get an update within 10 minutes the upstream pool - * has likely stopped responding. */ - do { - if (cs->fd == -1) { - ret = -1; - break; - } - ret = read_socket_line(cs, 5); - } while (ret == 0 && ++retries < 120); - - if (ret < 1) { - /* Send ourselves a reconnect message */ - LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - send_proc(ckp->generator, "reconnect"); - break; - } - if (parse_method(proxi, cs->buf)) { - if (proxi->notified) { - send_proc(ckp->stratifier, "notify"); - proxi->notified = false; - } - if (proxi->diffed) { - send_proc(ckp->stratifier, "diff"); - proxi->diffed = false; - } - if (proxi->reconnect) { - proxi->reconnect = false; - LOGWARNING("Reconnect issue, dropping existing connection"); - send_proc(ckp->generator, "reconnect"); - break; - } - continue; - } - if (parse_share(proxi, cs->buf)) { - continue; - } - /* If it's not a method it should be a share result */ - LOGWARNING("Unhandled stratum message: %s", cs->buf); - } - return NULL; -} - /* For processing and sending shares */ static void *proxy_send(void *arg) { @@ -1384,36 +1313,6 @@ static void *proxy_send(void *arg) return NULL; } -/* For receiving messages from an upstream pool to pass downstream */ -static void *passthrough_recv(void *arg) -{ - proxy_instance_t *proxi = (proxy_instance_t *)arg; - connsock_t *cs = proxi->cs; - ckpool_t *ckp = proxi->ckp; - - rename_proc("passrecv"); - - while (42) { - int ret; - - do { - ret = read_socket_line(cs, 60); - } while (ret == 0); - - if (ret < 1) { - /* Send ourselves a reconnect message */ - LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - send_proc(ckp->generator, "reconnect"); - break; - } - /* Simply forward the message on, as is, to the connector to - * process. Possibly parse parameters sent by upstream pool - * here */ - send_proc(ckp->connector, cs->buf); - } - return NULL; -} - static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) { int len, sent; @@ -1491,124 +1390,238 @@ out: return ret; } -/* Cycle through the available proxies and find the first alive one */ -static proxy_instance_t *live_proxy(ckpool_t *ckp) +/* For receiving messages from an upstream pool to pass downstream. Responsible + * for setting up the connection and testing pool is live. */ +static void *passthrough_recv(void *arg) { - proxy_instance_t *alive = NULL; - gdata_t *gdata = ckp->data; - connsock_t *cs; - int i, srvs; + proxy_instance_t *proxi = (proxy_instance_t *)arg; + server_instance_t *si = proxi->si; + connsock_t *cs = proxi->cs; + ckpool_t *ckp = proxi->ckp; - LOGDEBUG("Attempting to connect to proxy"); -retry: - if (!ping_main(ckp)) - goto out; + rename_proc("passrecv"); - mutex_lock(&gdata->lock); - srvs = ckp->proxies; - mutex_unlock(&gdata->lock); + if (proxy_alive(ckp, si, proxi, cs, false)) { + proxi->alive = true; + send_proc(ckp->generator, "reconnect"); + LOGWARNING("Proxy %d:%s connection established", + proxi->id, proxi->si->url); + } - for (i = 0; i < srvs; i++) { - proxy_instance_t *proxi; - server_instance_t *si; + while (42) { + int ret; - mutex_lock(&gdata->lock); - si = ckp->servers[i]; - proxi = si->data; - mutex_unlock(&gdata->lock); + while (!proxy_alive(ckp, si, proxi, cs, true)) { + if (proxi->alive) { + proxi->alive = false; + send_proc(ckp->generator, "reconnect"); + } + sleep(5); + } + if (!proxi->alive) { + proxi->alive = true; + send_proc(ckp->generator, "reconnect"); + } - cs = proxi->cs; - if (proxy_alive(ckp, si, proxi, cs, false)) { - alive = proxi; - break; + do { + ret = read_socket_line(cs, 60); + } while (ret == 0); + + if (ret < 1) { + LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); + continue; } + /* Simply forward the message on, as is, to the connector to + * process. Possibly parse parameters sent by upstream pool + * here */ + send_proc(ckp->connector, cs->buf); } - if (!alive) { - send_proc(ckp->connector, "reject"); - send_proc(ckp->stratifier, "dropall"); - LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); - sleep(5); - goto retry; + return NULL; +} + +/* For receiving messages from the upstream proxy, also responsible for setting + * up the connection and testing it's alive. */ +static void *proxy_recv(void *arg) +{ + proxy_instance_t *proxi = (proxy_instance_t *)arg; + server_instance_t *si = proxi->si; + connsock_t *cs = proxi->cs; + ckpool_t *ckp = proxi->ckp; + + rename_proc("proxyrecv"); + + if (proxy_alive(ckp, si, proxi, cs, false)) { + proxi->alive = true; + send_proc(ckp->generator, "reconnect"); + LOGWARNING("Proxy %d:%s connection established", + proxi->id, proxi->si->url); } - cs = alive->cs; - LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, - ckp->passthrough ? " in passthrough mode" : ""); - if (ckp->passthrough) { - create_pthread(&alive->pth_precv, passthrough_recv, alive); - alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); - } else { - create_pthread(&alive->pth_precv, proxy_recv, alive); - mutex_init(&alive->psend_lock); - cond_init(&alive->psend_cond); - create_pthread(&alive->pth_psend, proxy_send, alive); + while (42) { + notify_instance_t *ni, *tmp; + share_msg_t *share, *tmpshare; + int retries = 0, ret; + time_t now; + + while (!proxy_alive(ckp, si, proxi, cs, true)) { + if (proxi->alive) { + proxi->alive = false; + send_proc(ckp->generator, "reconnect"); + } + sleep(5); + } + if (!proxi->alive) { + proxi->alive = true; + send_proc(ckp->generator, "reconnect"); + } + + now = time(NULL); + + /* Age old notifications older than 10 mins old */ + mutex_lock(&proxi->notify_lock); + HASH_ITER(hh, proxi->notify_instances, ni, tmp) { + if (HASH_COUNT(proxi->notify_instances) < 3) + break; + if (ni->notify_time < now - 600) { + HASH_DEL(proxi->notify_instances, ni); + clear_notify(ni); + } + } + mutex_unlock(&proxi->notify_lock); + + /* Similary with shares older than 2 mins without response */ + mutex_lock(&proxi->share_lock); + HASH_ITER(hh, proxi->shares, share, tmpshare) { + if (share->submit_time < now - 120) { + HASH_DEL(proxi->shares, share); + } + } + mutex_unlock(&proxi->share_lock); + + /* If we don't get an update within 10 minutes the upstream pool + * has likely stopped responding. */ + do { + if (cs->fd == -1) { + ret = -1; + break; + } + ret = read_socket_line(cs, 5); + } while (ret == 0 && ++retries < 120); + + if (ret < 1) { + /* Send ourselves a reconnect message */ + LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); + continue; + } + if (parse_method(proxi, cs->buf)) { + if (proxi->notified) { + send_proc(ckp->stratifier, "notify"); + proxi->notified = false; + } + if (proxi->diffed) { + send_proc(ckp->stratifier, "diff"); + proxi->diffed = false; + } + if (proxi->reconnect) { + proxi->reconnect = false; + LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", + proxi->id, proxi->si->url); + send_proc(ckp->generator, "reconnect"); + break; + } + continue; + } + if (parse_share(proxi, cs->buf)) { + continue; + } + /* If it's not a method it should be a share result */ + LOGWARNING("Unhandled stratum message: %s", cs->buf); } -out: - send_proc(ckp->connector, alive ? "accept" : "reject"); - return alive; + return NULL; } -static void kill_proxy(proxy_instance_t *proxi) +static void prepare_proxy(proxy_instance_t *proxi) { - notify_instance_t *ni, *tmp; - connsock_t *cs; + mutex_init(&proxi->psend_lock); + cond_init(&proxi->psend_cond); + create_pthread(&proxi->pth_psend, proxy_send, proxi); + create_pthread(&proxi->pth_precv, proxy_recv, proxi); +} - if (!proxi) // This shouldn't happen - return; +static void setup_proxies(ckpool_t *ckp, gdata_t *gdata) +{ + int i; - LOGNOTICE("Killing proxy connection to %s", proxi->si->url); - cs = proxi->cs; - Close(cs->fd); - empty_buffer(cs); + for (i = 0; i < ckp->proxies; i++) { + proxy_instance_t *proxi; + server_instance_t *si; - /* All our notify data is invalid if we reconnect so discard them */ - mutex_lock(&proxi->notify_lock); - HASH_ITER(hh, proxi->notify_instances, ni, tmp) { - HASH_DEL(proxi->notify_instances, ni); - clear_notify(ni); + si = ckp->servers[i]; + proxi = si->data; + proxi->id = i; + HASH_ADD_INT(gdata->proxies, id, proxi); + if (ckp->passthrough) { + create_pthread(&proxi->pth_precv, passthrough_recv, proxi); + proxi->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); + } else { + prepare_proxy(proxi); + } } - mutex_unlock(&proxi->notify_lock); +} + +static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata) +{ + proxy_instance_t *ret = NULL, *proxi, *tmp; - pthread_cancel(proxi->pth_precv); - pthread_cancel(proxi->pth_psend); + while (42) { + if (!ping_main(ckp)) + break; + HASH_ITER(hh, gdata->proxies, proxi, tmp) { + if (proxi->alive) { + if (!ret) { + ret = proxi; + continue; + } + if (proxi->id < ret->id) + ret = proxi; + } + } + if (ret) + break; + sleep(1); + } + send_proc(ckp->connector, ret ? "accept" : "reject"); + return ret; } static int proxy_loop(proc_instance_t *pi) { + proxy_instance_t *proxi = NULL, *cproxy; int sockd = -1, ret = 0, selret; - proxy_instance_t *proxi = NULL; - bool reconnecting = false; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; gdata_t *gdata = ckp->data; char *buf = NULL; + setup_proxies(ckp, gdata); reconnect: - if (proxi) { - kill_proxy(proxi); - reconnecting = true; - } - proxi = live_proxy(ckp); - if (!proxi) + /* This does not necessarily mean we reconnect, but a change has + * occurred and we need to reexamine the proxies. */ + cproxy = current_proxy(ckp, gdata); + if (!cproxy) goto out; - if (reconnecting) { - reconnecting = false; - connsock_t *cs = proxi->cs; - LOGWARNING("Successfully reconnected to %s:%s as proxy", - cs->url, cs->port); - } - - /* We've just subscribed and authorised so tell the stratifier to - * retrieve the first subscription. */ - if (!ckp->passthrough) { - send_proc(ckp->stratifier, "subscribe"); - send_proc(ckp->stratifier, "notify"); - proxi->notified = false; + if (proxi != cproxy) { + proxi = cproxy; + if (!ckp->passthrough) { + connsock_t *cs = proxi->cs; + LOGWARNING("Successfully connected to %s:%s as proxy", + cs->url, cs->port); + /* Sending subscribe implies stratifier will also do a notify */ + send_proc(ckp->stratifier, "subscribe"); + proxi->notified = false; + } } - retry: - ckmsgq_add(gdata->srvchk, proxi->si); - do { selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { @@ -1670,7 +1683,6 @@ retry: Close(sockd); goto retry; out: - kill_proxy(proxi); Close(sockd); return ret; } @@ -1793,53 +1805,6 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) send_proc(ckp->generator, "reconnect"); } -static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) -{ - gdata_t *gdata = ckp->data; - static time_t last_t = 0; - bool alive = false; - time_t now_t; - int i, srvs; - - /* Rate limit to checking only once every 5 seconds */ - now_t = time(NULL); - if (now_t <= last_t + 5) - return; - - last_t = now_t; - - /* Is this the highest priority server already? */ - if (!cursi->id) - return; - - mutex_lock(&gdata->lock); - srvs = ckp->proxies; - mutex_unlock(&gdata->lock); - - for (i = 0; i < srvs; i++) { - proxy_instance_t *proxi; - server_instance_t *si; - connsock_t *cs; - - mutex_lock(&gdata->lock); - si = ckp->servers[i]; - proxi = si->data; - mutex_unlock(&gdata->lock); - - /* Have we reached the current server? */ - if (si == cursi) - return; - - cs = proxi->cs; - alive = proxy_alive(ckp, si, proxi, cs, true); - if (alive) - break; - } - if (alive) - send_proc(ckp->generator, "reconnect"); -} - - int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -1850,7 +1815,6 @@ int generator(proc_instance_t *pi) gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; if (ckp->proxy) { - gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); } else { gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); diff --git a/src/stratifier.c b/src/stratifier.c index dba09bc2..b0b0ebc1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1006,6 +1006,7 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id) return proxy; } +static void update_notify(ckpool_t *ckp); static void reconnect_clients(sdata_t *sdata, const char *cmd); static void update_subscribe(ckpool_t *ckp) @@ -1068,6 +1069,8 @@ static void update_subscribe(ckpool_t *ckp) proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8)); json_decref(val); + /* Notify implied required now too */ + update_notify(ckp); reconnect_clients(sdata, ""); }