|
|
|
@ -202,6 +202,86 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void childsighandler(const int sig); |
|
|
|
|
|
|
|
|
|
/* Create a standalone thread that queues received unix messages for a proc
|
|
|
|
|
* instance and adds them to linked list of received messages with their |
|
|
|
|
* associated receive socket, then signal the associated rmsg_cond for the |
|
|
|
|
* process to know we have more queued messages. The unix_msg_t ram must be |
|
|
|
|
* freed by the code that removes the entry from the list. */ |
|
|
|
|
static void *unix_receiver(void *arg) |
|
|
|
|
{ |
|
|
|
|
proc_instance_t *pi = (proc_instance_t *)arg; |
|
|
|
|
int rsockd = pi->us.sockd, sockd; |
|
|
|
|
char qname[16]; |
|
|
|
|
|
|
|
|
|
sprintf(qname, "%cunixrq", pi->processname[0]); |
|
|
|
|
rename_proc(qname); |
|
|
|
|
pthread_detach(pthread_self()); |
|
|
|
|
|
|
|
|
|
while (42) { |
|
|
|
|
unix_msg_t *umsg; |
|
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
sockd = accept(rsockd, NULL, NULL); |
|
|
|
|
if (unlikely(sockd < 0)) { |
|
|
|
|
LOGEMERG("Failed to accept on %s socket, exiting", qname); |
|
|
|
|
childsighandler(15); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
buf = recv_unix_msg(sockd); |
|
|
|
|
if (unlikely(!buf)) { |
|
|
|
|
Close(sockd); |
|
|
|
|
LOGWARNING("Failed to get message on %s socket", qname); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
umsg = ckalloc(sizeof(unix_msg_t)); |
|
|
|
|
umsg->sockd = sockd; |
|
|
|
|
umsg->buf = buf; |
|
|
|
|
|
|
|
|
|
mutex_lock(&pi->rmsg_lock); |
|
|
|
|
DL_APPEND(pi->unix_msgs, umsg); |
|
|
|
|
pthread_cond_signal(&pi->rmsg_cond); |
|
|
|
|
mutex_unlock(&pi->rmsg_lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Get the next message in the receive queue, or wait up to 5 seconds for
|
|
|
|
|
* the next message, returning NULL if no message is received in that time. */ |
|
|
|
|
unix_msg_t *get_unix_msg(proc_instance_t *pi) |
|
|
|
|
{ |
|
|
|
|
unix_msg_t *umsg; |
|
|
|
|
|
|
|
|
|
mutex_lock(&pi->rmsg_lock); |
|
|
|
|
if (!pi->unix_msgs) { |
|
|
|
|
tv_t now; |
|
|
|
|
ts_t abs; |
|
|
|
|
|
|
|
|
|
tv_time(&now); |
|
|
|
|
tv_to_ts(&abs, &now); |
|
|
|
|
abs.tv_sec += 5; |
|
|
|
|
cond_timedwait(&pi->rmsg_cond, &pi->rmsg_lock, &abs); |
|
|
|
|
} |
|
|
|
|
umsg = pi->unix_msgs; |
|
|
|
|
if (umsg) |
|
|
|
|
DL_DELETE(pi->unix_msgs, umsg); |
|
|
|
|
mutex_unlock(&pi->rmsg_lock); |
|
|
|
|
|
|
|
|
|
return umsg; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void create_unix_receiver(proc_instance_t *pi) |
|
|
|
|
{ |
|
|
|
|
pthread_t pth; |
|
|
|
|
|
|
|
|
|
mutex_init(&pi->rmsg_lock); |
|
|
|
|
cond_init(&pi->rmsg_cond); |
|
|
|
|
|
|
|
|
|
create_pthread(&pth, unix_receiver, pi); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void broadcast_proc(ckpool_t *ckp, const char *buf) |
|
|
|
|
{ |
|
|
|
|
int i; |
|
|
|
@ -496,45 +576,31 @@ out:
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void childsighandler(const int sig); |
|
|
|
|
|
|
|
|
|
struct proc_message { |
|
|
|
|
proc_instance_t *pi; |
|
|
|
|
char *msg; |
|
|
|
|
const char *file; |
|
|
|
|
const char *func; |
|
|
|
|
int line; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/* Send all one way messages asynchronously so we can wait till the receiving
|
|
|
|
|
* end closes the socket to ensure all messages are received but no deadlocks |
|
|
|
|
* can occur with 2 processes waiting for each other's socket closure. */ |
|
|
|
|
void *async_send_proc(void *arg) |
|
|
|
|
/* Send a single message to a process instance when there will be no response,
|
|
|
|
|
* closing the socket immediately. */ |
|
|
|
|
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) |
|
|
|
|
{ |
|
|
|
|
struct proc_message *pm = (struct proc_message *)arg; |
|
|
|
|
proc_instance_t *pi = pm->pi; |
|
|
|
|
char *msg = pm->msg; |
|
|
|
|
const char *file = pm->file; |
|
|
|
|
const char *func = pm->func; |
|
|
|
|
int line = pm->line; |
|
|
|
|
|
|
|
|
|
char *path = pi->us.path; |
|
|
|
|
bool ret = false; |
|
|
|
|
int sockd; |
|
|
|
|
|
|
|
|
|
pthread_detach(pthread_self()); |
|
|
|
|
if (unlikely(!msg || !strlen(msg))) { |
|
|
|
|
LOGERR("Attempted to send null message to %s in send_proc", pi->processname); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (unlikely(!path || !strlen(path))) { |
|
|
|
|
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* At startup the pid fields are not set up before some processes are
|
|
|
|
|
* forked so they never inherit them. */ |
|
|
|
|
if (unlikely(!pi->pid)) { |
|
|
|
|
pi->pid = get_proc_pid(pi); |
|
|
|
|
if (!pi->pid) { |
|
|
|
|
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); |
|
|
|
|
goto out_nofail; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (unlikely(kill_pid(pi->pid, 0))) { |
|
|
|
@ -551,38 +617,12 @@ void *async_send_proc(void *arg)
|
|
|
|
|
LOGWARNING("Failed to send %s to socket %s", msg, path); |
|
|
|
|
else |
|
|
|
|
ret = true; |
|
|
|
|
if (!wait_close(sockd, 5)) |
|
|
|
|
LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); |
|
|
|
|
Close(sockd); |
|
|
|
|
out: |
|
|
|
|
if (unlikely(!ret)) { |
|
|
|
|
LOGERR("Failure in send_proc from %s %s:%d", file, func, line); |
|
|
|
|
childsighandler(15); |
|
|
|
|
} |
|
|
|
|
out_nofail: |
|
|
|
|
free(msg); |
|
|
|
|
free(pm); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Send a single message to a process instance when there will be no response,
|
|
|
|
|
* closing the socket immediately. */ |
|
|
|
|
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) |
|
|
|
|
{ |
|
|
|
|
struct proc_message *pm; |
|
|
|
|
pthread_t pth; |
|
|
|
|
|
|
|
|
|
if (unlikely(!msg || !strlen(msg))) { |
|
|
|
|
LOGERR("Attempted to send null message to %s in send_proc", pi->processname); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
pm = ckalloc(sizeof(struct proc_message)); |
|
|
|
|
pm->pi = pi; |
|
|
|
|
pm->msg = strdup(msg); |
|
|
|
|
pm->file = file; |
|
|
|
|
pm->func = func; |
|
|
|
|
pm->line = line; |
|
|
|
|
create_pthread(&pth, async_send_proc, pm); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Send a single message to a process instance and retrieve the response, then
|
|
|
|
|