kanoi 10 years ago
parent
commit
1ce6a34a82
  1. 101
      src/connector.c

101
src/connector.c

@ -23,6 +23,8 @@
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
typedef struct client_instance client_instance_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
@ -34,8 +36,8 @@ struct client_instance {
int ref; int ref;
/* For dead_clients list */ /* For dead_clients list */
struct client_instance *next; client_instance_t *next;
struct client_instance *prev; client_instance_t *prev;
struct sockaddr address; struct sockaddr address;
char address_name[INET6_ADDRSTRLEN]; char address_name[INET6_ADDRSTRLEN];
@ -49,8 +51,6 @@ struct client_instance {
bool passthrough; bool passthrough;
}; };
typedef struct client_instance client_instance_t;
struct sender_send { struct sender_send {
struct sender_send *next; struct sender_send *next;
struct sender_send *prev; struct sender_send *prev;
@ -72,6 +72,8 @@ struct connector_data {
int *serverfd; int *serverfd;
/* All time count of clients connected */ /* All time count of clients connected */
int nfds; int nfds;
/* The epoll fd */
int epfd;
bool accept; bool accept;
pthread_t pth_sender; pthread_t pth_sender;
@ -81,6 +83,8 @@ struct connector_data {
client_instance_t *clients; client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */ /* Linked list of dead clients no longer in use but may still have references */
client_instance_t *dead_clients; client_instance_t *dead_clients;
/* Linked list of client structures we can reuse */
client_instance_t *recycled_clients;
int clients_generated; int clients_generated;
int dead_generated; int dead_generated;
@ -120,6 +124,42 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
} }
/* Recruit a client structure from a recycled one if available, creating a
* new structure only if we have none to reuse. */
static client_instance_t *recruit_client(cdata_t *cdata)
{
client_instance_t *client = NULL;
ck_wlock(&cdata->lock);
if (cdata->recycled_clients) {
client = cdata->recycled_clients;
DL_DELETE(cdata->recycled_clients, client);
} else
cdata->clients_generated++;
ck_wunlock(&cdata->lock);
if (!client) {
LOGDEBUG("Connector created new client instance");
client = ckzalloc(sizeof(client_instance_t));
} else
LOGDEBUG("Connector recycled client instance");
return client;
}
static void __recycle_client(cdata_t *cdata, client_instance_t *client)
{
memset(client, 0, sizeof(client_instance_t));
client->id = -1;
DL_APPEND(cdata->recycled_clients, client);
}
static void recycle_client(cdata_t *cdata, client_instance_t *client)
{
ck_wlock(&cdata->lock);
__recycle_client(cdata, client);
ck_wunlock(&cdata->lock);
}
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
@ -140,7 +180,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
} }
sockd = cdata->serverfd[server]; sockd = cdata->serverfd[server];
client = ckzalloc(sizeof(client_instance_t)); client = recruit_client(cdata);
client->server = server; client->server = server;
address_len = sizeof(client->address); address_len = sizeof(client->address);
fd = accept(sockd, &client->address, &address_len); fd = accept(sockd, &client->address, &address_len);
@ -152,7 +192,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
return 0; return 0;
} }
LOGERR("Failed to accept on socket %d in acceptor", sockd); LOGERR("Failed to accept on socket %d in acceptor", sockd);
dealloc(client); recycle_client(cdata, client);
return -1; return -1;
} }
@ -174,7 +214,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
LOGWARNING("Unknown INET type for client %d on socket %d", LOGWARNING("Unknown INET type for client %d on socket %d",
cdata->nfds, fd); cdata->nfds, fd);
Close(fd); Close(fd);
free(client); recycle_client(cdata, client);
return 0; return 0;
} }
@ -189,7 +229,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
event.events = EPOLLIN; event.events = EPOLLIN;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
free(client); recycle_client(cdata, client);
return 0; return 0;
} }
@ -199,7 +239,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
cdata->clients_generated++;
client->id = cdata->client_id++; client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client); HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++; cdata->nfds++;
@ -219,6 +258,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
if (fd != -1) { if (fd != -1) {
client_id = client->id; client_id = client->id;
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL);
Close(client->fd); Close(client->fd);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
@ -263,8 +303,8 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
if (!client->ref) { if (!client->ref) {
DL_DELETE(cdata->dead_clients, client); DL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %"PRId64, client->id); LOGINFO("Connector recycling client %"PRId64, client->id);
dealloc(client); __recycle_client(cdata, client);
} }
} }
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
@ -373,6 +413,19 @@ reparse:
goto retry; goto retry;
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */ * handles the incoming messages */
void *receiver(void *arg) void *receiver(void *arg)
@ -386,7 +439,7 @@ void *receiver(void *arg)
rename_proc("creceiver"); rename_proc("creceiver");
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0) { if (epfd < 0) {
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL; return NULL;
@ -442,13 +495,18 @@ void *receiver(void *arg)
continue; continue;
} }
client = event.data.ptr; client = event.data.ptr;
/* Recheck this client still exists in the same form when it
* was queued. */
client = ref_client_by_id(cdata, client->id);
if (unlikely(!client))
continue;
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
/* Client disconnected */ /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd); LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(cdata->pi->ckp, cdata, client); invalidate_client(cdata->pi->ckp, cdata, client);
continue; } else
} parse_client_msg(cdata, client);
parse_client_msg(cdata, client); dec_instance_ref(cdata, client);
} }
return NULL; return NULL;
} }
@ -609,19 +667,6 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{ {
char *buf; char *buf;

Loading…
Cancel
Save