From dc9012d7771a31de09c1cb9e8603256f9714a2a7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 1 May 2014 19:47:36 +1000 Subject: [PATCH] Use a 2nd hashtable of connected clients indexed by fd for fast lookup --- src/connector.c | 60 ++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/src/connector.c b/src/connector.c index 18e73d0c..4e900dad 100644 --- a/src/connector.c +++ b/src/connector.c @@ -22,6 +22,7 @@ #include "utlist.h" #define MAX_MSGSIZE 1024 +#define SOI (sizeof(int)) struct connector_instance { cklock_t lock; @@ -34,18 +35,24 @@ typedef struct connector_instance conn_instance_t; struct client_instance { UT_hash_handle hh; + int id; + + UT_hash_handle fdhh; + int fd; + struct sockaddr address; socklen_t address_len; char buf[PAGESIZE]; int bufofs; - int fd; - int id; }; typedef struct client_instance client_instance_t; /* For the hashtable of all clients */ static client_instance_t *clients; +/* A hashtable of the clients sorted by fd */ +static client_instance_t *fdclients; + static int client_id; struct sender_send { @@ -71,7 +78,7 @@ static pthread_cond_t sender_cond; void *acceptor(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - client_instance_t *client; + client_instance_t *client, *old_client; int fd; rename_proc("acceptor"); @@ -97,6 +104,7 @@ reaccept: ck_wlock(&ci->lock); client->id = client_id++; HASH_ADD_INT(clients, id, client); + HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); ci->nfds++; ck_wunlock(&ci->lock); @@ -106,12 +114,22 @@ out: } /* Invalidate this instance */ -static void invalidate_client(ckpool_t *ckp, client_instance_t *client) +static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client) { char buf[256]; + int fd; - close(client->fd); - client->fd = -1; + ck_wlock(&ci->lock); + fd = client->fd; + if (fd != -1) { + close(fd); + HASH_DELETE(fdhh, fdclients, client); + client->fd = -1; + } + ck_wunlock(&ci->lock); + + if (fd == -1) + return; sprintf(buf, "dropclient=%d", client->id); send_proc(&ckp->stratifier, buf); } @@ -135,7 +153,7 @@ retry: return; LOGINFO("Client fd %d disconnected", client->fd); - invalidate_client(ckp, client); + invalidate_client(ckp, ci, client); return; } client->bufofs += ret; @@ -148,7 +166,7 @@ reparse: if (!eol) { if (unlikely(client->bufofs > MAX_MSGSIZE)) { LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, client); + invalidate_client(ckp, ci, client); return; } if (moredata) @@ -160,7 +178,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); - invalidate_client(ckp, client); + invalidate_client(ckp, ci, client); return; } memcpy(msg, client->buf, buflen); @@ -173,7 +191,7 @@ reparse: LOGINFO("Client id %d sent invalid json message %s", client->id, msg); send_client(ci, client->id, buf); - invalidate_client(ckp, client); + invalidate_client(ckp, ci, client); return; } else { char *s; @@ -194,7 +212,7 @@ reparse: void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - client_instance_t *client; + client_instance_t *client, *tmp; struct pollfd fds[65536]; int ret, nfds, i; @@ -204,10 +222,7 @@ retry: nfds = 0; ck_rlock(&ci->lock); - for (client = clients; client != NULL; client = client->hh.next) { - /* Invalid client */ - if (client->fd == -1) - continue; + HASH_ITER(fdhh, fdclients, client, tmp) { fds[nfds].fd = client->fd; fds[nfds].events = POLLIN; nfds++; @@ -229,23 +244,22 @@ repoll: if (!ret) goto retry; for (i = 0; i < nfds; i++) { + int fd; if (!(fds[i].revents & POLLIN)) continue; client = NULL; + fd = fds[i].fd; ck_rlock(&ci->lock); - for (client = clients; client != NULL; client = client->hh.next) { - if (client->fd == fds[i].fd) - break; - } + HASH_FIND(fdhh, fdclients, &fd, SOI, client); ck_runlock(&ci->lock); if (!client) { - /* Not yet invalidated */ - LOGDEBUG("Failed to find client with fd %d in hashtable!", fds[i].fd); - close(fds[i].fd); + /* Probably already removed */ + LOGDEBUG("Failed to find client with polled fd %d in hashtable", + fd); } else parse_client_msg(ci, client); @@ -321,7 +335,7 @@ void *sender(void *arg) if (interrupted()) continue; LOGWARNING("Client id %d disconnected", client->id); - invalidate_client(ckp, client); + invalidate_client(ckp, ci, client); break; } ofs += ret;