Browse Source

Replace unnecessary multiprocess model with simpler thread based model for each component for future removal of duplication of work when communicating between threads

master
Con Kolivas 9 years ago
parent
commit
3c965946e7
  1. 320
      src/ckpool.c
  2. 23
      src/ckpool.h
  3. 8
      src/connector.c
  4. 4
      src/connector.h
  5. 64
      src/generator.c
  6. 4
      src/generator.h
  7. 31
      src/stratifier.c
  8. 4
      src/stratifier.h

320
src/ckpool.c

@ -227,7 +227,6 @@ static void *unix_receiver(void *arg)
sockd = accept(rsockd, NULL, NULL); sockd = accept(rsockd, NULL, NULL);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGEMERG("Failed to accept on %s socket, exiting", qname); LOGEMERG("Failed to accept on %s socket, exiting", qname);
childsighandler(15);
break; break;
} }
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
@ -283,17 +282,6 @@ void create_unix_receiver(proc_instance_t *pi)
create_pthread(&pth, unix_receiver, pi); create_pthread(&pth, unix_receiver, pi);
} }
static void broadcast_proc(ckpool_t *ckp, const char *buf)
{
int i;
for (i = 0; i < ckp->proc_instances; i++) {
proc_instance_t *pi = ckp->children[i];
send_proc(pi, buf);
}
}
/* Put a sanity check on kill calls to make sure we are not sending them to /* Put a sanity check on kill calls to make sure we are not sending them to
* pid 0. */ * pid 0. */
static int kill_pid(const int pid, const int sig) static int kill_pid(const int pid, const int sig)
@ -318,63 +306,41 @@ static int pid_wait(const pid_t pid, const int ms)
return ret; return ret;
} }
static int get_proc_pid(const proc_instance_t *pi) static int _send_procmsg(proc_instance_t *pi, const char *buf, const char *file, const char *func, const int line)
{
int ret, pid = 0;
char path[256];
FILE *fp;
sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname);
fp = fopen(path, "re");
if (!fp)
goto out;
ret = fscanf(fp, "%d", &pid);
if (ret < 1)
pid = 0;
fclose(fp);
out:
return pid;
}
static int send_procmsg(proc_instance_t *pi, const char *buf)
{ {
char *path = pi->us.path; char *path = pi->us.path;
int ret = -1; int ret = -1;
int sockd; int sockd;
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", buf ? buf : ""); LOGERR("Attempted to send message %s to null path in send_proc from %s %s:%d",
buf ? buf : "", file, func, line);
goto out; goto out;
} }
if (unlikely(!buf || !strlen(buf))) { if (unlikely(!buf || !strlen(buf))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc from %s %s:%d",
goto out; path, file, func, line);
}
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname);
goto out; goto out;
} }
sockd = open_unix_client(path); sockd = open_unix_client(path);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s in send_procmsg", path); LOGWARNING("Failed to open socket %s in send_procmsg from %s %s:%d",
path, file, func, line);
goto out; goto out;
} }
if (unlikely(!send_unix_msg(sockd, buf))) if (unlikely(!send_unix_msg(sockd, buf)))
LOGWARNING("Failed to send %s to socket %s", buf, path); LOGWARNING("Failed to send %s to socket %s from %s %s:%d", buf,
path, file, func, line);
else else
ret = sockd; ret = sockd;
out: out:
if (unlikely(ret == -1)) if (unlikely(ret == -1))
LOGERR("Failure in send_procmsg"); LOGERR("Failure in send_procmsg from %s %s:%d", file, func, line);
return ret; return ret;
} }
#define send_procmsg(PI, BUF) _send_procmsg(&(PI), BUF, __FILE__, __func__, __LINE__)
static void api_message(ckpool_t *ckp, char **buf, int *sockd) static void api_message(ckpool_t *ckp, char **buf, int *sockd)
{ {
apimsg_t *apimsg = ckalloc(sizeof(apimsg_t)); apimsg_t *apimsg = ckalloc(sizeof(apimsg_t));
@ -429,7 +395,6 @@ retry:
send_unix_msg(sockd, "Invalid"); send_unix_msg(sockd, "Invalid");
} else { } else {
ckp->loglevel = loglevel; ckp->loglevel = loglevel;
broadcast_proc(ckp, buf);
send_unix_msg(sockd, "success"); send_unix_msg(sockd, "success");
} }
} else if (cmdmatch(buf, "getxfd")) { } else if (cmdmatch(buf, "getxfd")) {
@ -495,19 +460,6 @@ out:
return NULL; return NULL;
} }
bool ping_main(ckpool_t *ckp)
{
char *buf;
if (unlikely(kill_pid(ckp->main.pid, 0)))
return false;
buf = send_recv_proc(&ckp->main, "ping");
if (unlikely(!buf))
return false;
free(buf);
return true;
}
void empty_buffer(connsock_t *cs) void empty_buffer(connsock_t *cs)
{ {
if (cs->buf) if (cs->buf)
@ -719,7 +671,7 @@ out:
/* Send a single message to a process instance when there will be no response, /* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */ * closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{ {
char *path = pi->us.path; char *path = pi->us.path;
bool ret = false; bool ret = false;
@ -735,20 +687,6 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
goto out; goto out;
} }
/* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
return;
}
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
goto out;
}
sockd = open_unix_client(path); sockd = open_unix_client(path);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path); LOGWARNING("Failed to open socket %s", path);
@ -766,7 +704,7 @@ out:
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then
* close the socket. */ * close the socket. */
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,
const char *file, const char *func, const int line) const char *file, const char *func, const int line)
{ {
char *path = pi->us.path, *buf = NULL; char *path = pi->us.path, *buf = NULL;
@ -780,18 +718,6 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, in
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out; goto out;
} }
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) {
/* Reset the pid value in case we are still looking for an old
* process */
pi->pid = 0;
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path); sockd = open_unix_client(path);
if (unlikely(sockd < 0)) { if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s in send_recv_proc", path); LOGWARNING("Failed to open socket %s in send_recv_proc", path);
@ -1121,20 +1047,6 @@ static void rm_namepid(const proc_instance_t *pi)
unlink(s); unlink(s);
} }
/* Disable signal handlers for child processes, but simply pass them onto the
* parent process to shut down cleanly. */
void childsighandler(const int sig)
{
signal(sig, SIG_IGN);
signal(SIGTERM, SIG_IGN);
if (sig != SIGUSR1) {
LOGWARNING("Child process received signal %d, forwarding signal to %s main process",
sig, global_ckp->name);
kill_pid(global_ckp->main.pid, sig);
}
exit(0);
}
static void launch_logger(const proc_instance_t *pi) static void launch_logger(const proc_instance_t *pi)
{ {
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
@ -1146,78 +1058,10 @@ static void launch_logger(const proc_instance_t *pi)
ckp->logger = create_ckmsgq(ckp, loggername, &proclog); ckp->logger = create_ckmsgq(ckp, loggername, &proclog);
} }
static void launch_process(proc_instance_t *pi)
{
pid_t pid;
pid = fork();
if (pid < 0)
quit(1, "Failed to fork %s in launch_process", pi->processname);
if (!pid) {
struct sigaction handler;
int ret;
json_set_alloc_funcs(json_ckalloc, free);
launch_logger(pi);
handler.sa_handler = &childsighandler;
handler.sa_flags = 0;
sigemptyset(&handler.sa_mask);
sigaction(SIGUSR1, &handler, NULL);
sigaction(SIGTERM, &handler, NULL);
signal(SIGINT, SIG_IGN);
rename_proc(pi->processname);
write_namepid(pi);
ret = pi->process(pi);
close_unix_socket(pi->us.sockd, pi->us.path);
rm_namepid(pi);
exit(ret);
}
pi->pid = pid;
}
static void launch_processes(const ckpool_t *ckp)
{
int i;
for (i = 0; i < ckp->proc_instances; i++)
launch_process(ckp->children[i]);
}
int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret)
{
if (ret) {
/* Abnormal termination, kill entire process */
LOGWARNING("%s %s exiting with return code %d, shutting down!",
ckp->name, pi->processname, ret);
send_proc(&ckp->main, "shutdown");
cksleep_ms(100);
ret = 1;
} else /* Should be part of a normal shutdown */
LOGNOTICE("%s %s exited normally", ckp->name, pi->processname);
return ret;
}
static void clean_up(ckpool_t *ckp) static void clean_up(ckpool_t *ckp)
{ {
int i, children = ckp->proc_instances;
rm_namepid(&ckp->main); rm_namepid(&ckp->main);
dealloc(ckp->socket_dir); dealloc(ckp->socket_dir);
ckp->proc_instances = 0;
for (i = 0; i < children; i++)
dealloc(ckp->children[i]);
dealloc(ckp->children);
}
static void cancel_join_pthread(pthread_t *pth)
{
if (!pth || !*pth)
return;
pthread_cancel(*pth);
join_pthread(*pth);
pth = NULL;
} }
static void cancel_pthread(pthread_t *pth) static void cancel_pthread(pthread_t *pth)
@ -1228,54 +1072,15 @@ static void cancel_pthread(pthread_t *pth)
pth = NULL; pth = NULL;
} }
static void wait_child(const pid_t *pid)
{
int ret;
do {
ret = waitpid(*pid, NULL, 0);
} while (ret != *pid);
}
static void __shutdown_children(ckpool_t *ckp)
{
int i;
cancel_join_pthread(&ckp->pth_watchdog);
/* They never got set up in the first place */
if (!ckp->children)
return;
/* Send the children a SIGUSR1 for them to shutdown gracefully, then
* wait for them to exit and kill them if they don't for 500ms. */
for (i = 0; i < ckp->proc_instances; i++) {
pid_t pid = ckp->children[i]->pid;
kill_pid(pid, SIGUSR1);
if (!ck_completion_timeout(&wait_child, (void *)&pid, 500))
kill_pid(pid, SIGKILL);
}
}
static void shutdown_children(ckpool_t *ckp)
{
cancel_join_pthread(&ckp->pth_watchdog);
__shutdown_children(ckp);
}
static void sighandler(const int sig) static void sighandler(const int sig)
{ {
ckpool_t *ckp = global_ckp; ckpool_t *ckp = global_ckp;
signal(sig, SIG_IGN); signal(sig, SIG_IGN);
signal(SIGTERM, SIG_IGN); signal(SIGTERM, SIG_IGN);
LOGWARNING("Parent process %s received signal %d, shutting down", LOGWARNING("Process %s received signal %d, shutting down",
ckp->name, sig); ckp->name, sig);
cancel_join_pthread(&ckp->pth_watchdog);
__shutdown_children(ckp);
cancel_pthread(&ckp->pth_listener); cancel_pthread(&ckp->pth_listener);
exit(0); exit(0);
} }
@ -1651,7 +1456,7 @@ static void parse_config(ckpool_t *ckp)
json_decref(json_conf); json_decref(json_conf);
} }
static void manage_old_child(ckpool_t *ckp, proc_instance_t *pi) static void manage_old_instance(ckpool_t *ckp, proc_instance_t *pi)
{ {
struct stat statbuf; struct stat statbuf;
char path[256]; char path[256];
@ -1679,85 +1484,13 @@ static void manage_old_child(ckpool_t *ckp, proc_instance_t *pi)
} }
} }
static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *name) static void prepare_child(ckpool_t *ckp, proc_instance_t *pi, void *process, char *name)
{ {
proc_instance_t *pi = ckzalloc(sizeof(proc_instance_t));
ckp->children = realloc(ckp->children, sizeof(proc_instance_t *) * (ckp->proc_instances + 1));
ckp->children[ckp->proc_instances++] = pi;
pi->ckp = ckp; pi->ckp = ckp;
pi->processname = name; pi->processname = name;
pi->sockname = pi->processname; pi->sockname = pi->processname;
pi->process = process;
create_process_unixsock(pi); create_process_unixsock(pi);
manage_old_child(ckp, pi); create_pthread(&pi->pth_process, process, pi);
/* Remove the old pid file if we've succeeded in coming this far */
rm_namepid(pi);
return pi;
}
static proc_instance_t *child_by_pid(const ckpool_t *ckp, const pid_t pid)
{
proc_instance_t *pi = NULL;
int i;
for (i = 0; i < ckp->proc_instances; i++) {
if (ckp->children[i]->pid == pid) {
pi = ckp->children[i];
break;
}
}
return pi;
}
static void *watchdog(void *arg)
{
#if 0
time_t last_relaunch_t = time(NULL);
#endif
ckpool_t *ckp = (ckpool_t *)arg;
rename_proc("watchdog");
sleep(1);
while (42) {
proc_instance_t *pi;
#if 0
time_t relaunch_t;
#endif
int pid, status;
pid = waitpid(0, &status, 0);
pi = child_by_pid(ckp, pid);
if (pi && WIFEXITED(status)) {
LOGWARNING("Child process %s exited, terminating!", pi->processname);
break;
}
#if 0
/* Don't bother trying to respawn for now since communication
* breakdown between the processes will make them exit. */
relaunch_t = time(NULL);
if (relaunch_t == last_relaunch_t) {
LOGEMERG("Respawning processes too fast, exiting!");
break;
}
last_relaunch_t = relaunch_t;
if (pi) {
LOGERR("%s process dead! Relaunching", pi->processname);
launch_process(pi);
} else {
LOGEMERG("Unknown child process %d dead, exiting!", pid);
break;
}
#else
if (pi)
LOGEMERG("%s process dead, terminating!", pi->processname);
else
LOGEMERG("Unknown child process %d dead, exiting!", pid);
break;
#endif
}
send_proc(&ckp->main, "shutdown");
return NULL;
} }
#ifdef USE_CKDB #ifdef USE_CKDB
@ -2130,7 +1863,7 @@ int main(int argc, char **argv)
} }
if (!ckp.handover) if (!ckp.handover)
manage_old_child(&ckp, &ckp.main); manage_old_instance(&ckp, &ckp.main);
write_namepid(&ckp.main); write_namepid(&ckp.main);
open_process_sock(&ckp, &ckp.main, &ckp.main.us); open_process_sock(&ckp, &ckp.main, &ckp.main.us);
launch_logger(&ckp.main); launch_logger(&ckp.main);
@ -2150,26 +1883,21 @@ int main(int argc, char **argv)
ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api);
create_pthread(&ckp.pth_listener, listener, &ckp.main); create_pthread(&ckp.pth_listener, listener, &ckp.main);
/* Launch separate processes from here */
ckp.generator = prepare_child(&ckp, &generator, "generator");
ckp.stratifier = prepare_child(&ckp, &stratifier, "stratifier");
ckp.connector = prepare_child(&ckp, &connector, "connector");
launch_processes(&ckp);
create_pthread(&ckp.pth_watchdog, watchdog, &ckp);
handler.sa_handler = &sighandler; handler.sa_handler = &sighandler;
handler.sa_flags = 0; handler.sa_flags = 0;
sigemptyset(&handler.sa_mask); sigemptyset(&handler.sa_mask);
sigaction(SIGTERM, &handler, NULL); sigaction(SIGTERM, &handler, NULL);
sigaction(SIGINT, &handler, NULL); sigaction(SIGINT, &handler, NULL);
/* Launch separate processes from here */
prepare_child(&ckp, &ckp.generator, generator, "generator");
prepare_child(&ckp, &ckp.stratifier, stratifier, "stratifier");
prepare_child(&ckp, &ckp.connector, connector, "connector");
/* Shutdown from here if the listener is sent a shutdown message */ /* Shutdown from here if the listener is sent a shutdown message */
if (ckp.pth_listener) if (ckp.pth_listener)
join_pthread(ckp.pth_listener); join_pthread(ckp.pth_listener);
shutdown_children(&ckp);
clean_up(&ckp); clean_up(&ckp);
return 0; return 0;

23
src/ckpool.h

@ -63,7 +63,7 @@ struct proc_instance {
char *sockname; char *sockname;
int pid; int pid;
int oldpid; int oldpid;
int (*process)(proc_instance_t *); pthread_t pth_process;
/* Linked list of received messages, locking and conditional */ /* Linked list of received messages, locking and conditional */
unix_msg_t *unix_msgs; unix_msg_t *unix_msgs;
@ -169,17 +169,14 @@ struct ckpool_instance {
/* API message queue */ /* API message queue */
ckmsgq_t *ckpapi; ckmsgq_t *ckpapi;
/* Logger message queue NOTE: Unique per process */ /* Logger message queue */
ckmsgq_t *logger; ckmsgq_t *logger;
/* Process instance data of parent/child processes */ /* Process instance data of parent/child processes */
proc_instance_t main; proc_instance_t main;
int proc_instances; proc_instance_t generator;
proc_instance_t **children; proc_instance_t stratifier;
proc_instance_t connector;
proc_instance_t *generator;
proc_instance_t *stratifier;
proc_instance_t *connector;
/* Threads of main process */ /* Threads of main process */
pthread_t pth_listener; pthread_t pth_listener;
@ -329,11 +326,11 @@ void empty_buffer(connsock_t *cs);
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len);
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(&(pi), msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,
const char *file, const char *func, const int line); const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__) #define send_recv_proc(pi, msg) _send_recv_proc(&(pi), msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);
#define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__)
char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, const char *func, char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, const char *func,
@ -344,8 +341,6 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
bool send_json_msg(connsock_t *cs, const json_t *json_msg); bool send_json_msg(connsock_t *cs, const json_t *json_msg);
json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val); json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val);
void childsighandler(const int sig);
int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret);
bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_string(char **store, const json_t *val, const char *res);
bool json_get_int64(int64_t *store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res);
bool json_get_int(int *store, const json_t *val, const char *res); bool json_get_int(int *store, const json_t *val, const char *res);

8
src/connector.c

@ -685,7 +685,6 @@ static void *receiver(void *arg)
} }
out: out:
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL; return NULL;
} }
@ -794,7 +793,6 @@ static void *sender(void *arg)
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
} }
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL; return NULL;
} }
@ -1466,13 +1464,15 @@ out:
return ret; return ret;
} }
int connector(proc_instance_t *pi) void *connector(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg;
cdata_t *cdata = ckzalloc(sizeof(cdata_t)); cdata_t *cdata = ckzalloc(sizeof(cdata_t));
int threads, sockd, ret = 0, i, tries = 0; int threads, sockd, ret = 0, i, tries = 0;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
const int on = 1; const int on = 1;
rename_proc(pi->processname);
LOGWARNING("%s connector starting", ckp->name); LOGWARNING("%s connector starting", ckp->name);
ckp->cdata = cdata; ckp->cdata = cdata;
cdata->ckp = ckp; cdata->ckp = ckp;
@ -1591,5 +1591,5 @@ int connector(proc_instance_t *pi)
ret = connector_loop(pi, cdata); ret = connector_loop(pi, cdata);
out: out:
dealloc(ckp->cdata); dealloc(ckp->cdata);
return process_exit(ckp, pi, ret); return NULL;
} }

4
src/connector.h

@ -1,5 +1,5 @@
/* /*
* Copyright 2014 Con Kolivas * Copyright 2014-2016 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -10,6 +10,6 @@
#ifndef CONNECTOR_H #ifndef CONNECTOR_H
#define CONNECTOR_H #define CONNECTOR_H
int connector(proc_instance_t *pi); void *connector(void *arg);
#endif /* CONNECTOR_H */ #endif /* CONNECTOR_H */

64
src/generator.c

@ -232,9 +232,6 @@ static server_instance_t *live_server(ckpool_t *ckp)
LOGDEBUG("Attempting to connect to bitcoind"); LOGDEBUG("Attempting to connect to bitcoind");
retry: retry:
if (!ping_main(ckp))
goto out;
/* First find a server that is already flagged alive if possible /* First find a server that is already flagged alive if possible
* without blocking on server_alive() */ * without blocking on server_alive() */
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
@ -262,7 +259,6 @@ retry:
living: living:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out:
send_proc(ckp->connector, alive ? "accept" : "reject"); send_proc(ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -294,7 +290,7 @@ static void clear_unix_msg(unix_msg_t **umsg)
} }
} }
static int gen_loop(proc_instance_t *pi) static void gen_loop(proc_instance_t *pi)
{ {
server_instance_t *si = NULL, *old_si; server_instance_t *si = NULL, *old_si;
unix_msg_t *umsg = NULL; unix_msg_t *umsg = NULL;
@ -304,7 +300,6 @@ static int gen_loop(proc_instance_t *pi)
connsock_t *cs; connsock_t *cs;
gbtbase_t *gbt; gbtbase_t *gbt;
char hash[68]; char hash[68];
int ret = 0;
reconnect: reconnect:
clear_unix_msg(&umsg); clear_unix_msg(&umsg);
@ -329,11 +324,6 @@ retry:
do { do {
umsg = get_unix_msg(pi); umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (!umsg); } while (!umsg);
if (unlikely(!si->alive)) { if (unlikely(!si->alive)) {
@ -343,10 +333,6 @@ retry:
buf = umsg->buf; buf = umsg->buf;
LOGDEBUG("Generator received request: %s", buf); LOGDEBUG("Generator received request: %s", buf);
if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
}
if (cmdmatch(buf, "getbase")) { if (cmdmatch(buf, "getbase")) {
if (!gen_gbtbase(cs, gbt)) { if (!gen_gbtbase(cs, gbt)) {
LOGWARNING("Failed to get block template from %s:%s", LOGWARNING("Failed to get block template from %s:%s",
@ -418,7 +404,6 @@ retry:
out: out:
kill_server(si); kill_server(si);
return ret;
} }
static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy) static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy)
@ -2223,9 +2208,6 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
int retries = 0; int retries = 0;
while (42) { while (42) {
if (!ping_main(ckp))
break;
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->proxies, proxi, tmp) { HASH_ITER(hh, gdata->proxies, proxi, tmp) {
if (proxi->disabled || !proxi->global) if (proxi->disabled || !proxi->global)
@ -2678,7 +2660,7 @@ static void parse_globaluser(ckpool_t *ckp, gdata_t *gdata, const char *buf)
add_userproxy(ckp, gdata, userid, url, username, pass); add_userproxy(ckp, gdata, userid, url, username, pass);
} }
static int proxy_loop(proc_instance_t *pi) static void proxy_loop(proc_instance_t *pi)
{ {
proxy_instance_t *proxi = NULL, *cproxy; proxy_instance_t *proxi = NULL, *cproxy;
server_instance_t *si = NULL, *old_si; server_instance_t *si = NULL, *old_si;
@ -2688,7 +2670,6 @@ static int proxy_loop(proc_instance_t *pi)
connsock_t *cs = NULL; connsock_t *cs = NULL;
bool started = false; bool started = false;
char *buf = NULL; char *buf = NULL;
int ret = 0;
reconnect: reconnect:
clear_unix_msg(&umsg); clear_unix_msg(&umsg);
@ -2724,11 +2705,6 @@ retry:
clear_unix_msg(&umsg); clear_unix_msg(&umsg);
do { do {
umsg = get_unix_msg(pi); umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (!umsg); } while (!umsg);
buf = umsg->buf; buf = umsg->buf;
@ -2763,9 +2739,6 @@ retry:
parse_proxystats(gdata, umsg->sockd, buf + 11); parse_proxystats(gdata, umsg->sockd, buf + 11);
} else if (cmdmatch(buf, "globaluser")) { } else if (cmdmatch(buf, "globaluser")) {
parse_globaluser(ckp, gdata, buf + 11); parse_globaluser(ckp, gdata, buf + 11);
} else if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "submitblock:")) { } else if (cmdmatch(buf, "submitblock:")) {
@ -2791,7 +2764,7 @@ retry:
} }
goto retry; goto retry;
out: out:
return ret; return;
} }
/* Check which servers are alive, maintaining a connection with them and /* Check which servers are alive, maintaining a connection with them and
@ -2849,13 +2822,13 @@ static void setup_servers(ckpool_t *ckp)
create_pthread(&pth_watchdog, server_watchdog, ckp); create_pthread(&pth_watchdog, server_watchdog, ckp);
} }
static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static void server_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
int i, ret; int i;
setup_servers(ckp); setup_servers(ckp);
ret = gen_loop(pi); gen_loop(pi);
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i]; server_instance_t *si = ckp->servers[i];
@ -2864,7 +2837,6 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
dealloc(si); dealloc(si);
} }
dealloc(ckp->servers); dealloc(ckp->servers);
return ret;
} }
static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id) static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id)
@ -2888,11 +2860,11 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
return proxy; return proxy;
} }
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) static void proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
gdata_t *gdata = ckp->gdata; gdata_t *gdata = ckp->gdata;
proxy_instance_t *proxy; proxy_instance_t *proxy;
int i, ret; int i;
mutex_init(&gdata->lock); mutex_init(&gdata->lock);
mutex_init(&gdata->notify_lock); mutex_init(&gdata->notify_lock);
@ -2916,17 +2888,16 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
} }
} }
ret = proxy_loop(pi); proxy_loop(pi);
return ret;
} }
int generator(proc_instance_t *pi) void *generator(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
gdata_t *gdata; gdata_t *gdata;
int ret;
rename_proc(pi->processname);
LOGWARNING("%s generator starting", ckp->name); LOGWARNING("%s generator starting", ckp->name);
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->gdata = gdata; ckp->gdata = gdata;
@ -2938,18 +2909,13 @@ int generator(proc_instance_t *pi)
/* Wait for the stratifier to be ready for us */ /* Wait for the stratifier to be ready for us */
do { do {
if (!ping_main(ckp)) {
ret = 1;
goto out;
}
cksleep_ms(10); cksleep_ms(10);
buf = send_recv_proc(ckp->stratifier, "ping"); buf = send_recv_proc(ckp->stratifier, "ping");
} while (!buf); } while (!buf);
dealloc(buf); dealloc(buf);
ret = proxy_mode(ckp, pi); proxy_mode(ckp, pi);
} else } else
ret = server_mode(ckp, pi); server_mode(ckp, pi);
out:
dealloc(ckp->gdata); dealloc(ckp->gdata);
return process_exit(ckp, pi, ret); return NULL;
} }

4
src/generator.h

@ -1,5 +1,5 @@
/* /*
* Copyright 2014 Con Kolivas * Copyright 2014-2016 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -12,6 +12,6 @@
#include "config.h" #include "config.h"
int generator(proc_instance_t *pi); void *generator(void *arg);
#endif /* GENERATOR_H */ #endif /* GENERATOR_H */

31
src/stratifier.c

@ -1074,7 +1074,7 @@ static char *__send_recv_generator(ckpool_t *ckp, const char *msg, const int pri
set = true; set = true;
} else } else
set = false; set = false;
buf = _send_recv_proc(ckp->generator, msg, UNIX_WRITE_TIMEOUT, RPC_TIMEOUT, __FILE__, __func__, __LINE__); buf = _send_recv_proc(&ckp->generator, msg, UNIX_WRITE_TIMEOUT, RPC_TIMEOUT, __FILE__, __func__, __LINE__);
if (unlikely(!buf)) if (unlikely(!buf))
buf = strdup("failed"); buf = strdup("failed");
if (set) if (set)
@ -3937,7 +3937,7 @@ static void ckdbq_flush(sdata_t *sdata)
LOGWARNING("Flushed %d messages from ckdb queue", flushed); LOGWARNING("Flushed %d messages from ckdb queue", flushed);
} }
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static void stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
unix_msg_t *umsg = NULL; unix_msg_t *umsg = NULL;
@ -3968,11 +3968,6 @@ retry:
} }
umsg = get_unix_msg(pi); umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Stratifier failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (!umsg); } while (!umsg);
buf = umsg->buf; buf = umsg->buf;
@ -4060,10 +4055,7 @@ retry:
Close(umsg->sockd); Close(umsg->sockd);
LOGDEBUG("Stratifier received request: %s", buf); LOGDEBUG("Stratifier received request: %s", buf);
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "update")) {
ret = 0;
goto out;
} else if (cmdmatch(buf, "update")) {
update_base(ckp, GEN_PRIORITY); update_base(ckp, GEN_PRIORITY);
} else if (cmdmatch(buf, "subscribe")) { } else if (cmdmatch(buf, "subscribe")) {
/* Proxifier has a new subscription */ /* Proxifier has a new subscription */
@ -4106,9 +4098,6 @@ retry:
} else } else
LOGWARNING("Unhandled stratifier message: %s", buf); LOGWARNING("Unhandled stratifier message: %s", buf);
goto retry; goto retry;
out:
return ret;
} }
static void *blockupdate(void *arg) static void *blockupdate(void *arg)
@ -7519,15 +7508,17 @@ static bool script_address(const char *btcaddress)
return btcaddress[0] == '3'; return btcaddress[0] == '3';
} }
int stratifier(proc_instance_t *pi) void *stratifier(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 1, threads;
int64_t randomiser; int64_t randomiser;
char *buf = NULL; char *buf = NULL;
sdata_t *sdata; sdata_t *sdata;
int threads;
rename_proc(pi->processname);
LOGWARNING("%s stratifier starting", ckp->name); LOGWARNING("%s stratifier starting", ckp->name);
sdata = ckzalloc(sizeof(sdata_t)); sdata = ckzalloc(sizeof(sdata_t));
ckp->sdata = sdata; ckp->sdata = sdata;
@ -7536,10 +7527,6 @@ int stratifier(proc_instance_t *pi)
/* Wait for the generator to have something for us */ /* Wait for the generator to have something for us */
do { do {
if (!ping_main(ckp)) {
ret = 1;
goto out;
}
if (ckp->proxy) if (ckp->proxy)
break; break;
buf = send_recv_proc(ckp->generator, "ping"); buf = send_recv_proc(ckp->generator, "ping");
@ -7622,7 +7609,7 @@ int stratifier(proc_instance_t *pi)
LOGWARNING("%s stratifier ready", ckp->name); LOGWARNING("%s stratifier ready", ckp->name);
ret = stratum_loop(ckp, pi); stratum_loop(ckp, pi);
out: out:
if (ckp->proxy) { if (ckp->proxy) {
proxy_t *proxy, *tmpproxy; proxy_t *proxy, *tmpproxy;
@ -7635,5 +7622,5 @@ out:
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
} }
dealloc(ckp->sdata); dealloc(ckp->sdata);
return process_exit(ckp, pi, ret); return NULL;
} }

4
src/stratifier.h

@ -1,5 +1,5 @@
/* /*
* Copyright 2014 Con Kolivas * Copyright 2014-2016 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -10,6 +10,6 @@
#ifndef STRATIFIER_H #ifndef STRATIFIER_H
#define STRATIFIER_H #define STRATIFIER_H
int stratifier(proc_instance_t *pi); void *stratifier(void *arg);
#endif /* STRATIFIER_H */ #endif /* STRATIFIER_H */

Loading…
Cancel
Save