Browse Source

Make it possible to create many threads associated with a ckmsgq

master
Con Kolivas 10 years ago
parent
commit
995fce28c1
  1. 46
      src/ckpool.c
  2. 5
      src/ckpool.h
  3. 6
      src/stratifier.c

46
src/ckpool.c

@ -115,16 +115,16 @@ static void *ckmsg_queue(void *arg)
tv_t now;
ts_t abs;
mutex_lock(&ckmsgq->lock);
mutex_lock(ckmsgq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckmsgq->msgs)
pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs);
pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
msg = ckmsgq->msgs;
if (msg)
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
mutex_unlock(ckmsgq->lock);
if (!msg)
continue;
@ -141,13 +141,39 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func;
ckmsgq->ckp = ckp;
mutex_init(&ckmsgq->lock);
cond_init(&ckmsgq->cond);
ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t));
ckmsgq->cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(ckmsgq->lock);
cond_init(ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
return ckmsgq;
}
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count);
pthread_mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(pthread_mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckmsgq[i].name, 15, "%.8s%x", name, i);
ckmsgq[i].func = func;
ckmsgq[i].ckp = ckp;
ckmsgq[i].lock = lock;
ckmsgq[i].cond = cond;
create_pthread(&ckmsgq[i].pth, ckmsg_queue, &ckmsgq[i]);
}
return ckmsgq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -156,10 +182,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
msg->data = data;
mutex_lock(&ckmsgq->lock);
mutex_lock(ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond);
mutex_unlock(&ckmsgq->lock);
pthread_cond_signal(ckmsgq->cond);
mutex_unlock(ckmsgq->lock);
}
/* Return whether there are any messages queued in the ckmsgq linked list. */
@ -167,10 +193,10 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{
bool ret = true;
mutex_lock(&ckmsgq->lock);
mutex_lock(ckmsgq->lock);
if (ckmsgq->msgs)
ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev);
mutex_unlock(&ckmsgq->lock);
mutex_unlock(ckmsgq->lock);
return ret;
}

5
src/ckpool.h

@ -34,8 +34,8 @@ struct ckmsgq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_mutex_t *lock;
pthread_cond_t *cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
};
@ -181,6 +181,7 @@ struct ckpool_instance {
#endif
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq);

6
src/stratifier.c

@ -1110,13 +1110,13 @@ static void stratum_broadcast(json_t *val)
if (!bulk_send)
return;
mutex_lock(&ssends->lock);
mutex_lock(ssends->lock);
if (ssends->msgs)
DL_CONCAT(ssends->msgs, bulk_send);
else
ssends->msgs = bulk_send;
pthread_cond_signal(&ssends->cond);
mutex_unlock(&ssends->lock);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
}
static void stratum_add_send(json_t *val, int64_t client_id)

Loading…
Cancel
Save