Browse Source

Merge branch 'txnlist'

master
Con Kolivas 9 years ago
parent
commit
2ba1b073d4
  1. 2
      src/ckpool.h
  2. 4
      src/libckpool.c
  3. 236
      src/stratifier.c

2
src/ckpool.h

@ -278,6 +278,7 @@ enum stratum_msgtype {
SM_SUGGESTDIFF, SM_SUGGESTDIFF,
SM_BLOCK, SM_BLOCK,
SM_PONG, SM_PONG,
SM_TRANSACTIONS,
SM_NONE SM_NONE
}; };
@ -300,6 +301,7 @@ static const char __maybe_unused *stratum_msgs[] = {
"suggestdiff", "suggestdiff",
"block", "block",
"pong", "pong",
"transactions",
"" ""
}; };

4
src/libckpool.c

@ -966,7 +966,7 @@ int wait_read_select(int sockd, float timeout)
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; event.events = EPOLLIN | EPOLLRDHUP;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000; timeout *= 1000;
ret = epoll_wait(epfd, &event, 1, timeout); ret = epoll_wait(epfd, &event, 1, timeout);
@ -1048,7 +1048,7 @@ int wait_write_select(int sockd, float timeout)
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; event.events = EPOLLOUT | EPOLLRDHUP ;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000; timeout *= 1000;
ret = epoll_wait(epfd, &event, 1, timeout); ret = epoll_wait(epfd, &event, 1, timeout);

236
src/stratifier.c

@ -381,7 +381,7 @@ struct txntable {
UT_hash_handle hh; UT_hash_handle hh;
int id; int id;
char hash[68]; char hash[68];
const char *data; char *data;
int refcount; int refcount;
}; };
@ -871,11 +871,11 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
stratum_instance_t *client; stratum_instance_t *client;
ckmsg_t *bulk_send = NULL; ckmsg_t *bulk_send = NULL;
int messages = 0; int messages = 0;
json_t *wb_val;
ck_rlock(&sdata->instance_lock); wb_val = json_object();
if (sdata->node_instances) {
json_t *wb_val = json_object();
ck_rlock(&sdata->instance_lock);
json_set_int(wb_val, "jobid", wb->id); json_set_int(wb_val, "jobid", wb->id);
json_set_string(wb_val, "target", wb->target); json_set_string(wb_val, "target", wb->target);
json_set_double(wb_val, "diff", wb->diff); json_set_double(wb_val, "diff", wb->diff);
@ -888,10 +888,10 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue);
json_set_int(wb_val, "height", wb->height); json_set_int(wb_val, "height", wb->height);
json_set_string(wb_val, "flags", wb->flags); json_set_string(wb_val, "flags", wb->flags);
/* Set to zero to be backwards compat with older node code */
json_set_int(wb_val, "transactions", 0);
json_set_int(wb_val, "txns", wb->txns); json_set_int(wb_val, "txns", wb->txns);
if (likely(wb->txns)) json_set_string(wb_val, "txn_hashes", wb->txn_hashes);
json_set_string(wb_val, "txn_data", wb->txn_data);
/* We don't need txn_hashes */
json_set_int(wb_val, "merkles", wb->merkles); json_set_int(wb_val, "merkles", wb->merkles);
json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array));
json_set_string(wb_val, "coinb1", wb->coinb1); json_set_string(wb_val, "coinb1", wb->coinb1);
@ -915,10 +915,10 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
DL_APPEND(bulk_send, client_msg); DL_APPEND(bulk_send, client_msg);
messages++; messages++;
} }
json_decref(wb_val);
}
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
json_decref(wb_val);
/* We send workinfo postponed till after the stratum updates are sent /* We send workinfo postponed till after the stratum updates are sent
* out to minimise any lag seen by clients getting updates. It means * out to minimise any lag seen by clients getting updates. It means
* the remote node will know about the block change later which will * the remote node will know about the block change later which will
@ -1117,7 +1117,7 @@ static void broadcast_ping(sdata_t *sdata);
/* Build a hashlist of all transactions, allowing us to compare with the list of /* Build a hashlist of all transactions, allowing us to compare with the list of
* existing transactions to determine which need to be propagated */ * existing transactions to determine which need to be propagated */
static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const char *data) static void add_txn(sdata_t *sdata, txntable_t **txns, const char *hash, const char *data)
{ {
bool found = false; bool found = false;
txntable_t *txn; txntable_t *txn;
@ -1129,15 +1129,44 @@ static void add_txn(sdata_t *sdata, txntable_t *txns, const char *hash, const ch
found = true; found = true;
} }
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
if (found) if (found)
return; return;
txn = ckzalloc(sizeof(txntable_t)); txn = ckzalloc(sizeof(txntable_t));
memcpy(txn->hash, hash, 65); memcpy(txn->hash, hash, 65);
/* Note that data is pointing to a string in a json struture which will txn->data = strdup(data);
* be destroyed so we can only use the data value until then. */ txn->refcount = 10;
txn->data = data; HASH_ADD_STR(*txns, hash, txn);
HASH_ADD_STR(txns, hash, txn); }
static void send_node_transactions(sdata_t *sdata, const json_t *txn_val)
{
stratum_instance_t *client;
ckmsg_t *bulk_send = NULL;
int messages = 0;
ck_rlock(&sdata->instance_lock);
DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg = json_deep_copy(txn_val);
json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&sdata->instance_lock);
if (bulk_send) {
LOGINFO("Sending transactions to mining nodes");
ssend_bulk_append(sdata, bulk_send, messages);
}
} }
/* Distill down a set of transactions into an efficient tree arrangement for /* Distill down a set of transactions into an efficient tree arrangement for
@ -1146,7 +1175,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
{ {
int i, j, binleft, binlen, added = 0, purged = 0; int i, j, binleft, binlen, added = 0, purged = 0;
txntable_t *txns = NULL, *tmp, *tmpa; txntable_t *txns = NULL, *tmp, *tmpa;
json_t *arr_val; json_t *arr_val, *val;
uchar *hashbin; uchar *hashbin;
wb->txns = json_array_size(txn_array); wb->txns = json_array_size(txn_array);
@ -1180,7 +1209,7 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
arr_val = json_array_get(txn_array, i); arr_val = json_array_get(txn_array, i);
hash = json_string_value(json_object_get(arr_val, "hash")); hash = json_string_value(json_object_get(arr_val, "hash"));
txn = json_string_value(json_object_get(arr_val, "data")); txn = json_string_value(json_object_get(arr_val, "data"));
add_txn(sdata, txns, hash, txn); add_txn(sdata, &txns, hash, txn);
len = strlen(txn); len = strlen(txn);
memcpy(wb->txn_data + ofs, txn, len); memcpy(wb->txn_data + ofs, txn, len);
ofs += len; ofs += len;
@ -1208,15 +1237,15 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
} }
} else } else
wb->txn_hashes = ckzalloc(1); wb->txn_hashes = ckzalloc(1);
wb->merkle_array = json_array();
if (binleft > 1) { if (binleft > 1) {
while (42) { while (42) {
uchar merklebin[32];
if (binleft == 1) if (binleft == 1)
break; break;
memcpy(merklebin, hashbin + 32, 32); memcpy(&wb->merklebin[wb->merkles][0], hashbin + 32, 32);
__bin2hex(&wb->merklehash[wb->merkles][0], merklebin, 32); __bin2hex(&wb->merklehash[wb->merkles][0], &wb->merklebin[wb->merkles][0], 32);
LOGDEBUG("MH%d %s",wb->merkles, &wb->merklehash[wb->merkles][0]); json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[wb->merkles][0]));
LOGDEBUG("MerkleHash %d %s",wb->merkles, &wb->merklehash[wb->merkles][0]);
wb->merkles++; wb->merkles++;
if (binleft % 2) { if (binleft % 2) {
memcpy(hashbin + binlen, hashbin + binlen - 32, 32); memcpy(hashbin + binlen, hashbin + binlen - 32, 32);
@ -1229,28 +1258,38 @@ static void wb_merkle_bins(sdata_t *sdata, workbase_t *wb, json_t *txn_array)
binlen = binleft * 32; binlen = binleft * 32;
} }
} }
LOGINFO("Stored %d transactions", wb->txns);
txn_array = json_array();
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->txns, tmp, tmpa) { HASH_ITER(hh, sdata->txns, tmp, tmpa) {
if (tmp->refcount--) if (tmp->refcount--)
continue; continue;
HASH_DEL(sdata->txns, tmp); HASH_DEL(sdata->txns, tmp);
dealloc(tmp->data);
dealloc(tmp); dealloc(tmp);
purged++; purged++;
} }
HASH_ITER(hh, txns, tmp, tmpa) { HASH_ITER(hh, txns, tmp, tmpa) {
json_t *txn_val;
/* Propagate transaction here */ /* Propagate transaction here */
JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data);
json_array_append_new(txn_array, txn_val);
/* Move to the sdata transaction table */ /* Move to the sdata transaction table */
HASH_DEL(txns, tmp); HASH_DEL(txns, tmp);
HASH_ADD_STR(sdata->txns, data, tmp); HASH_ADD_STR(sdata->txns, hash, tmp);
/* Empty data once used to not dereference since the json structure
* will be destroyed. */
tmp->data = NULL;
added++; added++;
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
LOGDEBUG("Stratifier added %d transactions and purged %d", added, purged); JSON_CPACK(val, "{so}", "transaction", txn_array);
send_node_transactions(sdata, val);
json_decref(val);
if (added || purged)
LOGINFO("Stratifier added %d transactions and purged %d", added, purged);
} }
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
@ -1340,13 +1379,64 @@ out:
return NULL; return NULL;
} }
static bool rebuild_txns(sdata_t *sdata, workbase_t *wb, json_t *txnhashes)
{
const char *hashes = json_string_value(txnhashes);
json_t *txn_array;
txntable_t *txn;
bool ret = true;
int i, len;
if (likely(hashes))
len = strlen(hashes);
else
len = 0;
if (!hashes || !len)
return ret;
if (unlikely(len < wb->txns * 65)) {
LOGERR("Truncated transactions in rebuild_txns only %d long", len);
return false;
}
txn_array = json_array();
ck_rlock(&sdata->workbase_lock);
for (i = 0; i < wb->txns; i++) {
json_t *txn_val;
char hash[68];
memcpy(hash, hashes + i * 65, 64);
hash[64] = '\0';
HASH_FIND_STR(sdata->txns, hash, txn);
if (unlikely(!txn)) {
LOGNOTICE("Failed to find txn in rebuild_txns");
ret = false;
goto out_unlock;
}
txn->refcount++;
JSON_CPACK(txn_val, "{ss,ss}",
"hash", hash, "data", txn->data);
json_array_append_new(txn_array, txn_val);
}
out_unlock:
ck_runlock(&sdata->workbase_lock);
if (ret) {
LOGINFO("Rebuilt txns into workbase with %d transactions", (int)i);
wb_merkle_bins(sdata, wb, txn_array);
}
json_decref(txn_array);
return ret;
}
static void add_node_base(ckpool_t *ckp, json_t *val) static void add_node_base(ckpool_t *ckp, json_t *val)
{ {
workbase_t *wb = ckzalloc(sizeof(workbase_t)); workbase_t *wb = ckzalloc(sizeof(workbase_t));
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
bool new_block = false; bool new_block = false;
json_t *txnhashes;
char header[228]; char header[228];
int i;
wb->ckp = ckp; wb->ckp = ckp;
json_int64cpy(&wb->id, val, "jobid"); json_int64cpy(&wb->id, val, "jobid");
@ -1362,8 +1452,11 @@ static void add_node_base(ckpool_t *ckp, json_t *val)
json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue");
json_intcpy(&wb->height, val, "height"); json_intcpy(&wb->height, val, "height");
json_strdup(&wb->flags, val, "flags"); json_strdup(&wb->flags, val, "flags");
json_intcpy(&wb->txns, val, "txns"); /* First see if the server uses the old communication format */
if (wb->txns) json_intcpy(&wb->txns, val, "transactions");
if (wb->txns) {
int i;
json_strdup(&wb->txn_data, val, "txn_data"); json_strdup(&wb->txn_data, val, "txn_data");
json_intcpy(&wb->merkles, val, "merkles"); json_intcpy(&wb->merkles, val, "merkles");
wb->merkle_array = json_object_dup(val, "merklehash"); wb->merkle_array = json_object_dup(val, "merklehash");
@ -1371,6 +1464,15 @@ static void add_node_base(ckpool_t *ckp, json_t *val)
strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i)));
hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32);
} }
} else {
json_intcpy(&wb->txns, val, "txns");
txnhashes = json_object_get(val, "txn_hashes");
if (!rebuild_txns(sdata, wb, txnhashes)) {
LOGWARNING("Unable to rebuild transactions from hashes to create workinfo");
free(wb);
return;
}
}
json_strdup(&wb->coinb1, val, "coinb1"); json_strdup(&wb->coinb1, val, "coinb1");
json_intcpy(&wb->coinb1len, val, "coinb1len"); json_intcpy(&wb->coinb1len, val, "coinb1len");
wb->coinb1bin = ckzalloc(wb->coinb1len); wb->coinb1bin = ckzalloc(wb->coinb1len);
@ -5684,7 +5786,33 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const
stratum_send_update(sdata, client_id, true); stratum_send_update(sdata, client_id, true);
} }
static void *set_node_latency(void *arg) /* When a node first connects it has no transactions so we have to send all
* current ones to it. */
static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client)
{
json_t *txn_array, *val, *txn_val;
txntable_t *txn, *tmp;
smsg_t *msg;
txn_array = json_array();
ck_rlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->txns, txn, tmp) {
JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data);
json_array_append_new(txn_array, txn_val);
}
ck_runlock(&sdata->workbase_lock);
JSON_CPACK(val, "{ss,so}", "node.method", stratum_msgs[SM_TRANSACTIONS],
"transaction", txn_array);
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val;
msg->client_id = client->id;
ckmsgq_add(sdata->ssends, msg);
LOGNOTICE("Sending new node client %"PRId64" all transactions", client->id);
}
static void *setup_node(void *arg)
{ {
stratum_instance_t *client = (stratum_instance_t *)arg; stratum_instance_t *client = (stratum_instance_t *)arg;
@ -5693,6 +5821,8 @@ static void *set_node_latency(void *arg)
client->latency = round_trip(client->address) / 2; client->latency = round_trip(client->address) / 2;
LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id, LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id,
client->address, client->latency); client->address, client->latency);
sleep(5);
send_node_all_txns(client->sdata, client);
dec_instance_ref(client->sdata, client); dec_instance_ref(client->sdata, client);
return NULL; return NULL;
} }
@ -5714,7 +5844,7 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id, LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id,
client->address, client->server, ckp->serverurl[client->server]); client->address, client->server, ckp->serverurl[client->server]);
create_pthread(&pth, set_node_latency, client); create_pthread(&pth, setup_node, client);
} }
static void add_remote_server(sdata_t *sdata, stratum_instance_t *client) static void add_remote_server(sdata_t *sdata, stratum_instance_t *client)
@ -6151,6 +6281,45 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
LOGWARNING("unrecognised trusted message %s", buf); LOGWARNING("unrecognised trusted message %s", buf);
} }
static void add_node_txns(sdata_t *sdata, const json_t *val)
{
json_t *txn_array, *txn_val, *data_val, *hash_val;
txntable_t *txn;
int i, arr_size;
int added = 0;
txn_array = json_object_get(val, "transaction");
arr_size = json_array_size(txn_array);
ck_wlock(&sdata->workbase_lock);
for (i = 0; i < arr_size; i++) {
const char *hash, *data;
txn_val = json_array_get(txn_array, i);
data_val = json_object_get(txn_val, "data");
hash_val = json_object_get(txn_val, "hash");
data = json_string_value(data_val);
hash = json_string_value(hash_val);
if (unlikely(!data || !hash)) {
LOGERR("Failed to get hash/data in add_node_txns");
continue;
}
HASH_FIND_STR(sdata->txns, hash, txn);
if (txn)
continue;
txn = ckzalloc(sizeof(txntable_t));
memcpy(txn->hash, hash, 65);
txn->data = strdup(data);
txn->refcount = 10;
HASH_ADD_STR(sdata->txns, hash, txn);
added++;
}
ck_wunlock(&sdata->workbase_lock);
if (added)
LOGINFO("Stratifier added %d node transactions", added);
}
/* Entered with client holding ref count */ /* Entered with client holding ref count */
static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client) static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client)
{ {
@ -6211,6 +6380,9 @@ static void parse_node_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const cha
} }
LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]); LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]);
switch (msg_type) { switch (msg_type) {
case SM_TRANSACTIONS:
add_node_txns(sdata, val);
break;
case SM_WORKINFO: case SM_WORKINFO:
add_node_base(ckp, val); add_node_base(ckp, val);
break; break;
@ -7369,10 +7541,8 @@ int stratifier(proc_instance_t *pi)
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
}
read_poolstats(ckp); read_poolstats(ckp);
cklock_init(&sdata->workbase_lock); cklock_init(&sdata->workbase_lock);

Loading…
Cancel
Save