diff --git a/configure.ac b/configure.ac index 6097d1a4..a805be4d 100644 --- a/configure.ac +++ b/configure.ac @@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) - -PTHREAD_LIBS="-lpthread" -MATH_LIBS="-lm" -RT_LIBS="-lrt" +AC_CHECK_HEADERS(lz4.h) AC_CONFIG_SUBDIRS([src/jansson-2.6]) JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" -AC_SUBST(PTHREAD_LIBS) -AC_SUBST(MATH_LIBS) -AC_SUBST(RT_LIBS) AC_SUBST(JANSSON_LIBS) AC_ARG_WITH([ckdb], @@ -58,6 +52,11 @@ AC_ARG_WITH([ckdb], [ckdb=$withval] ) +#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) +AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) +AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) +AC_SEARCH_LIBS(pthread_create, pthread, , "Error: Required library pthreads not found." && exit 1) + if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) @@ -84,8 +83,10 @@ echo "Compilation............: make (or gmake)" echo " CPPFLAGS.............: $CPPFLAGS" echo " CFLAGS...............: $CFLAGS" echo " LDFLAGS..............: $LDFLAGS" -echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" -echo " db LDADD.............: $DB_LIBS" +echo " LDADD................: $LIBS $JANSSON_LIBS" +if test "x$ckdb" != "xno"; then + echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS" +fi echo echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo " prefix...............: $prefix" diff --git a/src/Makefile.am b/src/Makefile.am index e95666ac..53561714 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src lib_LTLIBRARIES = libckpool.la libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h -libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ +libckpool_la_LIBADD = @LIBS@ bin_PROGRAMS = ckpool ckpmsg notifier ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ @@ -23,5 +23,5 @@ if WANT_CKDB bin_PROGRAMS += ckdb ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h -ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ +ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@ endif diff --git a/src/ckpool.c b/src/ckpool.c index 15a9a5bf..a5a41108 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -510,6 +511,129 @@ void empty_buffer(connsock_t *cs) cs->buflen = cs->bufofs = 0; } +static void clear_bufline(connsock_t *cs) +{ + if (unlikely(!cs->buf)) + cs->buf = ckzalloc(PAGESIZE); + else if (cs->buflen) { + memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); + memset(cs->buf + cs->buflen, 0, cs->bufofs); + cs->bufofs = cs->buflen; + cs->buflen = 0; + cs->buf[cs->bufofs] = '\0'; + } +} + +static void add_bufline(connsock_t *cs, const char *readbuf, const int len) +{ + int backoff = 1; + size_t buflen; + char *newbuf; + + buflen = cs->bufofs + len + 1; + while (42) { + newbuf = realloc(cs->buf, buflen); + if (likely(newbuf)) + break; + if (backoff == 1) + fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); + cksleep_ms(backoff); + backoff <<= 1; + } + cs->buf = newbuf; + if (unlikely(!cs->buf)) + quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); + memcpy(cs->buf + cs->bufofs, readbuf, len); + cs->bufofs += len; + cs->buf[cs->bufofs] = '\0'; +} + +static int read_cs_length(connsock_t *cs, float *timeout, int len) +{ + int ret = len; + + while (cs->buflen < len) { + char readbuf[PAGESIZE] = {}; + + ret = wait_read_select(cs->fd, *timeout); + if (ret < 1) + goto out; + ret = recv(cs->fd, readbuf, len - cs->buflen, MSG_DONTWAIT); + if (ret < 1) + goto out; + add_bufline(cs, readbuf, ret); + } +out: + return ret; +} + +static int read_lz4_line(connsock_t *cs, float *timeout) +{ + int compsize, decompsize, ret, buflen; + char *buf, *dest = NULL, *eom; + uint32_t msglen; + + /* Remove lz4 header */ + clear_bufline(cs); + + /* Get data sizes */ + ret = read_cs_length(cs, timeout, 8); + if (ret != 8) { + ret = -1; + goto out; + } + memcpy(&msglen, cs->buf, 4); + compsize = le32toh(msglen); + memcpy(&msglen, cs->buf + 4, 4); + decompsize = le32toh(msglen); + cs->bufofs = 8; + clear_bufline(cs); + LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize); + + if (unlikely(compsize < 1 || compsize > (int)0x80000000 || + decompsize < 1 || decompsize > (int)0x80000000)) { + LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize); + ret = -1; + goto out; + } + + /* Get compressed data */ + ret = read_cs_length(cs, timeout, compsize); + if (ret != compsize) { + LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret); + ret = -1; + goto out; + } + /* Do decompresion and buffer reconstruction here */ + dest = ckalloc(decompsize); + ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize); + /* Clear out all the compressed data */ + clear_bufline(cs); + if (ret != decompsize) { + LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret); + ret = -1; + goto out; + } + eom = dest + decompsize - 1; + if (memcmp(eom, "\n", 1)) { + LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line"); + ret = -1; + goto out; + } + *eom = '\0'; + /* Wedge the decompressed buffer back to the start of cs->buf */ + buf = cs->buf; + buflen = cs->buflen; + cs->buf = dest; + dest = NULL; + ret = cs->buflen = decompsize - 1; + if (buflen) + add_bufline(cs, buf, buflen); +out: + free(dest); + return ret; +} + /* Read from a socket into cs->buf till we get an '\n', converting it to '\0' * and storing how much extra data we've received, to be moved to the beginning * of the buffer for use on the next receive. */ @@ -517,7 +641,6 @@ int read_socket_line(connsock_t *cs, float *timeout) { char *eom = NULL; tv_t start, now; - size_t buflen; int ret = -1; bool polled; float diff; @@ -525,16 +648,8 @@ int read_socket_line(connsock_t *cs, float *timeout) if (unlikely(cs->fd < 0)) goto out; - if (unlikely(!cs->buf)) - cs->buf = ckzalloc(PAGESIZE); - else if (cs->buflen) { - memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); - memset(cs->buf + cs->buflen, 0, cs->bufofs); - cs->bufofs = cs->buflen; - cs->buflen = 0; - cs->buf[cs->bufofs] = '\0'; - eom = strchr(cs->buf, '\n'); - } + clear_bufline(cs); + eom = strchr(cs->buf, '\n'); tv_time(&start); rewait: @@ -564,8 +679,6 @@ rewait: *timeout -= diff; while (42) { char readbuf[PAGESIZE] = {}; - int backoff = 1; - char *newbuf; ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { @@ -585,22 +698,7 @@ rewait: goto out; } polled = false; - buflen = cs->bufofs + ret + 1; - while (42) { - newbuf = realloc(cs->buf, buflen); - if (likely(newbuf)) - break; - if (backoff == 1) - fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); - cksleep_ms(backoff); - backoff <<= 1; - } - cs->buf = newbuf; - if (unlikely(!cs->buf)) - quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); - memcpy(cs->buf + cs->bufofs, readbuf, ret); - cs->bufofs += ret; - cs->buf[cs->bufofs] = '\0'; + add_bufline(cs, readbuf, ret); eom = strchr(cs->buf, '\n'); } parse: @@ -616,7 +714,53 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); + } else if (cs->buflen && !strncmp(cs->buf, "lz4", 3)) + ret = read_lz4_line(cs, timeout); + return ret; +} + +/* Lz4 compressed block structure: + * - 4 byte magic header LZ4\n + * - 4 byte LE encoded compressed size + * - 4 byte LE encoded decompressed size + */ +int write_cs(connsock_t *cs, const char *buf, int len) +{ + int compsize, decompsize = len; + char *dest = NULL; + uint32_t msglen; + int ret = -1; + + /* Connsock doesn't expect lz4 compressed messages */ + if (!cs->lz4) { + ret = write_socket(cs->fd, buf, len); + goto out; } + dest = ckalloc(len + 12); + /* Do compression here */ + compsize = LZ4_compress(buf, dest + 12, len); + if (!compsize) { + LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed"); + ret = write_socket(cs->fd, buf, len); + goto out; + } + LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); + /* Copy lz4 magic header */ + sprintf(dest, "lz4\n"); + /* Copy compressed message length */ + msglen = htole32(compsize); + memcpy(dest + 4, &msglen, 4); + /* Copy decompressed message length */ + msglen = htole32(decompsize); + memcpy(dest + 8, &msglen, 4); + len = compsize + 12; + ret = write_socket(cs->fd, dest, len); + if (ret == len) + ret = decompsize; + else + ret = -1; +out: + free(dest); return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 0829d36c..9f2f599a 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -82,6 +82,9 @@ struct connsock { ckpool_t *ckp; /* Semaphore used to serialise request/responses */ sem_t sem; + + /* Has the other end acknowledged it can receive lz4 compressed data */ + bool lz4; }; typedef struct connsock connsock_t; @@ -301,6 +304,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, float *timeout); +int write_cs(connsock_t *cs, const char *buf, int len); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, diff --git a/src/connector.c b/src/connector.c index f78cb9c8..f2858a59 100644 --- a/src/connector.c +++ b/src/connector.c @@ -10,6 +10,7 @@ #include "config.h" #include +#include #include #include #include @@ -63,6 +64,12 @@ struct client_instance { /* Is this the parent passthrough client */ bool passthrough; + /* Does this client expect lz4 compression? */ + bool lz4; + bool compressed; /* Currently receiving a compressed message */ + int compsize; /* Expected compressed data size */ + int decompsize; /* Expected decompressed data size */ + /* Linked list of shares in redirector mode.*/ share_t *shares; @@ -478,6 +485,28 @@ retry: return; } client->bufofs += ret; +compressed: + if (client->compressed) { + if (client->bufofs < client->compsize) + goto retry; + ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize); + if (ret != client->decompsize) { + LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d", + client->decompsize, client->compsize, ret); + invalidate_client(ckp, cdata, client); + return; + } + LOGDEBUG("Received client message compressed %d from %d", + client->compsize, client->decompsize); + msg[ret] = '\0'; + /* Flag this client as able to receive lz4 compressed data now */ + client->lz4 = true; + client->bufofs -= client->compsize; + if (client->bufofs) + memmove(client->buf, client->buf + buflen, client->bufofs); + client->compressed = false; + goto parse; + } reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) @@ -490,11 +519,40 @@ reparse: invalidate_client(ckp, cdata, client); return; } + + /* Look for a compression header */ + if (!strncmp(client->buf, "lz4", 3)) { + uint32_t msglen; + + /* Do we have the whole header? If not, keep reading */ + if (client->bufofs < 12) + goto retry; + memcpy(&msglen, client->buf + 4, 4); + client->compsize = le32toh(msglen); + memcpy(&msglen, client->buf + 8, 4); + client->decompsize = le32toh(msglen); + if (unlikely(!client->compsize || !client->decompsize || + client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting", + client->id, client->compsize, client->decompsize); + invalidate_client(ckp, cdata, client); + return; + } + client->bufofs -= 12; + if (client->bufofs > 0) + memmove(client->buf, client->buf + 12, client->bufofs); + client->compressed = true; + if (client->bufofs >= client->compsize) + goto compressed; + goto retry; + } + memcpy(msg, client->buf, buflen); msg[buflen] = '\0'; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); client->buf[client->bufofs] = '\0'; +parse: if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); @@ -971,7 +1029,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true}\n"); + ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n"); send_client(cdata, client->id, buf); } diff --git a/src/generator.c b/src/generator.c index 3680535a..e9d69ef4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -772,8 +772,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.passthrough", + "lz4", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); json_decref(req); @@ -798,6 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied passthrough for stratum"); goto out; } + json_get_bool(&cs->lz4, val, "lz4"); + if (cs->lz4) + LOGNOTICE("Negotiated lz4 compression with pool"); proxi->passthrough = true; out: if (val) @@ -814,8 +818,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.node", + "lz4", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); @@ -841,6 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied node setup for stratum"); goto out; } + json_get_bool(&cs->lz4, val, "lz4"); + if (cs->lz4) + LOGNOTICE("Negotiated lz4 compression with pool"); proxi->node = true; out: if (val) @@ -1795,7 +1803,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm) LOGDEBUG("Sending upstream json msg: %s", pm->msg); len = strlen(pm->msg); - sent = write_socket(cs->fd, pm->msg, len); + sent = write_cs(cs, pm->msg, len); if (unlikely(sent != len)) { LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect", len, pm->msg);