From d9e5cf03f15f32eb8601672aeb483274405deb24 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 20 Dec 2016 16:19:53 +1100 Subject: [PATCH] Send transactions up and downstream in trusted mode to be submitted to the local btcd. --- src/connector.c | 8 +++-- src/stratifier.c | 94 ++++++++++++++++++++++++++++++++++++++---------- src/stratifier.h | 1 + 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/connector.c b/src/connector.c index 5ade5b5c..6e3451d4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1186,14 +1186,18 @@ static void *urecv_process(void *arg) LOGWARNING("Failed to find method from upstream pool json %s", cs->buf); json_decref(val); - goto nomsg; + goto decref; } - if (!safecmp(method, "submitblock")) + if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) + parse_remote_txns(ckp, val); + else if (!safecmp(method, "submitblock")) parse_remote_submitblock(ckp, val, cs->buf); else if (!safecmp(method, "pong")) LOGDEBUG("Received upstream pong"); else LOGWARNING("Unrecognised upstream method %s", method); +decref: + json_decref(val); nomsg: cksem_post(&cs->sem); diff --git a/src/stratifier.c b/src/stratifier.c index d846ede0..38470e4d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1168,18 +1168,30 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char HASH_ADD_STR(*txns, hash, txn); } -static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) +static void upstream_txns(ckpool_t *ckp, const json_t *txn_val) +{ + json_t *json_msg = json_deep_copy(txn_val); + char *buf; + + json_set_string(json_msg, "method", stratum_msgs[SM_TRANSACTIONS]); + ASPRINTF(&buf, "upstream=%s", json_dumps(json_msg, JSON_EOL)); + json_decref(json_msg); + send_proc(ckp->connector, buf); + free(buf); +} + +static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *txn_val) { stratum_instance_t *client; ckmsg_t *bulk_send = NULL; + ckmsg_t *client_msg; int messages = 0; + json_t *json_msg; + smsg_t *msg; ck_rlock(&sdata->instance_lock); DL_FOREACH(sdata->node_instances, client) { - ckmsg_t *client_msg; - smsg_t *msg; - json_t *json_msg = json_deep_copy(txn_val); - + json_msg = json_deep_copy(txn_val); json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]); client_msg = ckalloc(sizeof(ckmsg_t)); msg = ckzalloc(sizeof(smsg_t)); @@ -1189,8 +1201,22 @@ static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) DL_APPEND(bulk_send, client_msg); messages++; } + DL_FOREACH(sdata->remote_instances, client) { + json_msg = json_deep_copy(txn_val); + json_set_string(json_msg, "method", stratum_msgs[SM_TRANSACTIONS]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } ck_runlock(&sdata->instance_lock); + if (ckp->remote) + upstream_txns(ckp, txn_val); + if (bulk_send) { LOGINFO("Sending transactions to mining nodes"); ssend_bulk_append(sdata, bulk_send, messages); @@ -1310,7 +1336,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t ck_wunlock(&sdata->workbase_lock); JSON_CPACK(val, "{so}", "transaction", txn_array); - send_node_transactions(sdata, val); + send_node_transactions(ckp, sdata, val); json_decref(val); if (added || purged) @@ -6356,6 +6382,18 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna return user; } +/* Submit the transactions in node/remote mode so the local btcd has all the + * transactions that will go into the next blocksolve. */ +static void submit_transaction(ckpool_t *ckp, const char *hash) +{ + char *buf; + + if (unlikely(!ckp->generator_ready)) + return; + ASPRINTF(&buf, "submittxn:%s", hash); + send_generator(ckp, buf, GEN_LAX); + free(buf); +} static void parse_remote_shares(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) { @@ -6401,6 +6439,35 @@ static void parse_remote_shares(ckpool_t *ckp, sdata_t *sdata, json_t *val, cons LOGINFO("Added %"PRId64" remote shares to worker %s", diff, workername); } +/* In remote mode we simply submit them to our bitcoind to guarantee fast + * block change and compact block propagation between pools. */ +void parse_remote_txns(ckpool_t *ckp, const json_t *val) +{ + json_t *txn_array, *txn_val, *data_val; + int i, arr_size; + int added = 0; + + txn_array = json_object_get(val, "transaction"); + arr_size = json_array_size(txn_array); + + for (i = 0; i < arr_size; i++) { + const char *data; + + txn_val = json_array_get(txn_array, i); + data_val = json_object_get(txn_val, "data"); + data = json_string_value(data_val); + if (unlikely(!data)) { + LOGERR("Failed to get hash/data in parse_remote_txns"); + continue; + } + submit_transaction(ckp, data); + added++; + } + + if (added) + LOGNOTICE("Submitted %d remote transactions", added); +} + /* Get the remote worker count once per minute from all the remote servers */ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf) { @@ -6491,6 +6558,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu } if (likely(!safecmp(method, "shares"))) parse_remote_shares(ckp, sdata, val, buf); + else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) + parse_remote_txns(ckp, val); else if (!safecmp(method, "workers")) parse_remote_workers(sdata, val, buf); else if (!safecmp(method, "submitblock")) @@ -6505,19 +6574,6 @@ out: free(buf); } -/* Submit the transactions in node mode so the local btcd has all the - * transactions that will go into the next blocksolve. */ -static void submit_transaction(ckpool_t *ckp, const char *hash) -{ - char *buf; - - if (unlikely(!ckp->generator_ready)) - return; - ASPRINTF(&buf, "submittxn:%s", hash); - send_generator(ckp, buf, GEN_LAX); - free(buf); -} - static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val) { json_t *txn_array, *txn_val, *data_val, *hash_val; diff --git a/src/stratifier.h b/src/stratifier.h index 5bdf9fda..dab06747 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,6 +10,7 @@ #ifndef STRATIFIER_H #define STRATIFIER_H +void parse_remote_txns(ckpool_t *ckp, const json_t *val); void stratifier_add_recv(ckpool_t *ckp, json_t *val); void *stratifier(void *arg);