Browse Source

Abandon trying to mix delayed sends with new ones, instead adding them to a separate linked list and always polling every 100ms

master
Con Kolivas 10 years ago
parent
commit
0fb34986dd
  1. 46
      src/connector.c

46
src/connector.c

@ -71,14 +71,13 @@ struct sender_send {
client_instance_t *client; client_instance_t *client;
char *buf; char *buf;
int len; int len;
bool polling;
ts_t polltime;
}; };
typedef struct sender_send sender_send_t; typedef struct sender_send sender_send_t;
/* For the linked list of pending sends */ /* For the linked list of pending sends */
static sender_send_t *sender_sends; static sender_send_t *sender_sends;
static sender_send_t *delayed_sends;
/* For protecting the pending sends list */ /* For protecting the pending sends list */
static pthread_mutex_t sender_lock; static pthread_mutex_t sender_lock;
@ -335,34 +334,30 @@ void *sender(void *arg)
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
int ret, fd, ofs = 0; int ret, fd, ofs = 0;
bool polling = false;
mutex_lock(&sender_lock); mutex_lock(&sender_lock);
if (sender_sends && sender_sends->polling) /* Poll every 100ms if there are no new sends */
polling = true; if (!sender_sends) {
if (!sender_sends || polling) { const ts_t polltime = {0, 100000000};
ts_t timeout_ts; ts_t timeout_ts;
if (!polling) { ts_realtime(&timeout_ts);
/* Wait 1 second in pure event driven mode */ timeraddspec(&timeout_ts, &polltime);
ts_realtime(&timeout_ts);
timeout_ts.tv_sec += 1;
} else {
/* Poll every 100ms if the head of the list is
* a delayed writer. */
timeout_ts.tv_sec = 0;
timeout_ts.tv_nsec = 100000000;
timeraddspec(&timeout_ts, &sender_sends->polltime);
}
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
} }
sender_send = sender_sends; sender_send = sender_sends;
if (likely(sender_send)) if (sender_send)
DL_DELETE(sender_sends, sender_send); DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock); mutex_unlock(&sender_lock);
if (!sender_send) /* Service delayed sends only if we have timed out on the
continue; * conditional with no new sends appearing. */
if (!sender_send) {
if (!delayed_sends)
continue;
sender_send = delayed_sends;
DL_DELETE(delayed_sends, sender_send);
}
client = sender_send->client; client = sender_send->client;
@ -390,13 +385,10 @@ void *sender(void *arg)
} }
LOGDEBUG("Client %d not ready for writes", client->id); LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the list */ /* Append it to the tail of the delayed sends list.
mutex_lock(&sender_lock); * This is the only function that alters it so no
DL_APPEND(sender_sends, sender_send); * locking is required. */
mutex_unlock(&sender_lock); DL_APPEND(delayed_sends, sender_send);
sender_send->polling = true;
ts_realtime(&sender_send->polltime);
continue; continue;
} }
while (sender_send->len) { while (sender_send->len) {

Loading…
Cancel
Save