Browse Source

Create basic unix receive message thread for queueing messages to be handled asynchronously

master
Con Kolivas 10 years ago
parent
commit
ba7e48c492
  1. 58
      src/ckpool.c
  2. 15
      src/ckpool.h

58
src/ckpool.c

@ -202,6 +202,62 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
return ret; return ret;
} }
static void childsighandler(const int sig);
/* Create a standalone thread that queues received unix messages for a proc
* instance and adds them to linked list of received messages with their
* associated receive socket, then signal the associated rmsg_cond for the
* process to know we have more queued messages. The unix_msg_t ram must be
* freed by the code that removes the entry from the list. */
static void *unix_receiver(void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
int rsockd = pi->us.sockd, sockd;
char qname[16];
sprintf(qname, "%cunixrq", pi->processname[0]);
rename_proc(qname);
pthread_detach(pthread_self());
while (42) {
unix_msg_t *umsg;
char *buf;
sockd = accept(rsockd, NULL, NULL);
if (unlikely(sockd < 0)) {
LOGEMERG("Failed to accept on %s socket, exiting", qname);
childsighandler(15);
break;
}
buf = recv_unix_msg(sockd);
if (unlikely(!buf)) {
Close(sockd);
LOGWARNING("Failed to get message on %s socket", qname);
continue;
}
umsg = ckalloc(sizeof(unix_msg_t));
umsg->sockd = sockd;
umsg->buf = buf;
mutex_lock(&pi->rmsg_lock);
DL_APPEND(pi->unix_msgs, umsg);
pthread_cond_signal(&pi->rmsg_cond);
mutex_unlock(&pi->rmsg_lock);
}
return NULL;
}
void create_unix_receiver(proc_instance_t *pi)
{
pthread_t pth;
mutex_init(&pi->rmsg_lock);
cond_init(&pi->rmsg_cond);
create_pthread(&pth, unix_receiver, pi);
}
static void broadcast_proc(ckpool_t *ckp, const char *buf) static void broadcast_proc(ckpool_t *ckp, const char *buf)
{ {
int i; int i;
@ -496,8 +552,6 @@ out:
return ret; return ret;
} }
static void childsighandler(const int sig);
/* Send a single message to a process instance when there will be no response, /* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */ * closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)

15
src/ckpool.h

@ -30,6 +30,15 @@ struct ckmsg {
typedef struct ckmsg ckmsg_t; typedef struct ckmsg ckmsg_t;
typedef struct unix_msg unix_msg_t;
struct unix_msg {
unix_msg_t *next;
unix_msg_t *prev;
int sockd;
char *buf;
};
struct ckmsgq { struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
char name[16]; char name[16];
@ -51,6 +60,11 @@ struct proc_instance {
int pid; int pid;
int oldpid; int oldpid;
int (*process)(proc_instance_t *); int (*process)(proc_instance_t *);
/* Linked list of received messages, locking and conditional */
unix_msg_t *unix_msgs;
mutex_t rmsg_lock;
pthread_cond_t rmsg_cond;
}; };
struct connsock { struct connsock {
@ -219,6 +233,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);
void create_unix_receiver(proc_instance_t *pi);
ckpool_t *global_ckp; ckpool_t *global_ckp;

Loading…
Cancel
Save