From 55b8e8424a89069bdbc29c1c573c2df03242d638 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 23 Apr 2014 14:38:34 +1000 Subject: [PATCH] Add connector process which will process all remote communications --- src/Makefile.am | 2 +- src/ckpool.c | 17 ++++++++++++ src/connector.c | 70 +++++++++++++++++++++++++++++++++++++++++++++++++ src/connector.h | 15 +++++++++++ src/libckpool.c | 35 +++++++++++++++++++++++++ src/libckpool.h | 3 +++ 6 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 src/connector.c create mode 100644 src/connector.h diff --git a/src/Makefile.am b/src/Makefile.am index f6eb8744..b2074139 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,5 +6,5 @@ libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ @JANSSON_LIBS@ bin_PROGRAMS = ckpool ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ - stratifier.c stratifier.h + stratifier.c stratifier.h connector.c connector.h ckpool_LDADD = libckpool.la diff --git a/src/ckpool.c b/src/ckpool.c index 9b3aca6a..cd8166b1 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -21,6 +21,7 @@ #include "libckpool.h" #include "generator.h" #include "stratifier.h" +#include "connector.h" /* Only global variable, to be used only by sighandler */ static ckpool_t *global_ckp; @@ -209,6 +210,7 @@ static void parse_config(ckpool_t *ckp) json_get_string(&ckp->btcsig, json_conf, "btcsig"); json_get_int(&ckp->blockpoll, json_conf, "blockpoll"); json_get_int(&ckp->update_interval, json_conf, "update_interval"); + json_get_string(&ckp->serverurl, json_conf, "serverurl"); json_decref(json_conf); } @@ -257,6 +259,17 @@ static void launch_stratifier(ckpool_t *ckp) launch_process(pi); } +static void launch_connector(ckpool_t *ckp) +{ + proc_instance_t *pi = &ckp->connector; + + pi->ckp = ckp; + pi->processname = strdup("connector"); + pi->sockname = pi->processname; + pi->process = &connector; + launch_process(pi); +} + static void *watchdog(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; @@ -272,6 +285,9 @@ static void *watchdog(void *arg) } else if (pid == ckp->stratifier.pid) { LOGERR("Stratifier process dead! Relaunching"); launch_stratifier(ckp); + } else if (pid == ckp->connector.pid) { + LOGERR("Connector process dead! Relaunching"); + launch_connector(ckp); } } return NULL; @@ -345,6 +361,7 @@ int main(int argc, char **argv) /* Launch separate processes from here */ launch_generator(&ckp); launch_stratifier(&ckp); + launch_connector(&ckp); test_functions(&ckp); diff --git a/src/connector.c b/src/connector.c new file mode 100644 index 00000000..012b14e3 --- /dev/null +++ b/src/connector.c @@ -0,0 +1,70 @@ +/* + * Copyright 2014 Con Kolivas + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 3 of the License, or (at your option) + * any later version. See COPYING for more details. + */ + +#include "config.h" + +#include +#include +#include +#include +#include + +#include "ckpool.h" +#include "libckpool.h" + +int connector(proc_instance_t *pi) +{ + char *url = NULL, *port = NULL; + ckpool_t *ckp = pi->ckp; + int sockd, ret = 0; + + if (ckp->serverurl) { + if (!extract_sockaddr(ckp->serverurl, &url, &port)) { + LOGWARNING("Failed to extract server address from %s", ckp->serverurl); + ret = 1; + goto out; + } + sockd = bind_socket(url, port); + dealloc(url); + dealloc(port); + if (sockd < 0) { + LOGERR("Connector failed to bind to socket"); + ret = 1; + goto out; + } + } else { + struct sockaddr_in serv_addr; + + sockd = socket(AF_INET, SOCK_STREAM, 0); + if (sockd < 0) { + LOGERR("Connector failed to open socket"); + ret = 1; + goto out; + } + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); + serv_addr.sin_port = htons(3333); + ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); + if (ret < 0) { + LOGERR("Connector failed to bind to socket"); + ret = 1; + close(sockd); + goto out; + } + } + +out: + LOGINFO("%s connector exiting with return code %d", ckp->name, ret); + if (ret) { + send_proc(&ckp->main, "shutdown"); + sleep(1); + } + return ret; +} diff --git a/src/connector.h b/src/connector.h new file mode 100644 index 00000000..e121d2be --- /dev/null +++ b/src/connector.h @@ -0,0 +1,15 @@ +/* + * Copyright 2014 Con Kolivas + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 3 of the License, or (at your option) + * any later version. See COPYING for more details. + */ + +#ifndef CONNECTOR_H +#define CONNECTOR_H + +int connector(proc_instance_t *pi); + +#endif /* CONNECTOR_H */ diff --git a/src/libckpool.c b/src/libckpool.c index 0f6794b1..a14f44b8 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -348,6 +348,41 @@ void block_socket(int fd) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); } +int bind_socket(char *url, char *port) +{ + struct addrinfo servinfobase, *servinfo, hints, *p; + int ret, sockd = -1; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + servinfo = &servinfobase; + + if (getaddrinfo(url, port, &hints, &servinfo) != 0) { + LOGWARNING("Failed to resolve (?wrong URL) %s:%s", url, port); + goto out; + } + for (p = servinfo; p != NULL; p = p->ai_next) { + sockd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (sockd > 0) + break; + } + if (sockd < 0) { + LOGWARNING("Failed to open socket for %s:%s", url, port); + goto out; + } + ret = bind(sockd, p->ai_addr, p->ai_addrlen); + if (ret < 0) { + LOGWARNING("Failed to bind socket for %s:%s", url, port); + close(sockd); + sockd = -1; + goto out; + } + +out: + return sockd; +} + int connect_socket(char *url, char *port) { struct addrinfo servinfobase, *servinfo, hints, *p; diff --git a/src/libckpool.h b/src/libckpool.h index be186473..9b647c4f 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -149,6 +149,7 @@ struct ckpool_instance { proc_instance_t main; proc_instance_t generator; proc_instance_t stratifier; + proc_instance_t connector; /* Threads of main process */ pthread_t pth_listener; @@ -166,6 +167,7 @@ struct ckpool_instance { /* Stratum options */ int update_interval; // Seconds between stratum updates + char *serverurl; }; void rename_proc(const char *name); @@ -224,6 +226,7 @@ bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); void keep_sockalive(int fd); void noblock_socket(int fd); void block_socket(int fd); +int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); int read_socket_line(connsock_t *cs);