diff --git a/src/connector.c b/src/connector.c index 2c1fb215..a94ee304 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,12 +29,17 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; + + /* fd cannot be changed while a ref is held */ int fd; /* Reference count for when this instance is used outside of the * connector_data lock */ int ref; + /* Have we disabled this client to be removed when there are no refs? */ + bool invalid; + /* For dead_clients list */ client_instance_t *next; client_instance_t *prev; @@ -229,7 +234,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - client->fd = fd; event.data.u64 = client->id; event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { @@ -241,6 +245,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ __inc_instance_ref(client); + client->fd = fd; return 1; } @@ -249,16 +254,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int drop_client(cdata_t *cdata, client_instance_t *client) { int64_t client_id = 0; - int fd; + int fd = -1; ck_wlock(&cdata->lock); - fd = client->fd; - if (fd != -1) { + if (!client->invalid) { + client->invalid = true; client_id = client->id; - + fd = client->fd; epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); - nolinger_socket(fd); - Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the @@ -324,6 +327,11 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c if (!client->ref) { DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector recycling client %"PRId64, client->id); + /* We only close the client fd once we're sure there + * are no references to it left to prevent fds being + * reused on new and old clients. */ + nolinger_socket(client->fd); + Close(client->fd); __recycle_client(cdata, client); } } @@ -405,7 +413,7 @@ reparse: /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ - if (likely(client->fd != -1)) { + if (likely(!client->invalid)) { if (ckp->passthrough) send_proc(ckp->generator, s); else @@ -427,7 +435,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client) + if (client && !client->invalid) __inc_instance_ref(client); ck_wunlock(&cdata->lock); @@ -506,13 +514,13 @@ void *receiver(void *arg) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; /* We can have both messages and read hang ups so process the * message first. */ if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; if (unlikely(event.events & EPOLLERR)) { socklen_t errlen = sizeof(int); @@ -602,17 +610,18 @@ void *sender(void *arg) continue; } client = sender_send->client; + if (unlikely(client->invalid)) { + LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); + goto contfree; + } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ck_rlock(&cdata->lock); fd = client->fd; if (!ret) ret = wait_write_select(fd, 0); - ck_runlock(&cdata->lock); - if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -655,7 +664,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) { sender_send_t *sender_send; client_instance_t *client; - int fd = -1, len; + int len; if (unlikely(!buf)) { LOGWARNING("Connector send_client sent a null buffer"); @@ -670,25 +679,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) { - fd = client->fd; - /* Grab a reference to this client until the sender_send has - * completed processing. */ + /* Grab a reference to this client until the sender_send has + * completed processing. */ + if (likely(client)) __inc_instance_ref(client); - } ck_wunlock(&cdata->lock); - if (unlikely(fd == -1)) { + if (unlikely(!client)) { ckpool_t *ckp = cdata->ckp; - if (client) { - /* This shouldn't happen */ - LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); - invalidate_client(ckp, cdata, client); - } else { - LOGINFO("Connector failed to find client id %"PRId64" to send to", id); - stratifier_drop_id(ckp, id); - } + LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); free(buf); return; }