Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 8 years ago
parent
commit
fe0549953d
  1. 134
      src/stratifier.c

134
src/stratifier.c

@ -86,6 +86,10 @@ struct workbase {
int64_t id; int64_t id;
char idstring[20]; char idstring[20];
/* How many readers we currently have of this workbase, set
* under write workbase_lock */
int readcount;
/* The id a remote workinfo is mapped to locally */ /* The id a remote workinfo is mapped to locally */
int64_t mapped_id; int64_t mapped_id;
/* The client id this remote workinfo came from */ /* The client id this remote workinfo came from */
@ -465,6 +469,9 @@ struct stratifier_data {
uint64_t enonce1_64; uint64_t enonce1_64;
/* For protecting the txntable data */
cklock_t txn_lock;
/* For protecting the hashtable data */ /* For protecting the hashtable data */
cklock_t workbase_lock; cklock_t workbase_lock;
@ -1126,6 +1133,8 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
break; break;
if (wb == tmp) if (wb == tmp)
continue; continue;
if (wb->readcount)
continue;
/* Age old workbases older than 10 minutes old */ /* Age old workbases older than 10 minutes old */
if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) {
HASH_DEL(sdata->workbases, tmp); HASH_DEL(sdata->workbases, tmp);
@ -1169,7 +1178,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char
/* Look for transactions we already know about and increment their /* Look for transactions we already know about and increment their
* refcount if we're still using them. */ * refcount if we're still using them. */
ck_rlock(&sdata->workbase_lock); ck_wlock(&sdata->txn_lock);
HASH_FIND_STR(sdata->txns, hash, txn); HASH_FIND_STR(sdata->txns, hash, txn);
if (txn) { if (txn) {
if (!local) if (!local)
@ -1178,7 +1187,7 @@ static bool add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char
txn->refcount = REFCOUNT_LOCAL; txn->refcount = REFCOUNT_LOCAL;
txn->seen = found = true; txn->seen = found = true;
} }
ck_runlock(&sdata->workbase_lock); ck_wunlock(&sdata->txn_lock);
if (found) if (found)
return false; return false;
@ -1277,7 +1286,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo
/* Find which transactions have their refcount decremented to zero /* Find which transactions have their refcount decremented to zero
* and remove them. */ * and remove them. */
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->txn_lock);
HASH_ITER(hh, sdata->txns, tmp, tmpa) { HASH_ITER(hh, sdata->txns, tmp, tmpa) {
json_t *txn_val; json_t *txn_val;
@ -1313,7 +1322,7 @@ static void update_txns(ckpool_t *ckp, sdata_t *sdata, txntable_t *txns, bool lo
HASH_ADD_STR(sdata->txns, hash, tmp); HASH_ADD_STR(sdata->txns, hash, tmp);
added++; added++;
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->txn_lock);
if (added) { if (added) {
JSON_CPACK(val, "{so}", "transaction", txn_array); JSON_CPACK(val, "{so}", "transaction", txn_array);
@ -1680,7 +1689,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
memcpy(hash, hashes + i * 65, 64); memcpy(hash, hashes + i * 65, 64);
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->txn_lock);
HASH_FIND_STR(sdata->txns, hash, txn); HASH_FIND_STR(sdata->txns, hash, txn);
if (likely(txn)) { if (likely(txn)) {
txn->refcount = REFCOUNT_REMOTE; txn->refcount = REFCOUNT_REMOTE;
@ -1689,7 +1698,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
"hash", hash, "data", txn->data); "hash", hash, "data", txn->data);
json_array_append_new(txn_array, txn_val); json_array_append_new(txn_array, txn_val);
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->txn_lock);
if (likely(txn_val)) if (likely(txn_val))
continue; continue;
@ -1703,7 +1712,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
} }
/* We've found it, let's add it to the table */ /* We've found it, let's add it to the table */
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->txn_lock);
/* One last check in case it got added while we dropped the lock */ /* One last check in case it got added while we dropped the lock */
HASH_FIND_STR(sdata->txns, hash, txn); HASH_FIND_STR(sdata->txns, hash, txn);
if (likely(!txn)) { if (likely(!txn)) {
@ -1719,7 +1728,7 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
"hash", hash, "data", txn->data); "hash", hash, "data", txn->data);
json_array_append_new(txn_array, txn_val); json_array_append_new(txn_array, txn_val);
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->txn_lock);
} }
if (ret) { if (ret) {
@ -1750,7 +1759,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_ITER(hh, sdata->remote_workbases, wb, tmp) { HASH_ITER(hh, sdata->remote_workbases, wb, tmp) {
if (!wb->incomplete) if (!wb->incomplete || wb->readcount)
continue; continue;
/* Remove the workbase from the hashlist so we can work on it */ /* Remove the workbase from the hashlist so we can work on it */
HASH_DEL(sdata->remote_workbases, wb); HASH_DEL(sdata->remote_workbases, wb);
@ -1791,6 +1800,8 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
break; break;
if (wb == tmp) if (wb == tmp)
continue; continue;
if (tmp->readcount)
continue;
/* Age old workbases older than 10 minutes old */ /* Age old workbases older than 10 minutes old */
if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) {
HASH_DEL(sdata->remote_workbases, tmp); HASH_DEL(sdata->remote_workbases, tmp);
@ -1930,7 +1941,7 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t clie
LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); LOGNOTICE("Block hash changed to %s", sdata->lastswaphash);
} }
/* Calculate share diff and fill in hash and swap */ /* Calculate share diff and fill in hash and swap. Need to hold workbase read count */
static double static double
share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2, share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const char *nonce2,
const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen) const uint32_t ntime32, const char *nonce, uchar *hash, uchar *swap, int *cblen)
@ -1985,7 +1996,7 @@ share_diff(char *coinbase, const uchar *enonce1bin, const workbase_t *wb, const
return diff_from_target(hash); return diff_from_target(hash);
} }
/* Note recursive lock here - entered with workbase lock held, grabs instance lock */ /* Entered with workbase readcount, grabs instance lock */
static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce, static void send_node_block(sdata_t *sdata, const char *enonce1, const char *nonce,
const char *nonce2, const uint32_t ntime32, const int64_t jobid, const char *nonce2, const uint32_t ntime32, const int64_t jobid,
const double diff, const int64_t client_id) const double diff, const int64_t client_id)
@ -2035,6 +2046,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
} }
} }
/* Process a block into a message for the generator to submit. Must hold
* workbase readcount */
static void static void
process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen,
const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) const uchar *data, const uchar *hash, uchar *swap32, char *blockhash)
@ -2075,6 +2088,45 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i
free(gbt_block); free(gbt_block);
} }
static workbase_t *get_workbase(sdata_t *sdata, const int64_t id)
{
workbase_t *wb;
ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->workbases, &id, wb);
if (wb)
wb->readcount++;
ck_wunlock(&sdata->workbase_lock);
return wb;
}
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id)
{
workbase_t *wb;
ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
if (wb) {
if (wb->incomplete)
wb = NULL;
else
wb->readcount++;
}
ck_wunlock(&sdata->workbase_lock);
return wb;
}
static void put_workbase(sdata_t *sdata, workbase_t *wb)
{
ck_wlock(&sdata->workbase_lock);
wb->readcount--;
ck_wunlock(&sdata->workbase_lock);
}
#define put_remote_workbase(sdata, wb) put_workbase(sdata, wb)
static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
{ {
char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL; char *coinbase = NULL, *enonce1 = NULL, *nonce = NULL, *nonce2 = NULL;
@ -2119,10 +2171,11 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
ts_realtime(&ts_now); ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
ck_rlock(&sdata->workbase_lock); wb = get_workbase(sdata, id);
HASH_FIND_I64(sdata->workbases, &id, wb); if (unlikely(!wb)) {
if (unlikely(!wb)) LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id);
goto out_unlock; goto out;
}
/* Now we have enough to assemble a block */ /* Now we have enough to assemble a block */
coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len); coinbase = ckalloc(wb->coinb1len + wb->enonce1constlen + wb->enonce1varlen + wb->enonce2varlen + wb->coinb2len);
enonce1len = wb->enonce1constlen + wb->enonce1varlen; enonce1len = wb->enonce1constlen + wb->enonce1varlen;
@ -2147,12 +2200,8 @@ static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", ckp->serverurl[0]); "createinet", ckp->serverurl[0]);
out_unlock: put_workbase(sdata, wb);
ck_runlock(&sdata->workbase_lock);
if (unlikely(!wb))
LOGWARNING("Failed to find workbase with jobid %"PRId64" in node method block", id);
else {
block_ckmsg = ckalloc(sizeof(ckmsg_t)); block_ckmsg = ckalloc(sizeof(ckmsg_t));
block_ckmsg->data = json_deep_copy(bval); block_ckmsg->data = json_deep_copy(bval);
@ -2161,7 +2210,6 @@ out_unlock:
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
ckdbq_add(ckp, ID_BLOCK, bval); ckdbq_add(ckp, ID_BLOCK, bval);
}
out: out:
free(enonce1bin); free(enonce1bin);
free(coinbase); free(coinbase);
@ -3086,6 +3134,7 @@ static void free_proxy(proxy_t *proxy)
} }
mutex_unlock(&dsdata->share_lock); mutex_unlock(&dsdata->share_lock);
/* Do we need to check readcount here if freeing the proxy? */
ck_wlock(&dsdata->workbase_lock); ck_wlock(&dsdata->workbase_lock);
HASH_ITER(hh, dsdata->workbases, wb, tmpwb) { HASH_ITER(hh, dsdata->workbases, wb, tmpwb) {
HASH_DEL(dsdata->workbases, wb); HASH_DEL(dsdata->workbases, wb);
@ -5814,7 +5863,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl
json_decref(block_val); json_decref(block_val);
} }
/* We should already be holding the workbase_lock. Needs to be entered with /* We should already be holding a wb readcount. Needs to be entered with
* client holding a ref count. */ * client holding a ref count. */
static void static void
test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data, test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uchar *data,
@ -5879,7 +5928,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc
} }
} }
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with workbase readcount and client holding a ref count. */
static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2, static double submission_diff(const stratum_instance_t *client, const workbase_t *wb, const char *nonce2,
const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale) const uint32_t ntime32, const char *nonce, uchar *hash, const bool stale)
{ {
@ -6042,20 +6091,17 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
share = true; share = true;
ck_rlock(&sdata->workbase_lock); if (unlikely(!sdata->current_workbase))
HASH_FIND_I64(sdata->workbases, &id, wb);
if (unlikely(!wb)) {
if (!sdata->current_workbase) {
ck_runlock(&sdata->workbase_lock);
return json_boolean(false); return json_boolean(false);
}
wb = get_workbase(sdata, id);
if (unlikely(!wb)) {
id = sdata->current_workbase->id; id = sdata->current_workbase->id;
err = SE_INVALID_JOBID; err = SE_INVALID_JOBID;
json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
strncpy(idstring, job_id, 19); strncpy(idstring, job_id, 19);
ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir);
goto out_unlock; goto out_nowb;
} }
wdiff = wb->diff; wdiff = wb->diff;
strncpy(idstring, wb->idstring, 19); strncpy(idstring, wb->idstring, 19);
@ -6112,14 +6158,15 @@ no_stale:
if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) { if (ntime32 < wb->ntime32 || ntime32 > wb->ntime32 + 7000) {
err = SE_NTIME_INVALID; err = SE_NTIME_INVALID;
json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
goto out_unlock; goto out_put;
} }
invalid = false; invalid = false;
out_submit: out_submit:
if (sdiff >= wdiff) if (sdiff >= wdiff)
submit = true; submit = true;
out_unlock: out_put:
ck_runlock(&sdata->workbase_lock); put_workbase(sdata, wb);
out_nowb:
/* Accept shares of the old diff until the next update */ /* Accept shares of the old diff until the next update */
if (id < client->diff_change_job_id) if (id < client->diff_change_job_id)
@ -6391,12 +6438,12 @@ static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client)
txn_array = json_array(); txn_array = json_array();
ck_rlock(&sdata->workbase_lock); ck_rlock(&sdata->txn_lock);
HASH_ITER(hh, sdata->txns, txn, tmp) { HASH_ITER(hh, sdata->txns, txn, tmp) {
JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data); JSON_CPACK(txn_val, "{ss,ss}", "hash", txn->hash, "data", txn->data);
json_array_append_new(txn_array, txn_val); json_array_append_new(txn_array, txn_val);
} }
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->txn_lock);
if (client->trusted) { if (client->trusted) {
JSON_CPACK(val, "{ss,so}", "method", stratum_msgs[SM_TRANSACTIONS], JSON_CPACK(val, "{ss,so}", "method", stratum_msgs[SM_TRANSACTIONS],
@ -6983,13 +7030,8 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
json_get_int(&cblen, val, "cblen"); json_get_int(&cblen, val, "cblen");
json_get_double(&diff, val, "diff"); json_get_double(&diff, val, "diff");
if (likely(id && coinbasehex && swaphex && cblen)) { if (likely(id && coinbasehex && swaphex && cblen))
ck_rlock(&sdata->workbase_lock); wb = get_remote_workbase(sdata, id);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
if (wb && wb->incomplete)
wb = NULL;
ck_runlock(&sdata->workbase_lock);
}
if (unlikely(!wb)) if (unlikely(!wb))
LOGWARNING("Inadequate data locally to attempt submit of remote block"); LOGWARNING("Inadequate data locally to attempt submit of remote block");
@ -7004,6 +7046,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
sha256(swap, 80, hash1); sha256(swap, 80, hash1);
sha256(hash1, 32, hash); sha256(hash1, 32, hash);
process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash); process_block(ckp, wb, coinbase, cblen, swap, hash, swap32, blockhash);
put_remote_workbase(sdata, wb);
} }
workername = json_string_value(workername_val); workername = json_string_value(workername_val);
@ -7091,7 +7134,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes)
int found = 0; int found = 0;
size_t index; size_t index;
ck_rlock(&sdata->workbase_lock); ck_rlock(&sdata->txn_lock);
json_array_foreach(hashes, index, arr_val) { json_array_foreach(hashes, index, arr_val) {
const char *hash = json_string_value(arr_val); const char *hash = json_string_value(arr_val);
json_t *txn_val; json_t *txn_val;
@ -7105,7 +7148,7 @@ static json_t *get_hash_transactions(sdata_t *sdata, const json_t *hashes)
json_array_append_new(txn_array, txn_val); json_array_append_new(txn_array, txn_val);
found++; found++;
} }
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->txn_lock);
return txn_array; return txn_array;
} }
@ -8538,6 +8581,7 @@ void *stratifier(void *arg)
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
read_poolstats(ckp); read_poolstats(ckp);
cklock_init(&sdata->txn_lock);
cklock_init(&sdata->workbase_lock); cklock_init(&sdata->workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)
create_pthread(&pth_blockupdate, blockupdate, ckp); create_pthread(&pth_blockupdate, blockupdate, ckp);

Loading…
Cancel
Save