From 10ff10fea085e875e2a30d1d0ab9ff31a8f748f1 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 12 Jan 2017 10:40:07 +1100 Subject: [PATCH 1/3] Use a readcount reference count with workbases to avoid holding the workbase lock and recursive locks. --- src/stratifier.c | 115 +++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index b597bc24..9daa03e3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -86,6 +86,10 @@ struct workbase { int64_t id; char idstring[20]; + /* How many readers we currently have of this workbase, set + * under write workbase_lock */ + int readcount; + /* The id a remote workinfo is mapped to locally */ int64_t mapped_id; /* The client id this remote workinfo came from */ @@ -1745,7 +1749,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->remote_workbases, wb, tmp) { - if (!wb->incomplete) + if (!wb->incomplete || wb->readcount) continue; /* Remove the workbase from the hashlist so we can work on it */ HASH_DEL(sdata->remote_workbases, wb); @@ -1786,6 +1790,8 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) break; if (wb == tmp) continue; + if (tmp->readcount) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->remote_workbases, tmp); @@ -1927,7 +1933,7 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t clie LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); } -/* Calculate share diff and fill in hash and swap */ +/* Calculate share diff and fill in hash and swap. 
Need to hold workbase read count */ static double share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen) @@ -1982,7 +1988,7 @@ share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const return diff_from_target(hash); } -/* Note recursive lock here - entered with workbase lock held, grabs instance lock */ +/* Entered with workbase readcount, grabs instance lock */ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce, const char *nonce2, const uint32_t ntime32, const int64_t jobid, const double diff, const int64_t client_id) @@ -2032,6 +2038,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } +/* Process a block into a message for the generator to submit. Must hold + * workbase readcount */ static void process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) @@ -2072,6 +2080,45 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i free(gbt_block); } +static workbase_t *get_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); + if (wb) + wb->readcount++; + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->remote_workbases, &id, wb); + if (wb) { + if (wb->incomplete) + wb = NULL; + else + wb->readcount++; + } + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static void put_workbase(sdata_t *sdata, workbase_t *wb) +{ + ck_wlock(&sdata->workbase_lock); + wb->readcount--; + ck_wunlock(&sdata->workbase_lock); +} + +#define put_remote_workbase(sdata, wb) put_workbase(sdata, wb) + 
static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) { char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL; @@ -2116,10 +2163,11 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) - goto out_unlock; + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { + LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); + goto out; + } /* Now we have enough to assemble a block */ coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len); enonce1len = wb->enonce1constlen + wb->enonce1varlen; @@ -2144,21 +2192,16 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) "createby", "code", "createcode", __func__, "createinet", ckp->serverurl[0]); -out_unlock: - ck_runlock(&sdata->workbase_lock); + put_workbase(sdata, wb); - if (unlikely(!wb)) - LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); - else { - block_ckmsg = ckalloc(sizeof(ckmsg_t)); - block_ckmsg->data = json_deep_copy(bval); + block_ckmsg = ckalloc(sizeof(ckmsg_t)); + block_ckmsg->data = json_deep_copy(bval); - mutex_lock(&sdata->block_lock); - DL_APPEND(sdata->block_solves, block_ckmsg); - mutex_unlock(&sdata->block_lock); + mutex_lock(&sdata->block_lock); + DL_APPEND(sdata->block_solves, block_ckmsg); + mutex_unlock(&sdata->block_lock); - ckdbq_add(ckp, ID_BLOCK, bval); - } + ckdbq_add(ckp, ID_BLOCK, bval); out: free(enonce1bin); free(coinbase); @@ -5811,7 +5854,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl json_decref(block_val); } -/* We should already be holding the workbase_lock. Needs to be entered with +/* We should already be holding a wb readcount. 
Needs to be entered with * client holding a ref count. */ static void test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data, @@ -5876,7 +5919,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc } } -/* Needs to be entered with client holding a ref count. */ +/* Needs to be entered with workbase readcount and client holding a ref count. */ static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale) { @@ -6039,20 +6082,17 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, share = true; - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) { - if (!sdata->current_workbase) { - ck_runlock(&sdata->workbase_lock); + if (unlikely(!sdata->current_workbase)) + return json_boolean(false); - return json_boolean(false); - } + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { id = sdata->current_workbase->id; err = SE_INVALID_JOBID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); strncpy(idstring, job_id, 19); ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); - goto out_unlock; + goto out_nowb; } wdiff = wb->diff; strncpy(idstring, wb->idstring, 19); @@ -6109,14 +6149,15 @@ no_stale: if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) { err = SE_NTIME_INVALID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); - goto out_unlock; + goto out_put; } invalid = false; out_submit: if (sdiff >= wdiff) submit = true; -out_unlock: - ck_runlock(&sdata->workbase_lock); +out_put: + put_workbase(sdata, wb); +out_nowb: /* Accept shares of the old diff until the next update */ if (id < client->diff_change_job_id) @@ -6980,13 +7021,8 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const json_get_int(&cblen, val, "cblen"); json_get_double(&diff, val, "diff"); - 
if (likely(id && coinbasehex && swaphex && cblen)) { - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->remote_workbases, &id, wb); - if (wb && wb->incomplete) - wb = NULL; - ck_runlock(&sdata->workbase_lock); - } + if (likely(id && coinbasehex && swaphex && cblen)) + wb = get_remote_workbase(sdata, id); if (unlikely(!wb)) LOGWARNING("Inadequate data locally to attempt submit of remote block"); @@ -7001,6 +7037,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const sha256(swap, 80, hash1); sha256(hash1, 32, hash); process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash); + put_remote_workbase(sdata, wb); } workername = json_string_value(workername_val); From d5f727637c50a123141810dd076197b9d6cb2692 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 12 Jan 2017 10:46:46 +1100 Subject: [PATCH 2/3] Check for readcount in ageing as well. --- src/stratifier.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 9daa03e3..29e9b7b8 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1130,6 +1130,8 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl break; if (wb == tmp) continue; + if (tmp->readcount) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->workbases, tmp); From f1a0be6994ce445c13a8e7ff8f643d876a2972ed Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 12 Jan 2017 11:09:23 +1100 Subject: [PATCH 3/3] Use a separate lock for the transaction table. 
--- src/stratifier.c | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 29e9b7b8..32f7f35c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -469,6 +469,9 @@ struct stratifier_data { uint64_t enonce1_64; + /* For protecting the txntable data */ + cklock_t txn_lock; + /* For protecting the hashtable data */ cklock_t workbase_lock; @@ -1175,7 +1178,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char /* Look for transactions we already know about and increment their * refcount if we're still using them. */ - ck_rlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_FIND_STR(sdata->txns, hash, txn); if (txn) { if (!local) @@ -1184,7 +1187,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char txn->refcount = REFCOUNT_LOCAL; txn->seen = found = true; } - ck_runlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (found) return false; @@ -1283,7 +1286,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo /* Find which transactions have their refcount decremented to zero * and remove them. 
*/ - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_ITER(hh, sdata->txns, tmp, tmpa) { json_t *txn_val; @@ -1319,7 +1322,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo HASH_ADD_STR(sdata->txns, hash, tmp); added++; } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (added) { JSON_CPACK(val, "{so}", "transaction", txn_array); @@ -1681,7 +1684,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) memcpy(hash, hashes + i * 65, 64); - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); HASH_FIND_STR(sdata->txns, hash, txn); if (likely(txn)) { txn->refcount = REFCOUNT_REMOTE; @@ -1690,7 +1693,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) "hash", hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); if (likely(txn_val)) continue; @@ -1704,7 +1707,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) } /* We've found it, let's add it to the table */ - ck_wlock(&sdata->workbase_lock); + ck_wlock(&sdata->txn_lock); /* One last check in case it got added while we dropped the lock */ HASH_FIND_STR(sdata->txns, hash, txn); if (likely(!txn)) { @@ -1720,7 +1723,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) "hash", hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->txn_lock); } if (ret) { @@ -3128,6 +3131,7 @@ static void free_proxy(proxy_t *proxy) } mutex_unlock(&dsdata->share_lock); + /* Do we need to check readcount here if freeing the proxy? 
*/ ck_wlock(&dsdata->workbase_lock); HASH_ITER(hh, dsdata->workbases, wb, tmpwb) { HASH_DEL(dsdata->workbases, wb); @@ -6431,12 +6435,12 @@ static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client) txn_array = json_array(); - ck_rlock(&sdata->workbase_lock); + ck_rlock(&sdata->txn_lock); HASH_ITER(hh, sdata->txns, txn, tmp) { JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data); json_array_append_new(txn_array, txn_val); } - ck_runlock(&sdata->workbase_lock); + ck_runlock(&sdata->txn_lock); if (client->trusted) { JSON_CPACK(val, "{ss,so}", "method", stratum_msgs[SM_TRANSACTIONS], @@ -7127,7 +7131,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes) int found = 0; size_t index; - ck_rlock(&sdata->workbase_lock); + ck_rlock(&sdata->txn_lock); json_array_foreach(hashes, index, arr_val) { const char *hash = json_string_value(arr_val); json_t *txn_val; @@ -7141,7 +7145,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes) json_array_append_new(txn_array, txn_val); found++; } - ck_runlock(&sdata->workbase_lock); + ck_runlock(&sdata->txn_lock); return txn_array; } @@ -8574,6 +8578,7 @@ void *stratifier(void *arg) create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); + cklock_init(&sdata->txn_lock); cklock_init(&sdata->workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp);