Browse Source

Merge commit '428cabdfc4c8fe0cf3be17aef5033295eeffb50f' into proxydev

Conflicts:
	src/stratifier.c
master
Con Kolivas 10 years ago
parent
commit
d437404ada
  1. 170
      src/ckpool.c
  2. 35
      src/ckpool.h
  3. 18
      src/libckpool.c
  4. 1
      src/libckpool.h
  5. 87
      src/stratifier.c

170
src/ckpool.c

@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg)
return NULL;
}
/* Generic workqueue function and message receiving and parsing thread */
static void *ckwq_queue(void *arg)
{
ckwq_t *ckmsgq = (ckwq_t *)arg;
ckpool_t *ckp = ckmsgq->ckp;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckwqmsg_t *wqmsg;
tv_t now;
ts_t abs;
mutex_lock(ckmsgq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckmsgq->wqmsgs)
cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
wqmsg = ckmsgq->wqmsgs;
if (wqmsg)
DL_DELETE(ckmsgq->wqmsgs, wqmsg);
mutex_unlock(ckmsgq->lock);
if (!wqmsg)
continue;
wqmsg->func(ckp, wqmsg->data);
free(wqmsg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
return ckmsgq;
}
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count)
{
ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count);
mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckwq[i].name, 15, "%.6swq%d", name, i);
ckwq[i].ckp = ckp;
ckwq[i].lock = lock;
ckwq[i].cond = cond;
create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]);
}
return ckwq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
mutex_lock(ckmsgq->lock);
ckmsgq->messages++;
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(ckmsgq->cond);
pthread_cond_broadcast(ckmsgq->cond);
mutex_unlock(ckmsgq->lock);
}
void ckwq_add(ckwq_t *ckwq, const void *func, void *data)
{
ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t));
wqmsg->func = func;
wqmsg->data = data;
mutex_lock(ckwq->lock);
ckwq->messages++;
DL_APPEND(ckwq->wqmsgs, wqmsg);
pthread_cond_broadcast(ckwq->cond);
mutex_unlock(ckwq->lock);
}
/* Return whether there are any messages queued in the ckmsgq linked list. */
bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{
@ -237,7 +307,25 @@ static int pid_wait(const pid_t pid, const int ms)
return ret;
}
static int send_procmsg(const proc_instance_t *pi, const char *buf)
static int get_proc_pid(const proc_instance_t *pi)
{
int ret, pid = 0;
char path[256];
FILE *fp;
sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname);
fp = fopen(path, "re");
if (!fp)
goto out;
ret = fscanf(fp, "%d", &pid);
if (ret < 1)
pid = 0;
fclose(fp);
out:
return pid;
}
static int send_procmsg(proc_instance_t *pi, const char *buf)
{
char *path = pi->us.path;
int ret = -1;
@ -251,6 +339,12 @@ static int send_procmsg(const proc_instance_t *pi, const char *buf)
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname);
goto out;
@ -481,27 +575,9 @@ out:
static void childsighandler(const int sig);
static int get_proc_pid(const proc_instance_t *pi)
{
int ret, pid = 0;
char path[256];
FILE *fp;
sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname);
fp = fopen(path, "re");
if (!fp)
goto out;
ret = fscanf(fp, "%d", &pid);
if (ret < 1)
pid = 0;
fclose(fp);
out:
return pid;
}
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path;
bool ret = false;
@ -519,6 +595,10 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
* forked so they never inherit them. */
if (unlikely(!pi->pid))
pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
return;
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out;
@ -532,13 +612,49 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
ret = true;
if (!wait_close(sockd, 5))
LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line);
Close(sockd);
out:
if (unlikely(!ret)) {
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15);
}
return ret;
}
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm)
{
_send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line);
free(pm->msg);
free(pm);
}
/* Fore sending asynchronous messages to another process, the sending process
* must have ckwqs of its own, referenced in the ckpool structure */
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm;
if (unlikely(!ckp->ckwqs)) {
LOGALERT("Workqueues not set up in async_send_proc!");
_send_proc(pi, msg, file, func, line);
return;
}
pm = ckzalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
ckwq_add(ckp->ckwqs, &asp_send, pm);
}
/* Send a single message to a process instance and retrieve the response, then
@ -556,7 +672,15 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) {
/* Reset the pid value in case we are still looking for an old
* process */
pi->pid = 0;
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
@ -1209,6 +1333,8 @@ static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *nam
pi->process = process;
create_process_unixsock(pi);
manage_old_child(ckp, pi);
/* Remove the old pid file if we've succeeded in coming this far */
rm_namepid(pi);
return pi;
}

35
src/ckpool.h

@ -21,13 +21,22 @@
typedef struct ckpool_instance ckpool_t;
typedef struct ckmsg ckmsg_t;
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
ckmsg_t *next;
ckmsg_t *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
typedef struct ckwqmsg ckwqmsg_t;
struct ckwqmsg {
ckwqmsg_t *next;
ckwqmsg_t *prev;
void *data;
void (*func)(ckpool_t *, void *);
};
struct ckmsgq {
ckpool_t *ckp;
@ -42,6 +51,18 @@ struct ckmsgq {
typedef struct ckmsgq ckmsgq_t;
struct ckwq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
mutex_t *lock;
pthread_cond_t *cond;
ckwqmsg_t *wqmsgs;
int64_t messages;
};
typedef struct ckwq ckwq_t;
struct proc_instance {
ckpool_t *ckp;
unixsock_t us;
@ -192,6 +213,8 @@ struct ckpool_instance {
/* Private data for each process */
void *data;
/* Private generic workqueues if this process has them */
ckwq_t *ckwqs;
};
#ifdef USE_CKDB
@ -204,7 +227,9 @@ struct ckpool_instance {
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
void ckwq_add(ckwq_t *ckwq, const void *func, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq);
ckpool_t *global_ckp;
@ -212,8 +237,10 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout);
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);

18
src/libckpool.c

@ -910,6 +910,24 @@ out:
return sockd;
}
/* Wait till a socket has been closed at the other end */
int wait_close(int sockd, int timeout)
{
struct pollfd sfd;
int ret;
if (unlikely(sockd < 0))
return -1;
sfd.fd = sockd;
sfd.events = POLLIN;
sfd.revents = 0;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
if (ret < 1)
return 0;
return sfd.revents & POLLHUP;
}
/* Emulate a select read wait for high fds that select doesn't support */
int wait_read_select(int sockd, int timeout)
{

1
src/libckpool.h

@ -493,6 +493,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
#define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__)
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_close(int sockd, int timeout);
int wait_read_select(int sockd, int timeout);
int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);

87
src/stratifier.c

@ -336,12 +336,10 @@ struct stratifier_data {
char lasthash[68];
char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id;
@ -815,33 +813,21 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
sdata->gen_priority = 0;
}
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(sdata_t *sdata);
/* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template
* for generating work templates. */
static void *do_update(void *arg)
static void do_update(ckpool_t *ckp, int *prio)
{
struct update_req *ur = (struct update_req *)arg;
ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->data;
bool new_block = false;
int prio = ur->prio;
bool ret = false;
workbase_t *wb;
json_t *val;
char *buf;
pthread_detach(pthread_self());
rename_proc("updater");
buf = send_recv_generator(ckp, "getbase", prio);
buf = send_recv_generator(ckp, "getbase", *prio);
if (unlikely(!buf)) {
LOGNOTICE("Get base in update_base delayed due to higher priority request");
goto out;
@ -901,21 +887,17 @@ out:
LOGINFO("Broadcast ping due to failed stratum base update");
broadcast_ping(sdata);
}
dealloc(buf);
free(ur->pth);
free(ur);
return NULL;
free(buf);
free(prio);
}
static void update_base(ckpool_t *ckp, const int prio)
{
struct update_req *ur = ckalloc(sizeof(struct update_req));
pthread_t *pth = ckalloc(sizeof(pthread_t));
int *pprio = ckalloc(sizeof(int));
sdata_t *sdata = ckp->data;
ur->pth = pth;
ur->ckp = ckp;
ur->prio = prio;
create_pthread(pth, do_update, ur);
*pprio = prio;
ckwq_add(sdata->ckwqs, &do_update, pprio);
}
static void __kill_instance(stratum_instance_t *client)
@ -1757,6 +1739,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{
json_t *val = json_object(), *subval;
@ -1803,15 +1800,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "srecvs", subval);
ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval);
json_set_object(val, "ckwqs", subval);
if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
}
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
@ -1838,6 +1834,8 @@ static void set_proxy(sdata_t *sdata, const char *buf)
proxy->notified = false;
}
static void srecv_process(ckpool_t *ckp, char *buf);
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{
int sockd, ret = 0, selret = 0;
@ -1892,7 +1890,7 @@ retry:
/* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees
* the buf heap ram */
ckmsgq_add(sdata->srecvs, buf);
ckwq_add(sdata->ckwqs, &srecv_process, buf);
Close(sockd);
buf = NULL;
goto retry;
@ -3416,6 +3414,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client);
}
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
@ -3429,7 +3430,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp);
ckwq_add(sdata->ckwqs, &sshare_process, jp);
return;
}
@ -3508,7 +3509,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp);
ckwq_add(sdata->ckwqs, &send_transactions, jp);
return;
}
/* Unhandled message here */
@ -3545,7 +3546,11 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
if (res_val) {
const char *result = json_string_value(res_val);
LOGDEBUG("Received spurious response %s", result ? result : "");
if (!safecmp(result, "pong"))
LOGDEBUG("Received pong from client %"PRId64, client_id);
else
LOGDEBUG("Received spurious response %s from client %"PRId64,
result ? result : "", client_id);
goto out;
}
send_json_err(sdata, client_id, id_val, "-3:method not found");
@ -4488,8 +4493,8 @@ int stratifier(proc_instance_t *pi)
ckpool_t *ckp = pi->ckp;
int ret = 1, threads;
int64_t randomiser;
char *buf = NULL;
sdata_t *sdata;
char *buf;
LOGWARNING("%s stratifier starting", ckp->name);
sdata = ckzalloc(sizeof(sdata_t));
@ -4537,14 +4542,10 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
/* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN);
ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save