Browse Source

Use poll to connect many clients and see which connections need to be read in connector

master
Con Kolivas 11 years ago
parent
commit
3c21f90aab
  1. 2
      configure.ac
  2. 84
      src/connector.c

2
configure.ac

@ -36,7 +36,7 @@ AC_CHECK_LIB(jansson, json_loads, ,
AC_CHECK_HEADERS(stdio.h stdlib.h fcntl.h sys/time.h unistd.h) AC_CHECK_HEADERS(stdio.h stdlib.h fcntl.h sys/time.h unistd.h)
AC_CHECK_HEADERS(ctype.h errno.h byteswap.h string.h time.h) AC_CHECK_HEADERS(ctype.h errno.h byteswap.h string.h time.h)
AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h syslog.h) AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h)
AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h)
AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h)
AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h)

84
src/connector.c

@ -11,6 +11,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/poll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -22,54 +23,104 @@ struct connector_instance {
cklock_t lock; cklock_t lock;
proc_instance_t *pi; proc_instance_t *pi;
int serverfd; int serverfd;
int clients; int nfds;
struct pollfd fds[65536];
}; };
typedef struct connector_instance conn_instance_t; typedef struct connector_instance conn_instance_t;
struct client_instance { struct client_instance {
connsock_t cs;
struct sockaddr address; struct sockaddr address;
socklen_t address_len; socklen_t address_len;
}; };
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
/* Accepts incoming connections to the server socket and generates client
* instances */
void *acceptor(void *arg) void *acceptor(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
proc_instance_t *pi = ci->pi;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
client_instance_t cli; client_instance_t cli;
int fd;
rename_proc("acceptor"); rename_proc("acceptor");
retry: retry:
cli.address_len = sizeof(cli.address); cli.address_len = sizeof(cli.address);
cli.cs.fd = accept(ci->serverfd, &cli.address, &cli.address_len); fd = accept(ci->serverfd, &cli.address, &cli.address_len);
if (unlikely(cli.cs.fd < 0)) { if (unlikely(fd < 0)) {
if (interrupted()) if (interrupted())
goto retry; goto retry;
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
goto out; goto out;
} }
/* Do something here with the client instance instead of just reading
* a line. */ LOGINFO("Connected new client %d on socket %d", ci->nfds, fd);
if (read_socket_line(&cli.cs))
LOGWARNING("Received %s", cli.cs.buf); ck_wlock(&ci->lock);
dealloc(cli.cs.buf); ci->fds[ci->nfds].fd = fd;
close(cli.cs.fd); ci->fds[ci->nfds].events = POLLIN;
ci->nfds++;
ck_wunlock(&ci->lock);
goto retry; goto retry;
out: out:
return NULL; return NULL;
} }
/* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */
void *receiver(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
struct pollfd fds[65536];
int ret, nfds, i;
connsock_t cs;
rename_proc("receiver");
memset(&cs, 0, sizeof(cs));
retry:
dealloc(cs.buf);
ck_rlock(&ci->lock);
memcpy(&fds, ci->fds, sizeof(fds));
nfds = ci->nfds;
ck_runlock(&ci->lock);
ret = poll(fds, nfds, 1);
if (ret < 0) {
if (interrupted())
goto retry;
LOGERR("Failed to poll in receiver");
goto out;
}
if (!ret)
goto retry;
for (i = 0; i < nfds; i++) {
if (!(fds[i].revents & POLLIN))
continue;
cs.fd = fds[i].fd;
if (read_socket_line(&cs)) {
LOGWARNING("Received %s", cs.buf);
dealloc(cs.buf);
}
if (--ret < 1)
break;
}
goto retry;
out:
return NULL;
}
int connector(proc_instance_t *pi) int connector(proc_instance_t *pi)
{ {
pthread_t pth_acceptor, pth_receiver;
char *url = NULL, *port = NULL; char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
pthread_t pth_acceptor;
int sockd, ret = 0; int sockd, ret = 0;
conn_instance_t ci; conn_instance_t ci;
@ -115,12 +166,15 @@ int connector(proc_instance_t *pi)
goto out; goto out;
} }
cklock_init(&ci.lock); cklock_init(&ci.lock);
memset(&ci, 0, sizeof(ci));
ci.pi = pi; ci.pi = pi;
ci.serverfd = sockd; ci.serverfd = sockd;
ci.clients = 0; ci.nfds = 0;
create_pthread(&pth_acceptor, acceptor, &ci); create_pthread(&pth_acceptor, acceptor, &ci);
create_pthread(&pth_receiver, receiver, &ci);
join_pthread(pth_acceptor); join_pthread(pth_acceptor);
ret = 1;
out: out:
LOGINFO("%s connector exiting with return code %d", ckp->name, ret); LOGINFO("%s connector exiting with return code %d", ckp->name, ret);
if (ret) { if (ret) {

Loading…
Cancel
Save