Browse Source

Use the client reference count in the connector to protect the client fd, closing it only once there are no more references to it

master
Con Kolivas 10 years ago
parent
commit
0395dd052f
  1. 51
      src/connector.c

51
src/connector.c

@ -29,12 +29,17 @@ struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
int64_t id; int64_t id;
/* fd cannot be changed while a ref is held */
int fd; int fd;
/* Reference count for when this instance is used outside of the /* Reference count for when this instance is used outside of the
* connector_data lock */ * connector_data lock */
int ref; int ref;
/* Have we disabled this client to be removed when there are no refs? */
bool invalid;
/* For dead_clients list */ /* For dead_clients list */
client_instance_t *next; client_instance_t *next;
client_instance_t *prev; client_instance_t *prev;
@ -229,7 +234,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
cdata->nfds++; cdata->nfds++;
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
client->fd = fd;
event.data.u64 = client->id; event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
@ -241,6 +245,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
* to it. We drop that reference when the socket is closed which * to it. We drop that reference when the socket is closed which
* removes it automatically from the epoll list. */ * removes it automatically from the epoll list. */
__inc_instance_ref(client); __inc_instance_ref(client);
client->fd = fd;
return 1; return 1;
} }
@ -249,16 +254,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
static int drop_client(cdata_t *cdata, client_instance_t *client) static int drop_client(cdata_t *cdata, client_instance_t *client)
{ {
int64_t client_id = 0; int64_t client_id = 0;
int fd; int fd = -1;
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
fd = client->fd; if (!client->invalid) {
if (fd != -1) { client->invalid = true;
client_id = client->id; client_id = client->id;
fd = client->fd;
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL);
nolinger_socket(fd);
Close(client->fd);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the /* This is the reference to this client's presence in the
@ -324,6 +327,11 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
if (!client->ref) { if (!client->ref) {
DL_DELETE(cdata->dead_clients, client); DL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector recycling client %"PRId64, client->id); LOGINFO("Connector recycling client %"PRId64, client->id);
/* We only close the client fd once we're sure there
* are no references to it left to prevent fds being
* reused on new and old clients. */
nolinger_socket(client->fd);
Close(client->fd);
__recycle_client(cdata, client); __recycle_client(cdata, client);
} }
} }
@ -405,7 +413,7 @@ reparse:
/* Do not send messages of clients we've already dropped. We /* Do not send messages of clients we've already dropped. We
* do this unlocked as the occasional false negative can be * do this unlocked as the occasional false negative can be
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(client->fd != -1)) { if (likely(!client->invalid)) {
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
else else
@ -427,7 +435,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client); HASH_FIND_I64(cdata->clients, &id, client);
if (client) if (client && !client->invalid)
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
@ -506,13 +514,13 @@ void *receiver(void *arg)
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64);
continue; continue;
} }
if (unlikely(client->fd == -1)) if (unlikely(client->invalid))
goto noparse; goto noparse;
/* We can have both messages and read hang ups so process the /* We can have both messages and read hang ups so process the
* message first. */ * message first. */
if (likely(event.events & EPOLLIN)) if (likely(event.events & EPOLLIN))
parse_client_msg(cdata, client); parse_client_msg(cdata, client);
if (unlikely(client->fd == -1)) if (unlikely(client->invalid))
goto noparse; goto noparse;
if (unlikely(event.events & EPOLLERR)) { if (unlikely(event.events & EPOLLERR)) {
socklen_t errlen = sizeof(int); socklen_t errlen = sizeof(int);
@ -602,17 +610,18 @@ void *sender(void *arg)
continue; continue;
} }
client = sender_send->client; client = sender_send->client;
if (unlikely(client->invalid)) {
LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id);
goto contfree;
}
/* If this socket is not ready to receive data from us, put the /* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout * send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll * to poll to either look for a client that is ready or poll
* select on this one */ * select on this one */
ck_rlock(&cdata->lock);
fd = client->fd; fd = client->fd;
if (!ret) if (!ret)
ret = wait_write_select(fd, 0); ret = wait_write_select(fd, 0);
ck_runlock(&cdata->lock);
if (ret < 1) { if (ret < 1) {
if (ret < 0) { if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
@ -655,7 +664,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
{ {
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
int fd = -1, len; int len;
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer"); LOGWARNING("Connector send_client sent a null buffer");
@ -670,25 +679,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client); HASH_FIND_I64(cdata->clients, &id, client);
if (likely(client)) {
fd = client->fd;
/* Grab a reference to this client until the sender_send has /* Grab a reference to this client until the sender_send has
* completed processing. */ * completed processing. */
if (likely(client))
__inc_instance_ref(client); __inc_instance_ref(client);
}
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
if (unlikely(fd == -1)) { if (unlikely(!client)) {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
if (client) {
/* This shouldn't happen */
LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id);
invalidate_client(ckp, cdata, client);
} else {
LOGINFO("Connector failed to find client id %"PRId64" to send to", id); LOGINFO("Connector failed to find client id %"PRId64" to send to", id);
stratifier_drop_id(ckp, id); stratifier_drop_id(ckp, id);
}
free(buf); free(buf);
return; return;
} }

Loading…
Cancel
Save