Browse Source

Rework write path to have no potentially blocking calls and be able to send partial messages

master
Con Kolivas 10 years ago
parent
commit
3919d18260
  1. 135
      src/connector.c

135
src/connector.c

@ -63,6 +63,7 @@ struct sender_send {
client_instance_t *client; client_instance_t *client;
char *buf; char *buf;
int len; int len;
int ofs;
}; };
typedef struct sender_send sender_send_t; typedef struct sender_send sender_send_t;
@ -98,7 +99,6 @@ struct connector_data {
/* For the linked list of pending sends */ /* For the linked list of pending sends */
sender_send_t *sender_sends; sender_send_t *sender_sends;
sender_send_t *delayed_sends;
int64_t sends_generated; int64_t sends_generated;
int64_t sends_delayed; int64_t sends_delayed;
@ -559,27 +559,65 @@ out:
return NULL; return NULL;
} }
/* Send a sender_send message and return true if we've finished sending it or
* are unable to send any more. */
static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send)
{
client_instance_t *client = sender_send->client;
if (unlikely(client->invalid))
return true;
while (sender_send->len) {
int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT);
if (unlikely(ret < 1)) {
if (!ret)
return false;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return false;
LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd);
invalidate_client(ckp, cdata, client);
return true;
}
sender_send->ofs += ret;
sender_send->len -= ret;
}
return true;
}
static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata)
{
dec_instance_ref(cdata, sender_send->client);
free(sender_send->buf);
free(sender_send);
}
/* Use a thread to send queued messages, using select() to only send to sockets /* Use a thread to send queued messages, using select() to only send to sockets
* ready for writing immediately to not delay other messages. */ * ready for writing immediately to not delay other messages. */
void *sender(void *arg) static void *sender(void *arg)
{ {
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
sender_send_t *sends = NULL;
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
bool sent = false;
rename_proc("csender"); rename_proc("csender");
while (42) { while (42) {
sender_send_t *sender_send, *delayed; sender_send_t *sender_send, *sending, *tmp;
client_instance_t *client;
int ret = 0, fd, ofs = 0; /* Check all sends to see if they can be written out */
DL_FOREACH_SAFE(sends, sending, tmp) {
if (send_sender_send(ckp, cdata, sending)) {
DL_DELETE(sends, sending);
clear_sender_send(sending, cdata);
} else
cdata->sends_delayed++;
}
mutex_lock(&cdata->sender_lock); mutex_lock(&cdata->sender_lock);
/* Poll every 10ms if there are no new sends. Re-examine /* Poll every 10ms if there are no new sends. */
* delayed sends immediately after a successful send in case if (!cdata->sender_sends) {
* endless new sends more frequently end up starving the
* delayed sends. */
if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 10000000}; const ts_t polltime = {0, 10000000};
ts_t timeout_ts; ts_t timeout_ts;
@ -592,70 +630,8 @@ void *sender(void *arg)
DL_DELETE(cdata->sender_sends, sender_send); DL_DELETE(cdata->sender_sends, sender_send);
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
sent = false; if (sender_send)
DL_APPEND(sends, sender_send);
/* Service delayed sends only if we have timed out on the
* conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) {
/* Find a delayed client that needs servicing and set
* ret accordingly. We do not need to use FOREACH_SAFE
* as we break out of the loop as soon as we manipuate
* the list. */
DL_FOREACH(cdata->delayed_sends, delayed) {
if ((ret = wait_write_select(delayed->client->fd, 0))) {
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
break;
}
}
/* None found ? */
if (!sender_send)
continue;
}
client = sender_send->client;
if (unlikely(client->invalid)) {
LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id);
goto contfree;
}
/* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll
* select on this one */
fd = client->fd;
if (!ret)
ret = wait_write_select(fd, 0);
if (ret < 1) {
if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
invalidate_client(ckp, cdata, client);
goto contfree;
}
LOGDEBUG("Client %"PRId64" not ready for writes", client->id);
/* Append it to the tail of the delayed sends list.
* This is the only function that alters it so no
* locking is required. Keep the client ref. */
DL_APPEND(cdata->delayed_sends, sender_send);
cdata->sends_delayed++;
continue;
}
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd);
invalidate_client(ckp, cdata, client);
break;
}
ofs += ret;
sender_send->len -= ret;
}
contfree:
sent = true;
free(sender_send->buf);
free(sender_send);
dec_instance_ref(cdata, client);
} }
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */
childsighandler(15); childsighandler(15);
@ -787,15 +763,8 @@ static char *connector_stats(cdata_t *cdata)
objects = 0; objects = 0;
memsize = 0; memsize = 0;
mutex_lock(&cdata->sender_lock);
generated = cdata->sends_delayed; generated = cdata->sends_delayed;
DL_FOREACH(cdata->delayed_sends, send) { JSON_CPACK(subval, "{si}", "generated", generated);
objects++;
memsize += sizeof(sender_send_t) + send->len + 1;
}
mutex_unlock(&cdata->sender_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "delays", subval); json_set_object(val, "delays", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);

Loading…
Cancel
Save