diff --git a/src/stratifier.c b/src/stratifier.c index 165dfd9c..9cbc14b4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1141,11 +1141,14 @@ static int64_t current_headroom(sdata_t *sdata, proxy_t **proxy) mutex_lock(&sdata->proxy_lock); *proxy = sdata->proxy; + if (!*proxy) + goto out_unlock; HASH_ITER(sh, (*proxy)->subproxies, subproxy, tmp) { if (subproxy->dead) continue; headroom += subproxy->max_clients - subproxy->clients; } +out_unlock: mutex_unlock(&sdata->proxy_lock); return headroom; @@ -1170,6 +1173,8 @@ static void reconnect_clients(sdata_t *sdata) proxy_t *proxy; headroom = current_headroom(sdata, &proxy); + if (!proxy) + return; ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { @@ -1192,7 +1197,7 @@ static void reconnect_clients(sdata_t *sdata) flagged, proxy->id); } if (flagged) - generator_recruit(client->ckp); + generator_recruit(sdata->ckp); } static proxy_t *current_proxy(sdata_t *sdata) @@ -2121,6 +2126,12 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) { json_t *json_msg; + /* Already requested? */ + if (client->reconnect_request) { + connector_drop_client(sdata->ckp, client->id); + client->dropped = true; + return; + } client->reconnect = false; client->reconnect_request = time(NULL); JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", @@ -2151,7 +2162,9 @@ static void set_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf) { stratum_instance_t *client, *tmp; + int reconnects = 0, flagged = 0; int id = 0, subid = 0; + int64_t headroom; proxy_t *proxy; sscanf(buf, "deadproxy=%d:%d", &id, &subid); @@ -2159,13 +2172,54 @@ static void dead_proxy(sdata_t *sdata, const char *buf) if (proxy) proxy->dead = true; LOGNOTICE("Stratifier dropping clients from proxy %d:%d", id, subid); + headroom = current_headroom(sdata, &proxy); + if (!proxy) + return; ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { - if (client->proxyid == id && client->subproxyid == subid) - reconnect_client(sdata, client); + if (client->proxyid != id || client->subproxyid != subid) + continue; + if (reconnects >= headroom) { + if (!client->reconnect) { + client->reconnect = true; + flagged++; + } + continue; + } + client->reconnect = true; + reconnects++; + reconnect_client(sdata, client); } ck_runlock(&sdata->instance_lock); + + if (reconnects || flagged) { + LOGNOTICE("Reconnected %d clients, flagged %d from dead proxy %d:%d", reconnects, + flagged, id, subid); + } + if (flagged) + generator_recruit(sdata->ckp); +} + +/* Must hold a reference */ +static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client) +{ + int64_t headroom; + proxy_t *proxy; + + headroom = current_headroom(sdata, &proxy); + if (!proxy) + return; + if (headroom > 0) { + LOGNOTICE("Reconnecting client %"PRId64, client->id); + reconnect_client(sdata, client); + } else { + generator_recruit(sdata->ckp); + if (!client->reconnect) { + LOGNOTICE("Flagging client %"PRId64, client->id); + client->reconnect = true; + } + } } static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) @@ -2177,8 +2231,7 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id); return; } - LOGNOTICE("Reconnecting client %"PRId64, client_id); - reconnect_client(sdata, client); + lazy_reconnect_client(sdata, client); dec_instance_ref(sdata, client); } @@ -4079,7 +4132,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) /* The client is still active but has been issued a reconnect request * so use this opportunity to send it a reconnect message */ if (unlikely(client->reconnect)) - reconnect_client(sdata, client); + lazy_reconnect_client(sdata, client); dec_instance_ref(sdata, client); out: free(buf);