Browse Source

Convert the generator loop to use unix receive queues

master
Con Kolivas 10 years ago
parent
commit
24c2a4a7bf
  1. 63
      src/generator.c

63
src/generator.c

@ -267,12 +267,20 @@ static void kill_server(server_instance_t *si)
dealloc(si->data); dealloc(si->data);
} }
static void clear_unix_msg(unix_msg_t **umsg)
{
if (*umsg) {
free((*umsg)->buf);
free(*umsg);
*umsg = NULL;
}
}
static int gen_loop(proc_instance_t *pi) static int gen_loop(proc_instance_t *pi)
{ {
int sockd = -1, ret = 0, selret;
server_instance_t *si = NULL; server_instance_t *si = NULL;
bool reconnecting = false; bool reconnecting = false;
unixsock_t *us = &pi->us; unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
bool started = false; bool started = false;
@ -280,9 +288,10 @@ static int gen_loop(proc_instance_t *pi)
connsock_t *cs; connsock_t *cs;
gbtbase_t *gbt; gbtbase_t *gbt;
char hash[68]; char hash[68];
int ret = 0;
reconnect: reconnect:
Close(sockd); clear_unix_msg(&umsg);
if (si) { if (si) {
kill_server(si); kill_server(si);
reconnecting = true; reconnecting = true;
@ -299,36 +308,24 @@ reconnect:
} }
retry: retry:
Close(sockd); clear_unix_msg(&umsg);
ckmsgq_add(gdata->srvchk, si); ckmsgq_add(gdata->srvchk, si);
do { do {
selret = wait_read_select(us->sockd, 5); umsg = get_unix_msg(pi);
if (!selret && !ping_main(ckp)) { if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting"); LOGEMERG("Generator failed to ping main process, exiting");
ret = 1; ret = 1;
goto out; goto out;
} }
} while (selret < 1); } while (!umsg);
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
LOGWARNING("Bitcoind socket invalidated, will attempt failover"); LOGWARNING("Bitcoind socket invalidated, will attempt failover");
goto reconnect; goto reconnect;
} }
sockd = accept(us->sockd, NULL, NULL); buf = umsg->buf;
if (sockd < 0) {
LOGEMERG("Failed to accept on generator socket");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in gen_loop");
goto retry;
}
LOGDEBUG("Generator received request: %s", buf); LOGDEBUG("Generator received request: %s", buf);
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
@ -338,41 +335,41 @@ retry:
if (!gen_gbtbase(cs, gbt)) { if (!gen_gbtbase(cs, gbt)) {
LOGWARNING("Failed to get block template from %s:%s", LOGWARNING("Failed to get block template from %s:%s",
cs->url, cs->port); cs->url, cs->port);
send_unix_msg(sockd, "Failed"); send_unix_msg(umsg->sockd, "Failed");
goto reconnect; goto reconnect;
} else { } else {
char *s = json_dumps(gbt->json, JSON_NO_UTF8); char *s = json_dumps(gbt->json, JSON_NO_UTF8);
send_unix_msg(sockd, s); send_unix_msg(umsg->sockd, s);
free(s); free(s);
clear_gbtbase(gbt); clear_gbtbase(gbt);
} }
} else if (cmdmatch(buf, "getbest")) { } else if (cmdmatch(buf, "getbest")) {
if (si->notify) if (si->notify)
send_unix_msg(sockd, "notify"); send_unix_msg(umsg->sockd, "notify");
else if (!get_bestblockhash(cs, hash)) { else if (!get_bestblockhash(cs, hash)) {
LOGINFO("No best block hash support from %s:%s", LOGINFO("No best block hash support from %s:%s",
cs->url, cs->port); cs->url, cs->port);
send_unix_msg(sockd, "failed"); send_unix_msg(umsg->sockd, "failed");
} else { } else {
if (unlikely(!started)) { if (unlikely(!started)) {
started = true; started = true;
LOGWARNING("%s generator ready", ckp->name); LOGWARNING("%s generator ready", ckp->name);
} }
send_unix_msg(sockd, hash); send_unix_msg(umsg->sockd, hash);
} }
} else if (cmdmatch(buf, "getlast")) { } else if (cmdmatch(buf, "getlast")) {
int height; int height;
if (si->notify) if (si->notify)
send_unix_msg(sockd, "notify"); send_unix_msg(umsg->sockd, "notify");
else if ((height = get_blockcount(cs)) == -1) { else if ((height = get_blockcount(cs)) == -1) {
send_unix_msg(sockd, "failed"); send_unix_msg(umsg->sockd, "failed");
goto reconnect; goto reconnect;
} else { } else {
LOGDEBUG("Height: %d", height); LOGDEBUG("Height: %d", height);
if (!get_blockhash(cs, height, hash)) { if (!get_blockhash(cs, height, hash)) {
send_unix_msg(sockd, "failed"); send_unix_msg(umsg->sockd, "failed");
goto reconnect; goto reconnect;
} else { } else {
if (unlikely(!started)) { if (unlikely(!started)) {
@ -380,7 +377,7 @@ retry:
LOGWARNING("%s generator ready", ckp->name); LOGWARNING("%s generator ready", ckp->name);
} }
send_unix_msg(sockd, hash); send_unix_msg(umsg->sockd, hash);
LOGDEBUG("Hash: %s", hash); LOGDEBUG("Hash: %s", hash);
} }
} }
@ -395,16 +392,16 @@ retry:
send_proc(ckp->stratifier, blockmsg); send_proc(ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) { } else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10)) if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true"); send_unix_msg(umsg->sockd, "true");
else else
send_unix_msg(sockd, "false"); send_unix_msg(umsg->sockd, "false");
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "ping")) { } else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Generator received ping request"); LOGDEBUG("Generator received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(umsg->sockd, "pong");
} }
goto retry; goto retry;
@ -2734,6 +2731,8 @@ int generator(proc_instance_t *pi)
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
gdata->ckp = ckp; gdata->ckp = ckp;
create_unix_receiver(pi);
if (ckp->proxy) { if (ckp->proxy) {
char *buf = NULL; char *buf = NULL;

Loading…
Cancel
Save