Browse Source

Use a readcount reference count with workbases to avoid holding the workbase lock and recursive locks.

master
ckolivas 8 years ago
parent
commit
10ff10fea0
  1. 103
      src/stratifier.c

103
src/stratifier.c

@ -86,6 +86,10 @@ struct workbase {
int64_t id; int64_t id;
char idstring[20]; char idstring[20];
/* How many readers we currently have of this workbase, set
* under write workbase_lock */
int readcount;
/* The id a remote workinfo is mapped to locally */ /* The id a remote workinfo is mapped to locally */
int64_t mapped_id; int64_t mapped_id;
/* The client id this remote workinfo came from */ /* The client id this remote workinfo came from */
@ -1745,7 +1749,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->remote_workbases, wb, tmp) { HASH_ITER(hh, sdata->remote_workbases, wb, tmp) {
if (!wb->incomplete) if (!wb->incomplete || wb->readcount)
continue; continue;
/* Remove the workbase from the hashlist so we can work on it */ /* Remove the workbase from the hashlist so we can work on it */
HASH_DEL(sdata->remote_workbases, wb); HASH_DEL(sdata->remote_workbases, wb);
@ -1786,6 +1790,8 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
break; break;
if (wb == tmp) if (wb == tmp)
continue; continue;
if (tmp->readcount)
continue;
/* Age old workbases older than 10 minutes old */ /* Age old workbases older than 10 minutes old */
if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) {
HASH_DEL(sdata->remote_workbases, tmp); HASH_DEL(sdata->remote_workbases, tmp);
@ -1927,7 +1933,7 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t clie
LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); LOGNOTICE("Block hash changed to %s", sdata->lastswaphash);
} }
/* Calculate share diff and fill in hash and swap */ /* Calculate share diff and fill in hash and swap. Need to hold workbase read count */
static double static double
share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2, share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2,
const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen) const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen)
@ -1982,7 +1988,7 @@ share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const
return diff_from_target(hash); return diff_from_target(hash);
} }
/* Note recursive lock here - entered with workbase lock held, grabs instance lock */ /* Entered with workbase readcount, grabs instance lock */
static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce, static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce,
const char *nonce2, const uint32_t ntime32, const int64_t jobid, const char *nonce2, const uint32_t ntime32, const int64_t jobid,
const double diff, const int64_t client_id) const double diff, const int64_t client_id)
@ -2032,6 +2038,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
} }
} }
/* Process a block into a message for the generator to submit. Must hold
* workbase readcount */
static void static void
process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen,
const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) const uchar *data, const uchar *hash, uchar *swap32, char *blockhash)
@ -2072,6 +2080,45 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i
free(gbt_block); free(gbt_block);
} }
static workbase_t *get_workbase(sdata_t *sdata, const int64_t id)
{
workbase_t *wb;
ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->workbases, &id, wb);
if (wb)
wb->readcount++;
ck_wunlock(&sdata->workbase_lock);
return wb;
}
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id)
{
workbase_t *wb;
ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
if (wb) {
if (wb->incomplete)
wb = NULL;
else
wb->readcount++;
}
ck_wunlock(&sdata->workbase_lock);
return wb;
}
static void put_workbase(sdata_t *sdata, workbase_t *wb)
{
ck_wlock(&sdata->workbase_lock);
wb->readcount--;
ck_wunlock(&sdata->workbase_lock);
}
#define put_remote_workbase(sdata, wb) put_workbase(sdata, wb)
static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
{ {
char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL; char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL;
@ -2116,10 +2163,11 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
ts_realtime(&ts_now); ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
ck_rlock(&sdata->workbase_lock); wb = get_workbase(sdata, id);
HASH_FIND_I64(sdata->workbases, &id, wb); if (unlikely(!wb)) {
if (unlikely(!wb)) LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id);
goto out_unlock; goto out;
}
/* Now we have enough to assemble a block */ /* Now we have enough to assemble a block */
coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len); coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len);
enonce1len = wb->enonce1constlen + wb->enonce1varlen; enonce1len = wb->enonce1constlen + wb->enonce1varlen;
@ -2144,12 +2192,8 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", ckp->serverurl[0]); "createinet", ckp->serverurl[0]);
out_unlock: put_workbase(sdata, wb);
ck_runlock(&sdata->workbase_lock);
if (unlikely(!wb))
LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id);
else {
block_ckmsg = ckalloc(sizeof(ckmsg_t)); block_ckmsg = ckalloc(sizeof(ckmsg_t));
block_ckmsg->data = json_deep_copy(bval); block_ckmsg->data = json_deep_copy(bval);
@ -2158,7 +2202,6 @@ out_unlock:
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
ckdbq_add(ckp, ID_BLOCK, bval); ckdbq_add(ckp, ID_BLOCK, bval);
}
out: out:
free(enonce1bin); free(enonce1bin);
free(coinbase); free(coinbase);
@ -5811,7 +5854,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl
json_decref(block_val); json_decref(block_val);
} }
/* We should already be holding the workbase_lock. Needs to be entered with /* We should already be holding a wb readcount. Needs to be entered with
* client holding a ref count. */ * client holding a ref count. */
static void static void
test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data, test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data,
@ -5876,7 +5919,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc
} }
} }
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with workbase readcount and client holding a ref count. */
static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2, static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2,
const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale) const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale)
{ {
@ -6039,20 +6082,17 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
share = true; share = true;
ck_rlock(&sdata->workbase_lock); if (unlikely(!sdata->current_workbase))
HASH_FIND_I64(sdata->workbases, &id, wb);
if (unlikely(!wb)) {
if (!sdata->current_workbase) {
ck_runlock(&sdata->workbase_lock);
return json_boolean(false); return json_boolean(false);
}
wb = get_workbase(sdata, id);
if (unlikely(!wb)) {
id = sdata->current_workbase->id; id = sdata->current_workbase->id;
err = SE_INVALID_JOBID; err = SE_INVALID_JOBID;
json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
strncpy(idstring, job_id, 19); strncpy(idstring, job_id, 19);
ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir);
goto out_unlock; goto out_nowb;
} }
wdiff = wb->diff; wdiff = wb->diff;
strncpy(idstring, wb->idstring, 19); strncpy(idstring, wb->idstring, 19);
@ -6109,14 +6149,15 @@ no_stale:
if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) { if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) {
err = SE_NTIME_INVALID; err = SE_NTIME_INVALID;
json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
goto out_unlock; goto out_put;
} }
invalid = false; invalid = false;
out_submit: out_submit:
if (sdiff >= wdiff) if (sdiff >= wdiff)
submit = true; submit = true;
out_unlock: out_put:
ck_runlock(&sdata->workbase_lock); put_workbase(sdata, wb);
out_nowb:
/* Accept shares of the old diff until the next update */ /* Accept shares of the old diff until the next update */
if (id < client->diff_change_job_id) if (id < client->diff_change_job_id)
@ -6980,13 +7021,8 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
json_get_int(&cblen, val, "cblen"); json_get_int(&cblen, val, "cblen");
json_get_double(&diff, val, "diff"); json_get_double(&diff, val, "diff");
if (likely(id && coinbasehex && swaphex && cblen)) { if (likely(id && coinbasehex && swaphex && cblen))
ck_rlock(&sdata->workbase_lock); wb = get_remote_workbase(sdata, id);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
if (wb && wb->incomplete)
wb = NULL;
ck_runlock(&sdata->workbase_lock);
}
if (unlikely(!wb)) if (unlikely(!wb))
LOGWARNING("Inadequate data locally to attempt submit of remote block"); LOGWARNING("Inadequate data locally to attempt submit of remote block");
@ -7001,6 +7037,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
sha256(swap, 80, hash1); sha256(swap, 80, hash1);
sha256(hash1, 32, hash); sha256(hash1, 32, hash);
process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash); process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash);
put_remote_workbase(sdata, wb);
} }
workername = json_string_value(workername_val); workername = json_string_value(workername_val);

Loading…
Cancel
Save