Browse Source

These weren't the droids we were looking for

master
Con Kolivas 10 years ago
parent
commit
228c2cf19b
  1. 72
      src/ckpool.c
  2. 29
      src/ckpool.h
  3. 82
      src/stratifier.c

72
src/ckpool.c

@ -134,39 +134,6 @@ static void *ckmsg_queue(void *arg)
return NULL; return NULL;
} }
/* Generic workqueue function and message receiving and parsing thread */
static void *ckwq_queue(void *arg)
{
ckwq_t *ckwq = (ckwq_t *)arg;
ckpool_t *ckp = ckwq->ckp;
pthread_detach(pthread_self());
rename_proc(ckwq->name);
while (42) {
ckwqmsg_t *wqmsg;
tv_t now;
ts_t abs;
mutex_lock(ckwq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckwq->wqmsgs)
cond_timedwait(ckwq->cond, ckwq->lock, &abs);
wqmsg = ckwq->wqmsgs;
if (wqmsg)
DL_DELETE(ckwq->wqmsgs, wqmsg);
mutex_unlock(ckwq->lock);
if (!wqmsg)
continue;
wqmsg->func(ckp, wqmsg->data);
free(wqmsg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{ {
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
@ -207,29 +174,6 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
return ckmsgq; return ckmsgq;
} }
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count)
{
ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count);
mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckwq[i].name, 15, "%.6swq%d", name, i);
ckwq[i].ckp = ckp;
ckwq[i].lock = lock;
ckwq[i].cond = cond;
create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]);
}
return ckwq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the /* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */ * ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -241,24 +185,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
mutex_lock(ckmsgq->lock); mutex_lock(ckmsgq->lock);
ckmsgq->messages++; ckmsgq->messages++;
DL_APPEND(ckmsgq->msgs, msg); DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_broadcast(ckmsgq->cond); pthread_cond_signal(ckmsgq->cond);
mutex_unlock(ckmsgq->lock); mutex_unlock(ckmsgq->lock);
} }
void ckwq_add(ckwq_t *ckwq, const void *func, void *data)
{
ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t));
wqmsg->func = func;
wqmsg->data = data;
mutex_lock(ckwq->lock);
ckwq->messages++;
DL_APPEND(ckwq->wqmsgs, wqmsg);
pthread_cond_broadcast(ckwq->cond);
mutex_unlock(ckwq->lock);
}
/* Return whether there are any messages queued in the ckmsgq linked list. */ /* Return whether there are any messages queued in the ckmsgq linked list. */
bool ckmsgq_empty(ckmsgq_t *ckmsgq) bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{ {

29
src/ckpool.h

@ -21,22 +21,13 @@
typedef struct ckpool_instance ckpool_t; typedef struct ckpool_instance ckpool_t;
typedef struct ckmsg ckmsg_t;
struct ckmsg { struct ckmsg {
ckmsg_t *next; struct ckmsg *next;
ckmsg_t *prev; struct ckmsg *prev;
void *data; void *data;
}; };
typedef struct ckwqmsg ckwqmsg_t; typedef struct ckmsg ckmsg_t;
struct ckwqmsg {
ckwqmsg_t *next;
ckwqmsg_t *prev;
void *data;
void (*func)(ckpool_t *, void *);
};
struct ckmsgq { struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
@ -51,18 +42,6 @@ struct ckmsgq {
typedef struct ckmsgq ckmsgq_t; typedef struct ckmsgq ckmsgq_t;
struct ckwq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
mutex_t *lock;
pthread_cond_t *cond;
ckwqmsg_t *wqmsgs;
int64_t messages;
};
typedef struct ckwq ckwq_t;
struct proc_instance { struct proc_instance {
ckpool_t *ckp; ckpool_t *ckp;
unixsock_t us; unixsock_t us;
@ -222,9 +201,7 @@ struct ckpool_instance {
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
void ckwq_add(ckwq_t *ckwq, const void *func, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);
ckpool_t *global_ckp; ckpool_t *global_ckp;

82
src/stratifier.c

@ -363,10 +363,12 @@ struct stratifier_data {
char lasthash[68]; char lasthash[68];
char lastswaphash[68]; char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id; int64_t user_instance_id;
@ -843,21 +845,33 @@ static void send_generator(const ckpool_t *ckp, const char *msg, const int prio)
sdata->gen_priority = 0; sdata->gen_priority = 0;
} }
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(sdata_t *sdata); static void broadcast_ping(sdata_t *sdata);
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template * since checking should have been done earlier, and creates the base template
* for generating work templates. */ * for generating work templates. */
static void do_update(ckpool_t *ckp, int *prio) static void *do_update(void *arg)
{ {
struct update_req *ur = (struct update_req *)arg;
ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
bool new_block = false; bool new_block = false;
int prio = ur->prio;
bool ret = false; bool ret = false;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
char *buf; char *buf;
buf = send_recv_generator(ckp, "getbase", *prio); pthread_detach(pthread_self());
rename_proc("updater");
buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGNOTICE("Get base in update_base delayed due to higher priority request"); LOGNOTICE("Get base in update_base delayed due to higher priority request");
goto out; goto out;
@ -919,17 +933,21 @@ out:
LOGINFO("Broadcast ping due to failed stratum base update"); LOGINFO("Broadcast ping due to failed stratum base update");
broadcast_ping(sdata); broadcast_ping(sdata);
} }
free(buf); dealloc(buf);
free(prio); free(ur->pth);
free(ur);
return NULL;
} }
static void update_base(ckpool_t *ckp, const int prio) static void update_base(ckpool_t *ckp, const int prio)
{ {
int *pprio = ckalloc(sizeof(int)); struct update_req *ur = ckalloc(sizeof(struct update_req));
sdata_t *sdata = ckp->data; pthread_t *pth = ckalloc(sizeof(pthread_t));
*pprio = prio; ur->pth = pth;
ckwq_add(sdata->ckwqs, &do_update, pprio); ur->ckp = ckp;
ur->prio = prio;
create_pthread(pth, do_update, ur);
} }
static void __kill_instance(stratum_instance_t *client) static void __kill_instance(stratum_instance_t *client)
@ -1010,9 +1028,11 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata)
/* Use the same work queues for all subproxies */ /* Use the same work queues for all subproxies */
dsdata->ssends = sdata->ssends; dsdata->ssends = sdata->ssends;
dsdata->ckwqs = sdata->ckwqs; dsdata->srecvs = sdata->srecvs;
dsdata->ckdbq = sdata->ckdbq; dsdata->ckdbq = sdata->ckdbq;
dsdata->sshareq = sdata->sshareq;
dsdata->sauthq = sdata->sauthq; dsdata->sauthq = sdata->sauthq;
dsdata->stxnq = sdata->stxnq;
/* Give the sbuproxy its own workbase list and lock */ /* Give the sbuproxy its own workbase list and lock */
cklock_init(&dsdata->workbase_lock); cklock_init(&dsdata->workbase_lock);
@ -2111,21 +2131,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
} }
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{ {
json_t *val = json_object(), *subval; json_t *val = json_object(), *subval;
@ -2172,14 +2177,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval); json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */ /* Don't know exactly how big the string is so just count the pointer for now */
ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "ckwqs", subval); json_set_object(val, "srecvs", subval);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval); json_set_object(val, "ckdbq", subval);
} }
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val); json_decref(val);
@ -2267,7 +2273,6 @@ static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
lazy_reconnect_client(sdata, client); lazy_reconnect_client(sdata, client);
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
} }
static void srecv_process(ckpool_t *ckp, char *buf);
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
@ -2323,7 +2328,7 @@ retry:
* connector so look for this first. The srecv_process frees * connector so look for this first. The srecv_process frees
* the buf heap ram */ * the buf heap ram */
Close(sockd); Close(sockd);
ckwq_add(sdata->ckwqs, &srecv_process, buf); ckmsgq_add(sdata->srecvs, buf);
buf = NULL; buf = NULL;
goto retry; goto retry;
} }
@ -3931,9 +3936,6 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
} }
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address) json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
@ -3947,7 +3949,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &sshare_process, jp); ckmsgq_add(sdata->sshareq, jp);
return; return;
} }
@ -4026,7 +4028,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) { if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &send_transactions, jp); ckmsgq_add(sdata->stxnq, jp);
return; return;
} }
/* Unhandled message here */ /* Unhandled message here */
@ -5048,10 +5050,14 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock); mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create as many generic workqueue threads as there are CPUs */ /* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN); threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save