diff --git a/src/connector.c b/src/connector.c index 984c8f03..d3f6a442 100644 --- a/src/connector.c +++ b/src/connector.c @@ -301,7 +301,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; @@ -461,7 +461,7 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) /* Client is holding a reference count from being on the epoll list. Returns * true if we will still be receiving messages from this client. */ -static bool __parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) +static bool parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { int buflen, ret; json_t *val; @@ -545,12 +545,6 @@ reparse: goto retry; } -static void parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) -{ - if (unlikely(!__parse_client_msg(ckp, cdata, client))) - invalidate_client(ckp, cdata, client); -} - static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; @@ -568,9 +562,63 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) return client; } +static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd, + struct epoll_event *event, const uint64_t id) +{ + uint32_t events = event->events; + client_instance_t *client = ref_client_by_id(cdata, id); + + if (unlikely(!client)) { + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); + return; + } + if (unlikely(client->invalid)) + goto out; + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(events & EPOLLIN)) { + /* Rearm the client for epoll events if we have successfully + * parsed a message from it */ + if (parse_client_msg(ckp, cdata, client)) { + event->data.u64 = id; + event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event); + } else + invalidate_client(ckp, cdata, client); + } + if (unlikely(client->invalid)) + goto out; + if (unlikely(events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } +out: + dec_instance_ref(cdata, client); +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ -void *receiver(void *arg) +static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; ckpool_t *ckp = cdata->ckp; @@ -602,8 +650,6 @@ void *receiver(void *arg) cksleep_ms(1); while (42) { - client_instance_t *client; - uint32_t events; uint64_t edu64; while (unlikely(!cdata->accept)) @@ -626,46 +672,7 @@ void *receiver(void *arg) } continue; } - client = ref_client_by_id(cdata, edu64); - if (unlikely(!client)) { - LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", edu64); - continue; - } - if (unlikely(client->invalid)) - goto noparse; - /* We can have both messages and read hang ups so process the - * message first. */ - events = event.events; - if (likely(events & EPOLLIN)) - parse_client_msg(ckp, cdata, client); - if (unlikely(client->invalid)) - goto noparse; - if (unlikely(events & EPOLLERR)) { - socklen_t errlen = sizeof(int); - int error = 0; - - /* See what type of error this is and raise the log - * level of the message if it's unexpected. */ - getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error != 104) { - LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } else { - LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(events & EPOLLHUP)) { - /* Client connection reset by peer */ - LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(events & EPOLLRDHUP)) { - /* Client disconnected by peer */ - LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } -noparse: - dec_instance_ref(cdata, client); + process_client_events(ckp, cdata, epfd, &event, edu64); } out: /* We shouldn't get here unless there's an error */