Browse Source

Use a single ckmsgq for all calls to update_base, avoiding recruiting threads for each call and guaranteeing serialised calls.

master
Con Kolivas 8 years ago
parent
commit
97b83028ae
  1. 70
      src/stratifier.c

70
src/stratifier.c

@ -480,6 +480,7 @@ struct stratifier_data {
char lasthash[68]; char lasthash[68];
char lastswaphash[68]; char lastswaphash[68];
ckmsgq_t *updateq; // Generator base work updates
ckmsgq_t *ssends; // Stratum sends ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb ckmsgq_t *ckdbq; // ckdb
@ -1170,12 +1171,6 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
sdata->gen_priority = 0; sdata->gen_priority = 0;
} }
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(sdata_t *sdata); static void broadcast_ping(sdata_t *sdata);
/* Build a hashlist of all transactions, allowing us to compare with the list of /* Build a hashlist of all transactions, allowing us to compare with the list of
@ -1429,37 +1424,23 @@ static void gbt_witness_data(workbase_t *wb, json_t *txn_array)
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template * since checking should have been done earlier, and creates the base template
* for generating work templates. */ * for generating work templates. This is a ckmsgq so all uses of this function
static void *do_update(void *arg) * are serialised. */
static void block_update(ckpool_t *ckp, int *prio)
{ {
struct update_req *ur = (struct update_req *)arg;
json_t *val, *txn_array, *rules_array; json_t *val, *txn_array, *rules_array;
const char* witnessdata_check, *rule; const char* witnessdata_check, *rule;
int i, prio = ur->prio, retries = 0;
ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
bool new_block = false; bool new_block = false;
int i, retries = 0;
bool ret = false; bool ret = false;
workbase_t *wb; workbase_t *wb;
time_t now_t; time_t now_t;
pthread_detach(pthread_self());
rename_proc("updater");
/* Serialise access to getbase to avoid out of order new block notifies */
if (prio < GEN_PRIORITY) {
/* Don't queue another routine update if one is already in
* progress. */
if (cksem_trywait(&sdata->update_sem)) {
LOGINFO("Skipped lowprio update base");
goto out_free;
}
} else
cksem_wait(&sdata->update_sem);
retry: retry:
val = generator_genbase(ckp); val = generator_genbase(ckp);
if (unlikely(!val)) { if (unlikely(!val)) {
if (retries++ < 5 || prio == GEN_PRIORITY) { if (retries++ < 5 || *prio == GEN_PRIORITY) {
LOGWARNING("Generator returned failure in update_base, retry #%d", retries); LOGWARNING("Generator returned failure in update_base, retry #%d", retries);
goto retry; goto retry;
} }
@ -1536,10 +1517,7 @@ out:
LOGINFO("Broadcast ping due to failed stratum base update"); LOGINFO("Broadcast ping due to failed stratum base update");
broadcast_ping(sdata); broadcast_ping(sdata);
} }
out_free: free(prio);
free(ur->pth);
free(ur);
return NULL;
} }
static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txnhashes) static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txnhashes)
@ -1963,15 +1941,26 @@ out:
free(enonce1); free(enonce1);
} }
static void update_base(ckpool_t *ckp, const int prio) static void update_base(sdata_t *sdata, const int prio)
{ {
struct update_req *ur = ckalloc(sizeof(struct update_req)); int *uprio;
pthread_t *pth = ckalloc(sizeof(pthread_t));
/* All uses of block_update are serialised so if we have more
* update_base calls waiting there is no point servicing them unless
* they are high priority. */
if (prio < GEN_PRIORITY) {
/* Don't queue another routine update if one is already in
* progress. */
if (cksem_trywait(&sdata->update_sem)) {
LOGINFO("Skipped lowprio update base");
return;
}
} else
cksem_wait(&sdata->update_sem);
ur->pth = pth; uprio = ckalloc(sizeof(int));
ur->ckp = ckp; *uprio = prio;
ur->prio = prio; ckmsgq_add(sdata->updateq, NULL);
create_pthread(pth, do_update, ur);
} }
/* Instead of removing the client instance, we add it to a list of recycled /* Instead of removing the client instance, we add it to a list of recycled
@ -3415,7 +3404,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
json_t *val; json_t *val;
if (!ckp->node) if (!ckp->node)
update_base(ckp, GEN_PRIORITY); update_base(sdata, GEN_PRIORITY);
ts_realtime(&ts_now); ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
@ -4251,7 +4240,7 @@ retry:
if (!ckp->proxy) { if (!ckp->proxy) {
LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", LOGDEBUG("%ds elapsed in strat_loop, updating gbt base",
ckp->update_interval); ckp->update_interval);
update_base(ckp, GEN_NORMAL); update_base(sdata, GEN_NORMAL);
} else if (!ckp->passthrough) { } else if (!ckp->passthrough) {
LOGDEBUG("%ds elapsed in strat_loop, pinging miners", LOGDEBUG("%ds elapsed in strat_loop, pinging miners",
ckp->update_interval); ckp->update_interval);
@ -4351,7 +4340,7 @@ retry:
Close(umsg->sockd); Close(umsg->sockd);
LOGDEBUG("Stratifier received request: %s", buf); LOGDEBUG("Stratifier received request: %s", buf);
if (cmdmatch(buf, "update")) { if (cmdmatch(buf, "update")) {
update_base(ckp, GEN_PRIORITY); update_base(sdata, GEN_PRIORITY);
} else if (cmdmatch(buf, "subscribe")) { } else if (cmdmatch(buf, "subscribe")) {
/* Proxifier has a new subscription */ /* Proxifier has a new subscription */
update_subscribe(ckp, buf); update_subscribe(ckp, buf);
@ -4418,7 +4407,7 @@ static void *blockupdate(void *arg)
if (buf && cmdmatch(buf, "notify")) if (buf && cmdmatch(buf, "notify"))
cksleep_ms(5000); cksleep_ms(5000);
else if (buf && strcmp(buf, sdata->lastswaphash) && !cmdmatch(buf, "failed")) else if (buf && strcmp(buf, sdata->lastswaphash) && !cmdmatch(buf, "failed"))
update_base(ckp, GEN_PRIORITY); update_base(sdata, GEN_PRIORITY);
else else
cksleep_ms(ckp->blockpoll); cksleep_ms(ckp->blockpoll);
} }
@ -8165,6 +8154,7 @@ void *stratifier(void *arg)
/* Create half as many share processing and receiving threads as there /* Create half as many share processing and receiving threads as there
* are CPUs */ * are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->updateq = create_ckmsgq(ckp, "updater", &block_update);
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads); sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);

Loading…
Cancel
Save