diff --git a/src/ckpool.c b/src/ckpool.c index d3516b1a..70725e08 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -202,6 +202,86 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } +static void childsighandler(const int sig); + +/* Create a standalone thread that queues received unix messages for a proc + * instance and adds them to linked list of received messages with their + * associated receive socket, then signal the associated rmsg_cond for the + * process to know we have more queued messages. The unix_msg_t ram must be + * freed by the code that removes the entry from the list. */ +static void *unix_receiver(void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + int rsockd = pi->us.sockd, sockd; + char qname[16]; + + sprintf(qname, "%cunixrq", pi->processname[0]); + rename_proc(qname); + pthread_detach(pthread_self()); + + while (42) { + unix_msg_t *umsg; + char *buf; + + sockd = accept(rsockd, NULL, NULL); + if (unlikely(sockd < 0)) { + LOGEMERG("Failed to accept on %s socket, exiting", qname); + childsighandler(15); + break; + } + buf = recv_unix_msg(sockd); + if (unlikely(!buf)) { + Close(sockd); + LOGWARNING("Failed to get message on %s socket", qname); + continue; + } + umsg = ckalloc(sizeof(unix_msg_t)); + umsg->sockd = sockd; + umsg->buf = buf; + + mutex_lock(&pi->rmsg_lock); + DL_APPEND(pi->unix_msgs, umsg); + pthread_cond_signal(&pi->rmsg_cond); + mutex_unlock(&pi->rmsg_lock); + } + + return NULL; +} + +/* Get the next message in the receive queue, or wait up to 5 seconds for + * the next message, returning NULL if no message is received in that time. */ +unix_msg_t *get_unix_msg(proc_instance_t *pi) +{ + unix_msg_t *umsg; + + mutex_lock(&pi->rmsg_lock); + if (!pi->unix_msgs) { + tv_t now; + ts_t abs; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += 5; + cond_timedwait(&pi->rmsg_cond, &pi->rmsg_lock, &abs); + } + umsg = pi->unix_msgs; + if (umsg) + DL_DELETE(pi->unix_msgs, umsg); + mutex_unlock(&pi->rmsg_lock); + + return umsg; +} + +void create_unix_receiver(proc_instance_t *pi) +{ + pthread_t pth; + + mutex_init(&pi->rmsg_lock); + cond_init(&pi->rmsg_cond); + + create_pthread(&pth, unix_receiver, pi); +} + static void broadcast_proc(ckpool_t *ckp, const char *buf) { int i; @@ -496,45 +576,31 @@ out: return ret; } -static void childsighandler(const int sig); - -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -/* Send all one way messages asynchronously so we can wait till the receiving - * end closes the socket to ensure all messages are received but no deadlocks - * can occur with 2 processes waiting for each other's socket closure. */ -void *async_send_proc(void *arg) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = (struct proc_message *)arg; - proc_instance_t *pi = pm->pi; - char *msg = pm->msg; - const char *file = pm->file; - const char *func = pm->func; - int line = pm->line; - char *path = pi->us.path; bool ret = false; int sockd; - pthread_detach(pthread_self()); + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to %s in send_proc", pi->processname); + return; + } if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; } + /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); if (!pi->pid) { LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - goto out_nofail; + return; } } if (unlikely(kill_pid(pi->pid, 0))) { @@ -551,38 +617,12 @@ void *async_send_proc(void *arg) LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - if (!wait_close(sockd, 5)) - LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -out_nofail: - free(msg); - free(pm); - return NULL; -} - -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - struct proc_message *pm; - pthread_t pth; - - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to %s in send_proc", pi->processname); - return; - } - pm = ckalloc(sizeof(struct proc_message)); - pm->pi = pi; - pm->msg = strdup(msg); - pm->file = file; - pm->func = func; - pm->line = line; - create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index fd192f97..2388aedd 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -30,6 +30,15 @@ struct ckmsg { typedef struct ckmsg ckmsg_t; +typedef struct unix_msg unix_msg_t; + +struct unix_msg { + unix_msg_t *next; + unix_msg_t *prev; + int sockd; + char *buf; +}; + struct ckmsgq { ckpool_t *ckp; char name[16]; @@ -43,6 +52,8 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; +typedef struct proc_instance proc_instance_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -51,6 +62,11 @@ struct proc_instance { int pid; int oldpid; int (*process)(proc_instance_t *); + + /* Linked list of received messages, locking and conditional */ + unix_msg_t *unix_msgs; + mutex_t rmsg_lock; + pthread_cond_t rmsg_cond; }; struct connsock { @@ -219,6 +235,8 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); +unix_msg_t *get_unix_msg(proc_instance_t *pi); +void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp; diff --git a/src/connector.c b/src/connector.c index 424483a8..43f268e5 100644 --- a/src/connector.c +++ b/src/connector.c @@ -768,25 +768,22 @@ static char *connector_stats(cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int sockd = -1, ret = 0, selret; int64_t client_id64, client_id; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; uint8_t test_cycle = 0; - char *buf = NULL; - - do { - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { - LOGEMERG("Connector failed to ping main process, exiting"); - ret = 1; - goto out; - } - } while (selret < 1); + char *buf; + int ret = 0; LOGWARNING("%s connector ready", ckp->name); retry: + if (umsg) { + Close(umsg->sockd); + free(umsg->buf); + dealloc(umsg); + } + if (!++test_cycle) { /* Test for pthread join every 256 messages */ if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { @@ -801,21 +798,11 @@ retry: } } - Close(sockd); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGEMERG("Failed to accept on connector socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (!buf) { - LOGWARNING("Failed to get message in connector_loop"); - goto retry; - } + do { + umsg = get_unix_msg(pi); + } while (!umsg); + buf = umsg->buf; LOGDEBUG("Connector received message: %s", buf); /* The bulk of the messages will be json messages to send to clients * so look for them first. */ @@ -841,7 +828,7 @@ retry: LOGINFO("Connector dropped client id: %"PRId64, client_id); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Connector received ping request"); - send_unix_msg(sockd, "pong"); + send_unix_msg(umsg->sockd, "pong"); } else if (cmdmatch(buf, "accept")) { LOGDEBUG("Connector received accept signal"); cdata->accept = true; @@ -853,7 +840,7 @@ retry: LOGDEBUG("Connector received stats request"); msg = connector_stats(cdata); - send_unix_msg(sockd, msg); + send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { @@ -878,13 +865,11 @@ retry: sscanf(buf, "getxfd%d", &fdno); if (fdno > -1 && fdno < ckp->serverurls) - send_fd(cdata->serverfd[fdno], sockd); + send_fd(cdata->serverfd[fdno], umsg->sockd); } else LOGWARNING("Unhandled connector message: %s", buf); goto retry; out: - Close(sockd); - dealloc(buf); return ret; } @@ -996,6 +981,8 @@ int connector(proc_instance_t *pi) create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + create_unix_receiver(pi); + ret = connector_loop(pi, cdata); out: dealloc(ckp->data); diff --git a/src/libckpool.h b/src/libckpool.h index 19158427..8901e3be 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -312,9 +312,6 @@ struct unixsock { typedef struct unixsock unixsock_t; -typedef struct proc_instance proc_instance_t; - - void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); #define json_check(VAL, ERR) _json_check(VAL, ERR, __FILE__, __func__, __LINE__) diff --git a/src/stratifier.c b/src/stratifier.c index 7ea1497f..500cb835 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1737,13 +1737,18 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { - int sockd, ret = 0, selret = 0; sdata_t *sdata = ckp->data; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; tv_t start_tv = {0, 0}; - char *buf = NULL; + int ret = 0; + char *buf; retry: + if (umsg) { + free(umsg->buf); + dealloc(umsg); + } + do { double tdiff; tv_t end_tv; @@ -1761,43 +1766,30 @@ retry: ckp->update_interval); broadcast_ping(sdata); } - continue; } - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { - LOGEMERG("Generator failed to ping main process, exiting"); + + umsg = get_unix_msg(pi); + if (unlikely(!umsg &&!ping_main(ckp))) { + LOGEMERG("Stratifier failed to ping main process, exiting"); ret = 1; goto out; } - } while (selret < 1); + } while (!umsg); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGERR("Failed to accept on stratifier socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (unlikely(!buf)) { - Close(sockd); - LOGWARNING("Failed to get message in stratum_loop"); - goto retry; - } + buf = umsg->buf; if (likely(buf[0] == '{')) { /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckmsgq_add(sdata->srecvs, buf); - Close(sockd); - buf = NULL; + Close(umsg->sockd); + ckmsgq_add(sdata->srecvs, umsg->buf); + umsg->buf = NULL; goto retry; } if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); - send_unix_msg(sockd, "pong"); - Close(sockd); + send_unix_msg(umsg->sockd, "pong"); + Close(umsg->sockd); goto retry; } if (cmdmatch(buf, "stats")) { @@ -1805,12 +1797,12 @@ retry: LOGDEBUG("Stratifier received stats request"); msg = stratifier_stats(ckp, sdata); - send_unix_msg(sockd, msg); - Close(sockd); + send_unix_msg(umsg->sockd, msg); + Close(umsg->sockd); goto retry; } - Close(sockd); + Close(umsg->sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; @@ -1848,7 +1840,6 @@ retry: goto retry; out: - dealloc(buf); return ret; } @@ -4535,6 +4526,8 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); + create_unix_receiver(pi); + LOGWARNING("%s stratifier ready", ckp->name); ret = stratum_loop(ckp, pi);