Browse Source

Reconnect clients lazily on proxy switch once we receive another message from them and drop them if they don't disconnect on their own within a minute

master
Con Kolivas 10 years ago
parent
commit
db9a777767
  1. 53
      src/stratifier.c

53
src/stratifier.c

@ -254,6 +254,9 @@ struct stratum_instance {
* or other problem and should be dropped lazily if
* this is set to 2 */
bool reconnect; /* Do we need to send this client a reconnect message */
time_t reconnect_request; /* The time we sent a reconnect message */
user_instance_t *user_instance;
worker_instance_t *worker_instance;
@ -1011,7 +1014,19 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id)
return proxy;
}
static void reconnect_clients(sdata_t *sdata, const char *cmd);
/* Iterates over all clients and sets the reconnect bool for the message
* to be sent lazily next time they speak to us. */
static void reconnect_clients(sdata_t *sdata)
{
stratum_instance_t *client, *tmp;
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
client->reconnect = true;
}
ck_runlock(&sdata->instance_lock);
}
static void _update_notify(ckpool_t *ckp, const int id);
static proxy_t *current_proxy(sdata_t *sdata)
@ -1093,7 +1108,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
/* Notify implied required now too */
_update_notify(ckp, id);
if (reconnect)
reconnect_clients(sdata, "");
reconnect_clients(sdata);
}
static void update_diff(ckpool_t *ckp);
@ -1576,7 +1591,7 @@ static void stratum_broadcast_message(sdata_t *sdata, const char *msg)
/* Send a generic reconnect to all clients without parameters to make them
* reconnect to the same server. */
static void reconnect_clients(sdata_t *sdata, const char *cmd)
static void request_reconnect(sdata_t *sdata, const char *cmd)
{
char *port = strdupa(cmd), *url = NULL;
stratum_instance_t *client, *tmp;
@ -1825,7 +1840,7 @@ static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf)
mutex_unlock(&sdata->proxy_lock);
_update_notify(ckp, id);
reconnect_clients(sdata, "");
reconnect_clients(sdata);
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
@ -1933,7 +1948,7 @@ retry:
} else if (cmdmatch(buf, "noblock")) {
block_reject(sdata, buf + 8);
} else if (cmdmatch(buf, "reconnect")) {
reconnect_clients(sdata, buf);
request_reconnect(sdata, buf);
} else if (cmdmatch(buf, "proxy")) {
set_proxy(ckp, sdata, buf);
} else if (cmdmatch(buf, "loglevel")) {
@ -3513,16 +3528,14 @@ static void free_smsg(smsg_t *msg)
}
/* Entered with client holding ref count */
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client)
static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, stratum_instance_t *client)
{
json_t *val = msg->json_msg, *id_val, *method, *params;
int64_t client_id = msg->client_id;
char buf[256];
if (unlikely(client->reject == 2)) {
if (client->reject == 2 || (client->reconnect_request && time(NULL) - client->reconnect_request > 60)) {
LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id);
send_proc(client->ckp->connector, buf);
connector_drop_client(ckp, client_id);
goto out;
}
@ -3557,6 +3570,20 @@ out:
free_smsg(msg);
}
/* Send a single client a reconnect request, setting the time we sent the
* request so we can drop the client lazily if it hasn't reconnected on its
* own one minute later */
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
{
json_t *json_msg;
client->reconnect = false;
client->reconnect_request = time(NULL);
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
"params");
stratum_add_send(sdata, json_msg, client->id);
}
static void srecv_process(ckpool_t *ckp, char *buf)
{
bool noid = false, dropped = false;
@ -3633,7 +3660,11 @@ static void srecv_process(ckpool_t *ckp, char *buf)
if (unlikely(noid))
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
parse_instance_msg(sdata, msg, client);
parse_instance_msg(ckp, sdata, msg, client);
/* The client is still active but has been issued a reconnect request
* so use this opportunity to send it a reconnect message */
if (unlikely(client->reconnect))
reconnect_client(sdata, client);
dec_instance_ref(sdata, client);
out:
free(buf);

Loading…
Cancel
Save