diff --git a/README b/README index 126cb2f6..ee0b7588 100644 --- a/README +++ b/README @@ -310,3 +310,6 @@ maximum. "maxclients" : Optional upper limit on the number of clients ckpool will accept before rejecting further clients. + +"zmqblock" : Optional interface to use for zmq blockhash notification. Requires +use of matched bitcoind -zmqpubhashblock option. Default: tcp://127.0.0.1:28332 diff --git a/ckpool.conf b/ckpool.conf index d4935da8..b7a82c01 100644 --- a/ckpool.conf +++ b/ckpool.conf @@ -35,6 +35,7 @@ "mindiff" : 1, "startdiff" : 42, "maxdiff" : 0, +"zmqblock" : "tcp://127.0.0.1:28332", "logdir" : "logs" } Comments from here on are ignored. diff --git a/configure.ac b/configure.ac index 2deaa69c..904f2a46 100644 --- a/configure.ac +++ b/configure.ac @@ -40,6 +40,7 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) +AC_CHECK_HEADERS(zmq.h) AC_CHECK_PROG(YASM, yasm, yes) AM_CONDITIONAL([HAVE_YASM], [test x$YASM = xyes]) @@ -82,6 +83,7 @@ AC_ARG_WITH([ckdb], AC_SEARCH_LIBS(clock_nanosleep, rt, , "Error: Required library rt not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) +AC_SEARCH_LIBS(zmq_socket, zmq, , "Error: Required library zmq not found." && exit 1) if test "x$ckdb" != "xno"; then AC_SEARCH_LIBS(PQdb, pq, , echo "Error: Required library pq diff --git a/src/ckpool.c b/src/ckpool.c index 791a462a..8b6363e3 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 Con Kolivas + * Copyright 2014-2020 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -1517,6 +1517,7 @@ static void parse_config(ckpool_t *ckp) arr_val = json_object_get(json_conf, "redirecturl"); if (arr_val) parse_redirecturls(ckp, arr_val); + json_get_string(&ckp->zmqblock, json_conf, "zmqblock"); json_decref(json_conf); } @@ -1847,6 +1848,8 @@ int main(int argc, char **argv) quit(0, "No proxy entries found in config file %s", ckp.config); if (ckp.redirector && !ckp.redirecturls) quit(0, "No redirect entries found in config file %s", ckp.config); + if (!ckp.zmqblock) + ckp.zmqblock = "tcp://127.0.0.1:28332"; /* Create the log directory */ trail_slash(&ckp.logdir); diff --git a/src/ckpool.h b/src/ckpool.h index 47e5612e..97d87bf5 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -187,6 +187,9 @@ struct ckpool_instance { bool stratifier_ready; bool connector_ready; + /* Name of protocol used for ZMQ block notifications */ + char *zmqblock; + /* Threads of main process */ pthread_t pth_listener; pthread_t pth_watchdog; diff --git a/src/libckpool.h b/src/libckpool.h index 6479bf99..e016ccef 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -80,15 +80,18 @@ #define __maybe_unused __attribute__((unused)) #define uninitialised_var(x) x = x +#ifndef MAX #define MAX(a,b) \ ({ __typeof__ (a) _a = (a); \ __typeof__ (b) _b = (b); \ _a > _b ? _a : _b; }) - +#endif +#ifndef MIN #define MIN(a,b) \ ({ __typeof__ (a) _a = (a); \ __typeof__ (b) _b = (b); \ _a < _b ? _a : _b; }) +#endif typedef unsigned char uchar; diff --git a/src/stratifier.c b/src/stratifier.c index cf97f653..f67d82e1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 Con Kolivas + * Copyright 2014-2020 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -19,6 +19,7 @@ #include #include #include +#include #include "ckpool.h" #include "libckpool.h" @@ -8648,10 +8649,77 @@ static void read_poolstats(ckpool_t *ckp, int *tvsec_diff) } } +static void *zmqnotify(void *arg) +{ + ckpool_t *ckp = arg; + sdata_t *sdata = ckp->sdata; + void *context, *notify; + int zero = 0, rc; + char *endpoint; + + rename_proc("zmqnotify"); + + context = zmq_ctx_new(); + notify = zmq_socket(context, ZMQ_SUB); + if (!notify) + quit(1, "zmq_socket failed with errno %d", errno); + rc = zmq_setsockopt(notify, ZMQ_SUBSCRIBE, "hashblock", 0); + if (rc < 0) + quit(1, "zmq_setsockopt failed with errno %d", errno); + rc = zmq_connect(notify, ckp->zmqblock); + if (rc < 0) + quit(1, "zmq_connect failed with errno %d", errno); + LOGNOTICE("ZMQ connected to %s", ckp->zmqblock); + + while (42) { + zmq_msg_t message; + + do { + char hexhash[68] = {}; + int size; + + zmq_msg_init(&message); + rc = zmq_msg_recv(&message, notify, 0); + if (unlikely(rc < 0)) { + LOGWARNING("zmq_msg_recv failed with error %d", errno); + sleep(5); + zmq_msg_close(&message); + continue; + } + + size = zmq_msg_size(&message); + switch (size) { + case 9: + LOGDEBUG("ZMQ hashblock message"); + break; + case 4: + LOGDEBUG("ZMQ sequence number"); + break; + case 32: + update_base(sdata, GEN_PRIORITY); + __bin2hex(hexhash, zmq_msg_data(&message), 32); + LOGNOTICE("ZMQ block hash %s", hexhash); + break; + default: + LOGWARNING("ZMQ message size error, size = %d!", size); + break; + } + zmq_msg_close(&message); + } while (zmq_msg_more(&message)); + + LOGDEBUG("ZMQ message complete"); + } + + zmq_close(notify); + zmq_ctx_destroy (context); + + return NULL; +} + void *stratifier(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; + pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat, pth_zmqnotify; int threads, tvsec_diff = 0; ckpool_t *ckp = pi->ckp; int64_t randomiser; @@ -8731,6 +8799,7 @@ void *stratifier(void *arg) create_pthread(&pth_statsupdate, statsupdate, ckp); mutex_init(&sdata->share_lock); + create_pthread(&pth_zmqnotify, zmqnotify, ckp); ckp->stratifier_ready = true; LOGWARNING("%s stratifier ready", ckp->name);