Browse Source

Make client event handling a oneshot event that is rearmed to allow us to thread the work

master
Con Kolivas 9 years ago
parent
commit
f0e07c24a4
  1. 109
      src/connector.c

109
src/connector.c

@ -301,7 +301,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
event.data.u64 = client->id; event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
return 0; return 0;
@ -461,7 +461,7 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val)
/* Client is holding a reference count from being on the epoll list. Returns /* Client is holding a reference count from being on the epoll list. Returns
* true if we will still be receiving messages from this client. */ * true if we will still be receiving messages from this client. */
static bool __parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) static bool parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{ {
int buflen, ret; int buflen, ret;
json_t *val; json_t *val;
@ -545,12 +545,6 @@ reparse:
goto retry; goto retry;
} }
static void parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{
if (unlikely(!__parse_client_msg(ckp, cdata, client)))
invalidate_client(ckp, cdata, client);
}
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{ {
client_instance_t *client; client_instance_t *client;
@ -568,9 +562,63 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
return client; return client;
} }
static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd,
struct epoll_event *event, const uint64_t id)
{
uint32_t events = event->events;
client_instance_t *client = ref_client_by_id(cdata, id);
if (unlikely(!client)) {
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id);
return;
}
if (unlikely(client->invalid))
goto out;
/* We can have both messages and read hang ups so process the
* message first. */
if (likely(events & EPOLLIN)) {
/* Rearm the client for epoll events if we have successfully
* parsed a message from it */
if (parse_client_msg(ckp, cdata, client)) {
event->data.u64 = id;
event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event);
} else
invalidate_client(ckp, cdata, client);
}
if (unlikely(client->invalid))
goto out;
if (unlikely(events & EPOLLERR)) {
socklen_t errlen = sizeof(int);
int error = 0;
/* See what type of error this is and raise the log
* level of the message if it's unexpected. */
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error != 104) {
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
} else {
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
}
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLHUP)) {
/* Client connection reset by peer */
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLRDHUP)) {
/* Client disconnected by peer */
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
}
out:
dec_instance_ref(cdata, client);
}
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */ * handles the incoming messages */
void *receiver(void *arg) static void *receiver(void *arg)
{ {
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
@ -602,8 +650,6 @@ void *receiver(void *arg)
cksleep_ms(1); cksleep_ms(1);
while (42) { while (42) {
client_instance_t *client;
uint32_t events;
uint64_t edu64; uint64_t edu64;
while (unlikely(!cdata->accept)) while (unlikely(!cdata->accept))
@ -626,46 +672,7 @@ void *receiver(void *arg)
} }
continue; continue;
} }
client = ref_client_by_id(cdata, edu64); process_client_events(ckp, cdata, epfd, &event, edu64);
if (unlikely(!client)) {
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", edu64);
continue;
}
if (unlikely(client->invalid))
goto noparse;
/* We can have both messages and read hang ups so process the
* message first. */
events = event.events;
if (likely(events & EPOLLIN))
parse_client_msg(ckp, cdata, client);
if (unlikely(client->invalid))
goto noparse;
if (unlikely(events & EPOLLERR)) {
socklen_t errlen = sizeof(int);
int error = 0;
/* See what type of error this is and raise the log
* level of the message if it's unexpected. */
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error != 104) {
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
} else {
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
}
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLHUP)) {
/* Client connection reset by peer */
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLRDHUP)) {
/* Client disconnected by peer */
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
}
noparse:
dec_instance_ref(cdata, client);
} }
out: out:
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */

Loading…
Cancel
Save