Browse Source

Add connector process which will process all remote communications

master
Con Kolivas 11 years ago
parent
commit
55b8e8424a
  1. 2
      src/Makefile.am
  2. 17
      src/ckpool.c
  3. 70
      src/connector.c
  4. 15
      src/connector.h
  5. 35
      src/libckpool.c
  6. 3
      src/libckpool.h

2
src/Makefile.am

@ -6,5 +6,5 @@ libckpool_la_LIBADD = @PTHREAD_LIBS@ @MATH_LIBS@ @RT_LIBS@ @JANSSON_LIBS@
bin_PROGRAMS = ckpool bin_PROGRAMS = ckpool
ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \ ckpool_SOURCES = ckpool.c ckpool.h generator.c generator.h bitcoin.c bitcoin.h \
stratifier.c stratifier.h stratifier.c stratifier.h connector.c connector.h
ckpool_LDADD = libckpool.la ckpool_LDADD = libckpool.la

17
src/ckpool.c

@ -21,6 +21,7 @@
#include "libckpool.h" #include "libckpool.h"
#include "generator.h" #include "generator.h"
#include "stratifier.h" #include "stratifier.h"
#include "connector.h"
/* Only global variable, to be used only by sighandler */ /* Only global variable, to be used only by sighandler */
static ckpool_t *global_ckp; static ckpool_t *global_ckp;
@ -209,6 +210,7 @@ static void parse_config(ckpool_t *ckp)
json_get_string(&ckp->btcsig, json_conf, "btcsig"); json_get_string(&ckp->btcsig, json_conf, "btcsig");
json_get_int(&ckp->blockpoll, json_conf, "blockpoll"); json_get_int(&ckp->blockpoll, json_conf, "blockpoll");
json_get_int(&ckp->update_interval, json_conf, "update_interval"); json_get_int(&ckp->update_interval, json_conf, "update_interval");
json_get_string(&ckp->serverurl, json_conf, "serverurl");
json_decref(json_conf); json_decref(json_conf);
} }
@ -257,6 +259,17 @@ static void launch_stratifier(ckpool_t *ckp)
launch_process(pi); launch_process(pi);
} }
static void launch_connector(ckpool_t *ckp)
{
proc_instance_t *pi = &ckp->connector;
pi->ckp = ckp;
pi->processname = strdup("connector");
pi->sockname = pi->processname;
pi->process = &connector;
launch_process(pi);
}
static void *watchdog(void *arg) static void *watchdog(void *arg)
{ {
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
@ -272,6 +285,9 @@ static void *watchdog(void *arg)
} else if (pid == ckp->stratifier.pid) { } else if (pid == ckp->stratifier.pid) {
LOGERR("Stratifier process dead! Relaunching"); LOGERR("Stratifier process dead! Relaunching");
launch_stratifier(ckp); launch_stratifier(ckp);
} else if (pid == ckp->connector.pid) {
LOGERR("Connector process dead! Relaunching");
launch_connector(ckp);
} }
} }
return NULL; return NULL;
@ -345,6 +361,7 @@ int main(int argc, char **argv)
/* Launch separate processes from here */ /* Launch separate processes from here */
launch_generator(&ckp); launch_generator(&ckp);
launch_stratifier(&ckp); launch_stratifier(&ckp);
launch_connector(&ckp);
test_functions(&ckp); test_functions(&ckp);

70
src/connector.c

@ -0,0 +1,70 @@
/*
* Copyright 2014 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 3 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
#include "config.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include "ckpool.h"
#include "libckpool.h"
int connector(proc_instance_t *pi)
{
char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
if (ckp->serverurl) {
if (!extract_sockaddr(ckp->serverurl, &url, &port)) {
LOGWARNING("Failed to extract server address from %s", ckp->serverurl);
ret = 1;
goto out;
}
sockd = bind_socket(url, port);
dealloc(url);
dealloc(port);
if (sockd < 0) {
LOGERR("Connector failed to bind to socket");
ret = 1;
goto out;
}
} else {
struct sockaddr_in serv_addr;
sockd = socket(AF_INET, SOCK_STREAM, 0);
if (sockd < 0) {
LOGERR("Connector failed to open socket");
ret = 1;
goto out;
}
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(3333);
ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if (ret < 0) {
LOGERR("Connector failed to bind to socket");
ret = 1;
close(sockd);
goto out;
}
}
out:
LOGINFO("%s connector exiting with return code %d", ckp->name, ret);
if (ret) {
send_proc(&ckp->main, "shutdown");
sleep(1);
}
return ret;
}

15
src/connector.h

@ -0,0 +1,15 @@
/*
* Copyright 2014 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 3 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
#ifndef CONNECTOR_H
#define CONNECTOR_H
int connector(proc_instance_t *pi);
#endif /* CONNECTOR_H */

35
src/libckpool.c

@ -348,6 +348,41 @@ void block_socket(int fd)
fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
} }
int bind_socket(char *url, char *port)
{
struct addrinfo servinfobase, *servinfo, hints, *p;
int ret, sockd = -1;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
servinfo = &servinfobase;
if (getaddrinfo(url, port, &hints, &servinfo) != 0) {
LOGWARNING("Failed to resolve (?wrong URL) %s:%s", url, port);
goto out;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
sockd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (sockd > 0)
break;
}
if (sockd < 0) {
LOGWARNING("Failed to open socket for %s:%s", url, port);
goto out;
}
ret = bind(sockd, p->ai_addr, p->ai_addrlen);
if (ret < 0) {
LOGWARNING("Failed to bind socket for %s:%s", url, port);
close(sockd);
sockd = -1;
goto out;
}
out:
return sockd;
}
int connect_socket(char *url, char *port) int connect_socket(char *url, char *port)
{ {
struct addrinfo servinfobase, *servinfo, hints, *p; struct addrinfo servinfobase, *servinfo, hints, *p;

3
src/libckpool.h

@ -149,6 +149,7 @@ struct ckpool_instance {
proc_instance_t main; proc_instance_t main;
proc_instance_t generator; proc_instance_t generator;
proc_instance_t stratifier; proc_instance_t stratifier;
proc_instance_t connector;
/* Threads of main process */ /* Threads of main process */
pthread_t pth_listener; pthread_t pth_listener;
@ -166,6 +167,7 @@ struct ckpool_instance {
/* Stratum options */ /* Stratum options */
int update_interval; // Seconds between stratum updates int update_interval; // Seconds between stratum updates
char *serverurl;
}; };
void rename_proc(const char *name); void rename_proc(const char *name);
@ -224,6 +226,7 @@ bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port);
void keep_sockalive(int fd); void keep_sockalive(int fd);
void noblock_socket(int fd); void noblock_socket(int fd);
void block_socket(int fd); void block_socket(int fd);
int bind_socket(char *url, char *port);
int connect_socket(char *url, char *port); int connect_socket(char *url, char *port);
int write_socket(int fd, const void *buf, size_t nbyte); int write_socket(int fd, const void *buf, size_t nbyte);
int read_socket_line(connsock_t *cs); int read_socket_line(connsock_t *cs);

Loading…
Cancel
Save