diff --git a/src/bitcoin.c b/src/bitcoin.c index e8283ee2..f369b78c 100644 --- a/src/bitcoin.c +++ b/src/bitcoin.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -355,13 +355,15 @@ out: return ret; } -void submit_txn(connsock_t *cs, char *params) +void submit_txn(connsock_t *cs, const char *params) { char *rpc_req; int len; - if (unlikely(!cs->alive)) + if (unlikely(!cs->alive)) { + LOGDEBUG("Failed to submit_txn due to connsock dead"); return; + } len = strlen(params) + 64; rpc_req = ckalloc(len); @@ -369,3 +371,34 @@ void submit_txn(connsock_t *cs, char *params) json_rpc_msg(cs, rpc_req); dealloc(rpc_req); } + +char *get_txn(connsock_t *cs, const char *hash) +{ + char *rpc_req, *ret = NULL; + json_t *val, *res_val; + + if (unlikely(!cs->alive)) { + LOGDEBUG("Failed to get_txn due to connsock dead"); + goto out; + } + + ASPRINTF(&rpc_req, "{\"method\": \"getrawtransaction\", \"params\": [\"%s\"]}\n", hash); + val = json_rpc_call(cs, rpc_req); + dealloc(rpc_req); + if (unlikely(!val)) { + LOGWARNING("%s:%s Failed to get valid json response to get_txn", cs->url, cs->port); + goto out; + } + res_val = json_object_get(val, "result"); + if (unlikely(!res_val)) { + LOGWARNING("Failed to get result in json response to getrawtransaction"); + goto out; + } + if (!json_is_null(res_val) && json_is_string(res_val)) { + ret = strdup(json_string_value(res_val)); + LOGDEBUG("get_txn for hash %s got data %s", hash, ret); + } else + LOGDEBUG("get_txn did not retrieve data for hash %s", hash); +out: + return ret; +} diff --git a/src/bitcoin.h b/src/bitcoin.h index 49e0ba82..e1672d59 100644 --- a/src/bitcoin.h +++ b/src/bitcoin.h @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -39,6 +39,7 @@ int get_blockcount(connsock_t *cs); bool get_blockhash(connsock_t *cs, int height, char *hash); bool get_bestblockhash(connsock_t *cs, char *hash); bool submit_block(connsock_t *cs, char *params); -void submit_txn(connsock_t *cs, char *params); +void submit_txn(connsock_t *cs, const char *params); +char *get_txn(connsock_t *cs, const char *hash); #endif /* BITCOIN_H */ diff --git a/src/ckpool.h b/src/ckpool.h index fc1fae5a..7d152fe6 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -296,6 +296,7 @@ enum stratum_msgtype { SM_TRANSACTIONS, SM_SHAREERR, SM_WORKERSTATS, + SM_REQTXNS, SM_NONE }; @@ -321,6 +322,7 @@ static const char __maybe_unused *stratum_msgs[] = { "transactions", "shareerr", "workerstats", + "reqtxns", "" }; diff --git a/src/generator.c b/src/generator.c index 6df23ef5..19483c60 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -884,6 +884,24 @@ out: return ret; } +char *generator_get_txn(ckpool_t *ckp, const char *hash) +{ + gdata_t *gdata = ckp->gdata; + server_instance_t *si; + char *ret = NULL; + connsock_t *cs; + + si = gdata->current_si; + if (unlikely(!si)) { + LOGWARNING("No live current server in generator_checkaddr"); + goto out; + } + cs = &si->cs; + ret = get_txn(cs, hash); +out: + return ret; +} + static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val) { const char *prev_hash, *bbversion, *nbit, *ntime; diff --git a/src/generator.h b/src/generator.h index 498fab98..dede0654 100644 --- a/src/generator.h +++ b/src/generator.h @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -20,6 +20,7 @@ void generator_add_send(ckpool_t *ckp, json_t *val); json_t *generator_genbase(ckpool_t *ckp); int generator_getbest(ckpool_t *ckp, char *hash); bool generator_checkaddr(ckpool_t *ckp, const char *addr); +char *generator_get_txn(ckpool_t *ckp, const char *hash); void generator_submitblock(ckpool_t *ckp, char *buf); void *generator(void *arg); diff --git a/src/stratifier.c b/src/stratifier.c index 80c8ab80..26207f55 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1512,19 +1512,115 @@ out: free(prio); } +#define SSEND_PREPEND 0 +#define SSEND_APPEND 1 +#define SSEND_POSTPONE 2 + +/* Downstream a json message to all remote servers except for the one matching + * client_id */ +static void downstream_json(sdata_t *sdata, const json_t *val, const int64_t client_id, + const int prio) +{ + stratum_instance_t *client; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + DL_FOREACH(sdata->remote_instances, client) { + ckmsg_t *client_msg; + json_t *json_msg; + smsg_t *msg; + + /* Don't send remote workinfo back to same remote */ + if (client->id == client_id) + continue; + json_msg = json_deep_copy(val); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGINFO("Sending json to %d remote servers", messages); + switch (prio) { + case SSEND_PREPEND: + ssend_bulk_prepend(sdata, bulk_send, messages); + break; + case SSEND_APPEND: + ssend_bulk_append(sdata, bulk_send, messages); + break; + case SSEND_POSTPONE: + ssend_bulk_postpone(sdata, bulk_send, messages); + break; + } + } +} + +/* Find any transactions that are missing from our transaction table during + * rebuild_txns by requesting their data from our bitcoind or requesting them + * from another server. */ +static void request_txns(ckpool_t *ckp, sdata_t *sdata, json_t *txns) +{ + json_t *val, *arr_val; + size_t index; + + /* See if our bitcoind has the transactions first to avoid requesting + * them from another server. */ + json_array_foreach(txns, index, arr_val) { + const char *hash = json_string_value(arr_val); + char *data = generator_get_txn(ckp, hash); + txntable_t *txn; + + if (!data) + continue; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_STR(sdata->txns, hash, txn); + if (likely(!txn)) { + txn = ckzalloc(sizeof(txntable_t)); + memcpy(txn->hash, hash, 65); + txn->data = data; + txn->refcount = 100; + HASH_ADD_STR(sdata->txns, hash, txn); + } else + free(data); + ck_wunlock(&sdata->workbase_lock); + + /* We're removing this object so decrement index for next pass */ + json_array_remove(txns, index); + index--; + } + + JSON_CPACK(val, "{so}", "hash", txns); + if (ckp->remote) + upstream_msgtype(ckp, val, SM_REQTXNS); + else if (ckp->node) { + /* Nodes have no way to signal upstream pool yet */ + } else { + /* We don't know which remote sent the transaction hash so ask + * all of them for it */ + json_set_string(val, "method", stratum_msgs[SM_REQTXNS]); + downstream_json(sdata, val, 0, SSEND_POSTPONE); + } +} + /* Rebuilds transactions from txnhashes to be able to construct wb_merkle_bins */ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txnhashes) { const char *hashes = json_string_value(txnhashes); - json_t *txn_array; + json_t *txn_array, *missing_txns; + char hash[68] = {}; bool ret = false; txntable_t *txn; - int i, len; + int i, len = 0; if (likely(hashes)) len = strlen(hashes); - else - len = 0; if (!hashes || !len) return ret; @@ -1534,22 +1630,20 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t * } ret = true; txn_array = json_array(); + missing_txns = json_array(); ck_rlock(&sdata->workbase_lock); for (i = 0; i < wb->txns; i++) { json_t *txn_val; - char hash[68]; memcpy(hash, hashes + i * 65, 64); - hash[64] = '\0'; HASH_FIND_STR(sdata->txns, hash, txn); if (unlikely(!txn)) { + txn_val = json_string(hash); + json_array_append_new(missing_txns, txn_val); ret = false; continue; } - /* This is unnecessary most of the time since it will be set - * in rebuild_txns as well but helps in the case we can't call - * rebuild_txns due to missing some txns */ txn->refcount = 100; txn->seen = true; JSON_CPACK(txn_val, "{ss,ss}", @@ -1561,9 +1655,13 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t * if (ret) { LOGINFO("Rebuilt txns into workbase with %d transactions", (int)i); wb_merkle_bins(ckp, sdata, wb, txn_array, false); - } else + } else { LOGNOTICE("Failed to find all txns in rebuild_txns"); + request_txns(ckp, sdata, missing_txns); + } + json_decref(txn_array); + json_decref(missing_txns); return ret; } @@ -5619,38 +5717,6 @@ static void add_remote_blockdata(ckpool_t *ckp, json_t *val, const int cblen, co free(buf); } -static void downstream_blockdata(sdata_t *sdata, const json_t *val, int64_t client_id) -{ - stratum_instance_t *client; - ckmsg_t *bulk_send = NULL; - int messages = 0; - - ck_rlock(&sdata->instance_lock); - DL_FOREACH(sdata->remote_instances, client) { - ckmsg_t *client_msg; - json_t *json_msg; - smsg_t *msg; - - /* Don't send remote workinfo back to same remote */ - if (client->id == client_id) - continue; - json_msg = json_deep_copy(val); - client_msg = ckalloc(sizeof(ckmsg_t)); - msg = ckzalloc(sizeof(smsg_t)); - msg->json_msg = json_msg; - msg->client_id = client->id; - client_msg->data = msg; - DL_APPEND(bulk_send, client_msg); - messages++; - } - ck_runlock(&sdata->instance_lock); - - if (bulk_send) { - LOGINFO("Sending block to %d remote servers", messages); - ssend_bulk_postpone(sdata, bulk_send, messages); - } -} - static void downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cblen, const char *coinbase, const uchar *data) @@ -5661,7 +5727,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl strip_fields(ckp, block_val); json_set_string(block_val, "method", stratum_msgs[SM_BLOCK]); add_remote_blockdata(ckp, block_val, cblen, coinbase, data); - downstream_blockdata(sdata, block_val, 0); + downstream_json(sdata, block_val, 0, SSEND_PREPEND); json_decref(block_val); } @@ -6887,7 +6953,7 @@ out_add: val = json_deep_copy(val); remap_workinfo_id(sdata, val); if (!ckp->remote) - downstream_blockdata(sdata, val, client_id); + downstream_json(sdata, val, client_id, SSEND_PREPEND); ckdbq_add(ckp, ID_BLOCK, val); }