diff --git a/src/connector.c b/src/connector.c index 92665bed..424483a8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -23,6 +23,8 @@ #define MAX_MSGSIZE 1024 +typedef struct client_instance client_instance_t; + struct client_instance { /* For clients hashtable */ UT_hash_handle hh; @@ -34,8 +36,8 @@ struct client_instance { int ref; /* For dead_clients list */ - struct client_instance *next; - struct client_instance *prev; + client_instance_t *next; + client_instance_t *prev; struct sockaddr address; char address_name[INET6_ADDRSTRLEN]; @@ -49,8 +51,6 @@ struct client_instance { bool passthrough; }; -typedef struct client_instance client_instance_t; - struct sender_send { struct sender_send *next; struct sender_send *prev; @@ -72,6 +72,8 @@ struct connector_data { int *serverfd; /* All time count of clients connected */ int nfds; + /* The epoll fd */ + int epfd; bool accept; pthread_t pth_sender; @@ -81,6 +83,8 @@ struct connector_data { client_instance_t *clients; /* Linked list of dead clients no longer in use but may still have references */ client_instance_t *dead_clients; + /* Linked list of client structures we can reuse */ + client_instance_t *recycled_clients; int clients_generated; int dead_generated; @@ -120,6 +124,42 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client) ck_wunlock(&cdata->lock); } +/* Recruit a client structure from a recycled one if available, creating a + * new structure only if we have none to reuse. */ +static client_instance_t *recruit_client(cdata_t *cdata) +{ + client_instance_t *client = NULL; + + ck_wlock(&cdata->lock); + if (cdata->recycled_clients) { + client = cdata->recycled_clients; + DL_DELETE(cdata->recycled_clients, client); + } else + cdata->clients_generated++; + ck_wunlock(&cdata->lock); + + if (!client) { + LOGDEBUG("Connector created new client instance"); + client = ckzalloc(sizeof(client_instance_t)); + } else + LOGDEBUG("Connector recycled client instance"); + return client; +} + +static void __recycle_client(cdata_t *cdata, client_instance_t *client) +{ + memset(client, 0, sizeof(client_instance_t)); + client->id = -1; + DL_APPEND(cdata->recycled_clients, client); +} + +static void recycle_client(cdata_t *cdata, client_instance_t *client) +{ + ck_wlock(&cdata->lock); + __recycle_client(cdata, client); + ck_wunlock(&cdata->lock); +} + /* Accepts incoming connections on the server socket and generates client * instances */ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) @@ -140,7 +180,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } sockd = cdata->serverfd[server]; - client = ckzalloc(sizeof(client_instance_t)); + client = recruit_client(cdata); client->server = server; address_len = sizeof(client->address); fd = accept(sockd, &client->address, &address_len); @@ -152,7 +192,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) return 0; } LOGERR("Failed to accept on socket %d in acceptor", sockd); - dealloc(client); + recycle_client(cdata, client); return -1; } @@ -174,7 +214,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) LOGWARNING("Unknown INET type for client %d on socket %d", cdata->nfds, fd); Close(fd); - free(client); + recycle_client(cdata, client); return 0; } @@ -189,7 +229,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) event.events = EPOLLIN; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); - free(client); + recycle_client(cdata, client); return 0; } @@ -199,7 +239,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) __inc_instance_ref(client); ck_wlock(&cdata->lock); - cdata->clients_generated++; client->id = cdata->client_id++; HASH_ADD_I64(cdata->clients, id, client); cdata->nfds++; @@ -219,6 +258,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL); Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); @@ -263,8 +303,8 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { if (!client->ref) { DL_DELETE(cdata->dead_clients, client); - LOGINFO("Connector discarding client %"PRId64, client->id); - dealloc(client); + LOGINFO("Connector recycling client %"PRId64, client->id); + __recycle_client(cdata, client); } } ck_wunlock(&cdata->lock); @@ -373,6 +413,19 @@ reparse: goto retry; } +static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) +{ + client_instance_t *client; + + ck_wlock(&cdata->lock); + HASH_FIND_I64(cdata->clients, &id, client); + if (client) + __inc_instance_ref(client); + ck_wunlock(&cdata->lock); + + return client; +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ void *receiver(void *arg) @@ -386,7 +439,7 @@ void *receiver(void *arg) rename_proc("creceiver"); - epfd = epoll_create1(EPOLL_CLOEXEC); + epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); return NULL; @@ -442,13 +495,18 @@ void *receiver(void *arg) continue; } client = event.data.ptr; + /* Recheck this client still exists in the same form when it + * was queued. */ + client = ref_client_by_id(cdata, client->id); + if (unlikely(!client)) + continue; if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { /* Client disconnected */ LOGDEBUG("Client fd %d HUP in epoll", client->fd); invalidate_client(cdata->pi->ckp, cdata, client); - continue; - } - parse_client_msg(cdata, client); + } else + parse_client_msg(cdata, client); + dec_instance_ref(cdata, client); } return NULL; } @@ -609,19 +667,6 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) mutex_unlock(&cdata->sender_lock); } -static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) -{ - client_instance_t *client; - - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); - if (client) - __inc_instance_ref(client); - ck_wunlock(&cdata->lock); - - return client; -} - static void passthrough_client(cdata_t *cdata, client_instance_t *client) { char *buf;