diff --git a/src/ckpool.c b/src/ckpool.c index e7322ee3..161ae8e1 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -202,6 +202,62 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } +static void childsighandler(const int sig); + +/* Create a standalone thread that queues received unix messages for a proc + * instance and adds them to linked list of received messages with their + * associated receive socket, then signal the associated rmsg_cond for the + * process to know we have more queued messages. The unix_msg_t ram must be + * freed by the code that removes the entry from the list. */ +static void *unix_receiver(void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + int rsockd = pi->us.sockd, sockd; + char qname[16]; + + sprintf(qname, "%cunixrq", pi->processname[0]); + rename_proc(qname); + pthread_detach(pthread_self()); + + while (42) { + unix_msg_t *umsg; + char *buf; + + sockd = accept(rsockd, NULL, NULL); + if (unlikely(sockd < 0)) { + LOGEMERG("Failed to accept on %s socket, exiting", qname); + childsighandler(15); + break; + } + buf = recv_unix_msg(sockd); + if (unlikely(!buf)) { + Close(sockd); + LOGWARNING("Failed to get message on %s socket", qname); + continue; + } + umsg = ckalloc(sizeof(unix_msg_t)); + umsg->sockd = sockd; + umsg->buf = buf; + + mutex_lock(&pi->rmsg_lock); + DL_APPEND(pi->unix_msgs, umsg); + pthread_cond_signal(&pi->rmsg_cond); + mutex_unlock(&pi->rmsg_lock); + } + + return NULL; +} + +void create_unix_receiver(proc_instance_t *pi) +{ + pthread_t pth; + + mutex_init(&pi->rmsg_lock); + cond_init(&pi->rmsg_cond); + + create_pthread(&pth, unix_receiver, pi); +} + static void broadcast_proc(ckpool_t *ckp, const char *buf) { int i; @@ -496,8 +552,6 @@ out: return ret; } -static void childsighandler(const int sig); - /* Send a single message to a process instance when there will be no response, * closing the socket immediately. */ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) diff --git a/src/ckpool.h b/src/ckpool.h index fd192f97..08eed951 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -30,6 +30,15 @@ struct ckmsg { typedef struct ckmsg ckmsg_t; +typedef struct unix_msg unix_msg_t; + +struct unix_msg { + unix_msg_t *next; + unix_msg_t *prev; + int sockd; + char *buf; +}; + struct ckmsgq { ckpool_t *ckp; char name[16]; @@ -51,6 +60,11 @@ struct proc_instance { int pid; int oldpid; int (*process)(proc_instance_t *); + + /* Linked list of received messages, locking and conditional */ + unix_msg_t *unix_msgs; + mutex_t rmsg_lock; + pthread_cond_t rmsg_cond; }; struct connsock { @@ -219,6 +233,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); +void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp;