diff --git a/src/stratifier.c b/src/stratifier.c index 7fd4ae16..dcffe6a8 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -480,6 +480,7 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; + ckmsgq_t *updateq; // Generator base work updates ckmsgq_t *ssends; // Stratum sends ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb @@ -1170,12 +1171,6 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) sdata->gen_priority = 0; } -struct update_req { - pthread_t *pth; - ckpool_t *ckp; - int prio; -}; - static void broadcast_ping(sdata_t *sdata); /* Build a hashlist of all transactions, allowing us to compare with the list of @@ -1429,37 +1424,23 @@ static void gbt_witness_data(workbase_t *wb, json_t *txn_array) /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template - * for generating work templates. */ -static void *do_update(void *arg) + * for generating work templates. This is a ckmsgq so all uses of this function + * are serialised. */ +static void block_update(ckpool_t *ckp, int *prio) { - struct update_req *ur = (struct update_req *)arg; json_t *val, *txn_array, *rules_array; const char* witnessdata_check, *rule; - int i, prio = ur->prio, retries = 0; - ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->sdata; bool new_block = false; + int i, retries = 0; bool ret = false; workbase_t *wb; time_t now_t; - pthread_detach(pthread_self()); - rename_proc("updater"); - - /* Serialise access to getbase to avoid out of order new block notifies */ - if (prio < GEN_PRIORITY) { - /* Don't queue another routine update if one is already in - * progress. */ - if (cksem_trywait(&sdata->update_sem)) { - LOGINFO("Skipped lowprio update base"); - goto out_free; - } - } else - cksem_wait(&sdata->update_sem); retry: val = generator_genbase(ckp); if (unlikely(!val)) { - if (retries++ < 5 || prio == GEN_PRIORITY) { + if (retries++ < 5 || *prio == GEN_PRIORITY) { LOGWARNING("Generator returned failure in update_base, retry #%d", retries); goto retry; } @@ -1536,10 +1517,7 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } -out_free: - free(ur->pth); - free(ur); - return NULL; + free(prio); } static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *txnhashes) @@ -1963,15 +1941,26 @@ out: free(enonce1); } -static void update_base(ckpool_t *ckp, const int prio) +static void update_base(sdata_t *sdata, const int prio) { - struct update_req *ur = ckalloc(sizeof(struct update_req)); - pthread_t *pth = ckalloc(sizeof(pthread_t)); + int *uprio; + + /* All uses of block_update are serialised so if we have more + * update_base calls waiting there is no point servicing them unless + * they are high priority. */ + if (prio < GEN_PRIORITY) { + /* Don't queue another routine update if one is already in + * progress. */ + if (cksem_trywait(&sdata->update_sem)) { + LOGINFO("Skipped lowprio update base"); + return; + } + } else + cksem_wait(&sdata->update_sem); - ur->pth = pth; - ur->ckp = ckp; - ur->prio = prio; - create_pthread(pth, do_update, ur); + uprio = ckalloc(sizeof(int)); + *uprio = prio; + ckmsgq_add(sdata->updateq, NULL); } /* Instead of removing the client instance, we add it to a list of recycled @@ -3415,7 +3404,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) json_t *val; if (!ckp->node) - update_base(ckp, GEN_PRIORITY); + update_base(sdata, GEN_PRIORITY); ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); @@ -4251,7 +4240,7 @@ retry: if (!ckp->proxy) { LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", ckp->update_interval); - update_base(ckp, GEN_NORMAL); + update_base(sdata, GEN_NORMAL); } else if (!ckp->passthrough) { LOGDEBUG("%ds elapsed in strat_loop, pinging miners", ckp->update_interval); @@ -4351,7 +4340,7 @@ retry: Close(umsg->sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "update")) { - update_base(ckp, GEN_PRIORITY); + update_base(sdata, GEN_PRIORITY); } else if (cmdmatch(buf, "subscribe")) { /* Proxifier has a new subscription */ update_subscribe(ckp, buf); @@ -4418,7 +4407,7 @@ static void *blockupdate(void *arg) if (buf && cmdmatch(buf, "notify")) cksleep_ms(5000); else if (buf && strcmp(buf, sdata->lastswaphash) && !cmdmatch(buf, "failed")) - update_base(ckp, GEN_PRIORITY); + update_base(sdata, GEN_PRIORITY); else cksleep_ms(ckp->blockpoll); } @@ -8165,6 +8154,7 @@ void *stratifier(void *arg) /* Create half as many share processing and receiving threads as there * are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sdata->updateq = create_ckmsgq(ckp, "updater", &block_update); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);