Browse Source

Move to a completely pull based mechanism for reconnecting clients once there are enough upstream connections instead of forcing reconnects when there may not be enough

master
Con Kolivas 10 years ago
parent
commit
182e1b2d8d
  1. 82
      src/stratifier.c

82
src/stratifier.c

@ -259,7 +259,6 @@ struct stratum_instance {
* or other problem and should be dropped lazily if * or other problem and should be dropped lazily if
* this is set to 2 */ * this is set to 2 */
bool reconnect; /* Do we need to send this client a reconnect message */
time_t reconnect_request; /* The time we sent a reconnect message */ time_t reconnect_request; /* The time we sent a reconnect message */
user_instance_t *user_instance; user_instance_t *user_instance;
@ -1363,8 +1362,8 @@ static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int
} }
/* Find how much headroom we have and connect up to that many clients that are /* Find how much headroom we have and connect up to that many clients that are
* not currently on this pool, setting the reconnect for the remainder to be * not currently on this pool, recruiting more slots to switch more clients
* switched lazily. Only reconnect clients bound to global proxies. */ * later on lazily. Only reconnect clients bound to global proxies. */
static void reconnect_clients(sdata_t *sdata) static void reconnect_clients(sdata_t *sdata)
{ {
stratum_instance_t *client, *tmpclient; stratum_instance_t *client, *tmpclient;
@ -1387,18 +1386,16 @@ static void reconnect_clients(sdata_t *sdata)
continue; continue;
if (client->proxyid == proxy->id) if (client->proxyid == proxy->id)
continue; continue;
if (client->reconnect)
continue;
if (headroom-- < 1) if (headroom-- < 1)
continue; continue;
reconnects++; reconnects++;
client->reconnect = true; reconnect_client(sdata, client);
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged for reconnect to proxy %d", reconnects, LOGNOTICE("%d clients flagged for reconnect to global proxy %d",
proxy->id); reconnects, proxy->id);
} }
if (headroom < 0) if (headroom < 0)
generator_recruit(sdata->ckp, proxy->id, -headroom); generator_recruit(sdata->ckp, proxy->id, -headroom);
@ -1448,8 +1445,8 @@ static void check_bestproxy(sdata_t *sdata)
static void dead_proxyid(sdata_t *sdata, const int id, const int subid) static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
{ {
int reconnects = 0, hard = 0, proxyid = 0;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int reconnects = 0, proxyid = 0;
int64_t headroom; int64_t headroom;
proxy_t *proxy; proxy_t *proxy;
@ -1468,12 +1465,13 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->proxyid != id || client->subproxyid != subid) if (client->proxyid != id || client->subproxyid != subid)
continue; continue;
/* Clients could remain connected to a dead connection here
* but should be picked up when we recruit enough slots after
* another notify. */
if (headroom-- < 1)
continue;
reconnects++; reconnects++;
if (headroom-- > 0 && client->reconnect && hard <= SOMAXCONN / 2) { reconnect_client(sdata, client);
hard++;
reconnect_client(sdata, client);
} else
client->reconnect = true;
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
@ -1607,7 +1605,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r
} }
/* Check how much headroom the userid proxies have and reconnect any clients /* Check how much headroom the userid proxies have and reconnect any clients
* that are not bound to it */ * that are not bound to it that should be */
static void check_userproxies(sdata_t *sdata, const int userid) static void check_userproxies(sdata_t *sdata, const int userid)
{ {
int64_t headroom = proxy_headroom(sdata, userid); int64_t headroom = proxy_headroom(sdata, userid);
@ -1624,12 +1622,10 @@ static void check_userproxies(sdata_t *sdata, const int userid)
continue; continue;
if (client->proxy->userid == userid) if (client->proxy->userid == userid)
continue; continue;
if (client->reconnect)
continue;
if (headroom-- < 1) if (headroom-- < 1)
continue; continue;
reconnects++; reconnects++;
client->reconnect = true; reconnect_client(sdata, client);
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
@ -2072,7 +2068,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val)
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
ckmsg_t *bulk_send = NULL; ckmsg_t *bulk_send = NULL;
time_t now_t = time(NULL); time_t now_t = time(NULL);
int hard_reconnect = 0;
ckmsgq_t *ssends; ckmsgq_t *ssends;
if (unlikely(!val)) { if (unlikely(!val)) {
@ -2103,13 +2098,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val)
} }
continue; continue;
} }
if (client->reconnect) {
if (hard_reconnect <= SOMAXCONN / 2) {
hard_reconnect++;
reconnect_client(ckp_sdata, client);
}
continue;
}
if (!client_active(client)) if (!client_active(client))
continue; continue;
@ -2420,15 +2408,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
/* Send a single client a reconnect request, setting the time we sent the /* Send a single client a reconnect request, setting the time we sent the
* request so we can drop the client lazily if it hasn't reconnected on its * request so we can drop the client lazily if it hasn't reconnected on its
* own one minute later */ * own more than one minute later if we call reconnect again */
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{ {
json_t *json_msg; json_t *json_msg;
/* Already requested? */ /* Already requested? */
if (client->reconnect_request) { if (client->reconnect_request) {
connector_drop_client(sdata->ckp, client->id); if (time(NULL) - client->reconnect_request >= 60)
client->dropped = true; connector_drop_client(sdata->ckp, client->id);
return; return;
} }
client->reconnect_request = time(NULL); client->reconnect_request = time(NULL);
@ -2445,28 +2433,6 @@ static void dead_proxy(sdata_t *sdata, const char *buf)
dead_proxyid(sdata, id, subid); dead_proxyid(sdata, id, subid);
} }
/* Must hold a reference */
static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{
int64_t headroom;
proxy_t *proxy;
headroom = current_headroom(sdata, &proxy);
if (!proxy)
return;
if (headroom-- > 0) {
LOGNOTICE("Reconnecting client %"PRId64, client->id);
reconnect_client(sdata, client);
} else {
generator_recruit(sdata->ckp, proxy->id, -headroom);
if (!client->reconnect) {
LOGNOTICE("Flagging client %"PRId64, client->id);
client->reconnect = true;
} else /* Already been flagged, force the send */
reconnect_client(sdata, client);
}
}
static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
{ {
stratum_instance_t *client; stratum_instance_t *client;
@ -2476,7 +2442,7 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id); LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id);
return; return;
} }
lazy_reconnect_client(sdata, client); reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
} }
@ -3987,12 +3953,6 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
if (ckp->proxy) { if (ckp->proxy) {
LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s", LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s",
client->id, client->proxyid, client->subproxyid, buf, user->username); client->id, client->proxyid, client->subproxyid, buf, user->username);
if (client->sdata && client->sdata->proxy && client->sdata->proxy->global) {
sdata_t *ckp_sdata = ckp->data;
if (proxy_headroom(ckp_sdata, client->user_id))
client->reconnect = true;
}
} else { } else {
LOGNOTICE("Authorised client %"PRId64" worker %s as user %s", LOGNOTICE("Authorised client %"PRId64" worker %s as user %s",
client->id, buf, user->username); client->id, buf, user->username);
@ -4912,7 +4872,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int64_t client_id = msg->client_id; int64_t client_id = msg->client_id;
if (client->reject == 2 || (client->reconnect_request && time(NULL) - client->reconnect_request > 60)) { if (client->reject == 2) {
LOGINFO("Dropping client %"PRId64" %s tagged for lazy invalidation", LOGINFO("Dropping client %"PRId64" %s tagged for lazy invalidation",
client_id, client->address); client_id, client->address);
connector_drop_client(ckp, client_id); connector_drop_client(ckp, client_id);
@ -5027,10 +4987,6 @@ static void srecv_process(ckpool_t *ckp, char *buf)
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
parse_instance_msg(ckp, sdata, msg, client); parse_instance_msg(ckp, sdata, msg, client);
/* The client is still active but has been issued a reconnect request
* so use this opportunity to send it a reconnect message */
if (unlikely(client->reconnect))
lazy_reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
out: out:
free(buf); free(buf);

Loading…
Cancel
Save