From 86e70edfbc6509deacb2d267baf86c17a46faf27 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 3 Jun 2014 14:04:09 +1000 Subject: [PATCH] Make all child processes ping the parent process at regular intervals and then die if it has disappeared. Do not attempt to send unix messages to dead processes and abort instantly on socket EINTR --- src/ckpool.c | 11 +++++++++ src/ckpool.h | 1 + src/connector.c | 11 ++++++++- src/generator.c | 22 ++++++++++++++++-- src/libckpool.c | 18 ++++++++------- src/stratifier.c | 60 ++++++++++++++++++++++-------------------------- 6 files changed, 79 insertions(+), 44 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 02a63059..3af586fe 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -71,6 +71,17 @@ out: return NULL; } +bool ping_main(ckpool_t *ckp) +{ + char *buf; + + buf = send_recv_proc(&ckp->main, "ping"); + if (!buf) + return false; + free(buf); + return true; +} + /* Open the file in path, check if there is a pid in there that still exists * and if not, write the pid into that file. */ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid) diff --git a/src/ckpool.h b/src/ckpool.h index f33b6020..dff9c54a 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -108,6 +108,7 @@ struct ckpool_instance { ckpool_t *global_ckp; +bool ping_main(ckpool_t *ckp); int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret); #define ASPRINTF(strp, fmt, ...) do { \ diff --git a/src/connector.c b/src/connector.c index a1fb13c2..85f7ddb0 100644 --- a/src/connector.c +++ b/src/connector.c @@ -431,11 +431,20 @@ static client_instance_t *client_by_id(conn_instance_t *ci, int id) static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) { - int sockd, client_id, ret = 0; + int sockd, client_id, ret = 0, selret; unixsock_t *us = &pi->us; + ckpool_t *ckp = pi->ckp; char *buf = NULL; json_t *json_msg; + do { + selret = wait_read_select(us->sockd, 5); + if (!selret && !ping_main(ckp)) { + LOGEMERG("Connector failed to ping main process, exiting"); + ret = 1; + goto out; + } + } while (selret < 1); retry: sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { diff --git a/src/generator.c b/src/generator.c index 0dc724d1..f1478b8c 100644 --- a/src/generator.c +++ b/src/generator.c @@ -173,10 +173,10 @@ static void kill_server(server_instance_t *si) static int gen_loop(proc_instance_t *pi) { + int sockd = -1, ret = 0, selret; server_instance_t *si = NULL; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; - int sockd, ret = 0; char *buf = NULL; connsock_t *cs; gbtbase_t *gbt; @@ -188,7 +188,17 @@ reconnect: si = live_server(ckp); gbt = si->data; cs = &si->cs; + retry: + do { + selret = wait_read_select(us->sockd, 5); + if (!selret && !ping_main(ckp)) { + LOGEMERG("Generator failed to ping main process, exiting"); + ret = 1; + goto out; + } + } while (selret < 1); + sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { if (interrupted()) @@ -1163,10 +1173,10 @@ static void kill_proxy(proxy_instance_t *proxi) static int proxy_loop(proc_instance_t *pi) { + int sockd = -1, ret = 0, selret; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; proxy_instance_t *proxi; - int sockd, ret = 0; char *buf = NULL; reconnect: @@ -1177,6 +1187,14 @@ reconnect: send_proc(ckp->stratifier, "notify"); proxi->notified = false; + do { + selret = wait_read_select(us->sockd, 5); + if (!selret && !ping_main(ckp)) { + LOGEMERG("Generator failed to ping main process, exiting"); + ret = 1; + goto out; + } + } while (selret < 1); retry: sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { diff --git a/src/libckpool.c b/src/libckpool.c index e04ecf08..6cc3078c 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -671,12 +671,9 @@ int wait_read_select(int sockd, int timeout) tv_timeout.tv_sec = timeout; tv_timeout.tv_usec = 0; -retry: FD_ZERO(&readfs); FD_SET(sockd, &readfs); ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); - if (unlikely(ret < 1 &&interrupted())) - goto retry; return ret; } @@ -750,12 +747,9 @@ int wait_write_select(int sockd, int timeout) tv_timeout.tv_sec = timeout; tv_timeout.tv_usec = 0; -retry: FD_ZERO(&writefds); FD_SET(sockd, &writefds); ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); - if (unlikely(ret < 1 &&interrupted())) - goto retry; return ret; } @@ -793,7 +787,7 @@ bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *fu goto out; } msglen = htole32(len); - ret = wait_write_select(sockd, 60); + ret = wait_write_select(sockd, 5); if (unlikely(ret < 1)) { LOGERR("Select1 failed in send_unix_msg"); goto out; @@ -803,7 +797,7 @@ bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *fu LOGERR("Failed to write 4 byte length in send_unix_msg"); goto out; } - ret = wait_write_select(sockd, 60); + ret = wait_write_select(sockd, 5); if (unlikely(ret < 1)) { LOGERR("Select2 failed in send_unix_msg"); goto out; @@ -836,6 +830,10 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(kill(pi->pid, 0))) { + LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); + goto out; + } sockd = open_unix_client(path); if (unlikely(sockd < 0)) { LOGWARNING("Failed to open socket %s", path); @@ -867,6 +865,10 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(kill(pi->pid, 0))) { + LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); + goto out; + } sockd = open_unix_client(path); if (unlikely(sockd < 0)) { LOGWARNING("Failed to open socket %s", path); diff --git a/src/stratifier.c b/src/stratifier.c index 53269c44..f3fe08df 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -875,36 +875,36 @@ static void block_solve(ckpool_t *ckp) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { - int sockd, ret = 0, selret; + int sockd, ret = 0, selret = 0; unixsock_t *us = &pi->us; - tv_t timeout, *to; char *buf = NULL; - fd_set readfds; + tv_t start_tv; - if (ckp->proxy) - to = NULL; - else - to = &timeout; -reset: - timeout.tv_sec = ckp->update_interval; - timeout.tv_usec = 0; + tv_time(&start_tv); retry: - FD_ZERO(&readfds); - FD_SET(us->sockd, &readfds); - selret = select(us->sockd + 1, &readfds, NULL, NULL, to); - if (selret < 0) { - if (interrupted()) - goto retry; - LOGERR("Select failed in strat_loop, killing stratifier!"); - sleep(5); - ret = 1; - goto out; - } - if (!selret) { - LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", ckp->update_interval); - update_base(ckp); - goto reset; - } + do { + if (!ckp->proxy) { + double tdiff; + tv_t end_tv; + + tv_time(&end_tv); + tdiff = tvdiff(&end_tv, &start_tv); + if (tdiff > ckp->update_interval) { + copy_tv(&start_tv, &end_tv); + LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", + ckp->update_interval); + update_base(ckp); + continue; + } + } + selret = wait_read_select(us->sockd, 5); + if (!selret && !ping_main(ckp)) { + LOGEMERG("Generator failed to ping main process, exiting"); + ret = 1; + goto out; + } + } while (selret < 1); + sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { if (interrupted()) @@ -935,19 +935,15 @@ retry: goto out; } else if (!strncasecmp(buf, "update", 6)) { update_base(ckp); - goto reset; } else if (!strncasecmp(buf, "subscribe", 9)) { /* Proxifier has a new subscription */ if (!update_subscribe(ckp)) goto out; - goto retry; } else if (!strncasecmp(buf, "notify", 6)) { /* Proxifier has a new notify ready */ update_notify(ckp); - goto retry; } else if (!strncasecmp(buf, "diff", 4)) { update_diff(ckp); - goto retry; } else if (!strncasecmp(buf, "dropclient", 10)) { int client_id; @@ -956,11 +952,9 @@ retry: LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); else drop_client(client_id); - goto retry; } else if (!strncasecmp(buf, "block", 5)) { block_solve(ckp); update_base(ckp); - goto reset; } else { json_t *val = json_loads(buf, 0, NULL); @@ -968,8 +962,8 @@ retry: LOGWARNING("Received unrecognised message: %s", buf); } else stratum_add_recvd(val); - goto retry; } + goto retry; out: dealloc(buf);