Browse Source

Use a 2nd hashtable of connected clients indexed by fd for fast lookup

master
Con Kolivas 11 years ago
parent
commit
dc9012d777
  1. 60
      src/connector.c

60
src/connector.c

@ -22,6 +22,7 @@
#include "utlist.h" #include "utlist.h"
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
#define SOI (sizeof(int))
struct connector_instance { struct connector_instance {
cklock_t lock; cklock_t lock;
@ -34,18 +35,24 @@ typedef struct connector_instance conn_instance_t;
struct client_instance { struct client_instance {
UT_hash_handle hh; UT_hash_handle hh;
int id;
UT_hash_handle fdhh;
int fd;
struct sockaddr address; struct sockaddr address;
socklen_t address_len; socklen_t address_len;
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
int fd;
int id;
}; };
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
/* For the hashtable of all clients */ /* For the hashtable of all clients */
static client_instance_t *clients; static client_instance_t *clients;
/* A hashtable of the clients sorted by fd */
static client_instance_t *fdclients;
static int client_id; static int client_id;
struct sender_send { struct sender_send {
@ -71,7 +78,7 @@ static pthread_cond_t sender_cond;
void *acceptor(void *arg) void *acceptor(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client; client_instance_t *client, *old_client;
int fd; int fd;
rename_proc("acceptor"); rename_proc("acceptor");
@ -97,6 +104,7 @@ reaccept:
ck_wlock(&ci->lock); ck_wlock(&ci->lock);
client->id = client_id++; client->id = client_id++;
HASH_ADD_INT(clients, id, client); HASH_ADD_INT(clients, id, client);
HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client);
ci->nfds++; ci->nfds++;
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
@ -106,12 +114,22 @@ out:
} }
/* Invalidate this instance */ /* Invalidate this instance */
static void invalidate_client(ckpool_t *ckp, client_instance_t *client) static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client)
{ {
char buf[256]; char buf[256];
int fd;
close(client->fd); ck_wlock(&ci->lock);
client->fd = -1; fd = client->fd;
if (fd != -1) {
close(fd);
HASH_DELETE(fdhh, fdclients, client);
client->fd = -1;
}
ck_wunlock(&ci->lock);
if (fd == -1)
return;
sprintf(buf, "dropclient=%d", client->id); sprintf(buf, "dropclient=%d", client->id);
send_proc(&ckp->stratifier, buf); send_proc(&ckp->stratifier, buf);
} }
@ -135,7 +153,7 @@ retry:
return; return;
LOGINFO("Client fd %d disconnected", client->fd); LOGINFO("Client fd %d disconnected", client->fd);
invalidate_client(ckp, client); invalidate_client(ckp, ci, client);
return; return;
} }
client->bufofs += ret; client->bufofs += ret;
@ -148,7 +166,7 @@ reparse:
if (!eol) { if (!eol) {
if (unlikely(client->bufofs > MAX_MSGSIZE)) { if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd);
invalidate_client(ckp, client); invalidate_client(ckp, ci, client);
return; return;
} }
if (moredata) if (moredata)
@ -160,7 +178,7 @@ reparse:
buflen = eol - client->buf + 1; buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) { if (unlikely(buflen > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); LOGWARNING("Client fd %d message oversize, disconnecting", client->fd);
invalidate_client(ckp, client); invalidate_client(ckp, ci, client);
return; return;
} }
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
@ -173,7 +191,7 @@ reparse:
LOGINFO("Client id %d sent invalid json message %s", client->id, msg); LOGINFO("Client id %d sent invalid json message %s", client->id, msg);
send_client(ci, client->id, buf); send_client(ci, client->id, buf);
invalidate_client(ckp, client); invalidate_client(ckp, ci, client);
return; return;
} else { } else {
char *s; char *s;
@ -194,7 +212,7 @@ reparse:
void *receiver(void *arg) void *receiver(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client; client_instance_t *client, *tmp;
struct pollfd fds[65536]; struct pollfd fds[65536];
int ret, nfds, i; int ret, nfds, i;
@ -204,10 +222,7 @@ retry:
nfds = 0; nfds = 0;
ck_rlock(&ci->lock); ck_rlock(&ci->lock);
for (client = clients; client != NULL; client = client->hh.next) { HASH_ITER(fdhh, fdclients, client, tmp) {
/* Invalid client */
if (client->fd == -1)
continue;
fds[nfds].fd = client->fd; fds[nfds].fd = client->fd;
fds[nfds].events = POLLIN; fds[nfds].events = POLLIN;
nfds++; nfds++;
@ -229,23 +244,22 @@ repoll:
if (!ret) if (!ret)
goto retry; goto retry;
for (i = 0; i < nfds; i++) { for (i = 0; i < nfds; i++) {
int fd;
if (!(fds[i].revents & POLLIN)) if (!(fds[i].revents & POLLIN))
continue; continue;
client = NULL; client = NULL;
fd = fds[i].fd;
ck_rlock(&ci->lock); ck_rlock(&ci->lock);
for (client = clients; client != NULL; client = client->hh.next) { HASH_FIND(fdhh, fdclients, &fd, SOI, client);
if (client->fd == fds[i].fd)
break;
}
ck_runlock(&ci->lock); ck_runlock(&ci->lock);
if (!client) { if (!client) {
/* Not yet invalidated */ /* Probably already removed */
LOGDEBUG("Failed to find client with fd %d in hashtable!", fds[i].fd); LOGDEBUG("Failed to find client with polled fd %d in hashtable",
close(fds[i].fd); fd);
} else } else
parse_client_msg(ci, client); parse_client_msg(ci, client);
@ -321,7 +335,7 @@ void *sender(void *arg)
if (interrupted()) if (interrupted())
continue; continue;
LOGWARNING("Client id %d disconnected", client->id); LOGWARNING("Client id %d disconnected", client->id);
invalidate_client(ckp, client); invalidate_client(ckp, ci, client);
break; break;
} }
ofs += ret; ofs += ret;

Loading…
Cancel
Save