From e227470015c99dfe8511798ecaed53f97c1b2dbb Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 3 Feb 2016 16:42:02 +1100 Subject: [PATCH 01/15] Build json merkle array and store merklebins --- src/stratifier.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 4fc6609a..c3a9ade5 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1210,12 +1210,11 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) wb->txn_hashes = ckzalloc(1); if (binleft > 1) { while (42) { - uchar merklebin[32]; - if (binleft == 1) break; - memcpy(merklebin, hashbin + 32, 32); - __bin2hex(&wb->merklehash[wb->merkles][0], merklebin, 32); + memcpy(&wb->merklebin[i][0], hashbin + 32, 32); + __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[i][0], 32); + json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[i][0])); LOGDEBUG("MH%d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); wb->merkles++; if (binleft % 2) { From 820db4e36727f858fc7e32f2e96ece06d9806568 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 3 Feb 2016 16:45:56 +1100 Subject: [PATCH 02/15] Write to correct merklebin offset --- src/stratifier.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c3a9ade5..eddd1d54 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1212,9 +1212,9 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) while (42) { if (binleft == 1) break; - memcpy(&wb->merklebin[i][0], hashbin + 32, 32); - __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[i][0], 32); - json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[i][0])); + memcpy(&wb->merklebin[wb->merkles][0], hashbin + 32, 32); + __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[wb->merkles][0], 32); + json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[wb->merkles][0])); LOGDEBUG("MH%d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); wb->merkles++; if (binleft % 2) { From c8e09a829cd640f338e5b0da47620b82ebdca0c3 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 3 Feb 2016 16:48:00 +1100 Subject: [PATCH 03/15] Initialise json merkle array --- src/stratifier.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stratifier.c b/src/stratifier.c index eddd1d54..94c67e80 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1208,6 +1208,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } } else wb->txn_hashes = ckzalloc(1); + wb->merkle_array = json_array(); if (binleft > 1) { while (42) { if (binleft == 1) From 55b52d034671ebf42c2f1b67694d570070293233 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 3 Feb 2016 16:50:53 +1100 Subject: [PATCH 04/15] Add to correct transaction table --- src/stratifier.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 94c67e80..ce32f6e4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1117,7 +1117,7 @@ static void broadcast_ping(sdata_t *sdata); /* Build a hashlist of all transactions, allowing us to compare with the list of * existing transactions to determine which need to be propagated */ -static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const char *data) +static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const char *data) { bool found = false; txntable_t *txn; @@ -1137,7 +1137,7 @@ static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const ch /* Note that data is pointing to a string in a json struture which will * be destroyed so we can only use the data value until then. */ txn->data = data; - HASH_ADD_STR(txns, hash, txn); + HASH_ADD_STR(*txns, hash, txn); } /* Distill down a set of transactions into an efficient tree arrangement for @@ -1180,7 +1180,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) arr_val = json_array_get(txn_array, i); hash = json_string_value(json_object_get(arr_val, "hash")); txn = json_string_value(json_object_get(arr_val, "data")); - add_txn(sdata, txns, hash, txn); + add_txn(sdata, &txns, hash, txn); len = strlen(txn); memcpy(wb->txn_data + ofs, txn, len); ofs += len; From 3c64adfb001e5d6b266da525ccc56151b74a561d Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 3 Feb 2016 16:52:53 +1100 Subject: [PATCH 05/15] Add correct string to txn table --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index ce32f6e4..e740c972 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1242,7 +1242,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) /* Propagate transaction here */ /* Move to the sdata transaction table */ HASH_DEL(txns, tmp); - HASH_ADD_STR(sdata->txns, data, tmp); + HASH_ADD_STR(sdata->txns, hash, tmp); /* Empty data once used to not dereference since the json structure * will be destroyed. */ tmp->data = NULL; From 7615369d4a823c5230a085d64f142057b22cd992 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 21:20:46 +1100 Subject: [PATCH 06/15] Send transactions to nodes --- src/ckpool.h | 2 ++ src/libckpool.c | 4 ++-- src/stratifier.c | 41 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/ckpool.h b/src/ckpool.h index 12b35a83..b5cda264 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -278,6 +278,7 @@ enum stratum_msgtype { SM_SUGGESTDIFF, SM_BLOCK, SM_PONG, + SM_TRANSACTIONS, SM_NONE }; @@ -300,6 +301,7 @@ static const char __maybe_unused *stratum_msgs[] = { "suggestdiff", "block", "pong", + "transactions", "" }; diff --git a/src/libckpool.c b/src/libckpool.c index 3888efc2..59b132c9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -966,7 +966,7 @@ int wait_read_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLIN | EPOLLRDHUP; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); @@ -1048,7 +1048,7 @@ int wait_write_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLOUT | EPOLLRDHUP ; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); diff --git a/src/stratifier.c b/src/stratifier.c index e740c972..b1648b47 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1140,13 +1140,42 @@ static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const c HASH_ADD_STR(*txns, hash, txn); } +static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) +{ + stratum_instance_t *client; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + DL_FOREACH(sdata->node_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(txn_val); + + json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGINFO("Sending transactions to mining nodes"); + ssend_bulk_append(sdata, bulk_send, messages); + } +} + /* Distill down a set of transactions into an efficient tree arrangement for * stratum messages and fast work assembly. */ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) { int i, j, binleft, binlen, added = 0, purged = 0; txntable_t *txns = NULL, *tmp, *tmpa; - json_t *arr_val; + json_t *arr_val, *val; uchar *hashbin; wb->txns = json_array_size(txn_array); @@ -1230,6 +1259,8 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } } + txn_array = json_array(); + ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->txns, tmp, tmpa) { if (tmp->refcount--) @@ -1239,7 +1270,11 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) purged++; } HASH_ITER(hh, txns, tmp, tmpa) { + json_t *txn_val; + /* Propagate transaction here */ + JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data); + json_array_append_new(txn_array, txn_val); /* Move to the sdata transaction table */ HASH_DEL(txns, tmp); HASH_ADD_STR(sdata->txns, hash, tmp); @@ -1250,6 +1285,10 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } ck_wunlock(&sdata->workbase_lock); + JSON_CPACK(val, "{so}", "transaction", txn_array); + send_node_transactions(sdata, val); + json_decref(val); + LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); } From 254b7ec5ca7d8bb0822898a4119ebe3246d5a7bb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 21:35:19 +1100 Subject: [PATCH 07/15] Don't drop nodes or trusted remotes for lack of being authorised --- src/stratifier.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index b1648b47..26dc8b90 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -6903,6 +6903,9 @@ static void *statsupdate(void *arg) continue; } + if (client->node || client->remote) + continue; + /* Test for clients that haven't authed in over a minute * and drop them lazily */ if (!client->authorised) { From 41a9788cc9d93cff85ba627ac13b945a1f42762f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 21:57:45 +1100 Subject: [PATCH 08/15] Add transactions when received by nodes to the txntable --- src/stratifier.c | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 26dc8b90..c46df4d4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1289,7 +1289,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) send_node_transactions(sdata, val); json_decref(val); - LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); + LOGINFO("Stratifier added %d transactions and purged %d", added, purged); } /* This function assumes it will only receive a valid json gbt base template @@ -6190,6 +6190,42 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const LOGWARNING("unrecognised trusted message %s", buf); } +static void add_node_txns(sdata_t *sdata, const json_t *val) +{ + json_t *txn_array, *txn_val, *data_val, *hash_val; + txntable_t *txn; + int added = 0; + size_t i; + + txn_array = json_object_get(val, "transaction"); + + ck_wlock(&sdata->workbase_lock); + json_array_foreach(txn_array, i, txn_val) { + const char *hash, *data; + + data_val = json_object_get(txn_val, "data"); + hash_val = json_object_get(txn_val, "hash"); + data = json_string_value(data_val); + hash = json_string_value(hash_val); + if (unlikely(!data || !hash)) { + LOGERR("Failed to get hash/data in add_node_txns"); + continue; + } + HASH_FIND_STR(sdata->txns, hash, txn); + if (txn) + continue; + txn = ckzalloc(sizeof(txntable_t)); + memcpy(txn->hash, hash, 65); + txn->data = strdup(data); + txn->refcount = 10; + HASH_ADD_STR(sdata->txns, hash, txn); + added++; + } + ck_wunlock(&sdata->workbase_lock); + + LOGINFO("Stratifier added %d node transactions", added); +} + /* Entered with client holding ref count */ static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client) { @@ -6250,6 +6286,9 @@ static void parse_node_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const cha } LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]); switch (msg_type) { + case SM_TRANSACTIONS: + add_node_txns(sdata, val); + break; case SM_WORKINFO: add_node_base(ckp, val); break; From 8e76983fa85582d8afd8630f9962f09b8ee1372f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 22:17:35 +1100 Subject: [PATCH 09/15] Send all transactions to new nodes, and set initial refcount on transactions to 10 --- src/stratifier.c | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c46df4d4..fe04e5e3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -381,7 +381,7 @@ struct txntable { UT_hash_handle hh; int id; char hash[68]; - const char *data; + char *data; int refcount; }; @@ -1129,14 +1129,14 @@ static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const c found = true; } ck_runlock(&sdata->workbase_lock); + if (found) return; txn = ckzalloc(sizeof(txntable_t)); memcpy(txn->hash, hash, 65); - /* Note that data is pointing to a string in a json struture which will - * be destroyed so we can only use the data value until then. */ - txn->data = data; + txn->data = strdup(data); + txn->refcount = 10; HASH_ADD_STR(*txns, hash, txn); } @@ -1266,6 +1266,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) if (tmp->refcount--) continue; HASH_DEL(sdata->txns, tmp); + dealloc(tmp->data); dealloc(tmp); purged++; } @@ -1278,9 +1279,6 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) /* Move to the sdata transaction table */ HASH_DEL(txns, tmp); HASH_ADD_STR(sdata->txns, hash, tmp); - /* Empty data once used to not dereference since the json structure - * will be destroyed. */ - tmp->data = NULL; added++; } ck_wunlock(&sdata->workbase_lock); @@ -5723,7 +5721,33 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const stratum_send_update(sdata, client_id, true); } -static void *set_node_latency(void *arg) +/* When a node first connects it has no transactions so we have to send all + * current ones to it. */ +static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client) +{ + json_t *txn_array, *val, *txn_val; + txntable_t *txn, *tmp; + smsg_t *msg; + + txn_array = json_array(); + + ck_rlock(&sdata->workbase_lock); + HASH_ITER(hh, sdata->txns, txn, tmp) { + JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data); + json_array_append_new(txn_array, txn_val); + } + ck_runlock(&sdata->workbase_lock); + + JSON_CPACK(val, "{ss,so}", "node.method", stratum_msgs[SM_TRANSACTIONS], + "transaction", txn_array); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = val; + msg->client_id = client->id; + ckmsgq_add(sdata->ssends, msg); + LOGNOTICE("Sending new node client %"PRId64" all transactions", client->id); +} + +static void *setup_node(void *arg) { stratum_instance_t *client = (stratum_instance_t *)arg; @@ -5732,6 +5756,8 @@ static void *set_node_latency(void *arg) client->latency = round_trip(client->address) / 2; LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id, client->address, client->latency); + sleep(5); + send_node_all_txns(client->sdata, client); dec_instance_ref(client->sdata, client); return NULL; } @@ -5753,7 +5779,7 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id, client->address, client->server, ckp->serverurl[client->server]); - create_pthread(&pth, set_node_latency, client); + create_pthread(&pth, setup_node, client); } static void add_remote_server(sdata_t *sdata, stratum_instance_t *client) @@ -6223,7 +6249,7 @@ static void add_node_txns(sdata_t *sdata, const json_t *val) } ck_wunlock(&sdata->workbase_lock); - LOGINFO("Stratifier added %d node transactions", added); + LOGNOTICE("Stratifier added %d node transactions", added); } /* Entered with client holding ref count */ From 0b7e3994b29858931a131cd2c88c7d9f9b4e9841 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 22:56:25 +1100 Subject: [PATCH 10/15] Rebuild workinfo from transaction hashes on nodes --- src/stratifier.c | 147 +++++++++++++++++++++++++++++++---------------- 1 file changed, 98 insertions(+), 49 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index fe04e5e3..5133fd1e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -868,57 +868,66 @@ static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_i static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) { + json_t *wb_val, *txn_array; stratum_instance_t *client; ckmsg_t *bulk_send = NULL; + txntable_t *txn, *tmp; int messages = 0; + txn_array = json_array(); + + ck_rlock(&sdata->workbase_lock); + HASH_ITER(hh, sdata->txns, txn, tmp) { + json_array_append_new(txn_array, json_string(txn->hash)); + } + ck_runlock(&sdata->workbase_lock); + + wb_val = json_object(); + ck_rlock(&sdata->instance_lock); - if (sdata->node_instances) { - json_t *wb_val = json_object(); - - json_set_int(wb_val, "jobid", wb->id); - json_set_string(wb_val, "target", wb->target); - json_set_double(wb_val, "diff", wb->diff); - json_set_int(wb_val, "version", wb->version); - json_set_int(wb_val, "curtime", wb->curtime); - json_set_string(wb_val, "prevhash", wb->prevhash); - json_set_string(wb_val, "ntime", wb->ntime); - json_set_string(wb_val, "bbversion", wb->bbversion); - json_set_string(wb_val, "nbit", wb->nbit); - json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); - json_set_int(wb_val, "height", wb->height); - json_set_string(wb_val, "flags", wb->flags); - json_set_int(wb_val, "txns", wb->txns); - if (likely(wb->txns)) - json_set_string(wb_val, "txn_data", wb->txn_data); - /* We don't need txn_hashes */ - json_set_int(wb_val, "merkles", wb->merkles); - json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); - json_set_string(wb_val, "coinb1", wb->coinb1); - json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); - json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); - json_set_int(wb_val, "coinb1len", wb->coinb1len); - json_set_int(wb_val, "coinb2len", wb->coinb2len); - json_set_string(wb_val, "coinb2", wb->coinb2); + json_set_int(wb_val, "jobid", wb->id); + json_set_string(wb_val, "target", wb->target); + json_set_double(wb_val, "diff", wb->diff); + json_set_int(wb_val, "version", wb->version); + json_set_int(wb_val, "curtime", wb->curtime); + json_set_string(wb_val, "prevhash", wb->prevhash); + json_set_string(wb_val, "ntime", wb->ntime); + json_set_string(wb_val, "bbversion", wb->bbversion); + json_set_string(wb_val, "nbit", wb->nbit); + json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); + json_set_int(wb_val, "height", wb->height); + json_set_string(wb_val, "flags", wb->flags); + /* Set to zero to be backwards compat with older node code */ + json_set_int(wb_val, "transactions", 0); + json_set_int(wb_val, "txns", wb->txns); + json_object_set_new_nocheck(wb_val, "txnhashes", txn_array); + json_set_int(wb_val, "merkles", wb->merkles); + json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); + json_set_string(wb_val, "coinb1", wb->coinb1); + json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); + json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); + json_set_int(wb_val, "coinb1len", wb->coinb1len); + json_set_int(wb_val, "coinb2len", wb->coinb2len); + json_set_string(wb_val, "coinb2", wb->coinb2); - DL_FOREACH(sdata->node_instances, client) { - ckmsg_t *client_msg; - smsg_t *msg; - json_t *json_msg = json_deep_copy(wb_val); + DL_FOREACH(sdata->node_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(wb_val); - json_set_string(json_msg, "node.method", stratum_msgs[SM_WORKINFO]); - client_msg = ckalloc(sizeof(ckmsg_t)); - msg = ckzalloc(sizeof(smsg_t)); - msg->json_msg = json_msg; - msg->client_id = client->id; - client_msg->data = msg; - DL_APPEND(bulk_send, client_msg); - messages++; - } - json_decref(wb_val); + json_set_string(json_msg, "node.method", stratum_msgs[SM_WORKINFO]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; } ck_runlock(&sdata->instance_lock); + json_decref(wb_val); + /* We send workinfo postponed till after the stratum updates are sent * out to minimise any lag seen by clients getting updates. It means * the remote node will know about the block change later which will @@ -1377,13 +1386,55 @@ out: return NULL; } +static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes) +{ + json_t *txn_array, *hash_val; + txntable_t *txn; + bool ret = true; + size_t i; + + txn_array = json_array(); + + ck_rlock(&sdata->workbase_lock); + json_array_foreach(txnhashes, i, hash_val) { + const char *hash; + json_t *txn_val; + + hash = json_string_value(hash_val); + if (unlikely(!hash)) { + LOGERR("Failed to get hash in rebuild_txns"); + ret = false; + goto out_unlock; + } + HASH_FIND_STR(sdata->txns, hash, txn); + if (unlikely(!txn)) { + LOGNOTICE("Failed to find txn in rebuild_txns"); + ret = false; + goto out_unlock; + } + JSON_CPACK(txn_val, "{ss,ss}", + "hash", hash, "data", txn->data); + json_array_append_new(txn_array, txn_val); + } +out_unlock: + ck_runlock(&sdata->workbase_lock); + + if (ret) { + LOGINFO("Rebuilt txns into workbase with %d transactions", (int)i); + wb_merkle_bins(sdata, wb, txn_array); + } + json_decref(txn_array); + + return ret; +} + static void add_node_base(ckpool_t *ckp, json_t *val) { workbase_t *wb = ckzalloc(sizeof(workbase_t)); sdata_t *sdata = ckp->data; bool new_block = false; + json_t *txnhashes; char header[228]; - int i; wb->ckp = ckp; json_int64cpy(&wb->id, val, "jobid"); @@ -1400,13 +1451,11 @@ static void add_node_base(ckpool_t *ckp, json_t *val) json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); json_intcpy(&wb->txns, val, "txns"); - if (wb->txns) - json_strdup(&wb->txn_data, val, "txn_data"); - json_intcpy(&wb->merkles, val, "merkles"); - wb->merkle_array = json_object_dup(val, "merklehash"); - for (i = 0; i < wb->merkles; i++) { - strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); - hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); + txnhashes = json_object_get(val, "txnhashes"); + if (!rebuild_txns(sdata, wb, txnhashes)) { + LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); + free(wb); + return; } json_strdup(&wb->coinb1, val, "coinb1"); json_intcpy(&wb->coinb1len, val, "coinb1len"); From b826ac25ab78d927b99f26af49343896ad0314ad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 23:01:58 +1100 Subject: [PATCH 11/15] Purge old transactions on mining nodes whose refcount drops to zero --- src/stratifier.c | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 5133fd1e..77bfc1ce 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1296,7 +1296,8 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) send_node_transactions(sdata, val); json_decref(val); - LOGINFO("Stratifier added %d transactions and purged %d", added, purged); + if (added || purged) + LOGINFO("Stratifier added %d transactions and purged %d", added, purged); } /* This function assumes it will only receive a valid json gbt base template @@ -1412,6 +1413,7 @@ static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes) ret = false; goto out_unlock; } + txn->refcount++; JSON_CPACK(txn_val, "{ss,ss}", "hash", hash, "data", txn->data); json_array_append_new(txn_array, txn_val); @@ -1433,8 +1435,10 @@ static void add_node_base(ckpool_t *ckp, json_t *val) workbase_t *wb = ckzalloc(sizeof(workbase_t)); sdata_t *sdata = ckp->data; bool new_block = false; + txntable_t *txn, *tmp; json_t *txnhashes; char header[228]; + int purged = 0; wb->ckp = ckp; json_int64cpy(&wb->id, val, "jobid"); @@ -1481,6 +1485,20 @@ static void add_node_base(ckpool_t *ckp, json_t *val) add_base(ckp, sdata, wb, &new_block); if (new_block) LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); + + ck_wlock(&sdata->workbase_lock); + HASH_ITER(hh, sdata->txns, txn, tmp) { + if (txn->refcount--) + continue; + HASH_DEL(sdata->txns, txn); + dealloc(txn->data); + dealloc(txn); + purged++; + } + ck_wunlock(&sdata->workbase_lock); + + if (purged) + LOGINFO("Stratifier purged %d node transactions", purged); } /* Calculate share diff and fill in hash and swap */ @@ -6298,7 +6316,8 @@ static void add_node_txns(sdata_t *sdata, const json_t *val) } ck_wunlock(&sdata->workbase_lock); - LOGNOTICE("Stratifier added %d node transactions", added); + if (added) + LOGINFO("Stratifier added %d node transactions", added); } /* Entered with client holding ref count */ From 8d3dbb790f166293ffddf34e1e81a7154cbc788a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 3 Feb 2016 23:42:14 +1100 Subject: [PATCH 12/15] Array transaction order matters and json_array_foreach does not iterate in order --- src/stratifier.c | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 77bfc1ce..a83c0780 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -899,7 +899,6 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) json_set_string(wb_val, "flags", wb->flags); /* Set to zero to be backwards compat with older node code */ json_set_int(wb_val, "transactions", 0); - json_set_int(wb_val, "txns", wb->txns); json_object_set_new_nocheck(wb_val, "txnhashes", txn_array); json_set_int(wb_val, "merkles", wb->merkles); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); @@ -1267,6 +1266,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) binlen = binleft * 32; } } + LOGINFO("Stored %d transactions", wb->txns); txn_array = json_array(); @@ -1389,18 +1389,19 @@ out: static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes) { + int i, arr_size = json_array_size(txnhashes); json_t *txn_array, *hash_val; txntable_t *txn; bool ret = true; - size_t i; txn_array = json_array(); ck_rlock(&sdata->workbase_lock); - json_array_foreach(txnhashes, i, hash_val) { + for (i = 0; i < arr_size; i++) { const char *hash; json_t *txn_val; + hash_val = json_array_get(txnhashes, i); hash = json_string_value(hash_val); if (unlikely(!hash)) { LOGERR("Failed to get hash in rebuild_txns"); @@ -1435,10 +1436,8 @@ static void add_node_base(ckpool_t *ckp, json_t *val) workbase_t *wb = ckzalloc(sizeof(workbase_t)); sdata_t *sdata = ckp->data; bool new_block = false; - txntable_t *txn, *tmp; json_t *txnhashes; char header[228]; - int purged = 0; wb->ckp = ckp; json_int64cpy(&wb->id, val, "jobid"); @@ -1454,7 +1453,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val) json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); - json_intcpy(&wb->txns, val, "txns"); txnhashes = json_object_get(val, "txnhashes"); if (!rebuild_txns(sdata, wb, txnhashes)) { LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); @@ -1485,20 +1483,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val) add_base(ckp, sdata, wb, &new_block); if (new_block) LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); - - ck_wlock(&sdata->workbase_lock); - HASH_ITER(hh, sdata->txns, txn, tmp) { - if (txn->refcount--) - continue; - HASH_DEL(sdata->txns, txn); - dealloc(txn->data); - dealloc(txn); - purged++; - } - ck_wunlock(&sdata->workbase_lock); - - if (purged) - LOGINFO("Stratifier purged %d node transactions", purged); } /* Calculate share diff and fill in hash and swap */ @@ -6287,15 +6271,17 @@ static void add_node_txns(sdata_t *sdata, const json_t *val) { json_t *txn_array, *txn_val, *data_val, *hash_val; txntable_t *txn; + int i, arr_size; int added = 0; - size_t i; txn_array = json_object_get(val, "transaction"); + arr_size = json_array_size(txn_array); ck_wlock(&sdata->workbase_lock); - json_array_foreach(txn_array, i, txn_val) { + for (i = 0; i < arr_size; i++) { const char *hash, *data; + txn_val = json_array_get(txn_array, i); data_val = json_object_get(txn_val, "data"); hash_val = json_object_get(txn_val, "hash"); data = json_string_value(data_val); From 76dea2d7faad95098b26a9b19fab7f4172a6c95c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 4 Feb 2016 00:34:18 +1100 Subject: [PATCH 13/15] Use the existing txn_hashes string to rebuild transactions on nodes --- src/stratifier.c | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index a83c0780..6b724e7e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -868,19 +868,10 @@ static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_i static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) { - json_t *wb_val, *txn_array; stratum_instance_t *client; ckmsg_t *bulk_send = NULL; - txntable_t *txn, *tmp; int messages = 0; - - txn_array = json_array(); - - ck_rlock(&sdata->workbase_lock); - HASH_ITER(hh, sdata->txns, txn, tmp) { - json_array_append_new(txn_array, json_string(txn->hash)); - } - ck_runlock(&sdata->workbase_lock); + json_t *wb_val; wb_val = json_object(); @@ -899,7 +890,8 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) json_set_string(wb_val, "flags", wb->flags); /* Set to zero to be backwards compat with older node code */ json_set_int(wb_val, "transactions", 0); - json_object_set_new_nocheck(wb_val, "txnhashes", txn_array); + json_set_int(wb_val, "txns", wb->txns); + json_set_string(wb_val, "txn_hashes", wb->txn_hashes); json_set_int(wb_val, "merkles", wb->merkles); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); json_set_string(wb_val, "coinb1", wb->coinb1); @@ -1253,7 +1245,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) memcpy(&wb->merklebin[wb->merkles][0], hashbin + 32, 32); __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[wb->merkles][0], 32); json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[wb->merkles][0])); - LOGDEBUG("MH%d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); + LOGDEBUG("MerkleHash %d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); wb->merkles++; if (binleft % 2) { memcpy(hashbin + binlen, hashbin + binlen - 32, 32); @@ -1389,25 +1381,32 @@ out: static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes) { - int i, arr_size = json_array_size(txnhashes); - json_t *txn_array, *hash_val; + const char *hashes = json_string_value(txnhashes); + json_t *txn_array; txntable_t *txn; bool ret = true; + int i, len; + if (likely(hashes)) + len = strlen(hashes); + else + len = 0; + if (!hashes || !len) + return ret; + + if (unlikely(len < wb->txns * 65)) { + LOGERR("Truncated transactions in rebuild_txns only %d long", len); + return false; + } txn_array = json_array(); ck_rlock(&sdata->workbase_lock); - for (i = 0; i < arr_size; i++) { - const char *hash; + for (i = 0; i < wb->txns; i++) { json_t *txn_val; + char hash[68]; - hash_val = json_array_get(txnhashes, i); - hash = json_string_value(hash_val); - if (unlikely(!hash)) { - LOGERR("Failed to get hash in rebuild_txns"); - ret = false; - goto out_unlock; - } + memcpy(hash, hashes + i * 65, 64); + hash[64] = '\0'; HASH_FIND_STR(sdata->txns, hash, txn); if (unlikely(!txn)) { LOGNOTICE("Failed to find txn in rebuild_txns"); @@ -1453,7 +1452,8 @@ static void add_node_base(ckpool_t *ckp, json_t *val) json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); - txnhashes = json_object_get(val, "txnhashes"); + json_intcpy(&wb->txns, val, "txns"); + txnhashes = json_object_get(val, "txn_hashes"); if (!rebuild_txns(sdata, wb, txnhashes)) { LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); free(wb); From 71f0077f748a9faa20cb6f6fc15f4a574c2143ea Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 4 Feb 2016 09:13:00 +1100 Subject: [PATCH 14/15] Make new node compatible with old pool comms protocol --- src/stratifier.c | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 6b724e7e..3a27b84a 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1452,12 +1452,26 @@ static void add_node_base(ckpool_t *ckp, json_t *val) json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); - json_intcpy(&wb->txns, val, "txns"); - txnhashes = json_object_get(val, "txn_hashes"); - if (!rebuild_txns(sdata, wb, txnhashes)) { - LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); - free(wb); - return; + /* First see if the server uses the old communication format */ + json_intcpy(&wb->txns, val, "transactions"); + if (wb->txns) { + int i; + + json_strdup(&wb->txn_data, val, "txn_data"); + json_intcpy(&wb->merkles, val, "merkles"); + wb->merkle_array = json_object_dup(val, "merklehash"); + for (i = 0; i < wb->merkles; i++) { + strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); + hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); + } + } else { + json_intcpy(&wb->txns, val, "txns"); + txnhashes = json_object_get(val, "txn_hashes"); + if (!rebuild_txns(sdata, wb, txnhashes)) { + LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); + free(wb); + return; + } } json_strdup(&wb->coinb1, val, "coinb1"); json_intcpy(&wb->coinb1len, val, "coinb1len"); From 2c0b03877bb03d453760022023a40f57109ae1e9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 4 Feb 2016 09:50:31 +1100 Subject: [PATCH 15/15] Create the ckdb heartbeat thread to keep throbber moving even when standalone pool/node is idle --- src/stratifier.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 3a27b84a..623b2400 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -7541,10 +7541,8 @@ int stratifier(proc_instance_t *pi) sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - if (!CKP_STANDALONE(ckp)) { - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); - create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); - } + sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); cklock_init(&sdata->workbase_lock);