diff --git a/src/connector.c b/src/connector.c index 401f70a4..8c098b96 100644 --- a/src/connector.c +++ b/src/connector.c @@ -24,18 +24,6 @@ #define MAX_MSGSIZE 1024 #define SOI (sizeof(int)) -struct connector_instance { - cklock_t lock; - proc_instance_t *pi; - int serverfd; - int nfds; - bool accept; - pthread_t pth_sender; - pthread_t pth_receiver; -}; - -typedef struct connector_instance conn_instance_t; - struct client_instance { /* For clients hashtable */ UT_hash_handle hh; @@ -56,13 +44,6 @@ struct client_instance { typedef struct client_instance client_instance_t; -/* For the hashtable of all clients */ -static client_instance_t *clients; -/* Linked list of dead clients no longer in use but may still have references */ -static client_instance_t *dead_clients; - -static int64_t client_id = 1; - struct sender_send { struct sender_send *next; struct sender_send *prev; @@ -74,27 +55,48 @@ struct sender_send { typedef struct sender_send sender_send_t; -/* For the linked list of pending sends */ -static sender_send_t *sender_sends; -static sender_send_t *delayed_sends; +/* Private data for the connector */ +struct connector_data { + ckpool_t *ckp; + cklock_t lock; + proc_instance_t *pi; + int serverfd; + int nfds; + bool accept; + pthread_t pth_sender; + pthread_t pth_receiver; + + /* For the hashtable of all clients */ + client_instance_t *clients; + /* Linked list of dead clients no longer in use but may still have references */ + client_instance_t *dead_clients; + + int64_t client_id; + + /* For the linked list of pending sends */ + sender_send_t *sender_sends; + sender_send_t *delayed_sends; + + /* For protecting the pending sends list */ + pthread_mutex_t sender_lock; + pthread_cond_t sender_cond; +}; -/* For protecting the pending sends list */ -static pthread_mutex_t sender_lock; -static pthread_cond_t sender_cond; +typedef struct connector_data cdata_t; /* Accepts incoming connections on the server socket and generates client * instances */ -static int accept_client(conn_instance_t *ci, int epfd) +static int accept_client(cdata_t *cdata, int epfd) { - ckpool_t *ckp = ci->pi->ckp; + ckpool_t *ckp = cdata->ckp; client_instance_t *client; struct epoll_event event; int fd, port, no_clients; socklen_t address_len; - ck_rlock(&ci->lock); - no_clients = HASH_COUNT(clients); - ck_runlock(&ci->lock); + ck_rlock(&cdata->lock); + no_clients = HASH_COUNT(cdata->clients); + ck_runlock(&cdata->lock); if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) { LOGWARNING("Server full with %d clients", no_clients); @@ -103,7 +105,7 @@ static int accept_client(conn_instance_t *ci, int epfd) client = ckzalloc(sizeof(client_instance_t)); address_len = sizeof(client->address); - fd = accept(ci->serverfd, &client->address, &address_len); + fd = accept(cdata->serverfd, &client->address, &address_len); if (unlikely(fd < 0)) { /* Handle these errors gracefully should we ever share this * socket */ @@ -111,7 +113,7 @@ static int accept_client(conn_instance_t *ci, int epfd) LOGERR("Recoverable error on accept in accept_client"); return 0; } - LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); + LOGERR("Failed to accept on socket %d in acceptor", cdata->serverfd); dealloc(client); return -1; } @@ -132,7 +134,7 @@ static int accept_client(conn_instance_t *ci, int epfd) break; default: LOGWARNING("Unknown INET type for client %d on socket %d", - ci->nfds, fd); + cdata->nfds, fd); Close(fd); free(client); return 0; @@ -142,7 +144,7 @@ static int accept_client(conn_instance_t *ci, int epfd) nolinger_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", - ci->nfds, fd, no_clients, client->address_name, port); + cdata->nfds, fd, no_clients, client->address_name, port); client->fd = fd; event.data.ptr = client; @@ -153,27 +155,27 @@ static int accept_client(conn_instance_t *ci, int epfd) return 0; } - ck_wlock(&ci->lock); - client->id = client_id++; - HASH_ADD_I64(clients, id, client); - ci->nfds++; - ck_wunlock(&ci->lock); + ck_wlock(&cdata->lock); + client->id = cdata->client_id++; + HASH_ADD_I64(cdata->clients, id, client); + cdata->nfds++; + ck_wunlock(&cdata->lock); return 1; } -static int drop_client(conn_instance_t *ci, client_instance_t *client) +static int drop_client(cdata_t *cdata, client_instance_t *client) { int fd; - ck_wlock(&ci->lock); + ck_wlock(&cdata->lock); fd = client->fd; if (fd != -1) { Close(client->fd); - HASH_DEL(clients, client); - LL_PREPEND(dead_clients, client); + HASH_DEL(cdata->clients, client); + LL_PREPEND(cdata->dead_clients, client); } - ck_wunlock(&ci->lock); + ck_wunlock(&cdata->lock); if (fd > -1) LOGINFO("Connector dropped client %d fd %d", client->id, fd); @@ -192,26 +194,26 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) /* Invalidate this instance. Remove them from the hashtables we look up * regularly but keep the instances in a linked list indefinitely in case we * still reference any of its members. */ -static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client) +static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - drop_client(ci, client); + drop_client(cdata, client); if (ckp->passthrough) return; stratifier_drop_client(ckp, client->id); } -static void send_client(conn_instance_t *ci, int64_t id, char *buf); +static void send_client(cdata_t *cdata, int64_t id, char *buf); -static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) +static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { int buflen, ret, selfail = 0; - ckpool_t *ckp = ci->pi->ckp; + ckpool_t *ckp = cdata->ckp; char msg[PAGESIZE], *eol; json_t *val; retry: /* Select should always return positive after poll unless we have - * been disconnected. On retries, decide whether we should do further + * been disconnected. On retries, decdatade whether we should do further * reads based on select readiness and only fail if we get an error. */ ret = wait_read_select(client->fd, 0); if (ret < 1) { @@ -219,7 +221,7 @@ retry: return; LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); return; } selfail = -1; @@ -231,7 +233,7 @@ retry: * client has disconnected. */ LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); return; } client->bufofs += ret; @@ -240,7 +242,7 @@ reparse: if (!eol) { if (unlikely(client->bufofs > MAX_MSGSIZE)) { LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); return; } goto retry; @@ -250,7 +252,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); return; } memcpy(msg, client->buf, buflen); @@ -262,8 +264,8 @@ reparse: char *buf = strdup("Invalid JSON, disconnecting\n"); LOGINFO("Client id %d sent invalid json message %s", client->id, msg); - send_client(ci, client->id, buf); - invalidate_client(ckp, ci, client); + send_client(cdata, client->id, buf); + invalidate_client(ckp, cdata, client); return; } else { int64_t passthrough_id; @@ -295,7 +297,7 @@ reparse: * handles the incoming messages */ void *receiver(void *arg) { - conn_instance_t *ci = (conn_instance_t *)arg; + cdata_t *cdata = (cdata_t *)arg; struct epoll_event event; bool maxconn = true; int ret, epfd; @@ -307,9 +309,9 @@ void *receiver(void *arg) LOGEMERG("FATAL: Failed to create epoll in receiver"); return NULL; } - event.data.fd = ci->serverfd; + event.data.fd = cdata->serverfd; event.events = EPOLLIN; - ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event); + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd, &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); return NULL; @@ -318,7 +320,7 @@ void *receiver(void *arg) while (42) { client_instance_t *client; - while (unlikely(!ci->accept)) + while (unlikely(!cdata->accept)) cksleep_ms(100); ret = epoll_wait(epfd, &event, 1, 1000); if (unlikely(ret == -1)) { @@ -333,12 +335,12 @@ void *receiver(void *arg) * can receive connections. */ LOGDEBUG("Dropping listen backlog to 0"); maxconn = false; - listen(ci->serverfd, 0); + listen(cdata->serverfd, 0); } continue; } - if (event.data.fd == ci->serverfd) { - ret = accept_client(ci, epfd); + if (event.data.fd == cdata->serverfd) { + ret = accept_client(cdata, epfd); if (unlikely(ret < 0)) { LOGEMERG("FATAL: Failed to accept_client in receiver"); break; @@ -349,10 +351,10 @@ void *receiver(void *arg) if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { /* Client disconnected */ LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(ci->pi->ckp, ci, client); + invalidate_client(cdata->pi->ckp, cdata, client); continue; } - parse_client_msg(ci, client); + parse_client_msg(cdata, client); } return NULL; } @@ -361,8 +363,8 @@ void *receiver(void *arg) * ready for writing immediately to not delay other messages. */ void *sender(void *arg) { - conn_instance_t *ci = (conn_instance_t *)arg; - ckpool_t *ckp = ci->pi->ckp; + cdata_t *cdata = (cdata_t *)arg; + ckpool_t *ckp = cdata->ckp; bool sent = false; rename_proc("csender"); @@ -372,23 +374,23 @@ void *sender(void *arg) client_instance_t *client; int ret, fd, ofs = 0; - mutex_lock(&sender_lock); + mutex_lock(&cdata->sender_lock); /* Poll every 100ms if there are no new sends. Re-examine * delayed sends immediately after a successful send in case * endless new sends more frequently end up starving the * delayed sends. */ - if (!sender_sends && !sent) { + if (!cdata->sender_sends && !sent) { const ts_t polltime = {0, 100000000}; ts_t timeout_ts; ts_realtime(&timeout_ts); timeraddspec(&timeout_ts, &polltime); - pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); + pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } - sender_send = sender_sends; + sender_send = cdata->sender_sends; if (sender_send) - DL_DELETE(sender_sends, sender_send); - mutex_unlock(&sender_lock); + DL_DELETE(cdata->sender_sends, sender_send); + mutex_unlock(&cdata->sender_lock); sent = false; @@ -396,17 +398,17 @@ void *sender(void *arg) * conditional with no new sends appearing or have just * serviced another message successfully. */ if (!sender_send) { - if (!delayed_sends) + if (!cdata->delayed_sends) continue; - sender_send = delayed_sends; - DL_DELETE(delayed_sends, sender_send); + sender_send = cdata->delayed_sends; + DL_DELETE(cdata->delayed_sends, sender_send); } client = sender_send->client; - ck_rlock(&ci->lock); + ck_rlock(&cdata->lock); fd = client->fd; - ck_runlock(&ci->lock); + ck_runlock(&cdata->lock); if (fd == -1) { LOGDEBUG("Discarding message sent to invalidated client"); @@ -422,7 +424,7 @@ void *sender(void *arg) if (ret < 1) { if (ret < 0) { LOGINFO("Client id %d fd %d interrupted", client->id, fd); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); free(sender_send->buf); free(sender_send); continue; @@ -432,7 +434,7 @@ void *sender(void *arg) /* Append it to the tail of the delayed sends list. * This is the only function that alters it so no * locking is required. */ - DL_APPEND(delayed_sends, sender_send); + DL_APPEND(cdata->delayed_sends, sender_send); continue; } sent = true; @@ -440,7 +442,7 @@ void *sender(void *arg) ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { LOGINFO("Client id %d fd %d disconnected", client->id, fd); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); break; } ofs += ret; @@ -455,7 +457,7 @@ void *sender(void *arg) /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(conn_instance_t *ci, int64_t id, char *buf) +static void send_client(cdata_t *cdata, int64_t id, char *buf) { sender_send_t *sender_send; client_instance_t *client; @@ -472,19 +474,19 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf) return; } - ck_rlock(&ci->lock); - HASH_FIND_I64(clients, &id, client); + ck_rlock(&cdata->lock); + HASH_FIND_I64(cdata->clients, &id, client); if (likely(client)) fd = client->fd; - ck_runlock(&ci->lock); + ck_runlock(&cdata->lock); if (unlikely(fd == -1)) { - ckpool_t *ckp = ci->pi->ckp; + ckpool_t *ckp = cdata->ckp; if (client) { /* This shouldn't happen */ LOGWARNING("Client id %ld disconnected but fd already invalidated!", id); - invalidate_client(ckp, ci, client); + invalidate_client(ckp, cdata, client); } else { LOGINFO("Connector failed to find client id %ld to send to", id); stratifier_drop_client(ckp, id); @@ -498,34 +500,34 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf) sender_send->buf = buf; sender_send->len = len; - mutex_lock(&sender_lock); - DL_APPEND(sender_sends, sender_send); - pthread_cond_signal(&sender_cond); - mutex_unlock(&sender_lock); + mutex_lock(&cdata->sender_lock); + DL_APPEND(cdata->sender_sends, sender_send); + pthread_cond_signal(&cdata->sender_cond); + mutex_unlock(&cdata->sender_lock); } -static client_instance_t *client_by_id(conn_instance_t *ci, int64_t id) +static client_instance_t *client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; - ck_rlock(&ci->lock); - HASH_FIND_I64(clients, &id, client); - ck_runlock(&ci->lock); + ck_rlock(&cdata->lock); + HASH_FIND_I64(cdata->clients, &id, client); + ck_runlock(&cdata->lock); return client; } -static void passthrough_client(conn_instance_t *ci, client_instance_t *client) +static void passthrough_client(cdata_t *cdata, client_instance_t *client) { char *buf; LOGINFO("Connector adding passthrough client %d", client->id); client->passthrough = true; ASPRINTF(&buf, "{\"result\": true}\n"); - send_client(ci, client->id, buf); + send_client(cdata, client->id, buf); } -static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) +static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { int sockd = -1, ret = 0, selret; int64_t client_id64, client_id; @@ -546,12 +548,12 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) LOGWARNING("%s connector ready", ckp->name); retry: - if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) { + if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { LOGEMERG("Connector sender thread shutdown, exiting"); ret = 1; goto out; } - if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) { + if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { LOGEMERG("Connector receiver thread shutdown, exiting"); ret = 1; goto out; @@ -579,12 +581,12 @@ retry: } if (cmdmatch(buf, "accept")) { LOGDEBUG("Connector received accept signal"); - ci->accept = true; + cdata->accept = true; goto retry; } if (cmdmatch(buf, "reject")) { LOGDEBUG("Connector received reject signal"); - ci->accept = false; + cdata->accept = false; goto retry; } if (cmdmatch(buf, "loglevel")) { @@ -603,12 +605,12 @@ retry: goto retry; } client_id = client_id64 & 0xffffffffll; - client = client_by_id(ci, client_id); + client = client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %ld to drop", client_id); goto retry; } - ret = drop_client(ci, client); + ret = drop_client(cdata, client); if (ret >= 0) LOGINFO("Connector dropped client id: %ld", client_id); goto retry; @@ -621,16 +623,16 @@ retry: LOGDEBUG("Connector failed to parse passthrough command: %s", buf); goto retry; } - client = client_by_id(ci, client_id); + client = client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %ld to pass through", client_id); goto retry; } - passthrough_client(ci, client); + passthrough_client(cdata, client); goto retry; } if (cmdmatch(buf, "getfd")) { - send_fd(ci->serverfd, sockd); + send_fd(cdata->serverfd, sockd); goto retry; } @@ -655,7 +657,7 @@ retry: dealloc(buf); buf = json_dumps(json_msg, 0); realloc_strcat(&buf, "\n"); - send_client(ci, client_id, buf); + send_client(cdata, client_id, buf); json_decref(json_msg); buf = NULL; @@ -668,14 +670,16 @@ out: int connector(proc_instance_t *pi) { + cdata_t *cdata = ckzalloc(sizeof(cdata_t)); char *url = NULL, *port = NULL; ckpool_t *ckp = pi->ckp; int sockd, ret = 0; - conn_instance_t ci; const int on = 1; int tries = 0; LOGWARNING("%s connector starting", ckp->name); + ckp->data = cdata; + cdata->ckp = ckp; if (ckp->oldconnfd > 0) { sockd = ckp->oldconnfd; @@ -737,17 +741,18 @@ int connector(proc_instance_t *pi) goto out; } - cklock_init(&ci.lock); - memset(&ci, 0, sizeof(ci)); - ci.pi = pi; - ci.serverfd = sockd; - ci.nfds = 0; - mutex_init(&sender_lock); - cond_init(&sender_cond); - create_pthread(&ci.pth_sender, sender, &ci); - create_pthread(&ci.pth_receiver, receiver, &ci); - - ret = connector_loop(pi, &ci); + cklock_init(&cdata->lock); + cdata->pi = pi; + cdata->serverfd = sockd; + cdata->nfds = 0; + cdata->client_id = 1; + mutex_init(&cdata->sender_lock); + cond_init(&cdata->sender_cond); + create_pthread(&cdata->pth_sender, sender, &cdata); + create_pthread(&cdata->pth_receiver, receiver, &cdata); + + ret = connector_loop(pi, cdata); out: + dealloc(ckp->data); return process_exit(ckp, pi, ret); } diff --git a/src/generator.c b/src/generator.c index d7e238e6..524b1ec6 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1745,5 +1745,6 @@ int generator(proc_instance_t *pi) else ret = server_mode(ckp, pi); + dealloc(ckp->data); return process_exit(ckp, pi, ret); }