diff --git a/src/stratifier.c b/src/stratifier.c index a877a076..364615e7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1152,8 +1152,8 @@ static void broadcast_ping(sdata_t *sdata); /* Build a hashlist of all transactions, allowing us to compare with the list of * existing transactions to determine which need to be propagated */ -static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char *hash, - const char *data) +static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char *hash, + const char *data, bool local) { bool found = false; txntable_t *txn; @@ -1163,16 +1163,16 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char ck_rlock(&sdata->workbase_lock); HASH_FIND_STR(sdata->txns, hash, txn); if (txn) { - if (ckp->node) + if (!local) txn->refcount = 100; - else + else if (txn->refcount < 20) txn->refcount = 20; found = true; } ck_runlock(&sdata->workbase_lock); if (found) - return; + return false; txn = ckzalloc(sizeof(txntable_t)); memcpy(txn->hash, hash, 65); @@ -1182,6 +1182,8 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char else txn->refcount = 20; HASH_ADD_STR(*txns, hash, txn); + + return true; } static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *txn_val) @@ -1227,13 +1229,54 @@ static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t * } } +static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool local) +{ + json_t *val, *txn_array = json_array(); + int added = 0, purged = 0; + txntable_t *tmp, *tmpa; + + /* Find which transactions have their refcount decremented to zero + * and remove them. */ + ck_wlock(&sdata->workbase_lock); + HASH_ITER(hh, sdata->txns, tmp, tmpa) { + if (tmp->refcount-- > 0) + continue; + HASH_DEL(sdata->txns, tmp); + dealloc(tmp->data); + dealloc(tmp); + purged++; + } + /* Add the new transactions to the transaction table */ + HASH_ITER(hh, txns, tmp, tmpa) { + json_t *txn_val; + + /* Propagate transaction here */ + JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data); + json_array_append_new(txn_array, txn_val); + /* Move to the sdata transaction table */ + HASH_DEL(txns, tmp); + HASH_ADD_STR(sdata->txns, hash, tmp); + added++; + } + ck_wunlock(&sdata->workbase_lock); + + JSON_CPACK(val, "{so}", "transaction", txn_array); + send_node_transactions(ckp, sdata, val); + json_decref(val); + + if (added || purged) { + LOGINFO("Stratifier added %d %stransactions and purged %d", added, + local ? "" : "remote ", purged); + } +} + /* Distill down a set of transactions into an efficient tree arrangement for * stratum messages and fast work assembly. */ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txn_array) { - int i, j, binleft, binlen, added = 0, purged = 0; - txntable_t *txns = NULL, *tmp, *tmpa; - json_t *arr_val, *val; + int i, j, binleft, binlen; + txntable_t *txns = NULL; + json_t *arr_val; uchar *hashbin; wb->txns = json_array_size(txn_array); @@ -1276,7 +1319,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t return; } txn = json_string_value(json_object_get(arr_val, "data")); - add_txn(ckp, sdata, &txns, hash, txn); + add_txn(ckp, sdata, &txns, hash, txn, true); len = strlen(txn); memcpy(wb->txn_data + ofs, txn, len); ofs += len; @@ -1312,39 +1355,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t } LOGNOTICE("Stored %d transactions", wb->txns); - txn_array = json_array(); - - /* Find which transactions have their refcount decremented to zero - * and remove them. */ - ck_wlock(&sdata->workbase_lock); - HASH_ITER(hh, sdata->txns, tmp, tmpa) { - if (tmp->refcount-- > 0) - continue; - HASH_DEL(sdata->txns, tmp); - dealloc(tmp->data); - dealloc(tmp); - purged++; - } - /* Add the new transactions to the transaction table */ - HASH_ITER(hh, txns, tmp, tmpa) { - json_t *txn_val; - - /* Propagate transaction here */ - JSON_CPACK(txn_val, "{ss,ss}", "hash", tmp->hash, "data", tmp->data); - json_array_append_new(txn_array, txn_val); - /* Move to the sdata transaction table */ - HASH_DEL(txns, tmp); - HASH_ADD_STR(sdata->txns, hash, tmp); - added++; - } - ck_wunlock(&sdata->workbase_lock); - - JSON_CPACK(val, "{so}", "transaction", txn_array); - send_node_transactions(ckp, sdata, val); - json_decref(val); - - if (added || purged) - LOGINFO("Stratifier added %d transactions and purged %d", added, purged); + update_txns(ckp, sdata, txns, true); } static const unsigned char witness_nonce[32] = {0}; @@ -6825,7 +6836,7 @@ static void send_remote_pong(sdata_t *sdata, stratum_instance_t *client) static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val) { json_t *txn_array, *txn_val, *data_val, *hash_val; - txntable_t *txn; + txntable_t *txns = NULL; int i, arr_size; int added = 0; @@ -6845,33 +6856,16 @@ static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val) continue; } - ck_rlock(&sdata->workbase_lock); - HASH_FIND_STR(sdata->txns, hash, txn); - if (txn) { - txn->refcount = 100; - ck_runlock(&sdata->workbase_lock); + if (!add_txn(ckp, sdata, &txns, hash, data, false)) continue; - } - ck_runlock(&sdata->workbase_lock); + /* Submit transactions if we haven't seen them before */ submit_transaction(ckp, data); - txn = ckzalloc(sizeof(txntable_t)); - memcpy(txn->hash, hash, 65); - txn->data = strdup(data); - /* Set the refcount for node transactions greater than the - * upstream pool to ensure we never age them faster than the - * pool does. */ - txn->refcount = 100; - - ck_wlock(&sdata->workbase_lock); - HASH_ADD_STR(sdata->txns, hash, txn); - ck_wunlock(&sdata->workbase_lock); - added++; } if (added) - LOGINFO("Stratifier added %d remote transactions", added); + update_txns(ckp, sdata, txns, false); } void parse_remote_txns(ckpool_t *ckp, const json_t *val)