Browse Source

Reference the actual message we're blocking on in the generator to a proxy and continue only that one

master
Con Kolivas 10 years ago
parent
commit
ef67643d5e
  1. 13
      src/generator.c

13
src/generator.c

@ -74,6 +74,7 @@ struct pass_msg {
}; };
typedef struct pass_msg pass_msg_t; typedef struct pass_msg pass_msg_t;
typedef struct cs_msg cs_msg_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
@ -115,7 +116,9 @@ struct proxy_instance {
bool reconnecting; /* Testing of parent in progress */ bool reconnecting; /* Testing of parent in progress */
int64_t recruit; /* No of recruiting requests in progress */ int64_t recruit; /* No of recruiting requests in progress */
bool alive; bool alive;
bool sending; /* Are we in the middle of a blocked write? */
/* Are we in the middle of a blocked write of this message? */
cs_msg_t *sending;
pthread_t pth_precv; pthread_t pth_precv;
@ -1515,8 +1518,6 @@ out:
return ret; return ret;
} }
typedef struct cs_msg cs_msg_t;
struct cs_msg { struct cs_msg {
cs_msg_t *next; cs_msg_t *next;
cs_msg_t *prev; cs_msg_t *prev;
@ -1538,12 +1539,12 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq)
/* Only try to send one message at a time to each proxy /* Only try to send one message at a time to each proxy
* to avoid sending parts of different messages */ * to avoid sending parts of different messages */
if (proxy->sending) if (proxy->sending != csmsg)
continue; continue;
while (csmsg->len) { while (csmsg->len) {
int fd; int fd;
proxy->sending = true; proxy->sending = csmsg;
fd = proxy->cs.fd; fd = proxy->cs.fd;
ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT); ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
@ -1558,7 +1559,7 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq)
csmsg->len -= ret; csmsg->len -= ret;
} }
if (!csmsg->len) { if (!csmsg->len) {
proxy->sending = false; proxy->sending = NULL;
DL_DELETE(*csmsgq, csmsg); DL_DELETE(*csmsgq, csmsg);
free(csmsg->buf); free(csmsg->buf);
free(csmsg); free(csmsg);

Loading…
Cancel
Save