Browse Source

Send a heartbeat every second to ckdb to allow ckdb to send information not specifically requested while maintaining ckpool msg-response relationship. Do not send heartbeats if there are messages queued already for when ckdb is offline/busy.

master
Con Kolivas 10 years ago
parent
commit
991877134a
  1. 55
      src/stratifier.c

55
src/stratifier.c

@ -296,6 +296,7 @@ static int gen_priority;
#define ID_USERSTATS 6 #define ID_USERSTATS 6
#define ID_BLOCK 7 #define ID_BLOCK 7
#define ID_ADDRAUTH 8 #define ID_ADDRAUTH 8
#define ID_HEARTBEAT 9
static const char *ckdb_ids[] = { static const char *ckdb_ids[] = {
"authorise", "authorise",
@ -307,6 +308,7 @@ static const char *ckdb_ids[] = {
"userstats", "userstats",
"block", "block",
"addrauth", "addrauth",
"heartbeat"
}; };
static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) static void generate_coinbase(ckpool_t *ckp, workbase_t *wb)
@ -1540,8 +1542,9 @@ static int send_recv_auth(stratum_instance_t *client)
free(json_msg); free(json_msg);
if (likely(buf)) { if (likely(buf)) {
char *secondaryuserid, *response = alloca(128); char *secondaryuserid, response[PAGESIZE] = {};
LOGINFO("Got ckdb response: %s", buf);
sscanf(buf, "id.%*d.%s", response); sscanf(buf, "id.%*d.%s", response);
secondaryuserid = response; secondaryuserid = response;
strsep(&secondaryuserid, "."); strsep(&secondaryuserid, ".");
@ -2554,8 +2557,18 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
failed = false; failed = false;
LOGWARNING("Successfully resumed talking to ckdb"); LOGWARNING("Successfully resumed talking to ckdb");
} }
LOGINFO("Got ckdb response: %s", buf); /* TODO: Process any requests from ckdb that are heartbeat responses
free(buf); * with specific requests. */
if (likely(buf)) {
char response[PAGESIZE] = {};
sscanf(buf, "id.%*d.%s", response);
if (safecmp(response, "ok"))
LOGINFO("Got ckdb response: %s", buf);
else
LOGWARNING("Got failed ckdb response: %s", buf);
free(buf);
}
} }
static int transactions_by_jobid(int64_t id) static int transactions_by_jobid(int64_t id)
@ -2935,9 +2948,41 @@ static void *statsupdate(void *arg)
return NULL; return NULL;
} }
/* Sends a heartbeat to ckdb every second to maintain the relationship of
* ckpool always initiating a request -> getting a ckdb response, but allows
* ckdb to provide specific commands to ckpool. */
static void *ckdb_heartbeat(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("heartbeat");
while (42) {
char cdfield[64];
ts_t ts_now;
json_t *val;
cksleep_ms(1000);
if (unlikely(!ckmsgq_empty(ckdbq))) {
LOGDEBUG("Witholding heartbeat due to ckdb messages being queued");
continue;
}
ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
JSON_CPACK(val, "{ss,ss,ss,ss}",
"createdate", cdfield,
"createby", "code",
"createcode", __func__,
"createinet", ckp->serverurl);
ckdbq_add(ckp, ID_HEARTBEAT, val);
}
return NULL;
}
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_statsupdate; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 1; int ret = 1;
char *buf; char *buf;
@ -2987,6 +3032,8 @@ int stratifier(proc_instance_t *pi)
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!ckp->standalone)
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)

Loading…
Cancel
Save