Browse Source

Move stratifier to use unix message receiving queues

master
Con Kolivas 10 years ago
parent
commit
0ea4ab043e
  1. 53
      src/stratifier.c

53
src/stratifier.c

@ -1737,13 +1737,18 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret = 0;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
unixsock_t *us = &pi->us; unix_msg_t *umsg = NULL;
tv_t start_tv = {0, 0}; tv_t start_tv = {0, 0};
char *buf = NULL; int ret = 0;
char *buf;
retry: retry:
if (umsg) {
free(umsg->buf);
dealloc(umsg);
}
do { do {
double tdiff; double tdiff;
tv_t end_tv; tv_t end_tv;
@ -1761,43 +1766,30 @@ retry:
ckp->update_interval); ckp->update_interval);
broadcast_ping(sdata); broadcast_ping(sdata);
} }
continue;
} }
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) { umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting"); LOGEMERG("Generator failed to ping main process, exiting");
ret = 1; ret = 1;
goto out; goto out;
} }
} while (selret < 1); } while (!umsg);
sockd = accept(us->sockd, NULL, NULL); buf = umsg->buf;
if (sockd < 0) {
LOGERR("Failed to accept on stratifier socket, exiting");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (unlikely(!buf)) {
Close(sockd);
LOGWARNING("Failed to get message in stratum_loop");
goto retry;
}
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
/* The bulk of the messages will be received json from the /* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees * connector so look for this first. The srecv_process frees
* the buf heap ram */ * the buf heap ram */
ckmsgq_add(sdata->srecvs, buf); Close(umsg->sockd);
Close(sockd); ckmsgq_add(sdata->srecvs, umsg->buf);
buf = NULL; umsg->buf = NULL;
goto retry; goto retry;
} }
if (cmdmatch(buf, "ping")) { if (cmdmatch(buf, "ping")) {
LOGDEBUG("Stratifier received ping request"); LOGDEBUG("Stratifier received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(umsg->sockd, "pong");
Close(sockd); Close(umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "stats")) { if (cmdmatch(buf, "stats")) {
@ -1805,12 +1797,12 @@ retry:
LOGDEBUG("Stratifier received stats request"); LOGDEBUG("Stratifier received stats request");
msg = stratifier_stats(ckp, sdata); msg = stratifier_stats(ckp, sdata);
send_unix_msg(sockd, msg); send_unix_msg(umsg->sockd, msg);
Close(sockd); Close(umsg->sockd);
goto retry; goto retry;
} }
Close(sockd); Close(umsg->sockd);
LOGDEBUG("Stratifier received request: %s", buf); LOGDEBUG("Stratifier received request: %s", buf);
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
@ -1848,7 +1840,6 @@ retry:
goto retry; goto retry;
out: out:
dealloc(buf);
return ret; return ret;
} }
@ -4535,6 +4526,8 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->share_lock); mutex_init(&sdata->share_lock);
mutex_init(&sdata->block_lock); mutex_init(&sdata->block_lock);
create_unix_receiver(pi);
LOGWARNING("%s stratifier ready", ckp->name); LOGWARNING("%s stratifier ready", ckp->name);
ret = stratum_loop(ckp, pi); ret = stratum_loop(ckp, pi);

Loading…
Cancel
Save