From a0753a3965e03de3f662e12e10bb9ad05947fa35 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 10:12:04 +1100 Subject: [PATCH 01/20] Handle other forms of read_socket_line ending after message complete as not a failure --- src/ckpool.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index e90d0bb5..55f9185d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -419,9 +419,9 @@ int read_socket_line(connsock_t *cs, const int timeout) char *newbuf; ret = wait_read_select(fd, eom ? 0 : timeout); - if (eom && !ret) - break; if (ret < 1) { + if (eom) + break; if (!ret) LOGDEBUG("Select timed out in read_socket_line"); else @@ -431,7 +431,7 @@ int read_socket_line(connsock_t *cs, const int timeout) ret = recv(fd, readbuf, PAGESIZE - 4, 0); if (ret < 1) { /* Closed socket after valid message */ - if (!ret && eom) + if (eom) break; LOGERR("Failed to recv in read_socket_line"); ret = -1; From 08ef8ef3debe889d6d5423ddf0b7f97d0229c786 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 14 Feb 2015 11:25:33 +1100 Subject: [PATCH 02/20] Differentiate pong from other spurious messages from clients --- src/stratifier.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index a62530e7..57d73a0a 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3414,7 +3414,11 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * if (res_val) { const char *result = json_string_value(res_val); - LOGDEBUG("Received spurious response %s", result ? result : ""); + if (!safecmp(result, "pong")) + LOGDEBUG("Received pong from client %"PRId64, client_id); + else + LOGDEBUG("Received spurious response %s from client %"PRId64, + result ? result : "", client_id); goto out; } send_json_err(sdata, client_id, id_val, "-3:method not found"); From 2b97f1833faabdf56b6a4f79cc1aad68f4e16751 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 15:57:55 +1100 Subject: [PATCH 03/20] Shutdown instead of closing a socket after sending a unix message allowing the receiving end to close the socket after receiving the data --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 55f9185d..0d176bac 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -525,7 +525,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - Close(sockd); + shutdown(sockd, SHUT_WR); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); From 316ceba75bbc17df77cfdb471de23dc5aecbdd04 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 17:11:38 +1100 Subject: [PATCH 04/20] Check for pid in send_recv_proc as well --- src/ckpool.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 0d176bac..d6091ff8 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -549,6 +549,8 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) + pi->pid = get_proc_pid(pi); if (unlikely(kill_pid(pi->pid, 0))) { LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); goto out; From 90c682177f67e86e19c477c1831c9d03f5edbfd8 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 17:49:56 +1100 Subject: [PATCH 05/20] Wait for the other end to close a unix socket to ensure the message has gone through --- src/ckpool.c | 2 ++ src/libckpool.c | 14 ++++++++++++++ src/libckpool.h | 1 + 3 files changed, 17 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index d6091ff8..004a09a3 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -526,6 +526,8 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch else ret = true; shutdown(sockd, SHUT_WR); + if (!wait_close(sockd, 5)) + LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); diff --git a/src/libckpool.c b/src/libckpool.c index 19fe19ed..cae50bda 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -910,6 +910,20 @@ out: return sockd; } +/* Wait till a socket has been closed at the other end */ +int wait_close(int sockd, int timeout) +{ + struct pollfd sfd; + + if (unlikely(sockd < 0)) + return -1; + sfd.fd = sockd; + sfd.events = POLLHUP; + sfd.revents = 0; + timeout *= 1000; + return poll(&sfd, 1, timeout); +} + /* Emulate a select read wait for high fds that select doesn't support */ int wait_read_select(int sockd, int timeout) { diff --git a/src/libckpool.h b/src/libckpool.h index e21acc96..79f0f8fd 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -493,6 +493,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) +int wait_close(int sockd, int timeout); int wait_read_select(int sockd, int timeout); int read_length(int sockd, void *buf, int len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); From 236634239e9b680b01f3be88b87c55131396c510 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 20:04:03 +1100 Subject: [PATCH 06/20] Close our end of the socket in send_proc --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 004a09a3..3b433d44 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -525,9 +525,9 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - shutdown(sockd, SHUT_WR); if (!wait_close(sockd, 5)) LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); + Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); From 07874e9f3027cf6723baa594d70892fdf91a607c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 23:59:42 +1100 Subject: [PATCH 07/20] Show message associated with no close fd detection --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 3b433d44..70d33adb 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -526,7 +526,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch else ret = true; if (!wait_close(sockd, 5)) - LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); + LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); Close(sockd); out: if (unlikely(!ret)) { From 85b17f1b78563baa3705b5e0b152e30bd6c6ff7f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 00:34:48 +1100 Subject: [PATCH 08/20] Check for correct condition in wait_close --- src/libckpool.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index cae50bda..7fc3017b 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -914,14 +914,18 @@ out: int wait_close(int sockd, int timeout) { struct pollfd sfd; + int ret; if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLHUP; + sfd.events = POLLIN; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret < 1) + return 0; + return sfd.revents & POLLHUP; } /* Emulate a select read wait for high fds that select doesn't support */ From a23060d786600ee337aad92b1ca9089a064096a2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:08:26 +1100 Subject: [PATCH 09/20] Fix buf dereference error --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 57d73a0a..9bf2ab98 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4342,8 +4342,8 @@ int stratifier(proc_instance_t *pi) ckpool_t *ckp = pi->ckp; int ret = 1, threads; int64_t randomiser; + char *buf = NULL; sdata_t *sdata; - char *buf; LOGWARNING("%s stratifier starting", ckp->name); sdata = ckzalloc(sizeof(sdata_t)); From 487e918ff7eaafaeade6b57d7a9e6d26e882946c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:16:05 +1100 Subject: [PATCH 10/20] Return value of send_proc is never used --- src/ckpool.c | 3 +-- src/ckpool.h | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 70d33adb..4b753074 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -494,7 +494,7 @@ out: /* Send a single message to a process instance when there will be no response, * closing the socket immediately. */ -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { char *path = pi->us.path; bool ret = false; @@ -533,7 +533,6 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } - return ret; } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 797beb24..771ac692 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -212,7 +212,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) From bd68f928b710816c984ec3a8dbc852f10ed3f6b4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:49:24 +1100 Subject: [PATCH 11/20] Make all one way send_procs asynchronous to avoid message response deadlocks --- src/ckpool.c | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 4b753074..989b1b65 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -492,14 +492,32 @@ out: return pid; } -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -533,6 +551,24 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } + free(msg); + free(pm); + return NULL; +} + +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + pthread_t pth; + + pm->pi = pi; + pm->msg = strdup(msg); + pm->file = file; + pm->func = func; + pm->line = line; + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then From 33508b2243818bea6c96037800ced4a59da3d0c5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 09:47:37 +1100 Subject: [PATCH 12/20] Reset the pi pid after a failure to find the process alive so we can look it up again in case it has changed --- src/ckpool.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 989b1b65..b4b280f4 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -589,6 +589,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co if (unlikely(!pi->pid)) pi->pid = get_proc_pid(pi); if (unlikely(kill_pid(pi->pid, 0))) { + /* Reset the pid value in case we are still looking for an old + * process */ + pi->pid = 0; LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); goto out; } From 646d4a95602ead1e6cad397e3ef18c62f11d8e29 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 17:50:50 +1100 Subject: [PATCH 13/20] Cope with unknown pids in various send msg commands without terminal failure --- src/ckpool.c | 60 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index b4b280f4..56d1c385 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -237,7 +237,25 @@ static int pid_wait(const pid_t pid, const int ms) return ret; } -static int send_procmsg(const proc_instance_t *pi, const char *buf) +static int get_proc_pid(const proc_instance_t *pi) +{ + int ret, pid = 0; + char path[256]; + FILE *fp; + + sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); + fp = fopen(path, "re"); + if (!fp) + goto out; + ret = fscanf(fp, "%d", &pid); + if (ret < 1) + pid = 0; + fclose(fp); +out: + return pid; +} + +static int send_procmsg(proc_instance_t *pi, const char *buf) { char *path = pi->us.path; int ret = -1; @@ -251,6 +269,12 @@ static int send_procmsg(const proc_instance_t *pi, const char *buf) LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) { + pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + + } if (unlikely(kill_pid(pi->pid, 0))) { LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname); goto out; @@ -474,24 +498,6 @@ out: static void childsighandler(const int sig); -static int get_proc_pid(const proc_instance_t *pi) -{ - int ret, pid = 0; - char path[256]; - FILE *fp; - - sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); - fp = fopen(path, "re"); - if (!fp) - goto out; - ret = fscanf(fp, "%d", &pid); - if (ret < 1) - pid = 0; - fclose(fp); -out: - return pid; -} - struct proc_message { proc_instance_t *pi; char *msg; @@ -528,10 +534,16 @@ void *async_send_proc(void *arg) } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } + } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -551,6 +563,7 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } +out_nofail: free(msg); free(pm); return NULL; @@ -586,8 +599,11 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + } if (unlikely(kill_pid(pi->pid, 0))) { /* Reset the pid value in case we are still looking for an old * process */ From d15ccdf54d713da45e734833b448e7c60b6ebeea Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 17:59:59 +1100 Subject: [PATCH 14/20] Remove the old pid file per process when preparing the new child processes --- src/ckpool.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 56d1c385..a1c4a514 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1260,6 +1260,8 @@ static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *nam pi->process = process; create_process_unixsock(pi); manage_old_child(ckp, pi); + /* Remove the old pid file if we've succeeded in coming this far */ + rm_namepid(pi); return pi; } From d594f86520731384278ba32f3190bef6ba64ea9f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 09:54:39 +1100 Subject: [PATCH 15/20] Create generic workqueue function and message receiving and parsing helpers --- src/ckpool.c | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/ckpool.h | 29 ++++++++++++++++++--- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index a1c4a514..6b8fa480 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg) return NULL; } +/* Generic workqueue function and message receiving and parsing thread */ +static void *ckwq_queue(void *arg) +{ + ckwq_t *ckmsgq = (ckwq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckwqmsg_t *wqmsg; + tv_t now; + ts_t abs; + + mutex_lock(ckmsgq->lock); + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec++; + if (!ckmsgq->wqmsgs) + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + wqmsg = ckmsgq->wqmsgs; + if (wqmsg) + DL_DELETE(ckmsgq->wqmsgs, wqmsg); + mutex_unlock(ckmsgq->lock); + + if (!wqmsg) + continue; + wqmsg->func(ckp, wqmsg->data); + free(wqmsg); + } + return NULL; +} + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons return ckmsgq; } +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) +{ + ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); + mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); + ckwq[i].ckp = ckp; + ckwq[i].lock = lock; + ckwq[i].cond = cond; + create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); + } + + return ckwq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(ckmsgq->cond); + pthread_cond_broadcast(ckmsgq->cond); mutex_unlock(ckmsgq->lock); } +void ckwq_add(ckwq_t *ckwq, const void *func, void *data) +{ + ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); + + wqmsg->func = func; + wqmsg->data = data; + + mutex_lock(ckwq->lock); + ckwq->messages++; + DL_APPEND(ckwq->wqmsgs, wqmsg); + pthread_cond_broadcast(ckwq->cond); + mutex_unlock(ckwq->lock); +} + /* Return whether there are any messages queued in the ckmsgq linked list. */ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { diff --git a/src/ckpool.h b/src/ckpool.h index 771ac692..eaf166fd 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -22,13 +22,22 @@ struct ckpool_instance; typedef struct ckpool_instance ckpool_t; +typedef struct ckmsg ckmsg_t; + struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; + ckmsg_t *next; + ckmsg_t *prev; void *data; }; -typedef struct ckmsg ckmsg_t; +typedef struct ckwqmsg ckwqmsg_t; + +struct ckwqmsg { + ckwqmsg_t *next; + ckwqmsg_t *prev; + void *data; + void (*func)(ckpool_t *, void *); +}; struct ckmsgq { ckpool_t *ckp; @@ -43,6 +52,18 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; +struct ckwq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + mutex_t *lock; + pthread_cond_t *cond; + ckwqmsg_t *wqmsgs; + int64_t messages; +}; + +typedef struct ckwq ckwq_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -204,7 +225,9 @@ struct ckpool_instance { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); +void ckwq_add(ckwq_t *ckwq, const void *func, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); ckpool_t *global_ckp; From 2d94b18b99b2cef3bc5dc5c987eff267ff50f123 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 10:17:17 +1100 Subject: [PATCH 16/20] Create a pool of workqueue threads for use by the stratifier using them for share processing, stratum receiving and transaction processing --- src/stratifier.c | 47 ++++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9bf2ab98..442175d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -313,12 +313,10 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; + ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends - ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb - ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations - ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -1644,6 +1642,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } +static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) +{ + int objects, generated; + int64_t memsize; + ckwqmsg_t *wqmsg; + + mutex_lock(ckwq->lock); + DL_COUNT(ckwq->wqmsgs, wqmsg, objects); + generated = ckwq->messages; + mutex_unlock(ckwq->lock); + + memsize = (sizeof(ckwqmsg_t) + size) * objects; + JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); +} + static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -1690,15 +1703,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); + /* Don't know exactly how big the string is so just count the pointer for now */ - ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); - json_set_object(val, "srecvs", subval); + ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); + json_set_object(val, "ckwqs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } - ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); - json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -1706,6 +1718,8 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) return buf; } +static void srecv_process(ckpool_t *ckp, char *buf); + static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret = 0; @@ -1760,7 +1774,7 @@ retry: /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckmsgq_add(sdata->srecvs, buf); + ckwq_add(sdata->ckwqs, &srecv_process, buf); Close(sockd); buf = NULL; goto retry; @@ -3282,6 +3296,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } +static void sshare_process(ckpool_t *ckp, json_params_t *jp); +static void send_transactions(ckpool_t *ckp, json_params_t *jp); + /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) @@ -3296,7 +3313,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->sshareq, jp); + ckwq_add(sdata->ckwqs, &sshare_process, jp); return; } @@ -3375,7 +3392,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->stxnq, jp); + ckwq_add(sdata->ckwqs, &send_transactions, jp); return; } /* Unhandled message here */ @@ -4391,14 +4408,10 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create half as many share processing threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; - sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - /* Create 1/4 as many stratum processing threads as there are CPUs */ - threads = threads / 2 ? : 1; - sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); + /* Create as many generic workqueue threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN); + sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); From 4c4b48795bdd4fea53762078ad39a57344c3b84f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 10:27:22 +1100 Subject: [PATCH 17/20] Use the generic workqueues for do_update --- src/stratifier.c | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 442175d2..2956ad36 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -800,33 +800,21 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) sdata->gen_priority = 0; } -struct update_req { - pthread_t *pth; - ckpool_t *ckp; - int prio; -}; - static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void *do_update(void *arg) +static void do_update(ckpool_t *ckp, int *prio) { - struct update_req *ur = (struct update_req *)arg; - ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; - int prio = ur->prio; bool ret = false; workbase_t *wb; json_t *val; char *buf; - pthread_detach(pthread_self()); - rename_proc("updater"); - - buf = send_recv_generator(ckp, "getbase", prio); + buf = send_recv_generator(ckp, "getbase", *prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; @@ -886,21 +874,17 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } - dealloc(buf); - free(ur->pth); - free(ur); - return NULL; + free(buf); + free(prio); } static void update_base(ckpool_t *ckp, const int prio) { - struct update_req *ur = ckalloc(sizeof(struct update_req)); - pthread_t *pth = ckalloc(sizeof(pthread_t)); + int *pprio = ckalloc(sizeof(int)); + sdata_t *sdata = ckp->data; - ur->pth = pth; - ur->ckp = ckp; - ur->prio = prio; - create_pthread(pth, do_update, ur); + *pprio = prio; + ckwq_add(sdata->ckwqs, &do_update, pprio); } static void __kill_instance(stratum_instance_t *client) From 5f9f01e89442c85cd927f5fed82af7b08394ec17 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:11:00 +1100 Subject: [PATCH 18/20] Revert to synchronous proc messages in anticipation of new async functions --- src/ckpool.c | 55 ++++++++-------------------------------------------- 1 file changed, 8 insertions(+), 47 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 6b8fa480..2108e515 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -568,32 +568,14 @@ out: static void childsighandler(const int sig); -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -/* Send all one way messages asynchronously so we can wait till the receiving - * end closes the socket to ensure all messages are received but no deadlocks - * can occur with 2 processes waiting for each other's socket closure. */ -void *async_send_proc(void *arg) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = (struct proc_message *)arg; - proc_instance_t *pi = pm->pi; - char *msg = pm->msg; - const char *file = pm->file; - const char *func = pm->func; - int line = pm->line; - char *path = pi->us.path; bool ret = false; int sockd; - pthread_detach(pthread_self()); - if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -604,16 +586,14 @@ void *async_send_proc(void *arg) } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) { + if (unlikely(!pi->pid)) pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - goto out_nofail; - } + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + return; } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s pid %d", - msg, pi->processname, pi->pid); + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); goto out; } sockd = open_unix_client(path); @@ -633,25 +613,6 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -out_nofail: - free(msg); - free(pm); - return NULL; -} - -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - struct proc_message *pm = ckalloc(sizeof(struct proc_message)); - pthread_t pth; - - pm->pi = pi; - pm->msg = strdup(msg); - pm->file = file; - pm->func = func; - pm->line = line; - create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then From 2865a0378fe7e60b5ac9dcc43f6f99421c4bb068 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:15:44 +1100 Subject: [PATCH 19/20] Keep track of per process ckwqs in the ckpool structure --- src/ckpool.h | 2 ++ src/stratifier.c | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ckpool.h b/src/ckpool.h index eaf166fd..a734258a 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -213,6 +213,8 @@ struct ckpool_instance { /* Private data for each process */ void *data; + /* Private generic workqueues if this process has them */ + ckwq_t *ckwqs; }; #ifdef USE_CKDB diff --git a/src/stratifier.c b/src/stratifier.c index 2956ad36..94c70c24 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4394,7 +4394,7 @@ int stratifier(proc_instance_t *pi) sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); /* Create as many generic workqueue threads as there are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN); - sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); From 428cabdfc4c8fe0cf3be17aef5033295eeffb50f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:26:10 +1100 Subject: [PATCH 20/20] Add an asynchronous send proc function which uses each process' generic workqueues if they exist --- src/ckpool.c | 35 +++++++++++++++++++++++++++++++++++ src/ckpool.h | 2 ++ 2 files changed, 37 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 2108e515..1d676aa7 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -615,6 +615,41 @@ out: } } +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) +{ + _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); + free(pm->msg); + free(pm); +} + +/* Fore sending asynchronous messages to another process, the sending process + * must have ckwqs of its own, referenced in the ckpool structure */ +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + struct proc_message *pm; + + if (unlikely(!ckp->ckwqs)) { + LOGALERT("Workqueues not set up in async_send_proc!"); + _send_proc(pi, msg, file, func, line); + return; + } + pm = ckzalloc(sizeof(struct proc_message)); + pm->pi = pi; + pm->msg = strdup(msg); + pm->file = file; + pm->func = func; + pm->line = line; + ckwq_add(ckp->ckwqs, &asp_send, pm); +} + /* Send a single message to a process instance and retrieve the response, then * close the socket. */ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) diff --git a/src/ckpool.h b/src/ckpool.h index a734258a..74dc5e3e 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -239,6 +239,8 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);