diff --git a/src/ckpool.h b/src/ckpool.h index 2da96ed5..b2f59333 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -65,6 +65,14 @@ struct connsock { typedef struct connsock connsock_t; +typedef struct char_entry char_entry_t; + +struct char_entry { + char_entry_t *next; + char_entry_t *prev; + char *buf; +}; + struct server_instance { /* Hash table data */ UT_hash_handle hh; @@ -175,14 +183,12 @@ struct ckpool_instance { char **serverurl; // Array of URLs to bind our server/proxy to int serverurls; // Number of server bindings int update_interval; // Seconds between stratum updates - int chosen_server; // Chosen server for next connection /* Proxy options */ int proxies; char **proxyurl; char **proxyauth; char **proxypass; - server_instance_t *btcdbackup; /* Private data for each process */ void *data; diff --git a/src/generator.c b/src/generator.c index b9d02370..9c23e5c3 100644 --- a/src/generator.c +++ b/src/generator.c @@ -71,12 +71,18 @@ struct pass_msg { typedef struct pass_msg pass_msg_t; +typedef struct proxy_instance proxy_instance_t; + /* Per proxied pool instance data */ struct proxy_instance { + proxy_instance_t *next; + proxy_instance_t *prev; + ckpool_t *ckp; connsock_t *cs; server_instance_t *si; bool passthrough; + int id; /* Proxy server id */ const char *auth; const char *pass; @@ -92,14 +98,13 @@ struct proxy_instance { double diff; tv_t last_share; - int id; /* Message id for sending stratum messages */ + int msg_id; /* Message id for sending stratum messages */ bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ bool notified; /* Received new template for work */ bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ - bool replaced; /* This proxy has issued a reconnect with new data */ pthread_mutex_t notify_lock; notify_instance_t *notify_instances; @@ -117,12 +122,14 @@ struct proxy_instance { int64_t share_id; ckmsgq_t *passsends; // passthrough sends -}; -typedef struct proxy_instance proxy_instance_t; + char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ +}; /* Private data for the generator */ struct generator_data { + pthread_mutex_t lock; /* Lock protecting linked lists */ + proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -490,40 +497,101 @@ static json_t *find_notify(json_t *val) return ret; } +/* Get stored line in the proxy linked list of messages if any exist or NULL */ +static char *cached_proxy_line(proxy_instance_t *proxi) +{ + char *buf = NULL; + + if (proxi->recvd_lines) { + char_entry_t *char_t = proxi->recvd_lines; + + DL_DELETE(proxi->recvd_lines, char_t); + buf = char_t->buf; + free(char_t); + } + return buf; +} + +/* Get next line in the proxy linked list of messages or a new line from the + * connsock if there are none. */ +static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi) +{ + char *buf = cached_proxy_line(proxi); + + if (!buf && read_socket_line(cs, 5) > 0) + buf = strdup(cs->buf); + return buf; +} + +/* For appending a line to the proxy recv list */ +static void append_proxy_line(proxy_instance_t *proxi, const char *buf) +{ + char_entry_t *char_t = ckalloc(sizeof(char_entry_t)); + char_t->buf = strdup(buf); + DL_APPEND(proxi->recvd_lines, char_t); +} + +/* Get a new line from the connsock and return a copy of it */ +static char *new_proxy_line(connsock_t *cs) +{ + char *buf = NULL; + + if (read_socket_line(cs, 5) < 1) + goto out; + buf = strdup(cs->buf); +out: + return buf; +} + static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *notify_val, *tmp; + bool parsed, ret = false; + int retries = 0, size; const char *string; - bool ret = false; - char *old; - int size; + char *buf, *old; - size = read_socket_line(cs, 5); - if (size < 1) { - LOGWARNING("Failed to receive line in parse_subscribe"); +retry: + parsed = true; + if (!(buf = new_proxy_line(cs))) { + LOGNOTICE("Proxy %d:%s failed to receive line in parse_subscribe", + proxi->id, proxi->si->url); goto out; } - LOGDEBUG("parse_subscribe received %s", cs->buf); + LOGDEBUG("parse_subscribe received %s", buf); /* Ignore err_val here stored in &tmp */ - val = json_msg_result(cs->buf, &res_val, &tmp); + val = json_msg_result(buf, &res_val, &tmp); if (!val || !res_val) { - LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf); - goto out; + LOGINFO("Failed to get a json result in parse_subscribe, got: %s", buf); + parsed = false; } if (!json_is_array(res_val)) { - LOGWARNING("Result in parse_subscribe not an array"); - goto out; + LOGINFO("Result in parse_subscribe not an array"); + parsed = false; } size = json_array_size(res_val); if (size < 3) { - LOGWARNING("Result in parse_subscribe array too small"); - goto out; + LOGINFO("Result in parse_subscribe array too small"); + parsed = false; } notify_val = find_notify(res_val); if (!notify_val) { - LOGWARNING("Failed to find notify in parse_subscribe"); + LOGINFO("Failed to find notify in parse_subscribe"); + parsed = false; + } + if (!parsed) { + if (++retries < 3) { + /* We don't want this response so put it on the proxy + * recvd list to be parsed later */ + append_proxy_line(proxi, buf); + buf = NULL; + goto retry; + } + LOGNOTICE("Proxy %d:%s failed to parse subscribe response in parse_subscribe", + proxi->id, proxi->si->url); goto out; } + /* Free up old data in place if we are re-subscribing */ old = proxi->sessionid; proxi->sessionid = NULL; @@ -562,10 +630,12 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Invalid nonce2len %d in parse_subscribe", size); goto out; } - if (size == 3) - LOGWARNING("Nonce2 length %d means proxied clients can't be >5TH each", size); + if (size == 3 || (size == 4 && proxi->ckp->clientsvspeed)) + LOGWARNING("Proxy %d:%s Nonce2 length %d means proxied clients can't be >5TH each", + proxi->id, proxi->si->url, size); else if (size < 3) { - LOGWARNING("Nonce2 length %d too small to be able to proxy", size); + LOGWARNING("Proxy %d:%s Nonce2 length %d too small to be able to proxy", + proxi->id, proxi->si->url, size); goto out; } proxi->nonce2len = size; @@ -577,6 +647,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) out: if (val) json_decref(val); + free(buf); return ret; } @@ -589,26 +660,27 @@ retry: /* Attempt to reconnect if the pool supports resuming */ if (proxi->sessionid) { JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION, proxi->sessionid); /* Then attempt to connect with just the client description */ } else if (!proxi->no_params) { JSON_CPACK(req, "{s:i,s:s,s:[s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION); /* Then try without any parameters */ } else { JSON_CPACK(req, "{s:i,s:s,s:[]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params"); } ret = send_json_msg(cs, req); json_decref(req); if (!ret) { - LOGWARNING("Failed to send message in subscribe_stratum"); + LOGNOTICE("Proxy %d:%s failed to send message in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } ret = parse_subscribe(cs, proxi); @@ -616,20 +688,24 @@ retry: goto out; if (proxi->no_params) { - LOGWARNING("Failed all subscription options in subscribe_stratum"); + LOGNOTICE("Proxy %d:%s failed all subscription options in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } if (proxi->sessionid) { - LOGNOTICE("Failed sessionid reconnect in subscribe_stratum, retrying without"); + LOGINFO("Proxy %d:%s failed sessionid reconnect in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_sessionid = true; dealloc(proxi->sessionid); } else { - LOGNOTICE("Failed connecting with parameters in subscribe_stratum, retrying without"); + LOGINFO("Proxy %d:%s failed connecting with parameters in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_params = true; } ret = connect_proxy(cs); if (!ret) { - LOGWARNING("Failed to reconnect in subscribe_stratum"); + LOGNOTICE("Proxy %d:%s failed to reconnect in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } goto retry; @@ -806,7 +882,9 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; + proxy_instance_t *newproxi; ckpool_t *ckp = proxi->ckp; + gdata_t *gdata = ckp->data; const char *new_url; bool ret = false; int new_port; @@ -851,27 +929,25 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) ret = true; newsi = ckzalloc(sizeof(server_instance_t)); - newsi->id = ckp->proxies; - ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * (ckp->proxies + 1)); + + mutex_lock(&gdata->lock); + newsi->id = si->id; /* Inherit the old connection's id */ + si->id = ckp->proxies++; /* Give the old connection the lowest id */ + ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies); ckp->servers[newsi->id] = newsi; newsi->url = url; newsi->auth = strdup(si->auth); newsi->pass = strdup(si->pass); proxi->reconnect = true; - proxi->replaced = true; - - /* Reuse variable on a new proxy instance */ - proxi = ckzalloc(sizeof(proxy_instance_t)); - newsi->data = proxi; - proxi->auth = newsi->auth; - proxi->pass = newsi->pass; - proxi->si = newsi; - proxi->ckp = ckp; - proxi->cs = &newsi->cs; - - /* Set chosen server only once all new proxy data exists */ - ckp->proxies++; - ckp->chosen_server = newsi->id; + + newproxi = ckzalloc(sizeof(proxy_instance_t)); + newsi->data = newproxi; + newproxi->auth = newsi->auth; + newproxi->pass = newsi->pass; + newproxi->si = newsi; + newproxi->ckp = ckp; + newproxi->cs = &newsi->cs; + mutex_unlock(&gdata->lock); out: return ret; } @@ -962,16 +1038,18 @@ out: static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *req, *err_val; + char *buf = NULL; bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.authorize", "params", proxi->auth, proxi->pass); ret = send_json_msg(cs, req); json_decref(req); if (!ret) { - LOGWARNING("Failed to send message in auth_stratum"); + LOGNOTICE("Proxy %d:%s failed to send message in auth_stratum", + proxi->id, proxi->si->url); Close(cs->fd); goto out; } @@ -979,41 +1057,55 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) /* Read and parse any extra methods sent. Anything left in the buffer * should be the response to our auth request. */ do { - int size; - - size = read_socket_line(cs, 5); - if (size < 1) { - LOGWARNING("Failed to receive line in auth_stratum"); + free(buf); + buf = next_proxy_line(cs, proxi); + if (!buf) { + LOGNOTICE("Proxy %d:%s failed to receive line in auth_stratum", + proxi->id, proxi->si->url); ret = false; goto out; } - ret = parse_method(proxi, cs->buf); + ret = parse_method(proxi, buf); } while (ret); - val = json_msg_result(cs->buf, &res_val, &err_val); + val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Failed to get a json result in auth_stratum, got: %s", cs->buf); + LOGWARNING("Proxy %d:%s failed to get a json result in auth_stratum, got: %s", + proxi->id, proxi->si->url, buf); goto out; } if (err_val && !json_is_null(err_val)) { - LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", cs->buf); + LOGWARNING("Proxy %d:%s failed to authorise in auth_stratum due to err_val, got: %s", + proxi->id, proxi->si->url, buf); goto out; } if (res_val) { ret = json_is_true(res_val); if (!ret) { - LOGWARNING("Failed to authorise in auth_stratum"); + LOGWARNING("Proxy %d:%s failed to authorise in auth_stratum", + proxi->id, proxi->si->url); goto out; } } else { /* No result and no error but successful val means auth success */ ret = true; } - LOGINFO("Auth success in auth_stratum"); + LOGINFO("Proxy %d:%s auth success in auth_stratum", proxi->id, proxi->si->url); out: if (val) json_decref(val); + if (ret) { + /* Now parse any cached responses so there are none in the + * queue and they can be managed one at a time from now on. */ + while(42) { + dealloc(buf); + buf = cached_proxy_line(proxi); + if (!buf) + break; + parse_method(proxi, buf); + }; + } return ret; } @@ -1350,9 +1442,6 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * { bool ret = false; - if (proxi->replaced) - return false; - /* Has this proxy already been reconnected? */ if (cs->fd > 0) return true; @@ -1379,8 +1468,8 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { if (!pinging) { - LOGINFO("Failed initial subscribe to %s:%s !", - cs->url, cs->port); + LOGWARNING("Failed initial subscribe to %s:%s !", + cs->url, cs->port); } goto out; } @@ -1405,20 +1494,28 @@ out: static proxy_instance_t *live_proxy(ckpool_t *ckp) { proxy_instance_t *alive = NULL; + gdata_t *gdata = ckp->data; connsock_t *cs; - int i; + int i, srvs; LOGDEBUG("Attempting to connect to proxy"); retry: if (!ping_main(ckp)) goto out; - for (i = ckp->chosen_server; i < ckp->proxies; i++) { + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + + for (i = 0; i < srvs; i++) { proxy_instance_t *proxi; server_instance_t *si; + mutex_lock(&gdata->lock); si = ckp->servers[i]; proxi = si->data; + mutex_unlock(&gdata->lock); + cs = proxi->cs; if (proxy_alive(ckp, si, proxi, cs, false)) { alive = proxi; @@ -1428,13 +1525,11 @@ retry: if (!alive) { send_proc(ckp->connector, "reject"); send_proc(ckp->stratifier, "dropall"); - if (!ckp->chosen_server) { - LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); - sleep(5); - } + LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); + sleep(5); goto retry; } - ckp->chosen_server = 0; + cs = alive->cs; LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); @@ -1457,8 +1552,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) notify_instance_t *ni, *tmp; connsock_t *cs; + send_proc(ckp->stratifier, "reconnect"); send_proc(ckp->connector, "reject"); - send_proc(ckp->stratifier, "dropall"); if (!proxi) // This shouldn't happen return; @@ -1499,10 +1594,10 @@ reconnect: if (!proxi) goto out; if (reconnecting) { + reconnecting = false; connsock_t *cs = proxi->cs; LOGWARNING("Successfully reconnected to %s:%s as proxy", cs->url, cs->port); - reconnecting = false; } /* We've just subscribed and authorised so tell the stratifier to @@ -1556,14 +1651,7 @@ retry: } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { - if (ckp->btcdbackup) { - LOGWARNING("Submitting block data locally!"); - if (submit_block(&ckp->btcdbackup->cs, buf + 12)) - LOGWARNING("Block accepted locally!"); - else - LOGWARNING("Block rejected locally."); - } else - LOGNOTICE("No backup btcd to send block to ourselves"); + LOGNOTICE("Submitting likely block solve share to upstream pool"); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { @@ -1615,47 +1703,15 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) return ret; } -static bool alive_btcd(server_instance_t *si) -{ - connsock_t *cs = &si->cs; - char *userpass = NULL; - gbtbase_t gbt; - - if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { - LOGWARNING("Failed to extract address from btcd %s", si->url); - return false; - } - userpass = strdup(si->auth); - realloc_strcat(&userpass, ":"); - realloc_strcat(&userpass, si->pass); - cs->auth = http_base64(userpass); - dealloc(userpass); - if (!cs->auth) { - LOGWARNING("Failed to create base64 auth from btcd %s", userpass); - return false; - } - if (cs->fd < 0) { - LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); - return false; - } - keep_sockalive(cs->fd); - /* Test we can authorise by getting a gbt, but we won't be using it. */ - memset(&gbt, 0, sizeof(gbtbase_t)); - if (!gen_gbtbase(cs, &gbt)) { - LOGINFO("Failed to get test block template from btcd %s:%s!", - cs->url, cs->port); - return false; - } - clear_gbtbase(&gbt); - return true; -} - static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { + gdata_t *gdata = ckp->data; proxy_instance_t *proxi; server_instance_t *si; int i, ret; + mutex_init(&gdata->lock); + /* Create all our proxy structures and pointers */ ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); for (i = 0; i < ckp->proxies; i++) { @@ -1676,27 +1732,11 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) mutex_init(&proxi->share_lock); } - if (ckp->btcds) { - /* If we also have btcds set up in proxy mode, try to talk to - * one of them as a way to submit blocks if we find them when - * submitting them upstream. */ - si = ckp->btcdbackup = ckzalloc(sizeof(server_instance_t)); - si->url = ckp->btcdurl[0]; - si->auth = ckp->btcdauth[0]; - si->pass = ckp->btcdpass[0]; - if (alive_btcd(si)) - LOGNOTICE("Backup btcd %s:%s alive", si->cs.url, si->cs.port); - else { - LOGNOTICE("Backup btcd %s:%s failed!", si->cs.url, si->cs.port); - ckp->btcdbackup = NULL; - free(si); - } - } - LOGWARNING("%s generator ready", ckp->name); ret = proxy_loop(pi); + mutex_lock(&gdata->lock); for (i = 0; i < ckp->proxies; i++) { si = ckp->servers[i]; Close(si->cs.fd); @@ -1714,6 +1754,8 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) dealloc(si->pass); dealloc(si); } + mutex_unlock(&gdata->lock); + dealloc(ckp->servers); return ret; } @@ -1722,6 +1764,40 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) * should check to see if the higher priority servers are alive and fallback */ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) { + static time_t last_t = 0; + bool alive = false; + time_t now_t; + int i; + + /* Rate limit to checking only once every 5 seconds */ + now_t = time(NULL); + if (now_t <= last_t + 5) + return; + + last_t = now_t; + + /* Is this the highest priority server already? */ + if (!cursi->id) + return; + + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + /* Have we reached the current server? */ + if (si == cursi) + return; + + alive = server_alive(ckp, si, true); + if (alive) + break; + } + if (alive) + send_proc(ckp->generator, "reconnect"); +} + +static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) +{ + gdata_t *gdata = ckp->data; static time_t last_t = 0; bool alive = false; time_t now_t; @@ -1735,27 +1811,29 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) last_t = now_t; /* Is this the highest priority server already? */ - if (cursi == ckp->servers[0]) + if (!cursi->id) return; - if (ckp->proxy) - srvs = ckp->proxies; - else - srvs = ckp->btcds; + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + for (i = 0; i < srvs; i++) { - server_instance_t *si = ckp->servers[i]; + proxy_instance_t *proxi; + server_instance_t *si; + connsock_t *cs; + + mutex_lock(&gdata->lock); + si = ckp->servers[i]; + proxi = si->data; + mutex_unlock(&gdata->lock); /* Have we reached the current server? */ if (si == cursi) return; - if (ckp->proxy) { - proxy_instance_t *proxi = si->data; - connsock_t *cs = proxi->cs; - - alive = proxy_alive(ckp, si, proxi, cs, true); - } else - alive = server_alive(ckp, si, true); + cs = proxi->cs; + alive = proxy_alive(ckp, si, proxi, cs, true); if (alive) break; } @@ -1763,6 +1841,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) send_proc(ckp->generator, "reconnect"); } + int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -1772,12 +1851,13 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; - gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); - - if (ckp->proxy) + if (ckp->proxy) { + gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); - else + } else { + gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); ret = server_mode(ckp, pi); + } dealloc(ckp->data); return process_exit(ckp, pi, ret); diff --git a/src/stratifier.c b/src/stratifier.c index 2fc5ae64..0f01e42e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -374,14 +374,6 @@ struct json_entry { json_t *val; }; -typedef struct char_entry char_entry_t; - -struct char_entry { - char_entry_t *next; - char_entry_t *prev; - char *buf; -}; - /* Priority levels for generator messages */ #define GEN_LAX 0 #define GEN_NORMAL 1 @@ -927,6 +919,15 @@ static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) __kill_instance(client); } +/* Removes a client instance we know is on the stratum_instances list and from + * the user client list if it's been placed on it */ +static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user) +{ + HASH_DEL(sdata->stratum_instances, client); + if (user) + DL_DELETE(user->clients, client); +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; @@ -936,13 +937,15 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + int64_t client_id = client->id; + if (!client->ref) { - HASH_DEL(sdata->stratum_instances, client); + __del_client(sdata, client, client->user_instance); __kill_instance(client); } else client->dropped = true; kills++; - sprintf(buf, "dropclient=%"PRId64, client->id); + sprintf(buf, "dropclient=%"PRId64, client_id); send_proc(ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { @@ -1202,9 +1205,7 @@ static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instan stratum_instance_t *old_client = NULL; int ret; - HASH_DEL(sdata->stratum_instances, client); - if (user) - DL_DELETE(user->clients, client); + __del_client(sdata, client, user); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { @@ -1468,6 +1469,7 @@ static void stratum_broadcast_message(sdata_t *sdata, const char *msg) static void reconnect_clients(sdata_t *sdata, const char *cmd) { char *port = strdupa(cmd), *url = NULL; + stratum_instance_t *client, *tmp; json_t *json_msg; strsep(&port, ":"); @@ -1483,6 +1485,14 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); stratum_broadcast(sdata, json_msg); + + /* Tag all existing clients as dropped now so they can be removed + * lazily */ + ck_wlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + client->dropped = true; + } + ck_wunlock(&sdata->instance_lock); } static void reset_bestshares(sdata_t *sdata)