diff --git a/src/connector.c b/src/connector.c index 9d053dba..135da0bb 100644 --- a/src/connector.c +++ b/src/connector.c @@ -466,17 +466,17 @@ void *sender(void *arg) rename_proc("csender"); while (42) { - sender_send_t *sender_send; + sender_send_t *sender_send, *delayed; client_instance_t *client; - int ret, fd, ofs = 0; + int ret = 0, fd, ofs = 0; mutex_lock(&cdata->sender_lock); - /* Poll every 100ms if there are no new sends. Re-examine + /* Poll every 10ms if there are no new sends. Re-examine * delayed sends immediately after a successful send in case * endless new sends more frequently end up starving the * delayed sends. */ if (!cdata->sender_sends && !sent) { - const ts_t polltime = {0, 100000000}; + const ts_t polltime = {0, 10000000}; ts_t timeout_ts; ts_realtime(&timeout_ts); @@ -494,23 +494,33 @@ void *sender(void *arg) * conditional with no new sends appearing or have just * serviced another message successfully. */ if (!sender_send) { - if (!cdata->delayed_sends) + /* Find a delayed client that needs servicing and set + * ret accordingly. We do not need to use FOREACH_SAFE + * as we break out of the loop as soon as we manipuate + * the list. */ + DL_FOREACH(cdata->delayed_sends, delayed) { + if ((ret = wait_write_select(delayed->client->fd, 0))) { + sender_send = cdata->delayed_sends; + DL_DELETE(cdata->delayed_sends, sender_send); + break; + } + } + /* None found ? */ + if (!sender_send) continue; - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); } - client = sender_send->client; - ck_rlock(&cdata->lock); - fd = client->fd; - ck_runlock(&cdata->lock); - /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ret = wait_write_select(fd, 0); + ck_rlock(&cdata->lock); + fd = client->fd; + if (!ret) + ret = wait_write_select(fd, 0); + ck_runlock(&cdata->lock); + if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -526,7 +536,6 @@ void *sender(void *arg) cdata->sends_delayed++; continue; } - sent = true; while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { @@ -538,6 +547,7 @@ void *sender(void *arg) sender_send->len -= ret; } contfree: + sent = true; free(sender_send->buf); free(sender_send); dec_instance_ref(cdata, client);