diff --git a/src/stratifier.c b/src/stratifier.c index 9bf2ab98..442175d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -313,12 +313,10 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; + ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends - ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb - ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations - ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -1644,6 +1642,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } +static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) +{ + int objects, generated; + int64_t memsize; + ckwqmsg_t *wqmsg; + + mutex_lock(ckwq->lock); + DL_COUNT(ckwq->wqmsgs, wqmsg, objects); + generated = ckwq->messages; + mutex_unlock(ckwq->lock); + + memsize = (sizeof(ckwqmsg_t) + size) * objects; + JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); +} + static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -1690,15 +1703,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); + /* Don't know exactly how big the string is so just count the pointer for now */ - ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); - json_set_object(val, "srecvs", subval); + ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); + json_set_object(val, "ckwqs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } - ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); - json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -1706,6 +1718,8 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) return buf; } +static void srecv_process(ckpool_t *ckp, char *buf); + static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret = 0; @@ -1760,7 +1774,7 @@ retry: /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckmsgq_add(sdata->srecvs, buf); + ckwq_add(sdata->ckwqs, &srecv_process, buf); Close(sockd); buf = NULL; goto retry; @@ -3282,6 +3296,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } +static void sshare_process(ckpool_t *ckp, json_params_t *jp); +static void send_transactions(ckpool_t *ckp, json_params_t *jp); + /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) @@ -3296,7 +3313,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->sshareq, jp); + ckwq_add(sdata->ckwqs, &sshare_process, jp); return; } @@ -3375,7 +3392,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->stxnq, jp); + ckwq_add(sdata->ckwqs, &send_transactions, jp); return; } /* Unhandled message here */ @@ -4391,14 +4408,10 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create half as many share processing threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; - sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - /* Create 1/4 as many stratum processing threads as there are CPUs */ - threads = threads / 2 ? : 1; - sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); + /* Create as many generic workqueue threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN); + sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);