From 950b855b2d799a06688344adc0f90948db8f9cce Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Jan 2016 16:32:58 +1100 Subject: [PATCH 01/16] Implement backwardly compatible lz4 compression support to be used by passthroughs and nodes --- configure.ac | 19 ++--- src/Makefile.am | 4 +- src/ckpool.c | 202 +++++++++++++++++++++++++++++++++++++++++------- src/ckpool.h | 4 + src/connector.c | 60 +++++++++++++- src/generator.c | 14 +++- 6 files changed, 259 insertions(+), 44 deletions(-) diff --git a/configure.ac b/configure.ac index 6097d1a4..a805be4d 100644 --- a/configure.ac +++ b/configure.ac @@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) - -PTHREAD_LIBS="-lpthread" -MATH_LIBS="-lm" -RT_LIBS="-lrt" +AC_CHECK_HEADERS(lz4.h) AC_CONFIG_SUBDIRS([src/jansson-2.6]) JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" -AC_SUBST(PTHREAD_LIBS) -AC_SUBST(MATH_LIBS) -AC_SUBST(RT_LIBS) AC_SUBST(JANSSON_LIBS) AC_ARG_WITH([ckdb], @@ -58,6 +52,11 @@ AC_ARG_WITH([ckdb], [ckdb=$withval] ) +#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) +AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) +AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) +AC_SEARCH_LIBS(pthread_create, pthread, , "Error: Required library pthreads not found." && exit 1) + if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) @@ -84,8 +83,10 @@ echo "Compilation............: make (or gmake)" echo " CPPFLAGS.............: $CPPFLAGS" echo " CFLAGS...............: $CFLAGS" echo " LDFLAGS..............: $LDFLAGS" -echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" -echo " db LDADD.............: $DB_LIBS" +echo " LDADD................: $LIBS $JANSSON_LIBS" +if test "x$ckdb" != "xno"; then + echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS" +fi echo echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo " prefix...............: $prefix" diff --git a/src/Makefile.am b/src/Makefile.am index e95666ac..53561714 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src lib_LTLIBRARIES = libckpool.la libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h -libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ +libckpool_la_LIBADD = @LIBS@ bin_PROGRAMS = ckpool ckpmsg notifier ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ @@ -23,5 +23,5 @@ if WANT_CKDB bin_PROGRAMS += ckdb ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h -ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ +ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@ endif diff --git a/src/ckpool.c b/src/ckpool.c index 15a9a5bf..a5a41108 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -510,6 +511,129 @@ void empty_buffer(connsock_t *cs) cs->buflen = cs->bufofs = 0; } +static void clear_bufline(connsock_t *cs) +{ + if (unlikely(!cs->buf)) + cs->buf = ckzalloc(PAGESIZE); + else if (cs->buflen) { + memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); + memset(cs->buf + cs->buflen, 0, cs->bufofs); + cs->bufofs = cs->buflen; + cs->buflen = 0; + cs->buf[cs->bufofs] = '\0'; + } +} + +static void add_bufline(connsock_t *cs, const char *readbuf, const int len) +{ + int backoff = 1; + size_t buflen; + char *newbuf; + + buflen = cs->bufofs + len + 1; + while (42) { + newbuf = realloc(cs->buf, buflen); + if (likely(newbuf)) + break; + if (backoff == 1) + fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); + cksleep_ms(backoff); + backoff <<= 1; + } + cs->buf = newbuf; + if (unlikely(!cs->buf)) + quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); + memcpy(cs->buf + cs->bufofs, readbuf, len); + cs->bufofs += len; + cs->buf[cs->bufofs] = '\0'; +} + +static int read_cs_length(connsock_t *cs, float *timeout, int len) +{ + int ret = len; + + while (cs->buflen < len) { + char readbuf[PAGESIZE] = {}; + + ret = wait_read_select(cs->fd, *timeout); + if (ret < 1) + goto out; + ret = recv(cs->fd, readbuf, len - cs->buflen, MSG_DONTWAIT); + if (ret < 1) + goto out; + add_bufline(cs, readbuf, ret); + } +out: + return ret; +} + +static int read_lz4_line(connsock_t *cs, float *timeout) +{ + int compsize, decompsize, ret, buflen; + char *buf, *dest = NULL, *eom; + uint32_t msglen; + + /* Remove lz4 header */ + clear_bufline(cs); + + /* Get data sizes */ + ret = read_cs_length(cs, timeout, 8); + if (ret != 8) { + ret = -1; + goto out; + } + memcpy(&msglen, cs->buf, 4); + compsize = le32toh(msglen); + memcpy(&msglen, cs->buf + 4, 4); + decompsize = le32toh(msglen); + cs->bufofs = 8; + clear_bufline(cs); + LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize); + + if (unlikely(compsize < 1 || compsize > (int)0x80000000 || + decompsize < 1 || decompsize > (int)0x80000000)) { + LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize); + ret = -1; + goto out; + } + + /* Get compressed data */ + ret = read_cs_length(cs, timeout, compsize); + if (ret != compsize) { + LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret); + ret = -1; + goto out; + } + /* Do decompresion and buffer reconstruction here */ + dest = ckalloc(decompsize); + ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize); + /* Clear out all the compressed data */ + clear_bufline(cs); + if (ret != decompsize) { + LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret); + ret = -1; + goto out; + } + eom = dest + decompsize - 1; + if (memcmp(eom, "\n", 1)) { + LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line"); + ret = -1; + goto out; + } + *eom = '\0'; + /* Wedge the decompressed buffer back to the start of cs->buf */ + buf = cs->buf; + buflen = cs->buflen; + cs->buf = dest; + dest = NULL; + ret = cs->buflen = decompsize - 1; + if (buflen) + add_bufline(cs, buf, buflen); +out: + free(dest); + return ret; +} + /* Read from a socket into cs->buf till we get an '\n', converting it to '\0' * and storing how much extra data we've received, to be moved to the beginning * of the buffer for use on the next receive. */ @@ -517,7 +641,6 @@ int read_socket_line(connsock_t *cs, float *timeout) { char *eom = NULL; tv_t start, now; - size_t buflen; int ret = -1; bool polled; float diff; @@ -525,16 +648,8 @@ int read_socket_line(connsock_t *cs, float *timeout) if (unlikely(cs->fd < 0)) goto out; - if (unlikely(!cs->buf)) - cs->buf = ckzalloc(PAGESIZE); - else if (cs->buflen) { - memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); - memset(cs->buf + cs->buflen, 0, cs->bufofs); - cs->bufofs = cs->buflen; - cs->buflen = 0; - cs->buf[cs->bufofs] = '\0'; - eom = strchr(cs->buf, '\n'); - } + clear_bufline(cs); + eom = strchr(cs->buf, '\n'); tv_time(&start); rewait: @@ -564,8 +679,6 @@ rewait: *timeout -= diff; while (42) { char readbuf[PAGESIZE] = {}; - int backoff = 1; - char *newbuf; ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { @@ -585,22 +698,7 @@ rewait: goto out; } polled = false; - buflen = cs->bufofs + ret + 1; - while (42) { - newbuf = realloc(cs->buf, buflen); - if (likely(newbuf)) - break; - if (backoff == 1) - fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); - cksleep_ms(backoff); - backoff <<= 1; - } - cs->buf = newbuf; - if (unlikely(!cs->buf)) - quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); - memcpy(cs->buf + cs->bufofs, readbuf, ret); - cs->bufofs += ret; - cs->buf[cs->bufofs] = '\0'; + add_bufline(cs, readbuf, ret); eom = strchr(cs->buf, '\n'); } parse: @@ -616,7 +714,53 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); + } else if (cs->buflen && !strncmp(cs->buf, "lz4", 3)) + ret = read_lz4_line(cs, timeout); + return ret; +} + +/* Lz4 compressed block structure: + * - 4 byte magic header LZ4\n + * - 4 byte LE encoded compressed size + * - 4 byte LE encoded decompressed size + */ +int write_cs(connsock_t *cs, const char *buf, int len) +{ + int compsize, decompsize = len; + char *dest = NULL; + uint32_t msglen; + int ret = -1; + + /* Connsock doesn't expect lz4 compressed messages */ + if (!cs->lz4) { + ret = write_socket(cs->fd, buf, len); + goto out; } + dest = ckalloc(len + 12); + /* Do compression here */ + compsize = LZ4_compress(buf, dest + 12, len); + if (!compsize) { + LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed"); + ret = write_socket(cs->fd, buf, len); + goto out; + } + LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); + /* Copy lz4 magic header */ + sprintf(dest, "lz4\n"); + /* Copy compressed message length */ + msglen = htole32(compsize); + memcpy(dest + 4, &msglen, 4); + /* Copy decompressed message length */ + msglen = htole32(decompsize); + memcpy(dest + 8, &msglen, 4); + len = compsize + 12; + ret = write_socket(cs->fd, dest, len); + if (ret == len) + ret = decompsize; + else + ret = -1; +out: + free(dest); return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 0829d36c..9f2f599a 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -82,6 +82,9 @@ struct connsock { ckpool_t *ckp; /* Semaphore used to serialise request/responses */ sem_t sem; + + /* Has the other end acknowledged it can receive lz4 compressed data */ + bool lz4; }; typedef struct connsock connsock_t; @@ -301,6 +304,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, float *timeout); +int write_cs(connsock_t *cs, const char *buf, int len); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, diff --git a/src/connector.c b/src/connector.c index f78cb9c8..f2858a59 100644 --- a/src/connector.c +++ b/src/connector.c @@ -10,6 +10,7 @@ #include "config.h" #include +#include #include #include #include @@ -63,6 +64,12 @@ struct client_instance { /* Is this the parent passthrough client */ bool passthrough; + /* Does this client expect lz4 compression? */ + bool lz4; + bool compressed; /* Currently receiving a compressed message */ + int compsize; /* Expected compressed data size */ + int decompsize; /* Expected decompressed data size */ + /* Linked list of shares in redirector mode.*/ share_t *shares; @@ -478,6 +485,28 @@ retry: return; } client->bufofs += ret; +compressed: + if (client->compressed) { + if (client->bufofs < client->compsize) + goto retry; + ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize); + if (ret != client->decompsize) { + LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d", + client->decompsize, client->compsize, ret); + invalidate_client(ckp, cdata, client); + return; + } + LOGDEBUG("Received client message compressed %d from %d", + client->compsize, client->decompsize); + msg[ret] = '\0'; + /* Flag this client as able to receive lz4 compressed data now */ + client->lz4 = true; + client->bufofs -= client->compsize; + if (client->bufofs) + memmove(client->buf, client->buf + buflen, client->bufofs); + client->compressed = false; + goto parse; + } reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) @@ -490,11 +519,40 @@ reparse: invalidate_client(ckp, cdata, client); return; } + + /* Look for a compression header */ + if (!strncmp(client->buf, "lz4", 3)) { + uint32_t msglen; + + /* Do we have the whole header? If not, keep reading */ + if (client->bufofs < 12) + goto retry; + memcpy(&msglen, client->buf + 4, 4); + client->compsize = le32toh(msglen); + memcpy(&msglen, client->buf + 8, 4); + client->decompsize = le32toh(msglen); + if (unlikely(!client->compsize || !client->decompsize || + client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting", + client->id, client->compsize, client->decompsize); + invalidate_client(ckp, cdata, client); + return; + } + client->bufofs -= 12; + if (client->bufofs > 0) + memmove(client->buf, client->buf + 12, client->bufofs); + client->compressed = true; + if (client->bufofs >= client->compsize) + goto compressed; + goto retry; + } + memcpy(msg, client->buf, buflen); msg[buflen] = '\0'; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); client->buf[client->bufofs] = '\0'; +parse: if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); @@ -971,7 +1029,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true}\n"); + ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n"); send_client(cdata, client->id, buf); } diff --git a/src/generator.c b/src/generator.c index 3680535a..e9d69ef4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -772,8 +772,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.passthrough", + "lz4", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); json_decref(req); @@ -798,6 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied passthrough for stratum"); goto out; } + json_get_bool(&cs->lz4, val, "lz4"); + if (cs->lz4) + LOGNOTICE("Negotiated lz4 compression with pool"); proxi->passthrough = true; out: if (val) @@ -814,8 +818,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.node", + "lz4", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); @@ -841,6 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied node setup for stratum"); goto out; } + json_get_bool(&cs->lz4, val, "lz4"); + if (cs->lz4) + LOGNOTICE("Negotiated lz4 compression with pool"); proxi->node = true; out: if (val) @@ -1795,7 +1803,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm) LOGDEBUG("Sending upstream json msg: %s", pm->msg); len = strlen(pm->msg); - sent = write_socket(cs->fd, pm->msg, len); + sent = write_cs(cs, pm->msg, len); if (unlikely(sent != len)) { LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect", len, pm->msg); From 4a52065a55b119dc20b106165aeb141dd1fcfa43 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Jan 2016 16:35:28 +1100 Subject: [PATCH 02/16] Only send packets compressed if they're smaller --- src/ckpool.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index a5a41108..35839db8 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -744,6 +744,12 @@ int write_cs(connsock_t *cs, const char *buf, int len) ret = write_socket(cs->fd, buf, len); goto out; } + if (compsize + 12 >= len) { + /* Selectively send compressed packets only when they're + * smaller. */ + ret = write_socket(cs->fd, buf, len); + goto out; + } LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); /* Copy lz4 magic header */ sprintf(dest, "lz4\n"); From 17cc4411afa7b2b2c3a163bf4725b56e4e04f117 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Jan 2016 17:07:19 +1100 Subject: [PATCH 03/16] Fix compilation and send lz4 compatible clients compressed data --- configure.ac | 2 +- src/connector.c | 31 +++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index a805be4d..b99bb405 100644 --- a/configure.ac +++ b/configure.ac @@ -55,7 +55,7 @@ AC_ARG_WITH([ckdb], #AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) -AC_SEARCH_LIBS(pthread_create, pthread, , "Error: Required library pthreads not found." && exit 1) +AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev diff --git a/src/connector.c b/src/connector.c index f2858a59..e550c9f8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1000,6 +1000,37 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) test_redirector_shares(ckp, client, buf); } + /* Does this client accept compressed data? */ + if (client->lz4) { + char *dest = ckalloc(len + 12); + uint32_t msglen; + int compsize; + + compsize = LZ4_compress(buf, dest + 12, len); + if (unlikely(!compsize)) { + LOGWARNING("Failed to LZ4 compress in send_client, sending uncompressed"); + free(dest); + goto out; + } + if (compsize + 12 >= len) { + /* Only end it compressed if it's smaller */ + free(dest); + goto out; + } + LOGDEBUG("Sending client message compressed %d from %d", compsize, len); + /* Copy lz4 magic header */ + sprintf(dest, "lz4\n"); + /* Copy compressed message length */ + msglen = htole32(compsize); + memcpy(dest + 4, &msglen, 4); + /* Copy decompressed message length */ + msglen = htole32(len); + memcpy(dest + 8, &msglen, 4); + len = compsize + 12; + free(buf); + buf = dest; + } +out: sender_send = ckzalloc(sizeof(sender_send_t)); sender_send->client = client; sender_send->buf = buf; From e3fc6a1e56c84bd7dfade6802c9bb10a660695ed Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Jan 2016 17:48:21 +1100 Subject: [PATCH 04/16] Selectively compress only large packets greater than one MTU and identify lz4 compatible clients immediately --- src/ckpool.c | 8 +------- src/connector.c | 22 +++++++++++----------- src/stratifier.c | 17 +++++++++++++---- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 35839db8..25ec3331 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -732,7 +732,7 @@ int write_cs(connsock_t *cs, const char *buf, int len) int ret = -1; /* Connsock doesn't expect lz4 compressed messages */ - if (!cs->lz4) { + if (!cs->lz4 || len <= 1492) { ret = write_socket(cs->fd, buf, len); goto out; } @@ -744,12 +744,6 @@ int write_cs(connsock_t *cs, const char *buf, int len) ret = write_socket(cs->fd, buf, len); goto out; } - if (compsize + 12 >= len) { - /* Selectively send compressed packets only when they're - * smaller. */ - ret = write_socket(cs->fd, buf, len); - goto out; - } LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); /* Copy lz4 magic header */ sprintf(dest, "lz4\n"); diff --git a/src/connector.c b/src/connector.c index e550c9f8..1faa82ba 100644 --- a/src/connector.c +++ b/src/connector.c @@ -499,8 +499,6 @@ compressed: LOGDEBUG("Received client message compressed %d from %d", client->compsize, client->decompsize); msg[ret] = '\0'; - /* Flag this client as able to receive lz4 compressed data now */ - client->lz4 = true; client->bufofs -= client->compsize; if (client->bufofs) memmove(client->buf, client->buf + buflen, client->bufofs); @@ -1000,8 +998,9 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) test_redirector_shares(ckp, client, buf); } - /* Does this client accept compressed data? */ - if (client->lz4) { + /* Does this client accept compressed data? Only compress if it's + * larger than one MTU. */ + if (client->lz4 && len > 1492) { char *dest = ckalloc(len + 12); uint32_t msglen; int compsize; @@ -1012,11 +1011,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) free(dest); goto out; } - if (compsize + 12 >= len) { - /* Only end it compressed if it's smaller */ - free(dest); - goto out; - } LOGDEBUG("Sending client message compressed %d from %d", compsize, len); /* Copy lz4 magic header */ sprintf(dest, "lz4\n"); @@ -1237,10 +1231,15 @@ retry: sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { goto out; - } else if (cmdmatch(buf, "passthrough")) { + } else if (cmdmatch(buf, "pass")) { client_instance_t *client; + bool lz4 = false; - ret = sscanf(buf, "passthrough=%"PRId64, &client_id); + if (strstr(buf, "lz4")) { + lz4 = true; + ret = sscanf(buf, "passlz4=%"PRId64, &client_id); + } else + ret = sscanf(buf, "passthrough=%"PRId64, &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse passthrough command: %s", buf); goto retry; @@ -1250,6 +1249,7 @@ retry: LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); goto retry; } + client->lz4 = lz4; passthrough_client(cdata, client); dec_instance_ref(cdata, client); } else if (cmdmatch(buf, "getxfd")) { diff --git a/src/stratifier.c b/src/stratifier.c index e87fc7e4..bc618dfb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5186,10 +5186,11 @@ static void add_mining_node(sdata_t *sdata, stratum_instance_t *client) /* Enter with client holding ref count */ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, - const int64_t client_id, json_t *id_val, json_t *method_val, + const int64_t client_id, json_t *val, json_t *id_val, json_t *method_val, json_t *params_val) { const char *method; + bool var; /* Random broken clients send something not an integer as the id so we * copy the json item for id_val as is for the response. By far the @@ -5237,8 +5238,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ + json_get_bool(&var, val, "lz4"); add_mining_node(sdata, client); - snprintf(buf, 255, "passthrough=%"PRId64, client_id); + if (var) + snprintf(buf, 255, "passlz4=%"PRId64, client_id); + else + snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); return; } @@ -5250,8 +5255,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie * is a passthrough and to manage its messages accordingly. No * data from this client id should ever come back to this * stratifier after this so drop the client in the stratifier. */ + json_get_bool(&var, val, "lz4"); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); - snprintf(buf, 255, "passthrough=%"PRId64, client_id); + if (var) + snprintf(buf, 255, "passlz4=%"PRId64, client_id); + else + snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); drop_client(ckp, sdata, client_id); return; @@ -5469,7 +5478,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat send_json_err(sdata, client_id, id_val, "-1:params not found"); goto out; } - parse_method(ckp, sdata, client, client_id, id_val, method, params); + parse_method(ckp, sdata, client, client_id, val, id_val, method, params); out: free_smsg(msg); } From 11d14620e6818cbb78d6408ee9f8f4336a942e69 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 00:16:10 +1100 Subject: [PATCH 05/16] Convert to gzip --- configure.ac | 4 +-- src/ckpool.c | 61 +++++++++++++++++++++++-------------------- src/ckpool.h | 6 +++-- src/connector.c | 67 ++++++++++++++++++++++++++---------------------- src/generator.c | 16 ++++++------ src/libckpool.c | 12 +++++++++ src/libckpool.h | 2 ++ src/stratifier.c | 8 +++--- 8 files changed, 102 insertions(+), 74 deletions(-) diff --git a/configure.ac b/configure.ac index b99bb405..ff33c36e 100644 --- a/configure.ac +++ b/configure.ac @@ -40,7 +40,7 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) -AC_CHECK_HEADERS(lz4.h) +AC_CHECK_HEADERS(zlib.h) AC_CONFIG_SUBDIRS([src/jansson-2.6]) JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" @@ -54,7 +54,7 @@ AC_ARG_WITH([ckdb], #AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) -AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) +AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1) AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) if test "x$ckdb" != "xno"; then diff --git a/src/ckpool.c b/src/ckpool.c index 25ec3331..e7b698ea 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include #include @@ -36,6 +36,8 @@ ckpool_t *global_ckp; +const char gzip_magic[] = "\x1f\xd5\x01\n"; + static void proclog(ckpool_t *ckp, char *msg) { FILE *LOGFP; @@ -567,13 +569,14 @@ out: return ret; } -static int read_lz4_line(connsock_t *cs, float *timeout) +static int read_gz_line(connsock_t *cs, float *timeout) { - int compsize, decompsize, ret, buflen; + unsigned long compsize, res, decompsize; char *buf, *dest = NULL, *eom; + int ret, buflen; uint32_t msglen; - /* Remove lz4 header */ + /* Remove gz header */ clear_bufline(cs); /* Get data sizes */ @@ -588,35 +591,35 @@ static int read_lz4_line(connsock_t *cs, float *timeout) decompsize = le32toh(msglen); cs->bufofs = 8; clear_bufline(cs); - LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize); - if (unlikely(compsize < 1 || compsize > (int)0x80000000 || - decompsize < 1 || decompsize > (int)0x80000000)) { - LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize); + if (unlikely(compsize < 1 || compsize > 0x80000000 || + decompsize < 1 || decompsize > 0x80000000)) { + LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize); ret = -1; goto out; } /* Get compressed data */ ret = read_cs_length(cs, timeout, compsize); - if (ret != compsize) { - LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret); + if (ret != (int)compsize) { + LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret); ret = -1; goto out; } /* Do decompresion and buffer reconstruction here */ dest = ckalloc(decompsize); - ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize); + res = decompsize; + ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); /* Clear out all the compressed data */ clear_bufline(cs); - if (ret != decompsize) { - LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret); + if (ret != Z_OK || res != decompsize) { + LOGWARNING("Failed to decompress %lu bytes in read_gz_line, got %d", decompsize, ret); ret = -1; goto out; } eom = dest + decompsize - 1; if (memcmp(eom, "\n", 1)) { - LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line"); + LOGWARNING("Failed to find EOM in decompressed data in read_gz_line"); ret = -1; goto out; } @@ -714,39 +717,41 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - } else if (cs->buflen && !strncmp(cs->buf, "lz4", 3)) - ret = read_lz4_line(cs, timeout); + } else if (cs->buflen && !strncmp(cs->buf, gzip_magic, 3)) + ret = read_gz_line(cs, timeout); return ret; } -/* Lz4 compressed block structure: - * - 4 byte magic header LZ4\n +/* gzip compressed block structure: + * - 4 byte magic header gzip_magic "\x1f\xd5\x01\n" * - 4 byte LE encoded compressed size * - 4 byte LE encoded decompressed size */ int write_cs(connsock_t *cs, const char *buf, int len) { - int compsize, decompsize = len; + unsigned long compsize, decompsize = len; char *dest = NULL; uint32_t msglen; int ret = -1; - /* Connsock doesn't expect lz4 compressed messages */ - if (!cs->lz4 || len <= 1492) { + /* Connsock doesn't expect gz compressed messages */ + if (!cs->gz || len <= 1492) { ret = write_socket(cs->fd, buf, len); goto out; } - dest = ckalloc(len + 12); + compsize = round_up_page(len + 12); + dest = ckalloc(compsize); + compsize -= 12; /* Do compression here */ - compsize = LZ4_compress(buf, dest + 12, len); - if (!compsize) { - LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed"); + ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + if (ret != Z_OK) { + LOGWARNING("Failed to gz compress in write_cs, writing uncompressed"); ret = write_socket(cs->fd, buf, len); goto out; } - LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); - /* Copy lz4 magic header */ - sprintf(dest, "lz4\n"); + LOGDEBUG("Writing connsock message compressed %lu from %lu", compsize, decompsize); + /* Copy gz magic header */ + sprintf(dest, gzip_magic); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); diff --git a/src/ckpool.h b/src/ckpool.h index 9f2f599a..52017027 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -83,8 +83,8 @@ struct connsock { /* Semaphore used to serialise request/responses */ sem_t sem; - /* Has the other end acknowledged it can receive lz4 compressed data */ - bool lz4; + /* Has the other end acknowledged it can receive gz compressed data */ + bool gz; }; typedef struct connsock connsock_t; @@ -284,6 +284,8 @@ static const char __maybe_unused *stratum_msgs[] = { "" }; +extern const char gzip_magic[]; + #ifdef USE_CKDB #define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #else diff --git a/src/connector.c b/src/connector.c index 1faa82ba..cb1d240d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -10,12 +10,12 @@ #include "config.h" #include -#include #include #include #include #include #include +#include #include "ckpool.h" #include "libckpool.h" @@ -56,7 +56,7 @@ struct client_instance { int server; char buf[PAGESIZE]; - int bufofs; + unsigned long bufofs; /* Are we currently sending a blocked message from this client */ sender_send_t *sending; @@ -64,11 +64,11 @@ struct client_instance { /* Is this the parent passthrough client */ bool passthrough; - /* Does this client expect lz4 compression? */ - bool lz4; + /* Does this client expect gz compression? */ + bool gz; bool compressed; /* Currently receiving a compressed message */ - int compsize; /* Expected compressed data size */ - int decompsize; /* Expected decompressed data size */ + unsigned long compsize; /* Expected compressed data size */ + unsigned long decompsize; /* Expected decompressed data size */ /* Linked list of shares in redirector mode.*/ share_t *shares; @@ -479,7 +479,7 @@ retry: if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; @@ -487,18 +487,21 @@ retry: client->bufofs += ret; compressed: if (client->compressed) { + unsigned long res; + if (client->bufofs < client->compsize) goto retry; - ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize); - if (ret != client->decompsize) { - LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d", + res = client->decompsize; + ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize); + if (ret != Z_OK || res != client->decompsize) { + LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d", client->decompsize, client->compsize, ret); invalidate_client(ckp, cdata, client); return; } - LOGDEBUG("Received client message compressed %d from %d", + LOGDEBUG("Received client message compressed %lu from %lu", client->compsize, client->decompsize); - msg[ret] = '\0'; + msg[res] = '\0'; client->bufofs -= client->compsize; if (client->bufofs) memmove(client->buf, client->buf + buflen, client->bufofs); @@ -519,7 +522,7 @@ reparse: } /* Look for a compression header */ - if (!strncmp(client->buf, "lz4", 3)) { + if (!strncmp(client->buf, gzip_magic, 3)) { uint32_t msglen; /* Do we have the whole header? If not, keep reading */ @@ -531,7 +534,7 @@ reparse: client->decompsize = le32toh(msglen); if (unlikely(!client->compsize || !client->decompsize || client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { - LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting", + LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting", client->id, client->compsize, client->decompsize); invalidate_client(ckp, cdata, client); return; @@ -1000,20 +1003,24 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->lz4 && len > 1492) { - char *dest = ckalloc(len + 12); + if (client->gz && len > 1492) { + unsigned long compsize; uint32_t msglen; - int compsize; - - compsize = LZ4_compress(buf, dest + 12, len); - if (unlikely(!compsize)) { - LOGWARNING("Failed to LZ4 compress in send_client, sending uncompressed"); + char *dest; + int ret; + + compsize = round_up_page(len + 12); + dest = ckalloc(compsize); + compsize -= 12; + ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + if (unlikely(ret != Z_OK)) { + LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret); free(dest); goto out; } - LOGDEBUG("Sending client message compressed %d from %d", compsize, len); - /* Copy lz4 magic header */ - sprintf(dest, "lz4\n"); + LOGDEBUG("Sending client message compressed %lu from %d", compsize, len); + /* Copy gz magic header */ + sprintf(dest, gzip_magic); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); @@ -1054,7 +1061,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n"); + ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n"); send_client(cdata, client->id, buf); } @@ -1233,11 +1240,11 @@ retry: goto out; } else if (cmdmatch(buf, "pass")) { client_instance_t *client; - bool lz4 = false; + bool gz = false; - if (strstr(buf, "lz4")) { - lz4 = true; - ret = sscanf(buf, "passlz4=%"PRId64, &client_id); + if (strstr(buf, "gz")) { + gz = true; + ret = sscanf(buf, "passgz=%"PRId64, &client_id); } else ret = sscanf(buf, "passthrough=%"PRId64, &client_id); if (ret < 0) { @@ -1249,7 +1256,7 @@ retry: LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); goto retry; } - client->lz4 = lz4; + client->gz = gz; passthrough_client(cdata, client); dec_instance_ref(cdata, client); } else if (cmdmatch(buf, "getxfd")) { diff --git a/src/generator.c b/src/generator.c index e9d69ef4..f40bbfb2 100644 --- a/src/generator.c +++ b/src/generator.c @@ -774,7 +774,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.passthrough", - "lz4", json_true(), + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); json_decref(req); @@ -799,9 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied passthrough for stratum"); goto out; } - json_get_bool(&cs->lz4, val, "lz4"); - if (cs->lz4) - LOGNOTICE("Negotiated lz4 compression with pool"); + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->passthrough = true; out: if (val) @@ -820,7 +820,7 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.node", - "lz4", json_true(), + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); @@ -846,9 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied node setup for stratum"); goto out; } - json_get_bool(&cs->lz4, val, "lz4"); - if (cs->lz4) - LOGNOTICE("Negotiated lz4 compression with pool"); + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->node = true; out: if (val) diff --git a/src/libckpool.c b/src/libckpool.c index a0557a71..2e78d330 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } +/* Round up to the nearest page size for efficient malloc */ +size_t round_up_page(size_t len) +{ + int rem = len % PAGESIZE; + + if (rem) + len += PAGESIZE - rem; + return len; +} + + + /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 9d8ba341..10c18248 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -531,6 +531,8 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); +size_t round_up_page(size_t len); + extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); diff --git a/src/stratifier.c b/src/stratifier.c index bc618dfb..55bdf65c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5238,10 +5238,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ - json_get_bool(&var, val, "lz4"); + json_get_bool(&var, val, "gz"); add_mining_node(sdata, client); if (var) - snprintf(buf, 255, "passlz4=%"PRId64, client_id); + snprintf(buf, 255, "passgz=%"PRId64, client_id); else snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); @@ -5255,10 +5255,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie * is a passthrough and to manage its messages accordingly. No * data from this client id should ever come back to this * stratifier after this so drop the client in the stratifier. */ - json_get_bool(&var, val, "lz4"); + json_get_bool(&var, val, "gz"); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); if (var) - snprintf(buf, 255, "passlz4=%"PRId64, client_id); + snprintf(buf, 255, "passgz=%"PRId64, client_id); else snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); From c9f0858c560889fc5fb6a61deffcd963a7732343 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 00:45:19 +1100 Subject: [PATCH 06/16] Fixes? --- src/ckpool.c | 7 +++---- src/connector.c | 16 +++++++--------- src/libckpool.c | 12 ------------ src/libckpool.h | 2 -- 4 files changed, 10 insertions(+), 27 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index e7b698ea..2f49936a 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -739,13 +739,12 @@ int write_cs(connsock_t *cs, const char *buf, int len) ret = write_socket(cs->fd, buf, len); goto out; } - compsize = round_up_page(len + 12); - dest = ckalloc(compsize); - compsize -= 12; + dest = ckalloc(len + 12); /* Do compression here */ + compsize = len; ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); if (ret != Z_OK) { - LOGWARNING("Failed to gz compress in write_cs, writing uncompressed"); + LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); ret = write_socket(cs->fd, buf, len); goto out; } diff --git a/src/connector.c b/src/connector.c index cb1d240d..dd173478 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1003,29 +1003,27 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->gz && len > 1492) { - unsigned long compsize; + if (client->gz) { + unsigned long compsize, decompsize = len; + char *dest = ckalloc(len + 12); uint32_t msglen; - char *dest; int ret; - compsize = round_up_page(len + 12); - dest = ckalloc(compsize); - compsize -= 12; + compsize = len; ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); if (unlikely(ret != Z_OK)) { - LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret); + LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret); free(dest); goto out; } - LOGDEBUG("Sending client message compressed %lu from %d", compsize, len); + LOGDEBUG("Sending client message compressed %lu from %lu", compsize, decompsize); /* Copy gz magic header */ sprintf(dest, gzip_magic); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); /* Copy decompressed message length */ - msglen = htole32(len); + msglen = htole32(decompsize); memcpy(dest + 8, &msglen, 4); len = compsize + 12; free(buf); diff --git a/src/libckpool.c b/src/libckpool.c index 2e78d330..a0557a71 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1389,18 +1389,6 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } -/* Round up to the nearest page size for efficient malloc */ -size_t round_up_page(size_t len) -{ - int rem = len % PAGESIZE; - - if (rem) - len += PAGESIZE - rem; - return len; -} - - - /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 10c18248..9d8ba341 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -531,8 +531,6 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); -size_t round_up_page(size_t len); - extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); From d69ed7ffcf5aca6921aaba10734aeff8839c3260 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 08:18:30 +1100 Subject: [PATCH 07/16] More fixes --- src/ckpool.c | 24 ++++++++++-------------- src/connector.c | 8 +++++--- src/libckpool.c | 12 ++++++++++++ src/libckpool.h | 2 ++ 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 2f49936a..6cfbd9f9 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -555,7 +555,7 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len) int ret = len; while (cs->buflen < len) { - char readbuf[PAGESIZE] = {}; + char readbuf[PAGESIZE]; ret = wait_read_select(cs->fd, *timeout); if (ret < 1) @@ -629,7 +629,7 @@ static int read_gz_line(connsock_t *cs, float *timeout) buflen = cs->buflen; cs->buf = dest; dest = NULL; - ret = cs->buflen = decompsize - 1; + ret = cs->buflen = decompsize; if (buflen) add_bufline(cs, buf, buflen); out: @@ -732,23 +732,20 @@ int write_cs(connsock_t *cs, const char *buf, int len) unsigned long compsize, decompsize = len; char *dest = NULL; uint32_t msglen; - int ret = -1; + int ret; /* Connsock doesn't expect gz compressed messages */ - if (!cs->gz || len <= 1492) { - ret = write_socket(cs->fd, buf, len); - goto out; - } - dest = ckalloc(len + 12); + if (!cs->gz || len <= 1492) + return write_socket(cs->fd, buf, len); + compsize = round_up_page(len + 12); + dest = alloca(compsize); /* Do compression here */ - compsize = len; + compsize -= 12; ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); if (ret != Z_OK) { LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); - ret = write_socket(cs->fd, buf, len); - goto out; + return write_socket(cs->fd, buf, len); } - LOGDEBUG("Writing connsock message compressed %lu from %lu", compsize, decompsize); /* Copy gz magic header */ sprintf(dest, gzip_magic); /* Copy compressed message length */ @@ -758,13 +755,12 @@ int write_cs(connsock_t *cs, const char *buf, int len) msglen = htole32(decompsize); memcpy(dest + 8, &msglen, 4); len = compsize + 12; + LOGDEBUG("Writing connsock message compressed %d from %lu", len, decompsize); ret = write_socket(cs->fd, dest, len); if (ret == len) ret = decompsize; else ret = -1; -out: - free(dest); return ret; } diff --git a/src/connector.c b/src/connector.c index dd173478..0f7633fb 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1005,18 +1005,19 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) * larger than one MTU. */ if (client->gz) { unsigned long compsize, decompsize = len; - char *dest = ckalloc(len + 12); uint32_t msglen; + char *dest; int ret; - compsize = len; + compsize = round_up_page(len + 12); + dest = ckalloc(compsize); + compsize -= 12; ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); if (unlikely(ret != Z_OK)) { LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret); free(dest); goto out; } - LOGDEBUG("Sending client message compressed %lu from %lu", compsize, decompsize); /* Copy gz magic header */ sprintf(dest, gzip_magic); /* Copy compressed message length */ @@ -1026,6 +1027,7 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) msglen = htole32(decompsize); memcpy(dest + 8, &msglen, 4); len = compsize + 12; + LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize); free(buf); buf = dest; } diff --git a/src/libckpool.c b/src/libckpool.c index a0557a71..2e78d330 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } +/* Round up to the nearest page size for efficient malloc */ +size_t round_up_page(size_t len) +{ + int rem = len % PAGESIZE; + + if (rem) + len += PAGESIZE - rem; + return len; +} + + + /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 9d8ba341..10c18248 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -531,6 +531,8 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); +size_t round_up_page(size_t len); + extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); From 4ef8ab49e334d86b1795d3ae4f1168d75de86b5e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 11:02:17 +1100 Subject: [PATCH 08/16] Fix reinsertion logic --- src/ckpool.c | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 6cfbd9f9..c54b75e7 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -523,7 +523,8 @@ static void clear_bufline(connsock_t *cs) cs->bufofs = cs->buflen; cs->buflen = 0; cs->buf[cs->bufofs] = '\0'; - } + } else + cs->bufofs = 0; } static void add_bufline(connsock_t *cs, const char *readbuf, const int len) @@ -554,7 +555,7 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len) { int ret = len; - while (cs->buflen < len) { + while (cs->bufofs < len) { char readbuf[PAGESIZE]; ret = wait_read_select(cs->fd, *timeout); @@ -585,10 +586,14 @@ static int read_gz_line(connsock_t *cs, float *timeout) ret = -1; goto out; } + memcpy(&msglen, cs->buf, 4); compsize = le32toh(msglen); memcpy(&msglen, cs->buf + 4, 4); decompsize = le32toh(msglen); + + /* Remove the gz variables */ + cs->buflen = cs->bufofs - 8; cs->bufofs = 8; clear_bufline(cs); @@ -606,32 +611,44 @@ static int read_gz_line(connsock_t *cs, float *timeout) ret = -1; goto out; } + /* Clear out all the compressed data */ + cs->buflen = cs->bufofs - compsize; + cs->bufofs = compsize; + clear_bufline(cs); + /* Do decompresion and buffer reconstruction here */ dest = ckalloc(decompsize); res = decompsize; ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); - /* Clear out all the compressed data */ - clear_bufline(cs); + if (ret != Z_OK || res != decompsize) { LOGWARNING("Failed to decompress %lu bytes in read_gz_line, got %d", decompsize, ret); ret = -1; goto out; } + eom = dest + decompsize - 1; if (memcmp(eom, "\n", 1)) { LOGWARNING("Failed to find EOM in decompressed data in read_gz_line"); ret = -1; goto out; } + *eom = '\0'; + ret = decompsize - 1; /* Wedge the decompressed buffer back to the start of cs->buf */ buf = cs->buf; - buflen = cs->buflen; + buflen = cs->bufofs; cs->buf = dest; dest = NULL; - ret = cs->buflen = decompsize; - if (buflen) + cs->bufofs = decompsize; + if (buflen) { + LOGWARNING("Remainder %s", buf); add_bufline(cs, buf, buflen); + cs->buflen = buflen; + cs->bufofs = decompsize; + } else + cs->buflen = cs->bufofs = 0; out: free(dest); return ret; From ea945e863c0e538ed3bc66f6fde522e6327b0274 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 11:03:04 +1100 Subject: [PATCH 09/16] Reinstate mtu size for compression to clients --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 0f7633fb..2a9500cf 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1003,7 +1003,7 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->gz) { + if (client->gz && len > 1492) { unsigned long compsize, decompsize = len; uint32_t msglen; char *dest; From 5146b715fcbbd445326742b27fe00b9677822837 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 11:05:01 +1100 Subject: [PATCH 10/16] Empty buffer on failure --- src/ckpool.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index c54b75e7..2dd7cd3e 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -651,6 +651,8 @@ static int read_gz_line(connsock_t *cs, float *timeout) cs->buflen = cs->bufofs = 0; out: free(dest); + if (ret < 1) + empty_buffer(cs); return ret; } From 5fb2ea342e8c966f360000c06e791f2c6aeea9c3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 11:48:06 +1100 Subject: [PATCH 11/16] Various fixes --- src/ckpool.c | 14 ++++++++++---- src/connector.c | 9 +++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 2dd7cd3e..5fcc7c9b 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -510,6 +510,8 @@ bool ping_main(ckpool_t *ckp) void empty_buffer(connsock_t *cs) { + if (cs->buf) + cs->buf[0] = '\0'; cs->buflen = cs->bufofs = 0; } @@ -557,11 +559,15 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len) while (cs->bufofs < len) { char readbuf[PAGESIZE]; + int readlen; ret = wait_read_select(cs->fd, *timeout); if (ret < 1) goto out; - ret = recv(cs->fd, readbuf, len - cs->buflen, MSG_DONTWAIT); + readlen = len - cs->buflen; + if (readlen >= PAGESIZE) + readlen = PAGESIZE - 4; + ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); if (ret < 1) goto out; add_bufline(cs, readbuf, ret); @@ -617,8 +623,8 @@ static int read_gz_line(connsock_t *cs, float *timeout) clear_bufline(cs); /* Do decompresion and buffer reconstruction here */ - dest = ckalloc(decompsize); - res = decompsize; + res = round_up_page(decompsize); + dest = ckalloc(res); ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); if (ret != Z_OK || res != decompsize) { @@ -736,7 +742,7 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - } else if (cs->buflen && !strncmp(cs->buf, gzip_magic, 3)) + } else if (ret == 3 && !strncmp(cs->buf, gzip_magic, 3)) ret = read_gz_line(cs, timeout); return ret; } diff --git a/src/connector.c b/src/connector.c index 2a9500cf..5593476b 100644 --- a/src/connector.c +++ b/src/connector.c @@ -491,7 +491,12 @@ compressed: if (client->bufofs < client->compsize) goto retry; - res = client->decompsize; + res = PAGESIZE - 4; + if (unlikely(client->decompsize > res)) { + LOGNOTICE("Client attempting to send oversize compressed message, disconnecting"); + invalidate_client(ckp, cdata, client); + return; + } ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize); if (ret != Z_OK || res != client->decompsize) { LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d", @@ -1003,7 +1008,7 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->gz && len > 1492) { + if (client->gz) { unsigned long compsize, decompsize = len; uint32_t msglen; char *dest; From 7e4fee659e1020e3a6fe8a62c1b7cde838a67a72 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 14:46:03 +1100 Subject: [PATCH 12/16] Minor fixes --- src/ckpool.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 5fcc7c9b..442ac412 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -535,7 +535,7 @@ static void add_bufline(connsock_t *cs, const char *readbuf, const int len) size_t buflen; char *newbuf; - buflen = cs->bufofs + len + 1; + buflen = round_up_page(cs->bufofs + len + 1); while (42) { newbuf = realloc(cs->buf, buflen); if (likely(newbuf)) @@ -564,7 +564,7 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len) ret = wait_read_select(cs->fd, *timeout); if (ret < 1) goto out; - readlen = len - cs->buflen; + readlen = len - cs->bufofs; if (readlen >= PAGESIZE) readlen = PAGESIZE - 4; ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); @@ -617,6 +617,7 @@ static int read_gz_line(connsock_t *cs, float *timeout) ret = -1; goto out; } + /* Clear out all the compressed data */ cs->buflen = cs->bufofs - compsize; cs->bufofs = compsize; @@ -626,9 +627,9 @@ static int read_gz_line(connsock_t *cs, float *timeout) res = round_up_page(decompsize); dest = ckalloc(res); ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); - if (ret != Z_OK || res != decompsize) { - LOGWARNING("Failed to decompress %lu bytes in read_gz_line, got %d", decompsize, ret); + LOGWARNING("Failed to decompress %lu bytes in read_gz_line, result %d got %lu", + decompsize, ret, res); ret = -1; goto out; } @@ -649,7 +650,6 @@ static int read_gz_line(connsock_t *cs, float *timeout) dest = NULL; cs->bufofs = decompsize; if (buflen) { - LOGWARNING("Remainder %s", buf); add_bufline(cs, buf, buflen); cs->buflen = buflen; cs->bufofs = decompsize; From 1912613e913aba4f4102072ddb0935eac3fc36b6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 2 Jan 2016 15:00:57 +1100 Subject: [PATCH 13/16] Hold semaphore till we've sent the buffer --- src/generator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/generator.c b/src/generator.c index f40bbfb2..353dc641 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2022,20 +2022,20 @@ static void *passthrough_recv(void *arg) /* Make sure we receive a line within 90 seconds */ cksem_wait(&cs->sem); ret = read_socket_line(cs, &timeout); - cksem_post(&cs->sem); - if (ret < 1) { - reconnect_generator(ckp); LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", proxi->id, proxi->url); alive = proxi->alive = false; Close(cs->fd); + reconnect_generator(ckp); + cksem_post(&cs->sem); continue; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ send_proc(ckp->connector, cs->buf); + cksem_post(&cs->sem); } return NULL; } From ff5e74fbd1fde42f7e80fc6782f5804f2367b7a4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 3 Jan 2016 09:54:59 +1100 Subject: [PATCH 14/16] Rework read_socket_line loop to not bother reading more once it has a message --- src/ckpool.c | 53 ++++++++++++++++++++-------------------------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 442ac412..58cb831a 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -670,7 +670,6 @@ int read_socket_line(connsock_t *cs, float *timeout) char *eom = NULL; tv_t start, now; int ret = -1; - bool polled; float diff; if (unlikely(cs->fd < 0)) @@ -680,56 +679,44 @@ int read_socket_line(connsock_t *cs, float *timeout) eom = strchr(cs->buf, '\n'); tv_time(&start); -rewait: - if (*timeout < 0) { - LOGDEBUG("Timed out in read_socket_line"); - ret = 0; - goto out; - } - ret = wait_read_select(cs->fd, eom ? 0 : *timeout); - polled = true; - if (ret < 1) { - if (!ret) { - if (eom) - goto parse; - LOGDEBUG("Select timed out in read_socket_line"); - } else { + + while (!eom) { + char readbuf[PAGESIZE]; + + if (*timeout < 0) { + if (cs->ckp->proxy) + LOGINFO("Timed out in read_socket_line"); + else + LOGERR("Timed out in read_socket_line"); + ret = 0; + goto out; + } + ret = wait_read_select(cs->fd, *timeout); + if (ret < 0) { if (cs->ckp->proxy) LOGINFO("Select failed in read_socket_line"); else LOGERR("Select failed in read_socket_line"); + goto out; } - goto out; - } - tv_time(&now); - diff = tvdiff(&now, &start); - copy_tv(&start, &now); - *timeout -= diff; - while (42) { - char readbuf[PAGESIZE] = {}; - ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { - /* No more to read or closed socket after valid message */ - if (eom) - break; - /* Have we used up all the timeout yet? If polled is - * set that means poll has said there should be + /* If we have done wait_read_select there should be * something to read and if we get nothing it means the * socket is closed. */ - if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) - goto rewait; if (cs->ckp->proxy) LOGINFO("Failed to recv in read_socket_line"); else LOGERR("Failed to recv in read_socket_line"); goto out; } - polled = false; add_bufline(cs, readbuf, ret); eom = strchr(cs->buf, '\n'); + tv_time(&now); + diff = tvdiff(&now, &start); + copy_tv(&start, &now); + *timeout -= diff; } -parse: ret = eom - cs->buf; cs->buflen = cs->buf + cs->bufofs - eom - 1; From 85302af03f6dd6ee3174f56497f1c5fea7194a7c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 3 Jan 2016 10:49:05 +1100 Subject: [PATCH 15/16] Decrement timeout in read_cs_length --- src/ckpool.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 58cb831a..f2c6054a 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -555,24 +555,36 @@ static void add_bufline(connsock_t *cs, const char *readbuf, const int len) static int read_cs_length(connsock_t *cs, float *timeout, int len) { + tv_t start, now; int ret = len; + float diff; + + tv_time(&start); while (cs->bufofs < len) { char readbuf[PAGESIZE]; int readlen; + if (*timeout < 0) { + LOGDEBUG("Timed out in read_cs_length"); + ret = 0; + break; + } ret = wait_read_select(cs->fd, *timeout); if (ret < 1) - goto out; + break; readlen = len - cs->bufofs; if (readlen >= PAGESIZE) readlen = PAGESIZE - 4; ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); if (ret < 1) - goto out; + break; add_bufline(cs, readbuf, ret); + tv_time(&now); + diff = tvdiff(&now, &start); + copy_tv(&start, &now); + *timeout -= diff; } -out: return ret; } From effeba49991c89cbcfa72f1f1c26a16bd214f12f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 3 Jan 2016 18:19:19 +1100 Subject: [PATCH 16/16] Fix logic fails --- src/ckpool.c | 19 ++++++++++++------- src/connector.c | 25 ++++++++++++------------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index f2c6054a..00f9a908 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -556,8 +556,8 @@ static void add_bufline(connsock_t *cs, const char *readbuf, const int len) static int read_cs_length(connsock_t *cs, float *timeout, int len) { tv_t start, now; - int ret = len; float diff; + int ret; tv_time(&start); @@ -568,23 +568,25 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len) if (*timeout < 0) { LOGDEBUG("Timed out in read_cs_length"); ret = 0; - break; + goto out; } ret = wait_read_select(cs->fd, *timeout); if (ret < 1) - break; + goto out; readlen = len - cs->bufofs; if (readlen >= PAGESIZE) readlen = PAGESIZE - 4; ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); if (ret < 1) - break; + goto out; add_bufline(cs, readbuf, ret); tv_time(&now); diff = tvdiff(&now, &start); copy_tv(&start, &now); *timeout -= diff; } + ret = len; +out: return ret; } @@ -741,7 +743,7 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - } else if (ret == 3 && !strncmp(cs->buf, gzip_magic, 3)) + } else if (ret == 3 && !memcmp(cs->buf, gzip_magic, 3)) ret = read_gz_line(cs, timeout); return ret; } @@ -758,7 +760,8 @@ int write_cs(connsock_t *cs, const char *buf, int len) uint32_t msglen; int ret; - /* Connsock doesn't expect gz compressed messages */ + /* Connsock doesn't expect gz compressed messages. Only compress if it's + * larger than one MTU. */ if (!cs->gz || len <= 1492) return write_socket(cs->fd, buf, len); compsize = round_up_page(len + 12); @@ -770,8 +773,10 @@ int write_cs(connsock_t *cs, const char *buf, int len) LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); return write_socket(cs->fd, buf, len); } + if (unlikely(compsize + 12 >= decompsize)) + return write_socket(cs->fd, buf, len); /* Copy gz magic header */ - sprintf(dest, gzip_magic); + memcpy(dest, gzip_magic, 4); /* Copy compressed message length */ msglen = htole32(compsize); memcpy(dest + 4, &msglen, 4); diff --git a/src/connector.c b/src/connector.c index 5593476b..2f373b18 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1008,33 +1008,32 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) /* Does this client accept compressed data? Only compress if it's * larger than one MTU. */ - if (client->gz) { + if (client->gz && len > 1492) { unsigned long compsize, decompsize = len; uint32_t msglen; - char *dest; + Bytef *dest; int ret; - compsize = round_up_page(len + 12); - dest = ckalloc(compsize); - compsize -= 12; - ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + compsize = round_up_page(len); + dest = alloca(compsize); + ret = compress(dest, &compsize, (Bytef *)buf, len); if (unlikely(ret != Z_OK)) { - LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret); - free(dest); + LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret); goto out; } + if (unlikely(compsize + 12 >= decompsize)) + goto out; /* Copy gz magic header */ - sprintf(dest, gzip_magic); + memcpy(buf, gzip_magic, 4); /* Copy compressed message length */ msglen = htole32(compsize); - memcpy(dest + 4, &msglen, 4); + memcpy(buf + 4, &msglen, 4); /* Copy decompressed message length */ msglen = htole32(decompsize); - memcpy(dest + 8, &msglen, 4); + memcpy(buf + 8, &msglen, 4); + memcpy(buf + 12, dest, compsize); len = compsize + 12; LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize); - free(buf); - buf = dest; } out: sender_send = ckzalloc(sizeof(sender_send_t));