From 182e1b2d8da6310ea3506ebc969ba463e59534b4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 20:09:07 +1000 Subject: [PATCH] Move to a completely pull based mechanism for reconnecting clients once there are enough upstream connections instead of forcing reconnects when there may not be enough --- src/stratifier.c | 82 +++++++++++------------------------------------- 1 file changed, 19 insertions(+), 63 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index d9182035..925a711b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -259,7 +259,6 @@ struct stratum_instance { * or other problem and should be dropped lazily if * this is set to 2 */ - bool reconnect; /* Do we need to send this client a reconnect message */ time_t reconnect_request; /* The time we sent a reconnect message */ user_instance_t *user_instance; @@ -1363,8 +1362,8 @@ static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int } /* Find how much headroom we have and connect up to that many clients that are - * not currently on this pool, setting the reconnect for the remainder to be - * switched lazily. Only reconnect clients bound to global proxies. */ + * not currently on this pool, recruiting more slots to switch more clients + * later on lazily. Only reconnect clients bound to global proxies. */ static void reconnect_clients(sdata_t *sdata) { stratum_instance_t *client, *tmpclient; @@ -1387,18 +1386,16 @@ static void reconnect_clients(sdata_t *sdata) continue; if (client->proxyid == proxy->id) continue; - if (client->reconnect) - continue; if (headroom-- < 1) continue; reconnects++; - client->reconnect = true; + reconnect_client(sdata, client); } ck_runlock(&sdata->instance_lock); if (reconnects) { - LOGNOTICE("%d clients flagged for reconnect to proxy %d", reconnects, - proxy->id); + LOGNOTICE("%d clients flagged for reconnect to global proxy %d", + reconnects, proxy->id); } if (headroom < 0) generator_recruit(sdata->ckp, proxy->id, -headroom); @@ -1448,8 +1445,8 @@ static void check_bestproxy(sdata_t *sdata) static void dead_proxyid(sdata_t *sdata, const int id, const int subid) { - int reconnects = 0, hard = 0, proxyid = 0; stratum_instance_t *client, *tmp; + int reconnects = 0, proxyid = 0; int64_t headroom; proxy_t *proxy; @@ -1468,12 +1465,13 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid) HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->proxyid != id || client->subproxyid != subid) continue; + /* Clients could remain connected to a dead connection here + * but should be picked up when we recruit enough slots after + * another notify. */ + if (headroom-- < 1) + continue; reconnects++; - if (headroom-- > 0 && client->reconnect && hard <= SOMAXCONN / 2) { - hard++; - reconnect_client(sdata, client); - } else - client->reconnect = true; + reconnect_client(sdata, client); } ck_runlock(&sdata->instance_lock); @@ -1607,7 +1605,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r } /* Check how much headroom the userid proxies have and reconnect any clients - * that are not bound to it */ + * that are not bound to it that should be */ static void check_userproxies(sdata_t *sdata, const int userid) { int64_t headroom = proxy_headroom(sdata, userid); @@ -1624,12 +1622,10 @@ static void check_userproxies(sdata_t *sdata, const int userid) continue; if (client->proxy->userid == userid) continue; - if (client->reconnect) - continue; if (headroom-- < 1) continue; reconnects++; - client->reconnect = true; + reconnect_client(sdata, client); } ck_runlock(&sdata->instance_lock); @@ -2072,7 +2068,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val) stratum_instance_t *client, *tmp; ckmsg_t *bulk_send = NULL; time_t now_t = time(NULL); - int hard_reconnect = 0; ckmsgq_t *ssends; if (unlikely(!val)) { @@ -2103,13 +2098,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val) } continue; } - if (client->reconnect) { - if (hard_reconnect <= SOMAXCONN / 2) { - hard_reconnect++; - reconnect_client(ckp_sdata, client); - } - continue; - } if (!client_active(client)) continue; @@ -2420,15 +2408,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) /* Send a single client a reconnect request, setting the time we sent the * request so we can drop the client lazily if it hasn't reconnected on its - * own one minute later */ + * own more than one minute later if we call reconnect again */ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) { json_t *json_msg; /* Already requested? */ if (client->reconnect_request) { - connector_drop_client(sdata->ckp, client->id); - client->dropped = true; + if (time(NULL) - client->reconnect_request >= 60) + connector_drop_client(sdata->ckp, client->id); return; } client->reconnect_request = time(NULL); @@ -2445,28 +2433,6 @@ static void dead_proxy(sdata_t *sdata, const char *buf) dead_proxyid(sdata, id, subid); } -/* Must hold a reference */ -static void lazy_reconnect_client(sdata_t *sdata, stratum_instance_t *client) -{ - int64_t headroom; - proxy_t *proxy; - - headroom = current_headroom(sdata, &proxy); - if (!proxy) - return; - if (headroom-- > 0) { - LOGNOTICE("Reconnecting client %"PRId64, client->id); - reconnect_client(sdata, client); - } else { - generator_recruit(sdata->ckp, proxy->id, -headroom); - if (!client->reconnect) { - LOGNOTICE("Flagging client %"PRId64, client->id); - client->reconnect = true; - } else /* Already been flagged, force the send */ - reconnect_client(sdata, client); - } -} - static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) { stratum_instance_t *client; @@ -2476,7 +2442,7 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) LOGINFO("reconnect_client_id failed to find client %"PRId64, client_id); return; } - lazy_reconnect_client(sdata, client); + reconnect_client(sdata, client); dec_instance_ref(sdata, client); } @@ -3987,12 +3953,6 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ if (ckp->proxy) { LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s", client->id, client->proxyid, client->subproxyid, buf, user->username); - if (client->sdata && client->sdata->proxy && client->sdata->proxy->global) { - sdata_t *ckp_sdata = ckp->data; - - if (proxy_headroom(ckp_sdata, client->user_id)) - client->reconnect = true; - } } else { LOGNOTICE("Authorised client %"PRId64" worker %s as user %s", client->id, buf, user->username); @@ -4912,7 +4872,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; - if (client->reject == 2 || (client->reconnect_request && time(NULL) - client->reconnect_request > 60)) { + if (client->reject == 2) { LOGINFO("Dropping client %"PRId64" %s tagged for lazy invalidation", client_id, client->address); connector_drop_client(ckp, client_id); @@ -5027,10 +4987,6 @@ static void srecv_process(ckpool_t *ckp, char *buf) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); parse_instance_msg(ckp, sdata, msg, client); - /* The client is still active but has been issued a reconnect request - * so use this opportunity to send it a reconnect message */ - if (unlikely(client->reconnect)) - lazy_reconnect_client(sdata, client); dec_instance_ref(sdata, client); out: free(buf);