Browse Source

Add a block update thread that polls bitcoind regularly for the latest hash and handle failed json requests as closed sockets

master
Con Kolivas 11 years ago
parent
commit
2b1f9510cf
  1. 9
      src/ckpool.c
  2. 25
      src/generator.c
  3. 78
      src/libckpool.c
  4. 1
      src/libckpool.h
  5. 30
      src/stratifier.c

9
src/ckpool.c

@ -7,7 +7,6 @@
* any later version. See COPYING for more details. * any later version. See COPYING for more details.
*/ */
#include <sys/prctl.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
@ -26,14 +25,6 @@
/* Only global variable, to be used only by sighandler */ /* Only global variable, to be used only by sighandler */
static ckpool_t *global_ckp; static ckpool_t *global_ckp;
static void rename_proc(const char *name)
{
char buf[16];
snprintf(buf, 16, "ckp@%s", name);
prctl(PR_SET_NAME, buf, 0, 0, 0);
}
static void *listener(void *arg) static void *listener(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;

25
src/generator.c

@ -24,6 +24,7 @@ static int gen_loop(proc_instance_t *pi, connsock_t *cs)
int sockd, ret = 0; int sockd, ret = 0;
char *buf = NULL; char *buf = NULL;
gbtbase_t gbt; gbtbase_t gbt;
char hash[68];
memset(&gbt, 0, sizeof(gbt)); memset(&gbt, 0, sizeof(gbt));
retry: retry:
@ -59,6 +60,30 @@ retry:
clear_gbtbase(&gbt); clear_gbtbase(&gbt);
} }
} }
if (!strncasecmp(buf, "getbest", 7)) {
if (!get_bestblockhash(cs, hash)) {
LOGWARNING("No best block hash support from %s:%s",
cs->url, cs->port);
send_unix_msg(sockd, "Failed");
} else {
send_unix_msg(sockd, hash);
}
}
if (!strncasecmp(buf, "getlast", 7)) {
int height = get_blockcount(cs);
if (height == -1)
send_unix_msg(sockd, "Failed");
else {
LOGDEBUG("Height: %d", height);
if (!get_blockhash(cs, height, hash))
send_unix_msg(sockd, "Failed");
else {
send_unix_msg(sockd, hash);
LOGDEBUG("Hash: %s", hash);
}
}
}
close(sockd); close(sockd);
goto retry; goto retry;

78
src/libckpool.c

@ -16,6 +16,7 @@
#else #else
#include <sys/un.h> #include <sys/un.h>
#endif #endif
#include <sys/prctl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <netdb.h> #include <netdb.h>
#include <unistd.h> #include <unistd.h>
@ -36,6 +37,15 @@
#define UNIX_PATH_MAX 108 #define UNIX_PATH_MAX 108
#endif #endif
void rename_proc(const char *name)
{
char buf[16];
snprintf(buf, 15, "ckp@%s", name);
buf[15] = '\0';
prctl(PR_SET_NAME, buf, 0, 0, 0);
}
void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg) void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg)
{ {
int ret = pthread_create(thread, NULL, start_routine, arg); int ret = pthread_create(thread, NULL, start_routine, arg);
@ -506,7 +516,7 @@ void empty_socket(int fd)
do { do {
char buf[PAGESIZE]; char buf[PAGESIZE];
tv_t timeout = {1, 0}; tv_t timeout = {0, 0};
fd_set rd; fd_set rd;
FD_ZERO(&rd); FD_ZERO(&rd);
@ -514,8 +524,11 @@ void empty_socket(int fd)
ret = select(fd + 1, &rd, NULL, NULL, &timeout); ret = select(fd + 1, &rd, NULL, NULL, &timeout);
if (ret < 0 && interrupted()) if (ret < 0 && interrupted())
continue; continue;
if (ret > 0) if (ret > 0) {
ret = recv(fd, buf, PAGESIZE - 1, 0); ret = recv(fd, buf, PAGESIZE - 1, 0);
buf[ret] = 0;
LOGDEBUG("Discarding: %s", buf);
}
} while (ret > 0); } while (ret > 0);
} }
@ -634,10 +647,19 @@ out:
* string.*/ * string.*/
char *recv_unix_msg(int sockd) char *recv_unix_msg(int sockd)
{ {
tv_t tv_timeout = {1, 0};
char *buf = NULL; char *buf = NULL;
uint32_t msglen; uint32_t msglen;
fd_set readfs;
int ret, ofs; int ret, ofs;
FD_ZERO(&readfs);
FD_SET(sockd, &readfs);
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout);
if (ret < 1) {
LOGERR("Select1 failed in recv_unix_msg");
return false;
}
/* Get message length */ /* Get message length */
ret = read(sockd, &msglen, 4); ret = read(sockd, &msglen, 4);
if (ret < 4) { if (ret < 4) {
@ -653,6 +675,16 @@ char *recv_unix_msg(int sockd)
buf[msglen] = 0; buf[msglen] = 0;
ofs = 0; ofs = 0;
while (msglen) { while (msglen) {
tv_timeout.tv_sec = 1;
tv_timeout.tv_usec = 0;
FD_ZERO(&readfs);
FD_SET(sockd, &readfs);
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout);
if (ret < 1) {
LOGERR("Select2 failed in recv_unix_msg");
return false;
}
ret = read(sockd, buf + ofs, msglen); ret = read(sockd, buf + ofs, msglen);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGERR("Failed to read %d bytes in recv_unix_msg", msglen); LOGERR("Failed to read %d bytes in recv_unix_msg", msglen);
@ -668,7 +700,9 @@ out:
bool send_unix_msg(int sockd, const char *buf) bool send_unix_msg(int sockd, const char *buf)
{ {
tv_t tv_timeout = {1, 0};
uint32_t msglen, len; uint32_t msglen, len;
fd_set writefds;
int ret, ofs; int ret, ofs;
len = strlen(buf); len = strlen(buf);
@ -677,6 +711,13 @@ bool send_unix_msg(int sockd, const char *buf)
return false; return false;
} }
msglen = htole32(len); msglen = htole32(len);
FD_ZERO(&writefds);
FD_SET(sockd, &writefds);
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout);
if (ret < 1) {
LOGERR("Select1 failed in send_unix_msg");
return false;
}
ret = write(sockd, &msglen, 4); ret = write(sockd, &msglen, 4);
if (unlikely(ret < 4)) { if (unlikely(ret < 4)) {
LOGERR("Failed to write 4 byte length in send_unix_msg"); LOGERR("Failed to write 4 byte length in send_unix_msg");
@ -684,6 +725,16 @@ bool send_unix_msg(int sockd, const char *buf)
} }
ofs = 0; ofs = 0;
while (len) { while (len) {
tv_timeout.tv_sec = 1;
tv_timeout.tv_usec = 0;
FD_ZERO(&writefds);
FD_SET(sockd, &writefds);
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout);
if (ret < 1) {
LOGERR("Select2 failed in send_unix_msg");
return false;
}
ret = write(sockd, buf + ofs, len); ret = write(sockd, buf + ofs, len);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGERR("Failed to write %d bytes in send_unix_msg", len); LOGERR("Failed to write %d bytes in send_unix_msg", len);
@ -704,7 +755,7 @@ bool send_proc(proc_instance_t *pi, const char *msg)
int sockd; int sockd;
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message to null path in send_proc"); LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out; goto out;
} }
if (unlikely(!msg || !strlen(msg))) { if (unlikely(!msg || !strlen(msg))) {
@ -733,7 +784,7 @@ char *send_recv_proc(proc_instance_t *pi, const char *msg)
int sockd; int sockd;
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message to null path in send_proc"); LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out; goto out;
} }
if (unlikely(!msg || !strlen(msg))) { if (unlikely(!msg || !strlen(msg))) {
@ -799,30 +850,37 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
ret = write_socket(cs->fd, http_req, len); ret = write_socket(cs->fd, http_req, len);
if (ret != len) { if (ret != len) {
LOGWARNING("Failed to write to socket in json_rpc_call"); LOGWARNING("Failed to write to socket in json_rpc_call");
empty_socket(cs->fd); goto out_empty;
goto out;
} }
ret = read_socket_line(cs); ret = read_socket_line(cs);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read socket line in json_rpc_call"); LOGWARNING("Failed to read socket line in json_rpc_call");
goto out; goto out_empty;
} }
if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) { if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) {
LOGWARNING("HTTP response not ok: %s", cs->buf); LOGWARNING("HTTP response not ok: %s", cs->buf);
empty_socket(cs->fd); goto out_empty;
goto out;
} }
do { do {
ret = read_socket_line(cs); ret = read_socket_line(cs);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read http socket lines in json_rpc_call"); LOGWARNING("Failed to read http socket lines in json_rpc_call");
goto out; goto out_empty;
} }
} while (strncmp(cs->buf, "{", 1)); } while (strncmp(cs->buf, "{", 1));
val = json_loads(cs->buf, 0, &err_val); val = json_loads(cs->buf, 0, &err_val);
if (!val) if (!val)
LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text); LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text);
out_empty:
empty_socket(cs->fd);
if (!val) {
/* Assume that a failed request means the socket will be closed
* and reopen it */
LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port);
close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port);
}
out: out:
dealloc(cs->buf); dealloc(cs->buf);
return val; return val;

1
src/libckpool.h

@ -168,6 +168,7 @@ struct ckpool_instance {
int update_interval; // Seconds between stratum updates int update_interval; // Seconds between stratum updates
}; };
void rename_proc(const char *name);
void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg); void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg);
void join_pthread(pthread_t thread); void join_pthread(pthread_t thread);

30
src/stratifier.c

@ -283,8 +283,36 @@ out:
return ret; return ret;
} }
static void *blockupdate(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
char *buf = NULL, hash[68];
char request[8];
rename_proc("blockupdate");
buf = send_recv_proc(&ckp->generator, "getbest");
if (buf && strncasecmp(buf, "Failed", 6))
sprintf(request, "getbest");
else
sprintf(request, "getlast");
memset(hash, 0, 68);
while (42) {
dealloc(buf);
buf = send_recv_proc(&ckp->generator, request);
if (buf && strcmp(buf, hash) && strncasecmp(buf, "Failed", 6)) {
strcpy(hash, buf);
LOGINFO("Detected hash change to %s", hash);
send_proc(&ckp->stratifier, "update");
} else
cksleep_ms(ckp->blockpoll);
}
return NULL;
}
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 0; int ret = 0;
@ -292,6 +320,8 @@ int stratifier(proc_instance_t *pi)
hex2bin(scriptsig_header_bin, scriptsig_header, 41); hex2bin(scriptsig_header_bin, scriptsig_header, 41);
address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress); address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress);
__bin2hex(pubkeytxn, pubkeytxnbin, 25); __bin2hex(pubkeytxn, pubkeytxnbin, 25);
create_pthread(&pth_blockupdate, blockupdate, ckp);
strat_loop(ckp, pi); strat_loop(ckp, pi);
return ret; return ret;
} }

Loading…
Cancel
Save