diff --git a/src/connector.c b/src/connector.c index e2820fe7..984c8f03 100644 --- a/src/connector.c +++ b/src/connector.c @@ -459,10 +459,10 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) } } -/* Client is holding a reference count from being on the epoll list */ -static void parse_client_msg(cdata_t *cdata, client_instance_t *client) +/* Client is holding a reference count from being on the epoll list. Returns + * true if we will still be receiving messages from this client. */ +static bool __parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - ckpool_t *ckp = cdata->ckp; int buflen, ret; json_t *val; char *eol; @@ -472,8 +472,7 @@ retry: if (!client->remote) { LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1)); } @@ -481,11 +480,10 @@ retry: ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE); if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) - return; + return true; LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, cdata, client); - return; + return false; } client->bufofs += ret; reparse: @@ -497,8 +495,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) { @@ -506,8 +503,7 @@ reparse: LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf); send_client(cdata, client->id, buf); - invalidate_client(ckp, cdata, client); - return; + return false; } else { char *s; @@ -549,6 +545,12 @@ reparse: goto retry; } +static void parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) +{ + if (unlikely(!__parse_client_msg(ckp, cdata, client))) + invalidate_client(ckp, cdata, client); +} + static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; @@ -571,6 +573,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; + ckpool_t *ckp = cdata->ckp; struct epoll_event event; uint64_t serverfds, i; int ret, epfd; @@ -634,7 +637,7 @@ void *receiver(void *arg) * message first. */ events = event.events; if (likely(events & EPOLLIN)) - parse_client_msg(cdata, client); + parse_client_msg(ckp, cdata, client); if (unlikely(client->invalid)) goto noparse; if (unlikely(events & EPOLLERR)) {