Browse Source

Check all delayed clients for a serviceable one in the connector when we can and consider dropping a client servicing one to not potentially create delayed sends faster than we service them

master
Con Kolivas 10 years ago
parent
commit
0e7bc51541
  1. 34
      src/connector.c

34
src/connector.c

@ -455,17 +455,17 @@ void *sender(void *arg)
rename_proc("csender");
while (42) {
sender_send_t *sender_send;
sender_send_t *sender_send, *delayed;
client_instance_t *client;
int ret, fd, ofs = 0;
int ret = 0, fd, ofs = 0;
mutex_lock(&cdata->sender_lock);
/* Poll every 100ms if there are no new sends. Re-examine
/* Poll every 10ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the
* delayed sends. */
if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 100000000};
const ts_t polltime = {0, 10000000};
ts_t timeout_ts;
ts_realtime(&timeout_ts);
@ -483,23 +483,33 @@ void *sender(void *arg)
* conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) {
if (!cdata->delayed_sends)
continue;
/* Find a delayed client that needs servicing and set
* ret accordingly. We do not need to use FOREACH_SAFE
* as we break out of the loop as soon as we manipuate
* the list. */
DL_FOREACH(cdata->delayed_sends, delayed) {
if ((ret = wait_write_select(delayed->client->fd, 0))) {
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
break;
}
}
/* None found ? */
if (!sender_send)
continue;
}
client = sender_send->client;
ck_rlock(&cdata->lock);
fd = client->fd;
ck_runlock(&cdata->lock);
/* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll
* select on this one */
ck_rlock(&cdata->lock);
fd = client->fd;
if (!ret)
ret = wait_write_select(fd, 0);
ck_runlock(&cdata->lock);
if (ret < 1) {
if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
@ -515,7 +525,6 @@ void *sender(void *arg)
cdata->sends_delayed++;
continue;
}
sent = true;
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
@ -527,6 +536,7 @@ void *sender(void *arg)
sender_send->len -= ret;
}
contfree:
sent = true;
free(sender_send->buf);
free(sender_send);
dec_instance_ref(cdata, client);

Loading…
Cancel
Save