Browse Source

Only reconnect clients when there is room for them on the new proxy, tagging the rest and recruiting as needed

master
Con Kolivas 10 years ago
parent
commit
a470ca6431
  1. 65
      src/stratifier.c

65
src/stratifier.c

@ -1141,11 +1141,14 @@ static int64_t current_headroom(sdata_t *sdata, proxy_t **proxy)
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
*proxy = sdata->proxy; *proxy = sdata->proxy;
if (!*proxy)
goto out_unlock;
HASH_ITER(sh, (*proxy)->subproxies, subproxy, tmp) { HASH_ITER(sh, (*proxy)->subproxies, subproxy, tmp) {
if (subproxy->dead) if (subproxy->dead)
continue; continue;
headroom += subproxy->max_clients - subproxy->clients; headroom += subproxy->max_clients - subproxy->clients;
} }
out_unlock:
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
return headroom; return headroom;
@ -1170,6 +1173,8 @@ static void reconnect_clients(sdata_t *sdata)
proxy_t *proxy; proxy_t *proxy;
headroom = current_headroom(sdata, &proxy); headroom = current_headroom(sdata, &proxy);
if (!proxy)
return;
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) { HASH_ITER(hh, sdata->stratum_instances, client, tmpclient) {
@ -1192,7 +1197,7 @@ static void reconnect_clients(sdata_t *sdata)
flagged, proxy->id); flagged, proxy->id);
} }
if (flagged) if (flagged)
generator_recruit(client->ckp); generator_recruit(sdata->ckp);
} }
static proxy_t *current_proxy(sdata_t *sdata) static proxy_t *current_proxy(sdata_t *sdata)
@ -2121,6 +2126,12 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{ {
json_t *json_msg; json_t *json_msg;
/* Already requested? */
if (client->reconnect_request) {
connector_drop_client(sdata->ckp, client->id);
client->dropped = true;
return;
}
client->reconnect = false; client->reconnect = false;
client->reconnect_request = time(NULL); client->reconnect_request = time(NULL);
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
@ -2151,7 +2162,9 @@ static void set_proxy(sdata_t *sdata, const char *buf)
static void dead_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int reconnects = 0, flagged = 0;
int id = 0, subid = 0; int id = 0, subid = 0;
int64_t headroom;
proxy_t *proxy; proxy_t *proxy;
sscanf(buf, "deadproxy=%d:%d", &id, &subid); sscanf(buf, "deadproxy=%d:%d", &id, &subid);
@ -2159,13 +2172,54 @@ static void dead_proxy(sdata_t *sdata, const char *buf)
if (proxy) if (proxy)
proxy->dead = true; proxy->dead = true;
LOGNOTICE("Stratifier dropping clients from proxy %d:%d", id, subid); LOGNOTICE("Stratifier dropping clients from proxy %d:%d", id, subid);
headroom = current_headroom(sdata, &proxy);
if (!proxy)
return;
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid == id && client->subproxyid == subid) if (client->proxyid != id || client->subproxyid != subid)
reconnect_client(sdata, client); continue;
if (reconnects >= headroom) {
if (!client->reconnect) {
client->reconnect = true;
flagged++;
}
continue;
}
client->reconnect = true;
reconnects++;
reconnect_client(sdata, client);
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects || flagged) {
LOGNOTICE("Reconnected %d clients, flagged %d from dead proxy %d:%d", reconnects,
flagged, id, subid);
}
if (flagged)
generator_recruit(sdata->ckp);
}
/* Must hold a reference */
static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{
int64_t headroom;
proxy_t *proxy;
headroom = current_headroom(sdata, &proxy);
if (!proxy)
return;
if (headroom > 0) {
LOGNOTICE("Reconnecting client %"PRId64, client->id);
reconnect_client(sdata, client);
} else {
generator_recruit(sdata->ckp);
if (!client->reconnect) {
LOGNOTICE("Flagging client %"PRId64, client->id);
client->reconnect = true;
}
}
} }
static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
@ -2177,8 +2231,7 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id); LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id);
return; return;
} }
LOGNOTICE("Reconnecting client %"PRId64, client_id); lazy_reconnect_client(sdata, client);
reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
} }
@ -4079,7 +4132,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
/* The client is still active but has been issued a reconnect request /* The client is still active but has been issued a reconnect request
* so use this opportunity to send it a reconnect message */ * so use this opportunity to send it a reconnect message */
if (unlikely(client->reconnect)) if (unlikely(client->reconnect))
reconnect_client(sdata, client); lazy_reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
out: out:
free(buf); free(buf);

Loading…
Cancel
Save