Browse Source

Implement backwardly compatible lz4 compression support to be used by passthroughs and nodes

master
Con Kolivas 9 years ago
parent
commit
950b855b2d
  1. 19
      configure.ac
  2. 4
      src/Makefile.am
  3. 202
      src/ckpool.c
  4. 4
      src/ckpool.h
  5. 60
      src/connector.c
  6. 14
      src/generator.c

19
configure.ac

@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h)
AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h)
AC_CHECK_HEADERS(lz4.h)
PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm"
RT_LIBS="-lrt"
AC_CONFIG_SUBDIRS([src/jansson-2.6]) AC_CONFIG_SUBDIRS([src/jansson-2.6])
JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a"
AC_SUBST(PTHREAD_LIBS)
AC_SUBST(MATH_LIBS)
AC_SUBST(RT_LIBS)
AC_SUBST(JANSSON_LIBS) AC_SUBST(JANSSON_LIBS)
AC_ARG_WITH([ckdb], AC_ARG_WITH([ckdb],
@ -58,6 +52,11 @@ AC_ARG_WITH([ckdb],
[ckdb=$withval] [ckdb=$withval]
) )
#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1)
AC_SEARCH_LIBS(pthread_create, pthread, , "Error: Required library pthreads not found." && exit 1)
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev
not found. Install it or disable postgresql support with --without-ckdb" && exit 1) not found. Install it or disable postgresql support with --without-ckdb" && exit 1)
@ -84,8 +83,10 @@ echo "Compilation............: make (or gmake)"
echo " CPPFLAGS.............: $CPPFLAGS" echo " CPPFLAGS.............: $CPPFLAGS"
echo " CFLAGS...............: $CFLAGS" echo " CFLAGS...............: $CFLAGS"
echo " LDFLAGS..............: $LDFLAGS" echo " LDFLAGS..............: $LDFLAGS"
echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" echo " LDADD................: $LIBS $JANSSON_LIBS"
echo " db LDADD.............: $DB_LIBS" if test "x$ckdb" != "xno"; then
echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS"
fi
echo echo
echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')"
echo " prefix...............: $prefix" echo " prefix...............: $prefix"

4
src/Makefile.am

@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src
lib_LTLIBRARIES = libckpool.la lib_LTLIBRARIES = libckpool.la
libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h
libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ libckpool_la_LIBADD = @LIBS@
bin_PROGRAMS = ckpool ckpmsg notifier bin_PROGRAMS = ckpool ckpmsg notifier
ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \
@ -23,5 +23,5 @@ if WANT_CKDB
bin_PROGRAMS += ckdb bin_PROGRAMS += ckdb
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \
ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h
ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@
endif endif

202
src/ckpool.c

@ -20,6 +20,7 @@
#include <getopt.h> #include <getopt.h>
#include <grp.h> #include <grp.h>
#include <jansson.h> #include <jansson.h>
#include <lz4.h>
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -510,6 +511,129 @@ void empty_buffer(connsock_t *cs)
cs->buflen = cs->bufofs = 0; cs->buflen = cs->bufofs = 0;
} }
static void clear_bufline(connsock_t *cs)
{
if (unlikely(!cs->buf))
cs->buf = ckzalloc(PAGESIZE);
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
}
}
static void add_bufline(connsock_t *cs, const char *readbuf, const int len)
{
int backoff = 1;
size_t buflen;
char *newbuf;
buflen = cs->bufofs + len + 1;
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, len);
cs->bufofs += len;
cs->buf[cs->bufofs] = '\0';
}
static int read_cs_length(connsock_t *cs, float *timeout, int len)
{
int ret = len;
while (cs->buflen < len) {
char readbuf[PAGESIZE] = {};
ret = wait_read_select(cs->fd, *timeout);
if (ret < 1)
goto out;
ret = recv(cs->fd, readbuf, len - cs->buflen, MSG_DONTWAIT);
if (ret < 1)
goto out;
add_bufline(cs, readbuf, ret);
}
out:
return ret;
}
static int read_lz4_line(connsock_t *cs, float *timeout)
{
int compsize, decompsize, ret, buflen;
char *buf, *dest = NULL, *eom;
uint32_t msglen;
/* Remove lz4 header */
clear_bufline(cs);
/* Get data sizes */
ret = read_cs_length(cs, timeout, 8);
if (ret != 8) {
ret = -1;
goto out;
}
memcpy(&msglen, cs->buf, 4);
compsize = le32toh(msglen);
memcpy(&msglen, cs->buf + 4, 4);
decompsize = le32toh(msglen);
cs->bufofs = 8;
clear_bufline(cs);
LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize);
if (unlikely(compsize < 1 || compsize > (int)0x80000000 ||
decompsize < 1 || decompsize > (int)0x80000000)) {
LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize);
ret = -1;
goto out;
}
/* Get compressed data */
ret = read_cs_length(cs, timeout, compsize);
if (ret != compsize) {
LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret);
ret = -1;
goto out;
}
/* Do decompresion and buffer reconstruction here */
dest = ckalloc(decompsize);
ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize);
/* Clear out all the compressed data */
clear_bufline(cs);
if (ret != decompsize) {
LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret);
ret = -1;
goto out;
}
eom = dest + decompsize - 1;
if (memcmp(eom, "\n", 1)) {
LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line");
ret = -1;
goto out;
}
*eom = '\0';
/* Wedge the decompressed buffer back to the start of cs->buf */
buf = cs->buf;
buflen = cs->buflen;
cs->buf = dest;
dest = NULL;
ret = cs->buflen = decompsize - 1;
if (buflen)
add_bufline(cs, buf, buflen);
out:
free(dest);
return ret;
}
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0' /* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning * and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
@ -517,7 +641,6 @@ int read_socket_line(connsock_t *cs, float *timeout)
{ {
char *eom = NULL; char *eom = NULL;
tv_t start, now; tv_t start, now;
size_t buflen;
int ret = -1; int ret = -1;
bool polled; bool polled;
float diff; float diff;
@ -525,16 +648,8 @@ int read_socket_line(connsock_t *cs, float *timeout)
if (unlikely(cs->fd < 0)) if (unlikely(cs->fd < 0))
goto out; goto out;
if (unlikely(!cs->buf)) clear_bufline(cs);
cs->buf = ckzalloc(PAGESIZE); eom = strchr(cs->buf, '\n');
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n');
}
tv_time(&start); tv_time(&start);
rewait: rewait:
@ -564,8 +679,6 @@ rewait:
*timeout -= diff; *timeout -= diff;
while (42) { while (42) {
char readbuf[PAGESIZE] = {}; char readbuf[PAGESIZE] = {};
int backoff = 1;
char *newbuf;
ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
@ -585,22 +698,7 @@ rewait:
goto out; goto out;
} }
polled = false; polled = false;
buflen = cs->bufofs + ret + 1; add_bufline(cs, readbuf, ret);
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, ret);
cs->bufofs += ret;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
} }
parse: parse:
@ -616,7 +714,53 @@ out:
if (ret < 0) { if (ret < 0) {
empty_buffer(cs); empty_buffer(cs);
dealloc(cs->buf); dealloc(cs->buf);
} else if (cs->buflen && !strncmp(cs->buf, "lz4", 3))
ret = read_lz4_line(cs, timeout);
return ret;
}
/* Lz4 compressed block structure:
* - 4 byte magic header LZ4\n
* - 4 byte LE encoded compressed size
* - 4 byte LE encoded decompressed size
*/
int write_cs(connsock_t *cs, const char *buf, int len)
{
int compsize, decompsize = len;
char *dest = NULL;
uint32_t msglen;
int ret = -1;
/* Connsock doesn't expect lz4 compressed messages */
if (!cs->lz4) {
ret = write_socket(cs->fd, buf, len);
goto out;
} }
dest = ckalloc(len + 12);
/* Do compression here */
compsize = LZ4_compress(buf, dest + 12, len);
if (!compsize) {
LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed");
ret = write_socket(cs->fd, buf, len);
goto out;
}
LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize);
/* Copy lz4 magic header */
sprintf(dest, "lz4\n");
/* Copy compressed message length */
msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4);
/* Copy decompressed message length */
msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4);
len = compsize + 12;
ret = write_socket(cs->fd, dest, len);
if (ret == len)
ret = decompsize;
else
ret = -1;
out:
free(dest);
return ret; return ret;
} }

4
src/ckpool.h

@ -82,6 +82,9 @@ struct connsock {
ckpool_t *ckp; ckpool_t *ckp;
/* Semaphore used to serialise request/responses */ /* Semaphore used to serialise request/responses */
sem_t sem; sem_t sem;
/* Has the other end acknowledged it can receive lz4 compressed data */
bool lz4;
}; };
typedef struct connsock connsock_t; typedef struct connsock connsock_t;
@ -301,6 +304,7 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
int write_cs(connsock_t *cs, const char *buf, int len);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

60
src/connector.c

@ -10,6 +10,7 @@
#include "config.h" #include "config.h"
#include <arpa/inet.h> #include <arpa/inet.h>
#include <lz4.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -63,6 +64,12 @@ struct client_instance {
/* Is this the parent passthrough client */ /* Is this the parent passthrough client */
bool passthrough; bool passthrough;
/* Does this client expect lz4 compression? */
bool lz4;
bool compressed; /* Currently receiving a compressed message */
int compsize; /* Expected compressed data size */
int decompsize; /* Expected decompressed data size */
/* Linked list of shares in redirector mode.*/ /* Linked list of shares in redirector mode.*/
share_t *shares; share_t *shares;
@ -478,6 +485,28 @@ retry:
return; return;
} }
client->bufofs += ret; client->bufofs += ret;
compressed:
if (client->compressed) {
if (client->bufofs < client->compsize)
goto retry;
ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize);
if (ret != client->decompsize) {
LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d",
client->decompsize, client->compsize, ret);
invalidate_client(ckp, cdata, client);
return;
}
LOGDEBUG("Received client message compressed %d from %d",
client->compsize, client->decompsize);
msg[ret] = '\0';
/* Flag this client as able to receive lz4 compressed data now */
client->lz4 = true;
client->bufofs -= client->compsize;
if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs);
client->compressed = false;
goto parse;
}
reparse: reparse:
eol = memchr(client->buf, '\n', client->bufofs); eol = memchr(client->buf, '\n', client->bufofs);
if (!eol) if (!eol)
@ -490,11 +519,40 @@ reparse:
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
/* Look for a compression header */
if (!strncmp(client->buf, "lz4", 3)) {
uint32_t msglen;
/* Do we have the whole header? If not, keep reading */
if (client->bufofs < 12)
goto retry;
memcpy(&msglen, client->buf + 4, 4);
client->compsize = le32toh(msglen);
memcpy(&msglen, client->buf + 8, 4);
client->decompsize = le32toh(msglen);
if (unlikely(!client->compsize || !client->decompsize ||
client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) {
LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting",
client->id, client->compsize, client->decompsize);
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs -= 12;
if (client->bufofs > 0)
memmove(client->buf, client->buf + 12, client->bufofs);
client->compressed = true;
if (client->bufofs >= client->compsize)
goto compressed;
goto retry;
}
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
msg[buflen] = '\0'; msg[buflen] = '\0';
client->bufofs -= buflen; client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs); memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0'; client->buf[client->bufofs] = '\0';
parse:
if (!(val = json_loads(msg, 0, NULL))) { if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n"); char *buf = strdup("Invalid JSON, disconnecting\n");
@ -971,7 +1029,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
LOGINFO("Connector adding passthrough client %"PRId64, client->id); LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true; client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true}\n"); ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n");
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }

14
src/generator.c

@ -772,8 +772,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.passthrough", "method", "mining.passthrough",
"lz4", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
json_decref(req); json_decref(req);
@ -798,6 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied passthrough for stratum"); LOGWARNING("Denied passthrough for stratum");
goto out; goto out;
} }
json_get_bool(&cs->lz4, val, "lz4");
if (cs->lz4)
LOGNOTICE("Negotiated lz4 compression with pool");
proxi->passthrough = true; proxi->passthrough = true;
out: out:
if (val) if (val)
@ -814,8 +818,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.node", "method", "mining.node",
"lz4", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
@ -841,6 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied node setup for stratum"); LOGWARNING("Denied node setup for stratum");
goto out; goto out;
} }
json_get_bool(&cs->lz4, val, "lz4");
if (cs->lz4)
LOGNOTICE("Negotiated lz4 compression with pool");
proxi->node = true; proxi->node = true;
out: out:
if (val) if (val)
@ -1795,7 +1803,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm)
LOGDEBUG("Sending upstream json msg: %s", pm->msg); LOGDEBUG("Sending upstream json msg: %s", pm->msg);
len = strlen(pm->msg); len = strlen(pm->msg);
sent = write_socket(cs->fd, pm->msg, len); sent = write_cs(cs, pm->msg, len);
if (unlikely(sent != len)) { if (unlikely(sent != len)) {
LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect", LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect",
len, pm->msg); len, pm->msg);

Loading…
Cancel
Save