From 3c21f90aabb72faf456ff94d17c75ee95adb03f9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 23 Apr 2014 16:34:42 +1000 Subject: [PATCH] Use poll to connect many clients and see which connections need to be read in connector --- configure.ac | 2 +- src/connector.c | 84 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/configure.ac b/configure.ac index 3807ec59..042c953f 100644 --- a/configure.ac +++ b/configure.ac @@ -36,7 +36,7 @@ AC_CHECK_LIB(jansson, json_loads, , AC_CHECK_HEADERS(stdio.h stdlib.h fcntl.h sys/time.h unistd.h) AC_CHECK_HEADERS(ctype.h errno.h byteswap.h string.h time.h) -AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h syslog.h) +AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h) diff --git a/src/connector.c b/src/connector.c index 3d9821bb..7b772ec0 100644 --- a/src/connector.c +++ b/src/connector.c @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -22,54 +23,104 @@ struct connector_instance { cklock_t lock; proc_instance_t *pi; int serverfd; - int clients; + int nfds; + struct pollfd fds[65536]; }; typedef struct connector_instance conn_instance_t; struct client_instance { - connsock_t cs; struct sockaddr address; socklen_t address_len; }; typedef struct client_instance client_instance_t; +/* Accepts incoming connections to the server socket and generates client + * instances */ void *acceptor(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - proc_instance_t *pi = ci->pi; - unixsock_t *us = &pi->us; - ckpool_t *ckp = pi->ckp; client_instance_t cli; + int fd; rename_proc("acceptor"); retry: cli.address_len = sizeof(cli.address); - cli.cs.fd = accept(ci->serverfd, &cli.address, &cli.address_len); - if (unlikely(cli.cs.fd < 0)) { + fd = accept(ci->serverfd, &cli.address, &cli.address_len); + if (unlikely(fd < 0)) { if (interrupted()) goto retry; LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); goto out; } - /* Do something here with the client instance instead of just reading - * a line. */ - if (read_socket_line(&cli.cs)) - LOGWARNING("Received %s", cli.cs.buf); - dealloc(cli.cs.buf); - close(cli.cs.fd); + + LOGINFO("Connected new client %d on socket %d", ci->nfds, fd); + + ck_wlock(&ci->lock); + ci->fds[ci->nfds].fd = fd; + ci->fds[ci->nfds].events = POLLIN; + ci->nfds++; + ck_wunlock(&ci->lock); + + goto retry; out: return NULL; } +/* Waits on fds ready to read on from the list stored in conn_instance and + * handles the incoming messages */ +void *receiver(void *arg) +{ + conn_instance_t *ci = (conn_instance_t *)arg; + struct pollfd fds[65536]; + int ret, nfds, i; + connsock_t cs; + + rename_proc("receiver"); + + memset(&cs, 0, sizeof(cs)); +retry: + dealloc(cs.buf); + + ck_rlock(&ci->lock); + memcpy(&fds, ci->fds, sizeof(fds)); + nfds = ci->nfds; + ck_runlock(&ci->lock); + + ret = poll(fds, nfds, 1); + if (ret < 0) { + if (interrupted()) + goto retry; + LOGERR("Failed to poll in receiver"); + goto out; + } + if (!ret) + goto retry; + for (i = 0; i < nfds; i++) { + if (!(fds[i].revents & POLLIN)) + continue; + cs.fd = fds[i].fd; + if (read_socket_line(&cs)) { + LOGWARNING("Received %s", cs.buf); + dealloc(cs.buf); + } + if (--ret < 1) + break; + } + goto retry; + +out: + return NULL; +} + int connector(proc_instance_t *pi) { + pthread_t pth_acceptor, pth_receiver; char *url = NULL, *port = NULL; ckpool_t *ckp = pi->ckp; - pthread_t pth_acceptor; int sockd, ret = 0; conn_instance_t ci; @@ -115,12 +166,15 @@ int connector(proc_instance_t *pi) goto out; } cklock_init(&ci.lock); + memset(&ci, 0, sizeof(ci)); ci.pi = pi; ci.serverfd = sockd; - ci.clients = 0; + ci.nfds = 0; create_pthread(&pth_acceptor, acceptor, &ci); + create_pthread(&pth_receiver, receiver, &ci); join_pthread(pth_acceptor); + ret = 1; out: LOGINFO("%s connector exiting with return code %d", ckp->name, ret); if (ret) {