From 42d2e0f5c2528a636f3dfd2888d03e5b80d9a065 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 02:50:25 +1100 Subject: [PATCH 01/25] Increment client reference before adding it to the epoll list --- src/connector.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 627977b8..4c25f064 100644 --- a/src/connector.c +++ b/src/connector.c @@ -305,13 +305,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { - LOGERR("Failed to epoll_ctl add in accept_client"); - return 0; - } - /* We increase the ref count on this client as epoll creates a pointer * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ @@ -321,6 +314,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &client->sendbufsize, &optlen); LOGDEBUG("Client sendbufsize detected as %d", client->sendbufsize); + event.data.u64 = client->id; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + LOGERR("Failed to epoll_ctl add in accept_client"); + dec_instance_ref(cdata, client); + return 0; + } + return 1; } From 6902bc23dc7da1a76b716aa93965daedd3a1b948 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 03:24:23 +1100 Subject: [PATCH 02/25] API msgq is unused --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index a5861899..c9aa494f 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1888,7 +1888,7 @@ int main(int argc, char **argv) ckp.maxclients = ret * 9 / 10; } - ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); + // ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); create_pthread(&ckp.pth_listener, listener, &ckp.main); handler.sa_handler = &sighandler; From 31434bc0b901a80e401a276279dd1d71c74d3cae Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 08:55:01 +1100 Subject: [PATCH 03/25] Create unix receivers during child setup --- src/ckpool.c | 3 ++- src/ckpool.h | 1 - src/connector.c | 2 -- src/generator.c | 1 - src/stratifier.c | 2 -- 5 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index c9aa494f..daa9b034 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -286,7 +286,7 @@ unix_msg_t *get_unix_msg(proc_instance_t *pi) return umsg; } -void create_unix_receiver(proc_instance_t *pi) +static void create_unix_receiver(proc_instance_t *pi) { pthread_t pth; @@ -1499,6 +1499,7 @@ static void prepare_child(ckpool_t *ckp, proc_instance_t *pi, void *process, cha pi->sockname = pi->processname; create_process_unixsock(pi); create_pthread(&pi->pth_process, process, pi); + create_unix_receiver(pi); } #ifdef USE_CKDB diff --git a/src/ckpool.h b/src/ckpool.h index 08de2914..6877bad5 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -318,7 +318,6 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); unix_msg_t *get_unix_msg(proc_instance_t *pi); -void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp; diff --git a/src/connector.c b/src/connector.c index 4c25f064..467200f4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1577,8 +1577,6 @@ void *connector(void *arg) create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL); - create_unix_receiver(pi); - ret = connector_loop(pi, cdata); out: dealloc(ckp->cdata); diff --git a/src/generator.c b/src/generator.c index 418077d9..154b87cc 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2915,7 +2915,6 @@ void *generator(void *arg) gdata = ckzalloc(sizeof(gdata_t)); ckp->gdata = gdata; gdata->ckp = ckp; - create_unix_receiver(pi); if (ckp->proxy) { char *buf = NULL; diff --git a/src/stratifier.c b/src/stratifier.c index e9afeb99..a319997d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -7609,8 +7609,6 @@ void *stratifier(void *arg) mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); - create_unix_receiver(pi); - LOGWARNING("%s stratifier ready", ckp->name); stratum_loop(ckp, pi); From bde77b155c3e6ca6b5c5fa955e5a03ebd94754de Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:04:52 +1100 Subject: [PATCH 04/25] Connector loop should never return --- src/connector.c | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/connector.c b/src/connector.c index 467200f4..0260092c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1309,7 +1309,7 @@ static char *connector_stats(cdata_t *cdata, const int runtime) return buf; } -static int connector_loop(proc_instance_t *pi, cdata_t *cdata) +static void connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; @@ -1409,8 +1409,6 @@ retry: send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); - } else if (cmdmatch(buf, "shutdown")) { - goto out; } else if (cmdmatch(buf, "passthrough")) { client_instance_t *client; @@ -1450,16 +1448,14 @@ retry: } else LOGWARNING("Unhandled connector message: %s", buf); goto retry; -out: - return ret; } void *connector(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; cdata_t *cdata = ckzalloc(sizeof(cdata_t)); - int threads, sockd, ret = 0, i, tries = 0; char newurl[INET6_ADDRSTRLEN], newport[8]; + int threads, sockd, i, tries = 0, ret; ckpool_t *ckp = pi->ckp; const int on = 1; @@ -1478,7 +1474,6 @@ void *connector(void *arg) sockd = socket(AF_INET, SOCK_STREAM, 0); if (sockd < 0) { LOGERR("Connector failed to open socket"); - ret = 1; goto out; } setsockopt(sockd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); @@ -1488,6 +1483,7 @@ void *connector(void *arg) serv_addr.sin_port = htons(ckp->proxy ? 3334 : 3333); do { ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); + if (!ret) break; LOGWARNING("Connector failed to bind to socket, retrying in 5s"); @@ -1518,7 +1514,6 @@ void *connector(void *arg) if (!url_from_serverurl(serverurl, newurl, newport)) { LOGWARNING("Failed to extract resolved url from %s", serverurl); - ret = 1; goto out; } sockd = ckp->oldconnfd[i]; @@ -1542,7 +1537,6 @@ void *connector(void *arg) if (sockd < 0) { LOGERR("Connector failed to bind to socket for 2 minutes"); - ret = 1; goto out; } if (listen(sockd, 8192) < 0) { @@ -1559,10 +1553,9 @@ void *connector(void *arg) cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); - if (ckp->remote && !setup_upstream(ckp, cdata)) { - ret = 1; + if (ckp->remote && !setup_upstream(ckp, cdata)) goto out; - } + cklock_init(&cdata->lock); cdata->pi = pi; cdata->nfds = 0; @@ -1577,8 +1570,10 @@ void *connector(void *arg) create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL); - ret = connector_loop(pi, cdata); + connector_loop(pi, cdata); out: - dealloc(ckp->cdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Connector failure, shutting down"); + exit(1); return NULL; } From 95e1ed04bb2bd3f7c8c57d282c1ce8e515c88147 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:06:20 +1100 Subject: [PATCH 05/25] Stratifier loop should never return --- src/stratifier.c | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index a319997d..95567f70 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -7613,16 +7613,8 @@ void *stratifier(void *arg) stratum_loop(ckp, pi); out: - if (ckp->proxy) { - proxy_t *proxy, *tmpproxy; - - mutex_lock(&sdata->proxy_lock); - HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) { - HASH_DEL(sdata->proxies, proxy); - dealloc(proxy); - } - mutex_unlock(&sdata->proxy_lock); - } - dealloc(ckp->sdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Stratifier failure, shutting down"); + exit(1); return NULL; } From fd680659ee5ec23c987c3993933f6533de767f79 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:10:40 +1100 Subject: [PATCH 06/25] Generator loop should never return --- src/generator.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/generator.c b/src/generator.c index 154b87cc..ca2c6adf 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2928,6 +2928,8 @@ void *generator(void *arg) proxy_mode(ckp, pi); } else server_mode(ckp, pi); - dealloc(ckp->gdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Generator failure, shutting down"); + exit(1); return NULL; } From 3dac4d4fc8026feaf2f473085626860b16a8f7d7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:16:21 +1100 Subject: [PATCH 07/25] Determine when messages are being sent to ckmsgqs not set up yet --- src/ckpool.c | 3 ++- src/ckpool.h | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index daa9b034..0ee16a78 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -180,11 +180,12 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread(s) to wake up and process it. */ -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line) { ckmsg_t *msg; if (unlikely(!ckmsgq)) { + LOGWARNING("Sending messages to no queue from %s %s:%d", file, func, line); /* Discard data if we're unlucky enough to be sending it to * msg queues not set up during start up */ free(data); diff --git a/src/ckpool.h b/src/ckpool.h index 6877bad5..66297959 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -315,7 +315,8 @@ static const char __maybe_unused *stratum_msgs[] = { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); +void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line); +#define ckmsgq_add(ckmsgq, data) _ckmsgq_add(ckmsgq, data, __FILE__, __func__, __LINE__) bool ckmsgq_empty(ckmsgq_t *ckmsgq); unix_msg_t *get_unix_msg(proc_instance_t *pi); From aca7bb39bf3d9c8f224dcbebf3195bb32a8f5bc2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:22:33 +1100 Subject: [PATCH 08/25] Wait for stratifier before processing messages in connector receiver --- src/connector.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 0260092c..96274f2f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -625,8 +625,10 @@ static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; struct epoll_event *event = ckzalloc(sizeof(struct epoll_event)); + ckpool_t *ckp = cdata->ckp; uint64_t serverfds, i; int ret, epfd; + char *buf; rename_proc("creceiver"); @@ -635,7 +637,7 @@ static void *receiver(void *arg) LOGEMERG("FATAL: Failed to create epoll in receiver"); goto out; } - serverfds = cdata->ckp->serverurls; + serverfds = ckp->serverurls; /* Add all the serverfds to the epoll */ for (i = 0; i < serverfds; i++) { /* The small values will be less than the first client ids */ @@ -648,8 +650,11 @@ static void *receiver(void *arg) } } - while (!cdata->accept) - cksleep_ms(1); + /* Wait for the stratifier to be ready for us */ + do { + buf = send_recv_proc(ckp->stratifier, "ping"); + } while (!buf); + free(buf); while (42) { uint64_t edu64; From 344935a932acdc58ae395126964c6e31897b89e1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:24:43 +1100 Subject: [PATCH 09/25] Reinstate "Microoptimise ckdbq_process" This reverts commit f35f54a6428a99bacad6daedbd7cd8367c985242. --- src/stratifier.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 95567f70..f55825a1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -6779,7 +6779,7 @@ static bool test_and_clear(bool *val, mutex_t *lock) static void ckdbq_process(ckpool_t *ckp, char *msg) { sdata_t *sdata = ckp->sdata; - size_t responselen = 0; + size_t responselen; char *buf = NULL; while (!buf) { @@ -6799,16 +6799,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) /* Process any requests from ckdb that are heartbeat responses with * specific requests. */ - if (likely(buf)) - responselen = strlen(buf); - if (likely(responselen > 0)) { + responselen = strlen(buf); + if (likely(responselen > 1)) { char *response = alloca(responselen); int offset = 0; memset(response, 0, responselen); - if (sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0) { - strcpy(response+1, buf+offset); - if (safecmp(response, "ok")) { + if (likely(sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0)) { + strcpy(response + 1, buf + offset); + if (likely(safecmp(response, "ok"))) { char *cmd; cmd = response; @@ -6822,8 +6821,8 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) LOGWARNING("Got ckdb failure response: %s", buf); } else LOGWARNING("Got bad ckdb response: %s", buf); - free(buf); } + free(buf); } static int transactions_by_jobid(sdata_t *sdata, const int64_t id) From f368692c493d67e759988f99305b8db8dac01a33 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:25:43 +1100 Subject: [PATCH 10/25] Reinstate "Further thread the next two biggest CPU users" This reverts commit 96aa070652fa8a2438445c5aebc40df33f3b7fe5. --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index f55825a1..3ad4914a 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -7586,11 +7586,11 @@ void *stratifier(void *arg) * are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); From e148e76d6192ef4edec59f447f3e8143e4c98297 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:31:55 +1100 Subject: [PATCH 11/25] Name and detach server watchdog thread --- src/generator.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/generator.c b/src/generator.c index ca2c6adf..e0f14369 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2787,6 +2787,10 @@ static void *server_watchdog(void *arg) ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->gdata; + rename_proc("swatchdog"); + + pthread_detach(pthread_self()); + while (42) { server_instance_t *best = NULL; ts_t timer_t; From c4680466511f6fe9967d9647e629c5ea7f16d0fc Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:43:16 +1100 Subject: [PATCH 12/25] Read oldpid in handover mode before getting handover --- src/ckpool.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 0ee16a78..85ca432c 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1824,6 +1824,7 @@ int main(int argc, char **argv) ckp.main.sockname = strdup("listener"); name_process_sockname(&ckp.main.us, &ckp.main); ckp.oldconnfd = ckzalloc(sizeof(int *) * ckp.serverurls); + manage_old_instance(&ckp, &ckp.main); if (ckp.handover) { const char *path = ckp.main.us.path; @@ -1872,8 +1873,6 @@ int main(int argc, char **argv) } } - if (!ckp.handover) - manage_old_instance(&ckp, &ckp.main); write_namepid(&ckp.main); open_process_sock(&ckp, &ckp.main, &ckp.main.us); launch_logger(&ckp); From 0af56e8b2aa02dce9b9014fa0d763909a2e1410a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:44:37 +1100 Subject: [PATCH 13/25] Fix typo --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 85ca432c..68d31c2d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1561,7 +1561,7 @@ static bool send_recv_path(const char *path, const char *msg) LOGWARNING("Received: %s in response to %s request", response, msg); dealloc(response); } else - LOGWARNING("Received not response to %s request", msg); + LOGWARNING("Received no response to %s request", msg); Close(sockd); return ret; } From 52cf7c8c29524a61b18bec21a535b575ed20d77e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 27 Feb 2016 09:50:05 +1100 Subject: [PATCH 14/25] More info --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 68d31c2d..f062ad93 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -910,7 +910,7 @@ out: static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid_t oldpid) { if (!ckp->killold) { - quit(1, "Process %s pid %d still exists, start ckpool with -k if you wish to kill it", + quit(1, "Process %s pid %d still exists, start ckpool with -H to get a handover or -k if you wish to kill it", pi->processname, oldpid); } LOGNOTICE("Terminating old process %s pid %d", pi->processname, oldpid); From 2ed69d253494bfe2823337a19c53be692255a0b4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 28 Feb 2016 11:26:46 +1100 Subject: [PATCH 15/25] Allow the socket close to be responsible for removing client fds from the epoll list --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 96274f2f..1df23262 100644 --- a/src/connector.c +++ b/src/connector.c @@ -333,12 +333,12 @@ static int __drop_client(cdata_t *cdata, client_instance_t *client) goto out; client->invalid = true; ret = client->fd; + /* Closing the fd will automatically remove it from the epoll list */ Close(client->fd); - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, ret, NULL); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the - * epoll list. */ + * epoll list. */ __dec_instance_ref(client); cdata->dead_generated++; out: From 7357525712afadcfa42567b2dc87c7b16a7a45c5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 28 Feb 2016 11:34:20 +1100 Subject: [PATCH 16/25] Rearm the epoll client fd only when it's still valid after processing its message --- src/connector.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/connector.c b/src/connector.c index 1df23262..a1eb413e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -570,25 +570,19 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) client = ref_client_by_id(cdata, id); if (unlikely(!client)) { - LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); + LOGWARNING("Failed to find client by id %"PRId64" in receiver!", id); goto outnoclient; } - if (unlikely(client->invalid)) - goto out; /* We can have both messages and read hang ups so process the - * message first. */ + * message first. */ if (likely(events & EPOLLIN)) { /* Rearm the client for epoll events if we have successfully * parsed a message from it */ - if (parse_client_msg(ckp, cdata, client)) { - event->data.u64 = id; - event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); - } else + if (unlikely(!parse_client_msg(ckp, cdata, client))) { invalidate_client(ckp, cdata, client); + goto out; + } } - if (unlikely(client->invalid)) - goto out; if (unlikely(events & EPOLLERR)) { socklen_t errlen = sizeof(int); int error = 0; @@ -614,6 +608,12 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) invalidate_client(cdata->pi->ckp, cdata, client); } out: + if (likely(!client->invalid)) { + /* Rearm the fd in the epoll list if it's still active */ + event->data.u64 = id; + event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); + } dec_instance_ref(cdata, client); outnoclient: free(event); From 689ab86fd04a2f79b35a92f39d336df8a4118e99 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 1 Mar 2016 09:00:12 +1100 Subject: [PATCH 17/25] Now that we've confirmed failing to find a client id in the receiver is a rare event again, the loglevel can be dropped in priority --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index a1eb413e..ea5144f9 100644 --- a/src/connector.c +++ b/src/connector.c @@ -570,7 +570,7 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) client = ref_client_by_id(cdata, id); if (unlikely(!client)) { - LOGWARNING("Failed to find client by id %"PRId64" in receiver!", id); + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); goto outnoclient; } /* We can have both messages and read hang ups so process the From 06a29983f5163200dbca7d847b3dfabb8d61b743 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 1 Mar 2016 09:03:35 +1100 Subject: [PATCH 18/25] Reset worker mindiff to startdiff when it is set to zero --- src/stratifier.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 3ad4914a..66bc831f 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5765,6 +5765,8 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); return; } + if (!mindiff) + mindiff = ckp->startdiff; if (mindiff < ckp->mindiff) mindiff = ckp->mindiff; if (mindiff == worker->mindiff) From 81638e5a0628b2a7b651c9a2048498131af3c35c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 08:05:23 +1100 Subject: [PATCH 19/25] Correctly reset worker diff when it's set to zero via ckdb --- src/stratifier.c | 90 ++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 66bc831f..9cfe10fd 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4772,7 +4772,51 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, return user; } -static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff); +static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff) +{ + stratum_instance_t *client; + sdata_t *sdata = ckp->sdata; + worker_instance_t *worker; + user_instance_t *user; + + /* Find the user first */ + user = user_by_workername(sdata, workername); + + /* Then find the matching worker user */ + worker = get_worker(sdata, user, workername); + + if (mindiff < 1) { + if (likely(!mindiff)) { + worker->mindiff = 0; + return; + } + LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); + return; + } + if (mindiff < ckp->mindiff) + mindiff = ckp->mindiff; + if (mindiff == worker->mindiff) + return; + worker->mindiff = mindiff; + + /* Iterate over all the workers from this user to find any with the + * matching worker that are currently live and send them a new diff + * if we can. Otherwise it will only act as a clamp on next share + * submission. */ + ck_rlock(&sdata->instance_lock); + DL_FOREACH(user->clients, client) { + if (client->worker_instance != worker) + continue; + /* Per connection suggest diff overrides worker mindiff ugh */ + if (mindiff < client->suggest_diff) + continue; + if (mindiff == client->diff) + continue; + client->diff = mindiff; + stratum_send_diff(sdata, client); + } + ck_runlock(&sdata->instance_lock); +} static void parse_worker_diffs(ckpool_t *ckp, json_t *worker_array) { @@ -5748,50 +5792,6 @@ static json_params_t return jp; } -static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff) -{ - stratum_instance_t *client; - sdata_t *sdata = ckp->sdata; - worker_instance_t *worker; - user_instance_t *user; - - /* Find the user first */ - user = user_by_workername(sdata, workername); - - /* Then find the matching worker user */ - worker = get_worker(sdata, user, workername); - - if (mindiff < 0) { - LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); - return; - } - if (!mindiff) - mindiff = ckp->startdiff; - if (mindiff < ckp->mindiff) - mindiff = ckp->mindiff; - if (mindiff == worker->mindiff) - return; - worker->mindiff = mindiff; - - /* Iterate over all the workers from this user to find any with the - * matching worker that are currently live and send them a new diff - * if we can. Otherwise it will only act as a clamp on next share - * submission. */ - ck_rlock(&sdata->instance_lock); - DL_FOREACH(user->clients, client) { - if (client->worker_instance != worker) - continue; - /* Per connection suggest diff overrides worker mindiff ugh */ - if (mindiff < client->suggest_diff) - continue; - if (mindiff == client->diff) - continue; - client->diff = mindiff; - stratum_send_diff(sdata, client); - } - ck_runlock(&sdata->instance_lock); -} - /* Implement support for the diff in the params as well as the originally * documented form of placing diff within the method. Needs to be entered with * client holding a ref count. */ From 00a2f5970794843244b6bdb0d590b4a8753f7630 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 08:50:59 +1100 Subject: [PATCH 20/25] Clamp suggest diff to pool mindiff rather than handling pool mindiff separately --- src/stratifier.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9cfe10fd..e8235774 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5795,10 +5795,10 @@ static json_params_t /* Implement support for the diff in the params as well as the originally * documented form of placing diff within the method. Needs to be entered with * client holding a ref count. */ -static void suggest_diff(stratum_instance_t *client, const char *method, const json_t *params_val) +static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char *method, + const json_t *params_val) { json_t *arr_val = json_array_get(params_val, 0); - sdata_t *sdata = client->ckp->sdata; int64_t sdiff; if (unlikely(!client_active(client))) { @@ -5811,16 +5811,16 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id); return; } + /* Clamp suggest diff to global pool mindiff */ + if (sdiff < ckp->mindiff) + sdiff = ckp->mindiff; if (sdiff == client->suggest_diff) return; client->suggest_diff = sdiff; if (client->diff == sdiff) return; - if (sdiff < client->ckp->mindiff) - client->diff = client->ckp->mindiff; - else - client->diff = sdiff; - stratum_send_diff(sdata, client); + client->diff = sdiff; + stratum_send_diff(ckp->sdata, client); } /* Send diff first when sending the first stratum template after subscribing */ @@ -6034,7 +6034,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie } if (cmdmatch(method, "mining.suggest")) { - suggest_diff(client, method, params_val); + suggest_diff(ckp, client, method, params_val); return; } From f57aa8bafea14f56775173a7d64e3051e3d53f08 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 08:53:26 +1100 Subject: [PATCH 21/25] Only set suggest_diff with the suggest-diff stratum command --- src/stratifier.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e8235774..886d4d57 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4889,7 +4889,6 @@ static int send_recv_auth(stratum_instance_t *client) responselen = strlen(buf); if (likely(responselen > 0)) { char *cmd = NULL, *secondaryuserid = NULL, *response; - worker_instance_t *worker = client->worker_instance; json_error_t err_val; json_t *val = NULL; int offset = 0; @@ -4921,7 +4920,6 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); - client->suggest_diff = worker->mindiff; user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || @@ -4962,9 +4960,6 @@ static void queue_delayed_auth(stratum_instance_t *client) json_t *val; ts_t now; - /* Read off any cached mindiff from previous auths */ - client->suggest_diff = client->worker_instance->mindiff; - ts_realtime(&now); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); @@ -5070,9 +5065,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ /* Preauth workers for the first 10 minutes after the user is * first authorised by ckdb to avoid floods of worker auths. * *errnum is implied zero already so ret will be set true */ - if (user->auth_time && time(NULL) - user->auth_time < 600) - client->suggest_diff = client->worker_instance->mindiff; - else + if (!user->auth_time || time(NULL) - user->auth_time > 600) *errnum = send_recv_auth(client); if (!*errnum) ret = true; From 505dc002d95ce8c273ca29ec513bcdde017fdb3a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 09:02:31 +1100 Subject: [PATCH 22/25] Handle suggest_diff separately from worker mindiff when authorising --- src/stratifier.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 886d4d57..12576e93 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -6670,10 +6670,10 @@ static stratum_instance_t *preauth_ref_instance_by_id(sdata_t *sdata, const int6 static void sauth_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; sdata_t *sdata = ckp->sdata; - int mindiff, errnum = 0; - int64_t client_id; + stratum_instance_t *client; + int64_t mindiff, client_id; + int errnum = 0; client_id = jp->client_id; @@ -6703,12 +6703,18 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT); - if (!json_is_true(result_val) || !client->suggest_diff) + if (!json_is_true(result_val)) goto out; /* Update the client now if they have set a valid mindiff different - * from the startdiff */ - mindiff = MAX(ckp->mindiff, client->suggest_diff); + * from the startdiff. suggest_diff overrides worker mindiff */ + if (client->suggest_diff) + mindiff = client->suggest_diff; + else + mindiff = client->worker_instance->mindiff; + if (!mindiff) + goto out; + mindiff = MAX(ckp->mindiff, mindiff); if (mindiff != client->diff) { client->diff = mindiff; stratum_send_diff(sdata, client); From bfe83a46b39c8c64cb0fa8bf9c403cc5cfaab57d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 09:15:35 +1100 Subject: [PATCH 23/25] Tidy up and make clear clamping process in add_submit --- src/stratifier.c | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 12576e93..6744cdee 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5158,7 +5158,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d worker_instance_t *worker = client->worker_instance; double tdiff, bdiff, dsps, drr, network_diff, bias; user_instance_t *user = client->user_instance; - int64_t next_blockid, optimal; + int64_t next_blockid, optimal, mindiff; tv_t now_t; mutex_lock(&ckp_sdata->stats_lock); @@ -5233,8 +5233,13 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d if (drr > 0.15 && drr < 0.4) return; + /* Client suggest diff overrides worker mindiff */ + if (client->suggest_diff) + mindiff = client->suggest_diff; + else + mindiff = worker->mindiff; /* Allow slightly lower diffs when users choose their own mindiff */ - if (worker->mindiff || client->suggest_diff) { + if (mindiff) { if (drr < 0.5) return; optimal = lround(dsps * 2.4); @@ -5242,18 +5247,20 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d optimal = lround(dsps * 3.33); /* Clamp to mindiff ~ network_diff */ - if (optimal < ckp->mindiff) - optimal = ckp->mindiff; - /* Client suggest diff overrides worker mindiff */ - if (client->suggest_diff) { - if (optimal < client->suggest_diff) - optimal = client->suggest_diff; - } else if (optimal < worker->mindiff) - optimal = worker->mindiff; - if (ckp->maxdiff && optimal > ckp->maxdiff) - optimal = ckp->maxdiff; - if (optimal > network_diff) - optimal = network_diff; + + /* Set to higher of pool mindiff and optimal */ + optimal = MAX(optimal, ckp->mindiff); + + /* Set to higher of optimal and user chosen diff */ + optimal = MAX(optimal, mindiff); + + /* Set to lower of optimal and pool maxdiff */ + if (ckp->maxdiff) + optimal = MIN(optimal, ckp->maxdiff); + + /* Set to lower of optimal and network_diff */ + optimal = MIN(optimal, network_diff); + if (client->diff == optimal) return; From f636f9eef048f4be2bcee8be536fbb6088c4cb52 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 09:52:45 +1100 Subject: [PATCH 24/25] Set old diff on changing mindiff or sugget diff --- src/stratifier.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 6744cdee..ab8ecd0f 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4812,6 +4812,8 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif continue; if (mindiff == client->diff) continue; + client->diff_change_job_id = sdata->workbase_id + 1; + client->old_diff = client->diff; client->diff = mindiff; stratum_send_diff(sdata, client); } @@ -5819,6 +5821,8 @@ static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char * client->suggest_diff = sdiff; if (client->diff == sdiff) return; + client->diff_change_job_id = client->sdata->workbase_id + 1; + client->old_diff = client->diff; client->diff = sdiff; stratum_send_diff(ckp->sdata, client); } From 7a06c4f009175844cf5eeb59edfc47e847efa481 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 2 Mar 2016 09:55:05 +1100 Subject: [PATCH 25/25] Accept only old diffs until the diff change job id --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index ab8ecd0f..03358643 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5585,8 +5585,8 @@ out_submit: out_unlock: ck_runlock(&sdata->workbase_lock); - /* Accept the lower of new and old diffs until the next update */ - if (id < client->diff_change_job_id && client->old_diff < client->diff) + /* Accept shares of the old diff until the next update */ + if (id < client->diff_change_job_id) diff = client->old_diff; if (!invalid) { char wdiffsuffix[16];