Browse Source

Maintain a reference to which message we're currently sending to a client to prevent sending interleaved messages should they block

master
Con Kolivas 10 years ago
parent
commit
b197ba2ff9
  1. 16
      src/connector.c

16
src/connector.c

@ -24,6 +24,7 @@
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
typedef struct sender_send sender_send_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
@ -53,6 +54,8 @@ struct client_instance {
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
/* Are we currently sending a blocked message from this client */
sender_send_t *sending;
bool passthrough; bool passthrough;
}; };
@ -66,8 +69,6 @@ struct sender_send {
int ofs; int ofs;
}; };
typedef struct sender_send sender_send_t;
/* Private data for the connector */ /* Private data for the connector */
struct connector_data { struct connector_data {
ckpool_t *ckp; ckpool_t *ckp;
@ -557,7 +558,12 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
client_instance_t *client = sender_send->client; client_instance_t *client = sender_send->client;
if (unlikely(client->invalid)) if (unlikely(client->invalid))
return true; goto out_true;
/* Make sure we only send one message at a time to each client */
if (unlikely(client->sending && client->sending != sender_send))
return false;
client->sending = sender_send;
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);
@ -568,11 +574,13 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s",
client->id, client->fd, errno, strerror(errno)); client->id, client->fd, errno, strerror(errno));
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return true; goto out_true;
} }
sender_send->ofs += ret; sender_send->ofs += ret;
sender_send->len -= ret; sender_send->len -= ret;
} }
out_true:
client->sending = NULL;
return true; return true;
} }

Loading…
Cancel
Save