Browse Source

Make ckdb messaging use the generic ckmsg queues

master
ckolivas 11 years ago
parent
commit
ea27e86a33
  1. 62
      src/stratifier.c

62
src/stratifier.c

@ -162,14 +162,12 @@ static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock; static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock; static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock; static pthread_mutex_t sauth_lock;
static pthread_mutex_t ckdbq_lock;
/* For signalling the threads to wake up and do work */ /* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond; static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond; static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond; static pthread_cond_t sauth_cond;
static pthread_cond_t ckdbq_cond;
/* For the linked list of all queued messages */ /* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_recvs;
@ -190,16 +188,14 @@ static json_params_t *sshares;
static json_params_t *sauths; static json_params_t *sauths;
struct ckdb_msg { struct ckdb_msg {
struct ckdb_msg *next; ckpool_t *ckp;
struct ckdb_msg *prev;
json_t *val; json_t *val;
int idtype; int idtype;
}; };
typedef struct ckdb_msg ckdb_msg_t; typedef struct ckdb_msg ckdb_msg_t;
static ckdb_msg_t *ckdb_msgs; static ckmsgq_t *ckdbq;
static int user_instance_id; static int user_instance_id;
@ -427,11 +423,7 @@ static void ckdbq_add(const int idtype, json_t *val)
msg->val = val; msg->val = val;
msg->idtype = idtype; msg->idtype = idtype;
ckmsgq_add(ckdbq, msg);
mutex_lock(&ckdbq_lock);
DL_APPEND(ckdb_msgs, msg);
pthread_cond_signal(&ckdbq_cond);
mutex_unlock(&ckdbq_lock);
} }
static void send_workinfo(ckpool_t *ckp, workbase_t *wb) static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
@ -2102,42 +2094,21 @@ static void *authoriser(void *arg)
return NULL; return NULL;
} }
static void *ckdbqueue(void *arg) static void ckdbq_process(ckdb_msg_t *data)
{ {
ckpool_t *ckp = (ckpool_t *)arg; bool logged = false;
char *buf = NULL;
pthread_detach(pthread_self());
rename_proc("ckdbqueue");
while (42) {
bool logged = false;
char *buf = NULL;
ckdb_msg_t *msg;
mutex_lock(&ckdbq_lock);
if (!ckdb_msgs)
pthread_cond_wait(&ckdbq_cond, &ckdbq_lock);
msg = ckdb_msgs;
if (likely(msg))
DL_DELETE(ckdb_msgs, msg);
mutex_unlock(&ckdbq_lock);
if (unlikely(!msg))
continue;
while (!buf) { while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val, logged); buf = json_ckdb_call(data->ckp, ckdb_ids[data->idtype], data->val, logged);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Failed to talk to ckdb, queueing messages"); LOGWARNING("Failed to talk to ckdb, queueing messages");
sleep(5); sleep(5);
}
logged = true;
} }
LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); logged = true;
free(buf);
} }
LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf);
return NULL; free(buf);
} }
static const double nonces = 4294967296; static const double nonces = 4294967296;
@ -2388,7 +2359,6 @@ int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser;
pthread_t pth_ckdbqueue;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2430,9 +2400,7 @@ int stratifier(proc_instance_t *pi)
cond_init(&sauth_cond); cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp); create_pthread(&pth_authoriser, authoriser, ckp);
mutex_init(&ckdbq_lock); ckdbq = create_ckmsgq("ckdbqueue", &ckdbq_process);
cond_init(&ckdbq_cond);
create_pthread(&pth_ckdbqueue, ckdbqueue, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)

Loading…
Cancel
Save