diff --git a/src/ckpool.c b/src/ckpool.c index adc3a5d0..e7ff6d5d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -7,7 +7,6 @@ * any later version. See COPYING for more details. */ -#include #include #include #include @@ -26,14 +25,6 @@ /* Only global variable, to be used only by sighandler */ static ckpool_t *global_ckp; -static void rename_proc(const char *name) -{ - char buf[16]; - - snprintf(buf, 16, "ckp@%s", name); - prctl(PR_SET_NAME, buf, 0, 0, 0); -} - static void *listener(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; diff --git a/src/generator.c b/src/generator.c index b7a21770..887ec866 100644 --- a/src/generator.c +++ b/src/generator.c @@ -24,6 +24,7 @@ static int gen_loop(proc_instance_t *pi, connsock_t *cs) int sockd, ret = 0; char *buf = NULL; gbtbase_t gbt; + char hash[68]; memset(&gbt, 0, sizeof(gbt)); retry: @@ -59,6 +60,30 @@ retry: clear_gbtbase(&gbt); } } + if (!strncasecmp(buf, "getbest", 7)) { + if (!get_bestblockhash(cs, hash)) { + LOGWARNING("No best block hash support from %s:%s", + cs->url, cs->port); + send_unix_msg(sockd, "Failed"); + } else { + send_unix_msg(sockd, hash); + } + } + if (!strncasecmp(buf, "getlast", 7)) { + int height = get_blockcount(cs); + + if (height == -1) + send_unix_msg(sockd, "Failed"); + else { + LOGDEBUG("Height: %d", height); + if (!get_blockhash(cs, height, hash)) + send_unix_msg(sockd, "Failed"); + else { + send_unix_msg(sockd, hash); + LOGDEBUG("Hash: %s", hash); + } + } + } close(sockd); goto retry; diff --git a/src/libckpool.c b/src/libckpool.c index ab3f8e99..d1f4b417 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -16,6 +16,7 @@ #else #include #endif +#include #include #include #include @@ -36,6 +37,15 @@ #define UNIX_PATH_MAX 108 #endif +void rename_proc(const char *name) +{ + char buf[16]; + + snprintf(buf, 15, "ckp@%s", name); + buf[15] = '\0'; + prctl(PR_SET_NAME, buf, 0, 0, 0); +} + void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg) { int ret = pthread_create(thread, NULL, start_routine, arg); @@ -506,7 +516,7 @@ void empty_socket(int fd) do { char buf[PAGESIZE]; - tv_t timeout = {1, 0}; + tv_t timeout = {0, 0}; fd_set rd; FD_ZERO(&rd); @@ -514,8 +524,11 @@ void empty_socket(int fd) ret = select(fd + 1, &rd, NULL, NULL, &timeout); if (ret < 0 && interrupted()) continue; - if (ret > 0) + if (ret > 0) { ret = recv(fd, buf, PAGESIZE - 1, 0); + buf[ret] = 0; + LOGDEBUG("Discarding: %s", buf); + } } while (ret > 0); } @@ -634,10 +647,19 @@ out: * string.*/ char *recv_unix_msg(int sockd) { + tv_t tv_timeout = {1, 0}; char *buf = NULL; uint32_t msglen; + fd_set readfs; int ret, ofs; + FD_ZERO(&readfs); + FD_SET(sockd, &readfs); + ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); + if (ret < 1) { + LOGERR("Select1 failed in recv_unix_msg"); + return false; + } /* Get message length */ ret = read(sockd, &msglen, 4); if (ret < 4) { @@ -653,6 +675,16 @@ char *recv_unix_msg(int sockd) buf[msglen] = 0; ofs = 0; while (msglen) { + tv_timeout.tv_sec = 1; + tv_timeout.tv_usec = 0; + + FD_ZERO(&readfs); + FD_SET(sockd, &readfs); + ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); + if (ret < 1) { + LOGERR("Select2 failed in recv_unix_msg"); + return false; + } ret = read(sockd, buf + ofs, msglen); if (unlikely(ret < 0)) { LOGERR("Failed to read %d bytes in recv_unix_msg", msglen); @@ -668,7 +700,9 @@ out: bool send_unix_msg(int sockd, const char *buf) { + tv_t tv_timeout = {1, 0}; uint32_t msglen, len; + fd_set writefds; int ret, ofs; len = strlen(buf); @@ -677,6 +711,13 @@ bool send_unix_msg(int sockd, const char *buf) return false; } msglen = htole32(len); + FD_ZERO(&writefds); + FD_SET(sockd, &writefds); + ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); + if (ret < 1) { + LOGERR("Select1 failed in send_unix_msg"); + return false; + } ret = write(sockd, &msglen, 4); if (unlikely(ret < 4)) { LOGERR("Failed to write 4 byte length in send_unix_msg"); @@ -684,6 +725,16 @@ bool send_unix_msg(int sockd, const char *buf) } ofs = 0; while (len) { + tv_timeout.tv_sec = 1; + tv_timeout.tv_usec = 0; + + FD_ZERO(&writefds); + FD_SET(sockd, &writefds); + ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); + if (ret < 1) { + LOGERR("Select2 failed in send_unix_msg"); + return false; + } ret = write(sockd, buf + ofs, len); if (unlikely(ret < 0)) { LOGERR("Failed to write %d bytes in send_unix_msg", len); @@ -704,7 +755,7 @@ bool send_proc(proc_instance_t *pi, const char *msg) int sockd; if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message to null path in send_proc"); + LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; } if (unlikely(!msg || !strlen(msg))) { @@ -733,7 +784,7 @@ char *send_recv_proc(proc_instance_t *pi, const char *msg) int sockd; if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message to null path in send_proc"); + LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; } if (unlikely(!msg || !strlen(msg))) { @@ -799,30 +850,37 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) ret = write_socket(cs->fd, http_req, len); if (ret != len) { LOGWARNING("Failed to write to socket in json_rpc_call"); - empty_socket(cs->fd); - goto out; + goto out_empty; } ret = read_socket_line(cs); if (ret < 1) { LOGWARNING("Failed to read socket line in json_rpc_call"); - goto out; + goto out_empty; } if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) { LOGWARNING("HTTP response not ok: %s", cs->buf); - empty_socket(cs->fd); - goto out; + goto out_empty; } do { ret = read_socket_line(cs); if (ret < 1) { LOGWARNING("Failed to read http socket lines in json_rpc_call"); - goto out; + goto out_empty; } } while (strncmp(cs->buf, "{", 1)); val = json_loads(cs->buf, 0, &err_val); if (!val) LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text); +out_empty: + empty_socket(cs->fd); + if (!val) { + /* Assume that a failed request means the socket will be closed + * and reopen it */ + LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); + close(cs->fd); + cs->fd = connect_socket(cs->url, cs->port); + } out: dealloc(cs->buf); return val; diff --git a/src/libckpool.h b/src/libckpool.h index da373f5c..be186473 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -168,6 +168,7 @@ struct ckpool_instance { int update_interval; // Seconds between stratum updates }; +void rename_proc(const char *name); void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg); void join_pthread(pthread_t thread); diff --git a/src/stratifier.c b/src/stratifier.c index 4e188cf5..a3e0c8b8 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -283,8 +283,36 @@ out: return ret; } +static void *blockupdate(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + char *buf = NULL, hash[68]; + char request[8]; + + rename_proc("blockupdate"); + buf = send_recv_proc(&ckp->generator, "getbest"); + if (buf && strncasecmp(buf, "Failed", 6)) + sprintf(request, "getbest"); + else + sprintf(request, "getlast"); + + memset(hash, 0, 68); + while (42) { + dealloc(buf); + buf = send_recv_proc(&ckp->generator, request); + if (buf && strcmp(buf, hash) && strncasecmp(buf, "Failed", 6)) { + strcpy(hash, buf); + LOGINFO("Detected hash change to %s", hash); + send_proc(&ckp->stratifier, "update"); + } else + cksleep_ms(ckp->blockpoll); + } + return NULL; +} + int stratifier(proc_instance_t *pi) { + pthread_t pth_blockupdate; ckpool_t *ckp = pi->ckp; int ret = 0; @@ -292,6 +320,8 @@ int stratifier(proc_instance_t *pi) hex2bin(scriptsig_header_bin, scriptsig_header, 41); address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress); __bin2hex(pubkeytxn, pubkeytxnbin, 25); + + create_pthread(&pth_blockupdate, blockupdate, ckp); strat_loop(ckp, pi); return ret; }