Browse Source

Send transactions to nodes

master
Con Kolivas 9 years ago
parent
commit
7615369d4a
  1. 2
      src/ckpool.h
  2. 4
      src/libckpool.c
  3. 41
      src/stratifier.c

2
src/ckpool.h

@ -278,6 +278,7 @@ enum stratum_msgtype {
SM_SUGGESTDIFF, SM_SUGGESTDIFF,
SM_BLOCK, SM_BLOCK,
SM_PONG, SM_PONG,
SM_TRANSACTIONS,
SM_NONE SM_NONE
}; };
@ -300,6 +301,7 @@ static const char __maybe_unused *stratum_msgs[] = {
"suggestdiff", "suggestdiff",
"block", "block",
"pong", "pong",
"transactions",
"" ""
}; };

4
src/libckpool.c

@ -966,7 +966,7 @@ int wait_read_select(int sockd, float timeout)
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; event.events = EPOLLIN | EPOLLRDHUP;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000; timeout *= 1000;
ret = epoll_wait(epfd, &event, 1, timeout); ret = epoll_wait(epfd, &event, 1, timeout);
@ -1048,7 +1048,7 @@ int wait_write_select(int sockd, float timeout)
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; event.events = EPOLLOUT | EPOLLRDHUP ;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000; timeout *= 1000;
ret = epoll_wait(epfd, &event, 1, timeout); ret = epoll_wait(epfd, &event, 1, timeout);

41
src/stratifier.c

@ -1140,13 +1140,42 @@ static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const c
HASH_ADD_STR(*txns, hash, txn); HASH_ADD_STR(*txns, hash, txn);
} }
static void send_node_transactions(sdata_t *sdata, const json_t *txn_val)
{
stratum_instance_t *client;
ckmsg_t *bulk_send = NULL;
int messages = 0;
ck_rlock(&sdata->instance_lock);
DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg = json_deep_copy(txn_val);
json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&sdata->instance_lock);
if (bulk_send) {
LOGINFO("Sending transactions to mining nodes");
ssend_bulk_append(sdata, bulk_send, messages);
}
}
/* Distill down a set of transactions into an efficient tree arrangement for /* Distill down a set of transactions into an efficient tree arrangement for
* stratum messages and fast work assembly. */ * stratum messages and fast work assembly. */
static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array) static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
{ {
int i, j, binleft, binlen, added = 0, purged = 0; int i, j, binleft, binlen, added = 0, purged = 0;
txntable_t *txns = NULL, *tmp, *tmpa; txntable_t *txns = NULL, *tmp, *tmpa;
json_t *arr_val; json_t *arr_val, *val;
uchar *hashbin; uchar *hashbin;
wb->txns = json_array_size(txn_array); wb->txns = json_array_size(txn_array);
@ -1230,6 +1259,8 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
} }
} }
txn_array = json_array();
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->txns, tmp, tmpa) { HASH_ITER(hh, sdata->txns, tmp, tmpa) {
if (tmp->refcount--) if (tmp->refcount--)
@ -1239,7 +1270,11 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
purged++; purged++;
} }
HASH_ITER(hh, txns, tmp, tmpa) { HASH_ITER(hh, txns, tmp, tmpa) {
json_t *txn_val;
/* Propagate transaction here */ /* Propagate transaction here */
JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data);
json_array_append_new(txn_array, txn_val);
/* Move to the sdata transaction table */ /* Move to the sdata transaction table */
HASH_DEL(txns, tmp); HASH_DEL(txns, tmp);
HASH_ADD_STR(sdata->txns, hash, tmp); HASH_ADD_STR(sdata->txns, hash, tmp);
@ -1250,6 +1285,10 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
JSON_CPACK(val, "{so}", "transaction", txn_array);
send_node_transactions(sdata, val);
json_decref(val);
LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged);
} }

Loading…
Cancel
Save