From dcf55ff1fb896840abd48e960e109802b6ae4985 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Apr 2015 09:56:09 +1000 Subject: [PATCH] Maintain a reference to which message we're currently sending to a client to prevent sending interleaved messages should they block --- src/connector.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 5393ae1f..be006391 100644 --- a/src/connector.c +++ b/src/connector.c @@ -24,6 +24,7 @@ #define MAX_MSGSIZE 1024 typedef struct client_instance client_instance_t; +typedef struct sender_send sender_send_t; struct client_instance { /* For clients hashtable */ @@ -53,6 +54,8 @@ struct client_instance { char buf[PAGESIZE]; int bufofs; + /* Are we currently sending a blocked message from this client */ + sender_send_t *sending; bool passthrough; }; @@ -66,8 +69,6 @@ struct sender_send { int ofs; }; -typedef struct sender_send sender_send_t; - /* Private data for the connector */ struct connector_data { ckpool_t *ckp; @@ -556,7 +557,12 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende client_instance_t *client = sender_send->client; if (unlikely(client->invalid)) - return true; + goto out_true; + + /* Make sure we only send one message at a time to each client */ + if (unlikely(client->sending && client->sending != sender_send)) + return false; + client->sending = sender_send; while (sender_send->len) { int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); @@ -567,11 +573,13 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", client->id, client->fd, errno, strerror(errno)); invalidate_client(ckp, cdata, client); - return true; + goto out_true; } sender_send->ofs += ret; sender_send->len -= ret; } +out_true: + client->sending = NULL; return true; }