From 0ea4ab043e58de319270d3649228e5bca7fd7196 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 20 Apr 2015 13:41:57 +1000 Subject: [PATCH] Move stratifier to use unix message receiving queues --- src/stratifier.c | 53 +++++++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 7ea1497f..49a2cbf6 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1737,13 +1737,18 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { - int sockd, ret = 0, selret = 0; sdata_t *sdata = ckp->data; - unixsock_t *us = &pi->us; + unix_msg_t *umsg = NULL; tv_t start_tv = {0, 0}; - char *buf = NULL; + int ret = 0; + char *buf; retry: + if (umsg) { + free(umsg->buf); + dealloc(umsg); + } + do { double tdiff; tv_t end_tv; @@ -1761,43 +1766,30 @@ retry: ckp->update_interval); broadcast_ping(sdata); } - continue; } - selret = wait_read_select(us->sockd, 5); - if (!selret && !ping_main(ckp)) { + + umsg = get_unix_msg(pi); + if (unlikely(!umsg &&!ping_main(ckp))) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; goto out; } - } while (selret < 1); + } while (!umsg); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGERR("Failed to accept on stratifier socket, exiting"); - ret = 1; - goto out; - } - - dealloc(buf); - buf = recv_unix_msg(sockd); - if (unlikely(!buf)) { - Close(sockd); - LOGWARNING("Failed to get message in stratum_loop"); - goto retry; - } + buf = umsg->buf; if (likely(buf[0] == '{')) { /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckmsgq_add(sdata->srecvs, buf); - Close(sockd); - buf = NULL; + Close(umsg->sockd); + ckmsgq_add(sdata->srecvs, umsg->buf); + umsg->buf = NULL; goto retry; } if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); - send_unix_msg(sockd, "pong"); - Close(sockd); + send_unix_msg(umsg->sockd, "pong"); + Close(umsg->sockd); goto retry; } if (cmdmatch(buf, "stats")) { @@ -1805,12 +1797,12 @@ retry: LOGDEBUG("Stratifier received stats request"); msg = stratifier_stats(ckp, sdata); - send_unix_msg(sockd, msg); - Close(sockd); + send_unix_msg(umsg->sockd, msg); + Close(umsg->sockd); goto retry; } - Close(sockd); + Close(umsg->sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; @@ -1848,7 +1840,6 @@ retry: goto retry; out: - dealloc(buf); return ret; } @@ -4535,6 +4526,8 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); + create_unix_receiver(pi); + LOGWARNING("%s stratifier ready", ckp->name); ret = stratum_loop(ckp, pi);