Browse Source

Create a pool of workqueue threads for use by the stratifier using them for share processing, stratum receiving and transaction processing

Conflicts:
	src/stratifier.c
master
Con Kolivas 10 years ago
parent
commit
e40d560f57
  1. 46
      src/stratifier.c

46
src/stratifier.c

@ -365,12 +365,10 @@ struct stratifier_data {
char lasthash[68]; char lasthash[68];
char lastswaphash[68]; char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id; int64_t user_instance_id;
@ -2053,6 +2051,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
} }
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{ {
json_t *val = json_object(), *subval; json_t *val = json_object(), *subval;
@ -2099,15 +2112,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval); json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */ /* Don't know exactly how big the string is so just count the pointer for now */
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval);
json_set_object(val, "srecvs", subval); json_set_object(val, "ckwqs", subval);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval); json_set_object(val, "ckdbq", subval);
} }
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val); json_decref(val);
@ -2226,6 +2238,7 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
lazy_reconnect_client(sdata, client); lazy_reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
} }
static void srecv_process(ckpool_t *ckp, char *buf);
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
@ -2281,7 +2294,7 @@ retry:
* connector so look for this first. The srecv_process frees * connector so look for this first. The srecv_process frees
* the buf heap ram */ * the buf heap ram */
Close(sockd); Close(sockd);
ckmsgq_add(sdata->srecvs, buf); ckwq_add(sdata->ckwqs, &srecv_process, buf);
buf = NULL; buf = NULL;
goto retry; goto retry;
} }
@ -3891,6 +3904,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
} }
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address) json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
@ -3904,7 +3920,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp); ckwq_add(sdata->ckwqs, &sshare_process, jp);
return; return;
} }
@ -3983,7 +3999,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) { if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp); ckwq_add(sdata->ckwqs, &send_transactions, jp);
return; return;
} }
/* Unhandled message here */ /* Unhandled message here */
@ -5005,14 +5021,10 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock); mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create half as many share processing threads as there are CPUs */ /* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; threads = sysconf(_SC_NPROCESSORS_ONLN);
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save