diff --git a/src/stratifier.c b/src/stratifier.c index ae268451..261c2ced 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -296,6 +296,7 @@ static int gen_priority; #define ID_USERSTATS 6 #define ID_BLOCK 7 #define ID_ADDRAUTH 8 +#define ID_HEARTBEAT 9 static const char *ckdb_ids[] = { "authorise", @@ -307,6 +308,7 @@ static const char *ckdb_ids[] = { "userstats", "block", "addrauth", + "heartbeat" }; static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) @@ -1540,8 +1542,9 @@ static int send_recv_auth(stratum_instance_t *client) free(json_msg); if (likely(buf)) { - char *secondaryuserid, *response = alloca(128); + char *secondaryuserid, response[PAGESIZE] = {}; + LOGINFO("Got ckdb response: %s", buf); sscanf(buf, "id.%*d.%s", response); secondaryuserid = response; strsep(&secondaryuserid, "."); @@ -2554,8 +2557,18 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) failed = false; LOGWARNING("Successfully resumed talking to ckdb"); } - LOGINFO("Got ckdb response: %s", buf); - free(buf); + /* TODO: Process any requests from ckdb that are heartbeat responses + * with specific requests. */ + if (likely(buf)) { + char response[PAGESIZE] = {}; + + sscanf(buf, "id.%*d.%s", response); + if (safecmp(response, "ok")) + LOGINFO("Got ckdb response: %s", buf); + else + LOGWARNING("Got failed ckdb response: %s", buf); + free(buf); + } } static int transactions_by_jobid(int64_t id) @@ -2935,9 +2948,41 @@ static void *statsupdate(void *arg) return NULL; } +/* Sends a heartbeat to ckdb every second to maintain the relationship of + * ckpool always initiating a request -> getting a ckdb response, but allows + * ckdb to provide specific commands to ckpool. */ +static void *ckdb_heartbeat(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + + pthread_detach(pthread_self()); + rename_proc("heartbeat"); + + while (42) { + char cdfield[64]; + ts_t ts_now; + json_t *val; + + cksleep_ms(1000); + if (unlikely(!ckmsgq_empty(ckdbq))) { + LOGDEBUG("Witholding heartbeat due to ckdb messages being queued"); + continue; + } + ts_realtime(&ts_now); + sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); + JSON_CPACK(val, "{ss,ss,ss,ss}", + "createdate", cdfield, + "createby", "code", + "createcode", __func__, + "createinet", ckp->serverurl); + ckdbq_add(ckp, ID_HEARTBEAT, val); + } + return NULL; +} + int stratifier(proc_instance_t *pi) { - pthread_t pth_blockupdate, pth_statsupdate; + pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; ckpool_t *ckp = pi->ckp; int ret = 1; char *buf; @@ -2987,6 +3032,8 @@ int stratifier(proc_instance_t *pi) sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); + if (!ckp->standalone) + create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); cklock_init(&workbase_lock); if (!ckp->proxy)