Browse Source

Create a set of generic queue receiving thread and processing functions

master
ckolivas 11 years ago
parent
commit
50463445c0
  1. 52
      src/libckpool.c
  2. 21
      src/libckpool.h

52
src/libckpool.c

@ -32,6 +32,7 @@
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
#include "sha2.h" #include "sha2.h"
#include "utlist.h"
#ifndef UNIX_PATH_MAX #ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108 #define UNIX_PATH_MAX 108
@ -75,6 +76,57 @@ void join_pthread(pthread_t thread)
pthread_join(thread, NULL); pthread_join(thread, NULL);
} }
/* Generic function for creating a message queue receiving and parsing thread */
void *ckmsg_queue(void *arg)
{
ckmsgq_t *ckmsgq = (ckmsgq_t *)arg;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckmsg_t *msg;
mutex_lock(&ckmsgq->lock);
if (!ckmsgq->msgs)
pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock);
msg = ckmsgq->msgs;
if (likely(msg))
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
if (unlikely(!msg))
continue;
ckmsgq->func(msg);
free(msg);
}
return NULL;
}
void create_ckmsgq(const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func;
mutex_init(&ckmsgq->lock);
cond_init(&ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
}
/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq
* parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
{
ckmsg_t *msg = ckalloc(sizeof(ckmsg_t));
msg->data = data;
mutex_lock(&ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond);
mutex_unlock(&ckmsgq->lock);
}
/* Place holders for when we add lock debugging */ /* Place holders for when we add lock debugging */
#define GETLOCK(_lock, _file, _func, _line) #define GETLOCK(_lock, _file, _func, _line)

21
src/libckpool.h

@ -32,6 +32,8 @@
# include <sys/endian.h> # include <sys/endian.h>
#endif #endif
#include "utlist.h"
#ifndef bswap_16 #ifndef bswap_16
#define bswap_16 __builtin_bswap16 #define bswap_16 __builtin_bswap16
#define bswap_32 __builtin_bswap32 #define bswap_32 __builtin_bswap32
@ -260,6 +262,25 @@ typedef struct unixsock unixsock_t;
typedef struct proc_instance proc_instance_t; typedef struct proc_instance proc_instance_t;
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(void *);
};
typedef struct ckmsgq ckmsgq_t;
/* No error checking with these, make sure we know they're valid already! */ /* No error checking with these, make sure we know they're valid already! */
static inline void json_strcpy(char *buf, json_t *val, const char *key) static inline void json_strcpy(char *buf, json_t *val, const char *key)
{ {

Loading…
Cancel
Save