Browse Source

Merge branch 'zlib' into multiproxy

master
Con Kolivas 9 years ago
parent
commit
e4876feb71
  1. 19
      configure.ac
  2. 4
      src/Makefile.am
  3. 287
      src/ckpool.c
  4. 6
      src/ckpool.h
  5. 108
      src/connector.c
  6. 14
      src/generator.c
  7. 12
      src/libckpool.c
  8. 2
      src/libckpool.h
  9. 13
      src/stratifier.c

19
configure.ac

@ -40,17 +40,11 @@ AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h) AC_CHECK_HEADERS(gsl/gsl_math.h gsl/gsl_cdf.h)
AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h) AC_CHECK_HEADERS(openssl/x509.h openssl/hmac.h)
AC_CHECK_HEADERS(zlib.h)
PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm"
RT_LIBS="-lrt"
AC_CONFIG_SUBDIRS([src/jansson-2.6]) AC_CONFIG_SUBDIRS([src/jansson-2.6])
JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a" JANSSON_LIBS="jansson-2.6/src/.libs/libjansson.a"
AC_SUBST(PTHREAD_LIBS)
AC_SUBST(MATH_LIBS)
AC_SUBST(RT_LIBS)
AC_SUBST(JANSSON_LIBS) AC_SUBST(JANSSON_LIBS)
AC_ARG_WITH([ckdb], AC_ARG_WITH([ckdb],
@ -58,6 +52,11 @@ AC_ARG_WITH([ckdb],
[ckdb=$withval] [ckdb=$withval]
) )
#AC_SEARCH_LIBS(whatgoeshere?, rt, , echo "Error: Required library realtime not found." && exit 1)
AC_SEARCH_LIBS(exp, m, , echo "Error: Required library math not found." && exit 1)
AC_SEARCH_LIBS(compress, z , , echo "Error: Required library zlib1g-dev not found." && exit 1)
AC_SEARCH_LIBS(pthread_mutex_trylock, pthread, , "Error: Required library pthreads not found." && exit 1)
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev
not found. Install it or disable postgresql support with --without-ckdb" && exit 1) not found. Install it or disable postgresql support with --without-ckdb" && exit 1)
@ -84,8 +83,10 @@ echo "Compilation............: make (or gmake)"
echo " CPPFLAGS.............: $CPPFLAGS" echo " CPPFLAGS.............: $CPPFLAGS"
echo " CFLAGS...............: $CFLAGS" echo " CFLAGS...............: $CFLAGS"
echo " LDFLAGS..............: $LDFLAGS" echo " LDFLAGS..............: $LDFLAGS"
echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" echo " LDADD................: $LIBS $JANSSON_LIBS"
echo " db LDADD.............: $DB_LIBS" if test "x$ckdb" != "xno"; then
echo " db LDADD.............: $LIBS $DB_LIBS $JANSSON_LIBS"
fi
echo echo
echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')"
echo " prefix...............: $prefix" echo " prefix...............: $prefix"

4
src/Makefile.am

@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/jansson-2.6/src
lib_LTLIBRARIES = libckpool.la lib_LTLIBRARIES = libckpool.la
libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h libckpool_la_SOURCES = libckpool.c libckpool.h sha2.c sha2.h
libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ libckpool_la_LIBADD = @LIBS@
bin_PROGRAMS = ckpool ckpmsg notifier bin_PROGRAMS = ckpool ckpmsg notifier
ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \
@ -23,5 +23,5 @@ if WANT_CKDB
bin_PROGRAMS += ckdb bin_PROGRAMS += ckdb
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \
ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h ckdb_crypt.c ckdb.h klist.c ktree.c klist.h ktree.h
ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @MATH_LIBS@ ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @DB_LIBS@ @LIBS@
endif endif

287
src/ckpool.c

@ -20,6 +20,7 @@
#include <getopt.h> #include <getopt.h>
#include <grp.h> #include <grp.h>
#include <jansson.h> #include <jansson.h>
#include <zlib.h>
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -35,6 +36,8 @@
ckpool_t *global_ckp; ckpool_t *global_ckp;
const char gzip_magic[] = "\x1f\xd5\x01\n";
static void proclog(ckpool_t *ckp, char *msg) static void proclog(ckpool_t *ckp, char *msg)
{ {
FILE *LOGFP; FILE *LOGFP;
@ -507,7 +510,170 @@ bool ping_main(ckpool_t *ckp)
void empty_buffer(connsock_t *cs) void empty_buffer(connsock_t *cs)
{ {
if (cs->buf)
cs->buf[0] = '\0';
cs->buflen = cs->bufofs = 0;
}
static void clear_bufline(connsock_t *cs)
{
if (unlikely(!cs->buf))
cs->buf = ckzalloc(PAGESIZE);
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
} else
cs->bufofs = 0;
}
static void add_bufline(connsock_t *cs, const char *readbuf, const int len)
{
int backoff = 1;
size_t buflen;
char *newbuf;
buflen = round_up_page(cs->bufofs + len + 1);
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, len);
cs->bufofs += len;
cs->buf[cs->bufofs] = '\0';
}
static int read_cs_length(connsock_t *cs, float *timeout, int len)
{
tv_t start, now;
float diff;
int ret;
tv_time(&start);
while (cs->bufofs < len) {
char readbuf[PAGESIZE];
int readlen;
if (*timeout < 0) {
LOGDEBUG("Timed out in read_cs_length");
ret = 0;
goto out;
}
ret = wait_read_select(cs->fd, *timeout);
if (ret < 1)
goto out;
readlen = len - cs->bufofs;
if (readlen >= PAGESIZE)
readlen = PAGESIZE - 4;
ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT);
if (ret < 1)
goto out;
add_bufline(cs, readbuf, ret);
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
}
ret = len;
out:
return ret;
}
static int read_gz_line(connsock_t *cs, float *timeout)
{
unsigned long compsize, res, decompsize;
char *buf, *dest = NULL, *eom;
int ret, buflen;
uint32_t msglen;
/* Remove gz header */
clear_bufline(cs);
/* Get data sizes */
ret = read_cs_length(cs, timeout, 8);
if (ret != 8) {
ret = -1;
goto out;
}
memcpy(&msglen, cs->buf, 4);
compsize = le32toh(msglen);
memcpy(&msglen, cs->buf + 4, 4);
decompsize = le32toh(msglen);
/* Remove the gz variables */
cs->buflen = cs->bufofs - 8;
cs->bufofs = 8;
clear_bufline(cs);
if (unlikely(compsize < 1 || compsize > 0x80000000 ||
decompsize < 1 || decompsize > 0x80000000)) {
LOGWARNING("Invalid message length comp %lu decomp %lu sent to read_gz_line", compsize, decompsize);
ret = -1;
goto out;
}
/* Get compressed data */
ret = read_cs_length(cs, timeout, compsize);
if (ret != (int)compsize) {
LOGWARNING("Failed to read %lu compressed bytes in read_gz_line, got %d", compsize, ret);
ret = -1;
goto out;
}
/* Clear out all the compressed data */
cs->buflen = cs->bufofs - compsize;
cs->bufofs = compsize;
clear_bufline(cs);
/* Do decompresion and buffer reconstruction here */
res = round_up_page(decompsize);
dest = ckalloc(res);
ret = uncompress((Bytef *)dest, &res, (Bytef *)cs->buf, compsize);
if (ret != Z_OK || res != decompsize) {
LOGWARNING("Failed to decompress %lu bytes in read_gz_line, result %d got %lu",
decompsize, ret, res);
ret = -1;
goto out;
}
eom = dest + decompsize - 1;
if (memcmp(eom, "\n", 1)) {
LOGWARNING("Failed to find EOM in decompressed data in read_gz_line");
ret = -1;
goto out;
}
*eom = '\0';
ret = decompsize - 1;
/* Wedge the decompressed buffer back to the start of cs->buf */
buf = cs->buf;
buflen = cs->bufofs;
cs->buf = dest;
dest = NULL;
cs->bufofs = decompsize;
if (buflen) {
add_bufline(cs, buf, buflen);
cs->buflen = buflen;
cs->bufofs = decompsize;
} else
cs->buflen = cs->bufofs = 0; cs->buflen = cs->bufofs = 0;
out:
free(dest);
if (ret < 1)
empty_buffer(cs);
return ret;
} }
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0' /* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
@ -517,93 +683,54 @@ int read_socket_line(connsock_t *cs, float *timeout)
{ {
char *eom = NULL; char *eom = NULL;
tv_t start, now; tv_t start, now;
size_t buflen;
int ret = -1; int ret = -1;
bool polled;
float diff; float diff;
if (unlikely(cs->fd < 0)) if (unlikely(cs->fd < 0))
goto out; goto out;
if (unlikely(!cs->buf)) clear_bufline(cs);
cs->buf = ckzalloc(PAGESIZE);
else if (cs->buflen) {
memmove(cs->buf, cs->buf + cs->bufofs, cs->buflen);
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
}
tv_time(&start); tv_time(&start);
rewait:
while (!eom) {
char readbuf[PAGESIZE];
if (*timeout < 0) { if (*timeout < 0) {
LOGDEBUG("Timed out in read_socket_line"); if (cs->ckp->proxy)
LOGINFO("Timed out in read_socket_line");
else
LOGERR("Timed out in read_socket_line");
ret = 0; ret = 0;
goto out; goto out;
} }
ret = wait_read_select(cs->fd, eom ? 0 : *timeout); ret = wait_read_select(cs->fd, *timeout);
polled = true; if (ret < 0) {
if (ret < 1) {
if (!ret) {
if (eom)
goto parse;
LOGDEBUG("Select timed out in read_socket_line");
} else {
if (cs->ckp->proxy) if (cs->ckp->proxy)
LOGINFO("Select failed in read_socket_line"); LOGINFO("Select failed in read_socket_line");
else else
LOGERR("Select failed in read_socket_line"); LOGERR("Select failed in read_socket_line");
}
goto out; goto out;
} }
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
while (42) {
char readbuf[PAGESIZE] = {};
int backoff = 1;
char *newbuf;
ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
/* No more to read or closed socket after valid message */ /* If we have done wait_read_select there should be
if (eom)
break;
/* Have we used up all the timeout yet? If polled is
* set that means poll has said there should be
* something to read and if we get nothing it means the * something to read and if we get nothing it means the
* socket is closed. */ * socket is closed. */
if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret))
goto rewait;
if (cs->ckp->proxy) if (cs->ckp->proxy)
LOGINFO("Failed to recv in read_socket_line"); LOGINFO("Failed to recv in read_socket_line");
else else
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
goto out; goto out;
} }
polled = false; add_bufline(cs, readbuf, ret);
buflen = cs->bufofs + ret + 1;
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, ret);
cs->bufofs += ret;
cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
} }
parse:
ret = eom - cs->buf; ret = eom - cs->buf;
cs->buflen = cs->buf + cs->bufofs - eom - 1; cs->buflen = cs->buf + cs->bufofs - eom - 1;
@ -616,7 +743,53 @@ out:
if (ret < 0) { if (ret < 0) {
empty_buffer(cs); empty_buffer(cs);
dealloc(cs->buf); dealloc(cs->buf);
} } else if (ret == 3 && !memcmp(cs->buf, gzip_magic, 3))
ret = read_gz_line(cs, timeout);
return ret;
}
/* gzip compressed block structure:
* - 4 byte magic header gzip_magic "\x1f\xd5\x01\n"
* - 4 byte LE encoded compressed size
* - 4 byte LE encoded decompressed size
*/
int write_cs(connsock_t *cs, const char *buf, int len)
{
unsigned long compsize, decompsize = len;
char *dest = NULL;
uint32_t msglen;
int ret;
/* Connsock doesn't expect gz compressed messages. Only compress if it's
* larger than one MTU. */
if (!cs->gz || len <= 1492)
return write_socket(cs->fd, buf, len);
compsize = round_up_page(len + 12);
dest = alloca(compsize);
/* Do compression here */
compsize -= 12;
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (ret != Z_OK) {
LOGINFO("Failed to gz compress in write_cs, writing uncompressed");
return write_socket(cs->fd, buf, len);
}
if (unlikely(compsize + 12 >= decompsize))
return write_socket(cs->fd, buf, len);
/* Copy gz magic header */
memcpy(dest, gzip_magic, 4);
/* Copy compressed message length */
msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4);
/* Copy decompressed message length */
msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4);
len = compsize + 12;
LOGDEBUG("Writing connsock message compressed %d from %lu", len, decompsize);
ret = write_socket(cs->fd, dest, len);
if (ret == len)
ret = decompsize;
else
ret = -1;
return ret; return ret;
} }

6
src/ckpool.h

@ -82,6 +82,9 @@ struct connsock {
ckpool_t *ckp; ckpool_t *ckp;
/* Semaphore used to serialise request/responses */ /* Semaphore used to serialise request/responses */
sem_t sem; sem_t sem;
/* Has the other end acknowledged it can receive gz compressed data */
bool gz;
}; };
typedef struct connsock connsock_t; typedef struct connsock connsock_t;
@ -281,6 +284,8 @@ static const char __maybe_unused *stratum_msgs[] = {
"" ""
}; };
extern const char gzip_magic[];
#ifdef USE_CKDB #ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else #else
@ -301,6 +306,7 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
int write_cs(connsock_t *cs, const char *buf, int len);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

108
src/connector.c

@ -15,6 +15,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <zlib.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -55,7 +56,7 @@ struct client_instance {
int server; int server;
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; unsigned long bufofs;
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
sender_send_t *sending; sender_send_t *sending;
@ -63,6 +64,12 @@ struct client_instance {
/* Is this the parent passthrough client */ /* Is this the parent passthrough client */
bool passthrough; bool passthrough;
/* Does this client expect gz compression? */
bool gz;
bool compressed; /* Currently receiving a compressed message */
unsigned long compsize; /* Expected compressed data size */
unsigned long decompsize; /* Expected decompressed data size */
/* Linked list of shares in redirector mode.*/ /* Linked list of shares in redirector mode.*/
share_t *shares; share_t *shares;
@ -472,12 +479,40 @@ retry:
if (ret < 1) { if (ret < 1) {
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return; return;
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s",
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
client->bufofs += ret; client->bufofs += ret;
compressed:
if (client->compressed) {
unsigned long res;
if (client->bufofs < client->compsize)
goto retry;
res = PAGESIZE - 4;
if (unlikely(client->decompsize > res)) {
LOGNOTICE("Client attempting to send oversize compressed message, disconnecting");
invalidate_client(ckp, cdata, client);
return;
}
ret = uncompress((Bytef *)msg, &res, (Bytef *)client->buf, client->compsize);
if (ret != Z_OK || res != client->decompsize) {
LOGNOTICE("Failed to decompress %lu from %lu bytes in parse_client_msg, got %d",
client->decompsize, client->compsize, ret);
invalidate_client(ckp, cdata, client);
return;
}
LOGDEBUG("Received client message compressed %lu from %lu",
client->compsize, client->decompsize);
msg[res] = '\0';
client->bufofs -= client->compsize;
if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs);
client->compressed = false;
goto parse;
}
reparse: reparse:
eol = memchr(client->buf, '\n', client->bufofs); eol = memchr(client->buf, '\n', client->bufofs);
if (!eol) if (!eol)
@ -490,11 +525,40 @@ reparse:
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
/* Look for a compression header */
if (!strncmp(client->buf, gzip_magic, 3)) {
uint32_t msglen;
/* Do we have the whole header? If not, keep reading */
if (client->bufofs < 12)
goto retry;
memcpy(&msglen, client->buf + 4, 4);
client->compsize = le32toh(msglen);
memcpy(&msglen, client->buf + 8, 4);
client->decompsize = le32toh(msglen);
if (unlikely(!client->compsize || !client->decompsize ||
client->compsize > MAX_MSGSIZE || client->decompsize > MAX_MSGSIZE)) {
LOGNOTICE("Client id %"PRId64" invalid compressed message size %lu/%lu, disconnecting",
client->id, client->compsize, client->decompsize);
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs -= 12;
if (client->bufofs > 0)
memmove(client->buf, client->buf + 12, client->bufofs);
client->compressed = true;
if (client->bufofs >= client->compsize)
goto compressed;
goto retry;
}
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
msg[buflen] = '\0'; msg[buflen] = '\0';
client->bufofs -= buflen; client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs); memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0'; client->buf[client->bufofs] = '\0';
parse:
if (!(val = json_loads(msg, 0, NULL))) { if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n"); char *buf = strdup("Invalid JSON, disconnecting\n");
@ -942,6 +1006,36 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
test_redirector_shares(ckp, client, buf); test_redirector_shares(ckp, client, buf);
} }
/* Does this client accept compressed data? Only compress if it's
* larger than one MTU. */
if (client->gz && len > 1492) {
unsigned long compsize, decompsize = len;
uint32_t msglen;
Bytef *dest;
int ret;
compsize = round_up_page(len);
dest = alloca(compsize);
ret = compress(dest, &compsize, (Bytef *)buf, len);
if (unlikely(ret != Z_OK)) {
LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret);
goto out;
}
if (unlikely(compsize + 12 >= decompsize))
goto out;
/* Copy gz magic header */
memcpy(buf, gzip_magic, 4);
/* Copy compressed message length */
msglen = htole32(compsize);
memcpy(buf + 4, &msglen, 4);
/* Copy decompressed message length */
msglen = htole32(decompsize);
memcpy(buf + 8, &msglen, 4);
memcpy(buf + 12, dest, compsize);
len = compsize + 12;
LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize);
}
out:
sender_send = ckzalloc(sizeof(sender_send_t)); sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client; sender_send->client = client;
sender_send->buf = buf; sender_send->buf = buf;
@ -971,7 +1065,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
LOGINFO("Connector adding passthrough client %"PRId64, client->id); LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true; client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true}\n"); ASPRINTF(&buf, "{\"result\": true, \"gz\": true}\n");
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }
@ -1148,9 +1242,14 @@ retry:
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) { } else if (cmdmatch(buf, "shutdown")) {
goto out; goto out;
} else if (cmdmatch(buf, "passthrough")) { } else if (cmdmatch(buf, "pass")) {
client_instance_t *client; client_instance_t *client;
bool gz = false;
if (strstr(buf, "gz")) {
gz = true;
ret = sscanf(buf, "passgz=%"PRId64, &client_id);
} else
ret = sscanf(buf, "passthrough=%"PRId64, &client_id); ret = sscanf(buf, "passthrough=%"PRId64, &client_id);
if (ret < 0) { if (ret < 0) {
LOGDEBUG("Connector failed to parse passthrough command: %s", buf); LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
@ -1161,6 +1260,7 @@ retry:
LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id); LOGINFO("Connector failed to find client id %"PRId64" to pass through", client_id);
goto retry; goto retry;
} }
client->gz = gz;
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) { } else if (cmdmatch(buf, "getxfd")) {

14
src/generator.c

@ -772,8 +772,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.passthrough", "method", "mining.passthrough",
"gz", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
json_decref(req); json_decref(req);
@ -798,6 +799,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied passthrough for stratum"); LOGWARNING("Denied passthrough for stratum");
goto out; goto out;
} }
json_get_bool(&cs->gz, val, "gz");
if (cs->gz)
LOGNOTICE("Negotiated gz compression with pool");
proxi->passthrough = true; proxi->passthrough = true;
out: out:
if (val) if (val)
@ -814,8 +818,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{ss,sb,s[s]}",
"method", "mining.node", "method", "mining.node",
"gz", json_true(),
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req); res = send_json_msg(cs, req);
@ -841,6 +846,9 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Denied node setup for stratum"); LOGWARNING("Denied node setup for stratum");
goto out; goto out;
} }
json_get_bool(&cs->gz, val, "gz");
if (cs->gz)
LOGNOTICE("Negotiated gz compression with pool");
proxi->node = true; proxi->node = true;
out: out:
if (val) if (val)
@ -1795,7 +1803,7 @@ static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm)
LOGDEBUG("Sending upstream json msg: %s", pm->msg); LOGDEBUG("Sending upstream json msg: %s", pm->msg);
len = strlen(pm->msg); len = strlen(pm->msg);
sent = write_socket(cs->fd, pm->msg, len); sent = write_cs(cs, pm->msg, len);
if (unlikely(sent != len)) { if (unlikely(sent != len)) {
LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect", LOGWARNING("Failed to passthrough %d bytes of message %s, attempting reconnect",
len, pm->msg); len, pm->msg);

12
src/libckpool.c

@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line)
return ptr; return ptr;
} }
/* Round up to the nearest page size for efficient malloc */
size_t round_up_page(size_t len)
{
int rem = len % PAGESIZE;
if (rem)
len += PAGESIZE - rem;
return len;
}
/* Adequate size s==len*2 + 1 must be alloced to use this variant */ /* Adequate size s==len*2 + 1 must be alloced to use this variant */
void __bin2hex(void *vs, const void *vp, size_t len) void __bin2hex(void *vs, const void *vp, size_t len)
{ {

2
src/libckpool.h

@ -531,6 +531,8 @@ void trail_slash(char **buf);
void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *_ckalloc(size_t len, const char *file, const char *func, const int line);
void *json_ckalloc(size_t size); void *json_ckalloc(size_t size);
void *_ckzalloc(size_t len, const char *file, const char *func, const int line); void *_ckzalloc(size_t len, const char *file, const char *func, const int line);
size_t round_up_page(size_t len);
extern const int hex2bin_tbl[]; extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len); void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len); void *bin2hex(const void *vp, size_t len);

13
src/stratifier.c

@ -5186,10 +5186,11 @@ static void add_mining_node(sdata_t *sdata, stratum_instance_t *client)
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *val, json_t *id_val, json_t *method_val,
json_t *params_val) json_t *params_val)
{ {
const char *method; const char *method;
bool var;
/* Random broken clients send something not an integer as the id so we /* Random broken clients send something not an integer as the id so we
* copy the json item for id_val as is for the response. By far the * copy the json item for id_val as is for the response. By far the
@ -5237,7 +5238,11 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and /* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */ * add it to the list of mining nodes in the stratifier */
json_get_bool(&var, val, "gz");
add_mining_node(sdata, client); add_mining_node(sdata, client);
if (var)
snprintf(buf, 255, "passgz=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
return; return;
@ -5250,7 +5255,11 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No * is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this * data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */ * stratifier after this so drop the client in the stratifier. */
json_get_bool(&var, val, "gz");
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
if (var)
snprintf(buf, 255, "passgz=%"PRId64, client_id);
else
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
drop_client(ckp, sdata, client_id); drop_client(ckp, sdata, client_id);
@ -5469,7 +5478,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
send_json_err(sdata, client_id, id_val, "-1:params not found"); send_json_err(sdata, client_id, id_val, "-1:params not found");
goto out; goto out;
} }
parse_method(ckp, sdata, client, client_id, id_val, method, params); parse_method(ckp, sdata, client, client_id, val, id_val, method, params);
out: out:
free_smsg(msg); free_smsg(msg);
} }

Loading…
Cancel
Save