From 3c965946e70ebbe2a9a6e24e6a4a8eccb637507e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 22 Feb 2016 14:40:38 +1100 Subject: [PATCH] Replace unnecessary multiprocess model with simpler thread based model for each component for future removal of duplication of work when communicating between threads --- src/ckpool.c | 320 ++++------------------------------------------- src/ckpool.h | 23 ++-- src/connector.c | 8 +- src/connector.h | 4 +- src/generator.c | 64 +++------- src/generator.h | 4 +- src/stratifier.c | 31 ++--- src/stratifier.h | 4 +- 8 files changed, 67 insertions(+), 391 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 84a85b01..be91cc91 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -227,7 +227,6 @@ static void *unix_receiver(void *arg) sockd = accept(rsockd, NULL, NULL); if (unlikely(sockd < 0)) { LOGEMERG("Failed to accept on %s socket, exiting", qname); - childsighandler(15); break; } buf = recv_unix_msg(sockd); @@ -283,17 +282,6 @@ void create_unix_receiver(proc_instance_t *pi) create_pthread(&pth, unix_receiver, pi); } -static void broadcast_proc(ckpool_t *ckp, const char *buf) -{ - int i; - - for (i = 0; i < ckp->proc_instances; i++) { - proc_instance_t *pi = ckp->children[i]; - - send_proc(pi, buf); - } -} - /* Put a sanity check on kill calls to make sure we are not sending them to * pid 0. */ static int kill_pid(const int pid, const int sig) @@ -318,63 +306,41 @@ static int pid_wait(const pid_t pid, const int ms) return ret; } -static int get_proc_pid(const proc_instance_t *pi) -{ - int ret, pid = 0; - char path[256]; - FILE *fp; - - sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); - fp = fopen(path, "re"); - if (!fp) - goto out; - ret = fscanf(fp, "%d", &pid); - if (ret < 1) - pid = 0; - fclose(fp); -out: - return pid; -} - -static int send_procmsg(proc_instance_t *pi, const char *buf) +static int _send_procmsg(proc_instance_t *pi, const char *buf, const char *file, const char *func, const int line) { char *path = pi->us.path; int ret = -1; int sockd; if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message %s to null path in send_proc", buf ? buf : ""); + LOGERR("Attempted to send message %s to null path in send_proc from %s %s:%d", + buf ? buf : "", file, func, line); goto out; } if (unlikely(!buf || !strlen(buf))) { - LOGERR("Attempted to send null message to socket %s in send_proc", path); - goto out; - } - if (unlikely(!pi->pid)) { - pi->pid = get_proc_pid(pi); - if (!pi->pid) - goto out; - - } - if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname); + LOGERR("Attempted to send null message to socket %s in send_proc from %s %s:%d", + path, file, func, line); goto out; } sockd = open_unix_client(path); if (unlikely(sockd < 0)) { - LOGWARNING("Failed to open socket %s in send_procmsg", path); + LOGWARNING("Failed to open socket %s in send_procmsg from %s %s:%d", + path, file, func, line); goto out; } if (unlikely(!send_unix_msg(sockd, buf))) - LOGWARNING("Failed to send %s to socket %s", buf, path); + LOGWARNING("Failed to send %s to socket %s from %s %s:%d", buf, + path, file, func, line); else ret = sockd; out: if (unlikely(ret == -1)) - LOGERR("Failure in send_procmsg"); + LOGERR("Failure in send_procmsg from %s %s:%d", file, func, line); return ret; } +#define send_procmsg(PI, BUF) _send_procmsg(&(PI), BUF, __FILE__, __func__, __LINE__) + static void api_message(ckpool_t *ckp, char **buf, int *sockd) { apimsg_t *apimsg = ckalloc(sizeof(apimsg_t)); @@ -429,7 +395,6 @@ retry: send_unix_msg(sockd, "Invalid"); } else { ckp->loglevel = loglevel; - broadcast_proc(ckp, buf); send_unix_msg(sockd, "success"); } } else if (cmdmatch(buf, "getxfd")) { @@ -495,19 +460,6 @@ out: return NULL; } -bool ping_main(ckpool_t *ckp) -{ - char *buf; - - if (unlikely(kill_pid(ckp->main.pid, 0))) - return false; - buf = send_recv_proc(&ckp->main, "ping"); - if (unlikely(!buf)) - return false; - free(buf); - return true; -} - void empty_buffer(connsock_t *cs) { if (cs->buf) @@ -719,7 +671,7 @@ out: /* Send a single message to a process instance when there will be no response, * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { char *path = pi->us.path; bool ret = false; @@ -735,20 +687,6 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch goto out; } - /* At startup the pid fields are not set up before some processes are - * forked so they never inherit them. */ - if (unlikely(!pi->pid)) { - pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - return; - } - } - if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s pid %d", - msg, pi->processname, pi->pid); - goto out; - } sockd = open_unix_client(path); if (unlikely(sockd < 0)) { LOGWARNING("Failed to open socket %s", path); @@ -766,7 +704,7 @@ out: /* Send a single message to a process instance and retrieve the response, then * close the socket. */ -char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, +char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, const char *file, const char *func, const int line) { char *path = pi->us.path, *buf = NULL; @@ -780,18 +718,6 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, in LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } - if (unlikely(!pi->pid)) { - pi->pid = get_proc_pid(pi); - if (!pi->pid) - goto out; - } - if (unlikely(kill_pid(pi->pid, 0))) { - /* Reset the pid value in case we are still looking for an old - * process */ - pi->pid = 0; - LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); - goto out; - } sockd = open_unix_client(path); if (unlikely(sockd < 0)) { LOGWARNING("Failed to open socket %s in send_recv_proc", path); @@ -1121,20 +1047,6 @@ static void rm_namepid(const proc_instance_t *pi) unlink(s); } -/* Disable signal handlers for child processes, but simply pass them onto the - * parent process to shut down cleanly. */ -void childsighandler(const int sig) -{ - signal(sig, SIG_IGN); - signal(SIGTERM, SIG_IGN); - if (sig != SIGUSR1) { - LOGWARNING("Child process received signal %d, forwarding signal to %s main process", - sig, global_ckp->name); - kill_pid(global_ckp->main.pid, sig); - } - exit(0); -} - static void launch_logger(const proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -1146,78 +1058,10 @@ static void launch_logger(const proc_instance_t *pi) ckp->logger = create_ckmsgq(ckp, loggername, &proclog); } -static void launch_process(proc_instance_t *pi) -{ - pid_t pid; - - pid = fork(); - if (pid < 0) - quit(1, "Failed to fork %s in launch_process", pi->processname); - if (!pid) { - struct sigaction handler; - int ret; - - json_set_alloc_funcs(json_ckalloc, free); - launch_logger(pi); - handler.sa_handler = &childsighandler; - handler.sa_flags = 0; - sigemptyset(&handler.sa_mask); - sigaction(SIGUSR1, &handler, NULL); - sigaction(SIGTERM, &handler, NULL); - signal(SIGINT, SIG_IGN); - - rename_proc(pi->processname); - write_namepid(pi); - ret = pi->process(pi); - close_unix_socket(pi->us.sockd, pi->us.path); - rm_namepid(pi); - exit(ret); - } - pi->pid = pid; -} - -static void launch_processes(const ckpool_t *ckp) -{ - int i; - - for (i = 0; i < ckp->proc_instances; i++) - launch_process(ckp->children[i]); -} - -int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret) -{ - if (ret) { - /* Abnormal termination, kill entire process */ - LOGWARNING("%s %s exiting with return code %d, shutting down!", - ckp->name, pi->processname, ret); - send_proc(&ckp->main, "shutdown"); - cksleep_ms(100); - ret = 1; - } else /* Should be part of a normal shutdown */ - LOGNOTICE("%s %s exited normally", ckp->name, pi->processname); - - return ret; -} - static void clean_up(ckpool_t *ckp) { - int i, children = ckp->proc_instances; - rm_namepid(&ckp->main); dealloc(ckp->socket_dir); - ckp->proc_instances = 0; - for (i = 0; i < children; i++) - dealloc(ckp->children[i]); - dealloc(ckp->children); -} - -static void cancel_join_pthread(pthread_t *pth) -{ - if (!pth || !*pth) - return; - pthread_cancel(*pth); - join_pthread(*pth); - pth = NULL; } static void cancel_pthread(pthread_t *pth) @@ -1228,54 +1072,15 @@ static void cancel_pthread(pthread_t *pth) pth = NULL; } -static void wait_child(const pid_t *pid) -{ - int ret; - - do { - ret = waitpid(*pid, NULL, 0); - } while (ret != *pid); -} - -static void __shutdown_children(ckpool_t *ckp) -{ - int i; - - cancel_join_pthread(&ckp->pth_watchdog); - - /* They never got set up in the first place */ - if (!ckp->children) - return; - - /* Send the children a SIGUSR1 for them to shutdown gracefully, then - * wait for them to exit and kill them if they don't for 500ms. */ - for (i = 0; i < ckp->proc_instances; i++) { - pid_t pid = ckp->children[i]->pid; - - kill_pid(pid, SIGUSR1); - if (!ck_completion_timeout(&wait_child, (void *)&pid, 500)) - kill_pid(pid, SIGKILL); - } -} - -static void shutdown_children(ckpool_t *ckp) -{ - cancel_join_pthread(&ckp->pth_watchdog); - - __shutdown_children(ckp); -} - static void sighandler(const int sig) { ckpool_t *ckp = global_ckp; signal(sig, SIG_IGN); signal(SIGTERM, SIG_IGN); - LOGWARNING("Parent process %s received signal %d, shutting down", + LOGWARNING("Process %s received signal %d, shutting down", ckp->name, sig); - cancel_join_pthread(&ckp->pth_watchdog); - __shutdown_children(ckp); cancel_pthread(&ckp->pth_listener); exit(0); } @@ -1651,7 +1456,7 @@ static void parse_config(ckpool_t *ckp) json_decref(json_conf); } -static void manage_old_child(ckpool_t *ckp, proc_instance_t *pi) +static void manage_old_instance(ckpool_t *ckp, proc_instance_t *pi) { struct stat statbuf; char path[256]; @@ -1679,85 +1484,13 @@ static void manage_old_child(ckpool_t *ckp, proc_instance_t *pi) } } -static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *name) +static void prepare_child(ckpool_t *ckp, proc_instance_t *pi, void *process, char *name) { - proc_instance_t *pi = ckzalloc(sizeof(proc_instance_t)); - - ckp->children = realloc(ckp->children, sizeof(proc_instance_t *) * (ckp->proc_instances + 1)); - ckp->children[ckp->proc_instances++] = pi; pi->ckp = ckp; pi->processname = name; pi->sockname = pi->processname; - pi->process = process; create_process_unixsock(pi); - manage_old_child(ckp, pi); - /* Remove the old pid file if we've succeeded in coming this far */ - rm_namepid(pi); - return pi; -} - -static proc_instance_t *child_by_pid(const ckpool_t *ckp, const pid_t pid) -{ - proc_instance_t *pi = NULL; - int i; - - for (i = 0; i < ckp->proc_instances; i++) { - if (ckp->children[i]->pid == pid) { - pi = ckp->children[i]; - break; - } - } - return pi; -} - -static void *watchdog(void *arg) -{ -#if 0 - time_t last_relaunch_t = time(NULL); -#endif - ckpool_t *ckp = (ckpool_t *)arg; - - rename_proc("watchdog"); - sleep(1); - while (42) { - proc_instance_t *pi; -#if 0 - time_t relaunch_t; -#endif - int pid, status; - - pid = waitpid(0, &status, 0); - pi = child_by_pid(ckp, pid); - if (pi && WIFEXITED(status)) { - LOGWARNING("Child process %s exited, terminating!", pi->processname); - break; - } -#if 0 - /* Don't bother trying to respawn for now since communication - * breakdown between the processes will make them exit. */ - relaunch_t = time(NULL); - if (relaunch_t == last_relaunch_t) { - LOGEMERG("Respawning processes too fast, exiting!"); - break; - } - last_relaunch_t = relaunch_t; - if (pi) { - LOGERR("%s process dead! Relaunching", pi->processname); - launch_process(pi); - } else { - LOGEMERG("Unknown child process %d dead, exiting!", pid); - break; - } -#else - if (pi) - LOGEMERG("%s process dead, terminating!", pi->processname); - else - LOGEMERG("Unknown child process %d dead, exiting!", pid); - break; -#endif - } - send_proc(&ckp->main, "shutdown"); - return NULL; + create_pthread(&pi->pth_process, process, pi); } #ifdef USE_CKDB @@ -2130,7 +1863,7 @@ int main(int argc, char **argv) } if (!ckp.handover) - manage_old_child(&ckp, &ckp.main); + manage_old_instance(&ckp, &ckp.main); write_namepid(&ckp.main); open_process_sock(&ckp, &ckp.main, &ckp.main.us); launch_logger(&ckp.main); @@ -2150,26 +1883,21 @@ int main(int argc, char **argv) ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); create_pthread(&ckp.pth_listener, listener, &ckp.main); - /* Launch separate processes from here */ - ckp.generator = prepare_child(&ckp, &generator, "generator"); - ckp.stratifier = prepare_child(&ckp, &stratifier, "stratifier"); - ckp.connector = prepare_child(&ckp, &connector, "connector"); - - launch_processes(&ckp); - - create_pthread(&ckp.pth_watchdog, watchdog, &ckp); - handler.sa_handler = &sighandler; handler.sa_flags = 0; sigemptyset(&handler.sa_mask); sigaction(SIGTERM, &handler, NULL); sigaction(SIGINT, &handler, NULL); + /* Launch separate processes from here */ + prepare_child(&ckp, &ckp.generator, generator, "generator"); + prepare_child(&ckp, &ckp.stratifier, stratifier, "stratifier"); + prepare_child(&ckp, &ckp.connector, connector, "connector"); + /* Shutdown from here if the listener is sent a shutdown message */ if (ckp.pth_listener) join_pthread(ckp.pth_listener); - shutdown_children(&ckp); clean_up(&ckp); return 0; diff --git a/src/ckpool.h b/src/ckpool.h index eb78eb55..e7ebfd29 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -63,7 +63,7 @@ struct proc_instance { char *sockname; int pid; int oldpid; - int (*process)(proc_instance_t *); + pthread_t pth_process; /* Linked list of received messages, locking and conditional */ unix_msg_t *unix_msgs; @@ -169,17 +169,14 @@ struct ckpool_instance { /* API message queue */ ckmsgq_t *ckpapi; - /* Logger message queue NOTE: Unique per process */ + /* Logger message queue */ ckmsgq_t *logger; /* Process instance data of parent/child processes */ proc_instance_t main; - int proc_instances; - proc_instance_t **children; - - proc_instance_t *generator; - proc_instance_t *stratifier; - proc_instance_t *connector; + proc_instance_t generator; + proc_instance_t stratifier; + proc_instance_t connector; /* Threads of main process */ pthread_t pth_listener; @@ -329,11 +326,11 @@ void empty_buffer(connsock_t *cs); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int read_socket_line(connsock_t *cs, float *timeout); -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) -char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, +void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define send_proc(pi, msg) _send_proc(&(pi), msg, __FILE__, __func__, __LINE__) +char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, const char *file, const char *func, const int line); -#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__) +#define send_recv_proc(pi, msg) _send_recv_proc(&(pi), msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, const char *func, @@ -344,8 +341,6 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); bool send_json_msg(connsock_t *cs, const json_t *json_msg); json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val); -void childsighandler(const int sig); -int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); bool json_get_int(int *store, const json_t *val, const char *res); diff --git a/src/connector.c b/src/connector.c index 65819160..5f1b1f06 100644 --- a/src/connector.c +++ b/src/connector.c @@ -685,7 +685,6 @@ static void *receiver(void *arg) } out: /* We shouldn't get here unless there's an error */ - childsighandler(15); return NULL; } @@ -794,7 +793,6 @@ static void *sender(void *arg) mutex_unlock(&cdata->sender_lock); } /* We shouldn't get here unless there's an error */ - childsighandler(15); return NULL; } @@ -1466,13 +1464,15 @@ out: return ret; } -int connector(proc_instance_t *pi) +void *connector(void *arg) { + proc_instance_t *pi = (proc_instance_t *)arg; cdata_t *cdata = ckzalloc(sizeof(cdata_t)); int threads, sockd, ret = 0, i, tries = 0; ckpool_t *ckp = pi->ckp; const int on = 1; + rename_proc(pi->processname); LOGWARNING("%s connector starting", ckp->name); ckp->cdata = cdata; cdata->ckp = ckp; @@ -1591,5 +1591,5 @@ int connector(proc_instance_t *pi) ret = connector_loop(pi, cdata); out: dealloc(ckp->cdata); - return process_exit(ckp, pi, ret); + return NULL; } diff --git a/src/connector.h b/src/connector.h index e121d2be..e113211e 100644 --- a/src/connector.h +++ b/src/connector.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2016 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -10,6 +10,6 @@ #ifndef CONNECTOR_H #define CONNECTOR_H -int connector(proc_instance_t *pi); +void *connector(void *arg); #endif /* CONNECTOR_H */ diff --git a/src/generator.c b/src/generator.c index 3efff2da..d5986a69 100644 --- a/src/generator.c +++ b/src/generator.c @@ -232,9 +232,6 @@ static server_instance_t *live_server(ckpool_t *ckp) LOGDEBUG("Attempting to connect to bitcoind"); retry: - if (!ping_main(ckp)) - goto out; - /* First find a server that is already flagged alive if possible * without blocking on server_alive() */ for (i = 0; i < ckp->btcds; i++) { @@ -262,7 +259,6 @@ retry: living: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); -out: send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -294,7 +290,7 @@ static void clear_unix_msg(unix_msg_t **umsg) } } -static int gen_loop(proc_instance_t *pi) +static void gen_loop(proc_instance_t *pi) { server_instance_t *si = NULL, *old_si; unix_msg_t *umsg = NULL; @@ -304,7 +300,6 @@ static int gen_loop(proc_instance_t *pi) connsock_t *cs; gbtbase_t *gbt; char hash[68]; - int ret = 0; reconnect: clear_unix_msg(&umsg); @@ -329,11 +324,6 @@ retry: do { umsg = get_unix_msg(pi); - if (unlikely(!umsg &&!ping_main(ckp))) { - LOGEMERG("Generator failed to ping main process, exiting"); - ret = 1; - goto out; - } } while (!umsg); if (unlikely(!si->alive)) { @@ -343,10 +333,6 @@ retry: buf = umsg->buf; LOGDEBUG("Generator received request: %s", buf); - if (cmdmatch(buf, "shutdown")) { - ret = 0; - goto out; - } if (cmdmatch(buf, "getbase")) { if (!gen_gbtbase(cs, gbt)) { LOGWARNING("Failed to get block template from %s:%s", @@ -418,7 +404,6 @@ retry: out: kill_server(si); - return ret; } static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy) @@ -2223,9 +2208,6 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) int retries = 0; while (42) { - if (!ping_main(ckp)) - break; - mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { if (proxi->disabled || !proxi->global) @@ -2678,7 +2660,7 @@ static void parse_globaluser(ckpool_t *ckp, gdata_t *gdata, const char *buf) add_userproxy(ckp, gdata, userid, url, username, pass); } -static int proxy_loop(proc_instance_t *pi) +static void proxy_loop(proc_instance_t *pi) { proxy_instance_t *proxi = NULL, *cproxy; server_instance_t *si = NULL, *old_si; @@ -2688,7 +2670,6 @@ static int proxy_loop(proc_instance_t *pi) connsock_t *cs = NULL; bool started = false; char *buf = NULL; - int ret = 0; reconnect: clear_unix_msg(&umsg); @@ -2724,11 +2705,6 @@ retry: clear_unix_msg(&umsg); do { umsg = get_unix_msg(pi); - if (unlikely(!umsg &&!ping_main(ckp))) { - LOGEMERG("Generator failed to ping main process, exiting"); - ret = 1; - goto out; - } } while (!umsg); buf = umsg->buf; @@ -2763,9 +2739,6 @@ retry: parse_proxystats(gdata, umsg->sockd, buf + 11); } else if (cmdmatch(buf, "globaluser")) { parse_globaluser(ckp, gdata, buf + 11); - } else if (cmdmatch(buf, "shutdown")) { - ret = 0; - goto out; } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { @@ -2791,7 +2764,7 @@ retry: } goto retry; out: - return ret; + return; } /* Check which servers are alive, maintaining a connection with them and @@ -2849,13 +2822,13 @@ static void setup_servers(ckpool_t *ckp) create_pthread(&pth_watchdog, server_watchdog, ckp); } -static int server_mode(ckpool_t *ckp, proc_instance_t *pi) +static void server_mode(ckpool_t *ckp, proc_instance_t *pi) { - int i, ret; + int i; setup_servers(ckp); - ret = gen_loop(pi); + gen_loop(pi); for (i = 0; i < ckp->btcds; i++) { server_instance_t *si = ckp->servers[i]; @@ -2864,7 +2837,6 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) dealloc(si); } dealloc(ckp->servers); - return ret; } static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id) @@ -2888,11 +2860,11 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id return proxy; } -static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) +static void proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { gdata_t *gdata = ckp->gdata; proxy_instance_t *proxy; - int i, ret; + int i; mutex_init(&gdata->lock); mutex_init(&gdata->notify_lock); @@ -2916,17 +2888,16 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) } } - ret = proxy_loop(pi); - - return ret; + proxy_loop(pi); } -int generator(proc_instance_t *pi) +void *generator(void *arg) { + proc_instance_t *pi = (proc_instance_t *)arg; ckpool_t *ckp = pi->ckp; gdata_t *gdata; - int ret; + rename_proc(pi->processname); LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->gdata = gdata; @@ -2938,18 +2909,13 @@ int generator(proc_instance_t *pi) /* Wait for the stratifier to be ready for us */ do { - if (!ping_main(ckp)) { - ret = 1; - goto out; - } cksleep_ms(10); buf = send_recv_proc(ckp->stratifier, "ping"); } while (!buf); dealloc(buf); - ret = proxy_mode(ckp, pi); + proxy_mode(ckp, pi); } else - ret = server_mode(ckp, pi); -out: + server_mode(ckp, pi); dealloc(ckp->gdata); - return process_exit(ckp, pi, ret); + return NULL; } diff --git a/src/generator.h b/src/generator.h index 8b7aa875..2e6bdf2a 100644 --- a/src/generator.h +++ b/src/generator.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2016 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -12,6 +12,6 @@ #include "config.h" -int generator(proc_instance_t *pi); +void *generator(void *arg); #endif /* GENERATOR_H */ diff --git a/src/stratifier.c b/src/stratifier.c index c6cd4214..10daa208 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1074,7 +1074,7 @@ static char *__send_recv_generator(ckpool_t *ckp, const char *msg, const int pri set = true; } else set = false; - buf = _send_recv_proc(ckp->generator, msg, UNIX_WRITE_TIMEOUT, RPC_TIMEOUT, __FILE__, __func__, __LINE__); + buf = _send_recv_proc(&ckp->generator, msg, UNIX_WRITE_TIMEOUT, RPC_TIMEOUT, __FILE__, __func__, __LINE__); if (unlikely(!buf)) buf = strdup("failed"); if (set) @@ -3937,7 +3937,7 @@ static void ckdbq_flush(sdata_t *sdata) LOGWARNING("Flushed %d messages from ckdb queue", flushed); } -static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) +static void stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { sdata_t *sdata = ckp->sdata; unix_msg_t *umsg = NULL; @@ -3968,11 +3968,6 @@ retry: } umsg = get_unix_msg(pi); - if (unlikely(!umsg &&!ping_main(ckp))) { - LOGEMERG("Stratifier failed to ping main process, exiting"); - ret = 1; - goto out; - } } while (!umsg); buf = umsg->buf; @@ -4060,10 +4055,7 @@ retry: Close(umsg->sockd); LOGDEBUG("Stratifier received request: %s", buf); - if (cmdmatch(buf, "shutdown")) { - ret = 0; - goto out; - } else if (cmdmatch(buf, "update")) { + if (cmdmatch(buf, "update")) { update_base(ckp, GEN_PRIORITY); } else if (cmdmatch(buf, "subscribe")) { /* Proxifier has a new subscription */ @@ -4106,9 +4098,6 @@ retry: } else LOGWARNING("Unhandled stratifier message: %s", buf); goto retry; - -out: - return ret; } static void *blockupdate(void *arg) @@ -7519,15 +7508,17 @@ static bool script_address(const char *btcaddress) return btcaddress[0] == '3'; } -int stratifier(proc_instance_t *pi) +void *stratifier(void *arg) { + proc_instance_t *pi = (proc_instance_t *)arg; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; ckpool_t *ckp = pi->ckp; - int ret = 1, threads; int64_t randomiser; char *buf = NULL; sdata_t *sdata; + int threads; + rename_proc(pi->processname); LOGWARNING("%s stratifier starting", ckp->name); sdata = ckzalloc(sizeof(sdata_t)); ckp->sdata = sdata; @@ -7536,10 +7527,6 @@ int stratifier(proc_instance_t *pi) /* Wait for the generator to have something for us */ do { - if (!ping_main(ckp)) { - ret = 1; - goto out; - } if (ckp->proxy) break; buf = send_recv_proc(ckp->generator, "ping"); @@ -7622,7 +7609,7 @@ int stratifier(proc_instance_t *pi) LOGWARNING("%s stratifier ready", ckp->name); - ret = stratum_loop(ckp, pi); + stratum_loop(ckp, pi); out: if (ckp->proxy) { proxy_t *proxy, *tmpproxy; @@ -7635,5 +7622,5 @@ out: mutex_unlock(&sdata->proxy_lock); } dealloc(ckp->sdata); - return process_exit(ckp, pi, ret); + return NULL; } diff --git a/src/stratifier.h b/src/stratifier.h index 0f41b91c..096614f1 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2016 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -10,6 +10,6 @@ #ifndef STRATIFIER_H #define STRATIFIER_H -int stratifier(proc_instance_t *pi); +void *stratifier(void *arg); #endif /* STRATIFIER_H */