Browse Source

Privatise all the connector specific data

master
Con Kolivas 10 years ago
parent
commit
3ddef2d66f
  1. 245
      src/connector.c
  2. 1
      src/generator.c

245
src/connector.c

@ -24,18 +24,6 @@
#define MAX_MSGSIZE 1024
#define SOI (sizeof(int))
struct connector_instance {
cklock_t lock;
proc_instance_t *pi;
int serverfd;
int nfds;
bool accept;
pthread_t pth_sender;
pthread_t pth_receiver;
};
typedef struct connector_instance conn_instance_t;
struct client_instance {
/* For clients hashtable */
UT_hash_handle hh;
@ -56,13 +44,6 @@ struct client_instance {
typedef struct client_instance client_instance_t;
/* For the hashtable of all clients */
static client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */
static client_instance_t *dead_clients;
static int64_t client_id = 1;
struct sender_send {
struct sender_send *next;
struct sender_send *prev;
@ -74,27 +55,48 @@ struct sender_send {
typedef struct sender_send sender_send_t;
/* For the linked list of pending sends */
static sender_send_t *sender_sends;
static sender_send_t *delayed_sends;
/* Private data for the connector */
struct connector_data {
ckpool_t *ckp;
cklock_t lock;
proc_instance_t *pi;
int serverfd;
int nfds;
bool accept;
pthread_t pth_sender;
pthread_t pth_receiver;
/* For the hashtable of all clients */
client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */
client_instance_t *dead_clients;
int64_t client_id;
/* For the linked list of pending sends */
sender_send_t *sender_sends;
sender_send_t *delayed_sends;
/* For protecting the pending sends list */
pthread_mutex_t sender_lock;
pthread_cond_t sender_cond;
};
/* For protecting the pending sends list */
static pthread_mutex_t sender_lock;
static pthread_cond_t sender_cond;
typedef struct connector_data cdata_t;
/* Accepts incoming connections on the server socket and generates client
* instances */
static int accept_client(conn_instance_t *ci, int epfd)
static int accept_client(cdata_t *cdata, int epfd)
{
ckpool_t *ckp = ci->pi->ckp;
ckpool_t *ckp = cdata->ckp;
client_instance_t *client;
struct epoll_event event;
int fd, port, no_clients;
socklen_t address_len;
ck_rlock(&ci->lock);
no_clients = HASH_COUNT(clients);
ck_runlock(&ci->lock);
ck_rlock(&cdata->lock);
no_clients = HASH_COUNT(cdata->clients);
ck_runlock(&cdata->lock);
if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) {
LOGWARNING("Server full with %d clients", no_clients);
@ -103,7 +105,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
client = ckzalloc(sizeof(client_instance_t));
address_len = sizeof(client->address);
fd = accept(ci->serverfd, &client->address, &address_len);
fd = accept(cdata->serverfd, &client->address, &address_len);
if (unlikely(fd < 0)) {
/* Handle these errors gracefully should we ever share this
* socket */
@ -111,7 +113,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
LOGERR("Recoverable error on accept in accept_client");
return 0;
}
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
LOGERR("Failed to accept on socket %d in acceptor", cdata->serverfd);
dealloc(client);
return -1;
}
@ -132,7 +134,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
break;
default:
LOGWARNING("Unknown INET type for client %d on socket %d",
ci->nfds, fd);
cdata->nfds, fd);
Close(fd);
free(client);
return 0;
@ -142,7 +144,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
nolinger_socket(fd);
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
ci->nfds, fd, no_clients, client->address_name, port);
cdata->nfds, fd, no_clients, client->address_name, port);
client->fd = fd;
event.data.ptr = client;
@ -153,27 +155,27 @@ static int accept_client(conn_instance_t *ci, int epfd)
return 0;
}
ck_wlock(&ci->lock);
client->id = client_id++;
HASH_ADD_I64(clients, id, client);
ci->nfds++;
ck_wunlock(&ci->lock);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
return 1;
}
static int drop_client(conn_instance_t *ci, client_instance_t *client)
static int drop_client(cdata_t *cdata, client_instance_t *client)
{
int fd;
ck_wlock(&ci->lock);
ck_wlock(&cdata->lock);
fd = client->fd;
if (fd != -1) {
Close(client->fd);
HASH_DEL(clients, client);
LL_PREPEND(dead_clients, client);
HASH_DEL(cdata->clients, client);
LL_PREPEND(cdata->dead_clients, client);
}
ck_wunlock(&ci->lock);
ck_wunlock(&cdata->lock);
if (fd > -1)
LOGINFO("Connector dropped client %d fd %d", client->id, fd);
@ -192,26 +194,26 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
/* Invalidate this instance. Remove them from the hashtables we look up
* regularly but keep the instances in a linked list indefinitely in case we
* still reference any of its members. */
static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client)
static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{
drop_client(ci, client);
drop_client(cdata, client);
if (ckp->passthrough)
return;
stratifier_drop_client(ckp, client->id);
}
static void send_client(conn_instance_t *ci, int64_t id, char *buf);
static void send_client(cdata_t *cdata, int64_t id, char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{
int buflen, ret, selfail = 0;
ckpool_t *ckp = ci->pi->ckp;
ckpool_t *ckp = cdata->ckp;
char msg[PAGESIZE], *eol;
json_t *val;
retry:
/* Select should always return positive after poll unless we have
* been disconnected. On retries, decide whether we should do further
* been disconnected. On retries, decdatade whether we should do further
* reads based on select readiness and only fail if we get an error. */
ret = wait_read_select(client->fd, 0);
if (ret < 1) {
@ -219,7 +221,7 @@ retry:
return;
LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
selfail = -1;
@ -231,7 +233,7 @@ retry:
* client has disconnected. */
LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs += ret;
@ -240,7 +242,7 @@ reparse:
if (!eol) {
if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
goto retry;
@ -250,7 +252,7 @@ reparse:
buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
memcpy(msg, client->buf, buflen);
@ -262,8 +264,8 @@ reparse:
char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %d sent invalid json message %s", client->id, msg);
send_client(ci, client->id, buf);
invalidate_client(ckp, ci, client);
send_client(cdata, client->id, buf);
invalidate_client(ckp, cdata, client);
return;
} else {
int64_t passthrough_id;
@ -295,7 +297,7 @@ reparse:
* handles the incoming messages */
void *receiver(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
cdata_t *cdata = (cdata_t *)arg;
struct epoll_event event;
bool maxconn = true;
int ret, epfd;
@ -307,9 +309,9 @@ void *receiver(void *arg)
LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL;
}
event.data.fd = ci->serverfd;
event.data.fd = cdata->serverfd;
event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event);
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd, &event);
if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL;
@ -318,7 +320,7 @@ void *receiver(void *arg)
while (42) {
client_instance_t *client;
while (unlikely(!ci->accept))
while (unlikely(!cdata->accept))
cksleep_ms(100);
ret = epoll_wait(epfd, &event, 1, 1000);
if (unlikely(ret == -1)) {
@ -333,12 +335,12 @@ void *receiver(void *arg)
* can receive connections. */
LOGDEBUG("Dropping listen backlog to 0");
maxconn = false;
listen(ci->serverfd, 0);
listen(cdata->serverfd, 0);
}
continue;
}
if (event.data.fd == ci->serverfd) {
ret = accept_client(ci, epfd);
if (event.data.fd == cdata->serverfd) {
ret = accept_client(cdata, epfd);
if (unlikely(ret < 0)) {
LOGEMERG("FATAL: Failed to accept_client in receiver");
break;
@ -349,10 +351,10 @@ void *receiver(void *arg)
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
/* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(ci->pi->ckp, ci, client);
invalidate_client(cdata->pi->ckp, cdata, client);
continue;
}
parse_client_msg(ci, client);
parse_client_msg(cdata, client);
}
return NULL;
}
@ -361,8 +363,8 @@ void *receiver(void *arg)
* ready for writing immediately to not delay other messages. */
void *sender(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
ckpool_t *ckp = ci->pi->ckp;
cdata_t *cdata = (cdata_t *)arg;
ckpool_t *ckp = cdata->ckp;
bool sent = false;
rename_proc("csender");
@ -372,23 +374,23 @@ void *sender(void *arg)
client_instance_t *client;
int ret, fd, ofs = 0;
mutex_lock(&sender_lock);
mutex_lock(&cdata->sender_lock);
/* Poll every 100ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the
* delayed sends. */
if (!sender_sends && !sent) {
if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 100000000};
ts_t timeout_ts;
ts_realtime(&timeout_ts);
timeraddspec(&timeout_ts, &polltime);
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts);
}
sender_send = sender_sends;
sender_send = cdata->sender_sends;
if (sender_send)
DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock);
DL_DELETE(cdata->sender_sends, sender_send);
mutex_unlock(&cdata->sender_lock);
sent = false;
@ -396,17 +398,17 @@ void *sender(void *arg)
* conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) {
if (!delayed_sends)
if (!cdata->delayed_sends)
continue;
sender_send = delayed_sends;
DL_DELETE(delayed_sends, sender_send);
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
}
client = sender_send->client;
ck_rlock(&ci->lock);
ck_rlock(&cdata->lock);
fd = client->fd;
ck_runlock(&ci->lock);
ck_runlock(&cdata->lock);
if (fd == -1) {
LOGDEBUG("Discarding message sent to invalidated client");
@ -422,7 +424,7 @@ void *sender(void *arg)
if (ret < 1) {
if (ret < 0) {
LOGINFO("Client id %d fd %d interrupted", client->id, fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
free(sender_send->buf);
free(sender_send);
continue;
@ -432,7 +434,7 @@ void *sender(void *arg)
/* Append it to the tail of the delayed sends list.
* This is the only function that alters it so no
* locking is required. */
DL_APPEND(delayed_sends, sender_send);
DL_APPEND(cdata->delayed_sends, sender_send);
continue;
}
sent = true;
@ -440,7 +442,7 @@ void *sender(void *arg)
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
LOGINFO("Client id %d fd %d disconnected", client->id, fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
break;
}
ofs += ret;
@ -455,7 +457,7 @@ void *sender(void *arg)
/* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */
static void send_client(conn_instance_t *ci, int64_t id, char *buf)
static void send_client(cdata_t *cdata, int64_t id, char *buf)
{
sender_send_t *sender_send;
client_instance_t *client;
@ -472,19 +474,19 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf)
return;
}
ck_rlock(&ci->lock);
HASH_FIND_I64(clients, &id, client);
ck_rlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (likely(client))
fd = client->fd;
ck_runlock(&ci->lock);
ck_runlock(&cdata->lock);
if (unlikely(fd == -1)) {
ckpool_t *ckp = ci->pi->ckp;
ckpool_t *ckp = cdata->ckp;
if (client) {
/* This shouldn't happen */
LOGWARNING("Client id %ld disconnected but fd already invalidated!", id);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
} else {
LOGINFO("Connector failed to find client id %ld to send to", id);
stratifier_drop_client(ckp, id);
@ -498,34 +500,34 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf)
sender_send->buf = buf;
sender_send->len = len;
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
pthread_cond_signal(&sender_cond);
mutex_unlock(&sender_lock);
mutex_lock(&cdata->sender_lock);
DL_APPEND(cdata->sender_sends, sender_send);
pthread_cond_signal(&cdata->sender_cond);
mutex_unlock(&cdata->sender_lock);
}
static client_instance_t *client_by_id(conn_instance_t *ci, int64_t id)
static client_instance_t *client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_rlock(&ci->lock);
HASH_FIND_I64(clients, &id, client);
ck_runlock(&ci->lock);
ck_rlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
ck_runlock(&cdata->lock);
return client;
}
static void passthrough_client(conn_instance_t *ci, client_instance_t *client)
static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{
char *buf;
LOGINFO("Connector adding passthrough client %d", client->id);
client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(ci, client->id, buf);
send_client(cdata, client->id, buf);
}
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{
int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id;
@ -546,12 +548,12 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
LOGWARNING("%s connector ready", ckp->name);
retry:
if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) {
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) {
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
@ -579,12 +581,12 @@ retry:
}
if (cmdmatch(buf, "accept")) {
LOGDEBUG("Connector received accept signal");
ci->accept = true;
cdata->accept = true;
goto retry;
}
if (cmdmatch(buf, "reject")) {
LOGDEBUG("Connector received reject signal");
ci->accept = false;
cdata->accept = false;
goto retry;
}
if (cmdmatch(buf, "loglevel")) {
@ -603,12 +605,12 @@ retry:
goto retry;
}
client_id = client_id64 & 0xffffffffll;
client = client_by_id(ci, client_id);
client = client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to drop", client_id);
goto retry;
}
ret = drop_client(ci, client);
ret = drop_client(cdata, client);
if (ret >= 0)
LOGINFO("Connector dropped client id: %ld", client_id);
goto retry;
@ -621,16 +623,16 @@ retry:
LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry;
}
client = client_by_id(ci, client_id);
client = client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to pass through", client_id);
goto retry;
}
passthrough_client(ci, client);
passthrough_client(cdata, client);
goto retry;
}
if (cmdmatch(buf, "getfd")) {
send_fd(ci->serverfd, sockd);
send_fd(cdata->serverfd, sockd);
goto retry;
}
@ -655,7 +657,7 @@ retry:
dealloc(buf);
buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n");
send_client(ci, client_id, buf);
send_client(cdata, client_id, buf);
json_decref(json_msg);
buf = NULL;
@ -668,14 +670,16 @@ out:
int connector(proc_instance_t *pi)
{
cdata_t *cdata = ckzalloc(sizeof(cdata_t));
char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
conn_instance_t ci;
const int on = 1;
int tries = 0;
LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata;
cdata->ckp = ckp;
if (ckp->oldconnfd > 0) {
sockd = ckp->oldconnfd;
@ -737,17 +741,18 @@ int connector(proc_instance_t *pi)
goto out;
}
cklock_init(&ci.lock);
memset(&ci, 0, sizeof(ci));
ci.pi = pi;
ci.serverfd = sockd;
ci.nfds = 0;
mutex_init(&sender_lock);
cond_init(&sender_cond);
create_pthread(&ci.pth_sender, sender, &ci);
create_pthread(&ci.pth_receiver, receiver, &ci);
ret = connector_loop(pi, &ci);
cklock_init(&cdata->lock);
cdata->pi = pi;
cdata->serverfd = sockd;
cdata->nfds = 0;
cdata->client_id = 1;
mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, &cdata);
create_pthread(&cdata->pth_receiver, receiver, &cdata);
ret = connector_loop(pi, cdata);
out:
dealloc(ckp->data);
return process_exit(ckp, pi, ret);
}

1
src/generator.c

@ -1745,5 +1745,6 @@ int generator(proc_instance_t *pi)
else
ret = server_mode(ckp, pi);
dealloc(ckp->data);
return process_exit(ckp, pi, ret);
}

Loading…
Cancel
Save