Browse Source

Selectively compress only large packets greater than one MTU and identify lz4 compatible clients immediately

master
Con Kolivas 9 years ago
parent
commit
e3fc6a1e56
  1. 8
      src/ckpool.c
  2. 22
      src/connector.c
  3. 17
      src/stratifier.c

8
src/ckpool.c

@ -732,7 +732,7 @@ int write_cs(connsock_t *cs, const char *buf, int len)
int ret = -1; int ret = -1;
/* Connsock doesn't expect lz4 compressed messages */ /* Connsock doesn't expect lz4 compressed messages */
if (!cs->lz4) { if (!cs->lz4 || len <= 1492) {
ret = write_socket(cs->fd, buf, len); ret = write_socket(cs->fd, buf, len);
goto out; goto out;
} }
@ -744,12 +744,6 @@ int write_cs(connsock_t *cs, const char *buf, int len)
ret = write_socket(cs->fd, buf, len); ret = write_socket(cs->fd, buf, len);
goto out; goto out;
} }
if (compsize + 12 >= len) {
/* Selectively send compressed packets only when they're
* smaller. */
ret = write_socket(cs->fd, buf, len);
goto out;
}
LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize); LOGDEBUG("Writing connsock message compressed %d from %d", compsize, decompsize);
/* Copy lz4 magic header */ /* Copy lz4 magic header */
sprintf(dest, "lz4\n"); sprintf(dest, "lz4\n");

22
src/connector.c

@ -499,8 +499,6 @@ compressed:
LOGDEBUG("Received client message compressed %d from %d", LOGDEBUG("Received client message compressed %d from %d",
client->compsize, client->decompsize); client->compsize, client->decompsize);
msg[ret] = '\0'; msg[ret] = '\0';
/* Flag this client as able to receive lz4 compressed data now */
client->lz4 = true;
client->bufofs -= client->compsize; client->bufofs -= client->compsize;
if (client->bufofs) if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs); memmove(client->buf, client->buf + buflen, client->bufofs);
@ -1000,8 +998,9 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
test_redirector_shares(ckp, client, buf); test_redirector_shares(ckp, client, buf);
} }
/* Does this client accept compressed data? */ /* Does this client accept compressed data? Only compress if it's
if (client->lz4) { * larger than one MTU. */
if (client->lz4 && len > 1492) {
char *dest = ckalloc(len + 12); char *dest = ckalloc(len + 12);
uint32_t msglen; uint32_t msglen;
int compsize; int compsize;
@ -1012,11 +1011,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
free(dest); free(dest);
goto out; goto out;
} }
if (compsize + 12 >= len) {
/* Only end it compressed if it's smaller */
free(dest);
goto out;
}
LOGDEBUG("Sending client message compressed %d from %d", compsize, len); LOGDEBUG("Sending client message compressed %d from %d", compsize, len);
/* Copy lz4 magic header */ /* Copy lz4 magic header */
sprintf(dest, "lz4\n"); sprintf(dest, "lz4\n");
@ -1237,10 +1231,15 @@ retry:
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) { } else if (cmdmatch(buf, "shutdown")) {
goto out; goto out;
} else if (cmdmatch(buf, "passthrough")) { } else if (cmdmatch(buf, "pass")) {
client_instance_t *client; client_instance_t *client;
bool lz4 = false;
ret = sscanf(buf, "passthrough=%"PRId64, &client_id); if (strstr(buf, "lz4")) {
lz4 = true;
ret = sscanf(buf, "passlz4=%"PRId64, &client_id);
} else
ret = sscanf(buf, "passthrough=%"PRId64, &client_id);
if (ret < 0) { if (ret < 0) {
LOGDEBUG("Connector failed to parse passthrough command: %s", buf); LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry; goto retry;
@ -1250,6 +1249,7 @@ retry:
LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id);
goto retry; goto retry;
} }
client->lz4 = lz4;
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) { } else if (cmdmatch(buf, "getxfd")) {

17
src/stratifier.c

@ -5186,10 +5186,11 @@ static void add_mining_node(sdata_t *sdata, stratum_instance_t *client)
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *val, json_t *id_val, json_t *method_val,
json_t *params_val) json_t *params_val)
{ {
const char *method; const char *method;
bool var;
/* Random broken clients send something not an integer as the id so we /* Random broken clients send something not an integer as the id so we
* copy the json item for id_val as is for the response. By far the * copy the json item for id_val as is for the response. By far the
@ -5237,8 +5238,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and /* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */ * add it to the list of mining nodes in the stratifier */
json_get_bool(&var, val, "lz4");
add_mining_node(sdata, client); add_mining_node(sdata, client);
snprintf(buf, 255, "passthrough=%"PRId64, client_id); if (var)
snprintf(buf, 255, "passlz4=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
return; return;
} }
@ -5250,8 +5255,12 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No * is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this * data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */ * stratifier after this so drop the client in the stratifier. */
json_get_bool(&var, val, "lz4");
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
snprintf(buf, 255, "passthrough=%"PRId64, client_id); if (var)
snprintf(buf, 255, "passlz4=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
drop_client(ckp, sdata, client_id); drop_client(ckp, sdata, client_id);
return; return;
@ -5469,7 +5478,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
send_json_err(sdata, client_id, id_val, "-1:params not found"); send_json_err(sdata, client_id, id_val, "-1:params not found");
goto out; goto out;
} }
parse_method(ckp, sdata, client, client_id, id_val, method, params); parse_method(ckp, sdata, client, client_id, val, id_val, method, params);
out: out:
free_smsg(msg); free_smsg(msg);
} }

Loading…
Cancel
Save