Browse Source

Roll back to M22

master
Con Kolivas 9 years ago
parent
commit
fbbed17d7a
  1. 107
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 275
      src/connector.c
  4. 44
      src/generator.c
  5. 253
      src/libckpool.c
  6. 23
      src/libckpool.h
  7. 140
      src/stratifier.c

107
src/ckpool.c

@ -223,7 +223,6 @@ static void *unix_receiver(void *arg)
while (42) {
unix_msg_t *umsg;
uint32_t msglen;
char *buf;
sockd = accept(rsockd, NULL, NULL);
@ -232,7 +231,7 @@ static void *unix_receiver(void *arg)
childsighandler(15);
break;
}
buf = recv_unix(sockd, &msglen);
buf = recv_unix_msg(sockd);
if (unlikely(!buf)) {
Close(sockd);
LOGWARNING("Failed to get message on %s socket", qname);
@ -241,7 +240,6 @@ static void *unix_receiver(void *arg)
umsg = ckalloc(sizeof(unix_msg_t));
umsg->sockd = sockd;
umsg->buf = buf;
umsg->msglen = msglen;
mutex_lock(&pi->rmsg_lock);
DL_APPEND(pi->unix_msgs, umsg);
@ -579,7 +577,6 @@ static void clear_bufline(connsock_t *cs)
if (unlikely(!cs->buf)) {
socklen_t optlen = sizeof(cs->rcvbufsiz);
cs->bufofs = 0;
cs->buf = ckzalloc(PAGESIZE);
cs->bufsize = PAGESIZE;
getsockopt(cs->fd, SOL_SOCKET, SO_RCVBUF, &cs->rcvbufsiz, &optlen);
@ -590,8 +587,8 @@ static void clear_bufline(connsock_t *cs)
memset(cs->buf + cs->buflen, 0, cs->bufofs);
cs->bufofs = cs->buflen;
cs->buflen = 0;
cs->buf[cs->bufofs] = '\0';
}
cs->buf[cs->bufofs] = '\0';
}
static void add_buflen(ckpool_t *ckp, connsock_t *cs, const char *readbuf, const int len)
@ -641,62 +638,6 @@ static int recv_available(ckpool_t *ckp, connsock_t *cs)
return len;
}
static bool read_cs_length(ckpool_t *ckp, connsock_t *cs, float *timeout, int len)
{
bool ret = false;
tv_t start;
tv_time(&start);
while (cs->bufofs < len) {
float diff;
tv_t now;
int res;
if (*timeout < 0) {
LOGDEBUG("Timed out in read_cs_length");
ret = 0;
goto out;
}
res = wait_read_select(cs->fd, *timeout);
if (res < 1)
goto out;
res = recv_available(ckp, cs);
if (res < 1)
goto out;
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
}
ret = true;
out:
return ret;
}
static char *bkey_eom(ckpool_t *ckp, connsock_t *cs, char *bkey, float *timeout)
{
int slen, msglen;
char *ret;
/* String length till bkey */
slen = bkey - cs->buf - 1;
if (!read_cs_length(ckp, cs, timeout, slen + BKEY_LENOFS + BKEY_LENLEN)) {
LOGWARNING("Failed to read cs bkey header");
ret = cs->buf + cs->bufofs;
goto out;
}
msglen = bkey_len(bkey);
if (!read_cs_length(ckp, cs, timeout, slen + msglen)) {
LOGWARNING("Failed to read cs bkey msg length %d", msglen);
ret = cs->buf + cs->bufofs;
goto out;
}
ret = cs->buf + slen + msglen;
out:
return ret;
}
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. Returns length of the line if a
@ -704,16 +645,16 @@ out:
* and -1 on error. */
int read_socket_line(connsock_t *cs, float *timeout)
{
char *eom = NULL, *bkey = NULL;
ckpool_t *ckp = cs->ckp;
bool proxy = ckp->proxy;
char *eom = NULL;
tv_t start, now;
float diff;
int ret;
clear_bufline(cs);
recv_available(ckp, cs); // Intentionally ignore return value
eom = memchr(cs->buf, '\n', cs->bufofs);
eom = strchr(cs->buf, '\n');
tv_time(&start);
@ -751,24 +692,20 @@ int read_socket_line(connsock_t *cs, float *timeout)
ret = -1;
goto out;
}
eom = memchr(cs->buf, '\n', cs->bufofs);
eom = strchr(cs->buf, '\n');
tv_time(&now);
diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff;
}
ret = eom - cs->buf;
if (unlikely(ret > 5 && (bkey = strstr(cs->buf + ret - 5, "bkey\n")))) {
eom = bkey_eom(ckp, cs, bkey, timeout);
ret = eom - cs->buf;
} else
*eom = '\0';
if (cs->bufofs > ret + 1) {
cs->buflen = cs->bufofs - ret - 1;
cs->bufofs = ret + 1;
} else
cs->buflen = cs->buf + cs->bufofs - eom - 1;
if (cs->buflen)
cs->bufofs = eom - cs->buf + 1;
else
cs->bufofs = 0;
*eom = '\0';
out:
if (ret < 0) {
empty_buffer(cs);
@ -779,13 +716,17 @@ out:
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file,
const char *func, const int line)
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path;
bool ret = false;
int sockd;
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return;
}
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
@ -810,7 +751,7 @@ void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const c
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix(sockd, msg, len)))
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
ret = true;
@ -820,18 +761,6 @@ out:
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
}
/* As per send_proc_data but must be a string */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
uint32_t len;
if (unlikely(!msg || !(len = strlen(msg)))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return;
}
return _send_proc_data(pi, msg, len, file, func, line);
}
/* Send a single message to a process instance and retrieve the response, then
* close the socket. */
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

4
src/ckpool.h

@ -39,7 +39,6 @@ struct unix_msg {
unix_msg_t *prev;
int sockd;
char *buf;
uint32_t msglen;
};
struct ckmsgq {
@ -326,9 +325,6 @@ void empty_buffer(connsock_t *cs);
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len);
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len);
int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file,
const char *func, const int line);
#define send_proc_data(pi, msg, len) _send_proc_data(pi, msg, len, __FILE__, __func__, __LINE__)
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

275
src/connector.c

@ -78,9 +78,6 @@ struct client_instance {
/* The size of the socket send buffer */
int sendbufsize;
/* Does this client accept bkeys */
bool bkey;
};
struct sender_send {
@ -163,13 +160,6 @@ struct connector_data {
typedef struct connector_data cdata_t;
struct binmsg {
char *buf;
int len;
};
typedef struct binmsg binmsg_t;
/* Increase the reference count of instance */
static void __inc_instance_ref(client_instance_t *client)
{
@ -437,7 +427,7 @@ static void drop_all_clients(cdata_t *cdata)
ck_wunlock(&cdata->lock);
}
static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, int len);
static void send_client(cdata_t *cdata, int64_t id, char *buf);
/* Look for shares being submitted via a redirector and add them to a linked
* list for looking up the responses. */
@ -467,29 +457,12 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val)
}
}
static void send_client_msg(cdata_t *cdata, const int64_t id, char *buf)
{
uint32_t len;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
return;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
free(buf);
return;
}
send_client(cdata, id, buf, len, len);
}
/* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{
int buflen, ret, slen = 0, blen = 0;
ckpool_t *ckp = cdata->ckp;
char *eol, *bkey = NULL;
int buflen, ret;
char *msg, *eol;
json_t *val;
retry:
@ -517,41 +490,31 @@ reparse:
eol = memchr(client->buf, '\n', client->bufofs);
if (!eol)
goto retry;
if (unlikely(client->bufofs > 5 && (bkey = strstr(eol - 5, "bkey\n" )))) {
int len;
/* We have bkey data. Do we have enough to parse it? */
slen = bkey - client->buf - 1;
len = client->bufofs - slen;
if (len < BKEY_LENOFS + BKEY_LENLEN)
goto retry;
blen = bkey_len(bkey);
if (len < blen)
goto retry;
buflen = slen + blen + 1;
} else
buflen = eol - client->buf + 1;
/* Do something useful with this message now */
buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) {
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd);
invalidate_client(ckp, cdata, client);
return;
}
if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) {
msg = alloca(round_up_page(buflen + 1));
memcpy(msg, client->buf, buflen);
msg[buflen] = '\0';
client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0';
if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf);
send_client_msg(cdata, client->id, buf);
LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg);
send_client(cdata, client->id, buf);
invalidate_client(ckp, cdata, client);
return;
} else {
char *s;
if (unlikely(blen)) {
json_append_bkeys(val, bkey, blen);
blen = 0;
}
if (client->passthrough) {
int64_t passthrough_id;
@ -559,7 +522,7 @@ reparse:
passthrough_id = (client->id << 32) | passthrough_id;
json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id));
} else {
if (ckp->redirector && !client->redirected && strstr(client->buf, "mining.submit"))
if (ckp->redirector && !client->redirected && strstr(msg, "mining.submit"))
parse_redirector_share(client, val);
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
@ -580,10 +543,6 @@ reparse:
free(s);
json_decref(val);
}
client->bufofs -= buflen;
if (client->bufofs)
memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0';
if (client->bufofs)
goto reparse;
@ -928,17 +887,24 @@ out:
/* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */
static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, int len)
static void send_client(cdata_t *cdata, const int64_t id, char *buf)
{
ckpool_t *ckp = cdata->ckp;
sender_send_t *sender_send;
client_instance_t *client;
uint32_t blen = 0;
char *bkey = NULL;
json_t *val;
int len;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
return;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
free(buf);
return;
}
/* Node messages will have come from the generator which will have
* already extracted any bkeys */
if (unlikely(ckp->node && !id)) {
LOGDEBUG("Message for node: %s", buf);
send_proc(ckp->stratifier, buf);
@ -946,12 +912,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, i
return;
}
if (unlikely(len > slen)) {
bkey = strstr(buf + slen - 5, "bkey\n");
if (bkey)
blen = len - (bkey - buf);
}
/* Grab a reference to this client until the sender_send has
* completed processing. Is this a passthrough subclient ? */
if (id > 0xffffffffll) {
@ -983,19 +943,14 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, i
return;
}
if (ckp->node) {
json_t *val = json_loads(buf, 0, NULL);
char *msg;
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (!val) {
// Can happen if client sent invalid json message
len = slen;
if (!val) // Can happen if client sent invalid json message
goto out;
}
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server));
if (bkey)
json_append_bkeys(val, bkey, blen);
msg = json_dumps(val, JSON_COMPACT);
json_decref(val);
send_proc(ckp->stratifier, msg);
@ -1004,22 +959,6 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, i
if (ckp->redirector && !client->redirected)
test_redirector_shares(ckp, client, buf);
}
/* Append bkeys as regular json for clients that can't decode them */
if (unlikely(bkey && !client->bkey)) {
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGERR("Failed to decode json in bkey encoded message %s", buf);
bkey = '\0';
len = strlen(buf);
goto out;
}
json_append_bkeys(val, bkey, blen);
free(buf);
buf = json_dumps(val, JSON_COMPACT | JSON_EOL);
len = strlen(buf);
json_decref(val);
}
out:
sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client;
@ -1050,8 +989,8 @@ static void passthrough_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t
LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true;
ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n");
send_client_msg(cdata, client->id, buf);
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(cdata, client->id, buf);
if (!ckp->rmem_warn)
set_recvbufsize(ckp, client->fd, 1048576);
if (!ckp->wmem_warn)
@ -1065,12 +1004,10 @@ static void remote_server(ckpool_t *ckp, cdata_t *cdata, client_instance_t *clie
LOGWARNING("Connector adding client %"PRId64" %s as remote trusted server",
client->id, client->address_name);
client->remote = true;
ASPRINTF(&buf, "{\"result\":true,\"bkey\":true}\n");
send_client_msg(cdata, client->id, buf);
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(cdata, client->id, buf);
if (!ckp->rmem_warn)
set_recvbufsize(ckp, client->fd, 2097152);
if (!ckp->wmem_warn)
client->sendbufsize = set_sendbufsize(ckp, client->fd, 2097152);
set_recvbufsize(ckp, client->fd, 1048576);
}
static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
@ -1088,14 +1025,11 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
keep_sockalive(cs->fd);
/* We want large send buffers for upstreaming messages */
if (!ckp->rmem_warn)
set_recvbufsize(ckp, cs->fd, 2097152);
if (!ckp->wmem_warn)
cs->sendbufsiz = set_sendbufsize(ckp, cs->fd, 2097152);
cs->sendbufsiz = set_sendbufsize(ckp, cs->fd, 1048576);
JSON_CPACK(req, "{ss,sb,s[s]}",
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.remote",
"bkey", true,
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
json_decref(req);
@ -1126,20 +1060,21 @@ out:
return ret;
}
static void usend_process(ckpool_t *ckp, binmsg_t *binmsg)
static void usend_process(ckpool_t *ckp, char *buf)
{
cdata_t *cdata = ckp->data;
connsock_t *cs = &cdata->upstream_cs;
int sent;
int len, sent;
if (unlikely(!binmsg->buf || !strlen(binmsg->buf))) {
if (unlikely(!buf || !strlen(buf))) {
LOGERR("Send empty message to usend_process");
goto out;
}
LOGDEBUG("Sending upstream msg: %s", binmsg->buf);
LOGDEBUG("Sending upstream msg: %s", buf);
len = strlen(buf);
while (42) {
sent = write_socket(cs->fd, binmsg->buf, binmsg->len);
if (sent == binmsg->len)
sent = write_socket(cs->fd, buf, len);
if (sent == len)
break;
if (cs->fd > 0) {
LOGWARNING("Upstream pool failed, attempting reconnect while caching messages");
@ -1150,69 +1085,28 @@ static void usend_process(ckpool_t *ckp, binmsg_t *binmsg)
while (!connect_upstream(ckp, cs));
}
out:
free(binmsg->buf);
free(binmsg);
free(buf);
}
static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf)
{
json_t *hash_val, *data_val;
const char *hash, *data;
char *gbt_block;
hash_val = json_object_get(val, "hash");
hash = json_string_value(hash_val);
data_val = json_object_get(val, "data");
data = json_string_value(data_val);
if (unlikely(!hash || !data)) {
LOGWARNING("Failed to extract hash and data from remote submitblock msg %s", buf);
const char *gbt_block = json_string_value(json_object_get(val, "submitblock"));
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to find submitblock data from upstream submitblock method %s",
buf);
return;
}
ASPRINTF(&gbt_block, "submitblock:%s,%s", hash, data);
LOGWARNING("Submitting possible upstream block!");
send_proc(ckp->generator, gbt_block);
free(gbt_block);
}
static void ping_upstream(cdata_t *cdata)
{
binmsg_t *binmsg = ckalloc(sizeof(binmsg_t));
ASPRINTF(&binmsg->buf, "{\"method\":\"ping\"}\n");
binmsg->len = strlen(binmsg->buf);
ckmsgq_add(cdata->upstream_sends, binmsg);
}
static json_t *urecv_loads(const char *buf, const int len)
{
json_t *val = NULL;
char *bkey = NULL;
int slen;
slen = strlen(buf);
if (unlikely(!slen)) {
LOGWARNING("Received empty message from upstream pool");
goto out;
}
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGWARNING("Received non-json msg from upstream pool %s",
buf);
goto out;
}
if (len == slen)
goto out;
bkey = strstr(buf + slen - 5, "bkey\n");
if (likely(bkey)) {
int blen;
char *buf;
LOGDEBUG("Bkey found in upstream pool msg");
blen = len - (bkey - buf);
json_append_bkeys(val, bkey, blen);
} else
LOGWARNING("Non-bkey extranous data from upstream pool msg %s", buf);
out:
return val;
ASPRINTF(&buf, "{\"method\":\"ping\"}\n");
ckmsgq_add(cdata->upstream_sends, buf);
}
static void *urecv_process(void *arg)
@ -1247,9 +1141,12 @@ static void *urecv_process(void *arg)
goto nomsg;
}
alive = true;
val = urecv_loads(cs->buf, ret);
if (unlikely(!val))
val = json_loads(cs->buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Received non-json msg from upstream pool %s",
cs->buf);
goto nomsg;
}
method = json_string_value(json_object_get(val, "method"));
if (unlikely(!method)) {
LOGWARNING("Failed to find method from upstream pool json %s",
@ -1302,22 +1199,13 @@ out:
return ret;
}
static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
static void process_client_msg(cdata_t *cdata, const char *buf)
{
char *msg, *bkey = NULL;
uint32_t slen, blen = 0;
int64_t client_id;
json_t *json_msg;
char *msg;
slen = strlen(buf);
if (likely(slen > 5)) {
bkey = strstr(buf + slen - 5, "bkey\n");
if (bkey) {
LOGDEBUG("Bkey found in process_client_msg");
blen = msglen - (bkey - buf);
}
}
json_msg = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
@ -1332,16 +1220,7 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
slen = strlen(msg);
if (blen) {
/* We're overwriting the EOL so remove it from msglen */
msglen = slen + blen - 1;
msg = realloc(msg, msglen);
/* Overwrite the EOL here */
memcpy(msg + slen - 1, bkey, blen);
send_client(cdata, client_id, msg, slen, msglen);
} else
send_client(cdata, client_id, msg, slen, slen);
send_client(cdata, client_id, msg);
json_decref(json_msg);
}
@ -1356,7 +1235,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id)
/* We have a direct connection to the passthrough's connector so we
* can send it any regular commands. */
ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id);
send_client_msg(cdata, id, msg);
send_client(cdata, id, msg);
}
static char *connector_stats(cdata_t *cdata, const int runtime)
@ -1454,16 +1333,12 @@ retry:
/* The bulk of the messages will be json messages to send to clients
* so look for them first. */
if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf, umsg->msglen);
process_client_msg(cdata, buf);
} else if (cmdmatch(buf, "upstream=")) {
binmsg_t *binmsg = ckalloc(sizeof(binmsg_t));
char *msg = strdup(buf + 9);
binmsg->buf = ckalloc(umsg->msglen);
binmsg->len = umsg->msglen - 9;
memcpy(binmsg->buf, buf + 9, binmsg->len);
LOGDEBUG("Upstreaming %s", umsg->buf);
ckmsgq_add(cdata->upstream_sends, binmsg);
LOGDEBUG("Upstreaming %s", msg);
ckmsgq_add(cdata->upstream_sends, msg);
goto retry;
} else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client;
@ -1549,22 +1424,6 @@ retry:
}
remote_server(ckp, cdata, client);
dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "bkeyclient")) {
client_instance_t *client;
ret = sscanf(buf, "bkeyclient=%"PRId64, &client_id);
if (ret < 0) {
LOGDEBUG("Connector failed to parse bkeyclient command: %s", buf);
goto retry;
}
client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %"PRId64" to set bkey", client_id);
goto retry;
}
LOGINFO("Set bkey on client %"PRId64, client_id);
client->bkey = true;
dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) {
int fdno = -1;

44
src/generator.c

@ -705,9 +705,8 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false;
float timeout = 10;
JSON_CPACK(req, "{ss,sb,s[s]}",
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.passthrough",
"bkey", true,
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
json_decref(req);
@ -748,9 +747,8 @@ static bool node_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool res, ret = false;
float timeout = 10;
JSON_CPACK(req, "{ss,sb,s[s]}",
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.node",
"bkey", true,
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
@ -1922,37 +1920,6 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi);
}
static void forward_passthrough_msg(ckpool_t *ckp, char *buf, int len)
{
int slen = strlen(buf);
char *bkey = NULL;
if (unlikely(len > slen)) {
bkey = strstr(buf + slen - 5, "bkey\n");
if (bkey) {
json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
int blen;
LOGDEBUG("Bkey found in forward_passthrough_msg");
blen = len - (bkey - buf);
if (unlikely(!val)) {
LOGWARNING("No json in bkey appended message %s", buf);
goto out;
}
json_append_bkeys(val, bkey, blen);
buf = json_dumps(val, JSON_COMPACT);
json_decref(val);
LOGDEBUG("Passthrough recv received upstream bkey msg: %s", buf);
send_proc(ckp->connector, buf);
free(buf);
return;
}
}
out:
LOGDEBUG("Passthrough recv received upstream msg: %s", buf);
send_proc(ckp->connector, buf);
}
/* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg)
@ -1987,9 +1954,10 @@ static void *passthrough_recv(void *arg)
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
if (likely(ret > 0))
forward_passthrough_msg(ckp, cs->buf, ret);
else if (ret < 0) {
if (likely(ret > 0)) {
LOGDEBUG("Passthrough recv received upstream msg: %s", cs->buf);
send_proc(ckp->connector, cs->buf);
} else if (ret < 0) {
/* Read failure */
LOGWARNING("Passthrough %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect",
proxi->id, proxi->url);

253
src/libckpool.c

@ -795,7 +795,7 @@ int write_socket(int fd, const void *buf, size_t nbyte)
{
int ret;
ret = wait_write_select(fd, 60);
ret = wait_write_select(fd, 5);
if (ret < 1) {
if (!ret)
LOGNOTICE("Select timed out in write_socket");
@ -997,13 +997,12 @@ int read_length(int sockd, void *buf, int len)
/* Use a standard message across the unix sockets:
* 4 byte length of message as little endian encoded uint32_t followed by the
* string. Return NULL in case of failure. */
char *_recv_unix(int sockd, uint32_t *msglen, int timeout1, int timeout2, const char *file,
const char *func, const int line)
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line)
{
char *buf = NULL;
uint32_t msglen;
int ret, ern;
*msglen = 0;
ret = wait_read_select(sockd, timeout1);
if (unlikely(ret < 1)) {
ern = errno;
@ -1011,15 +1010,15 @@ char *_recv_unix(int sockd, uint32_t *msglen, int timeout1, int timeout2, const
goto out;
}
/* Get message length */
ret = read_length(sockd, msglen, 4);
ret = read_length(sockd, &msglen, 4);
if (unlikely(ret < 4)) {
ern = errno;
LOGERR("Failed to read 4 byte length in recv_unix_msg (%d?)", ern);
goto out;
}
*msglen = le32toh(*msglen);
if (unlikely(*msglen < 1 || *msglen > 0x80000000)) {
LOGWARNING("Invalid message length %u sent to recv_unix_msg", *msglen);
msglen = le32toh(msglen);
if (unlikely(msglen < 1 || msglen > 0x80000000)) {
LOGWARNING("Invalid message length %u sent to recv_unix_msg", msglen);
goto out;
}
ret = wait_read_select(sockd, timeout2);
@ -1028,11 +1027,11 @@ char *_recv_unix(int sockd, uint32_t *msglen, int timeout1, int timeout2, const
LOGERR("Select2 failed in recv_unix_msg (%d)", ern);
goto out;
}
buf = ckzalloc(*msglen + 1);
ret = read_length(sockd, buf, *msglen);
if (unlikely(ret < (int)*msglen)) {
buf = ckzalloc(msglen + 1);
ret = read_length(sockd, buf, msglen);
if (unlikely(ret < (int)msglen)) {
ern = errno;
LOGERR("Failed to read %u bytes in recv_unix_msg (%d?)", *msglen, ern);
LOGERR("Failed to read %u bytes in recv_unix_msg (%d?)", msglen, ern);
dealloc(buf);
}
out:
@ -1042,13 +1041,6 @@ out:
return buf;
}
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line)
{
uint32_t msglen;
return _recv_unix(sockd, &msglen, timeout1, timeout2, file, func, line);
}
/* Emulate a select write wait for high fds that select doesn't support */
int wait_write_select(int sockd, float timeout)
{
@ -1093,64 +1085,56 @@ int _write_length(int sockd, const void *buf, int len, const char *file, const c
return ofs;
}
bool _send_unix(int sockd, const char *buf, uint32_t len, int timeout, const char *file,
const char *func, const int line)
bool _send_unix_msg(int sockd, const char *buf, int timeout, const char *file, const char *func, const int line)
{
uint32_t msglen, len;
bool retval = false;
uint32_t msglen;
int ret, ern;
if (unlikely(sockd < 0)) {
LOGWARNING("Attempting to send unix message to invalidated sockd %d", sockd);
goto out;
}
if (unlikely(!buf)) {
LOGWARNING("Null message sent to send_unix_msg");
goto out;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Zero length message sent to send_unix_msg");
goto out;
}
msglen = htole32(len);
ret = wait_write_select(sockd, timeout);
if (unlikely(ret < 1)) {
ern = errno;
LOGERR("Select1 failed in send_unix (%d)", ern);
LOGERR("Select1 failed in send_unix_msg (%d)", ern);
goto out;
}
ret = _write_length(sockd, &msglen, 4, file, func, line);
if (unlikely(ret < 4)) {
LOGERR("Failed to write 4 byte length in send_unix");
LOGERR("Failed to write 4 byte length in send_unix_msg");
goto out;
}
ret = wait_write_select(sockd, timeout);
if (unlikely(ret < 1)) {
ern = errno;
LOGERR("Select2 failed in send_unix (%d)", ern);
LOGERR("Select2 failed in send_unix_msg (%d)", ern);
goto out;
}
ret = _write_length(sockd, buf, len, file, func, line);
if (unlikely(ret < 0)) {
LOGERR("Failed to write %d bytes in send_unix", len);
LOGERR("Failed to write %d bytes in send_unix_msg", len);
goto out;
}
retval = true;
out:
shutdown(sockd, SHUT_WR);
if (unlikely(!retval))
LOGERR("Failure in send_unix from %s %s:%d", file, func, line);
LOGERR("Failure in send_unix_msg from %s %s:%d", file, func, line);
return retval;
}
bool _send_unix_msg(int sockd, const char *buf, int timeout, const char *file, const char *func, const int line)
{
uint32_t len;
if (unlikely(!buf)) {
LOGWARNING("Null message sent to send_unix_msg");
return NULL;
}
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Zero length message sent to send_unix_msg");
return NULL;
}
return _send_unix(sockd, buf, len, timeout, file, func, line);
}
bool _send_unix_data(int sockd, const struct msghdr *msg, const char *file, const char *func, const int line)
{
bool retval = false;
@ -1267,189 +1251,6 @@ out:
}
/* Bkey structure:
* "bkey\n\0"
* 32 bit LE encoded bkey length
* "$keyname\0"
* 32 bit LE encoded key length
* binary data for key
* append further key:len:data combinations
*/
static inline uint32_t *bkey_lenptr(const char *bkey)
{
return (uint32_t *)(bkey + BKEY_LENOFS);
}
/* Create an empty binary key object */
char *bkey_object(void)
{
char *bkey = ckalloc(PAGESIZE);
uint32_t *lenptr;
sprintf(bkey, "bkey\n");
lenptr = bkey_lenptr(bkey);
*lenptr = htole32(BKEY_LENOFS + BKEY_LENLEN);
return bkey;
}
/* Extract bkey length */
uint32_t bkey_len(const char *bkey)
{
uint32_t *lenptr = bkey_lenptr(bkey);
return le32toh(*lenptr);
}
/* Add binary from hex to a bkey message */
void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *file, const char *func, const int line)
{
uint32_t msglen, *lenptr, newlen;
int hlen, len;
if (unlikely(!*bkey || !key || !hex)) {
LOGEMERG("Null sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
hlen = strlen(hex) / 2;
if (unlikely(!hlen)) {
LOGERR("Zero length hex sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
len = strlen(key);
if (unlikely(!len)) {
LOGERR("Zero length key sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
/* Null terminator */
len += 1;
/* Get current message length */
lenptr = bkey_lenptr(*bkey);
msglen = le32toh(*lenptr);
/* Add $key+length+bin */
newlen = msglen + len + BKEY_LENLEN + hlen;
*bkey = realloc(*bkey, round_up_page(newlen));
/* Append keyname */
LOGDEBUG("Adding key %s @ ofs %u", key, msglen);
sprintf(*bkey + msglen, "%s", key);
msglen += len;
/* Append bin length */
LOGDEBUG("Adding len %u @ ofs %u", hlen, msglen);
lenptr = (uint32_t *)(*bkey + msglen);
*lenptr = htole32(hlen);
msglen += BKEY_LENLEN;
/* Append binary data */
LOGDEBUG("Adding hex of len %u @ ofs %u %s", hlen, msglen, hex);
hex2bin(*bkey + msglen, hex, hlen);
/* Adjust message length header */
lenptr = bkey_lenptr(*bkey);
*lenptr = htole32(newlen);
}
void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line)
{
uint32_t msglen, *lenptr, newlen;
int len;
if (unlikely(!*bkey || !key || !bin)) {
LOGEMERG("Null sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
if (unlikely(!blen)) {
LOGERR("Zero length bin sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
len = strlen(key);
if (unlikely(!len)) {
LOGERR("Zero length key sent to bkey_add from %s %s:%d",
file, func, line);
return;
}
/* Null terminator */
len += 1;
/* Get current message length */
lenptr = bkey_lenptr(*bkey);
msglen = le32toh(*lenptr);
/* Add $key+length+bin */
newlen = msglen + len + BKEY_LENLEN + blen;
*bkey = realloc(*bkey, round_up_page(newlen));
/* Append keyname */
LOGDEBUG("Adding key %s @ ofs %u", key, msglen);
sprintf(*bkey + msglen, "%s", key);
msglen += len;
/* Append bin length */
LOGDEBUG("Adding len %u @ ofs %u", blen, msglen);
lenptr = (uint32_t *)(*bkey + msglen);
*lenptr = htole32(blen);
msglen += BKEY_LENLEN;
/* Append binary data */
LOGDEBUG("Adding bin of len %u @ ofs %u", blen, msglen);
memcpy(*bkey + msglen, bin, blen);
/* Adjust message length header */
lenptr = bkey_lenptr(*bkey);
*lenptr = htole32(newlen);
}
bool _json_append_bkeys(json_t *val, const char *bkey, const uint32_t len, const char *file,
const char *func, const int line)
{
uint32_t ofs = BKEY_LENOFS + BKEY_LENLEN;
uint32_t msglen;
msglen = bkey_len(bkey);
if (unlikely(!msglen || msglen > 0x80000000)) {
LOGDEBUG("Invalid msglen %u sent to json_append_bkey from %s %s:%d",
msglen, file, func, line);
return false;
}
while (ofs < msglen) {
uint32_t binlen, *lenptr;
const char *key;
char *hex;
key = bkey + ofs;
LOGDEBUG("Found key %s @ ofs %u", key, ofs);
ofs += strlen(key) + 1;
if (unlikely(ofs >= len)) {
LOGDEBUG("Unable to seek to bkey offset %u beyond length %d",
ofs, len);
return false;
}
lenptr = (uint32_t *)(bkey + ofs);
binlen = le32toh(*lenptr);
LOGDEBUG("Found binlen %u @ ofs %u", binlen, ofs);
ofs += BKEY_LENLEN;
if (unlikely(ofs >= len)) {
LOGDEBUG("Unable to seek to bkey offset %u beyond length %d",
ofs, len);
return false;
}
hex = bin2hex(bkey + ofs, binlen);
LOGDEBUG("Found hex %s @ ofs %u", hex, ofs);
json_set_string(val, key, hex);
free(hex);
ofs += binlen;
}
return true;
}
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line)
{
if (likely(val))

23
src/libckpool.h

@ -318,21 +318,6 @@ struct unixsock {
typedef struct unixsock unixsock_t;
#define BKEY_LENOFS 6
#define BKEY_LENLEN 4
char *bkey_object(void);
uint32_t bkey_len(const char *bkey);
void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *file, const char *func, const int line);
#define bkey_add_hex(bkey, key, hex) _bkey_add_hex(&(bkey), key, hex, __FILE__, __func__, __LINE__)
void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line);
#define bkey_add_bin(bkey, key, bin) _bkey_add_bin(&(bkey), key, bin, __FILE__, __func__, __LINE__)
bool _json_append_bkeys(json_t *val, const char *bkey, const uint32_t len, const char *file,
const char *func, const int line);
#define json_append_bkeys(val, bkey, len) _json_append_bkeys(val, bkey, len, __FILE__, __func__, __LINE__)
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line);
#define json_check(VAL, ERR) _json_check(VAL, ERR, __FILE__, __func__, __LINE__)
@ -346,7 +331,7 @@ void _json_check(json_t *val, json_error_t *err, const char *file, const char *f
/* No error checking with these, make sure we know they're valid already! */
static inline void json_strcpy(char *buf, json_t *val, const char *key)
{
strcpy(buf, json_string_value(json_object_get(val, key)) ? : "");
strcpy(buf, json_string_value(json_object_get(val, key)));
}
static inline void json_dblcpy(double *dbl, json_t *val, const char *key)
@ -518,21 +503,15 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun
int wait_close(int sockd, int timeout);
int wait_read_select(int sockd, float timeout);
int read_length(int sockd, void *buf, int len);
char *_recv_unix(int sockd, uint32_t *msglen, int timeout1, int timeout2, const char *file,
const char *func, const int line);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);
#define RECV_UNIX_TIMEOUT1 30
#define RECV_UNIX_TIMEOUT2 5
#define recv_unix(sockd, msglen) _recv_unix(sockd, msglen, UNIX_READ_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__)
#define recv_unix_msg(sockd) _recv_unix_msg(sockd, UNIX_READ_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__)
int wait_write_select(int sockd, float timeout);
#define write_length(sockd, buf, len) _write_length(sockd, buf, len, __FILE__, __func__, __LINE__)
int _write_length(int sockd, const void *buf, int len, const char *file, const char *func, const int line);
bool _send_unix(int sockd, const char *buf, uint32_t len, int timeout, const char *file,
const char *func, const int line);
#define send_unix(sockd, buf, len) _send_unix(sockd, buf, len, UNIX_WRITE_TIMEOUT, __FILE__, __func__, __LINE__)
bool _send_unix_msg(int sockd, const char *buf, int timeout, const char *file, const char *func, const int line);
#define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, UNIX_WRITE_TIMEOUT, __FILE__, __func__, __LINE__)
bool _send_unix_data(int sockd, const struct msghdr *msg, const char *file, const char *func, const int line);

140
src/stratifier.c

@ -154,10 +154,6 @@ typedef struct json_params json_params_t;
struct smsg {
json_t *json_msg;
int64_t client_id;
/* bkey data if any */
char *bkey;
uint32_t bkeylen;
};
typedef struct smsg smsg_t;
@ -313,7 +309,6 @@ struct stratum_instance {
int subproxyid; /* Which subproxy */
bool remote; /* Is this a trusted remote server */
bool bkey; /* Does this client accept bkeys */
};
struct share {
@ -869,34 +864,31 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
ck_rlock(&sdata->instance_lock);
if (sdata->node_instances) {
json_t *wb_val = json_object();
char *bkey = bkey_object();
uint32_t bkeylen;
json_set_int(wb_val, "jobid", wb->id);
bkey_add_hex(bkey, "target", wb->target);
json_set_string(wb_val, "target", wb->target);
json_set_double(wb_val, "diff", wb->diff);
json_set_int(wb_val, "version", wb->version);
json_set_int(wb_val, "curtime", wb->curtime);
bkey_add_hex(bkey, "prevhash", wb->prevhash);
bkey_add_hex(bkey, "ntime", wb->ntime);
bkey_add_hex(bkey, "bbversion", wb->bbversion);
bkey_add_hex(bkey, "nbit", wb->nbit);
json_set_string(wb_val, "prevhash", wb->prevhash);
json_set_string(wb_val, "ntime", wb->ntime);
json_set_string(wb_val, "bbversion", wb->bbversion);
json_set_string(wb_val, "nbit", wb->nbit);
json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue);
json_set_int(wb_val, "height", wb->height);
json_set_string(wb_val, "flags", wb->flags);
json_set_int(wb_val, "transactions", wb->transactions);
if (likely(wb->transactions))
bkey_add_hex(bkey, "txn_data", wb->txn_data);
json_set_string(wb_val, "txn_data", wb->txn_data);
/* We don't need txn_hashes */
json_set_int(wb_val, "merkles", wb->merkles);
json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array));
bkey_add_hex(bkey, "coinb1", wb->coinb1);
json_set_string(wb_val, "coinb1", wb->coinb1);
json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen);
json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen);
json_set_int(wb_val, "coinb1len", wb->coinb1len);
json_set_int(wb_val, "coinb2len", wb->coinb2len);
bkey_add_hex(bkey, "coinb2", wb->coinb2);
bkeylen = bkey_len(bkey);
json_set_string(wb_val, "coinb2", wb->coinb2);
DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg;
@ -908,15 +900,11 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
msg->bkey = ckalloc(bkeylen);
memcpy(msg->bkey, bkey, bkeylen);
msg->bkeylen = bkeylen;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(wb_val);
free(bkey);
}
ck_runlock(&sdata->instance_lock);
@ -1391,25 +1379,17 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
}
}
static void upstream_blocksubmit(ckpool_t *ckp, const char *hash, const char *data)
static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block)
{
char *buf, *bkey = bkey_object();
uint32_t bkeylen, len;
char *buf;
bkey_add_hex(bkey, "hash", hash);
bkey_add_hex(bkey, "data", data);
bkeylen = bkey_len(bkey);
ASPRINTF(&buf, "upstream={\"method\":\"submitblock\"}");
len = strlen(buf);
buf = realloc(buf, round_up_page(len + 1 + bkeylen));
memcpy(buf + len, bkey, bkeylen);
free(bkey);
send_proc_data(ckp->connector, buf, len + bkeylen);
ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n",
gbt_block);
send_proc(ckp->connector, buf);
free(buf);
}
static void downstream_blocksubmits(ckpool_t *ckp, const char *hash, const char *data,
const stratum_instance_t *source)
static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const stratum_instance_t *source)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->data;
@ -1419,14 +1399,10 @@ static void downstream_blocksubmits(ckpool_t *ckp, const char *hash, const char
ck_rlock(&sdata->instance_lock);
if (sdata->remote_instances) {
json_t *val = json_object();
char *bkey = bkey_object();
uint32_t bkeylen;
JSON_CPACK(val, "{ss}", "method", "submitblock");
bkey_add_hex(bkey, "hash", hash);
bkey_add_hex(bkey, "data", data);
bkeylen = bkey_len(bkey);
JSON_CPACK(val, "{ss,ss}",
"method", "submitblock",
"submitblock", gbt_block);
DL_FOREACH(sdata->remote_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
@ -1439,15 +1415,11 @@ static void downstream_blocksubmits(ckpool_t *ckp, const char *hash, const char
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
msg->bkey = ckalloc(bkeylen);
memcpy(msg->bkey, bkey, bkeylen);
msg->bkeylen = bkeylen;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(val);
free(bkey);
}
ck_runlock(&sdata->instance_lock);
@ -1494,9 +1466,9 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i
realloc_strcat(&gbt_block, wb->txn_data);
send_generator(ckp, gbt_block, GEN_PRIORITY);
if (ckp->remote)
upstream_blocksubmit(ckp, blockhash, gbt_block + 12 + 64 + 1);
upstream_blocksubmit(ckp, gbt_block);
else
downstream_blocksubmits(ckp, blockhash, gbt_block + 12 + 64 + 1, NULL);
downstream_blocksubmits(ckp, gbt_block, NULL);
free(gbt_block);
}
@ -5644,28 +5616,12 @@ static void add_remote_server(sdata_t *sdata, stratum_instance_t *client)
ck_wunlock(&sdata->instance_lock);
}
static void set_client_bkey(ckpool_t *ckp, stratum_instance_t *client, const int64_t client_id,
const json_t *val, char *buf)
{
json_t *bkey_val = json_object_get(val, "bkey");
if (bkey_val) {
client->bkey = json_is_true(bkey_val);
if (client->bkey) {
snprintf(buf, 255, "bkeyclient=%"PRId64, client_id);
send_proc(ckp->connector, buf);
}
}
}
/* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, const json_t *val, json_t *id_val,
json_t *method_val, json_t *params_val)
const int64_t client_id, json_t *id_val, json_t *method_val,
json_t *params_val)
{
const char *method;
char buf[256];
/* Random broken clients send something not an integer as the id so we
* copy the json item for id_val as is for the response. By far the
@ -5685,7 +5641,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
}
if (cmdmatch(method, "mining.subscribe")) {
json_t *sval, *result_val;
json_t *val, *result_val;
if (unlikely(client->subscribed)) {
LOGNOTICE("Client %"PRId64" %s trying to subscribe twice",
@ -5698,17 +5654,19 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
LOGWARNING("parse_subscribe returned NULL result_val");
return;
}
sval = json_object();
json_object_set_new_nocheck(sval, "result", result_val);
json_object_set_nocheck(sval, "id", id_val);
json_object_set_new_nocheck(sval, "error", json_null());
stratum_add_send(sdata, sval, client_id, SM_SUBSCRIBERESULT);
val = json_object();
json_object_set_new_nocheck(val, "result", result_val);
json_object_set_nocheck(val, "id", id_val);
json_object_set_new_nocheck(val, "error", json_null());
stratum_add_send(sdata, val, client_id, SM_SUBSCRIBERESULT);
if (likely(client->subscribed))
init_client(sdata, client, client_id);
return;
}
if (unlikely(cmdmatch(method, "mining.remote"))) {
char buf[256];
/* Add this client as a trusted remote node in the connector and
* drop the client in the stratifier */
if (!ckp->trusted[client->server] || ckp->proxy) {
@ -5716,7 +5674,6 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
client_id, client->address, client->server);
connector_drop_client(ckp, client_id);
} else {
set_client_bkey(ckp, client, client_id, val, buf);
add_remote_server(sdata, client);
snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf);
@ -5725,6 +5682,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
}
if (unlikely(cmdmatch(method, "mining.node"))) {
char buf[256];
/* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */
if (!ckp->nodeserver[client->server] || ckp->proxy) {
@ -5733,7 +5692,6 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
connector_drop_client(ckp, client_id);
drop_client(ckp, sdata, client_id);
} else {
set_client_bkey(ckp, client, client_id, val, buf);
add_mining_node(ckp, sdata, client);
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
@ -5742,6 +5700,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
}
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
char buf[256];
if (ckp->proxy) {
LOGNOTICE("Dropping client %"PRId64" %s trying to connect as passthrough on proxy server %d",
client_id, client->address, client->server);
@ -5752,7 +5712,6 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */
set_client_bkey(ckp, client, client_id, val, buf);
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
@ -6001,23 +5960,18 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf)
static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf,
const stratum_instance_t *client)
{
json_t *hash_val, *data_val;
const char *hash, *data;
char *gbt_block;
json_t *submitblock_val;
const char *gbt_block;
hash_val = json_object_get(val, "hash");
hash = json_string_value(hash_val);
data_val = json_object_get(val, "data");
data = json_string_value(data_val);
if (unlikely(!hash || !data)) {
LOGWARNING("Failed to extract hash and data from remote submitblock msg %s", buf);
submitblock_val = json_object_get(val, "submitblock");
gbt_block = json_string_value(submitblock_val);
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to get submitblock data from remote message %s", buf);
return;
}
ASPRINTF(&gbt_block, "submitblock:%s,%s", hash, data);
LOGWARNING("Submitting possible downstream block!");
send_generator(ckp, gbt_block, GEN_PRIORITY);
downstream_blocksubmits(ckp, hash, data, client);
free(gbt_block);
downstream_blocksubmits(ckp, gbt_block, client);
}
static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)
@ -6203,7 +6157,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
if (!(++delays % 50))
LOGWARNING("%d Second delay waiting for bitcoind at startup", delays / 10);
}
parse_method(ckp, sdata, client, client_id, val, id_val, method, params);
parse_method(ckp, sdata, client, client_id, id_val, method, params);
}
static void srecv_process(ckpool_t *ckp, char *buf)
@ -6216,7 +6170,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
json_t *val;
int server;
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
val = json_loads(buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Received unrecognised non-json message: %s", buf);
goto out;
@ -6301,15 +6255,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, JSON_COMPACT);
if (unlikely(msg->bkeylen)) {
int len = strlen(s);
s = realloc(s, len + msg->bkeylen);
memcpy(s + len, msg->bkey, msg->bkeylen);
free(msg->bkey);
send_proc_data(ckp->connector, s, len + msg->bkeylen);
} else
send_proc(ckp->connector, s);
send_proc(ckp->connector, s);
free(s);
free_smsg(msg);
}

Loading…
Cancel
Save