diff --git a/src/connector.c b/src/connector.c index e2a743a4..424483a8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -72,6 +72,8 @@ struct connector_data { int *serverfd; /* All time count of clients connected */ int nfds; + /* The epoll fd */ + int epfd; bool accept; pthread_t pth_sender; @@ -147,6 +149,7 @@ static client_instance_t *recruit_client(cdata_t *cdata) static void __recycle_client(cdata_t *cdata, client_instance_t *client) { memset(client, 0, sizeof(client_instance_t)); + client->id = -1; DL_APPEND(cdata->recycled_clients, client); } @@ -255,6 +258,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL); Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); @@ -409,6 +413,19 @@ reparse: goto retry; } +static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) +{ + client_instance_t *client; + + ck_wlock(&cdata->lock); + HASH_FIND_I64(cdata->clients, &id, client); + if (client) + __inc_instance_ref(client); + ck_wunlock(&cdata->lock); + + return client; +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ void *receiver(void *arg) @@ -422,7 +439,7 @@ void *receiver(void *arg) rename_proc("creceiver"); - epfd = epoll_create1(EPOLL_CLOEXEC); + epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); return NULL; @@ -478,13 +495,18 @@ void *receiver(void *arg) continue; } client = event.data.ptr; + /* Recheck this client still exists in the same form when it + * was queued. */ + client = ref_client_by_id(cdata, client->id); + if (unlikely(!client)) + continue; if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { /* Client disconnected */ LOGDEBUG("Client fd %d HUP in epoll", client->fd); invalidate_client(cdata->pi->ckp, cdata, client); - continue; - } - parse_client_msg(cdata, client); + } else + parse_client_msg(cdata, client); + dec_instance_ref(cdata, client); } return NULL; } @@ -645,19 +667,6 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) mutex_unlock(&cdata->sender_lock); } -static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) -{ - client_instance_t *client; - - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); - if (client) - __inc_instance_ref(client); - ck_wunlock(&cdata->lock); - - return client; -} - static void passthrough_client(cdata_t *cdata, client_instance_t *client) { char *buf;