|
|
@ -115,6 +115,7 @@ struct proxy_instance { |
|
|
|
bool reconnecting; /* Testing of parent in progress */ |
|
|
|
bool reconnecting; /* Testing of parent in progress */ |
|
|
|
int64_t recruit; /* No of recruiting requests in progress */ |
|
|
|
int64_t recruit; /* No of recruiting requests in progress */ |
|
|
|
bool alive; |
|
|
|
bool alive; |
|
|
|
|
|
|
|
bool sending; /* Are we in the middle of a blocked write? */ |
|
|
|
|
|
|
|
|
|
|
|
pthread_t pth_precv; |
|
|
|
pthread_t pth_precv; |
|
|
|
|
|
|
|
|
|
|
@ -1533,10 +1534,17 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) |
|
|
|
int ret; |
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { |
|
|
|
DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { |
|
|
|
|
|
|
|
proxy_instance_t *proxy = csmsg->proxy; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Only try to send one message at a time to each proxy
|
|
|
|
|
|
|
|
* to avoid sending parts of different messages */ |
|
|
|
|
|
|
|
if (proxy->sending) |
|
|
|
|
|
|
|
continue; |
|
|
|
while (csmsg->len) { |
|
|
|
while (csmsg->len) { |
|
|
|
proxy_instance_t *proxy = csmsg->proxy; |
|
|
|
int fd; |
|
|
|
int fd = proxy->cs.fd; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
proxy->sending = true; |
|
|
|
|
|
|
|
fd = proxy->cs.fd; |
|
|
|
ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT); |
|
|
|
ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT); |
|
|
|
if (ret < 1) { |
|
|
|
if (ret < 1) { |
|
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) |
|
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) |
|
|
@ -1550,6 +1558,7 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) |
|
|
|
csmsg->len -= ret; |
|
|
|
csmsg->len -= ret; |
|
|
|
} |
|
|
|
} |
|
|
|
if (!csmsg->len) { |
|
|
|
if (!csmsg->len) { |
|
|
|
|
|
|
|
proxy->sending = false; |
|
|
|
DL_DELETE(*csmsgq, csmsg); |
|
|
|
DL_DELETE(*csmsgq, csmsg); |
|
|
|
free(csmsg->buf); |
|
|
|
free(csmsg->buf); |
|
|
|
free(csmsg); |
|
|
|
free(csmsg); |
|
|
|