Browse Source

Rewrite connector receive thread to use epoll and remove associated now unnecessary fd hashtable

master
Con Kolivas 10 years ago
parent
commit
42fc9ca8f6
  1. 2
      configure.ac
  2. 159
      src/connector.c

2
configure.ac

@ -37,7 +37,7 @@ AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h)
AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h)
AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h)
AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
PTHREAD_LIBS="-lpthread" PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm" MATH_LIBS="-lm"

159
src/connector.c

@ -11,7 +11,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/poll.h> #include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -38,9 +38,6 @@ struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
int64_t id; int64_t id;
/* For fdclients hashtable */
UT_hash_handle fdhh;
int fd; int fd;
/* For dead_clients list */ /* For dead_clients list */
@ -59,8 +56,6 @@ typedef struct client_instance client_instance_t;
/* For the hashtable of all clients */ /* For the hashtable of all clients */
static client_instance_t *clients; static client_instance_t *clients;
/* A hashtable of the clients sorted by fd */
static client_instance_t *fdclients;
/* Linked list of dead clients no longer in use but may still have references */ /* Linked list of dead clients no longer in use but may still have references */
static client_instance_t *dead_clients; static client_instance_t *dead_clients;
@ -85,12 +80,15 @@ static sender_send_t *delayed_sends;
static pthread_mutex_t sender_lock; static pthread_mutex_t sender_lock;
static pthread_cond_t sender_cond; static pthread_cond_t sender_cond;
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client);
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(conn_instance_t *ci) static int accept_client(conn_instance_t *ci, int epfd)
{ {
client_instance_t *client, *old_client;
ckpool_t *ckp = ci->pi->ckp; ckpool_t *ckp = ci->pi->ckp;
client_instance_t *client;
struct epoll_event event;
int fd, port, no_clients; int fd, port, no_clients;
socklen_t address_len; socklen_t address_len;
@ -98,7 +96,7 @@ static int accept_client(conn_instance_t *ci)
no_clients = HASH_COUNT(clients); no_clients = HASH_COUNT(clients);
ck_runlock(&ci->lock); ck_runlock(&ci->lock);
if (ckp->maxclients && no_clients >= ckp->maxclients) { if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) {
LOGWARNING("Server full with %d clients", no_clients); LOGWARNING("Server full with %d clients", no_clients);
return 0; return 0;
} }
@ -143,17 +141,26 @@ static int accept_client(conn_instance_t *ci)
keep_sockalive(fd); keep_sockalive(fd);
nolinger_socket(fd); nolinger_socket(fd);
LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
ci->nfds, fd, no_clients, client->address_name, port);
client->fd = fd; client->fd = fd;
event.data.ptr = client;
event.events = EPOLLIN;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client");
free(client);
return 0;
}
ck_wlock(&ci->lock); ck_wlock(&ci->lock);
client->id = client_id++; client->id = client_id++;
HASH_ADD_I64(clients, id, client); HASH_ADD_I64(clients, id, client);
HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client);
ci->nfds++; ci->nfds++;
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
parse_client_msg(ci, client);
return 1; return 1;
} }
@ -166,7 +173,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client)
if (fd != -1) { if (fd != -1) {
Close(client->fd); Close(client->fd);
HASH_DEL(clients, client); HASH_DEL(clients, client);
HASH_DELETE(fdhh, fdclients, client);
LL_PREPEND(dead_clients, client); LL_PREPEND(dead_clients, client);
} }
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
@ -292,104 +298,65 @@ reparse:
void *receiver(void *arg) void *receiver(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client, *tmp; struct epoll_event *events, event;
int ret, nfds, i, maxfds = 1; client_instance_t *client;
bool update, maxconn = true; bool maxconn = true;
struct pollfd *fds; int ret, epfd;
rename_proc("creceiver"); rename_proc("creceiver");
fds = ckalloc(sizeof(struct pollfd)); epfd = epoll_create1(EPOLL_CLOEXEC);
/* First fd is reserved for the accepting socket */ if (epfd < 0) {
fds[0].fd = ci->serverfd; LOGEMERG("FATAL: Failed to create epoll in receiver");
fds[0].events = POLLIN; return NULL;
fds[0].revents = 0; }
rebuild_fds: event.data.fd = ci->serverfd;
update = false; event.events = EPOLLIN;
nfds = 1; events = ckalloc(sizeof(struct epoll_event));
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event);
ck_rlock(&ci->lock); if (ret < 0) {
HASH_ITER(fdhh, fdclients, client, tmp) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
if (unlikely(client->fd == -1)) { goto out;
LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!",
client->id);
continue;
}
if (nfds >= maxfds) {
maxfds = nfds + 1;
fds = realloc(fds, sizeof(struct pollfd) * maxfds);
if (unlikely(!fds)) {
LOGEMERG("FATAL: Failed to realloc fds in receiver!");
goto out;
}
}
fds[nfds].fd = client->fd;
fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
nfds++;
} }
ck_runlock(&ci->lock);
repoll: repoll:
while (!ci->accept) while (!ci->accept)
cksleep_ms(100); cksleep_ms(100);
ret = epoll_wait(epfd, events, 1, 1000);
ret = poll(fds, nfds, 1000); if (unlikely(ret == -1)) {
if (unlikely(ret < 0)) { LOGEMERG("FATAL: Failed to epoll_wait in receiver");
LOGERR("Failed to poll in receiver");
goto out; goto out;
} }
if (unlikely(!ret)) {
for (i = 0; i < nfds && ret > 0; i++) { if (unlikely(maxconn)) {
int fd, accepted; /* When we first start we listen to as many connections as
* possible. Once we stop receiving connections we drop the
if (!fds[i].revents) * listen to the minimum to effectively ratelimit how fast we
continue; * can receive connections. */
LOGDEBUG("Dropping listen backlog to 0");
/* Reset for the next poll pass */ maxconn = false;
fds[i].events = POLLIN; listen(ci->serverfd, 0);
fds[i].revents = 0;
--ret;
/* Is this the listening server socket? */
if (i == 0) {
accepted = accept_client(ci);
if (unlikely(accepted < 0))
goto out;
if (accepted)
update = true;
continue;
} }
goto repoll;
client = NULL;
fd = fds[i].fd;
ck_rlock(&ci->lock);
HASH_FIND(fdhh, fdclients, &fd, SOI, client);
ck_runlock(&ci->lock);
if (!client) {
/* Probably already removed, remove lazily */
LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable",
i, fd);
update = true;
} else
parse_client_msg(ci, client);
} }
if (events->data.fd == ci->serverfd) {
if (update) ret = accept_client(ci, epfd);
goto rebuild_fds; if (unlikely(ret < 0)) {
else if (unlikely(maxconn)) { LOGEMERG("FATAL: Failed to accept_client in receiver");
/* When we first start we listen to as many connections as goto out;
* possible. Once we stop receiving connections we drop the }
* listen to the minimum to effectively ratelimit how fast we goto repoll;
* can receive connections. */ }
maxconn = false; client = events->data.ptr;
listen(ci->serverfd, 0); if ((events->events & EPOLLERR) || (events->events & EPOLLHUP)) {
/* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(ci->pi->ckp, ci, client);
goto repoll;
} }
parse_client_msg(ci, client);
goto repoll; goto repoll;
out: out:
free(fds); free(events);
return NULL; return NULL;
} }

Loading…
Cancel
Save