From c295d6a8d4f2ec7b2d964a9862f848c885015b92 Mon Sep 17 00:00:00 2001
From: Con Kolivas
Date: Mon, 16 Feb 2015 21:46:44 +1100
Subject: [PATCH] Do a soft failover to backup proxies, not disconnecting them
 unless the upstream subproxy no longer exists, but a hard failover to higher
 priority proxies

---
Review notes (ignored by git am):
 * The reconnclient handler now treats anything other than one successful
   sscanf() conversion as a parse failure. sscanf() returns 0, not a negative
   value, when the id fails to convert, so the previous "ret < 0" test would
   have passed an uninitialised client_id to reconnect_client_id().
 * Line breaks and indentation of this patch were restored from the hunk line
   counts; verify context whitespace against the tree before applying.

 src/generator.c  | 46 +++++++++++++++++++--------------------
 src/stratifier.c | 56 ++++++++++++++++++++++++++++++++++--------------
 2 files changed, 62 insertions(+), 40 deletions(-)

diff --git a/src/generator.c b/src/generator.c
index 36fd8218..7612caa3 100644
--- a/src/generator.c
+++ b/src/generator.c
@@ -957,6 +957,22 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
 	recruit_subproxy(gdata, proxi);
 }
 
+/* If the parent is no longer in use due to reconnect, we shouldn't use any of
+ * the child subproxies. */
+static void drop_subproxies(proxy_instance_t *proxi)
+{
+	proxy_instance_t *subproxy, *tmp;
+
+	mutex_lock(&proxi->proxy_lock);
+	HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
+		if (!parent_proxy(subproxy)) {
+			subproxy->disabled = true;
+			Close(subproxy->cs->fd);
+		}
+	}
+	mutex_unlock(&proxi->proxy_lock);
+}
+
 static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
 {
 	server_instance_t *newsi, *si = proxi->si;
@@ -1037,6 +1053,7 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
 
 	/* Old proxy memory is basically lost here */
 	prepare_proxy(newproxi);
+	drop_subproxies(proxi);
 out:
 	return ret;
 }
@@ -1293,11 +1310,11 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int subid
 	return subproxy;
 }
 
-static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
+static void stratifier_reconnect_client(ckpool_t *ckp, int64_t id)
 {
 	char buf[256];
 
-	sprintf(buf, "dropclient=%"PRId64, id);
+	sprintf(buf, "reconnclient=%"PRId64, id);
 	send_proc(ckp->stratifier, buf);
 }
 
@@ -1317,20 +1334,20 @@ static void submit_share(gdata_t *gdata, json_t *val)
 	proxy = proxy_by_id(gdata, id);
 	if (unlikely(!proxy)) {
 		LOGWARNING("Failed to find proxy %d to send share to", id);
-		stratifier_drop_client(ckp, client_id);
+		stratifier_reconnect_client(ckp, client_id);
 		return json_decref(val);
 	}
 	json_get_int(&subid, val, "subproxy");
 	proxi = subproxy_by_id(proxy, subid);
 	if (unlikely(!proxi)) {
 		LOGNOTICE("Failed to find proxy %d:%d to send share to", id, subid);
-		stratifier_drop_client(ckp, client_id);
+		stratifier_reconnect_client(ckp, client_id);
 		return json_decref(val);
 	}
 	if (!proxi->alive) {
 		LOGNOTICE("Client %"PRId64" attempting to send shares to dead proxy %d:%d, dropping",
 			  client_id, id, subid);
-		stratifier_drop_client(ckp, client_id);
+		stratifier_reconnect_client(ckp, client_id);
 		return json_decref(val);
 	}
 
@@ -1516,22 +1533,6 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
 	ckmsgq_add(proxi->passsends, pm);
 }
 
-/* If the parent is dead, we shouldn't use any of the child subproxies to be
- * used. */
-static void drop_subproxies(proxy_instance_t *proxi)
-{
-	proxy_instance_t *subproxy, *tmp;
-
-	mutex_lock(&proxi->proxy_lock);
-	HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
-		if (!parent_proxy(subproxy)) {
-			subproxy->disabled = true;
-			Close(subproxy->cs->fd);
-		}
-	}
-	mutex_unlock(&proxi->proxy_lock);
-}
-
 static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
 			connsock_t *cs, bool pinging, int epfd)
 {
@@ -1595,8 +1596,6 @@ out:
 		}
 	}
 	proxi->alive = ret;
-	if (!ret && parent_proxy(proxi))
-		drop_subproxies(proxi);
 	return ret;
 }
 
@@ -1938,7 +1937,6 @@ retry:
 		if (unlikely(proxi->cs->fd < 0)) {
 			LOGWARNING("Upstream proxy %d:%s socket invalidated, will attempt failover",
 				   proxi->id, proxi->cs->url);
-			drop_subproxies(proxi);
 			proxi->alive = false;
 			proxi = NULL;
 			goto reconnect;
diff --git a/src/stratifier.c b/src/stratifier.c
index f95d14f8..1efb0862 100644
--- a/src/stratifier.c
+++ b/src/stratifier.c
@@ -1111,7 +1111,9 @@ static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid)
 }
 
 /* Iterates over all clients in proxy mode and sets the reconnect bool for the
- * message to be sent lazily next time they speak to us */
+ * message to be sent lazily next time they speak to us only if the proxy is
+ * higher priority than the one they're currently connected to or the notify_id
+ * on their proxy has changed indicating a new subscription. */
 static void reconnect_clients(sdata_t *sdata, const int proxyid, const int64_t notify_id)
 {
 	stratum_instance_t *client, *tmp;
@@ -1120,7 +1122,7 @@ static void reconnect_clients(sdata_t *sdata, const int proxyid, const int64_t n
 
 	ck_rlock(&sdata->instance_lock);
 	HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
-		if (client->proxyid != proxyid || client->notify_id != notify_id)
+		if (client->proxyid > proxyid || (client->proxyid == proxyid && client->notify_id != notify_id))
 			client->reconnect = true;
 	}
 	ck_runlock(&sdata->instance_lock);
@@ -1953,6 +1955,34 @@ static void set_proxy(sdata_t *sdata, const char *buf)
 	reconnect_clients(sdata, proxy->id, proxy->notify_id);
 }
 
+/* Send a single client a reconnect request, setting the time we sent the
+ * request so we can drop the client lazily if it hasn't reconnected on its
+ * own one minute later */
+static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
+{
+	json_t *json_msg;
+
+	client->reconnect = false;
+	client->reconnect_request = time(NULL);
+	JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
+			"params");
+	stratum_add_send(sdata, json_msg, client->id);
+}
+
+static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
+{
+	stratum_instance_t *client;
+
+	client = ref_instance_by_id(sdata, client_id);
+	if (!client) {
+		LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id);
+		return;
+	}
+	LOGNOTICE("Reconnecting client %"PRId64, client_id);
+	reconnect_client(sdata, client);
+	dec_instance_ref(sdata, client);
+}
+
 static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
 {
 	int sockd, ret = 0, selret = 0;
@@ -2051,6 +2081,14 @@ retry:
 			LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf);
 		else
 			drop_client(ckp, sdata, client_id);
+	} else if (cmdmatch(buf, "reconnclient")) {
+		int64_t client_id;
+
+		ret = sscanf(buf, "reconnclient=%"PRId64, &client_id);
+		if (ret < 1)
+			LOGDEBUG("Stratifier failed to parse reconnclient command: %s", buf);
+		else
+			reconnect_client_id(sdata, client_id);
 	} else if (cmdmatch(buf, "dropall")) {
 		drop_allclients(ckp);
 	} else if (cmdmatch(buf, "block")) {
@@ -3739,20 +3777,6 @@ out:
 	free_smsg(msg);
 }
 
-/* Send a single client a reconnect request, setting the time we sent the
- * request so we can drop the client lazily if it hasn't reconnected on its
- * own one minute later */
-static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
-{
-	json_t *json_msg;
-
-	client->reconnect = false;
-	client->reconnect_request = time(NULL);
-	JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
-			"params");
-	stratum_add_send(sdata, json_msg, client->id);
-}
-
 static void srecv_process(ckpool_t *ckp, char *buf)
 {
 	bool noid = false, dropped = false;