Browse Source

Do a soft failover to backup proxies, not disconnecting them unless the upstream subproxy no longer exists, but a hard failover to higher priority proxies

master
Con Kolivas 10 years ago
parent
commit
c295d6a8d4
  1. 46
      src/generator.c
  2. 56
      src/stratifier.c

46
src/generator.c

@ -957,6 +957,22 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
recruit_subproxy(gdata, proxi); recruit_subproxy(gdata, proxi);
} }
/* If the parent is no longer in use due to reconnect, we shouldn't use any of
* the child subproxies. */
static void drop_subproxies(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy, *tmp;
mutex_lock(&proxi->proxy_lock);
HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
if (!parent_proxy(subproxy)) {
subproxy->disabled = true;
Close(subproxy->cs->fd);
}
}
mutex_unlock(&proxi->proxy_lock);
}
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{ {
server_instance_t *newsi, *si = proxi->si; server_instance_t *newsi, *si = proxi->si;
@ -1037,6 +1053,7 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
/* Old proxy memory is basically lost here */ /* Old proxy memory is basically lost here */
prepare_proxy(newproxi); prepare_proxy(newproxi);
drop_subproxies(proxi);
out: out:
return ret; return ret;
} }
@ -1293,11 +1310,11 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int subid
return subproxy; return subproxy;
} }
static void stratifier_drop_client(ckpool_t *ckp, int64_t id) static void stratifier_reconnect_client(ckpool_t *ckp, int64_t id)
{ {
char buf[256]; char buf[256];
sprintf(buf, "dropclient=%"PRId64, id); sprintf(buf, "reconnclient=%"PRId64, id);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
} }
@ -1317,20 +1334,20 @@ static void submit_share(gdata_t *gdata, json_t *val)
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGWARNING("Failed to find proxy %d to send share to", id); LOGWARNING("Failed to find proxy %d to send share to", id);
stratifier_drop_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
proxi = subproxy_by_id(proxy, subid); proxi = subproxy_by_id(proxy, subid);
if (unlikely(!proxi)) { if (unlikely(!proxi)) {
LOGNOTICE("Failed to find proxy %d:%d to send share to", id, subid); LOGNOTICE("Failed to find proxy %d:%d to send share to", id, subid);
stratifier_drop_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
if (!proxi->alive) { if (!proxi->alive) {
LOGNOTICE("Client %"PRId64" attempting to send shares to dead proxy %d:%d, dropping", LOGNOTICE("Client %"PRId64" attempting to send shares to dead proxy %d:%d, dropping",
client_id, id, subid); client_id, id, subid);
stratifier_drop_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
@ -1516,22 +1533,6 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
ckmsgq_add(proxi->passsends, pm); ckmsgq_add(proxi->passsends, pm);
} }
/* If the parent is dead, we shouldn't use any of the child subproxies to be
* used. */
static void drop_subproxies(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy, *tmp;
mutex_lock(&proxi->proxy_lock);
HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
if (!parent_proxy(subproxy)) {
subproxy->disabled = true;
Close(subproxy->cs->fd);
}
}
mutex_unlock(&proxi->proxy_lock);
}
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging, int epfd) connsock_t *cs, bool pinging, int epfd)
{ {
@ -1595,8 +1596,6 @@ out:
} }
} }
proxi->alive = ret; proxi->alive = ret;
if (!ret && parent_proxy(proxi))
drop_subproxies(proxi);
return ret; return ret;
} }
@ -1938,7 +1937,6 @@ retry:
if (unlikely(proxi->cs->fd < 0)) { if (unlikely(proxi->cs->fd < 0)) {
LOGWARNING("Upstream proxy %d:%s socket invalidated, will attempt failover", LOGWARNING("Upstream proxy %d:%s socket invalidated, will attempt failover",
proxi->id, proxi->cs->url); proxi->id, proxi->cs->url);
drop_subproxies(proxi);
proxi->alive = false; proxi->alive = false;
proxi = NULL; proxi = NULL;
goto reconnect; goto reconnect;

56
src/stratifier.c

@ -1111,7 +1111,9 @@ static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid)
} }
/* Iterates over all clients in proxy mode and sets the reconnect bool for the /* Iterates over all clients in proxy mode and sets the reconnect bool for the
* message to be sent lazily next time they speak to us */ * message to be sent lazily next time they speak to us only if the proxy is
* higher priority than the one they're currently connected to or the notify_id
* on their proxy has changed indicating a new subscription. */
static void reconnect_clients(sdata_t *sdata, const int proxyid, const int64_t notify_id) static void reconnect_clients(sdata_t *sdata, const int proxyid, const int64_t notify_id)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
@ -1120,7 +1122,7 @@ static void reconnect_clients(sdata_t *sdata, const int proxyid, const int64_t n
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != proxyid || client->notify_id != notify_id) if (client->proxyid > proxyid || (client->proxyid == proxyid && client->notify_id != notify_id))
client->reconnect = true; client->reconnect = true;
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
@ -1953,6 +1955,34 @@ static void set_proxy(sdata_t *sdata, const char *buf)
reconnect_clients(sdata, proxy->id, proxy->notify_id); reconnect_clients(sdata, proxy->id, proxy->notify_id);
} }
/* Send a single client a reconnect request, setting the time we sent the
* request so we can drop the client lazily if it hasn't reconnected on its
* own one minute later */
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{
json_t *json_msg;
client->reconnect = false;
client->reconnect_request = time(NULL);
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
"params");
stratum_add_send(sdata, json_msg, client->id);
}
static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
{
stratum_instance_t *client;
client = ref_instance_by_id(sdata, client_id);
if (!client) {
LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id);
return;
}
LOGNOTICE("Reconnecting client %"PRId64, client_id);
reconnect_client(sdata, client);
dec_instance_ref(sdata, client);
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret = 0; int sockd, ret = 0, selret = 0;
@ -2051,6 +2081,14 @@ retry:
LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf);
else else
drop_client(ckp, sdata, client_id); drop_client(ckp, sdata, client_id);
} else if (cmdmatch(buf, "reconnclient")) {
int64_t client_id;
ret = sscanf(buf, "reconnclient=%"PRId64, &client_id);
if (ret < 0)
LOGDEBUG("Stratifier failed to parse reconnclient command: %s", buf);
else
reconnect_client_id(sdata, client_id);
} else if (cmdmatch(buf, "dropall")) { } else if (cmdmatch(buf, "dropall")) {
drop_allclients(ckp); drop_allclients(ckp);
} else if (cmdmatch(buf, "block")) { } else if (cmdmatch(buf, "block")) {
@ -3739,20 +3777,6 @@ out:
free_smsg(msg); free_smsg(msg);
} }
/* Send a single client a reconnect request, setting the time we sent the
* request so we can drop the client lazily if it hasn't reconnected on its
* own one minute later */
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{
json_t *json_msg;
client->reconnect = false;
client->reconnect_request = time(NULL);
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
"params");
stratum_add_send(sdata, json_msg, client->id);
}
static void srecv_process(ckpool_t *ckp, char *buf) static void srecv_process(ckpool_t *ckp, char *buf)
{ {
bool noid = false, dropped = false; bool noid = false, dropped = false;

Loading…
Cancel
Save