|
|
|
@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg)
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Generic workqueue function and message receiving and parsing thread */ |
|
|
|
|
static void *ckwq_queue(void *arg) |
|
|
|
|
{ |
|
|
|
|
ckwq_t *ckmsgq = (ckwq_t *)arg; |
|
|
|
|
ckpool_t *ckp = ckmsgq->ckp; |
|
|
|
|
|
|
|
|
|
pthread_detach(pthread_self()); |
|
|
|
|
rename_proc(ckmsgq->name); |
|
|
|
|
|
|
|
|
|
while (42) { |
|
|
|
|
ckwqmsg_t *wqmsg; |
|
|
|
|
tv_t now; |
|
|
|
|
ts_t abs; |
|
|
|
|
|
|
|
|
|
mutex_lock(ckmsgq->lock); |
|
|
|
|
tv_time(&now); |
|
|
|
|
tv_to_ts(&abs, &now); |
|
|
|
|
abs.tv_sec++; |
|
|
|
|
if (!ckmsgq->wqmsgs) |
|
|
|
|
cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); |
|
|
|
|
wqmsg = ckmsgq->wqmsgs; |
|
|
|
|
if (wqmsg) |
|
|
|
|
DL_DELETE(ckmsgq->wqmsgs, wqmsg); |
|
|
|
|
mutex_unlock(ckmsgq->lock); |
|
|
|
|
|
|
|
|
|
if (!wqmsg) |
|
|
|
|
continue; |
|
|
|
|
wqmsg->func(ckp, wqmsg->data); |
|
|
|
|
free(wqmsg); |
|
|
|
|
} |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) |
|
|
|
|
{ |
|
|
|
|
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); |
|
|
|
@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
|
|
|
|
|
return ckmsgq; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) |
|
|
|
|
{ |
|
|
|
|
ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); |
|
|
|
|
mutex_t *lock; |
|
|
|
|
pthread_cond_t *cond; |
|
|
|
|
int i; |
|
|
|
|
|
|
|
|
|
lock = ckalloc(sizeof(mutex_t)); |
|
|
|
|
cond = ckalloc(sizeof(pthread_cond_t)); |
|
|
|
|
mutex_init(lock); |
|
|
|
|
cond_init(cond); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < count; i++) { |
|
|
|
|
snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); |
|
|
|
|
ckwq[i].ckp = ckp; |
|
|
|
|
ckwq[i].lock = lock; |
|
|
|
|
ckwq[i].cond = cond; |
|
|
|
|
create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return ckwq; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Generic function for adding messages to a ckmsgq linked list and signal the
|
|
|
|
|
* ckmsgq parsing thread to wake up and process it. */ |
|
|
|
|
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) |
|
|
|
@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
|
|
|
|
|
mutex_lock(ckmsgq->lock); |
|
|
|
|
ckmsgq->messages++; |
|
|
|
|
DL_APPEND(ckmsgq->msgs, msg); |
|
|
|
|
pthread_cond_signal(ckmsgq->cond); |
|
|
|
|
pthread_cond_broadcast(ckmsgq->cond); |
|
|
|
|
mutex_unlock(ckmsgq->lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ckwq_add(ckwq_t *ckwq, const void *func, void *data) |
|
|
|
|
{ |
|
|
|
|
ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); |
|
|
|
|
|
|
|
|
|
wqmsg->func = func; |
|
|
|
|
wqmsg->data = data; |
|
|
|
|
|
|
|
|
|
mutex_lock(ckwq->lock); |
|
|
|
|
ckwq->messages++; |
|
|
|
|
DL_APPEND(ckwq->wqmsgs, wqmsg); |
|
|
|
|
pthread_cond_broadcast(ckwq->cond); |
|
|
|
|
mutex_unlock(ckwq->lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Return whether there are any messages queued in the ckmsgq linked list. */ |
|
|
|
|
bool ckmsgq_empty(ckmsgq_t *ckmsgq) |
|
|
|
|
{ |
|
|
|
|