Browse Source

Convert to gzip

master
Con Kolivas 9 years ago
parent
commit
11d14620e6
  1. 4
      configure.ac
  2. 61
      src/ckpool.c
  3. 6
      src/ckpool.h
  4. 67
      src/connector.c
  5. 16
      src/generator.c
  6. 12
      src/libckpool.c
  7. 2
      src/libckpool.h
  8. 8
      src/stratifier.c

4
configure.ac

@ -40,7 +40,7 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h)
AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h)
AC_CHECK_HEADERS(lz4.h) AC_CHECK_HEADERS(zlib.h)
AC_CONFIG_SUBDIRS([src/jansson-2.6]) AC_CONFIG_SUBDIRS([src/jansson-2.6])
JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a"
@ -54,7 +54,7 @@ AC_ARG_WITH([ckdb],
#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1) #AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1) AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(LZ4_compress, lz4, , echo "Error: Required library liblz4-dev not found." && exit 1) AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1)
AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1) AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1)
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then

61
src/ckpool.c

@ -20,7 +20,7 @@
#include <getopt.h> #include <getopt.h>
#include <grp.h> #include <grp.h>
#include <jansson.h> #include <jansson.h>
#include <lz4.h> #include <zlib.h>
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -36,6 +36,8 @@
ckpool_t *global_ckp; ckpool_t *global_ckp;
const char gzip_magic[] = "\x1f\xd5\x01\n";
static void proclog(ckpool_t *ckp, char *msg) static void proclog(ckpool_t *ckp, char *msg)
{ {
FILE *LOGFP; FILE *LOGFP;
@ -567,13 +569,14 @@ out:
return ret; return ret;
} }
static int read_lz4_line(connsock_t *cs, float *timeout) static int read_gz_line(connsock_t *cs, float *timeout)
{ {
int compsize, decompsize, ret, buflen; unsigned long compsize, res, decompsize;
char *buf, *dest = NULL, *eom; char *buf, *dest = NULL, *eom;
int ret, buflen;
uint32_t msglen; uint32_t msglen;
/* Remove lz4 header */ /* Remove gz header */
clear_bufline(cs); clear_bufline(cs);
/* Get data sizes */ /* Get data sizes */
@ -588,35 +591,35 @@ static int read_lz4_line(connsock_t *cs, float *timeout)
decompsize = le32toh(msglen); decompsize = le32toh(msglen);
cs->bufofs = 8; cs->bufofs = 8;
clear_bufline(cs); clear_bufline(cs);
LOGWARNING("Trying to read lz4 %d/%d", compsize, decompsize);
if (unlikely(compsize < 1 || compsize > (int)0x80000000 || if (unlikely(compsize < 1 || compsize > 0x80000000 ||
decompsize < 1 || decompsize > (int)0x80000000)) { decompsize < 1 || decompsize > 0x80000000)) {
LOGWARNING("Invalid message length comp %d decomp %d sent to read_lz4_line", compsize, decompsize); LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize);
ret = -1; ret = -1;
goto out; goto out;
} }
/* Get compressed data */ /* Get compressed data */
ret = read_cs_length(cs, timeout, compsize); ret = read_cs_length(cs, timeout, compsize);
if (ret != compsize) { if (ret != (int)compsize) {
LOGWARNING("Failed to read %d compressed bytes in read_lz4_line, got %d", compsize, ret); LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret);
ret = -1; ret = -1;
goto out; goto out;
} }
/* Do decompresion and buffer reconstruction here */ /* Do decompresion and buffer reconstruction here */
dest = ckalloc(decompsize); dest = ckalloc(decompsize);
ret = LZ4_decompress_safe(cs->buf, dest, compsize, decompsize); res = decompsize;
ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize);
/* Clear out all the compressed data */ /* Clear out all the compressed data */
clear_bufline(cs); clear_bufline(cs);
if (ret != decompsize) { if (ret != Z_OK || res != decompsize) {
LOGWARNING("Failed to decompress %d bytes in read_lz4_line, got %d", decompsize, ret); LOGWARNING("Failed to decompress %lu bytes in read_gz_line, got %d", decompsize, ret);
ret = -1; ret = -1;
goto out; goto out;
} }
eom = dest + decompsize - 1; eom = dest + decompsize - 1;
if (memcmp(eom, "\n", 1)) { if (memcmp(eom, "\n", 1)) {
LOGWARNING("Failed to find EOM in decompressed data in read_lz4_line"); LOGWARNING("Failed to find EOM in decompressed data in read_gz_line");
ret = -1; ret = -1;
goto out; goto out;
} }
@ -714,39 +717,41 @@ out:
if (ret < 0) { if (ret < 0) {
empty_buffer(cs); empty_buffer(cs);
dealloc(cs->buf); dealloc(cs->buf);
} else if (cs->buflen && !strncmp(cs->buf, "lz4", 3)) } else if (cs->buflen && !strncmp(cs->buf, gzip_magic, 3))
ret = read_lz4_line(cs, timeout); ret = read_gz_line(cs, timeout);
return ret; return ret;
} }
/* Lz4 compressed block structure: /* gzip compressed block structure:
* - 4 byte magic header LZ4\n * - 4 byte magic header gzip_magic "\x1f\xd5\x01\n"
* - 4 byte LE encoded compressed size * - 4 byte LE encoded compressed size
* - 4 byte LE encoded decompressed size * - 4 byte LE encoded decompressed size
*/ */
int write_cs(connsock_t *cs, const char *buf, int len) int write_cs(connsock_t *cs, const char *buf, int len)
{ {
int compsize, decompsize = len; unsigned long compsize, decompsize = len;
char *dest = NULL; char *dest = NULL;
uint32_t msglen; uint32_t msglen;
int ret = -1; int ret = -1;
/* Connsock doesn't expect lz4 compressed messages */ /* Connsock doesn't expect gz compressed messages */
if (!cs->lz4 || len <= 1492) { if (!cs->gz || len <= 1492) {
ret = write_socket(cs->fd, buf, len); ret = write_socket(cs->fd, buf, len);
goto out; goto out;
} }
dest = ckalloc(len + 12); compsize = round_up_page(len + 12);
dest = ckalloc(compsize);
compsize -= 12;
/* Do compression here */ /* Do compression here */
compsize = LZ4_compress(buf, dest + 12, len); ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (!compsize) { if (ret != Z_OK) {
LOGWARNING("Failed to LZ4 compress in write_cs, writing uncompressed"); LOGWARNING("Failed to gz compress in write_cs, writing uncompressed");
ret = write_socket(cs->fd, buf, len); ret = write_socket(cs->fd, buf, len);
goto out; goto out;
} }
LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); LOGDEBUG("Writing connsock message compressed %lu from %lu", compsize, decompsize);
/* Copy lz4 magic header */ /* Copy gz magic header */
sprintf(dest, "lz4\n"); sprintf(dest, gzip_magic);
/* Copy compressed message length */ /* Copy compressed message length */
msglen = htole32(compsize); msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4); memcpy(dest + 4, &msglen, 4);

6
src/ckpool.h

@ -83,8 +83,8 @@ struct connsock {
/* Semaphore used to serialise request/responses */ /* Semaphore used to serialise request/responses */
sem_t sem; sem_t sem;
/* Has the other end acknowledged it can receive lz4 compressed data */ /* Has the other end acknowledged it can receive gz compressed data */
bool lz4; bool gz;
}; };
typedef struct connsock connsock_t; typedef struct connsock connsock_t;
@ -284,6 +284,8 @@ static const char __maybe_unused *stratum_msgs[] = {
"" ""
}; };
extern const char gzip_magic[];
#ifdef USE_CKDB #ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else #else

67
src/connector.c

@ -10,12 +10,12 @@
#include "config.h" #include "config.h"
#include <arpa/inet.h> #include <arpa/inet.h>
#include <lz4.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <zlib.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -56,7 +56,7 @@ struct client_instance {
int server; int server;
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; unsigned long bufofs;
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
sender_send_t *sending; sender_send_t *sending;
@ -64,11 +64,11 @@ struct client_instance {
/* Is this the parent passthrough client */ /* Is this the parent passthrough client */
bool passthrough; bool passthrough;
/* Does this client expect lz4 compression? */ /* Does this client expect gz compression? */
bool lz4; bool gz;
bool compressed; /* Currently receiving a compressed message */ bool compressed; /* Currently receiving a compressed message */
int compsize; /* Expected compressed data size */ unsigned long compsize; /* Expected compressed data size */
int decompsize; /* Expected decompressed data size */ unsigned long decompsize; /* Expected decompressed data size */
/* Linked list of shares in redirector mode.*/ /* Linked list of shares in redirector mode.*/
share_t *shares; share_t *shares;
@ -479,7 +479,7 @@ retry:
if (ret < 1) { if (ret < 1) {
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return; return;
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s",
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
@ -487,18 +487,21 @@ retry:
client->bufofs += ret; client->bufofs += ret;
compressed: compressed:
if (client->compressed) { if (client->compressed) {
unsigned long res;
if (client->bufofs < client->compsize) if (client->bufofs < client->compsize)
goto retry; goto retry;
ret = LZ4_decompress_safe(client->buf, msg, client->compsize, client->decompsize); res = client->decompsize;
if (ret != client->decompsize) { ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize);
LOGNOTICE("Failed to decompress %d from %d bytes in parse_client_msg, got %d", if (ret != Z_OK || res != client->decompsize) {
LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d",
client->decompsize, client->compsize, ret); client->decompsize, client->compsize, ret);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
LOGDEBUG("Received client message compressed %d from %d", LOGDEBUG("Received client message compressed %lu from %lu",
client->compsize, client->decompsize); client->compsize, client->decompsize);
msg[ret] = '\0'; msg[res] = '\0';
client->bufofs -= client->compsize; client->bufofs -= client->compsize;
if (client->bufofs) if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs); memmove(client->buf, client->buf + buflen, client->bufofs);
@ -519,7 +522,7 @@ reparse:
} }
/* Look for a compression header */ /* Look for a compression header */
if (!strncmp(client->buf, "lz4", 3)) { if (!strncmp(client->buf, gzip_magic, 3)) {
uint32_t msglen; uint32_t msglen;
/* Do we have the whole header? If not, keep reading */ /* Do we have the whole header? If not, keep reading */
@ -531,7 +534,7 @@ reparse:
client->decompsize = le32toh(msglen); client->decompsize = le32toh(msglen);
if (unlikely(!client->compsize || !client->decompsize || if (unlikely(!client->compsize || !client->decompsize ||
client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) { client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) {
LOGNOTICE("Client id %"PRId64" invalid compressed message size %d/%d, disconnecting", LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting",
client->id, client->compsize, client->decompsize); client->id, client->compsize, client->decompsize);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
@ -1000,20 +1003,24 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
/* Does this client accept compressed data? Only compress if it's /* Does this client accept compressed data? Only compress if it's
* larger than one MTU. */ * larger than one MTU. */
if (client->lz4 && len > 1492) { if (client->gz && len > 1492) {
char *dest = ckalloc(len + 12); unsigned long compsize;
uint32_t msglen; uint32_t msglen;
int compsize; char *dest;
int ret;
compsize = LZ4_compress(buf, dest + 12, len);
if (unlikely(!compsize)) { compsize = round_up_page(len + 12);
LOGWARNING("Failed to LZ4 compress in send_client, sending uncompressed"); dest = ckalloc(compsize);
compsize -= 12;
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (unlikely(ret != Z_OK)) {
LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret);
free(dest); free(dest);
goto out; goto out;
} }
LOGDEBUG("Sending client message compressed %d from %d", compsize, len); LOGDEBUG("Sending client message compressed %lu from %d", compsize, len);
/* Copy lz4 magic header */ /* Copy gz magic header */
sprintf(dest, "lz4\n"); sprintf(dest, gzip_magic);
/* Copy compressed message length */ /* Copy compressed message length */
msglen = htole32(compsize); msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4); memcpy(dest + 4, &msglen, 4);
@ -1054,7 +1061,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
LOGINFO("Connector adding passthrough client %"PRId64, client->id); LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true; client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true, \"lz4\": true}\n"); ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n");
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }
@ -1233,11 +1240,11 @@ retry:
goto out; goto out;
} else if (cmdmatch(buf, "pass")) { } else if (cmdmatch(buf, "pass")) {
client_instance_t *client; client_instance_t *client;
bool lz4 = false; bool gz = false;
if (strstr(buf, "lz4")) { if (strstr(buf, "gz")) {
lz4 = true; gz = true;
ret = sscanf(buf, "passlz4=%"PRId64, &client_id); ret = sscanf(buf, "passgz=%"PRId64, &client_id);
} else } else
ret = sscanf(buf, "passthrough=%"PRId64, &client_id); ret = sscanf(buf, "passthrough=%"PRId64, &client_id);
if (ret < 0) { if (ret < 0) {
@ -1249,7 +1256,7 @@ retry:
LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id);
goto retry; goto retry;
} }
client->lz4 = lz4; client->gz = gz;
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) { } else if (cmdmatch(buf, "getxfd")) {

16
src/generator.c

@ -774,7 +774,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
JSON_CPACK(req, "{ss,sb,s[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.passthrough", "method", "mining.passthrough",
"lz4", json_true(), "gz", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
json_decref(req); json_decref(req);
@ -799,9 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied passthrough for stratum"); LOGWARNING("Denied passthrough for stratum");
goto out; goto out;
} }
json_get_bool(&cs->lz4, val, "lz4"); json_get_bool(&cs->gz, val, "gz");
if (cs->lz4) if (cs->gz)
LOGNOTICE("Negotiated lz4 compression with pool"); LOGNOTICE("Negotiated gz compression with pool");
proxi->passthrough = true; proxi->passthrough = true;
out: out:
if (val) if (val)
@ -820,7 +820,7 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
JSON_CPACK(req, "{ss,sb,s[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.node", "method", "mining.node",
"lz4", json_true(), "gz", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
@ -846,9 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied node setup for stratum"); LOGWARNING("Denied node setup for stratum");
goto out; goto out;
} }
json_get_bool(&cs->lz4, val, "lz4"); json_get_bool(&cs->gz, val, "gz");
if (cs->lz4) if (cs->gz)
LOGNOTICE("Negotiated lz4 compression with pool"); LOGNOTICE("Negotiated gz compression with pool");
proxi->node = true; proxi->node = true;
out: out:
if (val) if (val)

12
src/libckpool.c

@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line)
return ptr; return ptr;
} }
/* Round up to the nearest page size for efficient malloc */
size_t round_up_page(size_t len)
{
int rem = len % PAGESIZE;
if (rem)
len += PAGESIZE - rem;
return len;
}
/* Adequate size s==len*2 + 1 must be alloced to use this variant */ /* Adequate size s==len*2 + 1 must be alloced to use this variant */
void __bin2hex(void *vs, const void *vp, size_t len) void __bin2hex(void *vs, const void *vp, size_t len)
{ {

2
src/libckpool.h

@ -531,6 +531,8 @@ void trail_slash(char **buf);
void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *_ckalloc(size_t len, const char *file, const char *func, const int line);
void *json_ckalloc(size_t size); void *json_ckalloc(size_t size);
void *_ckzalloc(size_t len, const char *file, const char *func, const int line); void *_ckzalloc(size_t len, const char *file, const char *func, const int line);
size_t round_up_page(size_t len);
extern const int hex2bin_tbl[]; extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len); void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len); void *bin2hex(const void *vp, size_t len);

8
src/stratifier.c

@ -5238,10 +5238,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and /* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */ * add it to the list of mining nodes in the stratifier */
json_get_bool(&var, val, "lz4"); json_get_bool(&var, val, "gz");
add_mining_node(sdata, client); add_mining_node(sdata, client);
if (var) if (var)
snprintf(buf, 255, "passlz4=%"PRId64, client_id); snprintf(buf, 255, "passgz=%"PRId64, client_id);
else else
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
@ -5255,10 +5255,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No * is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this * data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */ * stratifier after this so drop the client in the stratifier. */
json_get_bool(&var, val, "lz4"); json_get_bool(&var, val, "gz");
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
if (var) if (var)
snprintf(buf, 255, "passlz4=%"PRId64, client_id); snprintf(buf, 255, "passgz=%"PRId64, client_id);
else else
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);

Loading…
Cancel
Save