diff --git a/src/stratifier.c b/src/stratifier.c index edd8b5b9..68603b72 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -472,6 +472,7 @@ struct stratifier_data { stratum_instance_t *stratum_instances; stratum_instance_t *recycled_instances; stratum_instance_t *node_instances; + stratum_instance_t *remote_instances; int stratum_generated; int disconnected_generated; @@ -1378,6 +1379,56 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } +static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block) +{ + char *buf; + + ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n", + gbt_block); + send_proc(ckp->connector, buf); + free(buf); +} + +static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const stratum_instance_t *source) +{ + stratum_instance_t *client; + sdata_t *sdata = ckp->data; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + if (sdata->remote_instances) { + json_t *val = json_object(); + + JSON_CPACK(val, "{ss,ss}", + "method", "submitblock", + "submitblock", gbt_block); + DL_FOREACH(sdata->remote_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg; + + if (client == source) + continue; + json_msg = json_copy(val); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + json_decref(val); + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGNOTICE("Sending submitblock to downstream servers"); + ssend_bulk_prepend(sdata, bulk_send, messages); + } +} + static void process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) @@ -1414,9 +1465,11 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i if (wb->transactions) realloc_strcat(&gbt_block, wb->txn_data); send_generator(ckp, gbt_block, GEN_PRIORITY); + if (ckp->remote) + upstream_blocksubmit(ckp, gbt_block); + else + downstream_blocksubmits(ckp, gbt_block, NULL); free(gbt_block); - - } static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) @@ -2444,6 +2497,8 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (unlikely(client->node)) DL_DELETE(sdata->node_instances, client); + if (unlikely(client->remote)) + DL_DELETE(sdata->remote_instances, client); if (client->workername) { if (user) { ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", @@ -2602,11 +2657,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type) return; } - if (ckp->node) { - json_decref(val); - return; - } - /* Use this locking as an opportunity to test other clients. */ ck_rlock(&ckp_sdata->instance_lock); HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) { @@ -2892,8 +2942,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) } mutex_unlock(&sdata->block_lock); - if (unlikely(!found)) { - LOGERR("Failed to find blockhash %s in block_solve!", blockhash); + if (!found) { + LOGINFO("Failed to find blockhash %s in block_solve, possibly from downstream", + blockhash); return; } @@ -2971,8 +3022,9 @@ static void block_reject(sdata_t *sdata, const char *blockhash) } mutex_unlock(&sdata->block_lock); - if (unlikely(!found)) { - LOGERR("Failed to find blockhash %s in block_reject!", blockhash); + if (!found) { + LOGINFO("Failed to find blockhash %s in block_reject, possibly from downstream", + blockhash); return; } val = found->data; @@ -5539,9 +5591,8 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c { pthread_t pth; - client->node = true; - ck_wlock(&sdata->instance_lock); + client->node = true; DL_APPEND(sdata->node_instances, client); __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); @@ -5552,6 +5603,14 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c create_pthread(&pth, set_node_latency, client); } +static void add_remote_server(sdata_t *sdata, stratum_instance_t *client) +{ + ck_wlock(&sdata->instance_lock); + client->remote = true; + DL_APPEND(sdata->remote_instances, client); + ck_wunlock(&sdata->instance_lock); +} + /* Enter with client holding ref count */ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, @@ -5610,10 +5669,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie client_id, client->address, client->server); connector_drop_client(ckp, client_id); } else { + add_remote_server(sdata, client); snprintf(buf, 255, "remote=%"PRId64, client_id); send_proc(ckp->connector, buf); } - client->remote = true; return; } @@ -5893,6 +5952,23 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf) LOGDEBUG("Adding %d remote workers to user %s", workers, username); } +static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf, + const stratum_instance_t *client) +{ + json_t *submitblock_val; + const char *gbt_block; + + submitblock_val = json_object_get(val, "submitblock"); + gbt_block = json_string_value(submitblock_val); + if (unlikely(!gbt_block)) { + LOGWARNING("Failed to get submitblock data from remote message %s", buf); + return; + } + LOGWARNING("Submitting possible downstream block!"); + send_generator(ckp, gbt_block, GEN_PRIORITY); + downstream_blocksubmits(ckp, gbt_block, client); +} + static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) { json_t *workername_val = json_object_get(val, "workername"), @@ -5923,7 +5999,8 @@ static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) reset_bestshares(sdata); } -static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) +static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf, + stratum_instance_t *client) { json_t *method_val = json_object_get(val, "method"); const char *method; @@ -5938,6 +6015,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const parse_remote_shares(ckp, sdata, val, buf); else if (!safecmp(method, "workers")) parse_remote_workers(sdata, val, buf); + else if (!safecmp(method, "submitblock")) + parse_remote_blocksubmit(ckp, val, buf, client); else if (!safecmp(method, "block")) parse_remote_block(sdata, val, buf); else @@ -6135,7 +6214,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); if (client->remote) - parse_trusted_msg(ckp, sdata, msg->json_msg, buf); + parse_trusted_msg(ckp, sdata, msg->json_msg, buf, client); else if (ckp->node) node_client_msg(ckp, msg->json_msg, buf, client); else