From db9a777767b08733191bea3bcb8c48a6669ca7d0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 7 Feb 2015 14:46:43 +1100 Subject: [PATCH] Reconnect clients lazily on proxy switch once we receive another message from them and drop them if they don't disconnect on their own within a minute --- src/stratifier.c | 53 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c0fad849..ec0238a4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -254,6 +254,9 @@ struct stratum_instance { * or other problem and should be dropped lazily if * this is set to 2 */ + bool reconnect; /* Do we need to send this client a reconnect message */ + time_t reconnect_request; /* The time we sent a reconnect message */ + user_instance_t *user_instance; worker_instance_t *worker_instance; @@ -1011,7 +1014,19 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id) return proxy; } -static void reconnect_clients(sdata_t *sdata, const char *cmd); +/* Iterates over all clients and sets the reconnect bool for the message + * to be sent lazily next time they speak to us. */ +static void reconnect_clients(sdata_t *sdata) +{ + stratum_instance_t *client, *tmp; + + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + client->reconnect = true; + } + ck_runlock(&sdata->instance_lock); +} + static void _update_notify(ckpool_t *ckp, const int id); static proxy_t *current_proxy(sdata_t *sdata) @@ -1093,7 +1108,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) /* Notify implied required now too */ _update_notify(ckp, id); if (reconnect) - reconnect_clients(sdata, ""); + reconnect_clients(sdata); } static void update_diff(ckpool_t *ckp); @@ -1576,7 +1591,7 @@ static void stratum_broadcast_message(sdata_t *sdata, const char *msg) /* Send a generic reconnect to all clients without parameters to make them * reconnect to the same server. */ -static void reconnect_clients(sdata_t *sdata, const char *cmd) +static void request_reconnect(sdata_t *sdata, const char *cmd) { char *port = strdupa(cmd), *url = NULL; stratum_instance_t *client, *tmp; @@ -1825,7 +1840,7 @@ static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) mutex_unlock(&sdata->proxy_lock); _update_notify(ckp, id); - reconnect_clients(sdata, ""); + reconnect_clients(sdata); } static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) @@ -1933,7 +1948,7 @@ retry: } else if (cmdmatch(buf, "noblock")) { block_reject(sdata, buf + 8); } else if (cmdmatch(buf, "reconnect")) { - reconnect_clients(sdata, buf); + request_reconnect(sdata, buf); } else if (cmdmatch(buf, "proxy")) { set_proxy(ckp, sdata, buf); } else if (cmdmatch(buf, "loglevel")) { @@ -3513,16 +3528,14 @@ static void free_smsg(smsg_t *msg) } /* Entered with client holding ref count */ -static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client) +static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, stratum_instance_t *client) { json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; - char buf[256]; - if (unlikely(client->reject == 2)) { + if (client->reject == 2 || (client->reconnect_request && time(NULL) - client->reconnect_request > 60)) { LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + connector_drop_client(ckp, client_id); goto out; } @@ -3557,6 +3570,20 @@ out: free_smsg(msg); } +/* Send a single client a reconnect request, setting the time we sent the + * request so we can drop the client lazily if it hasn't reconnected on its + * own one minute later */ +static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) +{ + json_t *json_msg; + + client->reconnect = false; + client->reconnect_request = time(NULL); + JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", + "params"); + stratum_add_send(sdata, json_msg, client->id); +} + static void srecv_process(ckpool_t *ckp, char *buf) { bool noid = false, dropped = false; @@ -3633,7 +3660,11 @@ static void srecv_process(ckpool_t *ckp, char *buf) if (unlikely(noid)) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); - parse_instance_msg(sdata, msg, client); + parse_instance_msg(ckp, sdata, msg, client); + /* The client is still active but has been issued a reconnect request + * so use this opportunity to send it a reconnect message */ + if (unlikely(client->reconnect)) + reconnect_client(sdata, client); dec_instance_ref(sdata, client); out: free(buf);