kanoi 11 years ago
parent
commit
2c776ed38e
  1. 27
      src/ckpool.c
  2. 13
      src/connector.c
  3. 8
      src/generator.c
  4. 28
      src/libckpool.c
  5. 8
      src/stratifier.c

27
src/ckpool.c

@ -227,11 +227,9 @@ void empty_buffer(connsock_t *cs)
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
int read_socket_line(connsock_t *cs, int timeout) int read_socket_line(connsock_t *cs, int timeout)
{ {
tv_t tv_timeout = {timeout, 0};
char *eom = NULL; char *eom = NULL;
size_t buflen; size_t buflen;
int ret = -1; int ret = -1;
fd_set rd;
if (unlikely(cs->fd < 0)) if (unlikely(cs->fd < 0))
goto out; goto out;
@ -250,11 +248,7 @@ int read_socket_line(connsock_t *cs, int timeout)
while (42) { while (42) {
char readbuf[PAGESIZE] = {}; char readbuf[PAGESIZE] = {};
FD_ZERO(&rd); ret = wait_read_select(cs->fd, eom ? 0 : timeout);
FD_SET(cs->fd, &rd);
if (eom)
tv_timeout.tv_sec = tv_timeout.tv_usec = 0;
ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout);
if (eom && !ret) if (eom && !ret)
break; break;
if (ret < 1) { if (ret < 1) {
@ -697,20 +691,17 @@ static void sighandler(int sig)
{ {
ckpool_t *ckp = global_ckp; ckpool_t *ckp = global_ckp;
LOGWARNING("Parent process %s received signal %d, shutting down",
ckp->name, sig);
pthread_cancel(ckp->pth_watchdog); pthread_cancel(ckp->pth_watchdog);
join_pthread(ckp->pth_watchdog); join_pthread(ckp->pth_watchdog);
if (sig != 9) { __shutdown_children(ckp, SIGTERM);
__shutdown_children(ckp, SIGTERM); /* Wait a second, then send SIGKILL */
/* Wait a second, then send SIGKILL */ sleep(1);
sleep(1); __shutdown_children(ckp, SIGKILL);
__shutdown_children(ckp, SIGKILL); pthread_cancel(ckp->pth_listener);
pthread_cancel(ckp->pth_listener); exit(0);
exit(0);
} else {
__shutdown_children(ckp, SIGKILL);
exit(1);
}
} }
static void json_get_string(char **store, json_t *val, const char *res) static void json_get_string(char **store, json_t *val, const char *res)

13
src/connector.c

@ -260,7 +260,9 @@ retry:
cksleep_ms(100); cksleep_ms(100);
goto retry; goto retry;
} }
ret = poll(fds, nfds, 1000); do {
ret = poll(fds, nfds, 1000);
} while (unlikely(ret < 0 && interrupted()));
if (ret < 0) { if (ret < 0) {
LOGERR("Failed to poll in receiver"); LOGERR("Failed to poll in receiver");
goto out; goto out;
@ -308,7 +310,6 @@ void *sender(void *arg)
while (42) { while (42) {
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
tv_t timeout_tv = {0, 0};
bool only_send = false; bool only_send = false;
int ret, fd, ofs = 0; int ret, fd, ofs = 0;
@ -346,11 +347,7 @@ void *sender(void *arg)
* ready to receive data from us, put the send back on the * ready to receive data from us, put the send back on the
* list. */ * list. */
if (!only_send) { if (!only_send) {
fd_set writefds; ret = wait_write_select(fd, 0);
FD_ZERO(&writefds);
FD_SET(fd, &writefds);
ret = select(fd + 1, NULL, &writefds, NULL, &timeout_tv);
if (ret < 1) { if (ret < 1) {
LOGDEBUG("Client %d not ready for writes", client->id); LOGDEBUG("Client %d not ready for writes", client->id);
@ -480,7 +477,7 @@ retry:
close(sockd); close(sockd);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGERR("Failed to accept on connector socket, retrying in 5s"); LOGEMERG("Failed to accept on connector socket, exiting");
ret = 1; ret = 1;
goto out; goto out;
} }

8
src/generator.c

@ -220,9 +220,7 @@ retry:
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
if (interrupted()) LOGEMERG("Failed to accept on generator socket");
goto retry;
LOGERR("Failed to accept on generator socket");
ret = 1; ret = 1;
goto out; goto out;
} }
@ -1227,9 +1225,7 @@ reconnect:
retry: retry:
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
if (interrupted()) LOGEMERG("Failed to accept on proxy socket");
goto retry;
LOGERR("Failed to accept on proxy socket");
ret = 1; ret = 1;
goto out; goto out;
} }

28
src/libckpool.c

@ -411,19 +411,15 @@ int connect_socket(char *url, char *port)
* we can connect to quickly. */ * we can connect to quickly. */
noblock_socket(sockd); noblock_socket(sockd);
if (connect(sockd, p->ai_addr, p->ai_addrlen) == -1) { if (connect(sockd, p->ai_addr, p->ai_addrlen) == -1) {
struct timeval tv_timeout = {5, 0};
int selret; int selret;
fd_set rw;
if (!sock_connecting()) { if (!sock_connecting()) {
close(sockd); close(sockd);
LOGDEBUG("Failed sock connect"); LOGDEBUG("Failed sock connect");
continue; continue;
} }
FD_ZERO(&rw); selret = wait_write_select(sockd, 5);
FD_SET(sockd, &rw); if (selret > 0) {
selret = select(sockd + 1, NULL, &rw, NULL, &tv_timeout);
if (selret > 0 && FD_ISSET(sockd, &rw)) {
socklen_t len; socklen_t len;
int err, n; int err, n;
@ -457,13 +453,9 @@ out:
int write_socket(int fd, const void *buf, size_t nbyte) int write_socket(int fd, const void *buf, size_t nbyte)
{ {
tv_t tv_timeout = {1, 0};
fd_set writefds;
int ret; int ret;
FD_ZERO(&writefds); ret = wait_write_select(fd, 5);
FD_SET(fd, &writefds);
ret = select(fd + 1, NULL, &writefds, NULL, &tv_timeout);
if (ret < 1) { if (ret < 1) {
if (!ret) if (!ret)
LOGNOTICE("Select timed out in write_socket"); LOGNOTICE("Select timed out in write_socket");
@ -487,12 +479,8 @@ void empty_socket(int fd)
do { do {
char buf[PAGESIZE]; char buf[PAGESIZE];
tv_t timeout = {0, 0};
fd_set rd;
FD_ZERO(&rd); ret = wait_read_select(fd, 0);
FD_SET(fd, &rd);
ret = select(fd + 1, &rd, NULL, NULL, &timeout);
if (ret > 0) { if (ret > 0) {
ret = recv(fd, buf, PAGESIZE - 1, 0); ret = recv(fd, buf, PAGESIZE - 1, 0);
buf[ret] = 0; buf[ret] = 0;
@ -625,7 +613,9 @@ int wait_read_select(int sockd, int timeout)
FD_ZERO(&readfs); FD_ZERO(&readfs);
FD_SET(sockd, &readfs); FD_SET(sockd, &readfs);
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); do {
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout);
} while (unlikely(ret < 0 && interrupted()));
return ret; return ret;
} }
@ -700,7 +690,9 @@ int wait_write_select(int sockd, int timeout)
FD_ZERO(&writefds); FD_ZERO(&writefds);
FD_SET(sockd, &writefds); FD_SET(sockd, &writefds);
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); do {
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout);
} while (unlikely(ret < 0 && interrupted()));
return ret; return ret;
} }

8
src/stratifier.c

@ -949,11 +949,9 @@ retry:
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
if (interrupted()) LOGERR("Failed to accept on stratifier socket, exiting");
goto retry; ret = 1;
LOGERR("Failed to accept on stratifier socket, retrying in 5s"); goto out;
sleep(5);
goto retry;
} }
dealloc(buf); dealloc(buf);

Loading…
Cancel
Save