diff --git a/src/ckpool.c b/src/ckpool.c index 44fcc5ae..16f8d4b9 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -203,6 +203,86 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } +static void childsighandler(const int sig); + +/* Create a standalone thread that queues received unix messages for a proc + * instance and adds them to linked list of received messages with their + * associated receive socket, then signal the associated rmsg_cond for the + * process to know we have more queued messages. The unix_msg_t ram must be + * freed by the code that removes the entry from the list. */ +static void *unix_receiver(void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + int rsockd = pi->us.sockd, sockd; + char qname[16]; + + sprintf(qname, "%cunixrq", pi->processname[0]); + rename_proc(qname); + pthread_detach(pthread_self()); + + while (42) { + unix_msg_t *umsg; + char *buf; + + sockd = accept(rsockd, NULL, NULL); + if (unlikely(sockd < 0)) { + LOGEMERG("Failed to accept on %s socket, exiting", qname); + childsighandler(15); + break; + } + buf = recv_unix_msg(sockd); + if (unlikely(!buf)) { + Close(sockd); + LOGWARNING("Failed to get message on %s socket", qname); + continue; + } + umsg = ckalloc(sizeof(unix_msg_t)); + umsg->sockd = sockd; + umsg->buf = buf; + + mutex_lock(&pi->rmsg_lock); + DL_APPEND(pi->unix_msgs, umsg); + pthread_cond_signal(&pi->rmsg_cond); + mutex_unlock(&pi->rmsg_lock); + } + + return NULL; +} + +/* Get the next message in the receive queue, or wait up to 5 seconds for + * the next message, returning NULL if no message is received in that time. */ +unix_msg_t *get_unix_msg(proc_instance_t *pi) +{ + unix_msg_t *umsg; + + mutex_lock(&pi->rmsg_lock); + if (!pi->unix_msgs) { + tv_t now; + ts_t abs; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += 5; + cond_timedwait(&pi->rmsg_cond, &pi->rmsg_lock, &abs); + } + umsg = pi->unix_msgs; + if (umsg) + DL_DELETE(pi->unix_msgs, umsg); + mutex_unlock(&pi->rmsg_lock); + + return umsg; +} + +void create_unix_receiver(proc_instance_t *pi) +{ + pthread_t pth; + + mutex_init(&pi->rmsg_lock); + cond_init(&pi->rmsg_cond); + + create_pthread(&pth, unix_receiver, pi); +} + static void broadcast_proc(ckpool_t *ckp, const char *buf) { int i; @@ -518,45 +598,31 @@ out: return ret; } -static void childsighandler(const int sig); - -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -/* Send all one way messages asynchronously so we can wait till the receiving - * end closes the socket to ensure all messages are received but no deadlocks - * can occur with 2 processes waiting for each other's socket closure. */ -void *async_send_proc(void *arg) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = (struct proc_message *)arg; - proc_instance_t *pi = pm->pi; - char *msg = pm->msg; - const char *file = pm->file; - const char *func = pm->func; - int line = pm->line; - char *path = pi->us.path; bool ret = false; int sockd; - pthread_detach(pthread_self()); + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to %s in send_proc", pi->processname); + return; + } if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; } + /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); if (!pi->pid) { LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - goto out_nofail; + return; } } if (unlikely(kill_pid(pi->pid, 0))) { @@ -573,38 +639,12 @@ void *async_send_proc(void *arg) LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - if (!wait_close(sockd, 5)) - LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -out_nofail: - free(msg); - free(pm); - return NULL; -} - -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - struct proc_message *pm; - pthread_t pth; - - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to %s in send_proc", pi->processname); - return; - } - pm = ckalloc(sizeof(struct proc_message)); - pm->pi = pi; - pm->msg = strdup(msg); - pm->file = file; - pm->func = func; - pm->line = line; - create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index deda5eb2..370bce71 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -29,6 +29,15 @@ struct ckmsg { typedef struct ckmsg ckmsg_t; +typedef struct unix_msg unix_msg_t; + +struct unix_msg { + unix_msg_t *next; + unix_msg_t *prev; + int sockd; + char *buf; +}; + struct ckmsgq { ckpool_t *ckp; char name[16]; @@ -42,6 +51,8 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; +typedef struct proc_instance proc_instance_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -50,6 +61,11 @@ struct proc_instance { int pid; int oldpid; int (*process)(proc_instance_t *); + + /* Linked list of received messages, locking and conditional */ + unix_msg_t *unix_msgs; + mutex_t rmsg_lock; + pthread_cond_t rmsg_cond; }; struct connsock { @@ -219,6 +235,8 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); +unix_msg_t *get_unix_msg(proc_instance_t *pi); +void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp; diff --git a/src/connector.c b/src/connector.c index 682ec8b5..ff0a75a8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -764,25 +764,22 @@ static char *connector_stats(cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int sockd = -1, ret = 0, selret; int64_t client_id64, client_id; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; uint8_t test_cycle = 0; - char *buf = NULL; - - do { - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { - LOGEMERG("Connector failed to ping main process, exiting"); - ret = 1; - goto out; - } - } while (selret < 1); + char *buf; + int ret = 0; LOGWARNING("%s connector ready", ckp->name); retry: + if (umsg) { + Close(umsg->sockd); + free(umsg->buf); + dealloc(umsg); + } + if (!++test_cycle) { /* Test for pthread join every 256 messages */ if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { @@ -797,21 +794,11 @@ retry: } } - Close(sockd); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGEMERG("Failed to accept on connector socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (!buf) { - LOGWARNING("Failed to get message in connector_loop"); - goto retry; - } + do { + umsg = get_unix_msg(pi); + } while (!umsg); + buf = umsg->buf; LOGDEBUG("Connector received message: %s", buf); /* The bulk of the messages will be json messages to send to clients * so look for them first. */ @@ -848,7 +835,7 @@ retry: stratifier_drop_client(ckp, client_id); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Connector received ping request"); - send_unix_msg(sockd, "pong"); + send_unix_msg(umsg->sockd, "pong"); } else if (cmdmatch(buf, "accept")) { LOGDEBUG("Connector received accept signal"); cdata->accept = true; @@ -860,7 +847,7 @@ retry: LOGDEBUG("Connector received stats request"); msg = connector_stats(cdata); - send_unix_msg(sockd, msg); + send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { @@ -885,13 +872,11 @@ retry: sscanf(buf, "getxfd%d", &fdno); if (fdno > -1 && fdno < ckp->serverurls) - send_fd(cdata->serverfd[fdno], sockd); + send_fd(cdata->serverfd[fdno], umsg->sockd); } else LOGWARNING("Unhandled connector message: %s", buf); goto retry; out: - Close(sockd); - dealloc(buf); return ret; } @@ -1003,6 +988,8 @@ int connector(proc_instance_t *pi) create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + create_unix_receiver(pi); + ret = connector_loop(pi, cdata); out: dealloc(ckp->data); diff --git a/src/libckpool.h b/src/libckpool.h index 28ea14c9..4fa68047 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -309,9 +309,6 @@ struct unixsock { typedef struct unixsock unixsock_t; -typedef struct proc_instance proc_instance_t; - - void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); #define json_check(VAL, ERR) _json_check(VAL, ERR, __FILE__, __func__, __LINE__) diff --git a/src/stratifier.c b/src/stratifier.c index ea7b6ba1..e0623908 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2983,13 +2983,18 @@ static void get_poolstats(sdata_t *sdata, int *sockd) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { - int sockd, ret = 0, selret = 0; sdata_t *sdata = ckp->data; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; tv_t start_tv = {0, 0}; - char *buf = NULL; + int ret = 0; + char *buf; retry: + if (umsg) { + free(umsg->buf); + dealloc(umsg); + } + do { double tdiff; tv_t end_tv; @@ -3008,41 +3013,29 @@ retry: broadcast_ping(sdata); } } - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { - LOGEMERG("Generator failed to ping main process, exiting"); + + umsg = get_unix_msg(pi); + if (unlikely(!umsg &&!ping_main(ckp))) { + LOGEMERG("Stratifier failed to ping main process, exiting"); ret = 1; goto out; } - } while (selret < 1); + } while (!umsg); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGERR("Failed to accept on stratifier socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (unlikely(!buf)) { - Close(sockd); - LOGWARNING("Failed to get message in stratum_loop"); - goto retry; - } + buf = umsg->buf; if (likely(buf[0] == '{')) { /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - Close(sockd); - ckmsgq_add(sdata->srecvs, buf); - buf = NULL; + Close(umsg->sockd); + ckmsgq_add(sdata->srecvs, umsg->buf); + umsg->buf = NULL; goto retry; } if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); - send_unix_msg(sockd, "pong"); - Close(sockd); + send_unix_msg(umsg->sockd, "pong"); + Close(umsg->sockd); goto retry; } if (cmdmatch(buf, "stats")) { @@ -3050,69 +3043,69 @@ retry: LOGDEBUG("Stratifier received stats request"); msg = stratifier_stats(ckp, sdata); - send_unix_msg(sockd, msg); - Close(sockd); + send_unix_msg(umsg->sockd, msg); + Close(umsg->sockd); goto retry; } /* Parse API commands here to return a message to sockd */ if (cmdmatch(buf, "clients")) { - getclients(sdata, &sockd); + getclients(sdata, &umsg->sockd); goto retry; } if (cmdmatch(buf, "workers")) { - getworkers(sdata, &sockd); + getworkers(sdata, &umsg->sockd); goto retry; } if (cmdmatch(buf, "users")) { - getusers(sdata, &sockd); + getusers(sdata, &umsg->sockd); goto retry; } if (cmdmatch(buf, "getclient")) { - getclient(sdata, buf + 10, &sockd); + getclient(sdata, buf + 10, &umsg->sockd); goto retry; } if (cmdmatch(buf, "getuser")) { - getuser(sdata, buf + 8, &sockd); + getuser(sdata, buf + 8, &umsg->sockd); goto retry; } if (cmdmatch(buf, "getworker")) { - getworker(sdata, buf + 10, &sockd); + getworker(sdata, buf + 10, &umsg->sockd); goto retry; } if (cmdmatch(buf, "userclients")) { - userclients(sdata, buf + 12, &sockd); + userclients(sdata, buf + 12, &umsg->sockd); goto retry; } if (cmdmatch(buf, "workerclients")) { - workerclients(sdata, buf + 14, &sockd); + workerclients(sdata, buf + 14, &umsg->sockd); goto retry; } if (cmdmatch(buf, "getproxy")) { - getproxy(sdata, buf + 9, &sockd); + getproxy(sdata, buf + 9, &umsg->sockd); goto retry; } if (cmdmatch(buf, "setproxy")) { - setproxy(sdata, buf + 9, &sockd); + setproxy(sdata, buf + 9, &umsg->sockd); goto retry; } if (cmdmatch(buf, "poolstats")) { - get_poolstats(sdata, &sockd); + get_poolstats(sdata, &umsg->sockd); goto retry; } if (cmdmatch(buf, "proxyinfo")) { - proxyinfo(sdata, buf + 10, &sockd); + proxyinfo(sdata, buf + 10, &umsg->sockd); goto retry; } if (cmdmatch(buf, "ucinfo")) { - user_clientinfo(sdata, buf + 7, &sockd); + user_clientinfo(sdata, buf + 7, &umsg->sockd); goto retry; } if (cmdmatch(buf, "wcinfo")) { - worker_clientinfo(sdata, buf + 7, &sockd); + worker_clientinfo(sdata, buf + 7, &umsg->sockd); goto retry; } - Close(sockd); + Close(umsg->sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; @@ -3160,7 +3153,6 @@ retry: goto retry; out: - dealloc(buf); return ret; } @@ -6011,6 +6003,8 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); + create_unix_receiver(pi); + LOGWARNING("%s stratifier ready", ckp->name); ret = stratum_loop(ckp, pi);