|
|
@ -326,13 +326,13 @@ void *sender(void *arg) |
|
|
|
{ |
|
|
|
{ |
|
|
|
conn_instance_t *ci = (conn_instance_t *)arg; |
|
|
|
conn_instance_t *ci = (conn_instance_t *)arg; |
|
|
|
ckpool_t *ckp = ci->pi->ckp; |
|
|
|
ckpool_t *ckp = ci->pi->ckp; |
|
|
|
|
|
|
|
bool polling = false; |
|
|
|
|
|
|
|
|
|
|
|
rename_proc("csender"); |
|
|
|
rename_proc("csender"); |
|
|
|
|
|
|
|
|
|
|
|
while (42) { |
|
|
|
while (42) { |
|
|
|
sender_send_t *sender_send; |
|
|
|
sender_send_t *sender_send; |
|
|
|
client_instance_t *client; |
|
|
|
client_instance_t *client; |
|
|
|
bool only_send = false; |
|
|
|
|
|
|
|
int ret, fd, ofs = 0; |
|
|
|
int ret, fd, ofs = 0; |
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&sender_lock); |
|
|
|
mutex_lock(&sender_lock); |
|
|
@ -340,14 +340,17 @@ void *sender(void *arg) |
|
|
|
ts_t timeout_ts; |
|
|
|
ts_t timeout_ts; |
|
|
|
|
|
|
|
|
|
|
|
ts_realtime(&timeout_ts); |
|
|
|
ts_realtime(&timeout_ts); |
|
|
|
timeout_ts.tv_sec += 1; |
|
|
|
if (!polling) |
|
|
|
|
|
|
|
timeout_ts.tv_sec += 1; |
|
|
|
|
|
|
|
else { |
|
|
|
|
|
|
|
ts_t wait_ts = {0, 1000000}; |
|
|
|
|
|
|
|
timeraddspec(&timeout_ts, &wait_ts); |
|
|
|
|
|
|
|
} |
|
|
|
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); |
|
|
|
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); |
|
|
|
} |
|
|
|
} |
|
|
|
sender_send = sender_sends; |
|
|
|
sender_send = sender_sends; |
|
|
|
if (likely(sender_send)) |
|
|
|
if (likely(sender_send)) |
|
|
|
DL_DELETE(sender_sends, sender_send); |
|
|
|
DL_DELETE(sender_sends, sender_send); |
|
|
|
if (!sender_send) |
|
|
|
|
|
|
|
only_send = true; |
|
|
|
|
|
|
|
mutex_unlock(&sender_lock); |
|
|
|
mutex_unlock(&sender_lock); |
|
|
|
|
|
|
|
|
|
|
|
if (!sender_send) |
|
|
|
if (!sender_send) |
|
|
@ -365,22 +368,23 @@ void *sender(void *arg) |
|
|
|
free(sender_send); |
|
|
|
free(sender_send); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
/* If there are other sends pending and this socket is not
|
|
|
|
/* If this socket is not ready to receive data from us, put the
|
|
|
|
* ready to receive data from us, put the send back on the |
|
|
|
* send back on the tail of the list and decrease the timeout |
|
|
|
* list. */ |
|
|
|
* to poll to either look for a client that is ready or poll |
|
|
|
if (!only_send) { |
|
|
|
* select on this one */ |
|
|
|
ret = wait_write_select(fd, 0); |
|
|
|
ret = wait_write_select(fd, 0); |
|
|
|
if (ret < 1) { |
|
|
|
if (ret < 1) { |
|
|
|
LOGDEBUG("Client %d not ready for writes", client->id); |
|
|
|
LOGDEBUG("Client %d not ready for writes", client->id); |
|
|
|
|
|
|
|
|
|
|
|
/* Append it to the tail of the list */ |
|
|
|
/* Append it to the tail of the list */ |
|
|
|
mutex_lock(&sender_lock); |
|
|
|
mutex_lock(&sender_lock); |
|
|
|
DL_APPEND(sender_sends, sender_send); |
|
|
|
DL_APPEND(sender_sends, sender_send); |
|
|
|
mutex_unlock(&sender_lock); |
|
|
|
mutex_unlock(&sender_lock); |
|
|
|
|
|
|
|
|
|
|
|
continue; |
|
|
|
polling = true; |
|
|
|
} |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
polling = false; |
|
|
|
while (sender_send->len) { |
|
|
|
while (sender_send->len) { |
|
|
|
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); |
|
|
|
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); |
|
|
|
if (unlikely(ret < 0)) { |
|
|
|
if (unlikely(ret < 0)) { |
|
|
|