diff --git a/configure.ac b/configure.ac index 218a2392..65d311c0 100644 --- a/configure.ac +++ b/configure.ac @@ -37,7 +37,7 @@ AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) -AC_CHECK_HEADERS(libpq-fe.h postgresql/libpq-fe.h grp.h) +AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) PTHREAD_LIBS="-lpthread" MATH_LIBS="-lm" diff --git a/src/connector.c b/src/connector.c index d79500e6..ba338860 100644 --- a/src/connector.c +++ b/src/connector.c @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -38,9 +38,6 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; - - /* For fdclients hashtable */ - UT_hash_handle fdhh; int fd; /* For dead_clients list */ @@ -59,8 +56,6 @@ typedef struct client_instance client_instance_t; /* For the hashtable of all clients */ static client_instance_t *clients; -/* A hashtable of the clients sorted by fd */ -static client_instance_t *fdclients; /* Linked list of dead clients no longer in use but may still have references */ static client_instance_t *dead_clients; @@ -85,12 +80,15 @@ static sender_send_t *delayed_sends; static pthread_mutex_t sender_lock; static pthread_cond_t sender_cond; +static void parse_client_msg(conn_instance_t *ci, client_instance_t *client); + /* Accepts incoming connections on the server socket and generates client * instances */ -static int accept_client(conn_instance_t *ci) +static int accept_client(conn_instance_t *ci, int epfd) { - client_instance_t *client, *old_client; ckpool_t *ckp = ci->pi->ckp; + client_instance_t *client; + struct epoll_event event; int fd, port, no_clients; socklen_t address_len; @@ -98,7 +96,7 @@ static int accept_client(conn_instance_t *ci) no_clients = HASH_COUNT(clients); ck_runlock(&ci->lock); - if (ckp->maxclients && no_clients >= ckp->maxclients) { + if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) { LOGWARNING("Server full with %d clients", no_clients); return 0; } @@ -143,17 +141,26 @@ static int accept_client(conn_instance_t *ci) keep_sockalive(fd); nolinger_socket(fd); - LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); + LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", + ci->nfds, fd, no_clients, client->address_name, port); client->fd = fd; + event.data.ptr = client; + event.events = EPOLLIN; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + LOGERR("Failed to epoll_ctl add in accept_client"); + free(client); + return 0; + } ck_wlock(&ci->lock); client->id = client_id++; HASH_ADD_I64(clients, id, client); - HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); ci->nfds++; ck_wunlock(&ci->lock); + parse_client_msg(ci, client); + return 1; } @@ -166,7 +173,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) if (fd != -1) { Close(client->fd); HASH_DEL(clients, client); - HASH_DELETE(fdhh, fdclients, client); LL_PREPEND(dead_clients, client); } ck_wunlock(&ci->lock); @@ -292,104 +298,65 @@ reparse: void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - client_instance_t *client, *tmp; - int ret, nfds, i, maxfds = 1; - bool update, maxconn = true; - struct pollfd *fds; + struct epoll_event *events, event; + client_instance_t *client; + bool maxconn = true; + int ret, epfd; rename_proc("creceiver"); - fds = ckalloc(sizeof(struct pollfd)); - /* First fd is reserved for the accepting socket */ - fds[0].fd = ci->serverfd; - fds[0].events = POLLIN; - fds[0].revents = 0; -rebuild_fds: - update = false; - nfds = 1; - - ck_rlock(&ci->lock); - HASH_ITER(fdhh, fdclients, client, tmp) { - if (unlikely(client->fd == -1)) { - LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!", - client->id); - continue; - } - if (nfds >= maxfds) { - maxfds = nfds + 1; - fds = realloc(fds, sizeof(struct pollfd) * maxfds); - if (unlikely(!fds)) { - LOGEMERG("FATAL: Failed to realloc fds in receiver!"); - goto out; - } - } - fds[nfds].fd = client->fd; - fds[nfds].events = POLLIN; - fds[nfds].revents = 0; - nfds++; + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0) { + LOGEMERG("FATAL: Failed to create epoll in receiver"); + return NULL; + } + event.data.fd = ci->serverfd; + event.events = EPOLLIN; + events = ckalloc(sizeof(struct epoll_event)); + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event); + if (ret < 0) { + LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); + goto out; } - ck_runlock(&ci->lock); - repoll: while (!ci->accept) cksleep_ms(100); - - ret = poll(fds, nfds, 1000); - if (unlikely(ret < 0)) { - LOGERR("Failed to poll in receiver"); + ret = epoll_wait(epfd, events, 1, 1000); + if (unlikely(ret == -1)) { + LOGEMERG("FATAL: Failed to epoll_wait in receiver"); goto out; } - - for (i = 0; i < nfds && ret > 0; i++) { - int fd, accepted; - - if (!fds[i].revents) - continue; - - /* Reset for the next poll pass */ - fds[i].events = POLLIN; - fds[i].revents = 0; - --ret; - - /* Is this the listening server socket? */ - if (i == 0) { - accepted = accept_client(ci); - if (unlikely(accepted < 0)) - goto out; - if (accepted) - update = true; - continue; + if (unlikely(!ret)) { + if (unlikely(maxconn)) { + /* When we first start we listen to as many connections as + * possible. Once we stop receiving connections we drop the + * listen to the minimum to effectively ratelimit how fast we + * can receive connections. */ + LOGDEBUG("Dropping listen backlog to 0"); + maxconn = false; + listen(ci->serverfd, 0); } - - client = NULL; - fd = fds[i].fd; - - ck_rlock(&ci->lock); - HASH_FIND(fdhh, fdclients, &fd, SOI, client); - ck_runlock(&ci->lock); - - if (!client) { - /* Probably already removed, remove lazily */ - LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable", - i, fd); - update = true; - } else - parse_client_msg(ci, client); + goto repoll; } - - if (update) - goto rebuild_fds; - else if (unlikely(maxconn)) { - /* When we first start we listen to as many connections as - * possible. Once we stop receiving connections we drop the - * listen to the minimum to effectively ratelimit how fast we - * can receive connections. */ - maxconn = false; - listen(ci->serverfd, 0); + if (events->data.fd == ci->serverfd) { + ret = accept_client(ci, epfd); + if (unlikely(ret < 0)) { + LOGEMERG("FATAL: Failed to accept_client in receiver"); + goto out; + } + goto repoll; + } + client = events->data.ptr; + if ((events->events & EPOLLERR) || (events->events & EPOLLHUP)) { + /* Client disconnected */ + LOGDEBUG("Client fd %d HUP in epoll", client->fd); + invalidate_client(ci->pi->ckp, ci, client); + goto repoll; } + parse_client_msg(ci, client); goto repoll; out: - free(fds); + free(events); return NULL; }