diff --git a/src/ckpool.c b/src/ckpool.c index 9b9396e6..ccb708c3 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -495,11 +495,13 @@ void empty_buffer(connsock_t *cs) /* Read from a socket into cs->buf till we get an '\n', converting it to '\0' * and storing how much extra data we've received, to be moved to the beginning * of the buffer for use on the next receive. */ -int read_socket_line(connsock_t *cs, const int timeout) +int read_socket_line(connsock_t *cs, float timeout) { int fd = cs->fd, ret = -1; char *eom = NULL; + tv_t start, now; size_t buflen; + float diff; if (unlikely(fd < 0)) goto out; @@ -515,28 +517,35 @@ int read_socket_line(connsock_t *cs, const int timeout) eom = strchr(cs->buf, '\n'); } + tv_time(&start); +rewait: + ret = wait_read_select(fd, eom ? 0 : timeout); + if (ret < 1) { + if (!ret) { + if (eom) + goto parse; + LOGDEBUG("Select timed out in read_socket_line"); + } else + LOGERR("Select failed in read_socket_line"); + goto out; + } + tv_time(&now); + diff = tvdiff(&now, &start); + timeout -= diff; while (42) { char readbuf[PAGESIZE] = {}; int backoff = 1; char *newbuf; - ret = wait_read_select(fd, eom ? 0 : timeout); - if (ret < 1) { - if (eom) - break; - if (!ret) - LOGDEBUG("Select timed out in read_socket_line"); - else - LOGERR("Select failed in read_socket_line"); - goto out; - } - ret = recv(fd, readbuf, PAGESIZE - 4, 0); + ret = recv(fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { - /* Closed socket after valid message */ + /* No more to read or closed socket after valid message */ if (eom) break; + /* Have we used up all the timeout yet? */ + if (timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) + goto rewait; LOGERR("Failed to recv in read_socket_line"); - ret = -1; goto out; } buflen = cs->bufofs + ret + 1; @@ -557,6 +566,7 @@ int read_socket_line(connsock_t *cs, const int timeout) cs->buf[cs->bufofs] = '\0'; eom = strchr(cs->buf, '\n'); } +parse: ret = eom - cs->buf; cs->buflen = cs->buf + cs->bufofs - eom - 1; diff --git a/src/ckpool.h b/src/ckpool.h index 656fc5fa..f5bfdb20 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -242,7 +242,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); -int read_socket_line(connsock_t *cs, const int timeout); +int read_socket_line(connsock_t *cs, float timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); diff --git a/src/connector.c b/src/connector.c index 0bd6fac7..3107901c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -24,6 +24,7 @@ #define MAX_MSGSIZE 1024 typedef struct client_instance client_instance_t; +typedef struct sender_send sender_send_t; struct client_instance { /* For clients hashtable */ @@ -53,6 +54,8 @@ struct client_instance { char buf[PAGESIZE]; int bufofs; + /* Are we currently sending a blocked message from this client */ + sender_send_t *sending; bool passthrough; }; @@ -66,8 +69,6 @@ struct sender_send { int ofs; }; -typedef struct sender_send sender_send_t; - /* Private data for the connector */ struct connector_data { ckpool_t *ckp; @@ -557,7 +558,12 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende client_instance_t *client = sender_send->client; if (unlikely(client->invalid)) - return true; + goto out_true; + + /* Make sure we only send one message at a time to each client */ + if (unlikely(client->sending && client->sending != sender_send)) + return false; + client->sending = sender_send; while (sender_send->len) { int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); @@ -568,11 +574,13 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", client->id, client->fd, errno, strerror(errno)); invalidate_client(ckp, cdata, client); - return true; + goto out_true; } sender_send->ofs += ret; sender_send->len -= ret; } +out_true: + client->sending = NULL; return true; } diff --git a/src/libckpool.c b/src/libckpool.c index 07a1727a..09bfa9a2 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -903,7 +903,7 @@ int wait_close(int sockd, int timeout) } /* Emulate a select read wait for high fds that select doesn't support */ -int wait_read_select(int sockd, int timeout) +int wait_read_select(int sockd, float timeout) { struct pollfd sfd; int ret = -1; @@ -985,7 +985,7 @@ out: } /* Emulate a select write wait for high fds that select doesn't support */ -int wait_write_select(int sockd, int timeout) +int wait_write_select(int sockd, float timeout) { struct pollfd sfd; int ret = -1; diff --git a/src/libckpool.h b/src/libckpool.h index 8901e3be..0180606c 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -490,7 +490,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) int wait_close(int sockd, int timeout); -int wait_read_select(int sockd, int timeout); +int wait_read_select(int sockd, float timeout); int read_length(int sockd, void *buf, int len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); #define RECV_UNIX_TIMEOUT1 30 @@ -498,7 +498,7 @@ char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, co #define recv_unix_msg(sockd) _recv_unix_msg(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) #define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) #define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__) -int wait_write_select(int sockd, int timeout); +int wait_write_select(int sockd, float timeout); int write_length(int sockd, const void *buf, int len); bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line); #define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__)