Browse Source

Add the server accept to the receiver's poll function, eliminating the need for short polls and one extra thread

master
Con Kolivas 10 years ago
parent
commit
dd187da977
  1. 77
      src/connector.c

77
src/connector.c

@ -85,27 +85,21 @@ static sender_send_t *delayed_sends;
static pthread_mutex_t sender_lock; static pthread_mutex_t sender_lock;
static pthread_cond_t sender_cond; static pthread_cond_t sender_cond;
/* Accepts incoming connections to the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
void *acceptor(void *arg) static int accept_client(conn_instance_t *ci)
{ {
conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client, *old_client; client_instance_t *client, *old_client;
socklen_t address_len; socklen_t address_len;
int fd, port; int fd, port;
rename_proc("acceptor");
retry:
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
address_len = sizeof(client->address); address_len = sizeof(client->address);
while (!ci->accept)
sleep(1);
fd = accept(ci->serverfd, &client->address, &address_len); fd = accept(ci->serverfd, &client->address, &address_len);
if (unlikely(fd < 0)) { if (unlikely(fd < 0)) {
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
dealloc(client); dealloc(client);
goto out; return -1;
} }
switch (client->address.sa_family) { switch (client->address.sa_family) {
@ -127,7 +121,7 @@ retry:
ci->nfds, fd); ci->nfds, fd);
Close(fd); Close(fd);
free(client); free(client);
goto retry; return 0;
} }
keep_sockalive(fd); keep_sockalive(fd);
@ -144,9 +138,7 @@ retry:
ci->nfds++; ci->nfds++;
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
goto retry; return 1;
out:
return NULL;
} }
static int drop_client(conn_instance_t *ci, client_instance_t *client) static int drop_client(conn_instance_t *ci, client_instance_t *client)
@ -163,6 +155,9 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client)
} }
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
if (fd > -1)
LOGINFO("Connector dropped client %d fd %d", client->id, fd);
return fd; return fd;
} }
@ -279,13 +274,15 @@ void *receiver(void *arg)
client_instance_t *client, *tmp; client_instance_t *client, *tmp;
struct pollfd fds[65536]; struct pollfd fds[65536];
int ret, nfds, i; int ret, nfds, i;
bool update;
rename_proc("creceiver"); rename_proc("creceiver");
/* First fd is reserved for the accepting socket */
fds[0].fd = ci->serverfd;
retry: retry:
nfds = 0; update = false;
while (!ci->accept) nfds = 1;
sleep(1);
ck_rlock(&ci->lock); ck_rlock(&ci->lock);
HASH_ITER(fdhh, fdclients, client, tmp) { HASH_ITER(fdhh, fdclients, client, tmp) {
@ -301,18 +298,24 @@ retry:
} }
ck_runlock(&ci->lock); ck_runlock(&ci->lock);
if (!nfds) { repoll:
cksleep_ms(100); fds[0].events = POLLIN;
goto retry; fds[0].revents = 0;
}
ret = poll(fds, nfds, 100); while (!ci->accept)
sleep(1);
ret = poll(fds, nfds, 1000);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGERR("Failed to poll in receiver"); LOGERR("Failed to poll in receiver");
goto out; goto out;
} }
if (!ret) if (!ret) {
goto retry; if (update)
for (i = 0; i < nfds; i++) { goto retry;
goto repoll;
}
for (i = nfds - 1; i > 0; i--) {
int fd; int fd;
if (!fds[i].revents) if (!fds[i].revents)
@ -326,17 +329,26 @@ retry:
ck_runlock(&ci->lock); ck_runlock(&ci->lock);
if (!client) { if (!client) {
/* Probably already removed */ /* Probably already removed, remove lazily */
LOGDEBUG("Failed to find client with polled fd %d in hashtable", LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable",
fd); i, fd);
update = true;
} else } else
parse_client_msg(ci, client); parse_client_msg(ci, client);
if (--ret < 1) /* Reset for the next poll pass */
break; fds[i].events = POLLIN;
fds[i].revents = 0;
} }
goto retry;
/* i should be zero now allowing us to examine the accepting socket */
if (fds[0].revents)
ret = accept_client(ci);
if (unlikely(ret < 0))
goto out;
if (ret > 0 || update)
goto retry;
goto repoll;
out: out:
return NULL; return NULL;
} }
@ -637,7 +649,7 @@ out:
int connector(proc_instance_t *pi) int connector(proc_instance_t *pi)
{ {
pthread_t pth_sender, pth_acceptor, pth_receiver; pthread_t pth_sender, pth_receiver;
char *url = NULL, *port = NULL; char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int sockd, ret = 0; int sockd, ret = 0;
@ -715,12 +727,9 @@ int connector(proc_instance_t *pi)
mutex_init(&sender_lock); mutex_init(&sender_lock);
cond_init(&sender_cond); cond_init(&sender_cond);
create_pthread(&pth_sender, sender, &ci); create_pthread(&pth_sender, sender, &ci);
create_pthread(&pth_acceptor, acceptor, &ci);
create_pthread(&pth_receiver, receiver, &ci); create_pthread(&pth_receiver, receiver, &ci);
ret = connector_loop(pi, &ci); ret = connector_loop(pi, &ci);
//join_pthread(pth_acceptor);
out: out:
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);
} }

Loading…
Cancel
Save