Browse Source

Create a pool of workqueue threads for use by the stratifier using them for share processing, stratum receiving and transaction processing

master
Con Kolivas 10 years ago
parent
commit
2d94b18b99
  1. 47
      src/stratifier.c

47
src/stratifier.c

@ -313,12 +313,10 @@ struct stratifier_data {
char lasthash[68];
char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id;
@ -1644,6 +1642,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{
json_t *val = json_object(), *subval;
@ -1690,15 +1703,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "srecvs", subval);
ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval);
json_set_object(val, "ckwqs", subval);
if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
}
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
@ -1706,6 +1718,8 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
return buf;
}
static void srecv_process(ckpool_t *ckp, char *buf);
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{
int sockd, ret = 0, selret = 0;
@ -1760,7 +1774,7 @@ retry:
/* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees
* the buf heap ram */
ckmsgq_add(sdata->srecvs, buf);
ckwq_add(sdata->ckwqs, &srecv_process, buf);
Close(sockd);
buf = NULL;
goto retry;
@ -3282,6 +3296,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client);
}
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
@ -3296,7 +3313,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp);
ckwq_add(sdata->ckwqs, &sshare_process, jp);
return;
}
@ -3375,7 +3392,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp);
ckwq_add(sdata->ckwqs, &send_transactions, jp);
return;
}
/* Unhandled message here */
@ -4391,14 +4408,10 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
/* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN);
sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save