From 11d14620e6818cbb78d6408ee9f8f4336a942e69 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 00:16:10 +1100 Subject: [PATCH] Convert to gzip --- configure.ac | 4 +-- src/ckpool.c | 61 +++++++++++++++++++++++-------------------- src/ckpool.h | 6 +++-- src/connector.c | 67 ++++++++++++++++++++++++++---------------------- src/generator.c | 16 ++++++------ src/libckpool.c | 12 +++++++++ src/libckpool.h | 2 ++ src/stratifier.c | 8 +++--- 8 files changed, 102 insertions(+), 74 deletions(-) diff --git a/configure.ac b/configure.ac index b99bb405..ff33c36e 100644 --- a/configure.ac +++ b/configure.ac @@ -40,7 +40,7 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) -AC_CHECK_HEADERS(lz4.h) +AC_CHECK_HEADERS(zlib.h) AC_CONFIG_SUBDIRS([src/jansson-2.6]) JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" @@ -54,7 +54,7 @@ AC_ARG_WITH([ckdb], #AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) -AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) +AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1) AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) if test "x$ckdb" != "xno"; then diff --git a/src/ckpool.c b/src/ckpool.c index 25ec3331..e7b698ea 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include #include @@ -36,6 +36,8 @@ ckpool_t *global_ckp; +const char gzip_magic[] = "\x1f\xd5\x01\n"; + static void proclog(ckpool_t *ckp, char *msg) { FILE *LOGFP; @@ -567,13 +569,14 @@ out: return ret; } -static int read_lz4_line(connsock_t *cs, float *timeout) +static int read_gz_line(connsock_t *cs, float *timeout) { - int compsize, decompsize, ret, buflen; + unsigned long compsize, res, decompsize; char *buf, *dest = NULL, *eom; + int ret, buflen; uint32_t msglen; - /* Remove lz4 header */ + /* Remove gz header */ clear_bufline(cs); /* Get data sizes */ @@ -588,35 +591,35 @@ static int read_lz4_line(connsock_t *cs, float *timeout) decompsize = le32toh(msglen); cs->bufofs = 8; clear_bufline(cs); - LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize); - if (unlikely(compsize < 1 || compsize > (int)0x80000000 || - decompsize < 1 || decompsize > (int)0x80000000)) { - LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize); + if (unlikely(compsize < 1 || compsize > 0x80000000 || + decompsize < 1 || decompsize > 0x80000000)) { + LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize); ret = -1; goto out; } /* Get compressed data */ ret = read_cs_length(cs, timeout, compsize); - if (ret != compsize) { - LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret); + if (ret != (int)compsize) { + LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret); ret = -1; goto out; } /* Do decompresion and buffer reconstruction here */ dest = ckalloc(decompsize); - ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize); + res = decompsize; + ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); /* Clear out all the compressed data */ clear_bufline(cs); - if (ret != decompsize) { - LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret); + if (ret != Z_OK || res != decompsize) { + LOGWARNING("Failed to decompress %lu bytes in read_gz_line, got %d", decompsize, ret); ret = -1; goto out; } eom = dest + decompsize - 1; if (memcmp(eom, "\n", 1)) { - LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line"); + LOGWARNING("Failed to find EOM in decompressed data in read_gz_line"); ret = -1; goto out; } @@ -714,39 +717,41 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - } else if (cs->buflen && !strncmp(cs->buf, "lz4", 3)) - ret = read_lz4_line(cs, timeout); + } else if (cs->buflen && !strncmp(cs->buf, gzip_magic, 3)) + ret = read_gz_line(cs, timeout); return ret; } -/* Lz4 compressed block structure: - * - 4 byte magic header LZ4\n +/* gzip compressed block structure: + * - 4 byte magic header gzip_magic "\x1f\xd5\x01\n" * - 4 byte LE encoded compressed size * - 4 byte LE encoded decompressed size */ int write_cs(connsock_t *cs, const char *buf, int len) { - int compsize, decompsize = len; + unsigned long compsize, decompsize = len; char *dest = NULL; uint32_t msglen; int ret = -1; - /* Connsock doesn't expect lz4 compressed messages */ - if (!cs->lz4 || len <= 1492) { + /* Connsock doesn't expect gz compressed messages */ + if (!cs->gz || len <= 1492) { ret = write_socket(cs->fd, buf, len); goto out; } - dest = ckalloc(len + 12); + compsize = round_up_page(len + 12); + dest = ckalloc(compsize); + compsize -= 12; /* Do compression here */ - compsize = LZ4_compress(buf, dest + 12, len); - if (!compsize) { - LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed"); + ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + if (ret != Z_OK) { + LOGWARNING("Failed to gz compress in write_cs, writing uncompressed"); ret = write_socket(cs->fd, buf, len); goto out; } - LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); - /* Copy lz4 magic header */ - sprintf(dest, "lz4\n"); + LOGDEBUG("Writing connsock message compressed %lu from %lu", compsize, decompsize); + /* Copy gz magic header */ + sprintf(dest, gzip_magic); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); diff --git a/src/ckpool.h b/src/ckpool.h index 9f2f599a..52017027 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -83,8 +83,8 @@ struct connsock { /* Semaphore used to serialise request/responses */ sem_t sem; - /* Has the other end acknowledged it can receive lz4 compressed data */ - bool lz4; + /* Has the other end acknowledged it can receive gz compressed data */ + bool gz; }; typedef struct connsock connsock_t; @@ -284,6 +284,8 @@ static const char __maybe_unused *stratum_msgs[] = { "" }; +extern const char gzip_magic[]; + #ifdef USE_CKDB #define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #else diff --git a/src/connector.c b/src/connector.c index 1faa82ba..cb1d240d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -10,12 +10,12 @@ #include "config.h" #include -#include #include #include #include #include #include +#include #include "ckpool.h" #include "libckpool.h" @@ -56,7 +56,7 @@ struct client_instance { int server; char buf[PAGESIZE]; - int bufofs; + unsigned long bufofs; /* Are we currently sending a blocked message from this client */ sender_send_t *sending; @@ -64,11 +64,11 @@ struct client_instance { /* Is this the parent passthrough client */ bool passthrough; - /* Does this client expect lz4 compression? */ - bool lz4; + /* Does this client expect gz compression? */ + bool gz; bool compressed; /* Currently receiving a compressed message */ - int compsize; /* Expected compressed data size */ - int decompsize; /* Expected decompressed data size */ + unsigned long compsize; /* Expected compressed data size */ + unsigned long decompsize; /* Expected decompressed data size */ /* Linked list of shares in redirector mode.*/ share_t *shares; @@ -479,7 +479,7 @@ retry: if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; @@ -487,18 +487,21 @@ retry: client->bufofs += ret; compressed: if (client->compressed) { + unsigned long res; + if (client->bufofs < client->compsize) goto retry; - ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize); - if (ret != client->decompsize) { - LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d", + res = client->decompsize; + ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize); + if (ret != Z_OK || res != client->decompsize) { + LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d", client->decompsize, client->compsize, ret); invalidate_client(ckp, cdata, client); return; } - LOGDEBUG("Received client message compressed %d from %d", + LOGDEBUG("Received client message compressed %lu from %lu", client->compsize, client->decompsize); - msg[ret] = '\0'; + msg[res] = '\0'; client->bufofs -= client->compsize; if (client->bufofs) memmove(client->buf, client->buf + buflen, client->bufofs); @@ -519,7 +522,7 @@ reparse: } /* Look for a compression header */ - if (!strncmp(client->buf, "lz4", 3)) { + if (!strncmp(client->buf, gzip_magic, 3)) { uint32_t msglen; /* Do we have the whole header? If not, keep reading */ @@ -531,7 +534,7 @@ reparse: client->decompsize = le32toh(msglen); if (unlikely(!client->compsize || !client->decompsize || client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { - LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting", + LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting", client->id, client->compsize, client->decompsize); invalidate_client(ckp, cdata, client); return; @@ -1000,20 +1003,24 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->lz4 && len > 1492) { - char *dest = ckalloc(len + 12); + if (client->gz && len > 1492) { + unsigned long compsize; uint32_t msglen; - int compsize; - - compsize = LZ4_compress(buf, dest + 12, len); - if (unlikely(!compsize)) { - LOGWARNING("Failed to LZ4 compress in send_client, sending uncompressed"); + char *dest; + int ret; + + compsize = round_up_page(len + 12); + dest = ckalloc(compsize); + compsize -= 12; + ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + if (unlikely(ret != Z_OK)) { + LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret); free(dest); goto out; } - LOGDEBUG("Sending client message compressed %d from %d", compsize, len); - /* Copy lz4 magic header */ - sprintf(dest, "lz4\n"); + LOGDEBUG("Sending client message compressed %lu from %d", compsize, len); + /* Copy gz magic header */ + sprintf(dest, gzip_magic); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); @@ -1054,7 +1061,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n"); + ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n"); send_client(cdata, client->id, buf); } @@ -1233,11 +1240,11 @@ retry: goto out; } else if (cmdmatch(buf, "pass")) { client_instance_t *client; - bool lz4 = false; + bool gz = false; - if (strstr(buf, "lz4")) { - lz4 = true; - ret = sscanf(buf, "passlz4=%"PRId64, &client_id); + if (strstr(buf, "gz")) { + gz = true; + ret = sscanf(buf, "passgz=%"PRId64, &client_id); } else ret = sscanf(buf, "passthrough=%"PRId64, &client_id); if (ret < 0) { @@ -1249,7 +1256,7 @@ retry: LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); goto retry; } - client->lz4 = lz4; + client->gz = gz; passthrough_client(cdata, client); dec_instance_ref(cdata, client); } else if (cmdmatch(buf, "getxfd")) { diff --git a/src/generator.c b/src/generator.c index e9d69ef4..f40bbfb2 100644 --- a/src/generator.c +++ b/src/generator.c @@ -774,7 +774,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.passthrough", - "lz4", json_true(), + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); json_decref(req); @@ -799,9 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied passthrough for stratum"); goto out; } - json_get_bool(&cs->lz4, val, "lz4"); - if (cs->lz4) - LOGNOTICE("Negotiated lz4 compression with pool"); + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->passthrough = true; out: if (val) @@ -820,7 +820,7 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.node", - "lz4", json_true(), + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); @@ -846,9 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied node setup for stratum"); goto out; } - json_get_bool(&cs->lz4, val, "lz4"); - if (cs->lz4) - LOGNOTICE("Negotiated lz4 compression with pool"); + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->node = true; out: if (val) diff --git a/src/libckpool.c b/src/libckpool.c index a0557a71..2e78d330 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } +/* Round up to the nearest page size for efficient malloc */ +size_t round_up_page(size_t len) +{ + int rem = len % PAGESIZE; + + if (rem) + len += PAGESIZE - rem; + return len; +} + + + /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 9d8ba341..10c18248 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -531,6 +531,8 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); +size_t round_up_page(size_t len); + extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); diff --git a/src/stratifier.c b/src/stratifier.c index bc618dfb..55bdf65c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5238,10 +5238,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ - json_get_bool(&var, val, "lz4"); + json_get_bool(&var, val, "gz"); add_mining_node(sdata, client); if (var) - snprintf(buf, 255, "passlz4=%"PRId64, client_id); + snprintf(buf, 255, "passgz=%"PRId64, client_id); else snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); @@ -5255,10 +5255,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie * is a passthrough and to manage its messages accordingly. No * data from this client id should ever come back to this * stratifier after this so drop the client in the stratifier. */ - json_get_bool(&var, val, "lz4"); + json_get_bool(&var, val, "gz"); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); if (var) - snprintf(buf, 255, "passlz4=%"PRId64, client_id); + snprintf(buf, 255, "passgz=%"PRId64, client_id); else snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf);