Browse Source

Check all delayed clients for a serviceable one in the connector when we can and consider dropping a client servicing one to not potentially create delayed sends faster than we service them

master
Con Kolivas 10 years ago
parent
commit
163fc40afb
  1. 34
      src/connector.c

34
src/connector.c

@ -466,17 +466,17 @@ void *sender(void *arg)
rename_proc("csender"); rename_proc("csender");
while (42) { while (42) {
sender_send_t *sender_send; sender_send_t *sender_send, *delayed;
client_instance_t *client; client_instance_t *client;
int ret, fd, ofs = 0; int ret = 0, fd, ofs = 0;
mutex_lock(&cdata->sender_lock); mutex_lock(&cdata->sender_lock);
/* Poll every 100ms if there are no new sends. Re-examine /* Poll every 10ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case * delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the * endless new sends more frequently end up starving the
* delayed sends. */ * delayed sends. */
if (!cdata->sender_sends && !sent) { if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 100000000}; const ts_t polltime = {0, 10000000};
ts_t timeout_ts; ts_t timeout_ts;
ts_realtime(&timeout_ts); ts_realtime(&timeout_ts);
@ -494,23 +494,33 @@ void *sender(void *arg)
* conditional with no new sends appearing or have just * conditional with no new sends appearing or have just
* serviced another message successfully. */ * serviced another message successfully. */
if (!sender_send) { if (!sender_send) {
if (!cdata->delayed_sends) /* Find a delayed client that needs servicing and set
continue; * ret accordingly. We do not need to use FOREACH_SAFE
* as we break out of the loop as soon as we manipuate
* the list. */
DL_FOREACH(cdata->delayed_sends, delayed) {
if ((ret = wait_write_select(delayed->client->fd, 0))) {
sender_send = cdata->delayed_sends; sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send); DL_DELETE(cdata->delayed_sends, sender_send);
break;
}
}
/* None found ? */
if (!sender_send)
continue;
} }
client = sender_send->client; client = sender_send->client;
ck_rlock(&cdata->lock);
fd = client->fd;
ck_runlock(&cdata->lock);
/* If this socket is not ready to receive data from us, put the /* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout * send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll * to poll to either look for a client that is ready or poll
* select on this one */ * select on this one */
ck_rlock(&cdata->lock);
fd = client->fd;
if (!ret)
ret = wait_write_select(fd, 0); ret = wait_write_select(fd, 0);
ck_runlock(&cdata->lock);
if (ret < 1) { if (ret < 1) {
if (ret < 0) { if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
@ -526,7 +536,6 @@ void *sender(void *arg)
cdata->sends_delayed++; cdata->sends_delayed++;
continue; continue;
} }
sent = true;
while (sender_send->len) { while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
@ -538,6 +547,7 @@ void *sender(void *arg)
sender_send->len -= ret; sender_send->len -= ret;
} }
contfree: contfree:
sent = true;
free(sender_send->buf); free(sender_send->buf);
free(sender_send); free(sender_send);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);

Loading…
Cancel
Save