diff --git a/src/stratifier.c b/src/stratifier.c index c13d13bb..5c8148a9 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -86,6 +86,10 @@ struct workbase { int64_t id; char idstring[20]; + /* How many readers we currently have of this workbase, set + * under write workbase_lock */ + int readcount; + /* The id a remote workinfo is mapped to locally */ int64_t mapped_id; /* The client id this remote workinfo came from */ @@ -465,6 +469,9 @@ struct stratifier_data { uint64_t enonce1_64; + /* For protecting the txntable data */ + cklock_t txn_lock; + /* For protecting the hashtable data */ cklock_t workbase_lock; @@ -1126,6 +1133,8 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl break; if (wb == tmp) continue; + if (wb->readcount) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->workbases, tmp); @@ -1169,7 +1178,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char /* Look for transactions we already know about and increment their * refcount if we're still using them. */ - ck_rlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_FIND_STR(sdata->txns, hash, txn); if (txn) { if (!local) @@ -1178,7 +1187,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char txn->refcount = REFCOUNT_LOCAL; txn->seen = found = true; } - ck_runlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (found) return false; @@ -1277,7 +1286,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo /* Find which transactions have their refcount decremented to zero * and remove them. */ - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_ITER(hh, sdata->txns, tmp, tmpa) { json_t *txn_val; @@ -1313,7 +1322,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo HASH_ADD_STR(sdata->txns, hash, tmp); added++; } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (added) { JSON_CPACK(val, "{so}", "transaction", txn_array); @@ -1680,7 +1689,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) memcpy(hash, hashes + i * 65, 64); - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_FIND_STR(sdata->txns, hash, txn); if (likely(txn)) { txn->refcount = REFCOUNT_REMOTE; @@ -1689,7 +1698,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) "hash", hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (likely(txn_val)) continue; @@ -1703,7 +1712,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) } /* We've found it, let's add it to the table */ - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); /* One last check in case it got added while we dropped the lock */ HASH_FIND_STR(sdata->txns, hash, txn); if (likely(!txn)) { @@ -1719,7 +1728,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) "hash", hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); } if (ret) { @@ -1750,7 +1759,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->remote_workbases, wb, tmp) { - if (!wb->incomplete) + if (!wb->incomplete || wb->readcount) continue; /* Remove the workbase from the hashlist so we can work on it */ HASH_DEL(sdata->remote_workbases, wb); @@ -1791,6 +1800,8 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) break; if (wb == tmp) continue; + if (tmp->readcount) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->remote_workbases, tmp); @@ -1930,7 +1941,7 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t clie LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); } -/* Calculate share diff and fill in hash and swap */ +/* Calculate share diff and fill in hash and swap. Need to hold workbase read count */ static double share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen) @@ -1985,7 +1996,7 @@ share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const return diff_from_target(hash); } -/* Note recursive lock here - entered with workbase lock held, grabs instance lock */ +/* Entered with workbase readcount, grabs instance lock */ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce, const char *nonce2, const uint32_t ntime32, const int64_t jobid, const double diff, const int64_t client_id) @@ -2035,6 +2046,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } +/* Process a block into a message for the generator to submit. Must hold + * workbase readcount */ static void process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) @@ -2075,6 +2088,45 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i free(gbt_block); } +static workbase_t *get_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); + if (wb) + wb->readcount++; + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->remote_workbases, &id, wb); + if (wb) { + if (wb->incomplete) + wb = NULL; + else + wb->readcount++; + } + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static void put_workbase(sdata_t *sdata, workbase_t *wb) +{ + ck_wlock(&sdata->workbase_lock); + wb->readcount--; + ck_wunlock(&sdata->workbase_lock); +} + +#define put_remote_workbase(sdata, wb) put_workbase(sdata, wb) + static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) { char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL; @@ -2119,10 +2171,11 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) - goto out_unlock; + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { + LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); + goto out; + } /* Now we have enough to assemble a block */ coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len); enonce1len = wb->enonce1constlen + wb->enonce1varlen; @@ -2147,21 +2200,16 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) "createby", "code", "createcode", __func__, "createinet", ckp->serverurl[0]); -out_unlock: - ck_runlock(&sdata->workbase_lock); + put_workbase(sdata, wb); - if (unlikely(!wb)) - LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); - else { - block_ckmsg = ckalloc(sizeof(ckmsg_t)); - block_ckmsg->data = json_deep_copy(bval); + block_ckmsg = ckalloc(sizeof(ckmsg_t)); + block_ckmsg->data = json_deep_copy(bval); - mutex_lock(&sdata->block_lock); - DL_APPEND(sdata->block_solves, block_ckmsg); - mutex_unlock(&sdata->block_lock); + mutex_lock(&sdata->block_lock); + DL_APPEND(sdata->block_solves, block_ckmsg); + mutex_unlock(&sdata->block_lock); - ckdbq_add(ckp, ID_BLOCK, bval); - } + ckdbq_add(ckp, ID_BLOCK, bval); out: free(enonce1bin); free(coinbase); @@ -3086,6 +3134,7 @@ static void free_proxy(proxy_t *proxy) } mutex_unlock(&dsdata->share_lock); + /* Do we need to check readcount here if freeing the proxy? */ ck_wlock(&dsdata->workbase_lock); HASH_ITER(hh, dsdata->workbases, wb, tmpwb) { HASH_DEL(dsdata->workbases, wb); @@ -5814,7 +5863,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl json_decref(block_val); } -/* We should already be holding the workbase_lock. Needs to be entered with +/* We should already be holding a wb readcount. Needs to be entered with * client holding a ref count. */ static void test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data, @@ -5879,7 +5928,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc } } -/* Needs to be entered with client holding a ref count. */ +/* Needs to be entered with workbase readcount and client holding a ref count. */ static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale) { @@ -6042,20 +6091,17 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, share = true; - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) { - if (!sdata->current_workbase) { - ck_runlock(&sdata->workbase_lock); + if (unlikely(!sdata->current_workbase)) + return json_boolean(false); - return json_boolean(false); - } + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { id = sdata->current_workbase->id; err = SE_INVALID_JOBID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); strncpy(idstring, job_id, 19); ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); - goto out_unlock; + goto out_nowb; } wdiff = wb->diff; strncpy(idstring, wb->idstring, 19); @@ -6112,14 +6158,15 @@ no_stale: if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) { err = SE_NTIME_INVALID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); - goto out_unlock; + goto out_put; } invalid = false; out_submit: if (sdiff >= wdiff) submit = true; -out_unlock: - ck_runlock(&sdata->workbase_lock); +out_put: + put_workbase(sdata, wb); +out_nowb: /* Accept shares of the old diff until the next update */ if (id < client->diff_change_job_id) @@ -6391,12 +6438,12 @@ static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client) txn_array = json_array(); - ck_rlock(&sdata->workbase_lock); + ck_rlock(&sdata->txn_lock); HASH_ITER(hh, sdata->txns, txn, tmp) { JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_runlock(&sdata->workbase_lock); + ck_runlock(&sdata->txn_lock); if (client->trusted) { JSON_CPACK(val, "{ss,so}", "method", stratum_msgs[SM_TRANSACTIONS], @@ -6983,13 +7030,8 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const json_get_int(&cblen, val, "cblen"); json_get_double(&diff, val, "diff"); - if (likely(id && coinbasehex && swaphex && cblen)) { - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->remote_workbases, &id, wb); - if (wb && wb->incomplete) - wb = NULL; - ck_runlock(&sdata->workbase_lock); - } + if (likely(id && coinbasehex && swaphex && cblen)) + wb = get_remote_workbase(sdata, id); if (unlikely(!wb)) LOGWARNING("Inadequate data locally to attempt submit of remote block"); @@ -7004,6 +7046,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const sha256(swap, 80, hash1); sha256(hash1, 32, hash); process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash); + put_remote_workbase(sdata, wb); } workername = json_string_value(workername_val); @@ -7091,7 +7134,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes) int found = 0; size_t index; - ck_rlock(&sdata->workbase_lock); + ck_rlock(&sdata->txn_lock); json_array_foreach(hashes, index, arr_val) { const char *hash = json_string_value(arr_val); json_t *txn_val; @@ -7105,7 +7148,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes) json_array_append_new(txn_array, txn_val); found++; } - ck_runlock(&sdata->workbase_lock); + ck_runlock(&sdata->txn_lock); return txn_array; } @@ -8538,6 +8581,7 @@ void *stratifier(void *arg) create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); + cklock_init(&sdata->txn_lock); cklock_init(&sdata->workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp);