Browse Source

Move generic ckmsgq functions to ckpool.c to allow ckp to be included

master
ckolivas 11 years ago
parent
commit
e8c47e29fa
  1. 56
      src/ckpool.c
  2. 22
      src/ckpool.h
  3. 54
      src/libckpool.c
  4. 21
      src/libckpool.h
  5. 7
      src/stratifier.c

56
src/ckpool.c

@ -74,6 +74,62 @@ void logmsg(int loglevel, const char *fmt, ...) {
}
}
/* Generic function for creating a message queue receiving and parsing thread */
static void *ckmsg_queue(void *arg)
{
ckmsgq_t *ckmsgq = (ckmsgq_t *)arg;
ckpool_t *ckp = ckmsgq->ckp;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckmsg_t *msg;
mutex_lock(&ckmsgq->lock);
if (!ckmsgq->msgs)
pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock);
msg = ckmsgq->msgs;
if (likely(msg))
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
if (unlikely(!msg))
continue;
ckmsgq->func(ckp, msg);
free(msg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func;
ckmsgq->ckp = ckp;
mutex_init(&ckmsgq->lock);
cond_init(&ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
return ckmsgq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq
* parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
{
ckmsg_t *msg = ckalloc(sizeof(ckmsg_t));
msg->data = data;
mutex_lock(&ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond);
mutex_unlock(&ckmsgq->lock);
}
/* Listen for incoming global requests. Always returns a response if possible */
static void *listener(void *arg)
{

22
src/ckpool.h

@ -126,6 +126,28 @@ struct ckpool_instance {
char **proxypass;
};
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
};
typedef struct ckmsgq ckmsgq_t;
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp);

54
src/libckpool.c

@ -76,60 +76,6 @@ void join_pthread(pthread_t thread)
pthread_join(thread, NULL);
}
/* Generic function for creating a message queue receiving and parsing thread */
static void *ckmsg_queue(void *arg)
{
ckmsgq_t *ckmsgq = (ckmsgq_t *)arg;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckmsg_t *msg;
mutex_lock(&ckmsgq->lock);
if (!ckmsgq->msgs)
pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock);
msg = ckmsgq->msgs;
if (likely(msg))
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
if (unlikely(!msg))
continue;
ckmsgq->func(msg);
free(msg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func;
mutex_init(&ckmsgq->lock);
cond_init(&ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
return ckmsgq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq
* parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
{
ckmsg_t *msg = ckalloc(sizeof(ckmsg_t));
msg->data = data;
mutex_lock(&ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond);
mutex_unlock(&ckmsgq->lock);
}
/* Place holders for when we add lock debugging */
#define GETLOCK(_lock, _file, _func, _line)
#define GOTLOCK(_lock, _file, _func, _line)

21
src/libckpool.h

@ -262,27 +262,6 @@ typedef struct unixsock unixsock_t;
typedef struct proc_instance proc_instance_t;
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(void *);
};
typedef struct ckmsgq ckmsgq_t;
ckmsgq_t *create_ckmsgq(const char *name, const void *func);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
/* No error checking with these, make sure we know they're valid already! */
static inline void json_strcpy(char *buf, json_t *val, const char *key)
{

7
src/stratifier.c

@ -188,7 +188,6 @@ static json_params_t *sshares;
static json_params_t *sauths;
struct ckdb_msg {
ckpool_t *ckp;
json_t *val;
int idtype;
};
@ -2094,13 +2093,13 @@ static void *authoriser(void *arg)
return NULL;
}
static void ckdbq_process(ckdb_msg_t *data)
static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
{
bool logged = false;
char *buf = NULL;
while (!buf) {
buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged);
buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged);
if (unlikely(!buf)) {
LOGWARNING("Failed to talk to ckdb, queueing messages");
sleep(5);
@ -2400,7 +2399,7 @@ int stratifier(proc_instance_t *pi)
cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp);
ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
cklock_init(&workbase_lock);
if (!ckp->proxy)

Loading…
Cancel
Save