Browse Source

Compression of any sort is slower than ordinary network transfers so remove it

master
Con Kolivas 9 years ago
parent
commit
5b816982e5
  1. 4
      README
  2. 1
      configure.ac
  3. 175
      src/ckpool.c
  4. 9
      src/ckpool.h
  5. 102
      src/connector.c
  6. 22
      src/generator.c
  7. 13
      src/stratifier.c

4
README

@ -265,10 +265,6 @@ new network blocks and is 100 by default. It is intended to be a backup only
for when the notifier is not set up and only polls if the "notify" field is
not set on a btcd.
"compress" : When running in a passthrough mode (redirector, passthrough, node),
should we gzip compress large packets. For passthroughs on a local network it
is recommended to disable this. Default is enabled.
"nonce1length" : This is optional allowing the extranonce1 length to be chosen
from 2 to 8. Default 4

1
configure.ac

@ -54,7 +54,6 @@ AC_ARG_WITH([ckdb],
#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1)
AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1)
if test "x$ckdb" != "xno"; then

175
src/ckpool.c

@ -36,8 +36,6 @@
ckpool_t *global_ckp;
const char gzip_magic[] = "\x1f\xd5\x01\n";
static void proclog(ckpool_t *ckp, char *msg)
{
FILE *LOGFP;
@ -553,129 +551,6 @@ static void add_bufline(connsock_t *cs, const char *readbuf, const int len)
cs->buf[cs->bufofs] = '\0';
}
static int read_cs_length(connsock_t *cs, float *timeout, int len)
{
tv_t start, now;
float diff;
int ret;
tv_time(&start);
while (cs->bufofs < len) {
char readbuf[PAGESIZE];
int readlen;
if (*timeout < 0) {
LOGDEBUG("Timed out in read_cs_length");
ret = 0;
goto out;
}
ret = wait_read_select(cs->fd, *timeout);
if (ret < 1)
goto out;
readlen = len - cs->bufofs;
if (readlen >= PAGESIZE)
readlen = PAGESIZE - 4;
ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT);
if (ret < 1)
goto out;
add_bufline(cs, readbuf, ret);
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
}
ret = len;
out:
return ret;
}
static int read_gz_line(connsock_t *cs, float *timeout)
{
unsigned long compsize, res, decompsize;
char *buf, *dest = NULL, *eom;
int ret, buflen;
uint32_t msglen;
/* Remove gz header */
clear_bufline(cs);
/* Get data sizes */
ret = read_cs_length(cs, timeout, 8);
if (ret != 8) {
ret = -1;
goto out;
}
memcpy(&msglen, cs->buf, 4);
compsize = le32toh(msglen);
memcpy(&msglen, cs->buf + 4, 4);
decompsize = le32toh(msglen);
/* Remove the gz variables */
cs->buflen = cs->bufofs - 8;
cs->bufofs = 8;
clear_bufline(cs);
if (unlikely(compsize < 1 || compsize > 0x80000000 ||
decompsize < 1 || decompsize > 0x80000000)) {
LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize);
ret = -1;
goto out;
}
/* Get compressed data */
ret = read_cs_length(cs, timeout, compsize);
if (ret != (int)compsize) {
LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret);
ret = -1;
goto out;
}
/* Clear out all the compressed data */
cs->buflen = cs->bufofs - compsize;
cs->bufofs = compsize;
clear_bufline(cs);
/* Do decompresion and buffer reconstruction here */
res = round_up_page(decompsize);
dest = ckalloc(res);
ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize);
if (ret != Z_OK || res != decompsize) {
LOGWARNING("Failed to decompress %lu bytes in read_gz_line, result %d got %lu",
decompsize, ret, res);
ret = -1;
goto out;
}
eom = dest + decompsize - 1;
if (memcmp(eom, "\n", 1)) {
LOGWARNING("Failed to find EOM in decompressed data in read_gz_line");
ret = -1;
goto out;
}
*eom = '\0';
ret = decompsize - 1;
/* Wedge the decompressed buffer back to the start of cs->buf */
buf = cs->buf;
buflen = cs->bufofs;
cs->buf = dest;
dest = NULL;
cs->bufofs = decompsize;
if (buflen) {
add_bufline(cs, buf, buflen);
cs->buflen = buflen;
cs->bufofs = decompsize;
} else
cs->buflen = cs->bufofs = 0;
out:
free(dest);
if (ret < 1)
empty_buffer(cs);
return ret;
}
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. */
@ -743,53 +618,7 @@ out:
if (ret < 0) {
empty_buffer(cs);
dealloc(cs->buf);
} else if (ret == 3 && !memcmp(cs->buf, gzip_magic, 3))
ret = read_gz_line(cs, timeout);
return ret;
}
/* gzip compressed block structure:
* - 4 byte magic header gzip_magic "\x1f\xd5\x01\n"
* - 4 byte LE encoded compressed size
* - 4 byte LE encoded decompressed size
*/
int write_cs(connsock_t *cs, const char *buf, int len)
{
unsigned long compsize, decompsize = len;
char *dest = NULL;
uint32_t msglen;
int ret;
/* Connsock doesn't expect gz compressed messages. Only compress if it's
* larger than one MTU. */
if (!cs->gz || len <= 1492)
return write_socket(cs->fd, buf, len);
compsize = round_up_page(len + 12);
dest = alloca(compsize);
/* Do compression here */
compsize -= 12;
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (ret != Z_OK) {
LOGINFO("Failed to gz compress in write_cs, writing uncompressed");
return write_socket(cs->fd, buf, len);
}
if (unlikely(compsize + 12 >= decompsize))
return write_socket(cs->fd, buf, len);
/* Copy gz magic header */
memcpy(dest, gzip_magic, 4);
/* Copy compressed message length */
msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4);
/* Copy decompressed message length */
msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4);
len = compsize + 12;
LOGDEBUG("Writing connsock message compressed %d from %lu", len, decompsize);
ret = write_socket(cs->fd, dest, len);
if (ret == len)
ret = decompsize;
else
ret = -1;
return ret;
}
@ -1539,7 +1368,6 @@ static void parse_config(ckpool_t *ckp)
ckp->btcsig[38] = '\0';
}
json_get_int(&ckp->blockpoll, json_conf, "blockpoll");
json_get_bool(&ckp->compress, json_conf, "compress");
json_get_int(&ckp->nonce1length, json_conf, "nonce1length");
json_get_int(&ckp->nonce2length, json_conf, "nonce2length");
json_get_int(&ckp->update_interval, json_conf, "update_interval");
@ -1908,9 +1736,6 @@ int main(int argc, char **argv)
if (ret && errno != EEXIST)
quit(1, "Failed to make directory %s", ckp.socket_dir);
/* Set default on */
ckp.compress = true;
parse_config(&ckp);
/* Set defaults if not found in config file */
if (!ckp.btcds) {

9
src/ckpool.h

@ -82,9 +82,6 @@ struct connsock {
ckpool_t *ckp;
/* Semaphore used to serialise request/responses */
sem_t sem;
/* Has the other end acknowledged it can receive gz compressed data */
bool gz;
};
typedef struct connsock connsock_t;
@ -192,9 +189,6 @@ struct ckpool_instance {
/* Are we a redirecting passthrough */
bool redirector;
/* Should we compress large packets in passthrough modes */
bool compress;
/* Are we running as a proxy */
bool proxy;
@ -287,8 +281,6 @@ static const char __maybe_unused *stratum_msgs[] = {
""
};
extern const char gzip_magic[];
#ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else
@ -309,7 +301,6 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, float *timeout);
int write_cs(connsock_t *cs, const char *buf, int len);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

102
src/connector.c

@ -64,12 +64,6 @@ struct client_instance {
/* Is this the parent passthrough client */
bool passthrough;
/* Does this client expect gz compression? */
bool gz;
bool compressed; /* Currently receiving a compressed message */
unsigned long compsize; /* Expected compressed data size */
unsigned long decompsize; /* Expected decompressed data size */
/* Linked list of shares in redirector mode.*/
share_t *shares;
@ -485,34 +479,6 @@ retry:
return;
}
client->bufofs += ret;
compressed:
if (client->compressed) {
unsigned long res;
if (client->bufofs < client->compsize)
goto retry;
res = PAGESIZE - 4;
if (unlikely(client->decompsize > res)) {
LOGNOTICE("Client attempting to send oversize compressed message, disconnecting");
invalidate_client(ckp, cdata, client);
return;
}
ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize);
if (ret != Z_OK || res != client->decompsize) {
LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d",
client->decompsize, client->compsize, ret);
invalidate_client(ckp, cdata, client);
return;
}
LOGDEBUG("Received client message compressed %lu from %lu",
client->compsize, client->decompsize);
msg[res] = '\0';
client->bufofs -= client->compsize;
if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs);
client->compressed = false;
goto parse;
}
reparse:
eol = memchr(client->buf, '\n', client->bufofs);
if (!eol)
@ -526,39 +492,11 @@ reparse:
return;
}
/* Look for a compression header */
if (!strncmp(client->buf, gzip_magic, 3)) {
uint32_t msglen;
/* Do we have the whole header? If not, keep reading */
if (client->bufofs < 12)
goto retry;
memcpy(&msglen, client->buf + 4, 4);
client->compsize = le32toh(msglen);
memcpy(&msglen, client->buf + 8, 4);
client->decompsize = le32toh(msglen);
if (unlikely(!client->compsize || !client->decompsize ||
client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) {
LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting",
client->id, client->compsize, client->decompsize);
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs -= 12;
if (client->bufofs > 0)
memmove(client->buf, client->buf + 12, client->bufofs);
client->compressed = true;
if (client->bufofs >= client->compsize)
goto compressed;
goto retry;
}
memcpy(msg, client->buf, buflen);
msg[buflen] = '\0';
client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0';
parse:
if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n");
@ -1006,36 +944,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
test_redirector_shares(ckp, client, buf);
}
/* Does this client accept compressed data? Only compress if it's
* larger than one MTU. */
if (client->gz && len > 1492) {
unsigned long compsize, decompsize = len;
uint32_t msglen;
Bytef *dest;
int ret;
compsize = round_up_page(len);
dest = alloca(compsize);
ret = compress(dest, &compsize, (Bytef *)buf, len);
if (unlikely(ret != Z_OK)) {
LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret);
goto out;
}
if (unlikely(compsize + 12 >= decompsize))
goto out;
/* Copy gz magic header */
memcpy(buf, gzip_magic, 4);
/* Copy compressed message length */
msglen = htole32(compsize);
memcpy(buf + 4, &msglen, 4);
/* Copy decompressed message length */
msglen = htole32(decompsize);
memcpy(buf + 8, &msglen, 4);
memcpy(buf + 12, dest, compsize);
len = compsize + 12;
LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize);
}
out:
sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client;
sender_send->buf = buf;
@ -1065,7 +973,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n");
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(cdata, client->id, buf);
}
@ -1242,14 +1150,9 @@ retry:
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) {
goto out;
} else if (cmdmatch(buf, "pass")) {
} else if (cmdmatch(buf, "passthrough")) {
client_instance_t *client;
bool gz = false;
if (strstr(buf, "gz")) {
gz = true;
ret = sscanf(buf, "passgz=%"PRId64, &client_id);
} else
ret = sscanf(buf, "passthrough=%"PRId64, &client_id);
if (ret < 0) {
LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
@ -1260,7 +1163,6 @@ retry:
LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id);
goto retry;
}
client->gz = gz;
passthrough_client(cdata, client);
dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) {

22
src/generator.c

@ -766,15 +766,14 @@ out:
}
/* cs semaphore must be held */
static bool passthrough_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
json_t *req, *val = NULL, *res_val, *err_val;
bool res, ret = false;
float timeout = 10;
JSON_CPACK(req, "{ss,sb,s[s]}",
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.passthrough",
"gz", ckp->compress,
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
json_decref(req);
@ -799,9 +798,6 @@ static bool passthrough_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t
LOGWARNING("Denied passthrough for stratum");
goto out;
}
json_get_bool(&cs->gz, val, "gz");
if (cs->gz)
LOGNOTICE("Negotiated gz compression with pool");
proxi->passthrough = true;
out:
if (val)
@ -812,15 +808,14 @@ out:
}
/* cs semaphore must be held */
static bool node_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
json_t *req, *val = NULL, *res_val, *err_val;
bool res, ret = false;
float timeout = 10;
JSON_CPACK(req, "{ss,sb,s[s]}",
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.node",
"gz", ckp->compress,
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
@ -846,9 +841,6 @@ static bool node_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied node setup for stratum");
goto out;
}
json_get_bool(&cs->gz, val, "gz");
if (cs->gz)
LOGNOTICE("Negotiated gz compression with pool");
proxi->node = true;
out:
if (val)
@ -1803,7 +1795,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm)
LOGDEBUG("Sending upstream json msg: %s", pm->msg);
len = strlen(pm->msg);
sent = write_cs(cs, pm->msg, len);
sent = write_socket(cs->fd, pm->msg, len);
if (unlikely(sent != len && cs->fd)) {
LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect",
len, pm->msg);
@ -1855,7 +1847,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
goto out;
}
if (ckp->node) {
if (!node_stratum(ckp, cs, proxi)) {
if (!node_stratum(cs, proxi)) {
LOGWARNING("Failed initial node setup to %s:%s !",
cs->url, cs->port);
goto out;
@ -1864,7 +1856,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
goto out;
}
if (ckp->passthrough) {
if (!passthrough_stratum(ckp, cs, proxi)) {
if (!passthrough_stratum(cs, proxi)) {
LOGWARNING("Failed initial passthrough to %s:%s !",
cs->url, cs->port);
goto out;

13
src/stratifier.c

@ -5186,11 +5186,10 @@ static void add_mining_node(sdata_t *sdata, stratum_instance_t *client)
/* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *val, json_t *id_val, json_t *method_val,
const int64_t client_id, json_t *id_val, json_t *method_val,
json_t *params_val)
{
const char *method;
bool var;
/* Random broken clients send something not an integer as the id so we
* copy the json item for id_val as is for the response. By far the
@ -5238,11 +5237,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */
json_get_bool(&var, val, "gz");
add_mining_node(sdata, client);
if (var)
snprintf(buf, 255, "passgz=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
return;
@ -5255,11 +5250,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */
json_get_bool(&var, val, "gz");
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
if (var)
snprintf(buf, 255, "passgz=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
drop_client(ckp, sdata, client_id);
@ -5478,7 +5469,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
send_json_err(sdata, client_id, id_val, "-1:params not found");
goto out;
}
parse_method(ckp, sdata, client, client_id, val, id_val, method, params);
parse_method(ckp, sdata, client, client_id, id_val, method, params);
out:
free_smsg(msg);
}

Loading…
Cancel
Save