From 3dfe1796051b0a347c5167d038c2eefe33f0508a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Dec 2016 22:11:47 +1100 Subject: [PATCH] Deprecate send_proc by adding messages to other proc instances directly to their message queues for now. --- src/ckpool.c | 39 ++++++++++++++------------------------- src/ckpool.h | 4 ++-- src/generator.c | 2 +- src/stratifier.c | 6 +++--- 4 files changed, 20 insertions(+), 31 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index bb665da1..307b3d89 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -709,37 +709,26 @@ out: return ret; } -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* We used to send messages between each proc_instance via unix sockets when + * ckpool was a multi-process model but that is no longer required so we can + * place the messages directly on the other proc_instance's queue until we + * deprecate this mechanism. */ +void _queue_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - char *path = pi->us.path; - bool ret = false; - int sockd; + unix_msg_t *umsg; if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to %s in send_proc", pi->processname); + LOGWARNING("Null msg passed to queue_proc from %s %s:%d", file, func, line); return; } + umsg = ckalloc(sizeof(unix_msg_t)); + umsg->sockd = -1; + umsg->buf = strdup(msg); - if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); - goto out; - } - - sockd = open_unix_client(path); - if (unlikely(sockd < 0)) { - LOGWARNING("Failed to open socket %s", path); - goto out; - } - if (unlikely(!send_unix_msg(sockd, msg))) - LOGWARNING("Failed to send %s to socket %s", msg, path); - else - ret = true; - Close(sockd); -out: - if (unlikely(!ret)) - LOGERR("Failure in send_proc from %s %s:%d", file, func, line); + mutex_lock(&pi->rmsg_lock); + DL_APPEND(pi->unix_msgs, umsg); + pthread_cond_signal(&pi->rmsg_cond); + mutex_unlock(&pi->rmsg_lock); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 59c0c9c7..7c4c4274 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -342,8 +342,8 @@ void empty_buffer(connsock_t *cs); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int read_socket_line(connsock_t *cs, float *timeout); -void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define send_proc(pi, msg) _send_proc(&(pi), msg, __FILE__, __func__, __LINE__) +void _queue_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define send_proc(pi, msg) _queue_proc(&(pi), msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(&(pi), msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__) diff --git a/src/generator.c b/src/generator.c index f324b44d..3a678efc 100644 --- a/src/generator.c +++ b/src/generator.c @@ -787,7 +787,7 @@ out: static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni); -static void reconnect_generator(const ckpool_t *ckp) +static void reconnect_generator(ckpool_t *ckp) { send_proc(ckp->generator, "reconnect"); } diff --git a/src/stratifier.c b/src/stratifier.c index 3b4ba61f..f034cbe1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1155,7 +1155,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio) return buf; } -static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) +static void send_generator(ckpool_t *ckp, const char *msg, const int prio) { sdata_t *sdata = ckp->sdata; bool set; @@ -2342,7 +2342,7 @@ static int64_t best_userproxy_headroom(sdata_t *sdata, const int userid) static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); -static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int recruits) +static void generator_recruit(ckpool_t *ckp, const int proxyid, const int recruits) { char buf[256]; @@ -4529,7 +4529,7 @@ static proxy_t *__best_subproxy(proxy_t *proxy) * in proxy mode where we find a subproxy based on the current proxy with room * for more clients. Signal the generator to recruit more subproxies if we are * running out of room. */ -static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) +static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) { proxy_t *global, *proxy, *tmp, *best = NULL;