From 1432b908294250051fd7aa4c819cdb6581149175 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 10:29:18 +1100 Subject: [PATCH 01/14] Remove code talking to local btcd in proxy mode since we won't be able to submit any realistic block solves locally --- src/ckpool.h | 1 - src/generator.c | 61 +------------------------------------------------ 2 files changed, 1 insertion(+), 61 deletions(-) diff --git a/src/ckpool.h b/src/ckpool.h index 2da96ed5..47150404 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -182,7 +182,6 @@ struct ckpool_instance { char **proxyurl; char **proxyauth; char **proxypass; - server_instance_t *btcdbackup; /* Private data for each process */ void *data; diff --git a/src/generator.c b/src/generator.c index b9d02370..b88685ed 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1556,14 +1556,7 @@ retry: } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { - if (ckp->btcdbackup) { - LOGWARNING("Submitting block data locally!"); - if (submit_block(&ckp->btcdbackup->cs, buf + 12)) - LOGWARNING("Block accepted locally!"); - else - LOGWARNING("Block rejected locally."); - } else - LOGNOTICE("No backup btcd to send block to ourselves"); + LOGNOTICE("Submitting likely block solve share to upstream pool"); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { @@ -1615,41 +1608,6 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) return ret; } -static bool alive_btcd(server_instance_t *si) -{ - connsock_t *cs = &si->cs; - char *userpass = NULL; - gbtbase_t gbt; - - if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { - LOGWARNING("Failed to extract address from btcd %s", si->url); - return false; - } - userpass = strdup(si->auth); - realloc_strcat(&userpass, ":"); - realloc_strcat(&userpass, si->pass); - cs->auth = http_base64(userpass); - dealloc(userpass); - if (!cs->auth) { - LOGWARNING("Failed to create base64 auth from btcd %s", userpass); - return false; - } - if (cs->fd < 0) { - LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); - return false; - } - keep_sockalive(cs->fd); - /* Test we can authorise by getting a gbt, but we won't be using it. */ - memset(&gbt, 0, sizeof(gbtbase_t)); - if (!gen_gbtbase(cs, &gbt)) { - LOGINFO("Failed to get test block template from btcd %s:%s!", - cs->url, cs->port); - return false; - } - clear_gbtbase(&gbt); - return true; -} - static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { proxy_instance_t *proxi; @@ -1676,23 +1634,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) mutex_init(&proxi->share_lock); } - if (ckp->btcds) { - /* If we also have btcds set up in proxy mode, try to talk to - * one of them as a way to submit blocks if we find them when - * submitting them upstream. */ - si = ckp->btcdbackup = ckzalloc(sizeof(server_instance_t)); - si->url = ckp->btcdurl[0]; - si->auth = ckp->btcdauth[0]; - si->pass = ckp->btcdpass[0]; - if (alive_btcd(si)) - LOGNOTICE("Backup btcd %s:%s alive", si->cs.url, si->cs.port); - else { - LOGNOTICE("Backup btcd %s:%s failed!", si->cs.url, si->cs.port); - ckp->btcdbackup = NULL; - free(si); - } - } - LOGWARNING("%s generator ready", ckp->name); ret = proxy_loop(pi); From 9c1e5f233282e755a8db107677f51e4e678e25c0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 10:46:07 +1100 Subject: [PATCH 02/14] Send reconnect instead of dropall to stratifier when killing an upstream proxy and drop them lazily --- src/generator.c | 2 +- src/stratifier.c | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/generator.c b/src/generator.c index b88685ed..6fa53194 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1457,8 +1457,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) notify_instance_t *ni, *tmp; connsock_t *cs; + send_proc(ckp->stratifier, "reconnect"); send_proc(ckp->connector, "reject"); - send_proc(ckp->stratifier, "dropall"); if (!proxi) // This shouldn't happen return; diff --git a/src/stratifier.c b/src/stratifier.c index da4f1d91..9231d6c6 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1465,6 +1465,7 @@ static void stratum_broadcast_message(sdata_t *sdata, const char *msg) static void reconnect_clients(sdata_t *sdata, const char *cmd) { char *port = strdupa(cmd), *url = NULL; + stratum_instance_t *client, *tmp; json_t *json_msg; strsep(&port, ":"); @@ -1480,6 +1481,14 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); stratum_broadcast(sdata, json_msg); + + /* Tag all existing clients as dropped now so they can be removed + * lazily */ + ck_wlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + client->dropped = true; + } + ck_wunlock(&sdata->instance_lock); } static void reset_bestshares(sdata_t *sdata) From 745d95246c12f403cb2c6160bd5194bfdcccc9f6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 10:49:41 +1100 Subject: [PATCH 03/14] Check for client ref counts in dropall --- src/stratifier.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9231d6c6..39188872 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -936,8 +936,11 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { - HASH_DEL(sdata->stratum_instances, client); - __kill_instance(client); + if (!client->ref) { + HASH_DEL(sdata->stratum_instances, client); + __kill_instance(client); + } else + client->dropped = true; kills++; sprintf(buf, "dropclient=%"PRId64, client->id); send_proc(ckp->connector, buf); From 9863dedf5971ea7114a437e8143891d4222429ed Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 11:20:39 +1100 Subject: [PATCH 04/14] Do not mask SIGQUIT to allow us to force coredumps on child processes --- src/ckpool.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 952d2561..cdbfbe39 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -822,7 +822,6 @@ static void launch_process(proc_instance_t *pi) sigaction(SIGUSR1, &handler, NULL); sigaction(SIGTERM, &handler, NULL); signal(SIGINT, SIG_IGN); - signal(SIGQUIT, SIG_IGN); rename_proc(pi->processname); write_namepid(pi); From 5dd35d65552ebed038448825fe75f8b3274c5e0d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 11:29:58 +1100 Subject: [PATCH 05/14] Fix workings when no serverurl is explicitly specified in config --- src/connector.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector.c b/src/connector.c index 1243bace..950dd26c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -895,6 +895,7 @@ int connector(proc_instance_t *pi) goto out; } cdata->serverfd[0] = sockd; + ckp->serverurls = 1; } else { for (i = 0; i < ckp->serverurls; i++) { char oldurl[INET6_ADDRSTRLEN], oldport[8]; From 9dd3d950642e62bd9eda7ebfde17a6141e72f7e8 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 11:58:52 +1100 Subject: [PATCH 06/14] Fix dereference errors with dropall --- src/stratifier.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 39188872..b596452e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -927,6 +927,15 @@ static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) __kill_instance(client); } +/* Removes a client instance we know is on the stratum_instances list and from + * the user client list if it's been placed on it */ +static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user) +{ + HASH_DEL(sdata->stratum_instances, client); + if (user) + DL_DELETE(user->clients, client); +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; @@ -936,13 +945,15 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + int64_t client_id = client->id; + if (!client->ref) { - HASH_DEL(sdata->stratum_instances, client); + __del_client(sdata, client, client->user_instance); __kill_instance(client); } else client->dropped = true; kills++; - sprintf(buf, "dropclient=%"PRId64, client->id); + sprintf(buf, "dropclient=%"PRId64, client_id); send_proc(ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { @@ -1202,9 +1213,7 @@ static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instan stratum_instance_t *old_client = NULL; int ret; - HASH_DEL(sdata->stratum_instances, client); - if (user) - DL_DELETE(user->clients, client); + __del_client(sdata, client, user); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { From 9f07cc11e022b907b376f735a45718166fd0e583 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 12:08:08 +1100 Subject: [PATCH 07/14] Export the char_entry_t type --- src/ckpool.h | 8 ++++++++ src/stratifier.c | 8 -------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/ckpool.h b/src/ckpool.h index 47150404..b73059dc 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -65,6 +65,14 @@ struct connsock { typedef struct connsock connsock_t; +typedef struct char_entry char_entry_t; + +struct char_entry { + char_entry_t *next; + char_entry_t *prev; + char *buf; +}; + struct server_instance { /* Hash table data */ UT_hash_handle hh; diff --git a/src/stratifier.c b/src/stratifier.c index b596452e..0f01e42e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -374,14 +374,6 @@ struct json_entry { json_t *val; }; -typedef struct char_entry char_entry_t; - -struct char_entry { - char_entry_t *next; - char_entry_t *prev; - char *buf; -}; - /* Priority levels for generator messages */ #define GEN_LAX 0 #define GEN_NORMAL 1 From 7e5a0620afad59d99f6b0a96af2db84a2fba98ad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 13:31:45 +1100 Subject: [PATCH 08/14] Cache responses in proxy mode in case they come out of order to be able to successfully managed different subscribe variations --- src/generator.c | 115 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 22 deletions(-) diff --git a/src/generator.c b/src/generator.c index 6fa53194..a18a5cc4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -117,6 +117,8 @@ struct proxy_instance { int64_t share_id; ckmsgq_t *passsends; // passthrough sends + + char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ }; typedef struct proxy_instance proxy_instance_t; @@ -490,40 +492,99 @@ static json_t *find_notify(json_t *val) return ret; } +/* Get stored line in the proxy linked list of messages if any exist or NULL */ +static char *cached_proxy_line(proxy_instance_t *proxi) +{ + char *buf = NULL; + + if (proxi->recvd_lines) { + char_entry_t *char_t = proxi->recvd_lines; + + DL_DELETE(proxi->recvd_lines, char_t); + buf = char_t->buf; + free(char_t); + } + return buf; +} + +/* Get next line in the proxy linked list of messages or a new line from the + * connsock if there are none. */ +static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi) +{ + char *buf = cached_proxy_line(proxi); + + if (!buf && read_socket_line(cs, 5) > 0) + buf = strdup(cs->buf); + return buf; +} + +/* For appending a line to the proxy recv list */ +static void append_proxy_line(proxy_instance_t *proxi, const char *buf) +{ + char_entry_t *char_t = ckalloc(sizeof(char_entry_t)); + char_t->buf = strdup(buf); + DL_APPEND(proxi->recvd_lines, char_t); +} + +/* Get a new line from the connsock and return a copy of it */ +static char *new_proxy_line(connsock_t *cs) +{ + char *buf = NULL; + + if (read_socket_line(cs, 5) < 1) + goto out; + buf = strdup(cs->buf); +out: + return buf; +} + static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *notify_val, *tmp; + bool parsed, ret = false; + int retries = 0, size; const char *string; - bool ret = false; - char *old; - int size; + char *buf, *old; - size = read_socket_line(cs, 5); - if (size < 1) { +retry: + parsed = true; + if (!(buf = new_proxy_line(cs))) { LOGWARNING("Failed to receive line in parse_subscribe"); goto out; } - LOGDEBUG("parse_subscribe received %s", cs->buf); + LOGDEBUG("parse_subscribe received %s", buf); /* Ignore err_val here stored in &tmp */ - val = json_msg_result(cs->buf, &res_val, &tmp); + val = json_msg_result(buf, &res_val, &tmp); if (!val || !res_val) { - LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf); - goto out; + LOGINFO("Failed to get a json result in parse_subscribe, got: %s", buf); + parsed = false; } if (!json_is_array(res_val)) { - LOGWARNING("Result in parse_subscribe not an array"); - goto out; + LOGINFO("Result in parse_subscribe not an array"); + parsed = false; } size = json_array_size(res_val); if (size < 3) { - LOGWARNING("Result in parse_subscribe array too small"); - goto out; + LOGINFO("Result in parse_subscribe array too small"); + parsed = false; } notify_val = find_notify(res_val); if (!notify_val) { - LOGWARNING("Failed to find notify in parse_subscribe"); + LOGINFO("Failed to find notify in parse_subscribe"); + parsed = false; + } + if (!parsed) { + if (++retries < 3) { + /* We don't want this response so put it on the proxy + * recvd list to be parsed later */ + append_proxy_line(proxi, buf); + buf = NULL; + goto retry; + } + LOGWARNING("Failed to parse subscribe response in parse_subscribe"); goto out; } + /* Free up old data in place if we are re-subscribing */ old = proxi->sessionid; proxi->sessionid = NULL; @@ -577,6 +638,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) out: if (val) json_decref(val); + free(buf); return ret; } @@ -962,6 +1024,7 @@ out: static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *req, *err_val; + char *buf = NULL; bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", @@ -979,25 +1042,24 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) /* Read and parse any extra methods sent. Anything left in the buffer * should be the response to our auth request. */ do { - int size; - - size = read_socket_line(cs, 5); - if (size < 1) { + free(buf); + buf = next_proxy_line(cs, proxi); + if (!buf) { LOGWARNING("Failed to receive line in auth_stratum"); ret = false; goto out; } - ret = parse_method(proxi, cs->buf); + ret = parse_method(proxi, buf); } while (ret); - val = json_msg_result(cs->buf, &res_val, &err_val); + val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Failed to get a json result in auth_stratum, got: %s", cs->buf); + LOGWARNING("Failed to get a json result in auth_stratum, got: %s", buf); goto out; } if (err_val && !json_is_null(err_val)) { - LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", cs->buf); + LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", buf); goto out; } if (res_val) { @@ -1014,6 +1076,15 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) out: if (val) json_decref(val); + if (ret) { + /* Now parse any cached responses so there are none in the + * queue and they can be managed one at a time from now on. */ + do { + dealloc(buf); + buf = cached_proxy_line(proxi); + parse_method(proxi, buf); + } while (buf); + } return ret; } From d66befef02797141291b619922830b96f56a038b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 13:56:21 +1100 Subject: [PATCH 09/14] Don't reset the chosen_server value in live_proxy --- src/generator.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/generator.c b/src/generator.c index a18a5cc4..1368985e 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1475,16 +1475,16 @@ out: /* Cycle through the available proxies and find the first alive one */ static proxy_instance_t *live_proxy(ckpool_t *ckp) { + int i, start_from = ckp->chosen_server; proxy_instance_t *alive = NULL; connsock_t *cs; - int i; LOGDEBUG("Attempting to connect to proxy"); retry: if (!ping_main(ckp)) goto out; - for (i = ckp->chosen_server; i < ckp->proxies; i++) { + for (i = start_from; i < ckp->proxies; i++) { proxy_instance_t *proxi; server_instance_t *si; @@ -1499,13 +1499,13 @@ retry: if (!alive) { send_proc(ckp->connector, "reject"); send_proc(ckp->stratifier, "dropall"); - if (!ckp->chosen_server) { + if (!start_from) { LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); sleep(5); } goto retry; } - ckp->chosen_server = 0; + start_from = 0; cs = alive->cs; LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); From 20746db2702781fece185679b0c72a0a90cba25b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 14:01:47 +1100 Subject: [PATCH 10/14] Check against chosen_server in the server watchdog --- src/generator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/generator.c b/src/generator.c index 1368985e..59ca9401 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1747,7 +1747,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) last_t = now_t; /* Is this the highest priority server already? */ - if (cursi == ckp->servers[0]) + if (cursi == ckp->servers[ckp->chosen_server]) return; if (ckp->proxy) From 7f92dfbc7e062f61fdde0b6ee6d06d5a6d3993e3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 14:14:34 +1100 Subject: [PATCH 11/14] Remove unnecessary warning --- src/generator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/generator.c b/src/generator.c index 59ca9401..d2708bdf 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1079,11 +1079,13 @@ out: if (ret) { /* Now parse any cached responses so there are none in the * queue and they can be managed one at a time from now on. */ - do { + while(42) { dealloc(buf); buf = cached_proxy_line(proxi); + if (!buf) + break; parse_method(proxi, buf); - } while (buf); + }; } return ret; } From fcc4309561321718858919bf03584a99b84ea070 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 14:35:11 +1100 Subject: [PATCH 12/14] Add info about proxies to warnings --- src/generator.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/generator.c b/src/generator.c index d2708bdf..00ba3831 100644 --- a/src/generator.c +++ b/src/generator.c @@ -549,7 +549,8 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) retry: parsed = true; if (!(buf = new_proxy_line(cs))) { - LOGWARNING("Failed to receive line in parse_subscribe"); + LOGWARNING("Proxy %d:%s failed to receive line in parse_subscribe", + proxi->id, proxi->si->url); goto out; } LOGDEBUG("parse_subscribe received %s", buf); @@ -581,7 +582,8 @@ retry: buf = NULL; goto retry; } - LOGWARNING("Failed to parse subscribe response in parse_subscribe"); + LOGWARNING("Proxy %d:%s failed to parse subscribe response in parse_subscribe", + proxi->id, proxi->si->url); goto out; } @@ -623,7 +625,7 @@ retry: LOGWARNING("Invalid nonce2len %d in parse_subscribe", size); goto out; } - if (size == 3) + if (size == 3 || (size == 4 && proxi->ckp->clientsvspeed)) LOGWARNING("Nonce2 length %d means proxied clients can't be >5TH each", size); else if (size < 3) { LOGWARNING("Nonce2 length %d too small to be able to proxy", size); @@ -670,7 +672,8 @@ retry: ret = send_json_msg(cs, req); json_decref(req); if (!ret) { - LOGWARNING("Failed to send message in subscribe_stratum"); + LOGWARNING("Proxy %d:%s failed to send message in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } ret = parse_subscribe(cs, proxi); @@ -678,20 +681,24 @@ retry: goto out; if (proxi->no_params) { - LOGWARNING("Failed all subscription options in subscribe_stratum"); + LOGWARNING("Proxy %d:%s failed all subscription options in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } if (proxi->sessionid) { - LOGNOTICE("Failed sessionid reconnect in subscribe_stratum, retrying without"); + LOGNOTICE("Proxy %d:%s failed sessionid reconnect in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_sessionid = true; dealloc(proxi->sessionid); } else { - LOGNOTICE("Failed connecting with parameters in subscribe_stratum, retrying without"); + LOGNOTICE("Proxy %d:%s failed connecting with parameters in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_params = true; } ret = connect_proxy(cs); if (!ret) { - LOGWARNING("Failed to reconnect in subscribe_stratum"); + LOGWARNING("Proxy %d:%s failed to reconnect in subscribe_stratum", + proxi->id, proxi->si->url); goto out; } goto retry; From 416b7a02eb33205d3a76b601ca68e39e919e50db Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 16:05:38 +1100 Subject: [PATCH 13/14] Handle proxy failover correctly for remainder of upstream pools when a reconnect is issued and don't lose original pool details, demoting it to last instead --- src/ckpool.h | 1 - src/generator.c | 156 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 104 insertions(+), 53 deletions(-) diff --git a/src/ckpool.h b/src/ckpool.h index b73059dc..b2f59333 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -183,7 +183,6 @@ struct ckpool_instance { char **serverurl; // Array of URLs to bind our server/proxy to int serverurls; // Number of server bindings int update_interval; // Seconds between stratum updates - int chosen_server; // Chosen server for next connection /* Proxy options */ int proxies; diff --git a/src/generator.c b/src/generator.c index 00ba3831..911c8f08 100644 --- a/src/generator.c +++ b/src/generator.c @@ -71,12 +71,18 @@ struct pass_msg { typedef struct pass_msg pass_msg_t; +typedef struct proxy_instance proxy_instance_t; + /* Per proxied pool instance data */ struct proxy_instance { + proxy_instance_t *next; + proxy_instance_t *prev; + ckpool_t *ckp; connsock_t *cs; server_instance_t *si; bool passthrough; + int id; /* Proxy server id */ const char *auth; const char *pass; @@ -92,14 +98,13 @@ struct proxy_instance { double diff; tv_t last_share; - int id; /* Message id for sending stratum messages */ + int msg_id; /* Message id for sending stratum messages */ bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ bool notified; /* Received new template for work */ bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ - bool replaced; /* This proxy has issued a reconnect with new data */ pthread_mutex_t notify_lock; notify_instance_t *notify_instances; @@ -121,10 +126,10 @@ struct proxy_instance { char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ }; -typedef struct proxy_instance proxy_instance_t; - /* Private data for the generator */ struct generator_data { + pthread_mutex_t lock; /* Lock protecting linked lists */ + proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -653,19 +658,19 @@ retry: /* Attempt to reconnect if the pool supports resuming */ if (proxi->sessionid) { JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION, proxi->sessionid); /* Then attempt to connect with just the client description */ } else if (!proxi->no_params) { JSON_CPACK(req, "{s:i,s:s,s:[s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION); /* Then try without any parameters */ } else { JSON_CPACK(req, "{s:i,s:s,s:[]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.subscribe", "params"); } @@ -875,7 +880,9 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; + proxy_instance_t *newproxi; ckpool_t *ckp = proxi->ckp; + gdata_t *gdata = ckp->data; const char *new_url; bool ret = false; int new_port; @@ -920,27 +927,25 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) ret = true; newsi = ckzalloc(sizeof(server_instance_t)); - newsi->id = ckp->proxies; - ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * (ckp->proxies + 1)); + + mutex_lock(&gdata->lock); + newsi->id = si->id; /* Inherit the old connection's id */ + si->id = ckp->proxies++; /* Give the old connection the lowest id */ + ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies); ckp->servers[newsi->id] = newsi; newsi->url = url; newsi->auth = strdup(si->auth); newsi->pass = strdup(si->pass); proxi->reconnect = true; - proxi->replaced = true; - - /* Reuse variable on a new proxy instance */ - proxi = ckzalloc(sizeof(proxy_instance_t)); - newsi->data = proxi; - proxi->auth = newsi->auth; - proxi->pass = newsi->pass; - proxi->si = newsi; - proxi->ckp = ckp; - proxi->cs = &newsi->cs; - - /* Set chosen server only once all new proxy data exists */ - ckp->proxies++; - ckp->chosen_server = newsi->id; + + newproxi = ckzalloc(sizeof(proxy_instance_t)); + newsi->data = newproxi; + newproxi->auth = newsi->auth; + newproxi->pass = newsi->pass; + newproxi->si = newsi; + newproxi->ckp = ckp; + newproxi->cs = &newsi->cs; + mutex_unlock(&gdata->lock); out: return ret; } @@ -1035,7 +1040,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", - "id", proxi->id++, + "id", proxi->msg_id++, "method", "mining.authorize", "params", proxi->auth, proxi->pass); ret = send_json_msg(cs, req); @@ -1430,9 +1435,6 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * { bool ret = false; - if (proxi->replaced) - return false; - /* Has this proxy already been reconnected? */ if (cs->fd > 0) return true; @@ -1484,21 +1486,29 @@ out: /* Cycle through the available proxies and find the first alive one */ static proxy_instance_t *live_proxy(ckpool_t *ckp) { - int i, start_from = ckp->chosen_server; proxy_instance_t *alive = NULL; + gdata_t *gdata = ckp->data; connsock_t *cs; + int i, srvs; LOGDEBUG("Attempting to connect to proxy"); retry: if (!ping_main(ckp)) goto out; - for (i = start_from; i < ckp->proxies; i++) { + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + + for (i = 0; i < srvs; i++) { proxy_instance_t *proxi; server_instance_t *si; + mutex_lock(&gdata->lock); si = ckp->servers[i]; proxi = si->data; + mutex_unlock(&gdata->lock); + cs = proxi->cs; if (proxy_alive(ckp, si, proxi, cs, false)) { alive = proxi; @@ -1508,13 +1518,11 @@ retry: if (!alive) { send_proc(ckp->connector, "reject"); send_proc(ckp->stratifier, "dropall"); - if (!start_from) { - LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); - sleep(5); - } + LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); + sleep(5); goto retry; } - start_from = 0; + cs = alive->cs; LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); @@ -1579,10 +1587,10 @@ reconnect: if (!proxi) goto out; if (reconnecting) { + reconnecting = false; connsock_t *cs = proxi->cs; LOGWARNING("Successfully reconnected to %s:%s as proxy", cs->url, cs->port); - reconnecting = false; } /* We've just subscribed and authorised so tell the stratifier to @@ -1690,10 +1698,13 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { + gdata_t *gdata = ckp->data; proxy_instance_t *proxi; server_instance_t *si; int i, ret; + mutex_init(&gdata->lock); + /* Create all our proxy structures and pointers */ ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); for (i = 0; i < ckp->proxies; i++) { @@ -1718,6 +1729,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) ret = proxy_loop(pi); + mutex_lock(&gdata->lock); for (i = 0; i < ckp->proxies; i++) { si = ckp->servers[i]; Close(si->cs.fd); @@ -1735,6 +1747,8 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) dealloc(si->pass); dealloc(si); } + mutex_unlock(&gdata->lock); + dealloc(ckp->servers); return ret; } @@ -1743,6 +1757,40 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) * should check to see if the higher priority servers are alive and fallback */ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) { + static time_t last_t = 0; + bool alive = false; + time_t now_t; + int i; + + /* Rate limit to checking only once every 5 seconds */ + now_t = time(NULL); + if (now_t <= last_t + 5) + return; + + last_t = now_t; + + /* Is this the highest priority server already? */ + if (!cursi->id) + return; + + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + /* Have we reached the current server? */ + if (si == cursi) + return; + + alive = server_alive(ckp, si, true); + if (alive) + break; + } + if (alive) + send_proc(ckp->generator, "reconnect"); +} + +static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) +{ + gdata_t *gdata = ckp->data; static time_t last_t = 0; bool alive = false; time_t now_t; @@ -1756,27 +1804,29 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) last_t = now_t; /* Is this the highest priority server already? */ - if (cursi == ckp->servers[ckp->chosen_server]) + if (!cursi->id) return; - if (ckp->proxy) - srvs = ckp->proxies; - else - srvs = ckp->btcds; + mutex_lock(&gdata->lock); + srvs = ckp->proxies; + mutex_unlock(&gdata->lock); + for (i = 0; i < srvs; i++) { - server_instance_t *si = ckp->servers[i]; + proxy_instance_t *proxi; + server_instance_t *si; + connsock_t *cs; + + mutex_lock(&gdata->lock); + si = ckp->servers[i]; + proxi = si->data; + mutex_unlock(&gdata->lock); /* Have we reached the current server? */ if (si == cursi) return; - if (ckp->proxy) { - proxy_instance_t *proxi = si->data; - connsock_t *cs = proxi->cs; - - alive = proxy_alive(ckp, si, proxi, cs, true); - } else - alive = server_alive(ckp, si, true); + cs = proxi->cs; + alive = proxy_alive(ckp, si, proxi, cs, true); if (alive) break; } @@ -1784,6 +1834,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) send_proc(ckp->generator, "reconnect"); } + int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -1793,12 +1844,13 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; - gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); - - if (ckp->proxy) + if (ckp->proxy) { + gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); - else + } else { + gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); ret = server_mode(ckp, pi); + } dealloc(ckp->data); return process_exit(ckp, pi, ret); From 4e95186403e11722fb548bb2db18671d8d251e69 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 16:16:51 +1100 Subject: [PATCH 14/14] Improve warnings and verbosity for various proxy connection failures --- src/generator.c | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/generator.c b/src/generator.c index 911c8f08..9c23e5c3 100644 --- a/src/generator.c +++ b/src/generator.c @@ -554,7 +554,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) retry: parsed = true; if (!(buf = new_proxy_line(cs))) { - LOGWARNING("Proxy %d:%s failed to receive line in parse_subscribe", + LOGNOTICE("Proxy %d:%s failed to receive line in parse_subscribe", proxi->id, proxi->si->url); goto out; } @@ -587,8 +587,8 @@ retry: buf = NULL; goto retry; } - LOGWARNING("Proxy %d:%s failed to parse subscribe response in parse_subscribe", - proxi->id, proxi->si->url); + LOGNOTICE("Proxy %d:%s failed to parse subscribe response in parse_subscribe", + proxi->id, proxi->si->url); goto out; } @@ -631,9 +631,11 @@ retry: goto out; } if (size == 3 || (size == 4 && proxi->ckp->clientsvspeed)) - LOGWARNING("Nonce2 length %d means proxied clients can't be >5TH each", size); + LOGWARNING("Proxy %d:%s Nonce2 length %d means proxied clients can't be >5TH each", + proxi->id, proxi->si->url, size); else if (size < 3) { - LOGWARNING("Nonce2 length %d too small to be able to proxy", size); + LOGWARNING("Proxy %d:%s Nonce2 length %d too small to be able to proxy", + proxi->id, proxi->si->url, size); goto out; } proxi->nonce2len = size; @@ -677,7 +679,7 @@ retry: ret = send_json_msg(cs, req); json_decref(req); if (!ret) { - LOGWARNING("Proxy %d:%s failed to send message in subscribe_stratum", + LOGNOTICE("Proxy %d:%s failed to send message in subscribe_stratum", proxi->id, proxi->si->url); goto out; } @@ -686,23 +688,23 @@ retry: goto out; if (proxi->no_params) { - LOGWARNING("Proxy %d:%s failed all subscription options in subscribe_stratum", + LOGNOTICE("Proxy %d:%s failed all subscription options in subscribe_stratum", proxi->id, proxi->si->url); goto out; } if (proxi->sessionid) { - LOGNOTICE("Proxy %d:%s failed sessionid reconnect in subscribe_stratum, retrying without", - proxi->id, proxi->si->url); + LOGINFO("Proxy %d:%s failed sessionid reconnect in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_sessionid = true; dealloc(proxi->sessionid); } else { - LOGNOTICE("Proxy %d:%s failed connecting with parameters in subscribe_stratum, retrying without", - proxi->id, proxi->si->url); + LOGINFO("Proxy %d:%s failed connecting with parameters in subscribe_stratum, retrying without", + proxi->id, proxi->si->url); proxi->no_params = true; } ret = connect_proxy(cs); if (!ret) { - LOGWARNING("Proxy %d:%s failed to reconnect in subscribe_stratum", + LOGNOTICE("Proxy %d:%s failed to reconnect in subscribe_stratum", proxi->id, proxi->si->url); goto out; } @@ -1046,7 +1048,8 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) ret = send_json_msg(cs, req); json_decref(req); if (!ret) { - LOGWARNING("Failed to send message in auth_stratum"); + LOGNOTICE("Proxy %d:%s failed to send message in auth_stratum", + proxi->id, proxi->si->url); Close(cs->fd); goto out; } @@ -1057,7 +1060,8 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) free(buf); buf = next_proxy_line(cs, proxi); if (!buf) { - LOGWARNING("Failed to receive line in auth_stratum"); + LOGNOTICE("Proxy %d:%s failed to receive line in auth_stratum", + proxi->id, proxi->si->url); ret = false; goto out; } @@ -1066,25 +1070,28 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Failed to get a json result in auth_stratum, got: %s", buf); + LOGWARNING("Proxy %d:%s failed to get a json result in auth_stratum, got: %s", + proxi->id, proxi->si->url, buf); goto out; } if (err_val && !json_is_null(err_val)) { - LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", buf); + LOGWARNING("Proxy %d:%s failed to authorise in auth_stratum due to err_val, got: %s", + proxi->id, proxi->si->url, buf); goto out; } if (res_val) { ret = json_is_true(res_val); if (!ret) { - LOGWARNING("Failed to authorise in auth_stratum"); + LOGWARNING("Proxy %d:%s failed to authorise in auth_stratum", + proxi->id, proxi->si->url); goto out; } } else { /* No result and no error but successful val means auth success */ ret = true; } - LOGINFO("Auth success in auth_stratum"); + LOGINFO("Proxy %d:%s auth success in auth_stratum", proxi->id, proxi->si->url); out: if (val) json_decref(val); @@ -1461,8 +1468,8 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { if (!pinging) { - LOGINFO("Failed initial subscribe to %s:%s !", - cs->url, cs->port); + LOGWARNING("Failed initial subscribe to %s:%s !", + cs->url, cs->port); } goto out; }