diff --git a/src/connector.c b/src/connector.c index 7d341519..9955be2d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -20,6 +20,8 @@ #include "libckpool.h" #include "uthash.h" +#define MAX_MSGSIZE 1024 + struct connector_instance { cklock_t lock; proc_instance_t *pi; @@ -36,7 +38,6 @@ struct client_instance { char buf[PAGESIZE]; int bufofs; int fd; - int id; }; typedef struct client_instance client_instance_t; @@ -70,7 +71,7 @@ retry: client->fd = fd; ck_wlock(&ci->lock); - HASH_ADD_INT(clients, id, client); + HASH_ADD_INT(clients, fd, client); ci->nfds++; ck_wunlock(&ci->lock); @@ -79,6 +80,68 @@ out: return NULL; } +/* Invalidate this instance */ +static void invalidate_client(client_instance_t *client) +{ + close(client->fd); + client->fd = -1; +} + +void parse_client_msg(conn_instance_t *ci, client_instance_t *client) +{ + char msg[PAGESIZE], *eol; + int buflen, ret; + bool moredata; + +retry: + buflen = PAGESIZE - client->bufofs; + ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); + if (ret < 1) { + /* If we're reading on the first pass, there is supposed to be + * data waiting for us according to poll() so no data implies + * a lost connection. On repeat reads may have just read all + * the data */ + if (moredata) + return; + + LOGINFO("Client fd %d disconnected", client->fd); + invalidate_client(client); + return; + } + client->bufofs += ret; + if (client->bufofs == PAGESIZE) + moredata = true; + else + moredata = false; +reparse: + eol = memchr(client->buf, '\n', client->bufofs); + if (!eol) { + if (unlikely(client->bufofs > MAX_MSGSIZE)) { + LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); + invalidate_client(client); + return; + } + if (moredata) + goto retry; + return; + } + + /* Do something useful with this message now */ + buflen = eol - client->buf + 1; + if (unlikely(buflen > MAX_MSGSIZE)) { + LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); + invalidate_client(client); + return; + } + memcpy(msg, client->buf, buflen); + msg[buflen] = 0; + client->bufofs -= buflen; + memmove(client->buf, client->buf + buflen, client->bufofs); + LOGWARNING("Client fd %d sent message %s", client->fd, msg); + if (client->bufofs) + goto reparse; +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ void *receiver(void *arg) @@ -87,19 +150,18 @@ void *receiver(void *arg) client_instance_t *client, *tmp; struct pollfd fds[65536]; int ret, nfds, i; - connsock_t cs; rename_proc("receiver"); - memset(&cs, 0, sizeof(cs)); retry: - dealloc(cs.buf); - memset(fds, 0, sizeof(fds)); nfds = 0; ck_rlock(&ci->lock); HASH_ITER(hh, clients, client, tmp) { + /* Invalid client */ + if (client->fd == -1) + continue; fds[nfds].fd = client->fd; fds[nfds].events = POLLIN; nfds++; @@ -116,13 +178,21 @@ retry: if (!ret) goto retry; for (i = 0; i < nfds; i++) { + int fd; + if (!(fds[i].revents & POLLIN)) continue; - cs.fd = fds[i].fd; - if (read_socket_line(&cs)) { - LOGWARNING("Received %s", cs.buf); - dealloc(cs.buf); - } + + fd = fds[i].fd; + + ck_rlock(&ci->lock); + HASH_FIND_INT(clients, &fd, client); + if (!client) + LOGWARNING("Failed to find client with fd %d in hashtable!", fd); + else + parse_client_msg(ci, client); + ck_runlock(&ci->lock); + if (--ret < 1) break; }