From 10ff10fea085e875e2a30d1d0ab9ff31a8f748f1 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 12 Jan 2017 10:40:07 +1100 Subject: [PATCH] Use a readcount reference count with workbases to avoid holding the workbase lock and recursive locks. --- src/stratifier.c | 115 +++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index b597bc24..9daa03e3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -86,6 +86,10 @@ struct workbase { int64_t id; char idstring[20]; + /* How many readers we currently have of this workbase, set + * under write workbase_lock */ + int readcount; + /* The id a remote workinfo is mapped to locally */ int64_t mapped_id; /* The client id this remote workinfo came from */ @@ -1745,7 +1749,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) ck_wlock(&sdata->workbase_lock); HASH_ITER(hh, sdata->remote_workbases, wb, tmp) { - if (!wb->incomplete) + if (!wb->incomplete || wb->readcount) continue; /* Remove the workbase from the hashlist so we can work on it */ HASH_DEL(sdata->remote_workbases, wb); @@ -1786,6 +1790,8 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) break; if (wb == tmp) continue; + if (tmp->readcount) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->remote_workbases, tmp); @@ -1927,7 +1933,7 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t clie LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); } -/* Calculate share diff and fill in hash and swap */ +/* Calculate share diff and fill in hash and swap. Need to hold workbase read count */ static double share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen) @@ -1982,7 +1988,7 @@ share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const return diff_from_target(hash); } -/* Note recursive lock here - entered with workbase lock held, grabs instance lock */ +/* Entered with workbase readcount, grabs instance lock */ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce, const char *nonce2, const uint32_t ntime32, const int64_t jobid, const double diff, const int64_t client_id) @@ -2032,6 +2038,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } +/* Process a block into a message for the generator to submit. Must hold + * workbase readcount */ static void process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) @@ -2072,6 +2080,45 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i free(gbt_block); } +static workbase_t *get_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); + if (wb) + wb->readcount++; + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id) +{ + workbase_t *wb; + + ck_wlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->remote_workbases, &id, wb); + if (wb) { + if (wb->incomplete) + wb = NULL; + else + wb->readcount++; + } + ck_wunlock(&sdata->workbase_lock); + + return wb; +} + +static void put_workbase(sdata_t *sdata, workbase_t *wb) +{ + ck_wlock(&sdata->workbase_lock); + wb->readcount--; + ck_wunlock(&sdata->workbase_lock); +} + +#define put_remote_workbase(sdata, wb) put_workbase(sdata, wb) + static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) { char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL; @@ -2116,10 +2163,11 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) - goto out_unlock; + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { + LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); + goto out; + } /* Now we have enough to assemble a block */ coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len); enonce1len = wb->enonce1constlen + wb->enonce1varlen; @@ -2144,21 +2192,16 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) "createby", "code", "createcode", __func__, "createinet", ckp->serverurl[0]); -out_unlock: - ck_runlock(&sdata->workbase_lock); + put_workbase(sdata, wb); - if (unlikely(!wb)) - LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id); - else { - block_ckmsg = ckalloc(sizeof(ckmsg_t)); - block_ckmsg->data = json_deep_copy(bval); + block_ckmsg = ckalloc(sizeof(ckmsg_t)); + block_ckmsg->data = json_deep_copy(bval); - mutex_lock(&sdata->block_lock); - DL_APPEND(sdata->block_solves, block_ckmsg); - mutex_unlock(&sdata->block_lock); + mutex_lock(&sdata->block_lock); + DL_APPEND(sdata->block_solves, block_ckmsg); + mutex_unlock(&sdata->block_lock); - ckdbq_add(ckp, ID_BLOCK, bval); - } + ckdbq_add(ckp, ID_BLOCK, bval); out: free(enonce1bin); free(coinbase); @@ -5811,7 +5854,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl json_decref(block_val); } -/* We should already be holding the workbase_lock. Needs to be entered with +/* We should already be holding a wb readcount. Needs to be entered with * client holding a ref count. */ static void test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data, @@ -5876,7 +5919,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc } } -/* Needs to be entered with client holding a ref count. */ +/* Needs to be entered with workbase readcount and client holding a ref count. */ static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2, const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale) { @@ -6039,20 +6082,17 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, share = true; - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->workbases, &id, wb); - if (unlikely(!wb)) { - if (!sdata->current_workbase) { - ck_runlock(&sdata->workbase_lock); + if (unlikely(!sdata->current_workbase)) + return json_boolean(false); - return json_boolean(false); - } + wb = get_workbase(sdata, id); + if (unlikely(!wb)) { id = sdata->current_workbase->id; err = SE_INVALID_JOBID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); strncpy(idstring, job_id, 19); ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); - goto out_unlock; + goto out_nowb; } wdiff = wb->diff; strncpy(idstring, wb->idstring, 19); @@ -6109,14 +6149,15 @@ no_stale: if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) { err = SE_NTIME_INVALID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); - goto out_unlock; + goto out_put; } invalid = false; out_submit: if (sdiff >= wdiff) submit = true; -out_unlock: - ck_runlock(&sdata->workbase_lock); +out_put: + put_workbase(sdata, wb); +out_nowb: /* Accept shares of the old diff until the next update */ if (id < client->diff_change_job_id) @@ -6980,13 +7021,8 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const json_get_int(&cblen, val, "cblen"); json_get_double(&diff, val, "diff"); - if (likely(id && coinbasehex && swaphex && cblen)) { - ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->remote_workbases, &id, wb); - if (wb && wb->incomplete) - wb = NULL; - ck_runlock(&sdata->workbase_lock); - } + if (likely(id && coinbasehex && swaphex && cblen)) + wb = get_remote_workbase(sdata, id); if (unlikely(!wb)) LOGWARNING("Inadequate data locally to attempt submit of remote block"); @@ -7001,6 +7037,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const sha256(swap, 80, hash1); sha256(hash1, 32, hash); process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash); + put_remote_workbase(sdata, wb); } workername = json_string_value(workername_val);