diff --git a/src/connector.c b/src/connector.c index 58281bf4..64b77afd 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,6 +30,10 @@ struct client_instance { int64_t id; int fd; + /* Reference count for when this instance is used outside of the + * connector_data lock */ + int ref; + /* For dead_clients list */ struct client_instance *next; @@ -84,6 +88,25 @@ struct connector_data { typedef struct connector_data cdata_t; +/* Increase the reference count of instance */ +static void __inc_instance_ref(client_instance_t *client) +{ + client->ref++; +} + +/* Increase the reference count of instance */ +static void __dec_instance_ref(client_instance_t *client) +{ + client->ref--; +} + +static void dec_instance_ref(cdata_t *cdata, client_instance_t *client) +{ + ck_wlock(&cdata->lock); + __dec_instance_ref(client); + ck_wunlock(&cdata->lock); +} + /* Accepts incoming connections on the server socket and generates client * instances */ static int accept_client(cdata_t *cdata, int epfd) @@ -155,6 +178,11 @@ static int accept_client(cdata_t *cdata, int epfd) return 0; } + /* We increase the ref count on this client as epoll creates a pointer + * to it. We drop that reference when the socket is closed which + * removes it automatically from the epoll list. */ + __inc_instance_ref(client); + ck_wlock(&cdata->lock); client->id = cdata->client_id++; HASH_ADD_I64(cdata->clients, id, client); @@ -164,6 +192,7 @@ static int accept_client(cdata_t *cdata, int epfd) return 1; } +/* Client must hold a reference count */ static int drop_client(cdata_t *cdata, client_instance_t *client) { int fd; @@ -174,6 +203,9 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) Close(client->fd); HASH_DEL(cdata->clients, client); LL_PREPEND(cdata->dead_clients, client); + /* This is the reference to this client's presence in the + * epoll list. */ + __dec_instance_ref(client); } ck_wunlock(&cdata->lock); @@ -192,18 +224,34 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) } /* Invalidate this instance. Remove them from the hashtables we look up - * regularly but keep the instances in a linked list indefinitely in case we - * still reference any of its members. */ + * regularly but keep the instances in a linked list until their ref count + * drops to zero when we can remove them lazily. Client must hold a reference + * count. */ static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { + client_instance_t *tmp; + drop_client(cdata, client); if (ckp->passthrough) return; stratifier_drop_client(ckp, client->id); + + /* Cull old unused clients lazily when there are no more reference + * counts for them. */ + ck_wlock(&cdata->lock); + LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { + if (!client->ref) { + LL_DELETE(cdata->dead_clients, client); + LOGINFO("Connector discarding client %ld", client->id); + free(client); + } + } + ck_wunlock(&cdata->lock); } static void send_client(cdata_t *cdata, int64_t id, char *buf); +/* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { int buflen, ret, selfail = 0; @@ -412,9 +460,7 @@ void *sender(void *arg) if (fd == -1) { LOGDEBUG("Discarding message sent to invalidated client"); - free(sender_send->buf); - free(sender_send); - continue; + goto contfree; } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout @@ -425,15 +471,13 @@ void *sender(void *arg) if (ret < 0) { LOGINFO("Client id %d fd %d interrupted", client->id, fd); invalidate_client(ckp, cdata, client); - free(sender_send->buf); - free(sender_send); - continue; + goto contfree; } LOGDEBUG("Client %d not ready for writes", client->id); /* Append it to the tail of the delayed sends list. * This is the only function that alters it so no - * locking is required. */ + * locking is required. Keep the client ref. */ DL_APPEND(cdata->delayed_sends, sender_send); continue; } @@ -448,8 +492,10 @@ void *sender(void *arg) ofs += ret; sender_send->len -= ret; } +contfree: free(sender_send->buf); free(sender_send); + dec_instance_ref(cdata, client); } return NULL; @@ -474,11 +520,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) return; } - ck_rlock(&cdata->lock); + ck_ilock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) + if (likely(client)) { + ck_ulock(&cdata->lock); fd = client->fd; - ck_runlock(&cdata->lock); + /* Grab a reference to this client until the sender_send has + * completed processing. */ + __inc_instance_ref(client); + ck_dwilock(&cdata->lock); + } + ck_uilock(&cdata->lock); if (unlikely(fd == -1)) { ckpool_t *ckp = cdata->ckp; @@ -506,13 +558,18 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) mutex_unlock(&cdata->sender_lock); } -static client_instance_t *client_by_id(cdata_t *cdata, int64_t id) +static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; - ck_rlock(&cdata->lock); + ck_ilock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - ck_runlock(&cdata->lock); + if (client) { + ck_ulock(&cdata->lock); + __inc_instance_ref(client); + ck_dwilock(&cdata->lock); + } + ck_uilock(&cdata->lock); return client; } @@ -605,12 +662,13 @@ retry: goto retry; } client_id = client_id64 & 0xffffffffll; - client = client_by_id(cdata, client_id); + client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %ld to drop", client_id); goto retry; } ret = drop_client(cdata, client); + dec_instance_ref(cdata, client); if (ret >= 0) LOGINFO("Connector dropped client id: %ld", client_id); goto retry; @@ -623,12 +681,13 @@ retry: LOGDEBUG("Connector failed to parse passthrough command: %s", buf); goto retry; } - client = client_by_id(cdata, client_id); + client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %ld to pass through", client_id); goto retry; } passthrough_client(cdata, client); + dec_instance_ref(cdata, client); goto retry; } if (cmdmatch(buf, "getfd")) {