diff --git a/src/generator.c b/src/generator.c index ffae0180..78b34d4d 100644 --- a/src/generator.c +++ b/src/generator.c @@ -411,6 +411,7 @@ out: return ret; } +/* This is for blocking sends of json messages */ static bool send_json_msg(connsock_t *cs, const json_t *json_msg) { int len, sent; @@ -1519,31 +1520,38 @@ struct cs_msg { cs_msg_t *next; cs_msg_t *prev; proxy_instance_t *proxy; - json_t *val; + char *buf; + int len; + int ofs; }; /* Sends all messages in the queue ready to be dispatched, leaving those that - * fail write select to be handled next pass */ + * would block to be handled next pass */ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) { cs_msg_t *csmsg, *tmp; int ret; DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { - connsock_t *cs = &(csmsg->proxy->cs); + while (csmsg->len) { + proxy_instance_t *proxy = csmsg->proxy; + int fd = proxy->cs.fd; - if ((ret = wait_write_select(cs->fd, 0))) { - DL_DELETE(*csmsgq, csmsg); - if (ret > 0) - ret = send_json_msg(cs, csmsg->val); - json_decref(csmsg->val); + ret = send(fd, csmsg->buf + csmsg->ofs, csmsg->len, MSG_DONTWAIT); if (ret < 1) { - proxy_instance_t *proxy = csmsg->proxy; - + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + csmsg->len = 0; LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping", proxy->id, proxy->subid, proxy->url); disable_subproxy(gdata, proxy->parent, proxy); } + csmsg->ofs += ret; + csmsg->len -= ret; + } + if (!csmsg->len) { + DL_DELETE(*csmsgq, csmsg); + free(csmsg->buf); free(csmsg); } } @@ -1551,10 +1559,16 @@ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val) { - cs_msg_t *csmsg = ckalloc(sizeof(cs_msg_t)); + cs_msg_t *csmsg = ckzalloc(sizeof(cs_msg_t)); - csmsg->val = *val; + csmsg->buf = json_dumps(*val, JSON_ESCAPE_SLASH | JSON_EOL); + json_decref(*val); *val = NULL; + if (unlikely(!csmsg->buf)) { + LOGWARNING("Failed to create json dump in add_json_msgq"); + return; + } + csmsg->len = strlen(csmsg->buf); csmsg->proxy = proxy; DL_APPEND(*csmsgq, csmsg); }