Browse Source

Make the connector bind to multiple IPs and Ports specified in the configuration, handling incoming connections

master
Con Kolivas 10 years ago
parent
commit
0f7b6c5a68
  1. 6
      README
  2. 107
      src/connector.c

6
README

@ -250,9 +250,11 @@ from 2 to 8. Default 8
miners and is set to 30 seconds by default to help perpetuate transactions for miners and is set to 30 seconds by default to help perpetuate transactions for
the health of the bitcoin network. the health of the bitcoin network.
"serverurl" : This is the IP to try to bind ckpool uniquely to, otherwise it "serverurl" : This is the IP(s) to try to bind ckpool uniquely to, otherwise it
will attempt to bind to all interfaces in port 3333 by default in pool mode will attempt to bind to all interfaces in port 3333 by default in pool mode
and 3334 in proxy mode. and 3334 in proxy mode. Multiple entries can be specified as an array by
either IP or resolvable domain name but the executable must be able to bind to
all of them and ports up to 1024 usually require privileged access.
"mindiff" : Minimum diff that vardiff will allow miners to drop to. Default 1 "mindiff" : Minimum diff that vardiff will allow miners to drop to. Default 1

107
src/connector.c

@ -64,8 +64,14 @@ struct connector_data {
ckpool_t *ckp; ckpool_t *ckp;
cklock_t lock; cklock_t lock;
proc_instance_t *pi; proc_instance_t *pi;
int serverfd;
/* Array of server fds */
int *serverfd;
/* Number of server fds */
int serverfds;
/* All time count of clients connected */
int nfds; int nfds;
bool accept; bool accept;
pthread_t pth_sender; pthread_t pth_sender;
pthread_t pth_receiver; pthread_t pth_receiver;
@ -109,7 +115,7 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, int epfd) static int accept_client(cdata_t *cdata, const int epfd, const int sockd)
{ {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
client_instance_t *client; client_instance_t *client;
@ -128,7 +134,7 @@ static int accept_client(cdata_t *cdata, int epfd)
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
address_len = sizeof(client->address); address_len = sizeof(client->address);
fd = accept(cdata->serverfd, &client->address, &address_len); fd = accept(sockd, &client->address, &address_len);
if (unlikely(fd < 0)) { if (unlikely(fd < 0)) {
/* Handle these errors gracefully should we ever share this /* Handle these errors gracefully should we ever share this
* socket */ * socket */
@ -136,7 +142,7 @@ static int accept_client(cdata_t *cdata, int epfd)
LOGERR("Recoverable error on accept in accept_client"); LOGERR("Recoverable error on accept in accept_client");
return 0; return 0;
} }
LOGERR("Failed to accept on socket %d in acceptor", cdata->serverfd); LOGERR("Failed to accept on socket %d in acceptor", sockd);
dealloc(client); dealloc(client);
return -1; return -1;
} }
@ -348,7 +354,7 @@ void *receiver(void *arg)
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
struct epoll_event event; struct epoll_event event;
bool maxconn = true; bool maxconn = true;
int ret, epfd; int ret, epfd, i;
rename_proc("creceiver"); rename_proc("creceiver");
@ -357,13 +363,18 @@ void *receiver(void *arg)
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL; return NULL;
} }
event.data.fd = cdata->serverfd; /* Add all the serverfds to the epoll */
for (i = 0; i < cdata->serverfds; i++) {
/* The small values will be easily identifiable compared to
* pointers */
event.data.u64 = i;
event.events = EPOLLIN; event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd, &event); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
if (ret < 0) { if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL; return NULL;
} }
}
while (42) { while (42) {
client_instance_t *client; client_instance_t *client;
@ -383,12 +394,13 @@ void *receiver(void *arg)
* can receive connections. */ * can receive connections. */
LOGDEBUG("Dropping listen backlog to 0"); LOGDEBUG("Dropping listen backlog to 0");
maxconn = false; maxconn = false;
listen(cdata->serverfd, 0); for (i = 0; i < cdata->serverfds; i++)
listen(cdata->serverfd[i], 0);
} }
continue; continue;
} }
if (event.data.fd == cdata->serverfd) { if (event.data.u64 < (uint64_t)cdata->serverfds) {
ret = accept_client(cdata, epfd); ret = accept_client(cdata, epfd, (int)cdata->serverfd[event.data.u64]);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGEMERG("FATAL: Failed to accept_client in receiver"); LOGEMERG("FATAL: Failed to accept_client in receiver");
break; break;
@ -691,7 +703,7 @@ retry:
goto retry; goto retry;
} }
if (cmdmatch(buf, "getfd")) { if (cmdmatch(buf, "getfd")) {
send_fd(cdata->serverfd, sockd); send_fd(cdata->serverfd[0], sockd);
goto retry; goto retry;
} }
@ -740,32 +752,23 @@ int connector(proc_instance_t *pi)
ckp->data = cdata; ckp->data = cdata;
cdata->ckp = ckp; cdata->ckp = ckp;
if (!ckp->serverurls)
cdata->serverfd = ckalloc(sizeof(int *));
else
cdata->serverfd = ckalloc(sizeof(int *) * ckp->serverurls);
if (ckp->oldconnfd > 0) { if (ckp->oldconnfd > 0) {
sockd = ckp->oldconnfd; /* Only handing over the first interface socket for now */
} else if (ckp->serverurls && ckp->serverurl[0]) { cdata->serverfd[0] = ckp->oldconnfd;
if (!extract_sockaddr(ckp->serverurl[0], &url, &port)) { cdata->serverfds++;
LOGWARNING("Failed to extract server address from %s", ckp->serverurl[0]);
ret = 1;
goto out;
} }
do {
sockd = bind_socket(url, port);
if (sockd > 0)
break;
LOGWARNING("Connector failed to bind to socket, retrying in 5s");
sleep(5);
} while (++tries < 25);
dealloc(url); if (!cdata->serverfds && !ckp->serverurls) {
dealloc(port); /* No serverurls have been specified and no sockets have been
if (sockd < 0) { * inherited. Bind to all interfaces on default sockets. */
LOGERR("Connector failed to bind to socket for 2 minutes");
ret = 1;
goto out;
}
} else {
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
cdata->serverfds = 1;
sockd = socket(AF_INET, SOCK_STREAM, 0); sockd = socket(AF_INET, SOCK_STREAM, 0);
if (sockd < 0) { if (sockd < 0) {
LOGERR("Connector failed to open socket"); LOGERR("Connector failed to open socket");
@ -789,20 +792,50 @@ int connector(proc_instance_t *pi)
Close(sockd); Close(sockd);
goto out; goto out;
} }
if (listen(sockd, SOMAXCONN) < 0) {
LOGERR("Connector failed to listen on socket");
Close(sockd);
goto out;
} }
if (tries) cdata->serverfd[0] = sockd;
LOGWARNING("Connector successfully bound to socket"); } else {
for ( ; cdata->serverfds < ckp->serverurls; cdata->serverfds++) {
char *serverurl = ckp->serverurl[cdata->serverfds];
ret = listen(sockd, SOMAXCONN); if (!extract_sockaddr(serverurl, &url, &port)) {
if (ret < 0) { LOGWARNING("Failed to extract server address from %s", serverurl);
ret = 1;
goto out;
}
do {
sockd = bind_socket(url, port);
if (sockd > 0)
break;
LOGWARNING("Connector failed to bind to socket, retrying in 5s");
sleep(5);
} while (++tries < 25);
dealloc(url);
dealloc(port);
if (sockd < 0) {
LOGERR("Connector failed to bind to socket for 2 minutes");
ret = 1;
goto out;
}
if (listen(sockd, SOMAXCONN) < 0) {
LOGERR("Connector failed to listen on socket"); LOGERR("Connector failed to listen on socket");
Close(sockd); Close(sockd);
goto out; goto out;
} }
cdata->serverfd[cdata->serverfds] = sockd;
}
}
if (tries)
LOGWARNING("Connector successfully bound to socket");
cklock_init(&cdata->lock); cklock_init(&cdata->lock);
cdata->pi = pi; cdata->pi = pi;
cdata->serverfd = sockd;
cdata->nfds = 0; cdata->nfds = 0;
cdata->client_id = 1; cdata->client_id = 1;
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);

Loading…
Cancel
Save