|
|
@ -20,6 +20,8 @@ |
|
|
|
#include "libckpool.h" |
|
|
|
#include "libckpool.h" |
|
|
|
#include "uthash.h" |
|
|
|
#include "uthash.h" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#define MAX_MSGSIZE 1024 |
|
|
|
|
|
|
|
|
|
|
|
struct connector_instance { |
|
|
|
struct connector_instance { |
|
|
|
cklock_t lock; |
|
|
|
cklock_t lock; |
|
|
|
proc_instance_t *pi; |
|
|
|
proc_instance_t *pi; |
|
|
@ -36,7 +38,6 @@ struct client_instance { |
|
|
|
char buf[PAGESIZE]; |
|
|
|
char buf[PAGESIZE]; |
|
|
|
int bufofs; |
|
|
|
int bufofs; |
|
|
|
int fd; |
|
|
|
int fd; |
|
|
|
int id; |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
typedef struct client_instance client_instance_t; |
|
|
|
typedef struct client_instance client_instance_t; |
|
|
@ -70,7 +71,7 @@ retry: |
|
|
|
client->fd = fd; |
|
|
|
client->fd = fd; |
|
|
|
|
|
|
|
|
|
|
|
ck_wlock(&ci->lock); |
|
|
|
ck_wlock(&ci->lock); |
|
|
|
HASH_ADD_INT(clients, id, client); |
|
|
|
HASH_ADD_INT(clients, fd, client); |
|
|
|
ci->nfds++; |
|
|
|
ci->nfds++; |
|
|
|
ck_wunlock(&ci->lock); |
|
|
|
ck_wunlock(&ci->lock); |
|
|
|
|
|
|
|
|
|
|
@ -79,6 +80,68 @@ out: |
|
|
|
return NULL; |
|
|
|
return NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Invalidate this instance */ |
|
|
|
|
|
|
|
static void invalidate_client(client_instance_t *client) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
close(client->fd); |
|
|
|
|
|
|
|
client->fd = -1; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void parse_client_msg(conn_instance_t *ci, client_instance_t *client) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
char msg[PAGESIZE], *eol; |
|
|
|
|
|
|
|
int buflen, ret; |
|
|
|
|
|
|
|
bool moredata; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
|
|
|
|
buflen = PAGESIZE - client->bufofs; |
|
|
|
|
|
|
|
ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); |
|
|
|
|
|
|
|
if (ret < 1) { |
|
|
|
|
|
|
|
/* If we're reading on the first pass, there is supposed to be
|
|
|
|
|
|
|
|
* data waiting for us according to poll() so no data implies |
|
|
|
|
|
|
|
* a lost connection. On repeat reads may have just read all |
|
|
|
|
|
|
|
* the data */ |
|
|
|
|
|
|
|
if (moredata) |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOGINFO("Client fd %d disconnected", client->fd); |
|
|
|
|
|
|
|
invalidate_client(client); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
client->bufofs += ret; |
|
|
|
|
|
|
|
if (client->bufofs == PAGESIZE) |
|
|
|
|
|
|
|
moredata = true; |
|
|
|
|
|
|
|
else |
|
|
|
|
|
|
|
moredata = false; |
|
|
|
|
|
|
|
reparse: |
|
|
|
|
|
|
|
eol = memchr(client->buf, '\n', client->bufofs); |
|
|
|
|
|
|
|
if (!eol) { |
|
|
|
|
|
|
|
if (unlikely(client->bufofs > MAX_MSGSIZE)) { |
|
|
|
|
|
|
|
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); |
|
|
|
|
|
|
|
invalidate_client(client); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (moredata) |
|
|
|
|
|
|
|
goto retry; |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Do something useful with this message now */ |
|
|
|
|
|
|
|
buflen = eol - client->buf + 1; |
|
|
|
|
|
|
|
if (unlikely(buflen > MAX_MSGSIZE)) { |
|
|
|
|
|
|
|
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); |
|
|
|
|
|
|
|
invalidate_client(client); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
memcpy(msg, client->buf, buflen); |
|
|
|
|
|
|
|
msg[buflen] = 0; |
|
|
|
|
|
|
|
client->bufofs -= buflen; |
|
|
|
|
|
|
|
memmove(client->buf, client->buf + buflen, client->bufofs); |
|
|
|
|
|
|
|
LOGWARNING("Client fd %d sent message %s", client->fd, msg); |
|
|
|
|
|
|
|
if (client->bufofs) |
|
|
|
|
|
|
|
goto reparse; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* Waits on fds ready to read on from the list stored in conn_instance and
|
|
|
|
/* Waits on fds ready to read on from the list stored in conn_instance and
|
|
|
|
* handles the incoming messages */ |
|
|
|
* handles the incoming messages */ |
|
|
|
void *receiver(void *arg) |
|
|
|
void *receiver(void *arg) |
|
|
@ -87,19 +150,18 @@ void *receiver(void *arg) |
|
|
|
client_instance_t *client, *tmp; |
|
|
|
client_instance_t *client, *tmp; |
|
|
|
struct pollfd fds[65536]; |
|
|
|
struct pollfd fds[65536]; |
|
|
|
int ret, nfds, i; |
|
|
|
int ret, nfds, i; |
|
|
|
connsock_t cs; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rename_proc("receiver"); |
|
|
|
rename_proc("receiver"); |
|
|
|
|
|
|
|
|
|
|
|
memset(&cs, 0, sizeof(cs)); |
|
|
|
|
|
|
|
retry: |
|
|
|
retry: |
|
|
|
dealloc(cs.buf); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
memset(fds, 0, sizeof(fds)); |
|
|
|
memset(fds, 0, sizeof(fds)); |
|
|
|
nfds = 0; |
|
|
|
nfds = 0; |
|
|
|
|
|
|
|
|
|
|
|
ck_rlock(&ci->lock); |
|
|
|
ck_rlock(&ci->lock); |
|
|
|
HASH_ITER(hh, clients, client, tmp) { |
|
|
|
HASH_ITER(hh, clients, client, tmp) { |
|
|
|
|
|
|
|
/* Invalid client */ |
|
|
|
|
|
|
|
if (client->fd == -1) |
|
|
|
|
|
|
|
continue; |
|
|
|
fds[nfds].fd = client->fd; |
|
|
|
fds[nfds].fd = client->fd; |
|
|
|
fds[nfds].events = POLLIN; |
|
|
|
fds[nfds].events = POLLIN; |
|
|
|
nfds++; |
|
|
|
nfds++; |
|
|
@ -116,13 +178,21 @@ retry: |
|
|
|
if (!ret) |
|
|
|
if (!ret) |
|
|
|
goto retry; |
|
|
|
goto retry; |
|
|
|
for (i = 0; i < nfds; i++) { |
|
|
|
for (i = 0; i < nfds; i++) { |
|
|
|
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
|
|
if (!(fds[i].revents & POLLIN)) |
|
|
|
if (!(fds[i].revents & POLLIN)) |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
cs.fd = fds[i].fd; |
|
|
|
|
|
|
|
if (read_socket_line(&cs)) { |
|
|
|
fd = fds[i].fd; |
|
|
|
LOGWARNING("Received %s", cs.buf); |
|
|
|
|
|
|
|
dealloc(cs.buf); |
|
|
|
ck_rlock(&ci->lock); |
|
|
|
} |
|
|
|
HASH_FIND_INT(clients, &fd, client); |
|
|
|
|
|
|
|
if (!client) |
|
|
|
|
|
|
|
LOGWARNING("Failed to find client with fd %d in hashtable!", fd); |
|
|
|
|
|
|
|
else |
|
|
|
|
|
|
|
parse_client_msg(ci, client); |
|
|
|
|
|
|
|
ck_runlock(&ci->lock); |
|
|
|
|
|
|
|
|
|
|
|
if (--ret < 1) |
|
|
|
if (--ret < 1) |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|