kanoi 10 years ago
parent
commit
8d016f1811
  1. 34
      src/ckpool.c
  2. 2
      src/ckpool.h
  3. 16
      src/connector.c
  4. 4
      src/libckpool.c
  5. 4
      src/libckpool.h

34
src/ckpool.c

@ -495,11 +495,13 @@ void empty_buffer(connsock_t *cs)
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0' /* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning * and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
int read_socket_line(connsock_t *cs, const int timeout) int read_socket_line(connsock_t *cs, float timeout)
{ {
int fd = cs->fd, ret = -1; int fd = cs->fd, ret = -1;
char *eom = NULL; char *eom = NULL;
tv_t start, now;
size_t buflen; size_t buflen;
float diff;
if (unlikely(fd < 0)) if (unlikely(fd < 0))
goto out; goto out;
@ -515,28 +517,35 @@ int read_socket_line(connsock_t *cs, const int timeout)
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
} }
while (42) { tv_time(&start);
char readbuf[PAGESIZE] = {}; rewait:
int backoff = 1;
char *newbuf;
ret = wait_read_select(fd, eom ? 0 : timeout); ret = wait_read_select(fd, eom ? 0 : timeout);
if (ret < 1) { if (ret < 1) {
if (!ret) {
if (eom) if (eom)
break; goto parse;
if (!ret)
LOGDEBUG("Select timed out in read_socket_line"); LOGDEBUG("Select timed out in read_socket_line");
else } else
LOGERR("Select failed in read_socket_line"); LOGERR("Select failed in read_socket_line");
goto out; goto out;
} }
ret = recv(fd, readbuf, PAGESIZE - 4, 0); tv_time(&now);
diff = tvdiff(&now, &start);
timeout -= diff;
while (42) {
char readbuf[PAGESIZE] = {};
int backoff = 1;
char *newbuf;
ret = recv(fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
/* Closed socket after valid message */ /* No more to read or closed socket after valid message */
if (eom) if (eom)
break; break;
/* Have we used up all the timeout yet? */
if (timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret))
goto rewait;
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
ret = -1;
goto out; goto out;
} }
buflen = cs->bufofs + ret + 1; buflen = cs->bufofs + ret + 1;
@ -557,6 +566,7 @@ int read_socket_line(connsock_t *cs, const int timeout)
cs->buf[cs->bufofs] = '\0'; cs->buf[cs->bufofs] = '\0';
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
} }
parse:
ret = eom - cs->buf; ret = eom - cs->buf;
cs->buflen = cs->buf + cs->bufofs - eom - 1; cs->buflen = cs->buf + cs->bufofs - eom - 1;

2
src/ckpool.h

@ -242,7 +242,7 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout); int read_socket_line(connsock_t *cs, float timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);

16
src/connector.c

@ -24,6 +24,7 @@
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
typedef struct sender_send sender_send_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
@ -53,6 +54,8 @@ struct client_instance {
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
/* Are we currently sending a blocked message from this client */
sender_send_t *sending;
bool passthrough; bool passthrough;
}; };
@ -66,8 +69,6 @@ struct sender_send {
int ofs; int ofs;
}; };
typedef struct sender_send sender_send_t;
/* Private data for the connector */ /* Private data for the connector */
struct connector_data { struct connector_data {
ckpool_t *ckp; ckpool_t *ckp;
@ -557,7 +558,12 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
client_instance_t *client = sender_send->client; client_instance_t *client = sender_send->client;
if (unlikely(client->invalid)) if (unlikely(client->invalid))
return true; goto out_true;
/* Make sure we only send one message at a time to each client */
if (unlikely(client->sending && client->sending != sender_send))
return false;
client->sending = sender_send;
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);
@ -568,11 +574,13 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s",
client->id, client->fd, errno, strerror(errno)); client->id, client->fd, errno, strerror(errno));
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return true; goto out_true;
} }
sender_send->ofs += ret; sender_send->ofs += ret;
sender_send->len -= ret; sender_send->len -= ret;
} }
out_true:
client->sending = NULL;
return true; return true;
} }

4
src/libckpool.c

@ -903,7 +903,7 @@ int wait_close(int sockd, int timeout)
} }
/* Emulate a select read wait for high fds that select doesn't support */ /* Emulate a select read wait for high fds that select doesn't support */
int wait_read_select(int sockd, int timeout) int wait_read_select(int sockd, float timeout)
{ {
struct pollfd sfd; struct pollfd sfd;
int ret = -1; int ret = -1;
@ -985,7 +985,7 @@ out:
} }
/* Emulate a select write wait for high fds that select doesn't support */ /* Emulate a select write wait for high fds that select doesn't support */
int wait_write_select(int sockd, int timeout) int wait_write_select(int sockd, float timeout)
{ {
struct pollfd sfd; struct pollfd sfd;
int ret = -1; int ret = -1;

4
src/libckpool.h

@ -490,7 +490,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_close(int sockd, int timeout); int wait_close(int sockd, int timeout);
int wait_read_select(int sockd, int timeout); int wait_read_select(int sockd, float timeout);
int read_length(int sockd, void *buf, int len); int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);
#define RECV_UNIX_TIMEOUT1 30 #define RECV_UNIX_TIMEOUT1 30
@ -498,7 +498,7 @@ char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, co
#define recv_unix_msg(sockd) _recv_unix_msg(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) #define recv_unix_msg(sockd) _recv_unix_msg(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) #define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__) #define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__)
int wait_write_select(int sockd, int timeout); int wait_write_select(int sockd, float timeout);
int write_length(int sockd, const void *buf, int len); int write_length(int sockd, const void *buf, int len);
bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line); bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line);
#define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__) #define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__)

Loading…
Cancel
Save