Browse Source

Create generic workqueue function and message receiving and parsing helpers

master
Con Kolivas 10 years ago
parent
commit
adec278e7c
  1. 72
      src/ckpool.c
  2. 29
      src/ckpool.h

72
src/ckpool.c

@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg)
return NULL;
}
/* Generic workqueue function and message receiving and parsing thread */
static void *ckwq_queue(void *arg)
{
ckwq_t *ckmsgq = (ckwq_t *)arg;
ckpool_t *ckp = ckmsgq->ckp;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckwqmsg_t *wqmsg;
tv_t now;
ts_t abs;
mutex_lock(ckmsgq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckmsgq->wqmsgs)
cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
wqmsg = ckmsgq->wqmsgs;
if (wqmsg)
DL_DELETE(ckmsgq->wqmsgs, wqmsg);
mutex_unlock(ckmsgq->lock);
if (!wqmsg)
continue;
wqmsg->func(ckp, wqmsg->data);
free(wqmsg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
return ckmsgq;
}
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count)
{
ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count);
mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckwq[i].name, 15, "%.6swq%d", name, i);
ckwq[i].ckp = ckp;
ckwq[i].lock = lock;
ckwq[i].cond = cond;
create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]);
}
return ckwq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
mutex_lock(ckmsgq->lock);
ckmsgq->messages++;
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(ckmsgq->cond);
pthread_cond_broadcast(ckmsgq->cond);
mutex_unlock(ckmsgq->lock);
}
void ckwq_add(ckwq_t *ckwq, const void *func, void *data)
{
ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t));
wqmsg->func = func;
wqmsg->data = data;
mutex_lock(ckwq->lock);
ckwq->messages++;
DL_APPEND(ckwq->wqmsgs, wqmsg);
pthread_cond_broadcast(ckwq->cond);
mutex_unlock(ckwq->lock);
}
/* Return whether there are any messages queued in the ckmsgq linked list. */
bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{

29
src/ckpool.h

@ -21,13 +21,22 @@
typedef struct ckpool_instance ckpool_t;
typedef struct ckmsg ckmsg_t;
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
ckmsg_t *next;
ckmsg_t *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
typedef struct ckwqmsg ckwqmsg_t;
struct ckwqmsg {
ckwqmsg_t *next;
ckwqmsg_t *prev;
void *data;
void (*func)(ckpool_t *, void *);
};
struct ckmsgq {
ckpool_t *ckp;
@ -42,6 +51,18 @@ struct ckmsgq {
typedef struct ckmsgq ckmsgq_t;
struct ckwq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
mutex_t *lock;
pthread_cond_t *cond;
ckwqmsg_t *wqmsgs;
int64_t messages;
};
typedef struct ckwq ckwq_t;
struct proc_instance {
ckpool_t *ckp;
unixsock_t us;
@ -201,7 +222,9 @@ struct ckpool_instance {
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
void ckwq_add(ckwq_t *ckwq, const void *func, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq);
ckpool_t *global_ckp;

Loading…
Cancel
Save