From 3919d182607ad39ec8256ceaeef5d8e84730e2c0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 07:17:51 +1000 Subject: [PATCH] Rework write path to have no potentially blocking calls and be able to send partial messages --- src/connector.c | 135 +++++++++++++++++++----------------------------- 1 file changed, 52 insertions(+), 83 deletions(-) diff --git a/src/connector.c b/src/connector.c index 3e020871..2ac6bb40 100644 --- a/src/connector.c +++ b/src/connector.c @@ -63,6 +63,7 @@ struct sender_send { client_instance_t *client; char *buf; int len; + int ofs; }; typedef struct sender_send sender_send_t; @@ -98,7 +99,6 @@ struct connector_data { /* For the linked list of pending sends */ sender_send_t *sender_sends; - sender_send_t *delayed_sends; int64_t sends_generated; int64_t sends_delayed; @@ -559,27 +559,65 @@ out: return NULL; } +/* Send a sender_send message and return true if we've finished sending it or + * are unable to send any more. */ +static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) +{ + client_instance_t *client = sender_send->client; + + if (unlikely(client->invalid)) + return true; + + while (sender_send->len) { + int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT); + + if (unlikely(ret < 1)) { + if (!ret) + return false; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return false; + LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); + invalidate_client(ckp, cdata, client); + return true; + } + sender_send->ofs += ret; + sender_send->len -= ret; + } + return true; +} + +static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) +{ + dec_instance_ref(cdata, sender_send->client); + free(sender_send->buf); + free(sender_send); +} + /* Use a thread to send queued messages, using select() to only send to sockets * ready for writing immediately to not delay other messages. */ -void *sender(void *arg) +static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; + sender_send_t *sends = NULL; ckpool_t *ckp = cdata->ckp; - bool sent = false; rename_proc("csender"); while (42) { - sender_send_t *sender_send, *delayed; - client_instance_t *client; - int ret = 0, fd, ofs = 0; + sender_send_t *sender_send, *sending, *tmp; + + /* Check all sends to see if they can be written out */ + DL_FOREACH_SAFE(sends, sending, tmp) { + if (send_sender_send(ckp, cdata, sending)) { + DL_DELETE(sends, sending); + clear_sender_send(sending, cdata); + } else + cdata->sends_delayed++; + } mutex_lock(&cdata->sender_lock); - /* Poll every 10ms if there are no new sends. Re-examine - * delayed sends immediately after a successful send in case - * endless new sends more frequently end up starving the - * delayed sends. */ - if (!cdata->sender_sends && !sent) { + /* Poll every 10ms if there are no new sends. */ + if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; ts_t timeout_ts; @@ -592,70 +630,8 @@ void *sender(void *arg) DL_DELETE(cdata->sender_sends, sender_send); mutex_unlock(&cdata->sender_lock); - sent = false; - - /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing or have just - * serviced another message successfully. */ - if (!sender_send) { - /* Find a delayed client that needs servicing and set - * ret accordingly. We do not need to use FOREACH_SAFE - * as we break out of the loop as soon as we manipuate - * the list. */ - DL_FOREACH(cdata->delayed_sends, delayed) { - if ((ret = wait_write_select(delayed->client->fd, 0))) { - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); - break; - } - } - /* None found ? */ - if (!sender_send) - continue; - } - client = sender_send->client; - if (unlikely(client->invalid)) { - LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); - goto contfree; - } - - /* If this socket is not ready to receive data from us, put the - * send back on the tail of the list and decrease the timeout - * to poll to either look for a client that is ready or poll - * select on this one */ - fd = client->fd; - if (!ret) - ret = wait_write_select(fd, 0); - if (ret < 1) { - if (ret < 0) { - LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); - invalidate_client(ckp, cdata, client); - goto contfree; - } - LOGDEBUG("Client %"PRId64" not ready for writes", client->id); - - /* Append it to the tail of the delayed sends list. - * This is the only function that alters it so no - * locking is required. Keep the client ref. */ - DL_APPEND(cdata->delayed_sends, sender_send); - cdata->sends_delayed++; - continue; - } - while (sender_send->len) { - ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); - if (unlikely(ret < 0)) { - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); - invalidate_client(ckp, cdata, client); - break; - } - ofs += ret; - sender_send->len -= ret; - } -contfree: - sent = true; - free(sender_send->buf); - free(sender_send); - dec_instance_ref(cdata, client); + if (sender_send) + DL_APPEND(sends, sender_send); } /* We shouldn't get here unless there's an error */ childsighandler(15); @@ -787,15 +763,8 @@ static char *connector_stats(cdata_t *cdata) objects = 0; memsize = 0; - mutex_lock(&cdata->sender_lock); generated = cdata->sends_delayed; - DL_FOREACH(cdata->delayed_sends, send) { - objects++; - memsize += sizeof(sender_send_t) + send->len + 1; - } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si}", "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);