Browse Source

Make all child processes ping the parent process at regular intervals and die if it has disappeared. Do not attempt to send unix messages to dead processes. Abort socket waits instantly on EINTR instead of retrying.

master
Con Kolivas 11 years ago
parent
commit
86e70edfbc
  1. 11
      src/ckpool.c
  2. 1
      src/ckpool.h
  3. 11
      src/connector.c
  4. 22
      src/generator.c
  5. 18
      src/libckpool.c
  6. 60
      src/stratifier.c

11
src/ckpool.c

@ -71,6 +71,17 @@ out:
return NULL; return NULL;
} }
/* Send a "ping" request to the main process via send_recv_proc and report
 * whether any reply came back. The reply text itself is not inspected; it is
 * released before returning.
 *
 * Returns true when send_recv_proc yields a non-NULL response, false
 * otherwise. */
bool ping_main(ckpool_t *ckp)
{
	char *reply = send_recv_proc(&ckp->main, "ping");
	bool answered = (reply != NULL);

	/* free(NULL) is a no-op, so no separate guard is needed here */
	free(reply);
	return answered;
}
/* Open the file in path, check if there is a pid in there that still exists /* Open the file in path, check if there is a pid in there that still exists
* and if not, write the pid into that file. */ * and if not, write the pid into that file. */
static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid) static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid)

1
src/ckpool.h

@ -108,6 +108,7 @@ struct ckpool_instance {
ckpool_t *global_ckp; ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp);
int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret); int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret);
#define ASPRINTF(strp, fmt, ...) do { \ #define ASPRINTF(strp, fmt, ...) do { \

11
src/connector.c

@ -431,11 +431,20 @@ static client_instance_t *client_by_id(conn_instance_t *ci, int id)
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
{ {
int sockd, client_id, ret = 0; int sockd, client_id, ret = 0, selret;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
char *buf = NULL; char *buf = NULL;
json_t *json_msg; json_t *json_msg;
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Connector failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
retry: retry:
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {

22
src/generator.c

@ -173,10 +173,10 @@ static void kill_server(server_instance_t *si)
static int gen_loop(proc_instance_t *pi) static int gen_loop(proc_instance_t *pi)
{ {
int sockd = -1, ret = 0, selret;
server_instance_t *si = NULL; server_instance_t *si = NULL;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
char *buf = NULL; char *buf = NULL;
connsock_t *cs; connsock_t *cs;
gbtbase_t *gbt; gbtbase_t *gbt;
@ -188,7 +188,17 @@ reconnect:
si = live_server(ckp); si = live_server(ckp);
gbt = si->data; gbt = si->data;
cs = &si->cs; cs = &si->cs;
retry: retry:
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
if (interrupted()) if (interrupted())
@ -1163,10 +1173,10 @@ static void kill_proxy(proxy_instance_t *proxi)
static int proxy_loop(proc_instance_t *pi) static int proxy_loop(proc_instance_t *pi)
{ {
int sockd = -1, ret = 0, selret;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
proxy_instance_t *proxi; proxy_instance_t *proxi;
int sockd, ret = 0;
char *buf = NULL; char *buf = NULL;
reconnect: reconnect:
@ -1177,6 +1187,14 @@ reconnect:
send_proc(ckp->stratifier, "notify"); send_proc(ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
retry: retry:
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {

18
src/libckpool.c

@ -671,12 +671,9 @@ int wait_read_select(int sockd, int timeout)
tv_timeout.tv_sec = timeout; tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0; tv_timeout.tv_usec = 0;
retry:
FD_ZERO(&readfs); FD_ZERO(&readfs);
FD_SET(sockd, &readfs); FD_SET(sockd, &readfs);
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout);
if (unlikely(ret < 1 &&interrupted()))
goto retry;
return ret; return ret;
} }
@ -750,12 +747,9 @@ int wait_write_select(int sockd, int timeout)
tv_timeout.tv_sec = timeout; tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0; tv_timeout.tv_usec = 0;
retry:
FD_ZERO(&writefds); FD_ZERO(&writefds);
FD_SET(sockd, &writefds); FD_SET(sockd, &writefds);
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout);
if (unlikely(ret < 1 &&interrupted()))
goto retry;
return ret; return ret;
} }
@ -793,7 +787,7 @@ bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *fu
goto out; goto out;
} }
msglen = htole32(len); msglen = htole32(len);
ret = wait_write_select(sockd, 60); ret = wait_write_select(sockd, 5);
if (unlikely(ret < 1)) { if (unlikely(ret < 1)) {
LOGERR("Select1 failed in send_unix_msg"); LOGERR("Select1 failed in send_unix_msg");
goto out; goto out;
@ -803,7 +797,7 @@ bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *fu
LOGERR("Failed to write 4 byte length in send_unix_msg"); LOGERR("Failed to write 4 byte length in send_unix_msg");
goto out; goto out;
} }
ret = wait_write_select(sockd, 60); ret = wait_write_select(sockd, 5);
if (unlikely(ret < 1)) { if (unlikely(ret < 1)) {
LOGERR("Select2 failed in send_unix_msg"); LOGERR("Select2 failed in send_unix_msg");
goto out; goto out;
@ -836,6 +830,10 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out; goto out;
} }
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path); sockd = open_unix_client(path);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path); LOGWARNING("Failed to open socket %s", path);
@ -867,6 +865,10 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out; goto out;
} }
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path); sockd = open_unix_client(path);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path); LOGWARNING("Failed to open socket %s", path);

60
src/stratifier.c

@ -875,36 +875,36 @@ static void block_solve(ckpool_t *ckp)
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret; int sockd, ret = 0, selret = 0;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
tv_t timeout, *to;
char *buf = NULL; char *buf = NULL;
fd_set readfds; tv_t start_tv;
if (ckp->proxy) tv_time(&start_tv);
to = NULL;
else
to = &timeout;
reset:
timeout.tv_sec = ckp->update_interval;
timeout.tv_usec = 0;
retry: retry:
FD_ZERO(&readfds); do {
FD_SET(us->sockd, &readfds); if (!ckp->proxy) {
selret = select(us->sockd + 1, &readfds, NULL, NULL, to); double tdiff;
if (selret < 0) { tv_t end_tv;
if (interrupted())
goto retry; tv_time(&end_tv);
LOGERR("Select failed in strat_loop, killing stratifier!"); tdiff = tvdiff(&end_tv, &start_tv);
sleep(5); if (tdiff > ckp->update_interval) {
ret = 1; copy_tv(&start_tv, &end_tv);
goto out; LOGDEBUG("%ds elapsed in strat_loop, updating gbt base",
} ckp->update_interval);
if (!selret) { update_base(ckp);
LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", ckp->update_interval); continue;
update_base(ckp); }
goto reset; }
} selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
if (interrupted()) if (interrupted())
@ -935,19 +935,15 @@ retry:
goto out; goto out;
} else if (!strncasecmp(buf, "update", 6)) { } else if (!strncasecmp(buf, "update", 6)) {
update_base(ckp); update_base(ckp);
goto reset;
} else if (!strncasecmp(buf, "subscribe", 9)) { } else if (!strncasecmp(buf, "subscribe", 9)) {
/* Proxifier has a new subscription */ /* Proxifier has a new subscription */
if (!update_subscribe(ckp)) if (!update_subscribe(ckp))
goto out; goto out;
goto retry;
} else if (!strncasecmp(buf, "notify", 6)) { } else if (!strncasecmp(buf, "notify", 6)) {
/* Proxifier has a new notify ready */ /* Proxifier has a new notify ready */
update_notify(ckp); update_notify(ckp);
goto retry;
} else if (!strncasecmp(buf, "diff", 4)) { } else if (!strncasecmp(buf, "diff", 4)) {
update_diff(ckp); update_diff(ckp);
goto retry;
} else if (!strncasecmp(buf, "dropclient", 10)) { } else if (!strncasecmp(buf, "dropclient", 10)) {
int client_id; int client_id;
@ -956,11 +952,9 @@ retry:
LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf);
else else
drop_client(client_id); drop_client(client_id);
goto retry;
} else if (!strncasecmp(buf, "block", 5)) { } else if (!strncasecmp(buf, "block", 5)) {
block_solve(ckp); block_solve(ckp);
update_base(ckp); update_base(ckp);
goto reset;
} else { } else {
json_t *val = json_loads(buf, 0, NULL); json_t *val = json_loads(buf, 0, NULL);
@ -968,8 +962,8 @@ retry:
LOGWARNING("Received unrecognised message: %s", buf); LOGWARNING("Received unrecognised message: %s", buf);
} else } else
stratum_add_recvd(val); stratum_add_recvd(val);
goto retry;
} }
goto retry;
out: out:
dealloc(buf); dealloc(buf);

Loading…
Cancel
Save