Browse Source

These weren't the droids we were looking for.

master
Con Kolivas 10 years ago
parent
commit
d4304de798
  1. 138
      src/ckpool.c
  2. 33
      src/ckpool.h
  3. 10
      src/connector.c
  4. 37
      src/generator.c
  5. 95
      src/stratifier.c

138
src/ckpool.c

@ -134,39 +134,6 @@ static void *ckmsg_queue(void *arg)
return NULL; return NULL;
} }
/* Generic workqueue function and message receiving and parsing thread */
static void *ckwq_queue(void *arg)
{
ckwq_t *ckwq = (ckwq_t *)arg;
ckpool_t *ckp = ckwq->ckp;
pthread_detach(pthread_self());
rename_proc(ckwq->name);
while (42) {
ckwqmsg_t *wqmsg;
tv_t now;
ts_t abs;
mutex_lock(ckwq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckwq->wqmsgs)
cond_timedwait(ckwq->cond, ckwq->lock, &abs);
wqmsg = ckwq->wqmsgs;
if (wqmsg)
DL_DELETE(ckwq->wqmsgs, wqmsg);
mutex_unlock(ckwq->lock);
if (!wqmsg)
continue;
wqmsg->func(ckp, wqmsg->data);
free(wqmsg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{ {
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
@ -207,29 +174,6 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
return ckmsgq; return ckmsgq;
} }
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count)
{
ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count);
mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckwq[i].name, 15, "%.6swq%d", name, i);
ckwq[i].ckp = ckp;
ckwq[i].lock = lock;
ckwq[i].cond = cond;
create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]);
}
return ckwq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the /* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */ * ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -241,24 +185,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
mutex_lock(ckmsgq->lock); mutex_lock(ckmsgq->lock);
ckmsgq->messages++; ckmsgq->messages++;
DL_APPEND(ckmsgq->msgs, msg); DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_broadcast(ckmsgq->cond); pthread_cond_signal(ckmsgq->cond);
mutex_unlock(ckmsgq->lock); mutex_unlock(ckmsgq->lock);
} }
void ckwq_add(ckwq_t *ckwq, const void *func, void *data)
{
ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t));
wqmsg->func = func;
wqmsg->data = data;
mutex_lock(ckwq->lock);
ckwq->messages++;
DL_APPEND(ckwq->wqmsgs, wqmsg);
pthread_cond_broadcast(ckwq->cond);
mutex_unlock(ckwq->lock);
}
/* Return whether there are any messages queued in the ckmsgq linked list. */ /* Return whether there are any messages queued in the ckmsgq linked list. */
bool ckmsgq_empty(ckmsgq_t *ckmsgq) bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{ {
@ -568,14 +498,32 @@ out:
static void childsighandler(const int sig); static void childsighandler(const int sig);
/* Send a single message to a process instance when there will be no response, struct proc_message {
* closing the socket immediately. */ proc_instance_t *pi;
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) char *msg;
const char *file;
const char *func;
int line;
};
/* Send all one way messages asynchronously so we can wait till the receiving
* end closes the socket to ensure all messages are received but no deadlocks
* can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
{ {
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path; char *path = pi->us.path;
bool ret = false; bool ret = false;
int sockd; int sockd;
pthread_detach(pthread_self());
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out; goto out;
@ -586,14 +534,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
} }
/* At startup the pid fields are not set up before some processes are /* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */ * forked so they never inherit them. */
if (unlikely(!pi->pid)) if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi); pi->pid = get_proc_pid(pi);
if (!pi->pid) { if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
return; goto out_nofail;
}
} }
if (unlikely(kill_pid(pi->pid, 0))) { if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
goto out; goto out;
} }
sockd = open_unix_client(path); sockd = open_unix_client(path);
@ -613,41 +563,25 @@ out:
LOGERR("Failure in send_proc from %s %s:%d", file, func, line); LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15); childsighandler(15);
} }
} out_nofail:
free(msg);
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm)
{
_send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line);
free(pm->msg);
free(pm); free(pm);
return NULL;
} }
/* Fore sending asynchronous messages to another process, the sending process /* Send a single message to a process instance when there will be no response,
* must have ckwqs of its own, referenced in the ckpool structure */ * closing the socket immediately. */
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{ {
struct proc_message *pm; struct proc_message *pm = ckalloc(sizeof(struct proc_message));
pthread_t pth;
if (unlikely(!ckp->ckwqs)) {
LOGALERT("Workqueues not set up in async_send_proc!");
_send_proc(pi, msg, file, func, line);
return;
}
pm = ckzalloc(sizeof(struct proc_message));
pm->pi = pi; pm->pi = pi;
pm->msg = strdup(msg); pm->msg = strdup(msg);
pm->file = file; pm->file = file;
pm->func = func; pm->func = func;
pm->line = line; pm->line = line;
ckwq_add(ckp->ckwqs, &asp_send, pm); create_pthread(&pth, async_send_proc, pm);
} }
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then

33
src/ckpool.h

@ -22,22 +22,13 @@
struct ckpool_instance; struct ckpool_instance;
typedef struct ckpool_instance ckpool_t; typedef struct ckpool_instance ckpool_t;
typedef struct ckmsg ckmsg_t;
struct ckmsg { struct ckmsg {
ckmsg_t *next; struct ckmsg *next;
ckmsg_t *prev; struct ckmsg *prev;
void *data; void *data;
}; };
typedef struct ckwqmsg ckwqmsg_t; typedef struct ckmsg ckmsg_t;
struct ckwqmsg {
ckwqmsg_t *next;
ckwqmsg_t *prev;
void *data;
void (*func)(ckpool_t *, void *);
};
struct ckmsgq { struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
@ -52,18 +43,6 @@ struct ckmsgq {
typedef struct ckmsgq ckmsgq_t; typedef struct ckmsgq ckmsgq_t;
struct ckwq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
mutex_t *lock;
pthread_cond_t *cond;
ckwqmsg_t *wqmsgs;
int64_t messages;
};
typedef struct ckwq ckwq_t;
struct proc_instance { struct proc_instance {
ckpool_t *ckp; ckpool_t *ckp;
unixsock_t us; unixsock_t us;
@ -213,8 +192,6 @@ struct ckpool_instance {
/* Private data for each process */ /* Private data for each process */
void *data; void *data;
/* Private generic workqueues if this process has them */
ckwq_t *ckwqs;
}; };
#ifdef USE_CKDB #ifdef USE_CKDB
@ -227,9 +204,7 @@ struct ckpool_instance {
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
void ckwq_add(ckwq_t *ckwq, const void *func, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);
ckpool_t *global_ckp; ckpool_t *global_ckp;
@ -239,8 +214,6 @@ void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout); int read_socket_line(connsock_t *cs, const int timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);

10
src/connector.c

@ -97,8 +97,6 @@ struct connector_data {
/* For protecting the pending sends list */ /* For protecting the pending sends list */
mutex_t sender_lock; mutex_t sender_lock;
pthread_cond_t sender_cond; pthread_cond_t sender_cond;
ckwq_t *ckwqs;
}; };
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
@ -244,7 +242,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
char buf[256]; char buf[256];
sprintf(buf, "dropclient=%"PRId64, id); sprintf(buf, "dropclient=%"PRId64, id);
async_send_proc(ckp, ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
} }
/* Invalidate this instance. Remove them from the hashtables we look up /* Invalidate this instance. Remove them from the hashtables we look up
@ -363,9 +361,9 @@ reparse:
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(client->fd != -1)) { if (likely(client->fd != -1)) {
if (ckp->passthrough) if (ckp->passthrough)
async_send_proc(ckp, ckp->generator, s); send_proc(ckp->generator, s);
else else
async_send_proc(ckp, ckp->stratifier, s); send_proc(ckp->stratifier, s);
} }
free(s); free(s);
@ -863,8 +861,6 @@ int connector(proc_instance_t *pi)
LOGWARNING("%s connector starting", ckp->name); LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata; ckp->data = cdata;
cdata->ckp = ckp; cdata->ckp = ckp;
/* Connector only requires one work queue */
ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1);
if (!ckp->serverurls) if (!ckp->serverurls)
cdata->serverfd = ckalloc(sizeof(int *)); cdata->serverfd = ckalloc(sizeof(int *));

37
src/generator.c

@ -132,7 +132,6 @@ struct generator_data {
proxy_instance_t *proxy_list; /* Linked list of all active proxies */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
ckwq_t *ckwqs;
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -222,7 +221,7 @@ retry:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); send_proc(ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -368,7 +367,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1); ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1); memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
async_send_proc(ckp, ckp->stratifier, blockmsg); send_proc(ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) { } else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10)) if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
@ -1298,22 +1297,22 @@ static void *proxy_recv(void *arg)
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */ /* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
async_send_proc(ckp, ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
break; break;
} }
if (parse_method(proxi, cs->buf)) { if (parse_method(proxi, cs->buf)) {
if (proxi->notified) { if (proxi->notified) {
async_send_proc(ckp, ckp->stratifier, "notify"); send_proc(ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
} }
if (proxi->diffed) { if (proxi->diffed) {
async_send_proc(ckp, ckp->stratifier, "diff"); send_proc(ckp->stratifier, "diff");
proxi->diffed = false; proxi->diffed = false;
} }
if (proxi->reconnect) { if (proxi->reconnect) {
proxi->reconnect = false; proxi->reconnect = false;
LOGWARNING("Reconnect issue, dropping existing connection"); LOGWARNING("Reconnect issue, dropping existing connection");
async_send_proc(ckp, ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
break; break;
} }
continue; continue;
@ -1403,13 +1402,13 @@ static void *passthrough_recv(void *arg)
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */ /* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
async_send_proc(ckp, ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
break; break;
} }
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
async_send_proc(ckp, ckp->connector, cs->buf); send_proc(ckp->connector, cs->buf);
} }
return NULL; return NULL;
} }
@ -1524,8 +1523,8 @@ retry:
} }
} }
if (!alive) { if (!alive) {
async_send_proc(ckp, ckp->connector, "reject"); send_proc(ckp->connector, "reject");
async_send_proc(ckp, ckp->stratifier, "dropall"); send_proc(ckp->stratifier, "dropall");
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5); sleep(5);
goto retry; goto retry;
@ -1544,7 +1543,7 @@ retry:
create_pthread(&alive->pth_psend, proxy_send, alive); create_pthread(&alive->pth_psend, proxy_send, alive);
} }
out: out:
async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); send_proc(ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -1553,8 +1552,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi)
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; connsock_t *cs;
async_send_proc(ckp, ckp->stratifier, "reconnect"); send_proc(ckp->stratifier, "reconnect");
async_send_proc(ckp, ckp->connector, "reject"); send_proc(ckp->connector, "reject");
if (!proxi) // This shouldn't happen if (!proxi) // This shouldn't happen
return; return;
@ -1604,8 +1603,8 @@ reconnect:
/* We've just subscribed and authorised so tell the stratifier to /* We've just subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */ * retrieve the first subscription. */
if (!ckp->passthrough) { if (!ckp->passthrough) {
async_send_proc(ckp, ckp->stratifier, "subscribe"); send_proc(ckp->stratifier, "subscribe");
async_send_proc(ckp, ckp->stratifier, "notify"); send_proc(ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
} }
@ -1793,7 +1792,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
async_send_proc(ckp, ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
} }
static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
@ -1839,7 +1838,7 @@ static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
async_send_proc(ckp, ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
} }
@ -1852,8 +1851,6 @@ int generator(proc_instance_t *pi)
LOGWARNING("%s generator starting", ckp->name); LOGWARNING("%s generator starting", ckp->name);
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
/* Generator only requires one work queue */
ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1);
if (ckp->proxy) { if (ckp->proxy) {
gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog);
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);

95
src/stratifier.c

@ -313,10 +313,12 @@ struct stratifier_data {
char lasthash[68]; char lasthash[68];
char lastswaphash[68]; char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id; int64_t user_instance_id;
@ -795,26 +797,38 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
set = true; set = true;
} else } else
set = false; set = false;
async_send_proc(ckp, ckp->generator, msg); send_proc(ckp->generator, msg);
if (set) if (set)
sdata->gen_priority = 0; sdata->gen_priority = 0;
} }
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(sdata_t *sdata); static void broadcast_ping(sdata_t *sdata);
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template * since checking should have been done earlier, and creates the base template
* for generating work templates. */ * for generating work templates. */
static void do_update(ckpool_t *ckp, int *prio) static void *do_update(void *arg)
{ {
struct update_req *ur = (struct update_req *)arg;
ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
bool new_block = false; bool new_block = false;
int prio = ur->prio;
bool ret = false; bool ret = false;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
char *buf; char *buf;
buf = send_recv_generator(ckp, "getbase", *prio); pthread_detach(pthread_self());
rename_proc("updater");
buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGNOTICE("Get base in update_base delayed due to higher priority request"); LOGNOTICE("Get base in update_base delayed due to higher priority request");
goto out; goto out;
@ -874,17 +888,21 @@ out:
LOGINFO("Broadcast ping due to failed stratum base update"); LOGINFO("Broadcast ping due to failed stratum base update");
broadcast_ping(sdata); broadcast_ping(sdata);
} }
free(buf); dealloc(buf);
free(prio); free(ur->pth);
free(ur);
return NULL;
} }
static void update_base(ckpool_t *ckp, const int prio) static void update_base(ckpool_t *ckp, const int prio)
{ {
int *pprio = ckalloc(sizeof(int)); struct update_req *ur = ckalloc(sizeof(struct update_req));
sdata_t *sdata = ckp->data; pthread_t *pth = ckalloc(sizeof(pthread_t));
*pprio = prio; ur->pth = pth;
ckwq_add(sdata->ckwqs, &do_update, pprio); ur->ckp = ckp;
ur->prio = prio;
create_pthread(pth, do_update, ur);
} }
static void __kill_instance(stratum_instance_t *client) static void __kill_instance(stratum_instance_t *client)
@ -928,7 +946,7 @@ static void drop_allclients(ckpool_t *ckp)
client->dropped = true; client->dropped = true;
kills++; kills++;
sprintf(buf, "dropclient=%"PRId64, client_id); sprintf(buf, "dropclient=%"PRId64, client_id);
async_send_proc(ckp, ckp->connector, buf); send_proc(ckp->connector, buf);
} }
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
disconnects++; disconnects++;
@ -1633,21 +1651,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
} }
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{ {
json_t *val = json_object(), *subval; json_t *val = json_object(), *subval;
@ -1694,14 +1697,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval); json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */ /* Don't know exactly how big the string is so just count the pointer for now */
ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "ckwqs", subval); json_set_object(val, "srecvs", subval);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval); json_set_object(val, "ckdbq", subval);
} }
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val); json_decref(val);
@ -1709,8 +1713,6 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
return buf; return buf;
} }
static void srecv_process(ckpool_t *ckp, char *buf);
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret = 0; int sockd, ret = 0, selret = 0;
@ -1765,7 +1767,7 @@ retry:
/* The bulk of the messages will be received json from the /* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees * connector so look for this first. The srecv_process frees
* the buf heap ram */ * the buf heap ram */
ckwq_add(sdata->ckwqs, &srecv_process, buf); ckmsgq_add(sdata->srecvs, buf);
Close(sockd); Close(sockd);
buf = NULL; buf = NULL;
goto retry; goto retry;
@ -3287,14 +3289,10 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
} }
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address) json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
{ {
ckpool_t *ckp = client->ckp;
const char *method; const char *method;
char buf[256]; char buf[256];
@ -3305,7 +3303,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &sshare_process, jp); ckmsgq_add(sdata->sshareq, jp);
return; return;
} }
@ -3340,7 +3338,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* to it since it's unauthorised. Set the flag just in case. */ * to it since it's unauthorised. Set the flag just in case. */
client->authorised = false; client->authorised = false;
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
async_send_proc(ckp, client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
return; return;
} }
@ -3348,7 +3346,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (!client->subscribed) { if (!client->subscribed) {
LOGINFO("Dropping unsubscribed client %"PRId64, client_id); LOGINFO("Dropping unsubscribed client %"PRId64, client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id);
async_send_proc(ckp, client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
return; return;
} }
@ -3371,7 +3369,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* the stratum instance data. Clients will just reconnect. */ * the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64, client_id); LOGINFO("Dropping unauthorised client %"PRId64, client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id);
async_send_proc(ckp, client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
return; return;
} }
@ -3384,7 +3382,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) { if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &send_transactions, jp); ckmsgq_add(sdata->stxnq, jp);
return; return;
} }
/* Unhandled message here */ /* Unhandled message here */
@ -3403,13 +3401,12 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *
{ {
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int64_t client_id = msg->client_id; int64_t client_id = msg->client_id;
ckpool_t *ckp = client->ckp;
char buf[256]; char buf[256];
if (unlikely(client->reject == 2)) { if (unlikely(client->reject == 2)) {
LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id);
async_send_proc(ckp, ckp->connector, buf); send_proc(client->ckp->connector, buf);
goto out; goto out;
} }
@ -3543,7 +3540,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */ * connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, 0); s = json_dumps(msg->json_msg, 0);
async_send_proc(ckp, ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
free_smsg(msg); free_smsg(msg);
} }
@ -4401,10 +4398,14 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock); mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create as many generic workqueue threads as there are CPUs */ /* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN); threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) { if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save