Browse Source

Merge branch 'proxydev'

master
Con Kolivas 9 years ago
parent
commit
7904a8f29c
  1. 31
      README
  2. 34
      cknode.conf
  3. 15
      ckpassthrough.conf
  4. 1
      ckproxy.conf
  5. 23
      ckredirector.conf
  6. 20
      configure.ac
  7. 4
      src/Makefile.am
  8. 243
      src/ckpool.c
  9. 76
      src/ckpool.h
  10. 300
      src/connector.c
  11. 2451
      src/generator.c
  12. 19
      src/libckpool.c
  13. 2
      src/libckpool.h
  14. 3422
      src/stratifier.c
  15. 2
      src/uthash.h

31
README

@ -134,8 +134,10 @@ ckpool supports the following options:
-n NAME | --name NAME -n NAME | --name NAME
-P | --passthrough -P | --passthrough
-p | --proxy -p | --proxy
-R | --redirector
-S CKDB-SOCKDIR | --ckdb-sockdir CKDB-SOCKDIR -S CKDB-SOCKDIR | --ckdb-sockdir CKDB-SOCKDIR
-s SOCKDIR | --sockdir SOCKDIR -s SOCKDIR | --sockdir SOCKDIR
-u | --userproxy
-A Standalone mode tells ckpool not to try to communicate with ckdb or log any -A Standalone mode tells ckpool not to try to communicate with ckdb or log any
@ -144,8 +146,9 @@ are automatically accepted without any attempt to authorise users in any way.
This option is explicitly enabled when built without ckdb support. This option is explicitly enabled when built without ckdb support.
-c <CONFIG> tells ckpool to override its default configuration filename and -c <CONFIG> tells ckpool to override its default configuration filename and
load the specified one. If -c is not specified, ckpool looks for ckpool.conf load the specified one. If -c is not specified, ckpool looks for ckpool.conf,
whereas in proxy or passthrough modes it will look for ckproxy.conf in proxy mode it looks for ckproxy.conf, in passthrough mode for
ckpassthrough.conf and in redirector mode for ckredirector.conf
-d <CKDB-NAME> tells ckpool what the name of the ckdb process is that it should -d <CKDB-NAME> tells ckpool what the name of the ckdb process is that it should
speak to, otherwise it will look for ckdb. speak to, otherwise it will look for ckdb.
@ -169,8 +172,14 @@ and then workbase.
-l <LOGLEVEL will change the log level to that specified. Default is 5 and -l <LOGLEVEL will change the log level to that specified. Default is 5 and
maximum debug is level 7. maximum debug is level 7.
-N will start ckpool in passthrough node mode where it behaves like a
passthrough but requires a locally running bitcoind and can submit blocks
itself in addition to passing the shares back to the upstream pool. It also
monitors hashrate and requires more resources than a simple passthrough.
-n <NAME> will change the ckpool process name to that specified, allowing -n <NAME> will change the ckpool process name to that specified, allowing
multiple different named instances to be running. multiple different named instances to be running. By default the variant
names are used: ckpool, ckproxy, ckpassthrough, ckredirector, cknode.
-P will start ckpool in passthrough proxy mode where it collates all incoming -P will start ckpool in passthrough proxy mode where it collates all incoming
connections and streams all information on a single connection to an upstream connections and streams all information on a single connection to an upstream
@ -182,6 +191,13 @@ clients as separate entities while presenting shares as a single user to the
upstream pool specified. Note that the upstream pool needs to be a ckpool for upstream pool specified. Note that the upstream pool needs to be a ckpool for
it to scale to large hashrates. Standalone mode is Optional. it to scale to large hashrates. Standalone mode is Optional.
-R will start ckpool in a variant of passthrough mode. It is designed to be a
front end to filter out users that never contribute any shares. Once an
accepted share from the upstream pool is detected, it will issue a redirect to
one of the redirecturl entries in the configuration file. It will cycle over
entries if multiple exist, but try to keep all clients from the same IP
redirecting to the same pool.
-S <CKDB-SOCKDIR> tells ckpool which directory to look for the ckdb socket to -S <CKDB-SOCKDIR> tells ckpool which directory to look for the ckdb socket to
talk to. talk to.
This option does not exist when built without ckdb support. This option does not exist when built without ckdb support.
@ -189,6 +205,12 @@ This option does not exist when built without ckdb support.
-s <SOCKDIR> tells ckpool which directory to place its own communication -s <SOCKDIR> tells ckpool which directory to place its own communication
sockets (/tmp by default) sockets (/tmp by default)
-u Userproxy mode will start ckpool in proxy mode as per the -p option above,
but in addition it will accept username/passwords from the stratum connects
and try to open additional connections with those credentials to the upstream
pool specified in the configuration file and then reconnect miners to mine with
their chosen username/password to the upstream pool.
ckdb takes the following options: ckdb takes the following options:
@ -259,6 +281,9 @@ and 3334 in proxy mode. Multiple entries can be specified as an array by
either IP or resolvable domain name but the executable must be able to bind to either IP or resolvable domain name but the executable must be able to bind to
all of them and ports up to 1024 usually require privileged access. all of them and ports up to 1024 usually require privileged access.
"redirecturl" : This is an array of URLs that ckpool will redirect active
miners to in redirector mode. They must be valid resolvable URLs+ports.
"mindiff" : Minimum diff that vardiff will allow miners to drop to. Default 1 "mindiff" : Minimum diff that vardiff will allow miners to drop to. Default 1
"startdiff" : Starting diff that new clients are given. Default 42 "startdiff" : Starting diff that new clients are given. Default 42

34
cknode.conf

@ -0,0 +1,34 @@
{
"btcd" : [
{
"url" : "localhost:8332",
"auth" : "user",
"pass" : "pass",
"notify" : true
},
{
"url" : "backup:8332",
"auth" : "user",
"pass" : "pass",
"notify" : false
}
],
"proxy" : [
{
"url" : "ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
},
{
"url" : "backup.ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
}
],
"serverurl" : [
"192.168.1.100:3334",
"127.0.0.1:3334"
],
"logdir" : "logs"
}
Comments from here on are ignored.

15
ckpassthrough.conf

@ -0,0 +1,15 @@
{
"proxy" : [
{
"url" : "ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
}
],
"serverurl" : [
"192.168.1.100:3334",
"127.0.0.1:3334"
],
"logdir" : "logs"
}
Comments from here on are ignored.

1
ckproxy.conf

@ -19,7 +19,6 @@
"mindiff" : 1, "mindiff" : 1,
"startdiff" : 42, "startdiff" : 42,
"maxdiff" : 0, "maxdiff" : 0,
"clientsvspeed" : false,
"logdir" : "logs" "logdir" : "logs"
} }
Comments from here on are ignored. Comments from here on are ignored.

23
ckredirector.conf

@ -0,0 +1,23 @@
{
"proxy" : [
{
"url" : "ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
}
],
"update_interval" : 30,
"serverurl" : [
"192.168.1.100:3334",
"127.0.0.1:3334"
],
"redirecturl" : [
"node1.ckpool.org:3333",
"node2.ckpool.org:3333"
],
"mindiff" : 1,
"startdiff" : 42,
"maxdiff" : 0,
"logdir" : "logs"
}
Comments from here on are ignored.

20
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.8.8, kernel@kolivas.org) AC_INIT(ckpool, 0.9.0, kernel@kolivas.org)
AC_CANONICAL_SYSTEM AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_MACRO_DIR([m4])
@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h)
AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h)
AC_CHECK_HEADERS(zlib.h)
PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm"
RT_LIBS="-lrt"
AC_CONFIG_SUBDIRS([src/jansson-2.6]) AC_CONFIG_SUBDIRS([src/jansson-2.6])
JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a"
AC_SUBST(PTHREAD_LIBS)
AC_SUBST(MATH_LIBS)
AC_SUBST(RT_LIBS)
AC_SUBST(JANSSON_LIBS) AC_SUBST(JANSSON_LIBS)
AC_ARG_WITH([ckdb], AC_ARG_WITH([ckdb],
@ -58,6 +52,10 @@ AC_ARG_WITH([ckdb],
[ckdb=$withval] [ckdb=$withval]
) )
#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1)
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev
not found. Install it or disable postgresql support with --without-ckdb" && exit 1) not found. Install it or disable postgresql support with --without-ckdb" && exit 1)
@ -84,8 +82,10 @@ echo "Compilation............: make (or gmake)"
echo " CPPFLAGS.............: $CPPFLAGS" echo " CPPFLAGS.............: $CPPFLAGS"
echo " CFLAGS...............: $CFLAGS" echo " CFLAGS...............: $CFLAGS"
echo " LDFLAGS..............: $LDFLAGS" echo " LDFLAGS..............: $LDFLAGS"
echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" echo " LDADD................: $LIBS $JANSSON_LIBS"
echo " db LDADD.............: $DB_LIBS" if test "x$ckdb" != "xno"; then
echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS"
fi
echo echo
echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')"
echo " prefix...............: $prefix" echo " prefix...............: $prefix"

4
src/Makefile.am

@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src
lib_LTLIBRARIES = libckpool.la lib_LTLIBRARIES = libckpool.la
libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h
libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ libckpool_la_LIBADD = @LIBS@
bin_PROGRAMS = ckpool ckpmsg notifier bin_PROGRAMS = ckpool ckpmsg notifier
ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \
@ -23,5 +23,5 @@ if WANT_CKDB
bin_PROGRAMS += ckdb bin_PROGRAMS += ckdb
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \
ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h
ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@
endif endif

243
src/ckpool.c

@ -20,6 +20,7 @@
#include <getopt.h> #include <getopt.h>
#include <grp.h> #include <grp.h>
#include <jansson.h> #include <jansson.h>
#include <zlib.h>
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -375,6 +376,17 @@ out:
return ret; return ret;
} }
static void api_message(ckpool_t *ckp, char **buf, int *sockd)
{
apimsg_t *apimsg = ckalloc(sizeof(apimsg_t));
apimsg->buf = *buf;
*buf = NULL;
apimsg->sockd = *sockd;
*sockd = -1;
ckmsgq_add(ckp->ckpapi, apimsg);
}
/* Listen for incoming global requests. Always returns a response if possible */ /* Listen for incoming global requests. Always returns a response if possible */
static void *listener(void *arg) static void *listener(void *arg)
{ {
@ -397,6 +409,9 @@ retry:
if (!buf) { if (!buf) {
LOGWARNING("Failed to get message in listener"); LOGWARNING("Failed to get message in listener");
send_unix_msg(sockd, "failed"); send_unix_msg(sockd, "failed");
} else if (buf[0] == '{') {
/* Any JSON messages received are for the RPC API to handle */
api_message(ckp, &buf, &sockd);
} else if (cmdmatch(buf, "shutdown")) { } else if (cmdmatch(buf, "shutdown")) {
LOGWARNING("Listener received shutdown message, terminating ckpool"); LOGWARNING("Listener received shutdown message, terminating ckpool");
send_unix_msg(sockd, "exiting"); send_unix_msg(sockd, "exiting");
@ -492,9 +507,49 @@ bool ping_main(ckpool_t *ckp)
void empty_buffer(connsock_t *cs) void empty_buffer(connsock_t *cs)
{ {
if (cs->buf)
cs->buf[0] = '\0';
cs->buflen = cs->bufofs = 0; cs->buflen = cs->bufofs = 0;
} }
static void clear_bufline(connsock_t *cs)
{
if (unlikely(!cs->buf))
cs->buf = ckzalloc(PAGESIZE);
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
} else
cs->bufofs = 0;
}
static void add_bufline(connsock_t *cs, const char *readbuf, const int len)
{
int backoff = 1;
size_t buflen;
char *newbuf;
buflen = round_up_page(cs->bufofs + len + 1);
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, len);
cs->bufofs += len;
cs->buf[cs->bufofs] = '\0';
}
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0' /* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning * and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
@ -502,86 +557,54 @@ int read_socket_line(connsock_t *cs, float *timeout)
{ {
char *eom = NULL; char *eom = NULL;
tv_t start, now; tv_t start, now;
size_t buflen;
int ret = -1; int ret = -1;
bool polled;
float diff; float diff;
if (unlikely(cs->fd < 0)) if (unlikely(cs->fd < 0))
goto out; goto out;
if (unlikely(!cs->buf)) clear_bufline(cs);
cs->buf = ckzalloc(PAGESIZE);
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
}
tv_time(&start); tv_time(&start);
rewait:
while (!eom) {
char readbuf[PAGESIZE];
if (*timeout < 0) { if (*timeout < 0) {
LOGDEBUG("Timed out in read_socket_line"); if (cs->ckp->proxy)
LOGINFO("Timed out in read_socket_line");
else
LOGERR("Timed out in read_socket_line");
ret = 0; ret = 0;
goto out; goto out;
} }
ret = wait_read_select(cs->fd, eom ? 0 : *timeout); ret = wait_read_select(cs->fd, *timeout);
polled = true;
if (ret < 1) { if (ret < 1) {
if (!ret) { if (cs->ckp->proxy)
if (eom) LOGINFO("Select %s in read_socket_line", !ret ? "timed out" : "failed");
goto parse; else
LOGDEBUG("Select timed out in read_socket_line"); LOGERR("Select %s in read_socket_line", !ret ? "timed out" : "failed");
} else
LOGERR("Select failed in read_socket_line");
goto out; goto out;
} }
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
while (42) {
char readbuf[PAGESIZE] = {};
int backoff = 1;
char *newbuf;
ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
/* No more to read or closed socket after valid message */ /* If we have done wait_read_select there should be
if (eom)
break;
/* Have we used up all the timeout yet? If polled is
* set that means poll has said there should be
* something to read and if we get nothing it means the * something to read and if we get nothing it means the
* socket is closed. */ * socket is closed. */
if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (cs->ckp->proxy)
goto rewait; LOGINFO("Failed to recv in read_socket_line");
else
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
goto out; goto out;
} }
polled = false; add_bufline(cs, readbuf, ret);
buflen = cs->bufofs + ret + 1;
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, ret);
cs->bufofs += ret;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
} }
parse:
ret = eom - cs->buf; ret = eom - cs->buf;
cs->buflen = cs->buf + cs->bufofs - eom - 1; cs->buflen = cs->buf + cs->bufofs - eom - 1;
@ -1126,7 +1149,7 @@ bool json_get_int64(int64_t *store, const json_t *val, const char *res)
goto out; goto out;
} }
if (!json_is_integer(entry)) { if (!json_is_integer(entry)) {
LOGWARNING("Json entry %s is not an integer", res); LOGINFO("Json entry %s is not an integer", res);
goto out; goto out;
} }
*store = json_integer_value(entry); *store = json_integer_value(entry);
@ -1176,7 +1199,7 @@ out:
return ret; return ret;
} }
static bool json_get_bool(bool *store, const json_t *val, const char *res) bool json_get_bool(bool *store, const json_t *val, const char *res)
{ {
json_t *entry = json_object_get(val, res); json_t *entry = json_object_get(val, res);
bool ret = false; bool ret = false;
@ -1186,7 +1209,7 @@ static bool json_get_bool(bool *store, const json_t *val, const char *res)
goto out; goto out;
} }
if (!json_is_boolean(entry)) { if (!json_is_boolean(entry)) {
LOGWARNING("Json entry %s is not a boolean", res); LOGINFO("Json entry %s is not a boolean", res);
goto out; goto out;
} }
*store = json_is_true(entry); *store = json_is_true(entry);
@ -1196,6 +1219,26 @@ out:
return ret; return ret;
} }
bool json_getdel_int(int *store, json_t *val, const char *res)
{
bool ret;
ret = json_get_int(store, val, res);
if (ret)
json_object_del(val, res);
return ret;
}
bool json_getdel_int64(int64_t *store, json_t *val, const char *res)
{
bool ret;
ret = json_get_int64(store, val, res);
if (ret)
json_object_del(val, res);
return ret;
}
static void parse_btcds(ckpool_t *ckp, const json_t *arr_val, const int arr_size) static void parse_btcds(ckpool_t *ckp, const json_t *arr_val, const int arr_size)
{ {
json_t *val; json_t *val;
@ -1261,6 +1304,43 @@ out:
return ret; return ret;
} }
static bool parse_redirecturls(ckpool_t *ckp, const json_t *arr_val)
{
bool ret = false;
int arr_size, i;
char *redirecturl, url[INET6_ADDRSTRLEN], port[8];
redirecturl = alloca(INET6_ADDRSTRLEN);
if (!arr_val)
goto out;
if (!json_is_array(arr_val)) {
LOGNOTICE("Unable to parse redirecturl entries as an array");
goto out;
}
arr_size = json_array_size(arr_val);
if (!arr_size) {
LOGWARNING("redirecturl array empty");
goto out;
}
ckp->redirecturls = arr_size;
ckp->redirecturl = ckalloc(sizeof(char *) * arr_size);
ckp->redirectport = ckalloc(sizeof(char *) * arr_size);
for (i = 0; i < arr_size; i++) {
json_t *val = json_array_get(arr_val, i);
strncpy(redirecturl, json_string_value(val), INET6_ADDRSTRLEN - 1);
/* See that the url properly resolves */
if (!url_from_serverurl(redirecturl, url, port))
quit(1, "Invalid redirecturl entry %d %s", i, redirecturl);
ckp->redirecturl[i] = strdup(strsep(&redirecturl, ":"));
ckp->redirectport[i] = strdup(port);
}
ret = true;
out:
return ret;
}
static void parse_config(ckpool_t *ckp) static void parse_config(ckpool_t *ckp)
{ {
json_t *json_conf, *arr_val; json_t *json_conf, *arr_val;
@ -1310,7 +1390,10 @@ static void parse_config(ckpool_t *ckp)
if (arr_size) if (arr_size)
parse_proxies(ckp, arr_val, arr_size); parse_proxies(ckp, arr_val, arr_size);
} }
json_get_bool(&ckp->clientsvspeed, json_conf, "clientsvspeed"); arr_val = json_object_get(json_conf, "redirecturl");
if (arr_val)
parse_redirecturls(ckp, arr_val);
json_decref(json_conf); json_decref(json_conf);
} }
@ -1436,10 +1519,13 @@ static struct option long_options[] = {
{"log-shares", no_argument, 0, 'L'}, {"log-shares", no_argument, 0, 'L'},
{"loglevel", required_argument, 0, 'l'}, {"loglevel", required_argument, 0, 'l'},
{"name", required_argument, 0, 'n'}, {"name", required_argument, 0, 'n'},
{"node", no_argument, 0, 'N'},
{"passthrough", no_argument, 0, 'P'}, {"passthrough", no_argument, 0, 'P'},
{"proxy", no_argument, 0, 'p'}, {"proxy", no_argument, 0, 'p'},
{"redirector", no_argument, 0, 'R'},
{"ckdb-sockdir",required_argument, 0, 'S'}, {"ckdb-sockdir",required_argument, 0, 'S'},
{"sockdir", required_argument, 0, 's'}, {"sockdir", required_argument, 0, 's'},
{"userproxy", no_argument, 0, 'u'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
#else #else
@ -1453,9 +1539,12 @@ static struct option long_options[] = {
{"log-shares", no_argument, 0, 'L'}, {"log-shares", no_argument, 0, 'L'},
{"loglevel", required_argument, 0, 'l'}, {"loglevel", required_argument, 0, 'l'},
{"name", required_argument, 0, 'n'}, {"name", required_argument, 0, 'n'},
{"node", no_argument, 0, 'N'},
{"passthrough", no_argument, 0, 'P'}, {"passthrough", no_argument, 0, 'P'},
{"proxy", no_argument, 0, 'p'}, {"proxy", no_argument, 0, 'p'},
{"redirector", no_argument, 0, 'R'},
{"sockdir", required_argument, 0, 's'}, {"sockdir", required_argument, 0, 's'},
{"userproxy", no_argument, 0, 'u'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
#endif #endif
@ -1499,7 +1588,7 @@ int main(int argc, char **argv)
ckp.initial_args[ckp.args] = strdup(argv[ckp.args]); ckp.initial_args[ckp.args] = strdup(argv[ckp.args]);
ckp.initial_args[ckp.args] = NULL; ckp.initial_args[ckp.args] = NULL;
while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpS:s:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:Nn:PpRS:s:u", long_options, &i)) != -1) {
switch (c) { switch (c) {
case 'A': case 'A':
ckp.standalone = true; ckp.standalone = true;
@ -1550,30 +1639,49 @@ int main(int argc, char **argv)
LOG_EMERG, LOG_DEBUG, ckp.loglevel); LOG_EMERG, LOG_DEBUG, ckp.loglevel);
} }
break; break;
case 'N':
if (ckp.proxy || ckp.redirector || ckp.userproxy || ckp.passthrough)
quit(1, "Cannot set another proxy type or redirector and node mode");
ckp.standalone = ckp.proxy = ckp.passthrough = ckp.node = true;
break;
case 'n': case 'n':
ckp.name = optarg; ckp.name = optarg;
break; break;
case 'P': case 'P':
if (ckp.proxy) if (ckp.proxy || ckp.redirector || ckp.userproxy || ckp.node)
quit(1, "Cannot set both proxy and passthrough mode"); quit(1, "Cannot set another proxy type or redirector and passthrough mode");
ckp.standalone = ckp.proxy = ckp.passthrough = true; ckp.standalone = ckp.proxy = ckp.passthrough = true;
break; break;
case 'p': case 'p':
if (ckp.passthrough) if (ckp.passthrough || ckp.redirector || ckp.userproxy || ckp.node)
quit(1, "Cannot set both passthrough and proxy mode"); quit(1, "Cannot set another proxy type or redirector and proxy mode");
ckp.proxy = true; ckp.proxy = true;
break; break;
case 'R':
if (ckp.proxy || ckp.passthrough || ckp.userproxy || ckp.node)
quit(1, "Cannot set a proxy type or passthrough and redirector modes");
ckp.standalone = ckp.proxy = ckp.passthrough = ckp.redirector = true;
break;
case 'S': case 'S':
ckp.ckdb_sockdir = strdup(optarg); ckp.ckdb_sockdir = strdup(optarg);
break; break;
case 's': case 's':
ckp.socket_dir = strdup(optarg); ckp.socket_dir = strdup(optarg);
break; break;
case 'u':
if (ckp.proxy || ckp.redirector || ckp.passthrough || ckp.node)
quit(1, "Cannot set both userproxy and another proxy type or redirector");
ckp.userproxy = ckp.proxy = true;
break;
} }
} }
if (!ckp.name) { if (!ckp.name) {
if (ckp.passthrough) if (ckp.node)
ckp.name = "cknode";
else if (ckp.redirector)
ckp.name = "ckredirector";
else if (ckp.passthrough)
ckp.name = "ckpassthrough"; ckp.name = "ckpassthrough";
else if (ckp.proxy) else if (ckp.proxy)
ckp.name = "ckproxy"; ckp.name = "ckproxy";
@ -1629,7 +1737,7 @@ int main(int argc, char **argv)
parse_config(&ckp); parse_config(&ckp);
/* Set defaults if not found in config file */ /* Set defaults if not found in config file */
if (!ckp.btcds && !ckp.proxy) { if (!ckp.btcds) {
ckp.btcds = 1; ckp.btcds = 1;
ckp.btcdurl = ckzalloc(sizeof(char *)); ckp.btcdurl = ckzalloc(sizeof(char *));
ckp.btcdauth = ckzalloc(sizeof(char *)); ckp.btcdauth = ckzalloc(sizeof(char *));
@ -1672,6 +1780,8 @@ int main(int argc, char **argv)
ckp.serverurl = ckzalloc(sizeof(char *)); ckp.serverurl = ckzalloc(sizeof(char *));
if (ckp.proxy && !ckp.proxies) if (ckp.proxy && !ckp.proxies)
quit(0, "No proxy entries found in config file %s", ckp.config); quit(0, "No proxy entries found in config file %s", ckp.config);
if (ckp.redirector && !ckp.redirecturls)
quit(0, "No redirect entries found in config file %s", ckp.config);
/* Create the log directory */ /* Create the log directory */
trail_slash(&ckp.logdir); trail_slash(&ckp.logdir);
@ -1763,6 +1873,7 @@ int main(int argc, char **argv)
launch_logger(&ckp.main); launch_logger(&ckp.main);
ckp.logfd = fileno(ckp.logfp); ckp.logfd = fileno(ckp.logfp);
ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api);
create_pthread(&ckp.pth_listener, listener, &ckp.main); create_pthread(&ckp.pth_listener, listener, &ckp.main);
/* Launch separate processes from here */ /* Launch separate processes from here */

76
src/ckpool.h

@ -79,6 +79,7 @@ struct connsock {
char *buf; char *buf;
int bufofs; int bufofs;
int buflen; int buflen;
ckpool_t *ckp;
/* Semaphore used to serialise request/responses */ /* Semaphore used to serialise request/responses */
sem_t sem; sem_t sem;
}; };
@ -160,6 +161,9 @@ struct ckpool_instance {
/* How many clients maximum to accept before rejecting further */ /* How many clients maximum to accept before rejecting further */
int maxclients; int maxclients;
/* API message queue */
ckmsgq_t *ckpapi;
/* Logger message queue NOTE: Unique per process */ /* Logger message queue NOTE: Unique per process */
ckmsgq_t *logger; ckmsgq_t *logger;
/* Process instance data of parent/child processes */ /* Process instance data of parent/child processes */
@ -176,18 +180,24 @@ struct ckpool_instance {
pthread_t pth_listener; pthread_t pth_listener;
pthread_t pth_watchdog; pthread_t pth_watchdog;
/* Are we running in node proxy mode */
bool node;
/* Are we running in passthrough mode */ /* Are we running in passthrough mode */
bool passthrough; bool passthrough;
/* Are we a redirecting passthrough */
bool redirector;
/* Are we running as a proxy */ /* Are we running as a proxy */
bool proxy; bool proxy;
/* Do we prefer more proxy clients over support for >5TH clients */
bool clientsvspeed;
/* Are we running without ckdb */ /* Are we running without ckdb */
bool standalone; bool standalone;
/* Are we running in userproxy mode */
bool userproxy;
/* Should we daemonise the ckpool process */ /* Should we daemonise the ckpool process */
bool daemon; bool daemon;
@ -224,10 +234,53 @@ struct ckpool_instance {
char **proxyauth; char **proxyauth;
char **proxypass; char **proxypass;
/* Passthrough redirect options */
int redirecturls;
char **redirecturl;
char **redirectport;
/* Private data for each process */ /* Private data for each process */
void *data; void *data;
}; };
enum stratum_msgtype {
SM_RECONNECT = 0,
SM_DIFF,
SM_MSG,
SM_UPDATE,
SM_ERROR,
SM_SUBSCRIBE,
SM_SUBSCRIBERESULT,
SM_SHARE,
SM_SHARERESULT,
SM_AUTH,
SM_AUTHRESULT,
SM_TXNS,
SM_TXNSRESULT,
SM_PING,
SM_WORKINFO,
SM_NONE
};
static const char __maybe_unused *stratum_msgs[] = {
"reconnect",
"diff",
"message",
"update",
"error",
"subscribe",
"subscribe.result",
"share",
"share.result",
"auth",
"auth.result",
"txns",
"txns.result",
"ping",
"workinfo",
""
};
#ifdef USE_CKDB #ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else #else
@ -267,5 +320,22 @@ bool json_get_string(char **store, const json_t *val, const char *res);
bool json_get_int64(int64_t *store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res);
bool json_get_int(int *store, const json_t *val, const char *res); bool json_get_int(int *store, const json_t *val, const char *res);
bool json_get_double(double *store, const json_t *val, const char *res); bool json_get_double(double *store, const json_t *val, const char *res);
bool json_get_bool(bool *store, const json_t *val, const char *res);
bool json_getdel_int(int *store, json_t *val, const char *res);
bool json_getdel_int64(int64_t *store, json_t *val, const char *res);
/* API Placeholders for future API implementation */
typedef struct apimsg apimsg_t;
struct apimsg {
char *buf;
int sockd;
};
static inline void ckpool_api(ckpool_t __maybe_unused *ckp, apimsg_t __maybe_unused *apimsg) {};
static inline json_t *json_encode_errormsg(json_error_t __maybe_unused *err_val) { return NULL; };
static inline json_t *json_errormsg(const char __maybe_unused *fmt, ...) { return NULL; };
static inline void send_api_response(json_t __maybe_unused *val, const int __maybe_unused sockd) {};
#endif /* CKPOOL_H */ #endif /* CKPOOL_H */

300
src/connector.c

@ -15,6 +15,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <zlib.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -25,6 +26,8 @@
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
typedef struct sender_send sender_send_t; typedef struct sender_send sender_send_t;
typedef struct share share_t;
typedef struct redirect redirect_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
@ -53,12 +56,20 @@ struct client_instance {
int server; int server;
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; unsigned long bufofs;
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
sender_send_t *sending; sender_send_t *sending;
/* Is this the parent passthrough client */
bool passthrough; bool passthrough;
/* Linked list of shares in redirector mode.*/
share_t *shares;
/* Has this client already been told to redirect */
bool redirected;
/* Time this client started blocking, 0 when not blocked */ /* Time this client started blocking, 0 when not blocked */
time_t blocked_time; time_t blocked_time;
}; };
@ -73,6 +84,21 @@ struct sender_send {
int ofs; int ofs;
}; };
struct share {
share_t *next;
share_t *prev;
time_t submitted;
int64_t id;
};
struct redirect {
UT_hash_handle hh;
char address_name[INET6_ADDRSTRLEN];
int id;
int redirect_no;
};
/* Private data for the connector */ /* Private data for the connector */
struct connector_data { struct connector_data {
ckpool_t *ckp; ckpool_t *ckp;
@ -115,6 +141,11 @@ struct connector_data {
/* For protecting the pending sends list */ /* For protecting the pending sends list */
mutex_t sender_lock; mutex_t sender_lock;
pthread_cond_t sender_cond; pthread_cond_t sender_cond;
/* Hash list of all redirected IP address in redirector mode */
redirect_t *redirects;
/* What redirect we're currently up to */
int redirect;
}; };
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
@ -125,6 +156,13 @@ static void __inc_instance_ref(client_instance_t *client)
client->ref++; client->ref++;
} }
static void inc_instance_ref(cdata_t *cdata, client_instance_t *client)
{
ck_wlock(&cdata->lock);
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
}
/* Increase the reference count of instance */ /* Increase the reference count of instance */
static void __dec_instance_ref(client_instance_t *client) static void __dec_instance_ref(client_instance_t *client)
{ {
@ -261,25 +299,34 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
return 1; return 1;
} }
/* Client must hold a reference count */ static int __drop_client(cdata_t *cdata, client_instance_t *client)
static int drop_client(cdata_t *cdata, client_instance_t *client)
{ {
int64_t client_id = 0; int ret = -1;
int fd = -1;
ck_wlock(&cdata->lock); if (client->invalid)
if (!client->invalid) { goto out;
client->invalid = true; client->invalid = true;
client_id = client->id; ret = client->fd;
fd = client->fd; Close(client->fd);
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, ret, NULL);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the /* This is the reference to this client's presence in the
* epoll list. */ * epoll list. */
__dec_instance_ref(client); __dec_instance_ref(client);
cdata->dead_generated++; cdata->dead_generated++;
out:
return ret;
} }
/* Client must hold a reference count */
static int drop_client(cdata_t *cdata, client_instance_t *client)
{
int64_t client_id = client->id;
int fd = -1;
ck_wlock(&cdata->lock);
fd = __drop_client(cdata, client);
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
if (fd > -1) if (fd > -1)
@ -297,7 +344,7 @@ static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client
JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address",
client->address_name, "server", client->server, "method", "mining.term", client->address_name, "server", client->server, "method", "mining.term",
"params"); "params");
s = json_dumps(val, 0); s = json_dumps(val, JSON_COMPACT);
json_decref(val); json_decref(val);
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
free(s); free(s);
@ -351,8 +398,60 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
return ret; return ret;
} }
static void drop_all_clients(cdata_t *cdata)
{
client_instance_t *client, *tmp;
ck_wlock(&cdata->lock);
HASH_ITER(hh, cdata->clients, client, tmp) {
__drop_client(cdata, client);
}
ck_wunlock(&cdata->lock);
}
static void send_client(cdata_t *cdata, int64_t id, char *buf); static void send_client(cdata_t *cdata, int64_t id, char *buf);
/* Look for shares being submitted via a redirector and add them to a linked
* list for looking up the responses. */
static void parse_redirector_share(client_instance_t *client, const char *msg, const json_t *val)
{
share_t *share, *tmp;
time_t now;
int64_t id;
if (!json_get_int64(&id, val, "id")) {
LOGNOTICE("Failed to find redirector share id");
return;
}
/* If this is not a share, delete any matching ID messages so we
* don't falsely assume the client has had an accepted share based on
* a true result to a different message. */
if (!strstr(msg, "mining.submit")) {
LOGDEBUG("Redirector client %"PRId64" non share message: %s", client->id, msg);
DL_FOREACH_SAFE(client->shares, share, tmp) {
if (share->id == id) {
DL_DELETE(client->shares, share);
dealloc(share);
}
}
return;
}
share = ckzalloc(sizeof(share_t));
now = time(NULL);
share->submitted = now;
share->id = id;
DL_APPEND(client->shares, share);
LOGINFO("Redirector adding client %"PRId64" share id: %"PRId64, client->id, id);
/* Age old shares. */
DL_FOREACH_SAFE(client->shares, share, tmp) {
if (now > share->submitted + 120) {
DL_DELETE(client->shares, share);
dealloc(share);
}
}
}
/* Client is holding a reference count from being on the epoll list */ /* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{ {
@ -374,7 +473,7 @@ retry:
if (ret < 1) { if (ret < 1) {
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return; return;
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s",
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
@ -392,6 +491,7 @@ reparse:
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
msg[buflen] = '\0'; msg[buflen] = '\0';
client->bufofs -= buflen; client->bufofs -= buflen;
@ -405,29 +505,31 @@ reparse:
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} else { } else {
int64_t passthrough_id;
char *s; char *s;
if (client->passthrough) { if (client->passthrough) {
passthrough_id = json_integer_value(json_object_get(val, "client_id")); int64_t passthrough_id;
json_object_del(val, "client_id");
json_getdel_int64(&passthrough_id, val, "client_id");
passthrough_id = (client->id << 32) | passthrough_id; passthrough_id = (client->id << 32) | passthrough_id;
json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id));
} else { } else {
if (ckp->redirector && !client->redirected && strstr(msg, "mining.submit"))
parse_redirector_share(client, msg, val);
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "address", json_string(client->address_name));
} }
json_object_set_new_nocheck(val, "server", json_integer(client->server)); json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, 0); s = json_dumps(val, JSON_COMPACT);
/* Do not send messages of clients we've already dropped. We /* Do not send messages of clients we've already dropped. We
* do this unlocked as the occasional false negative can be * do this unlocked as the occasional false negative can be
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(!client->invalid)) { if (likely(!client->invalid)) {
if (!ckp->passthrough || ckp->node)
send_proc(ckp->stratifier, s);
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
else
send_proc(ckp->stratifier, s);
} }
free(s); free(s);
@ -477,7 +579,7 @@ void *receiver(void *arg)
for (i = 0; i < serverfds; i++) { for (i = 0; i < serverfds; i++) {
/* The small values will be less than the first client ids */ /* The small values will be less than the first client ids */
event.data.u64 = i; event.data.u64 = i;
event.events = EPOLLIN; event.events = EPOLLIN | EPOLLRDHUP;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
if (ret < 0) { if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
@ -660,6 +762,115 @@ static void *sender(void *arg)
return NULL; return NULL;
} }
static int add_redirect(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{
redirect_t *redirect;
bool found;
ck_wlock(&cdata->lock);
HASH_FIND_STR(cdata->redirects, client->address_name, redirect);
if (!redirect) {
redirect = ckzalloc(sizeof(redirect_t));
strcpy(redirect->address_name, client->address_name);
redirect->redirect_no = cdata->redirect++;
if (cdata->redirect >= ckp->redirecturls)
cdata->redirect = 0;
HASH_ADD_STR(cdata->redirects, address_name, redirect);
found = false;
} else
found = true;
ck_wunlock(&cdata->lock);
LOGNOTICE("Redirecting client %"PRId64" from %s IP %s to redirecturl %d",
client->id, found ? "matching" : "new", client->address_name, redirect->redirect_no);
return redirect->redirect_no;
}
static void redirect_client(ckpool_t *ckp, client_instance_t *client)
{
sender_send_t *sender_send;
cdata_t *cdata = ckp->data;
json_t *val;
char *buf;
int num;
/* Set the redirected boool to only try redirecting them once */
client->redirected = true;
num = add_redirect(ckp, cdata, client);
JSON_CPACK(val, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect",
"params", ckp->redirecturl[num], ckp->redirectport[num], 0);
buf = json_dumps(val, JSON_EOL | JSON_COMPACT);
json_decref(val);
sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client;
sender_send->buf = buf;
sender_send->len = strlen(buf);
inc_instance_ref(cdata, client);
mutex_lock(&cdata->sender_lock);
cdata->sends_generated++;
DL_APPEND(cdata->sender_sends, sender_send);
pthread_cond_signal(&cdata->sender_cond);
mutex_unlock(&cdata->sender_lock);
}
/* Look for accepted shares in redirector mode to know we can redirect this
* client to a protected server. */
static void test_redirector_shares(ckpool_t *ckp, client_instance_t *client, const char *buf)
{
json_t *val = json_loads(buf, 0, NULL);
share_t *share, *found = NULL;
int64_t id;
if (!val) {
LOGNOTICE("Invalid json response to client %"PRId64, client->id);
return;
}
if (!json_get_int64(&id, val, "id")) {
LOGINFO("Failed to find response id");
goto out;
}
DL_FOREACH(client->shares, share) {
if (share->id == id) {
LOGDEBUG("Found matching share %"PRId64" in trs for client %"PRId64,
id, client->id);
DL_DELETE(client->shares, share);
found = share;
break;
}
}
if (found) {
bool result = false;
dealloc(found);
if (!json_get_bool(&result, val, "result")) {
LOGINFO("Failed to find result in trs share");
goto out;
}
if (!json_is_null(json_object_get(val, "error"))) {
LOGINFO("Got error for trs share");
goto out;
}
if (!result) {
LOGDEBUG("Rejected trs share");
goto out;
}
LOGNOTICE("Found accepted share for client %"PRId64" - redirecting",
client->id);
redirect_client(ckp, client);
/* Clear the list now since we don't need it any more */
DL_FOREACH_SAFE(client->shares, share, found) {
DL_DELETE(client->shares, share);
dealloc(share);
}
}
out:
json_decref(val);
}
/* Send a client by id a heap allocated buffer, allowing this function to /* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */ * free the ram. */
static void send_client(cdata_t *cdata, const int64_t id, char *buf) static void send_client(cdata_t *cdata, const int64_t id, char *buf)
@ -680,6 +891,13 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
return; return;
} }
if (unlikely(ckp->node && !id)) {
LOGDEBUG("Message for node: %s", buf);
send_proc(ckp->stratifier, buf);
free(buf);
return;
}
/* Grab a reference to this client until the sender_send has /* Grab a reference to this client until the sender_send has
* completed processing. Is this a passthrough subclient ? */ * completed processing. Is this a passthrough subclient ? */
if (id > 0xffffffffll) { if (id > 0xffffffffll) {
@ -710,6 +928,20 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
free(buf); free(buf);
return; return;
} }
if (ckp->node) {
json_t *val = json_loads(buf, 0, NULL);
char *msg;
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server));
msg = json_dumps(val, JSON_COMPACT);
json_decref(val);
send_proc(ckp->stratifier, msg);
free(msg);
}
if (ckp->redirector && !client->redirected)
test_redirector_shares(ckp, client, buf);
} }
sender_send = ckzalloc(sizeof(sender_send_t)); sender_send = ckzalloc(sizeof(sender_send_t));
@ -724,6 +956,17 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
} }
static bool client_exists(cdata_t *cdata, const int64_t id)
{
client_instance_t *client;
ck_rlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
ck_runlock(&cdata->lock);
return !!client;
}
static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{ {
char *buf; char *buf;
@ -742,7 +985,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }
@ -753,7 +996,8 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
* upstream client_id instead of the passthrough's. */ * upstream client_id instead of the passthrough's. */
if (client_id > 0xffffffffll) if (client_id > 0xffffffffll)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL);
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(cdata, client_id, msg); send_client(cdata, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
} }
@ -874,6 +1118,17 @@ retry:
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
if (ret >= 0) if (ret >= 0)
LOGINFO("Connector dropped client id: %"PRId64, client_id); LOGINFO("Connector dropped client id: %"PRId64, client_id);
} else if (cmdmatch(buf, "testclient")) {
ret = sscanf(buf, "testclient=%"PRId64, &client_id);
if (unlikely(ret < 0)) {
LOGDEBUG("Connector failed to parse testclient command: %s", buf);
goto retry;
}
client_id &= 0xffffffffll;
if (client_exists(cdata, client_id))
goto retry;
LOGINFO("Connector detected non-existent client id: %"PRId64, client_id);
stratifier_drop_id(ckp, client_id);
} else if (cmdmatch(buf, "ping")) { } else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Connector received ping request"); LOGDEBUG("Connector received ping request");
send_unix_msg(umsg->sockd, "pong"); send_unix_msg(umsg->sockd, "pong");
@ -883,7 +1138,8 @@ retry:
} else if (cmdmatch(buf, "reject")) { } else if (cmdmatch(buf, "reject")) {
LOGDEBUG("Connector received reject signal"); LOGDEBUG("Connector received reject signal");
cdata->accept = false; cdata->accept = false;
send_proc(ckp->stratifier, "dropall"); if (ckp->passthrough)
drop_all_clients(cdata);
} else if (cmdmatch(buf, "stats")) { } else if (cmdmatch(buf, "stats")) {
char *msg; char *msg;

2451
src/generator.c

File diff suppressed because it is too large Load Diff

19
src/libckpool.c

@ -747,12 +747,12 @@ int write_socket(int fd, const void *buf, size_t nbyte)
if (!ret) if (!ret)
LOGNOTICE("Select timed out in write_socket"); LOGNOTICE("Select timed out in write_socket");
else else
LOGERR("Select failed in write_socket"); LOGNOTICE("Select failed in write_socket");
goto out; goto out;
} }
ret = write_length(fd, buf, nbyte); ret = write_length(fd, buf, nbyte);
if (ret < 0) if (ret < 0)
LOGWARNING("Failed to write in write_socket"); LOGNOTICE("Failed to write in write_socket");
out: out:
return ret; return ret;
} }
@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line)
return ptr; return ptr;
} }
/* Round up to the nearest page size for efficient malloc */
size_t round_up_page(size_t len)
{
int rem = len % PAGESIZE;
if (rem)
len += PAGESIZE - rem;
return len;
}
/* Adequate size s==len*2 + 1 must be alloced to use this variant */ /* Adequate size s==len*2 + 1 must be alloced to use this variant */
void __bin2hex(void *vs, const void *vp, size_t len) void __bin2hex(void *vs, const void *vp, size_t len)
{ {
@ -1976,7 +1988,10 @@ double diff_from_nbits(char *nbits)
if (powdiff < 8) if (powdiff < 8)
powdiff = 8; powdiff = 8;
diff32 = be32toh(*((uint32_t *)nbits)) & 0x00FFFFFF; diff32 = be32toh(*((uint32_t *)nbits)) & 0x00FFFFFF;
if (likely(powdiff > 0))
numerator = 0xFFFFULL << powdiff; numerator = 0xFFFFULL << powdiff;
else
numerator = 0xFFFFULL >> -powdiff;
return numerator / (double)diff32; return numerator / (double)diff32;
} }

2
src/libckpool.h

@ -531,6 +531,8 @@ void trail_slash(char **buf);
void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *_ckalloc(size_t len, const char *file, const char *func, const int line);
void *json_ckalloc(size_t size); void *json_ckalloc(size_t size);
void *_ckzalloc(size_t len, const char *file, const char *func, const int line); void *_ckzalloc(size_t len, const char *file, const char *func, const int line);
size_t round_up_page(size_t len);
extern const int hex2bin_tbl[]; extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len); void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len); void *bin2hex(const void *vp, size_t len);

3422
src/stratifier.c

File diff suppressed because it is too large Load Diff

2
src/uthash.h

@ -265,6 +265,8 @@ do {
HASH_FIND(hh,head,findint,sizeof(int64_t),out) HASH_FIND(hh,head,findint,sizeof(int64_t),out)
#define HASH_ADD_I64(head,intfield,add) \ #define HASH_ADD_I64(head,intfield,add) \
HASH_ADD(hh,head,intfield,sizeof(int64_t),add) HASH_ADD(hh,head,intfield,sizeof(int64_t),add)
#define HASH_REPLACE_I64(head,intfield,add,replaced) \
HASH_REPLACE(hh,head,intfield,sizeof(int64_t),add,replaced)
#define HASH_FIND_PTR(head,findptr,out) \ #define HASH_FIND_PTR(head,findptr,out) \
HASH_FIND(hh,head,findptr,sizeof(void *),out) HASH_FIND(hh,head,findptr,sizeof(void *),out)
#define HASH_ADD_PTR(head,ptrfield,add) \ #define HASH_ADD_PTR(head,ptrfield,add) \

Loading…
Cancel
Save