diff --git a/src/ckpool.h b/src/ckpool.h index 0f00e926..12b35a83 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -277,6 +277,7 @@ enum stratum_msgtype { SM_WORKINFO, SM_SUGGESTDIFF, SM_BLOCK, + SM_PONG, SM_NONE }; @@ -298,6 +299,7 @@ static const char __maybe_unused *stratum_msgs[] = { "workinfo", "suggestdiff", "block", + "pong", "" }; diff --git a/src/connector.c b/src/connector.c index 7576a8d8..e97d4740 100644 --- a/src/connector.c +++ b/src/connector.c @@ -55,7 +55,7 @@ struct client_instance { /* Which serverurl is this instance connected to */ int server; - char buf[PAGESIZE]; + char *buf; unsigned long bufofs; /* Are we currently sending a blocked message from this client */ @@ -205,6 +205,10 @@ static client_instance_t *recruit_client(cdata_t *cdata) client = ckzalloc(sizeof(client_instance_t)); } else LOGDEBUG("Connector recycled client instance"); + + client->buf = realloc(client->buf, PAGESIZE); + client->buf[0] = '\0'; + return client; } @@ -457,20 +461,22 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { ckpool_t *ckp = cdata->ckp; - char msg[PAGESIZE], *eol; int buflen, ret; + char *msg, *eol; json_t *val; retry: if (unlikely(client->bufofs > MAX_MSGSIZE)) { - LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", - client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + if (!client->remote) { + LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", + client->id, client->fd); + invalidate_client(ckp, cdata, client); + return; + } + client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1)); } - buflen = PAGESIZE - client->bufofs; /* This read call is non-blocking since the socket is set to O_NOBLOCK */ - ret = read(client->fd, client->buf + client->bufofs, buflen); + ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE); if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; @@ -487,12 +493,13 @@ reparse: /* Do something useful with this message now */ buflen = eol - client->buf + 1; - if (unlikely(buflen > MAX_MSGSIZE)) { + if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); return; } + msg = alloca(round_up_page(buflen + 1)); memcpy(msg, client->buf, buflen); msg[buflen] = '\0'; client->bufofs -= buflen; @@ -1009,6 +1016,7 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs) bool res, ret = false; float timeout = 10; + cksem_wait(&cs->sem); cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port); @@ -1047,6 +1055,8 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs) LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port); ret = true; out: + cksem_post(&cs->sem); + return ret; } @@ -1078,10 +1088,92 @@ out: free(buf); } +static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf) +{ + const char *gbt_block = json_string_value(json_object_get(val, "submitblock")); + + if (unlikely(!gbt_block)) { + LOGWARNING("Failed to find submitblock data from upstream submitblock method %s", + buf); + return; + } + LOGWARNING("Submitting possible upstream block!"); + send_proc(ckp->generator, gbt_block); +} + +static void ping_upstream(cdata_t *cdata) +{ + char *buf; + + ASPRINTF(&buf, "{\"method\":\"ping\"}\n"); + ckmsgq_add(cdata->upstream_sends, buf); +} + +static void *urecv_process(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + cdata_t *cdata = ckp->data; + connsock_t *cs = &cdata->upstream_cs; + bool alive = true; + + ckp->proxy = true; + + rename_proc("ureceiver"); + + pthread_detach(pthread_self()); + + while (42) { + const char *method; + float timeout = 5; + json_t *val; + int ret; + + cksem_wait(&cs->sem); + ret = read_socket_line(cs, &timeout); + if (ret < 1) { + ping_upstream(cdata); + if (likely(!ret)) { + LOGDEBUG("No message from upstream pool"); + } else { + LOGNOTICE("Failed to read from upstream pool"); + alive = false; + } + goto nomsg; + } + alive = true; + val = json_loads(cs->buf, 0, NULL); + if (unlikely(!val)) { + LOGWARNING("Received non-json msg from upstream pool %s", + cs->buf); + goto nomsg; + } + method = json_string_value(json_object_get(val, "method")); + if (unlikely(!method)) { + LOGWARNING("Failed to find method from upstream pool json %s", + cs->buf); + json_decref(val); + goto nomsg; + } + if (!safecmp(method, "submitblock")) + parse_remote_submitblock(ckp, val, cs->buf); + else if (!safecmp(method, "pong")) + LOGDEBUG("Received upstream pong"); + else + LOGWARNING("Unrecognised upstream method %s", method); +nomsg: + cksem_post(&cs->sem); + + if (!alive) + sleep(5); + } + return NULL; +} + static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) { connsock_t *cs = &cdata->upstream_cs; bool ret = false; + pthread_t pth; cs->ckp = ckp; if (!ckp->upstream) { @@ -1093,11 +1185,14 @@ static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) goto out; } + cksem_init(&cs->sem); + cksem_post(&cs->sem); /* Must succeed on initial connect to upstream pool */ if (!connect_upstream(ckp, cs)) { LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port); goto out; } + create_pthread(&pth, urecv_process, ckp); cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process); ret = true; out: @@ -1139,7 +1234,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) client_id = id & 0xffffffffll; /* We have a direct connection to the passthrough's connector so we * can send it any regular commands. */ - ASPRINTF(&msg, "dropclient=%"PRId64, client_id); + ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id); send_client(cdata, id, msg); } diff --git a/src/stratifier.c b/src/stratifier.c index edd8b5b9..ae482593 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -472,6 +472,7 @@ struct stratifier_data { stratum_instance_t *stratum_instances; stratum_instance_t *recycled_instances; stratum_instance_t *node_instances; + stratum_instance_t *remote_instances; int stratum_generated; int disconnected_generated; @@ -1378,6 +1379,56 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } +static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block) +{ + char *buf; + + ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n", + gbt_block); + send_proc(ckp->connector, buf); + free(buf); +} + +static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const stratum_instance_t *source) +{ + stratum_instance_t *client; + sdata_t *sdata = ckp->data; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + if (sdata->remote_instances) { + json_t *val = json_object(); + + JSON_CPACK(val, "{ss,ss}", + "method", "submitblock", + "submitblock", gbt_block); + DL_FOREACH(sdata->remote_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg; + + if (client == source) + continue; + json_msg = json_copy(val); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + json_decref(val); + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGNOTICE("Sending submitblock to downstream servers"); + ssend_bulk_prepend(sdata, bulk_send, messages); + } +} + static void process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) @@ -1414,9 +1465,11 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i if (wb->transactions) realloc_strcat(&gbt_block, wb->txn_data); send_generator(ckp, gbt_block, GEN_PRIORITY); + if (ckp->remote) + upstream_blocksubmit(ckp, gbt_block); + else + downstream_blocksubmits(ckp, gbt_block, NULL); free(gbt_block); - - } static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) @@ -2444,6 +2497,8 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (unlikely(client->node)) DL_DELETE(sdata->node_instances, client); + if (unlikely(client->remote)) + DL_DELETE(sdata->remote_instances, client); if (client->workername) { if (user) { ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", @@ -2602,11 +2657,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type) return; } - if (ckp->node) { - json_decref(val); - return; - } - /* Use this locking as an opportunity to test other clients. */ ck_rlock(&ckp_sdata->instance_lock); HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) { @@ -2892,8 +2942,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) } mutex_unlock(&sdata->block_lock); - if (unlikely(!found)) { - LOGERR("Failed to find blockhash %s in block_solve!", blockhash); + if (!found) { + LOGINFO("Failed to find blockhash %s in block_solve, possibly from downstream", + blockhash); return; } @@ -2971,8 +3022,9 @@ static void block_reject(sdata_t *sdata, const char *blockhash) } mutex_unlock(&sdata->block_lock); - if (unlikely(!found)) { - LOGERR("Failed to find blockhash %s in block_reject!", blockhash); + if (!found) { + LOGINFO("Failed to find blockhash %s in block_reject, possibly from downstream", + blockhash); return; } val = found->data; @@ -5539,9 +5591,8 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c { pthread_t pth; - client->node = true; - ck_wlock(&sdata->instance_lock); + client->node = true; DL_APPEND(sdata->node_instances, client); __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); @@ -5552,6 +5603,14 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c create_pthread(&pth, set_node_latency, client); } +static void add_remote_server(sdata_t *sdata, stratum_instance_t *client) +{ + ck_wlock(&sdata->instance_lock); + client->remote = true; + DL_APPEND(sdata->remote_instances, client); + ck_wunlock(&sdata->instance_lock); +} + /* Enter with client holding ref count */ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, @@ -5610,10 +5669,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie client_id, client->address, client->server); connector_drop_client(ckp, client_id); } else { + add_remote_server(sdata, client); snprintf(buf, 255, "remote=%"PRId64, client_id); send_proc(ckp->connector, buf); } - client->remote = true; return; } @@ -5893,6 +5952,23 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf) LOGDEBUG("Adding %d remote workers to user %s", workers, username); } +static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf, + const stratum_instance_t *client) +{ + json_t *submitblock_val; + const char *gbt_block; + + submitblock_val = json_object_get(val, "submitblock"); + gbt_block = json_string_value(submitblock_val); + if (unlikely(!gbt_block)) { + LOGWARNING("Failed to get submitblock data from remote message %s", buf); + return; + } + LOGWARNING("Submitting possible downstream block!"); + send_generator(ckp, gbt_block, GEN_PRIORITY); + downstream_blocksubmits(ckp, gbt_block, client); +} + static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) { json_t *workername_val = json_object_get(val, "workername"), @@ -5923,7 +5999,16 @@ static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) reset_bestshares(sdata); } -static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) +static void send_remote_pong(sdata_t *sdata, stratum_instance_t *client) +{ + json_t *json_msg; + + JSON_CPACK(json_msg, "{ss}", "method", "pong"); + stratum_add_send(sdata, json_msg, client->id, SM_PONG); +} + +static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf, + stratum_instance_t *client) { json_t *method_val = json_object_get(val, "method"); const char *method; @@ -5938,8 +6023,12 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const parse_remote_shares(ckp, sdata, val, buf); else if (!safecmp(method, "workers")) parse_remote_workers(sdata, val, buf); + else if (!safecmp(method, "submitblock")) + parse_remote_blocksubmit(ckp, val, buf, client); else if (!safecmp(method, "block")) parse_remote_block(sdata, val, buf); + else if (!safecmp(method, "ping")) + send_remote_pong(sdata, client); else LOGWARNING("unrecognised trusted message %s", buf); } @@ -6135,7 +6224,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); if (client->remote) - parse_trusted_msg(ckp, sdata, msg->json_msg, buf); + parse_trusted_msg(ckp, sdata, msg->json_msg, buf, client); else if (ckp->node) node_client_msg(ckp, msg->json_msg, buf, client); else