|
|
|
@ -71,6 +71,8 @@ struct sender_send {
|
|
|
|
|
client_instance_t *client; |
|
|
|
|
char *buf; |
|
|
|
|
int len; |
|
|
|
|
bool polling; |
|
|
|
|
ts_t polltime; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
typedef struct sender_send sender_send_t; |
|
|
|
@ -326,7 +328,6 @@ void *sender(void *arg)
|
|
|
|
|
{ |
|
|
|
|
conn_instance_t *ci = (conn_instance_t *)arg; |
|
|
|
|
ckpool_t *ckp = ci->pi->ckp; |
|
|
|
|
bool polling = false; |
|
|
|
|
|
|
|
|
|
rename_proc("csender"); |
|
|
|
|
|
|
|
|
@ -334,17 +335,24 @@ void *sender(void *arg)
|
|
|
|
|
sender_send_t *sender_send; |
|
|
|
|
client_instance_t *client; |
|
|
|
|
int ret, fd, ofs = 0; |
|
|
|
|
bool polling = false; |
|
|
|
|
|
|
|
|
|
mutex_lock(&sender_lock); |
|
|
|
|
if (sender_sends && sender_sends->polling) |
|
|
|
|
polling = true; |
|
|
|
|
if (!sender_sends || polling) { |
|
|
|
|
ts_t timeout_ts; |
|
|
|
|
|
|
|
|
|
ts_realtime(&timeout_ts); |
|
|
|
|
if (!polling) |
|
|
|
|
if (!polling) { |
|
|
|
|
/* Wait 1 second in pure event driven mode */ |
|
|
|
|
ts_realtime(&timeout_ts); |
|
|
|
|
timeout_ts.tv_sec += 1; |
|
|
|
|
else { |
|
|
|
|
ts_t wait_ts = {0, 100000000}; |
|
|
|
|
timeraddspec(&timeout_ts, &wait_ts); |
|
|
|
|
} else { |
|
|
|
|
/* Poll every 100ms if the head of the list is
|
|
|
|
|
* a delayed writer. */ |
|
|
|
|
timeout_ts.tv_sec = 0; |
|
|
|
|
timeout_ts.tv_nsec = 100000000; |
|
|
|
|
timeraddspec(&timeout_ts, &sender_sends->polltime); |
|
|
|
|
} |
|
|
|
|
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); |
|
|
|
|
} |
|
|
|
@ -387,10 +395,10 @@ void *sender(void *arg)
|
|
|
|
|
DL_APPEND(sender_sends, sender_send); |
|
|
|
|
mutex_unlock(&sender_lock); |
|
|
|
|
|
|
|
|
|
polling = true; |
|
|
|
|
sender_send->polling = true; |
|
|
|
|
ts_realtime(&sender_send->polltime); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
polling = false; |
|
|
|
|
while (sender_send->len) { |
|
|
|
|
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); |
|
|
|
|
if (unlikely(ret < 0)) { |
|
|
|
|