diff --git a/src/ckpool.h b/src/ckpool.h index 12b35a83..b5cda264 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -278,6 +278,7 @@ enum stratum_msgtype { SM_SUGGESTDIFF, SM_BLOCK, SM_PONG, + SM_TRANSACTIONS, SM_NONE }; @@ -300,6 +301,7 @@ static const char __maybe_unused *stratum_msgs[] = { "suggestdiff", "block", "pong", + "transactions", "" }; diff --git a/src/libckpool.c b/src/libckpool.c index 3888efc2..59b132c9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -966,7 +966,7 @@ int wait_read_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLIN | EPOLLRDHUP; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); @@ -1048,7 +1048,7 @@ int wait_write_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLOUT | EPOLLRDHUP ; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); diff --git a/src/stratifier.c b/src/stratifier.c index 47b940c5..623b2400 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -381,7 +381,7 @@ struct txntable { UT_hash_handle hh; int id; char hash[68]; - const char *data; + char *data; int refcount; }; @@ -871,54 +871,54 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) stratum_instance_t *client; ckmsg_t *bulk_send = NULL; int messages = 0; + json_t *wb_val; - ck_rlock(&sdata->instance_lock); - if (sdata->node_instances) { - json_t *wb_val = json_object(); - - json_set_int(wb_val, "jobid", wb->id); - json_set_string(wb_val, "target", wb->target); - json_set_double(wb_val, "diff", wb->diff); - json_set_int(wb_val, "version", wb->version); - json_set_int(wb_val, "curtime", wb->curtime); - json_set_string(wb_val, "prevhash", wb->prevhash); - json_set_string(wb_val, "ntime", wb->ntime); - json_set_string(wb_val, "bbversion", wb->bbversion); - json_set_string(wb_val, "nbit", wb->nbit); - json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); - json_set_int(wb_val, "height", wb->height); - json_set_string(wb_val, "flags", wb->flags); - json_set_int(wb_val, "txns", wb->txns); - if (likely(wb->txns)) - json_set_string(wb_val, "txn_data", wb->txn_data); - /* We don't need txn_hashes */ - json_set_int(wb_val, "merkles", wb->merkles); - json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); - json_set_string(wb_val, "coinb1", wb->coinb1); - json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); - json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); - json_set_int(wb_val, "coinb1len", wb->coinb1len); - json_set_int(wb_val, "coinb2len", wb->coinb2len); - json_set_string(wb_val, "coinb2", wb->coinb2); + wb_val = json_object(); - DL_FOREACH(sdata->node_instances, client) { - ckmsg_t *client_msg; - smsg_t *msg; - json_t *json_msg = json_deep_copy(wb_val); + ck_rlock(&sdata->instance_lock); + json_set_int(wb_val, "jobid", wb->id); + json_set_string(wb_val, "target", wb->target); + json_set_double(wb_val, "diff", wb->diff); + json_set_int(wb_val, "version", wb->version); + json_set_int(wb_val, "curtime", wb->curtime); + json_set_string(wb_val, "prevhash", wb->prevhash); + json_set_string(wb_val, "ntime", wb->ntime); + json_set_string(wb_val, "bbversion", wb->bbversion); + json_set_string(wb_val, "nbit", wb->nbit); + json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); + json_set_int(wb_val, "height", wb->height); + json_set_string(wb_val, "flags", wb->flags); + /* Set to zero to be backwards compat with older node code */ + json_set_int(wb_val, "transactions", 0); + json_set_int(wb_val, "txns", wb->txns); + json_set_string(wb_val, "txn_hashes", wb->txn_hashes); + json_set_int(wb_val, "merkles", wb->merkles); + json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); + json_set_string(wb_val, "coinb1", wb->coinb1); + json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); + json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); + json_set_int(wb_val, "coinb1len", wb->coinb1len); + json_set_int(wb_val, "coinb2len", wb->coinb2len); + json_set_string(wb_val, "coinb2", wb->coinb2); + + DL_FOREACH(sdata->node_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(wb_val); - json_set_string(json_msg, "node.method", stratum_msgs[SM_WORKINFO]); - client_msg = ckalloc(sizeof(ckmsg_t)); - msg = ckzalloc(sizeof(smsg_t)); - msg->json_msg = json_msg; - msg->client_id = client->id; - client_msg->data = msg; - DL_APPEND(bulk_send, client_msg); - messages++; - } - json_decref(wb_val); + json_set_string(json_msg, "node.method", stratum_msgs[SM_WORKINFO]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; } ck_runlock(&sdata->instance_lock); + json_decref(wb_val); + /* We send workinfo postponed till after the stratum updates are sent * out to minimise any lag seen by clients getting updates. It means * the remote node will know about the block change later which will @@ -1117,7 +1117,7 @@ static void broadcast_ping(sdata_t *sdata); /* Build a hashlist of all transactions, allowing us to compare with the list of * existing transactions to determine which need to be propagated */ -static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const char *data) +static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const char *data) { bool found = false; txntable_t *txn; @@ -1129,15 +1129,44 @@ static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const ch found = true; } ck_runlock(&sdata->workbase_lock); + if (found) return; txn = ckzalloc(sizeof(txntable_t)); memcpy(txn->hash, hash, 65); - /* Note that data is pointing to a string in a json struture which will - * be destroyed so we can only use the data value until then. */ - txn->data = data; - HASH_ADD_STR(txns, hash, txn); + txn->data = strdup(data); + txn->refcount = 10; + HASH_ADD_STR(*txns, hash, txn); +} + +static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) +{ + stratum_instance_t *client; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + DL_FOREACH(sdata->node_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(txn_val); + + json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGINFO("Sending transactions to mining nodes"); + ssend_bulk_append(sdata, bulk_send, messages); + } } /* Distill down a set of transactions into an efficient tree arrangement for @@ -1146,7 +1175,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) { int i, j, binleft, binlen, added = 0, purged = 0; txntable_t *txns = NULL, *tmp, *tmpa; - json_t *arr_val; + json_t *arr_val, *val; uchar *hashbin; wb->txns = json_array_size(txn_array); @@ -1180,7 +1209,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) arr_val = json_array_get(txn_array, i); hash = json_string_value(json_object_get(arr_val, "hash")); txn = json_string_value(json_object_get(arr_val, "data")); - add_txn(sdata, txns, hash, txn); + add_txn(sdata, &txns, hash, txn); len = strlen(txn); memcpy(wb->txn_data + ofs, txn, len); ofs += len; @@ -1208,15 +1237,15 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } } else wb->txn_hashes = ckzalloc(1); + wb->merkle_array = json_array(); if (binleft > 1) { while (42) { - uchar merklebin[32]; - if (binleft == 1) break; - memcpy(merklebin, hashbin + 32, 32); - __bin2hex(&wb->merklehash[wb->merkles][0], merklebin, 32); - LOGDEBUG("MH%d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); + memcpy(&wb->merklebin[wb->merkles][0], hashbin + 32, 32); + __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[wb->merkles][0], 32); + json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[wb->merkles][0])); + LOGDEBUG("MerkleHash %d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); wb->merkles++; if (binleft % 2) { memcpy(hashbin + binlen, hashbin + binlen - 32, 32); @@ -1229,28 +1258,38 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) binlen = binleft * 32; } } + LOGINFO("Stored %d transactions", wb->txns); + + txn_array = json_array(); ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->txns, tmp, tmpa) { if (tmp->refcount--) continue; HASH_DEL(sdata->txns, tmp); + dealloc(tmp->data); dealloc(tmp); purged++; } HASH_ITER(hh, txns, tmp, tmpa) { + json_t *txn_val; + /* Propagate transaction here */ + JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data); + json_array_append_new(txn_array, txn_val); /* Move to the sdata transaction table */ HASH_DEL(txns, tmp); - HASH_ADD_STR(sdata->txns, data, tmp); - /* Empty data once used to not dereference since the json structure - * will be destroyed. */ - tmp->data = NULL; + HASH_ADD_STR(sdata->txns, hash, tmp); added++; } ck_wunlock(&sdata->workbase_lock); - LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); + JSON_CPACK(val, "{so}", "transaction", txn_array); + send_node_transactions(sdata, val); + json_decref(val); + + if (added || purged) + LOGINFO("Stratifier added %d transactions and purged %d", added, purged); } /* This function assumes it will only receive a valid json gbt base template @@ -1340,13 +1379,64 @@ out: return NULL; } +static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes) +{ + const char *hashes = json_string_value(txnhashes); + json_t *txn_array; + txntable_t *txn; + bool ret = true; + int i, len; + + if (likely(hashes)) + len = strlen(hashes); + else + len = 0; + if (!hashes || !len) + return ret; + + if (unlikely(len < wb->txns * 65)) { + LOGERR("Truncated transactions in rebuild_txns only %d long", len); + return false; + } + txn_array = json_array(); + + ck_rlock(&sdata->workbase_lock); + for (i = 0; i < wb->txns; i++) { + json_t *txn_val; + char hash[68]; + + memcpy(hash, hashes + i * 65, 64); + hash[64] = '\0'; + HASH_FIND_STR(sdata->txns, hash, txn); + if (unlikely(!txn)) { + LOGNOTICE("Failed to find txn in rebuild_txns"); + ret = false; + goto out_unlock; + } + txn->refcount++; + JSON_CPACK(txn_val, "{ss,ss}", + "hash", hash, "data", txn->data); + json_array_append_new(txn_array, txn_val); + } +out_unlock: + ck_runlock(&sdata->workbase_lock); + + if (ret) { + LOGINFO("Rebuilt txns into workbase with %d transactions", (int)i); + wb_merkle_bins(sdata, wb, txn_array); + } + json_decref(txn_array); + + return ret; +} + static void add_node_base(ckpool_t *ckp, json_t *val) { workbase_t *wb = ckzalloc(sizeof(workbase_t)); sdata_t *sdata = ckp->data; bool new_block = false; + json_t *txnhashes; char header[228]; - int i; wb->ckp = ckp; json_int64cpy(&wb->id, val, "jobid"); @@ -1362,14 +1452,26 @@ static void add_node_base(ckpool_t *ckp, json_t *val) json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); - json_intcpy(&wb->txns, val, "txns"); - if (wb->txns) + /* First see if the server uses the old communication format */ + json_intcpy(&wb->txns, val, "transactions"); + if (wb->txns) { + int i; + json_strdup(&wb->txn_data, val, "txn_data"); - json_intcpy(&wb->merkles, val, "merkles"); - wb->merkle_array = json_object_dup(val, "merklehash"); - for (i = 0; i < wb->merkles; i++) { - strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); - hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); + json_intcpy(&wb->merkles, val, "merkles"); + wb->merkle_array = json_object_dup(val, "merklehash"); + for (i = 0; i < wb->merkles; i++) { + strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); + hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); + } + } else { + json_intcpy(&wb->txns, val, "txns"); + txnhashes = json_object_get(val, "txn_hashes"); + if (!rebuild_txns(sdata, wb, txnhashes)) { + LOGWARNING("Unable to rebuild transactions from hashes to create workinfo"); + free(wb); + return; + } } json_strdup(&wb->coinb1, val, "coinb1"); json_intcpy(&wb->coinb1len, val, "coinb1len"); @@ -5684,7 +5786,33 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const stratum_send_update(sdata, client_id, true); } -static void *set_node_latency(void *arg) +/* When a node first connects it has no transactions so we have to send all + * current ones to it. */ +static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client) +{ + json_t *txn_array, *val, *txn_val; + txntable_t *txn, *tmp; + smsg_t *msg; + + txn_array = json_array(); + + ck_rlock(&sdata->workbase_lock); + HASH_ITER(hh, sdata->txns, txn, tmp) { + JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data); + json_array_append_new(txn_array, txn_val); + } + ck_runlock(&sdata->workbase_lock); + + JSON_CPACK(val, "{ss,so}", "node.method", stratum_msgs[SM_TRANSACTIONS], + "transaction", txn_array); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = val; + msg->client_id = client->id; + ckmsgq_add(sdata->ssends, msg); + LOGNOTICE("Sending new node client %"PRId64" all transactions", client->id); +} + +static void *setup_node(void *arg) { stratum_instance_t *client = (stratum_instance_t *)arg; @@ -5693,6 +5821,8 @@ static void *set_node_latency(void *arg) client->latency = round_trip(client->address) / 2; LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id, client->address, client->latency); + sleep(5); + send_node_all_txns(client->sdata, client); dec_instance_ref(client->sdata, client); return NULL; } @@ -5714,7 +5844,7 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id, client->address, client->server, ckp->serverurl[client->server]); - create_pthread(&pth, set_node_latency, client); + create_pthread(&pth, setup_node, client); } static void add_remote_server(sdata_t *sdata, stratum_instance_t *client) @@ -6151,6 +6281,45 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const LOGWARNING("unrecognised trusted message %s", buf); } +static void add_node_txns(sdata_t *sdata, const json_t *val) +{ + json_t *txn_array, *txn_val, *data_val, *hash_val; + txntable_t *txn; + int i, arr_size; + int added = 0; + + txn_array = json_object_get(val, "transaction"); + arr_size = json_array_size(txn_array); + + ck_wlock(&sdata->workbase_lock); + for (i = 0; i < arr_size; i++) { + const char *hash, *data; + + txn_val = json_array_get(txn_array, i); + data_val = json_object_get(txn_val, "data"); + hash_val = json_object_get(txn_val, "hash"); + data = json_string_value(data_val); + hash = json_string_value(hash_val); + if (unlikely(!data || !hash)) { + LOGERR("Failed to get hash/data in add_node_txns"); + continue; + } + HASH_FIND_STR(sdata->txns, hash, txn); + if (txn) + continue; + txn = ckzalloc(sizeof(txntable_t)); + memcpy(txn->hash, hash, 65); + txn->data = strdup(data); + txn->refcount = 10; + HASH_ADD_STR(sdata->txns, hash, txn); + added++; + } + ck_wunlock(&sdata->workbase_lock); + + if (added) + LOGINFO("Stratifier added %d node transactions", added); +} + /* Entered with client holding ref count */ static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client) { @@ -6211,6 +6380,9 @@ static void parse_node_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const cha } LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]); switch (msg_type) { + case SM_TRANSACTIONS: + add_node_txns(sdata, val); + break; case SM_WORKINFO: add_node_base(ckp, val); break; @@ -7369,10 +7541,8 @@ int stratifier(proc_instance_t *pi) sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - if (!CKP_STANDALONE(ckp)) { - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); - create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); - } + sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); cklock_init(&sdata->workbase_lock);