Browse Source

Propagate and submit all new transactions across upstream and remote servers making for similar transaction selection across servers and allowing future local block submissions.

master
Con Kolivas 8 years ago
parent
commit
5606c864d9
  1. 120
      src/stratifier.c

120
src/stratifier.c

@ -1152,8 +1152,8 @@ static void broadcast_ping(sdata_t *sdata);
/* Build a hashlist of all transactions, allowing us to compare with the list of /* Build a hashlist of all transactions, allowing us to compare with the list of
* existing transactions to determine which need to be propagated */ * existing transactions to determine which need to be propagated */
static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char *hash, static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char *hash,
const char *data) const char *data, bool local)
{ {
bool found = false; bool found = false;
txntable_t *txn; txntable_t *txn;
@ -1163,16 +1163,16 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char
ck_rlock(&sdata->workbase_lock); ck_rlock(&sdata->workbase_lock);
HASH_FIND_STR(sdata->txns, hash, txn); HASH_FIND_STR(sdata->txns, hash, txn);
if (txn) { if (txn) {
if (ckp->node) if (!local)
txn->refcount = 100; txn->refcount = 100;
else else if (txn->refcount < 20)
txn->refcount = 20; txn->refcount = 20;
found = true; found = true;
} }
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
if (found) if (found)
return; return false;
txn = ckzalloc(sizeof(txntable_t)); txn = ckzalloc(sizeof(txntable_t));
memcpy(txn->hash, hash, 65); memcpy(txn->hash, hash, 65);
@ -1182,6 +1182,8 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char
else else
txn->refcount = 20; txn->refcount = 20;
HASH_ADD_STR(*txns, hash, txn); HASH_ADD_STR(*txns, hash, txn);
return true;
} }
static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *txn_val) static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *txn_val)
@ -1227,13 +1229,54 @@ static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *
} }
} }
static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool local)
{
json_t *val, *txn_array = json_array();
int added = 0, purged = 0;
txntable_t *tmp, *tmpa;
/* Find which transactions have their refcount decremented to zero
* and remove them. */
ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->txns, tmp, tmpa) {
if (tmp->refcount-- > 0)
continue;
HASH_DEL(sdata->txns, tmp);
dealloc(tmp->data);
dealloc(tmp);
purged++;
}
/* Add the new transactions to the transaction table */
HASH_ITER(hh, txns, tmp, tmpa) {
json_t *txn_val;
/* Propagate transaction here */
JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data);
json_array_append_new(txn_array, txn_val);
/* Move to the sdata transaction table */
HASH_DEL(txns, tmp);
HASH_ADD_STR(sdata->txns, hash, tmp);
added++;
}
ck_wunlock(&sdata->workbase_lock);
JSON_CPACK(val, "{so}", "transaction", txn_array);
send_node_transactions(ckp, sdata, val);
json_decref(val);
if (added || purged) {
LOGINFO("Stratifier added %d %stransactions and purged %d", added,
local ? "" : "remote ", purged);
}
}
/* Distill down a set of transactions into an efficient tree arrangement for /* Distill down a set of transactions into an efficient tree arrangement for
* stratum messages and fast work assembly. */ * stratum messages and fast work assembly. */
static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txn_array) static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txn_array)
{ {
int i, j, binleft, binlen, added = 0, purged = 0; int i, j, binleft, binlen;
txntable_t *txns = NULL, *tmp, *tmpa; txntable_t *txns = NULL;
json_t *arr_val, *val; json_t *arr_val;
uchar *hashbin; uchar *hashbin;
wb->txns = json_array_size(txn_array); wb->txns = json_array_size(txn_array);
@ -1276,7 +1319,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t
return; return;
} }
txn = json_string_value(json_object_get(arr_val, "data")); txn = json_string_value(json_object_get(arr_val, "data"));
add_txn(ckp, sdata, &txns, hash, txn); add_txn(ckp, sdata, &txns, hash, txn, true);
len = strlen(txn); len = strlen(txn);
memcpy(wb->txn_data + ofs, txn, len); memcpy(wb->txn_data + ofs, txn, len);
ofs += len; ofs += len;
@ -1312,39 +1355,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t
} }
LOGNOTICE("Stored %d transactions", wb->txns); LOGNOTICE("Stored %d transactions", wb->txns);
txn_array = json_array(); update_txns(ckp, sdata, txns, true);
/* Find which transactions have their refcount decremented to zero
* and remove them. */
ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->txns, tmp, tmpa) {
if (tmp->refcount-- > 0)
continue;
HASH_DEL(sdata->txns, tmp);
dealloc(tmp->data);
dealloc(tmp);
purged++;
}
/* Add the new transactions to the transaction table */
HASH_ITER(hh, txns, tmp, tmpa) {
json_t *txn_val;
/* Propagate transaction here */
JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data);
json_array_append_new(txn_array, txn_val);
/* Move to the sdata transaction table */
HASH_DEL(txns, tmp);
HASH_ADD_STR(sdata->txns, hash, tmp);
added++;
}
ck_wunlock(&sdata->workbase_lock);
JSON_CPACK(val, "{so}", "transaction", txn_array);
send_node_transactions(ckp, sdata, val);
json_decref(val);
if (added || purged)
LOGINFO("Stratifier added %d transactions and purged %d", added, purged);
} }
static const unsigned char witness_nonce[32] = {0}; static const unsigned char witness_nonce[32] = {0};
@ -6825,7 +6836,7 @@ static void send_remote_pong(sdata_t *sdata, stratum_instance_t *client)
static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val) static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val)
{ {
json_t *txn_array, *txn_val, *data_val, *hash_val; json_t *txn_array, *txn_val, *data_val, *hash_val;
txntable_t *txn; txntable_t *txns = NULL;
int i, arr_size; int i, arr_size;
int added = 0; int added = 0;
@ -6845,33 +6856,16 @@ static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val)
continue; continue;
} }
ck_rlock(&sdata->workbase_lock); if (!add_txn(ckp, sdata, &txns, hash, data, false))
HASH_FIND_STR(sdata->txns, hash, txn);
if (txn) {
txn->refcount = 100;
ck_runlock(&sdata->workbase_lock);
continue; continue;
}
ck_runlock(&sdata->workbase_lock);
/* Submit transactions if we haven't seen them before */
submit_transaction(ckp, data); submit_transaction(ckp, data);
txn = ckzalloc(sizeof(txntable_t));
memcpy(txn->hash, hash, 65);
txn->data = strdup(data);
/* Set the refcount for node transactions greater than the
* upstream pool to ensure we never age them faster than the
* pool does. */
txn->refcount = 100;
ck_wlock(&sdata->workbase_lock);
HASH_ADD_STR(sdata->txns, hash, txn);
ck_wunlock(&sdata->workbase_lock);
added++; added++;
} }
if (added) if (added)
LOGINFO("Stratifier added %d remote transactions", added); update_txns(ckp, sdata, txns, false);
} }
void parse_remote_txns(ckpool_t *ckp, const json_t *val) void parse_remote_txns(ckpool_t *ckp, const json_t *val)

Loading…
Cancel
Save