Browse Source

Disconnect clients whose sends block for more than 60 seconds

master
Con Kolivas 9 years ago
parent
commit
b0c30a3f7d
  1. 19
      src/connector.c

19
src/connector.c

@ -57,6 +57,9 @@ struct client_instance {
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
sender_send_t *sending; sender_send_t *sending;
bool passthrough; bool passthrough;
/* Time this client started blocking, 0 when not blocked */
time_t blocked_time;
}; };
struct sender_send { struct sender_send {
@ -556,6 +559,7 @@ out:
static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send)
{ {
client_instance_t *client = sender_send->client; client_instance_t *client = sender_send->client;
time_t now_t;
if (unlikely(client->invalid)) if (unlikely(client->invalid))
goto out_true; goto out_true;
@ -563,14 +567,26 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
/* Make sure we only send one message at a time to each client */ /* Make sure we only send one message at a time to each client */
if (unlikely(client->sending && client->sending != sender_send)) if (unlikely(client->sending && client->sending != sender_send))
return false; return false;
client->sending = sender_send; client->sending = sender_send;
now_t = time(NULL);
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);
if (unlikely(ret < 1)) { if (unlikely(ret < 1)) {
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) /* Invalidate clients that block for more than 60 seconds */
if (unlikely(client->blocked_time && now_t - client->blocked_time >= 60)) {
LOGNOTICE("Client id %"PRId64" fd %d blocked for >60 seconds, disconnecting",
client->id, client->fd);
invalidate_client(ckp, cdata, client);
goto out_true;
}
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) {
if (!client->blocked_time)
client->blocked_time = now_t;
return false; return false;
}
LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s",
client->id, client->fd, errno, strerror(errno)); client->id, client->fd, errno, strerror(errno));
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
@ -578,6 +594,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
} }
sender_send->ofs += ret; sender_send->ofs += ret;
sender_send->len -= ret; sender_send->len -= ret;
client->blocked_time = 0;
} }
out_true: out_true:
client->sending = NULL; client->sending = NULL;

Loading…
Cancel
Save