diff --git a/src/ckpool.h b/src/ckpool.h index 12b35a83..b5cda264 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -278,6 +278,7 @@ enum stratum_msgtype { SM_SUGGESTDIFF, SM_BLOCK, SM_PONG, + SM_TRANSACTIONS, SM_NONE }; @@ -300,6 +301,7 @@ static const char __maybe_unused *stratum_msgs[] = { "suggestdiff", "block", "pong", + "transactions", "" }; diff --git a/src/libckpool.c b/src/libckpool.c index 3888efc2..59b132c9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -966,7 +966,7 @@ int wait_read_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLIN | EPOLLRDHUP; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); @@ -1048,7 +1048,7 @@ int wait_write_select(int sockd, float timeout) int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); - event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; + event.events = EPOLLOUT | EPOLLRDHUP ; epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; ret = epoll_wait(epfd, &event, 1, timeout); diff --git a/src/stratifier.c b/src/stratifier.c index e740c972..b1648b47 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1140,13 +1140,42 @@ static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const c HASH_ADD_STR(*txns, hash, txn); } +static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) +{ + stratum_instance_t *client; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + ck_rlock(&sdata->instance_lock); + DL_FOREACH(sdata->node_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(txn_val); + + json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + ck_runlock(&sdata->instance_lock); + + if (bulk_send) { + LOGINFO("Sending transactions to mining nodes"); + ssend_bulk_append(sdata, bulk_send, messages); + } +} + /* Distill down a set of transactions into an efficient tree arrangement for * stratum messages and fast work assembly. */ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) { int i, j, binleft, binlen, added = 0, purged = 0; txntable_t *txns = NULL, *tmp, *tmpa; - json_t *arr_val; + json_t *arr_val, *val; uchar *hashbin; wb->txns = json_array_size(txn_array); @@ -1230,6 +1259,8 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } } + txn_array = json_array(); + ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->txns, tmp, tmpa) { if (tmp->refcount--) @@ -1239,7 +1270,11 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) purged++; } HASH_ITER(hh, txns, tmp, tmpa) { + json_t *txn_val; + /* Propagate transaction here */ + JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data); + json_array_append_new(txn_array, txn_val); /* Move to the sdata transaction table */ HASH_DEL(txns, tmp); HASH_ADD_STR(sdata->txns, hash, tmp); @@ -1250,6 +1285,10 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) } ck_wunlock(&sdata->workbase_lock); + JSON_CPACK(val, "{so}", "transaction", txn_array); + send_node_transactions(sdata, val); + json_decref(val); + LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); }