Browse Source

Merge branch 'master' into multiproxy

Conflicts:
	src/stratifier.c
master
Con Kolivas 10 years ago
parent
commit
83fee96e2c
  1. 138
      src/ckpool.c
  2. 18
      src/ckpool.h
  3. 49
      src/connector.c
  4. 3
      src/libckpool.h
  5. 80
      src/stratifier.c

138
src/ckpool.c

@ -203,6 +203,86 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
return ret; return ret;
} }
static void childsighandler(const int sig);
/* Create a standalone thread that queues received unix messages for a proc
* instance and adds them to linked list of received messages with their
* associated receive socket, then signal the associated rmsg_cond for the
* process to know we have more queued messages. The unix_msg_t ram must be
* freed by the code that removes the entry from the list. */
static void *unix_receiver(void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
int rsockd = pi->us.sockd, sockd;
char qname[16];
sprintf(qname, "%cunixrq", pi->processname[0]);
rename_proc(qname);
pthread_detach(pthread_self());
while (42) {
unix_msg_t *umsg;
char *buf;
sockd = accept(rsockd, NULL, NULL);
if (unlikely(sockd < 0)) {
LOGEMERG("Failed to accept on %s socket, exiting", qname);
childsighandler(15);
break;
}
buf = recv_unix_msg(sockd);
if (unlikely(!buf)) {
Close(sockd);
LOGWARNING("Failed to get message on %s socket", qname);
continue;
}
umsg = ckalloc(sizeof(unix_msg_t));
umsg->sockd = sockd;
umsg->buf = buf;
mutex_lock(&pi->rmsg_lock);
DL_APPEND(pi->unix_msgs, umsg);
pthread_cond_signal(&pi->rmsg_cond);
mutex_unlock(&pi->rmsg_lock);
}
return NULL;
}
/* Get the next message in the receive queue, or wait up to 5 seconds for
* the next message, returning NULL if no message is received in that time. */
unix_msg_t *get_unix_msg(proc_instance_t *pi)
{
unix_msg_t *umsg;
mutex_lock(&pi->rmsg_lock);
if (!pi->unix_msgs) {
tv_t now;
ts_t abs;
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec += 5;
cond_timedwait(&pi->rmsg_cond, &pi->rmsg_lock, &abs);
}
umsg = pi->unix_msgs;
if (umsg)
DL_DELETE(pi->unix_msgs, umsg);
mutex_unlock(&pi->rmsg_lock);
return umsg;
}
void create_unix_receiver(proc_instance_t *pi)
{
pthread_t pth;
mutex_init(&pi->rmsg_lock);
cond_init(&pi->rmsg_cond);
create_pthread(&pth, unix_receiver, pi);
}
static void broadcast_proc(ckpool_t *ckp, const char *buf) static void broadcast_proc(ckpool_t *ckp, const char *buf)
{ {
int i; int i;
@ -518,45 +598,31 @@ out:
return ret; return ret;
} }
static void childsighandler(const int sig); /* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
struct proc_message { void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
/* Send all one way messages asynchronously so we can wait till the receiving
* end closes the socket to ensure all messages are received but no deadlocks
* can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
{ {
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path; char *path = pi->us.path;
bool ret = false; bool ret = false;
int sockd; int sockd;
pthread_detach(pthread_self()); if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return;
}
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out; goto out;
} }
/* At startup the pid fields are not set up before some processes are /* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */ * forked so they never inherit them. */
if (unlikely(!pi->pid)) { if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi); pi->pid = get_proc_pid(pi);
if (!pi->pid) { if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out_nofail; return;
} }
} }
if (unlikely(kill_pid(pi->pid, 0))) { if (unlikely(kill_pid(pi->pid, 0))) {
@ -573,38 +639,12 @@ void *async_send_proc(void *arg)
LOGWARNING("Failed to send %s to socket %s", msg, path); LOGWARNING("Failed to send %s to socket %s", msg, path);
else else
ret = true; ret = true;
if (!wait_close(sockd, 5))
LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line);
Close(sockd); Close(sockd);
out: out:
if (unlikely(!ret)) { if (unlikely(!ret)) {
LOGERR("Failure in send_proc from %s %s:%d", file, func, line); LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15); childsighandler(15);
} }
out_nofail:
free(msg);
free(pm);
return NULL;
}
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm;
pthread_t pth;
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return;
}
pm = ckalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
create_pthread(&pth, async_send_proc, pm);
} }
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then

18
src/ckpool.h

@ -29,6 +29,15 @@ struct ckmsg {
typedef struct ckmsg ckmsg_t; typedef struct ckmsg ckmsg_t;
typedef struct unix_msg unix_msg_t;
struct unix_msg {
unix_msg_t *next;
unix_msg_t *prev;
int sockd;
char *buf;
};
struct ckmsgq { struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
char name[16]; char name[16];
@ -42,6 +51,8 @@ struct ckmsgq {
typedef struct ckmsgq ckmsgq_t; typedef struct ckmsgq ckmsgq_t;
typedef struct proc_instance proc_instance_t;
struct proc_instance { struct proc_instance {
ckpool_t *ckp; ckpool_t *ckp;
unixsock_t us; unixsock_t us;
@ -50,6 +61,11 @@ struct proc_instance {
int pid; int pid;
int oldpid; int oldpid;
int (*process)(proc_instance_t *); int (*process)(proc_instance_t *);
/* Linked list of received messages, locking and conditional */
unix_msg_t *unix_msgs;
mutex_t rmsg_lock;
pthread_cond_t rmsg_cond;
}; };
struct connsock { struct connsock {
@ -219,6 +235,8 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);
unix_msg_t *get_unix_msg(proc_instance_t *pi);
void create_unix_receiver(proc_instance_t *pi);
ckpool_t *global_ckp; ckpool_t *global_ckp;

49
src/connector.c

@ -764,25 +764,22 @@ static char *connector_stats(cdata_t *cdata)
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{ {
int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id; int64_t client_id64, client_id;
unixsock_t *us = &pi->us; unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
uint8_t test_cycle = 0; uint8_t test_cycle = 0;
char *buf = NULL; char *buf;
int ret = 0;
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Connector failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
LOGWARNING("%s connector ready", ckp->name); LOGWARNING("%s connector ready", ckp->name);
retry: retry:
if (umsg) {
Close(umsg->sockd);
free(umsg->buf);
dealloc(umsg);
}
if (!++test_cycle) { if (!++test_cycle) {
/* Test for pthread join every 256 messages */ /* Test for pthread join every 256 messages */
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
@ -797,21 +794,11 @@ retry:
} }
} }
Close(sockd); do {
sockd = accept(us->sockd, NULL, NULL); umsg = get_unix_msg(pi);
if (sockd < 0) { } while (!umsg);
LOGEMERG("Failed to accept on connector socket, exiting");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in connector_loop");
goto retry;
}
buf = umsg->buf;
LOGDEBUG("Connector received message: %s", buf); LOGDEBUG("Connector received message: %s", buf);
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
@ -848,7 +835,7 @@ retry:
stratifier_drop_client(ckp, client_id); stratifier_drop_client(ckp, client_id);
} else if (cmdmatch(buf, "ping")) { } else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Connector received ping request"); LOGDEBUG("Connector received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(umsg->sockd, "pong");
} else if (cmdmatch(buf, "accept")) { } else if (cmdmatch(buf, "accept")) {
LOGDEBUG("Connector received accept signal"); LOGDEBUG("Connector received accept signal");
cdata->accept = true; cdata->accept = true;
@ -860,7 +847,7 @@ retry:
LOGDEBUG("Connector received stats request"); LOGDEBUG("Connector received stats request");
msg = connector_stats(cdata); msg = connector_stats(cdata);
send_unix_msg(sockd, msg); send_unix_msg(umsg->sockd, msg);
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) { } else if (cmdmatch(buf, "shutdown")) {
@ -885,13 +872,11 @@ retry:
sscanf(buf, "getxfd%d", &fdno); sscanf(buf, "getxfd%d", &fdno);
if (fdno > -1 && fdno < ckp->serverurls) if (fdno > -1 && fdno < ckp->serverurls)
send_fd(cdata->serverfd[fdno], sockd); send_fd(cdata->serverfd[fdno], umsg->sockd);
} else } else
LOGWARNING("Unhandled connector message: %s", buf); LOGWARNING("Unhandled connector message: %s", buf);
goto retry; goto retry;
out: out:
Close(sockd);
dealloc(buf);
return ret; return ret;
} }
@ -1003,6 +988,8 @@ int connector(proc_instance_t *pi)
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);
create_pthread(&cdata->pth_receiver, receiver, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata);
create_unix_receiver(pi);
ret = connector_loop(pi, cdata); ret = connector_loop(pi, cdata);
out: out:
dealloc(ckp->data); dealloc(ckp->data);

3
src/libckpool.h

@ -309,9 +309,6 @@ struct unixsock {
typedef struct unixsock unixsock_t; typedef struct unixsock unixsock_t;
typedef struct proc_instance proc_instance_t;
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line);
#define json_check(VAL, ERR) _json_check(VAL, ERR, __FILE__, __func__, __LINE__) #define json_check(VAL, ERR) _json_check(VAL, ERR, __FILE__, __func__, __LINE__)

80
src/stratifier.c

@ -2983,13 +2983,18 @@ static void get_poolstats(sdata_t *sdata, int *sockd)
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret = 0;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
unixsock_t *us = &pi->us; unix_msg_t *umsg = NULL;
tv_t start_tv = {0, 0}; tv_t start_tv = {0, 0};
char *buf = NULL; int ret = 0;
char *buf;
retry: retry:
if (umsg) {
free(umsg->buf);
dealloc(umsg);
}
do { do {
double tdiff; double tdiff;
tv_t end_tv; tv_t end_tv;
@ -3008,41 +3013,29 @@ retry:
broadcast_ping(sdata); broadcast_ping(sdata);
} }
} }
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
sockd = accept(us->sockd, NULL, NULL); umsg = get_unix_msg(pi);
if (sockd < 0) { if (unlikely(!umsg &&!ping_main(ckp))) {
LOGERR("Failed to accept on stratifier socket, exiting"); LOGEMERG("Stratifier failed to ping main process, exiting");
ret = 1; ret = 1;
goto out; goto out;
} }
} while (!umsg);
dealloc(buf); buf = umsg->buf;
buf = recv_unix_msg(sockd);
if (unlikely(!buf)) {
Close(sockd);
LOGWARNING("Failed to get message in stratum_loop");
goto retry;
}
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
/* The bulk of the messages will be received json from the /* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees * connector so look for this first. The srecv_process frees
* the buf heap ram */ * the buf heap ram */
Close(sockd); Close(umsg->sockd);
ckmsgq_add(sdata->srecvs, buf); ckmsgq_add(sdata->srecvs, umsg->buf);
buf = NULL; umsg->buf = NULL;
goto retry; goto retry;
} }
if (cmdmatch(buf, "ping")) { if (cmdmatch(buf, "ping")) {
LOGDEBUG("Stratifier received ping request"); LOGDEBUG("Stratifier received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(umsg->sockd, "pong");
Close(sockd); Close(umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "stats")) { if (cmdmatch(buf, "stats")) {
@ -3050,69 +3043,69 @@ retry:
LOGDEBUG("Stratifier received stats request"); LOGDEBUG("Stratifier received stats request");
msg = stratifier_stats(ckp, sdata); msg = stratifier_stats(ckp, sdata);
send_unix_msg(sockd, msg); send_unix_msg(umsg->sockd, msg);
Close(sockd); Close(umsg->sockd);
goto retry; goto retry;
} }
/* Parse API commands here to return a message to sockd */ /* Parse API commands here to return a message to sockd */
if (cmdmatch(buf, "clients")) { if (cmdmatch(buf, "clients")) {
getclients(sdata, &sockd); getclients(sdata, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "workers")) { if (cmdmatch(buf, "workers")) {
getworkers(sdata, &sockd); getworkers(sdata, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "users")) { if (cmdmatch(buf, "users")) {
getusers(sdata, &sockd); getusers(sdata, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "getclient")) { if (cmdmatch(buf, "getclient")) {
getclient(sdata, buf + 10, &sockd); getclient(sdata, buf + 10, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "getuser")) { if (cmdmatch(buf, "getuser")) {
getuser(sdata, buf + 8, &sockd); getuser(sdata, buf + 8, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "getworker")) { if (cmdmatch(buf, "getworker")) {
getworker(sdata, buf + 10, &sockd); getworker(sdata, buf + 10, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "userclients")) { if (cmdmatch(buf, "userclients")) {
userclients(sdata, buf + 12, &sockd); userclients(sdata, buf + 12, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "workerclients")) { if (cmdmatch(buf, "workerclients")) {
workerclients(sdata, buf + 14, &sockd); workerclients(sdata, buf + 14, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "getproxy")) { if (cmdmatch(buf, "getproxy")) {
getproxy(sdata, buf + 9, &sockd); getproxy(sdata, buf + 9, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "setproxy")) { if (cmdmatch(buf, "setproxy")) {
setproxy(sdata, buf + 9, &sockd); setproxy(sdata, buf + 9, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "poolstats")) { if (cmdmatch(buf, "poolstats")) {
get_poolstats(sdata, &sockd); get_poolstats(sdata, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "proxyinfo")) { if (cmdmatch(buf, "proxyinfo")) {
proxyinfo(sdata, buf + 10, &sockd); proxyinfo(sdata, buf + 10, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "ucinfo")) { if (cmdmatch(buf, "ucinfo")) {
user_clientinfo(sdata, buf + 7, &sockd); user_clientinfo(sdata, buf + 7, &umsg->sockd);
goto retry; goto retry;
} }
if (cmdmatch(buf, "wcinfo")) { if (cmdmatch(buf, "wcinfo")) {
worker_clientinfo(sdata, buf + 7, &sockd); worker_clientinfo(sdata, buf + 7, &umsg->sockd);
goto retry; goto retry;
} }
Close(sockd); Close(umsg->sockd);
LOGDEBUG("Stratifier received request: %s", buf); LOGDEBUG("Stratifier received request: %s", buf);
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
@ -3160,7 +3153,6 @@ retry:
goto retry; goto retry;
out: out:
dealloc(buf);
return ret; return ret;
} }
@ -6011,6 +6003,8 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->share_lock); mutex_init(&sdata->share_lock);
mutex_init(&sdata->block_lock); mutex_init(&sdata->block_lock);
create_unix_receiver(pi);
LOGWARNING("%s stratifier ready", ckp->name); LOGWARNING("%s stratifier ready", ckp->name);
ret = stratum_loop(ckp, pi); ret = stratum_loop(ckp, pi);

Loading…
Cancel
Save