|
|
@ -411,6 +411,7 @@ out: |
|
|
|
return ret; |
|
|
|
return ret; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This is for blocking sends of json messages */ |
|
|
|
static bool send_json_msg(connsock_t *cs, const json_t *json_msg) |
|
|
|
static bool send_json_msg(connsock_t *cs, const json_t *json_msg) |
|
|
|
{ |
|
|
|
{ |
|
|
|
int len, sent; |
|
|
|
int len, sent; |
|
|
@ -1519,31 +1520,38 @@ struct cs_msg { |
|
|
|
cs_msg_t *next; |
|
|
|
cs_msg_t *next; |
|
|
|
cs_msg_t *prev; |
|
|
|
cs_msg_t *prev; |
|
|
|
proxy_instance_t *proxy; |
|
|
|
proxy_instance_t *proxy; |
|
|
|
json_t *val; |
|
|
|
char *buf; |
|
|
|
|
|
|
|
int len; |
|
|
|
|
|
|
|
int ofs; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
/* Sends all messages in the queue ready to be dispatched, leaving those that
|
|
|
|
/* Sends all messages in the queue ready to be dispatched, leaving those that
|
|
|
|
* fail write select to be handled next pass */ |
|
|
|
* would block to be handled next pass */ |
|
|
|
static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) |
|
|
|
static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) |
|
|
|
{ |
|
|
|
{ |
|
|
|
cs_msg_t *csmsg, *tmp; |
|
|
|
cs_msg_t *csmsg, *tmp; |
|
|
|
int ret; |
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { |
|
|
|
DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { |
|
|
|
connsock_t *cs = &(csmsg->proxy->cs); |
|
|
|
while (csmsg->len) { |
|
|
|
|
|
|
|
|
|
|
|
if ((ret = wait_write_select(cs->fd, 0))) { |
|
|
|
|
|
|
|
DL_DELETE(*csmsgq, csmsg); |
|
|
|
|
|
|
|
if (ret > 0) |
|
|
|
|
|
|
|
ret = send_json_msg(cs, csmsg->val); |
|
|
|
|
|
|
|
json_decref(csmsg->val); |
|
|
|
|
|
|
|
if (ret < 1) { |
|
|
|
|
|
|
|
proxy_instance_t *proxy = csmsg->proxy; |
|
|
|
proxy_instance_t *proxy = csmsg->proxy; |
|
|
|
|
|
|
|
int fd = proxy->cs.fd; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT); |
|
|
|
|
|
|
|
if (ret < 1) { |
|
|
|
|
|
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK) |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
csmsg->len = 0; |
|
|
|
LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping", |
|
|
|
LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping", |
|
|
|
proxy->id, proxy->subid, proxy->url); |
|
|
|
proxy->id, proxy->subid, proxy->url); |
|
|
|
disable_subproxy(gdata, proxy->parent, proxy); |
|
|
|
disable_subproxy(gdata, proxy->parent, proxy); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
csmsg->ofs += ret; |
|
|
|
|
|
|
|
csmsg->len -= ret; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (!csmsg->len) { |
|
|
|
|
|
|
|
DL_DELETE(*csmsgq, csmsg); |
|
|
|
|
|
|
|
free(csmsg->buf); |
|
|
|
free(csmsg); |
|
|
|
free(csmsg); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1551,10 +1559,16 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) |
|
|
|
|
|
|
|
|
|
|
|
static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val) |
|
|
|
static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val) |
|
|
|
{ |
|
|
|
{ |
|
|
|
cs_msg_t *csmsg = ckalloc(sizeof(cs_msg_t)); |
|
|
|
cs_msg_t *csmsg = ckzalloc(sizeof(cs_msg_t)); |
|
|
|
|
|
|
|
|
|
|
|
csmsg->val = *val; |
|
|
|
csmsg->buf = json_dumps(*val, JSON_ESCAPE_SLASH | JSON_EOL); |
|
|
|
|
|
|
|
json_decref(*val); |
|
|
|
*val = NULL; |
|
|
|
*val = NULL; |
|
|
|
|
|
|
|
if (unlikely(!csmsg->buf)) { |
|
|
|
|
|
|
|
LOGWARNING("Failed to create json dump in add_json_msgq"); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
csmsg->len = strlen(csmsg->buf); |
|
|
|
csmsg->proxy = proxy; |
|
|
|
csmsg->proxy = proxy; |
|
|
|
DL_APPEND(*csmsgq, csmsg); |
|
|
|
DL_APPEND(*csmsgq, csmsg); |
|
|
|
} |
|
|
|
} |
|
|
|