From 873b1b702a79e2dff1d65d8a985fd8f53cf904ad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 26 Aug 2014 11:19:13 +1000 Subject: [PATCH] Reattempt sending delayed sends immediately after servicing one send successfully. --- src/connector.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index d91755da..63b5fec2 100644 --- a/src/connector.c +++ b/src/connector.c @@ -340,6 +340,7 @@ void *sender(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; ckpool_t *ckp = ci->pi->ckp; + bool sent = false; rename_proc("csender"); @@ -349,8 +350,11 @@ void *sender(void *arg) int ret, fd, ofs = 0; mutex_lock(&sender_lock); - /* Poll every 100ms if there are no new sends */ - if (!sender_sends) { + /* Poll every 100ms if there are no new sends. Re-examine + * delayed sends immediately after a successful send in case + * endless new sends more frequently end up starving the + * delayed sends. */ + if (!sender_sends && !sent) { const ts_t polltime = {0, 100000000}; ts_t timeout_ts; @@ -363,8 +367,11 @@ void *sender(void *arg) DL_DELETE(sender_sends, sender_send); mutex_unlock(&sender_lock); + sent = false; + /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing. */ + * conditional with no new sends appearing or have just + * serviced another message successfully. */ if (!sender_send) { if (!delayed_sends) continue; @@ -404,6 +411,7 @@ void *sender(void *arg) DL_APPEND(delayed_sends, sender_send); continue; } + sent = true; while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) {