diff --git a/configure.ac b/configure.ac index 6097d1a4..ff33c36e 100644 --- a/configure.ac +++ b/configure.ac @@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) - -PTHREAD_LIBS="-lpthread" -MATH_LIBS="-lm" -RT_LIBS="-lrt" +AC_CHECK_HEADERS(zlib.h) AC_CONFIG_SUBDIRS([src/jansson-2.6]) JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" -AC_SUBST(PTHREAD_LIBS) -AC_SUBST(MATH_LIBS) -AC_SUBST(RT_LIBS) AC_SUBST(JANSSON_LIBS) AC_ARG_WITH([ckdb], @@ -58,6 +52,11 @@ AC_ARG_WITH([ckdb], [ckdb=$withval] ) +#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) +AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) +AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1) +AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) + if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) @@ -84,8 +83,10 @@ echo "Compilation............: make (or gmake)" echo " CPPFLAGS.............: $CPPFLAGS" echo " CFLAGS...............: $CFLAGS" echo " LDFLAGS..............: $LDFLAGS" -echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" -echo " db LDADD.............: $DB_LIBS" +echo " LDADD................: $LIBS $JANSSON_LIBS" +if test "x$ckdb" != "xno"; then + echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS" +fi echo echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo " prefix...............: $prefix" diff --git a/src/Makefile.am b/src/Makefile.am index e95666ac..53561714 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src lib_LTLIBRARIES = libckpool.la libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h -libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ +libckpool_la_LIBADD = @LIBS@ bin_PROGRAMS = ckpool ckpmsg notifier ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ @@ -23,5 +23,5 @@ if WANT_CKDB bin_PROGRAMS += ckdb ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h -ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ +ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@ endif diff --git a/src/ckpool.c b/src/ckpool.c index 15a9a5bf..00f9a908 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,8 @@ ckpool_t *global_ckp; +const char gzip_magic[] = "\x1f\xd5\x01\n"; + static void proclog(ckpool_t *ckp, char *msg) { FILE *LOGFP; @@ -507,9 +510,172 @@ bool ping_main(ckpool_t *ckp) void empty_buffer(connsock_t *cs) { + if (cs->buf) + cs->buf[0] = '\0'; cs->buflen = cs->bufofs = 0; } +static void clear_bufline(connsock_t *cs) +{ + if (unlikely(!cs->buf)) + cs->buf = ckzalloc(PAGESIZE); + else if (cs->buflen) { + memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); + memset(cs->buf + cs->buflen, 0, cs->bufofs); + cs->bufofs = cs->buflen; + cs->buflen = 0; + cs->buf[cs->bufofs] = '\0'; + } else + cs->bufofs = 0; +} + +static void add_bufline(connsock_t *cs, const char *readbuf, const int len) +{ + int backoff = 1; + size_t buflen; + char *newbuf; + + buflen = round_up_page(cs->bufofs + len + 1); + while (42) { + newbuf = realloc(cs->buf, buflen); + if (likely(newbuf)) + break; + if (backoff == 1) + fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); + cksleep_ms(backoff); + backoff <<= 1; + } + cs->buf = newbuf; + if (unlikely(!cs->buf)) + quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); + memcpy(cs->buf + cs->bufofs, readbuf, len); + cs->bufofs += len; + cs->buf[cs->bufofs] = '\0'; +} + +static int read_cs_length(connsock_t *cs, float *timeout, int len) +{ + tv_t start, now; + float diff; + int ret; + + tv_time(&start); + + while (cs->bufofs < len) { + char readbuf[PAGESIZE]; + int readlen; + + if (*timeout < 0) { + LOGDEBUG("Timed out in read_cs_length"); + ret = 0; + goto out; + } + ret = wait_read_select(cs->fd, *timeout); + if (ret < 1) + goto out; + readlen = len - cs->bufofs; + if (readlen >= PAGESIZE) + readlen = PAGESIZE - 4; + ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); + if (ret < 1) + goto out; + add_bufline(cs, readbuf, ret); + tv_time(&now); + diff = tvdiff(&now, &start); + copy_tv(&start, &now); + *timeout -= diff; + } + ret = len; +out: + return ret; +} + +static int read_gz_line(connsock_t *cs, float *timeout) +{ + unsigned long compsize, res, decompsize; + char *buf, *dest = NULL, *eom; + int ret, buflen; + uint32_t msglen; + + /* Remove gz header */ + clear_bufline(cs); + + /* Get data sizes */ + ret = read_cs_length(cs, timeout, 8); + if (ret != 8) { + ret = -1; + goto out; + } + + memcpy(&msglen, cs->buf, 4); + compsize = le32toh(msglen); + memcpy(&msglen, cs->buf + 4, 4); + decompsize = le32toh(msglen); + + /* Remove the gz variables */ + cs->buflen = cs->bufofs - 8; + cs->bufofs = 8; + clear_bufline(cs); + + if (unlikely(compsize < 1 || compsize > 0x80000000 || + decompsize < 1 || decompsize > 0x80000000)) { + LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize); + ret = -1; + goto out; + } + + /* Get compressed data */ + ret = read_cs_length(cs, timeout, compsize); + if (ret != (int)compsize) { + LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret); + ret = -1; + goto out; + } + + /* Clear out all the compressed data */ + cs->buflen = cs->bufofs - compsize; + cs->bufofs = compsize; + clear_bufline(cs); + + /* Do decompresion and buffer reconstruction here */ + res = round_up_page(decompsize); + dest = ckalloc(res); + ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize); + if (ret != Z_OK || res != decompsize) { + LOGWARNING("Failed to decompress %lu bytes in read_gz_line, result %d got %lu", + decompsize, ret, res); + ret = -1; + goto out; + } + + eom = dest + decompsize - 1; + if (memcmp(eom, "\n", 1)) { + LOGWARNING("Failed to find EOM in decompressed data in read_gz_line"); + ret = -1; + goto out; + } + + *eom = '\0'; + ret = decompsize - 1; + /* Wedge the decompressed buffer back to the start of cs->buf */ + buf = cs->buf; + buflen = cs->bufofs; + cs->buf = dest; + dest = NULL; + cs->bufofs = decompsize; + if (buflen) { + add_bufline(cs, buf, buflen); + cs->buflen = buflen; + cs->bufofs = decompsize; + } else + cs->buflen = cs->bufofs = 0; +out: + free(dest); + if (ret < 1) + empty_buffer(cs); + return ret; +} + /* Read from a socket into cs->buf till we get an '\n', converting it to '\0' * and storing how much extra data we've received, to be moved to the beginning * of the buffer for use on the next receive. */ @@ -517,93 +683,54 @@ int read_socket_line(connsock_t *cs, float *timeout) { char *eom = NULL; tv_t start, now; - size_t buflen; int ret = -1; - bool polled; float diff; if (unlikely(cs->fd < 0)) goto out; - if (unlikely(!cs->buf)) - cs->buf = ckzalloc(PAGESIZE); - else if (cs->buflen) { - memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen); - memset(cs->buf + cs->buflen, 0, cs->bufofs); - cs->bufofs = cs->buflen; - cs->buflen = 0; - cs->buf[cs->bufofs] = '\0'; - eom = strchr(cs->buf, '\n'); - } + clear_bufline(cs); + eom = strchr(cs->buf, '\n'); tv_time(&start); -rewait: - if (*timeout < 0) { - LOGDEBUG("Timed out in read_socket_line"); - ret = 0; - goto out; - } - ret = wait_read_select(cs->fd, eom ? 0 : *timeout); - polled = true; - if (ret < 1) { - if (!ret) { - if (eom) - goto parse; - LOGDEBUG("Select timed out in read_socket_line"); - } else { + + while (!eom) { + char readbuf[PAGESIZE]; + + if (*timeout < 0) { + if (cs->ckp->proxy) + LOGINFO("Timed out in read_socket_line"); + else + LOGERR("Timed out in read_socket_line"); + ret = 0; + goto out; + } + ret = wait_read_select(cs->fd, *timeout); + if (ret < 0) { if (cs->ckp->proxy) LOGINFO("Select failed in read_socket_line"); else LOGERR("Select failed in read_socket_line"); + goto out; } - goto out; - } - tv_time(&now); - diff = tvdiff(&now, &start); - copy_tv(&start, &now); - *timeout -= diff; - while (42) { - char readbuf[PAGESIZE] = {}; - int backoff = 1; - char *newbuf; - ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { - /* No more to read or closed socket after valid message */ - if (eom) - break; - /* Have we used up all the timeout yet? If polled is - * set that means poll has said there should be + /* If we have done wait_read_select there should be * something to read and if we get nothing it means the * socket is closed. */ - if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) - goto rewait; if (cs->ckp->proxy) LOGINFO("Failed to recv in read_socket_line"); else LOGERR("Failed to recv in read_socket_line"); goto out; } - polled = false; - buflen = cs->bufofs + ret + 1; - while (42) { - newbuf = realloc(cs->buf, buflen); - if (likely(newbuf)) - break; - if (backoff == 1) - fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); - cksleep_ms(backoff); - backoff <<= 1; - } - cs->buf = newbuf; - if (unlikely(!cs->buf)) - quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); - memcpy(cs->buf + cs->bufofs, readbuf, ret); - cs->bufofs += ret; - cs->buf[cs->bufofs] = '\0'; + add_bufline(cs, readbuf, ret); eom = strchr(cs->buf, '\n'); + tv_time(&now); + diff = tvdiff(&now, &start); + copy_tv(&start, &now); + *timeout -= diff; } -parse: ret = eom - cs->buf; cs->buflen = cs->buf + cs->bufofs - eom - 1; @@ -616,7 +743,53 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - } + } else if (ret == 3 && !memcmp(cs->buf, gzip_magic, 3)) + ret = read_gz_line(cs, timeout); + return ret; +} + +/* gzip compressed block structure: + * - 4 byte magic header gzip_magic "\x1f\xd5\x01\n" + * - 4 byte LE encoded compressed size + * - 4 byte LE encoded decompressed size + */ +int write_cs(connsock_t *cs, const char *buf, int len) +{ + unsigned long compsize, decompsize = len; + char *dest = NULL; + uint32_t msglen; + int ret; + + /* Connsock doesn't expect gz compressed messages. Only compress if it's + * larger than one MTU. */ + if (!cs->gz || len <= 1492) + return write_socket(cs->fd, buf, len); + compsize = round_up_page(len + 12); + dest = alloca(compsize); + /* Do compression here */ + compsize -= 12; + ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); + if (ret != Z_OK) { + LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); + return write_socket(cs->fd, buf, len); + } + if (unlikely(compsize + 12 >= decompsize)) + return write_socket(cs->fd, buf, len); + /* Copy gz magic header */ + memcpy(dest, gzip_magic, 4); + /* Copy compressed message length */ + msglen = htole32(compsize); + memcpy(dest + 4, &msglen, 4); + /* Copy decompressed message length */ + msglen = htole32(decompsize); + memcpy(dest + 8, &msglen, 4); + len = compsize + 12; + LOGDEBUG("Writing connsock message compressed %d from %lu", len, decompsize); + ret = write_socket(cs->fd, dest, len); + if (ret == len) + ret = decompsize; + else + ret = -1; return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 0829d36c..52017027 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -82,6 +82,9 @@ struct connsock { ckpool_t *ckp; /* Semaphore used to serialise request/responses */ sem_t sem; + + /* Has the other end acknowledged it can receive gz compressed data */ + bool gz; }; typedef struct connsock connsock_t; @@ -281,6 +284,8 @@ static const char __maybe_unused *stratum_msgs[] = { "" }; +extern const char gzip_magic[]; + #ifdef USE_CKDB #define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #else @@ -301,6 +306,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, float *timeout); +int write_cs(connsock_t *cs, const char *buf, int len); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, diff --git a/src/connector.c b/src/connector.c index f78cb9c8..2f373b18 100644 --- a/src/connector.c +++ b/src/connector.c @@ -15,6 +15,7 @@ #include #include #include +#include #include "ckpool.h" #include "libckpool.h" @@ -55,7 +56,7 @@ struct client_instance { int server; char buf[PAGESIZE]; - int bufofs; + unsigned long bufofs; /* Are we currently sending a blocked message from this client */ sender_send_t *sending; @@ -63,6 +64,12 @@ struct client_instance { /* Is this the parent passthrough client */ bool passthrough; + /* Does this client expect gz compression? */ + bool gz; + bool compressed; /* Currently receiving a compressed message */ + unsigned long compsize; /* Expected compressed data size */ + unsigned long decompsize; /* Expected decompressed data size */ + /* Linked list of shares in redirector mode.*/ share_t *shares; @@ -472,12 +479,40 @@ retry: if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } client->bufofs += ret; +compressed: + if (client->compressed) { + unsigned long res; + + if (client->bufofs < client->compsize) + goto retry; + res = PAGESIZE - 4; + if (unlikely(client->decompsize > res)) { + LOGNOTICE("Client attempting to send oversize compressed message, disconnecting"); + invalidate_client(ckp, cdata, client); + return; + } + ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize); + if (ret != Z_OK || res != client->decompsize) { + LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d", + client->decompsize, client->compsize, ret); + invalidate_client(ckp, cdata, client); + return; + } + LOGDEBUG("Received client message compressed %lu from %lu", + client->compsize, client->decompsize); + msg[res] = '\0'; + client->bufofs -= client->compsize; + if (client->bufofs) + memmove(client->buf, client->buf + buflen, client->bufofs); + client->compressed = false; + goto parse; + } reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) @@ -490,11 +525,40 @@ reparse: invalidate_client(ckp, cdata, client); return; } + + /* Look for a compression header */ + if (!strncmp(client->buf, gzip_magic, 3)) { + uint32_t msglen; + + /* Do we have the whole header? If not, keep reading */ + if (client->bufofs < 12) + goto retry; + memcpy(&msglen, client->buf + 4, 4); + client->compsize = le32toh(msglen); + memcpy(&msglen, client->buf + 8, 4); + client->decompsize = le32toh(msglen); + if (unlikely(!client->compsize || !client->decompsize || + client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting", + client->id, client->compsize, client->decompsize); + invalidate_client(ckp, cdata, client); + return; + } + client->bufofs -= 12; + if (client->bufofs > 0) + memmove(client->buf, client->buf + 12, client->bufofs); + client->compressed = true; + if (client->bufofs >= client->compsize) + goto compressed; + goto retry; + } + memcpy(msg, client->buf, buflen); msg[buflen] = '\0'; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); client->buf[client->bufofs] = '\0'; +parse: if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); @@ -942,6 +1006,36 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) test_redirector_shares(ckp, client, buf); } + /* Does this client accept compressed data? Only compress if it's + * larger than one MTU. */ + if (client->gz && len > 1492) { + unsigned long compsize, decompsize = len; + uint32_t msglen; + Bytef *dest; + int ret; + + compsize = round_up_page(len); + dest = alloca(compsize); + ret = compress(dest, &compsize, (Bytef *)buf, len); + if (unlikely(ret != Z_OK)) { + LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret); + goto out; + } + if (unlikely(compsize + 12 >= decompsize)) + goto out; + /* Copy gz magic header */ + memcpy(buf, gzip_magic, 4); + /* Copy compressed message length */ + msglen = htole32(compsize); + memcpy(buf + 4, &msglen, 4); + /* Copy decompressed message length */ + msglen = htole32(decompsize); + memcpy(buf + 8, &msglen, 4); + memcpy(buf + 12, dest, compsize); + len = compsize + 12; + LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize); + } +out: sender_send = ckzalloc(sizeof(sender_send_t)); sender_send->client = client; sender_send->buf = buf; @@ -971,7 +1065,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) LOGINFO("Connector adding passthrough client %"PRId64, client->id); client->passthrough = true; - ASPRINTF(&buf, "{\"result\": true}\n"); + ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n"); send_client(cdata, client->id, buf); } @@ -1148,10 +1242,15 @@ retry: sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { goto out; - } else if (cmdmatch(buf, "passthrough")) { + } else if (cmdmatch(buf, "pass")) { client_instance_t *client; + bool gz = false; - ret = sscanf(buf, "passthrough=%"PRId64, &client_id); + if (strstr(buf, "gz")) { + gz = true; + ret = sscanf(buf, "passgz=%"PRId64, &client_id); + } else + ret = sscanf(buf, "passthrough=%"PRId64, &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse passthrough command: %s", buf); goto retry; @@ -1161,6 +1260,7 @@ retry: LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); goto retry; } + client->gz = gz; passthrough_client(cdata, client); dec_instance_ref(cdata, client); } else if (cmdmatch(buf, "getxfd")) { diff --git a/src/generator.c b/src/generator.c index c44bf8f3..353dc641 100644 --- a/src/generator.c +++ b/src/generator.c @@ -772,8 +772,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.passthrough", + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); json_decref(req); @@ -798,6 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied passthrough for stratum"); goto out; } + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->passthrough = true; out: if (val) @@ -814,8 +818,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) bool res, ret = false; float timeout = 10; - JSON_CPACK(req, "{s:s,s:[s]}", + JSON_CPACK(req, "{ss,sb,s[s]}", "method", "mining.node", + "gz", json_true(), "params", PACKAGE"/"VERSION); res = send_json_msg(cs, req); @@ -841,6 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi) LOGWARNING("Denied node setup for stratum"); goto out; } + json_get_bool(&cs->gz, val, "gz"); + if (cs->gz) + LOGNOTICE("Negotiated gz compression with pool"); proxi->node = true; out: if (val) @@ -1795,7 +1803,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm) LOGDEBUG("Sending upstream json msg: %s", pm->msg); len = strlen(pm->msg); - sent = write_socket(cs->fd, pm->msg, len); + sent = write_cs(cs, pm->msg, len); if (unlikely(sent != len)) { LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect", len, pm->msg); diff --git a/src/libckpool.c b/src/libckpool.c index a0557a71..2e78d330 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } +/* Round up to the nearest page size for efficient malloc */ +size_t round_up_page(size_t len) +{ + int rem = len % PAGESIZE; + + if (rem) + len += PAGESIZE - rem; + return len; +} + + + /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 9d8ba341..10c18248 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -531,6 +531,8 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); +size_t round_up_page(size_t len); + extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); diff --git a/src/stratifier.c b/src/stratifier.c index e87fc7e4..55bdf65c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5186,10 +5186,11 @@ static void add_mining_node(sdata_t *sdata, stratum_instance_t *client) /* Enter with client holding ref count */ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, - const int64_t client_id, json_t *id_val, json_t *method_val, + const int64_t client_id, json_t *val, json_t *id_val, json_t *method_val, json_t *params_val) { const char *method; + bool var; /* Random broken clients send something not an integer as the id so we * copy the json item for id_val as is for the response. By far the @@ -5237,8 +5238,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ + json_get_bool(&var, val, "gz"); add_mining_node(sdata, client); - snprintf(buf, 255, "passthrough=%"PRId64, client_id); + if (var) + snprintf(buf, 255, "passgz=%"PRId64, client_id); + else + snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); return; } @@ -5250,8 +5255,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie * is a passthrough and to manage its messages accordingly. No * data from this client id should ever come back to this * stratifier after this so drop the client in the stratifier. */ + json_get_bool(&var, val, "gz"); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); - snprintf(buf, 255, "passthrough=%"PRId64, client_id); + if (var) + snprintf(buf, 255, "passgz=%"PRId64, client_id); + else + snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); drop_client(ckp, sdata, client_id); return; @@ -5469,7 +5478,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat send_json_err(sdata, client_id, id_val, "-1:params not found"); goto out; } - parse_method(ckp, sdata, client, client_id, id_val, method, params); + parse_method(ckp, sdata, client, client_id, val, id_val, method, params); out: free_smsg(msg); }