diff --git a/src/ckpool.c b/src/ckpool.c index 63593b21..484eda48 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -134,39 +134,6 @@ static void *ckmsg_queue(void *arg) return NULL; } -/* Generic workqueue function and message receiving and parsing thread */ -static void *ckwq_queue(void *arg) -{ - ckwq_t *ckwq = (ckwq_t *)arg; - ckpool_t *ckp = ckwq->ckp; - - pthread_detach(pthread_self()); - rename_proc(ckwq->name); - - while (42) { - ckwqmsg_t *wqmsg; - tv_t now; - ts_t abs; - - mutex_lock(ckwq->lock); - tv_time(&now); - tv_to_ts(&abs, &now); - abs.tv_sec++; - if (!ckwq->wqmsgs) - cond_timedwait(ckwq->cond, ckwq->lock, &abs); - wqmsg = ckwq->wqmsgs; - if (wqmsg) - DL_DELETE(ckwq->wqmsgs, wqmsg); - mutex_unlock(ckwq->lock); - - if (!wqmsg) - continue; - wqmsg->func(ckp, wqmsg->data); - free(wqmsg); - } - return NULL; -} - ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -207,29 +174,6 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons return ckmsgq; } -ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) -{ - ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count); - mutex_t *lock; - pthread_cond_t *cond; - int i; - - lock = ckalloc(sizeof(mutex_t)); - cond = ckalloc(sizeof(pthread_cond_t)); - mutex_init(lock); - cond_init(cond); - - for (i = 0; i < count; i++) { - snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); - ckwq[i].ckp = ckp; - ckwq[i].lock = lock; - ckwq[i].cond = cond; - create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); - } - - return ckwq; -} - /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -241,24 +185,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_broadcast(ckmsgq->cond); + pthread_cond_signal(ckmsgq->cond); mutex_unlock(ckmsgq->lock); } -void ckwq_add(ckwq_t *ckwq, const void *func, void *data) -{ - ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); - - wqmsg->func = func; - wqmsg->data = data; - - mutex_lock(ckwq->lock); - ckwq->messages++; - DL_APPEND(ckwq->wqmsgs, wqmsg); - pthread_cond_broadcast(ckwq->cond); - mutex_unlock(ckwq->lock); -} - /* Return whether there are any messages queued in the ckmsgq linked list. */ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { diff --git a/src/ckpool.h b/src/ckpool.h index d840528e..e0e0f947 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -21,22 +21,13 @@ typedef struct ckpool_instance ckpool_t; -typedef struct ckmsg ckmsg_t; - struct ckmsg { - ckmsg_t *next; - ckmsg_t *prev; + struct ckmsg *next; + struct ckmsg *prev; void *data; }; -typedef struct ckwqmsg ckwqmsg_t; - -struct ckwqmsg { - ckwqmsg_t *next; - ckwqmsg_t *prev; - void *data; - void (*func)(ckpool_t *, void *); -}; +typedef struct ckmsg ckmsg_t; struct ckmsgq { ckpool_t *ckp; @@ -51,18 +42,6 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; -struct ckwq { - ckpool_t *ckp; - char name[16]; - pthread_t pth; - mutex_t *lock; - pthread_cond_t *cond; - ckwqmsg_t *wqmsgs; - int64_t messages; -}; - -typedef struct ckwq ckwq_t; - struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -222,9 +201,7 @@ struct ckpool_instance { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); -ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); -void ckwq_add(ckwq_t *ckwq, const void *func, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); ckpool_t *global_ckp; diff --git a/src/stratifier.c b/src/stratifier.c index f09494ad..ee29c978 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -363,10 +363,12 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; - ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends + ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb + ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations + ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -843,21 +845,33 @@ static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) sdata->gen_priority = 0; } +struct update_req { + pthread_t *pth; + ckpool_t *ckp; + int prio; +}; + static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void do_update(ckpool_t *ckp, int *prio) +static void *do_update(void *arg) { + struct update_req *ur = (struct update_req *)arg; + ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; + int prio = ur->prio; bool ret = false; workbase_t *wb; json_t *val; char *buf; - buf = send_recv_generator(ckp, "getbase", *prio); + pthread_detach(pthread_self()); + rename_proc("updater"); + + buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; @@ -919,17 +933,21 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } - free(buf); - free(prio); + dealloc(buf); + free(ur->pth); + free(ur); + return NULL; } static void update_base(ckpool_t *ckp, const int prio) { - int *pprio = ckalloc(sizeof(int)); - sdata_t *sdata = ckp->data; + struct update_req *ur = ckalloc(sizeof(struct update_req)); + pthread_t *pth = ckalloc(sizeof(pthread_t)); - *pprio = prio; - ckwq_add(sdata->ckwqs, &do_update, pprio); + ur->pth = pth; + ur->ckp = ckp; + ur->prio = prio; + create_pthread(pth, do_update, ur); } static void __kill_instance(stratum_instance_t *client) @@ -1010,9 +1028,11 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata) /* Use the same work queues for all subproxies */ dsdata->ssends = sdata->ssends; - dsdata->ckwqs = sdata->ckwqs; + dsdata->srecvs = sdata->srecvs; dsdata->ckdbq = sdata->ckdbq; + dsdata->sshareq = sdata->sshareq; dsdata->sauthq = sdata->sauthq; + dsdata->stxnq = sdata->stxnq; /* Give the sbuproxy its own workbase list and lock */ cklock_init(&dsdata->workbase_lock); @@ -2111,21 +2131,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } -static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) -{ - int objects, generated; - int64_t memsize; - ckwqmsg_t *wqmsg; - - mutex_lock(ckwq->lock); - DL_COUNT(ckwq->wqmsgs, wqmsg, objects); - generated = ckwq->messages; - mutex_unlock(ckwq->lock); - - memsize = (sizeof(ckwqmsg_t) + size) * objects; - JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); -} - static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -2172,14 +2177,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); - /* Don't know exactly how big the string is so just count the pointer for now */ - ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); - json_set_object(val, "ckwqs", subval); + ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); + json_set_object(val, "srecvs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } + ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); + json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -2267,7 +2273,6 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) lazy_reconnect_client(sdata, client); dec_instance_ref(sdata, client); } -static void srecv_process(ckpool_t *ckp, char *buf); static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { @@ -2323,7 +2328,7 @@ retry: * connector so look for this first. The srecv_process frees * the buf heap ram */ Close(sockd); - ckwq_add(sdata->ckwqs, &srecv_process, buf); + ckmsgq_add(sdata->srecvs, buf); buf = NULL; goto retry; } @@ -3931,9 +3936,6 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } -static void sshare_process(ckpool_t *ckp, json_params_t *jp); -static void send_transactions(ckpool_t *ckp, json_params_t *jp); - /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) @@ -3947,7 +3949,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &sshare_process, jp); + ckmsgq_add(sdata->sshareq, jp); return; } @@ -4026,7 +4028,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &send_transactions, jp); + ckmsgq_add(sdata->stxnq, jp); return; } /* Unhandled message here */ @@ -5048,10 +5050,14 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create as many generic workqueue threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN); - sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + /* Create half as many share processing threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); + /* Create 1/4 as many stratum processing threads as there are CPUs */ + threads = threads / 2 ? : 1; + sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);