Browse Source

Implement support for zmqpubhashblock notifier.

master
ckolivas 5 years ago
parent
commit
f0016bdd65
  1. 3
      README
  2. 1
      ckpool.conf
  3. 2
      configure.ac
  4. 5
      src/ckpool.c
  5. 3
      src/ckpool.h
  6. 5
      src/libckpool.h
  7. 73
      src/stratifier.c

3
README

@ -310,3 +310,6 @@ maximum.
"maxclients" : Optional upper limit on the number of clients ckpool will "maxclients" : Optional upper limit on the number of clients ckpool will
accept before rejecting further clients. accept before rejecting further clients.
"zmqblock" : Optional interface to use for zmq blockhash notification. Requires
use of matched bitcoind -zmqpubhashblock option. Default: tcp://127.0.0.1:28332

1
ckpool.conf

@ -35,6 +35,7 @@
"mindiff" : 1, "mindiff" : 1,
"startdiff" : 42, "startdiff" : 42,
"maxdiff" : 0, "maxdiff" : 0,
"zmqblock" : "tcp://127.0.0.1:28332",
"logdir" : "logs" "logdir" : "logs"
} }
Comments from here on are ignored. Comments from here on are ignored.

2
configure.ac

@ -40,6 +40,7 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h)
AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h)
AC_CHECK_HEADERS(zmq.h)
AC_CHECK_PROG(YASM, yasm, yes) AC_CHECK_PROG(YASM, yasm, yes)
AM_CONDITIONAL([HAVE_YASM], [test x$YASM = xyes]) AM_CONDITIONAL([HAVE_YASM], [test x$YASM = xyes])
@ -82,6 +83,7 @@ AC_ARG_WITH([ckdb],
AC_SEARCH_LIBS(clock_nanosleep, rt, , "Error: Required library rt not found." && exit 1) AC_SEARCH_LIBS(clock_nanosleep, rt, , "Error: Required library rt not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1)
AC_SEARCH_LIBS(zmq_socket, zmq, , "Error: Required library zmq not found." && exit 1)
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_SEARCH_LIBS(PQdb, pq, , echo "Error: Required library pq AC_SEARCH_LIBS(PQdb, pq, , echo "Error: Required library pq

5
src/ckpool.c

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2018 Con Kolivas * Copyright 2014-2020 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -1517,6 +1517,7 @@ static void parse_config(ckpool_t *ckp)
arr_val = json_object_get(json_conf, "redirecturl"); arr_val = json_object_get(json_conf, "redirecturl");
if (arr_val) if (arr_val)
parse_redirecturls(ckp, arr_val); parse_redirecturls(ckp, arr_val);
json_get_string(&ckp->zmqblock, json_conf, "zmqblock");
json_decref(json_conf); json_decref(json_conf);
} }
@ -1847,6 +1848,8 @@ int main(int argc, char **argv)
quit(0, "No proxy entries found in config file %s", ckp.config); quit(0, "No proxy entries found in config file %s", ckp.config);
if (ckp.redirector && !ckp.redirecturls) if (ckp.redirector && !ckp.redirecturls)
quit(0, "No redirect entries found in config file %s", ckp.config); quit(0, "No redirect entries found in config file %s", ckp.config);
if (!ckp.zmqblock)
ckp.zmqblock = "tcp://127.0.0.1:28332";
/* Create the log directory */ /* Create the log directory */
trail_slash(&ckp.logdir); trail_slash(&ckp.logdir);

3
src/ckpool.h

@ -187,6 +187,9 @@ struct ckpool_instance {
bool stratifier_ready; bool stratifier_ready;
bool connector_ready; bool connector_ready;
/* Name of protocol used for ZMQ block notifications */
char *zmqblock;
/* Threads of main process */ /* Threads of main process */
pthread_t pth_listener; pthread_t pth_listener;
pthread_t pth_watchdog; pthread_t pth_watchdog;

5
src/libckpool.h

@ -80,15 +80,18 @@
#define __maybe_unused __attribute__((unused)) #define __maybe_unused __attribute__((unused))
#define uninitialised_var(x) x = x #define uninitialised_var(x) x = x
#ifndef MAX
#define MAX(a,b) \ #define MAX(a,b) \
({ __typeof__ (a) _a = (a); \ ({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \ __typeof__ (b) _b = (b); \
_a > _b ? _a : _b; }) _a > _b ? _a : _b; })
#endif
#ifndef MIN
#define MIN(a,b) \ #define MIN(a,b) \
({ __typeof__ (a) _a = (a); \ ({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \ __typeof__ (b) _b = (b); \
_a < _b ? _a : _b; }) _a < _b ? _a : _b; })
#endif
typedef unsigned char uchar; typedef unsigned char uchar;

73
src/stratifier.c

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2018 Con Kolivas * Copyright 2014-2020 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -19,6 +19,7 @@
#include <math.h> #include <math.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <zmq.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -8648,10 +8649,77 @@ static void read_poolstats(ckpool_t *ckp, int *tvsec_diff)
} }
} }
static void *zmqnotify(void *arg)
{
ckpool_t *ckp = arg;
sdata_t *sdata = ckp->sdata;
void *context, *notify;
int zero = 0, rc;
char *endpoint;
rename_proc("zmqnotify");
context = zmq_ctx_new();
notify = zmq_socket(context, ZMQ_SUB);
if (!notify)
quit(1, "zmq_socket failed with errno %d", errno);
rc = zmq_setsockopt(notify, ZMQ_SUBSCRIBE, "hashblock", 0);
if (rc < 0)
quit(1, "zmq_setsockopt failed with errno %d", errno);
rc = zmq_connect(notify, ckp->zmqblock);
if (rc < 0)
quit(1, "zmq_connect failed with errno %d", errno);
LOGNOTICE("ZMQ connected to %s", ckp->zmqblock);
while (42) {
zmq_msg_t message;
do {
char hexhash[68] = {};
int size;
zmq_msg_init(&message);
rc = zmq_msg_recv(&message, notify, 0);
if (unlikely(rc < 0)) {
LOGWARNING("zmq_msg_recv failed with error %d", errno);
sleep(5);
zmq_msg_close(&message);
continue;
}
size = zmq_msg_size(&message);
switch (size) {
case 9:
LOGDEBUG("ZMQ hashblock message");
break;
case 4:
LOGDEBUG("ZMQ sequence number");
break;
case 32:
update_base(sdata, GEN_PRIORITY);
__bin2hex(hexhash, zmq_msg_data(&message), 32);
LOGNOTICE("ZMQ block hash %s", hexhash);
break;
default:
LOGWARNING("ZMQ message size error, size = %d!", size);
break;
}
zmq_msg_close(&message);
} while (zmq_msg_more(&message));
LOGDEBUG("ZMQ message complete");
}
zmq_close(notify);
zmq_ctx_destroy (context);
return NULL;
}
void *stratifier(void *arg) void *stratifier(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat, pth_zmqnotify;
int threads, tvsec_diff = 0; int threads, tvsec_diff = 0;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int64_t randomiser; int64_t randomiser;
@ -8731,6 +8799,7 @@ void *stratifier(void *arg)
create_pthread(&pth_statsupdate, statsupdate, ckp); create_pthread(&pth_statsupdate, statsupdate, ckp);
mutex_init(&sdata->share_lock); mutex_init(&sdata->share_lock);
create_pthread(&pth_zmqnotify, zmqnotify, ckp);
ckp->stratifier_ready = true; ckp->stratifier_ready = true;
LOGWARNING("%s stratifier ready", ckp->name); LOGWARNING("%s stratifier ready", ckp->name);

Loading…
Cancel
Save