Browse Source

Make process instances an array within the ckpool struct allowing us to enable only those we need when we add more modules to future code

master
Con Kolivas 11 years ago
parent
commit
78c1b2dfda
  1. 99
      src/ckpool.c
  2. 10
      src/ckpool.h
  3. 4
      src/connector.c
  4. 2
      src/generator.c
  5. 14
      src/stratifier.c

99
src/ckpool.c

@ -175,23 +175,38 @@ static void launch_process(proc_instance_t *pi)
pi->pid = pid;
}
static void launch_processes(ckpool_t *ckp)
{
int i;
for (i = 0; i < ckp->proc_instances; i++)
launch_process(ckp->children[i]);
}
static void clean_up(ckpool_t *ckp)
{
int i, children = ckp->proc_instances;
rm_namepid(&ckp->main);
dealloc(ckp->socket_dir);
ckp->proc_instances = 0;
for (i = 0; i < children; i++)
dealloc(ckp->children[i]);
dealloc(ckp->children);
}
static void __shutdown_children(ckpool_t *ckp, int sig)
{
int i;
pthread_cancel(ckp->pth_watchdog);
join_pthread(ckp->pth_watchdog);
if (!kill(ckp->generator.pid, 0))
kill(ckp->generator.pid, sig);
if (!kill(ckp->stratifier.pid, 0))
kill(ckp->stratifier.pid, sig);
if (!kill(ckp->connector.pid, 0))
kill(ckp->connector.pid, sig);
for (i = 0; i < ckp->proc_instances; i++) {
pid_t pid = ckp->children[i]->pid;
if (!kill(pid, 0))
kill(pid, sig);
}
}
static void shutdown_children(ckpool_t *ckp, int sig)
@ -204,14 +219,16 @@ static void shutdown_children(ckpool_t *ckp, int sig)
static void sighandler(int sig)
{
int i;
pthread_cancel(global_ckp->pth_watchdog);
join_pthread(global_ckp->pth_watchdog);
/* First attempt, send a shutdown message */
send_proc(&global_ckp->main, "shutdown");
send_proc(&global_ckp->generator, "shutdown");
send_proc(&global_ckp->stratifier, "shutdown");
send_proc(&global_ckp->connector, "shutdown");
for (i = 0; i < global_ckp->proc_instances; i++)
send_proc(global_ckp->children[i], "shutdown");
if (sig != 9) {
/* Wait a second, then send SIGTERM */
@ -287,37 +304,32 @@ static void parse_config(ckpool_t *ckp)
json_decref(json_conf);
}
static void prepare_generator(ckpool_t *ckp)
static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *name)
{
proc_instance_t *pi = &ckp->generator;
proc_instance_t *pi = ckzalloc(sizeof(proc_instance_t));
ckp->children = realloc(ckp->children, sizeof(proc_instance_t *) * (ckp->proc_instances + 1));
ckp->children[ckp->proc_instances++] = pi;
pi->ckp = ckp;
pi->processname = strdup("generator");
pi->processname = name;
pi->sockname = pi->processname;
pi->process = &generator;
pi->process = process;
create_process_unixsock(pi);
return pi;
}
static void prepare_stratifier(ckpool_t *ckp)
static proc_instance_t *child_by_pid(ckpool_t *ckp, pid_t pid)
{
proc_instance_t *pi = &ckp->stratifier;
proc_instance_t *pi = NULL;
int i;
pi->ckp = ckp;
pi->processname = strdup("stratifier");
pi->sockname = pi->processname;
pi->process = &stratifier;
create_process_unixsock(pi);
}
static void prepare_connector(ckpool_t *ckp)
{
proc_instance_t *pi = &ckp->connector;
pi->ckp = ckp;
pi->processname = strdup("connector");
pi->sockname = pi->processname;
pi->process = &connector;
create_process_unixsock(pi);
for (i = 0; i < ckp->proc_instances; i++) {
if (ckp->children[i]->pid == pid) {
pi = ckp->children[i];
break;
}
}
return pi;
}
static void *watchdog(void *arg)
@ -327,6 +339,7 @@ static void *watchdog(void *arg)
rename_proc("watchdog");
while (42) {
proc_instance_t *pi;
time_t relaunch_t;
int pid;
@ -337,15 +350,10 @@ static void *watchdog(void *arg)
break;
}
last_relaunch_t = relaunch_t;
if (pid == ckp->generator.pid) {
LOGERR("Generator process dead! Relaunching");
launch_process(&ckp->generator);
} else if (pid == ckp->stratifier.pid) {
LOGERR("Stratifier process dead! Relaunching");
launch_process(&ckp->stratifier);
} else if (pid == ckp->connector.pid) {
LOGERR("Connector process dead! Relaunching");
launch_process(&ckp->connector);
pi = child_by_pid(ckp, pid);
if (pi) {
LOGERR("%s process dead! Relaunching", pi->processname);
launch_process(pi);
} else {
LOGEMERG("Unknown child process %d dead, exiting!", pid);
break;
@ -448,12 +456,11 @@ int main(int argc, char **argv)
create_pthread(&ckp.pth_listener, listener, &ckp.main);
/* Launch separate processes from here */
prepare_generator(&ckp);
prepare_stratifier(&ckp);
prepare_connector(&ckp);
launch_process(&ckp.generator);
launch_process(&ckp.stratifier);
launch_process(&ckp.connector);
ckp.generator = prepare_child(&ckp, &generator, "generator");
ckp.stratifier = prepare_child(&ckp, &stratifier, "stratifier");
ckp.connector = prepare_child(&ckp, &connector, "connector");
launch_processes(&ckp);
create_pthread(&ckp.pth_watchdog, watchdog, &ckp);

10
src/ckpool.h

@ -42,9 +42,13 @@ struct ckpool_instance {
/* Process instance data of parent/child processes */
proc_instance_t main;
proc_instance_t generator;
proc_instance_t stratifier;
proc_instance_t connector;
int proc_instances;
proc_instance_t **children;
proc_instance_t *generator;
proc_instance_t *stratifier;
proc_instance_t *connector;
/* Threads of main process */
pthread_t pth_listener;

4
src/connector.c

@ -150,7 +150,7 @@ static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instanc
if (fd == -1)
return;
sprintf(buf, "dropclient=%d", client->id);
send_proc(&ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
}
static void send_client(conn_instance_t *ci, int id, char *buf);
@ -222,7 +222,7 @@ reparse:
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
s = json_dumps(val, 0);
send_proc(&ckp->stratifier, s);
send_proc(ckp->stratifier, s);
free(s);
json_decref(val);
}

2
src/generator.c

@ -87,7 +87,7 @@ retry:
} else if (!strncasecmp(buf, "submitblock:", 12)) {
LOGNOTICE("Submitting block data!");
if (submit_block(cs, buf + 12))
send_proc(&ckp->stratifier, "update");
send_proc(ckp->stratifier, "update");
/* FIXME Add logging of block solves */
} else if (!strncasecmp(buf, "ping", 4)) {
LOGDEBUG("Generator received ping request");

14
src/stratifier.c

@ -390,7 +390,7 @@ static void update_base(ckpool_t *ckp)
json_t *val;
char *buf;
buf = send_recv_proc(&ckp->generator, "getbase");
buf = send_recv_proc(ckp->generator, "getbase");
if (unlikely(!buf)) {
LOGWARNING("Failed to get base from generator in update_base");
return;
@ -720,7 +720,7 @@ static void *blockupdate(void *arg)
char request[8];
rename_proc("blockupdate");
buf = send_recv_proc(&ckp->generator, "getbest");
buf = send_recv_proc(ckp->generator, "getbest");
if (buf && strncasecmp(buf, "Failed", 6))
sprintf(request, "getbest");
else
@ -729,11 +729,11 @@ static void *blockupdate(void *arg)
memset(hash, 0, 68);
while (42) {
dealloc(buf);
buf = send_recv_proc(&ckp->generator, request);
buf = send_recv_proc(ckp->generator, request);
if (buf && strcmp(buf, hash) && strncasecmp(buf, "Failed", 6)) {
strcpy(hash, buf);
LOGNOTICE("Block hash changed to %s", hash);
send_proc(&ckp->stratifier, "update");
send_proc(ckp->stratifier, "update");
} else
cksleep_ms(ckp->blockpoll);
}
@ -996,7 +996,7 @@ static void test_blocksolve(workbase_t *wb, const uchar *data, double diff, cons
strcat(gbt_block, hexcoinbase);
if (wb->transactions)
realloc_strcat(&gbt_block, wb->txn_data);
send_proc(&wb->ckp->generator, gbt_block);
send_proc(wb->ckp->generator, gbt_block);
free(gbt_block);
}
@ -1289,7 +1289,7 @@ static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_v
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %d", client->id);
sprintf(buf, "dropclient=%d", client->id);
send_proc(&client->ckp->connector, buf);
send_proc(client->ckp->connector, buf);
goto out;
}
@ -1486,7 +1486,7 @@ static void *stratum_sender(void *arg)
* connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, 0);
send_proc(&ckp->connector, s);
send_proc(ckp->connector, s);
free(s);
json_decref(msg->json_msg);

Loading…
Cancel
Save