diff --git a/src/connector.c b/src/connector.c index 97f51418..daf7cebc 100644 --- a/src/connector.c +++ b/src/connector.c @@ -85,27 +85,21 @@ static sender_send_t *delayed_sends; static pthread_mutex_t sender_lock; static pthread_cond_t sender_cond; -/* Accepts incoming connections to the server socket and generates client +/* Accepts incoming connections on the server socket and generates client * instances */ -void *acceptor(void *arg) +static int accept_client(conn_instance_t *ci) { - conn_instance_t *ci = (conn_instance_t *)arg; client_instance_t *client, *old_client; socklen_t address_len; int fd, port; - rename_proc("acceptor"); - -retry: client = ckzalloc(sizeof(client_instance_t)); address_len = sizeof(client->address); - while (!ci->accept) - sleep(1); fd = accept(ci->serverfd, &client->address, &address_len); if (unlikely(fd < 0)) { LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); dealloc(client); - goto out; + return -1; } switch (client->address.sa_family) { @@ -127,7 +121,7 @@ retry: ci->nfds, fd); Close(fd); free(client); - goto retry; + return 0; } keep_sockalive(fd); @@ -144,9 +138,7 @@ retry: ci->nfds++; ck_wunlock(&ci->lock); - goto retry; -out: - return NULL; + return 1; } static int drop_client(conn_instance_t *ci, client_instance_t *client) @@ -163,6 +155,9 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) } ck_wunlock(&ci->lock); + if (fd > -1) + LOGINFO("Connector dropped client %d fd %d", client->id, fd); + return fd; } @@ -279,13 +274,15 @@ void *receiver(void *arg) client_instance_t *client, *tmp; struct pollfd fds[65536]; int ret, nfds, i; + bool update; rename_proc("creceiver"); + /* First fd is reserved for the accepting socket */ + fds[0].fd = ci->serverfd; retry: - nfds = 0; - while (!ci->accept) - sleep(1); + update = false; + nfds = 1; ck_rlock(&ci->lock); HASH_ITER(fdhh, fdclients, client, tmp) { @@ -301,18 +298,24 @@ retry: } ck_runlock(&ci->lock); - if (!nfds) { - cksleep_ms(100); - goto retry; - } - ret = poll(fds, nfds, 100); +repoll: + fds[0].events = POLLIN; + fds[0].revents = 0; + + while (!ci->accept) + sleep(1); + + ret = poll(fds, nfds, 1000); if (unlikely(ret < 0)) { LOGERR("Failed to poll in receiver"); goto out; } - if (!ret) - goto retry; - for (i = 0; i < nfds; i++) { + if (!ret) { + if (update) + goto retry; + goto repoll; + } + for (i = nfds - 1; i > 0; i--) { int fd; if (!fds[i].revents) @@ -326,17 +329,26 @@ retry: ck_runlock(&ci->lock); if (!client) { - /* Probably already removed */ - LOGDEBUG("Failed to find client with polled fd %d in hashtable", - fd); + /* Probably already removed, remove lazily */ + LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable", + i, fd); + update = true; } else parse_client_msg(ci, client); - if (--ret < 1) - break; + /* Reset for the next poll pass */ + fds[i].events = POLLIN; + fds[i].revents = 0; } - goto retry; + /* i should be zero now allowing us to examine the accepting socket */ + if (fds[0].revents) + ret = accept_client(ci); + if (unlikely(ret < 0)) + goto out; + if (ret > 0 || update) + goto retry; + goto repoll; out: return NULL; } @@ -637,7 +649,7 @@ out: int connector(proc_instance_t *pi) { - pthread_t pth_sender, pth_acceptor, pth_receiver; + pthread_t pth_sender, pth_receiver; char *url = NULL, *port = NULL; ckpool_t *ckp = pi->ckp; int sockd, ret = 0; @@ -715,12 +727,9 @@ int connector(proc_instance_t *pi) mutex_init(&sender_lock); cond_init(&sender_cond); create_pthread(&pth_sender, sender, &ci); - create_pthread(&pth_acceptor, acceptor, &ci); create_pthread(&pth_receiver, receiver, &ci); ret = connector_loop(pi, &ci); - - //join_pthread(pth_acceptor); out: return process_exit(ckp, pi, ret); }