Browse Source

Use reference counting to know when we can drop client structures in the connector

master
Con Kolivas 10 years ago
parent
commit
6c47d61bb1
  1. 93
      src/connector.c

93
src/connector.c

@ -30,6 +30,10 @@ struct client_instance {
int64_t id; int64_t id;
int fd; int fd;
/* Reference count for when this instance is used outside of the
* connector_data lock */
int ref;
/* For dead_clients list */ /* For dead_clients list */
struct client_instance *next; struct client_instance *next;
@ -84,6 +88,25 @@ struct connector_data {
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
/* Increase the reference count of instance */
static void __inc_instance_ref(client_instance_t *client)
{
client->ref++;
}
/* Increase the reference count of instance */
static void __dec_instance_ref(client_instance_t *client)
{
client->ref--;
}
static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
{
ck_wlock(&cdata->lock);
__dec_instance_ref(client);
ck_wunlock(&cdata->lock);
}
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, int epfd) static int accept_client(cdata_t *cdata, int epfd)
@ -155,6 +178,11 @@ static int accept_client(cdata_t *cdata, int epfd)
return 0; return 0;
} }
/* We increase the ref count on this client as epoll creates a pointer
* to it. We drop that reference when the socket is closed which
* removes it automatically from the epoll list. */
__inc_instance_ref(client);
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
client->id = cdata->client_id++; client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client); HASH_ADD_I64(cdata->clients, id, client);
@ -164,6 +192,7 @@ static int accept_client(cdata_t *cdata, int epfd)
return 1; return 1;
} }
/* Client must hold a reference count */
static int drop_client(cdata_t *cdata, client_instance_t *client) static int drop_client(cdata_t *cdata, client_instance_t *client)
{ {
int fd; int fd;
@ -174,6 +203,9 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
Close(client->fd); Close(client->fd);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
LL_PREPEND(cdata->dead_clients, client); LL_PREPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the
* epoll list. */
__dec_instance_ref(client);
} }
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
@ -192,18 +224,34 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
} }
/* Invalidate this instance. Remove them from the hashtables we look up /* Invalidate this instance. Remove them from the hashtables we look up
* regularly but keep the instances in a linked list indefinitely in case we * regularly but keep the instances in a linked list until their ref count
* still reference any of its members. */ * drops to zero when we can remove them lazily. Client must hold a reference
* count. */
static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{ {
client_instance_t *tmp;
drop_client(cdata, client); drop_client(cdata, client);
if (ckp->passthrough) if (ckp->passthrough)
return; return;
stratifier_drop_client(ckp, client->id); stratifier_drop_client(ckp, client->id);
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
ck_wlock(&cdata->lock);
LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
if (!client->ref) {
LL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %ld", client->id);
free(client);
}
}
ck_wunlock(&cdata->lock);
} }
static void send_client(cdata_t *cdata, int64_t id, char *buf); static void send_client(cdata_t *cdata, int64_t id, char *buf);
/* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{ {
int buflen, ret, selfail = 0; int buflen, ret, selfail = 0;
@ -412,9 +460,7 @@ void *sender(void *arg)
if (fd == -1) { if (fd == -1) {
LOGDEBUG("Discarding message sent to invalidated client"); LOGDEBUG("Discarding message sent to invalidated client");
free(sender_send->buf); goto contfree;
free(sender_send);
continue;
} }
/* If this socket is not ready to receive data from us, put the /* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout * send back on the tail of the list and decrease the timeout
@ -425,15 +471,13 @@ void *sender(void *arg)
if (ret < 0) { if (ret < 0) {
LOGINFO("Client id %d fd %d interrupted", client->id, fd); LOGINFO("Client id %d fd %d interrupted", client->id, fd);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
free(sender_send->buf); goto contfree;
free(sender_send);
continue;
} }
LOGDEBUG("Client %d not ready for writes", client->id); LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the delayed sends list. /* Append it to the tail of the delayed sends list.
* This is the only function that alters it so no * This is the only function that alters it so no
* locking is required. */ * locking is required. Keep the client ref. */
DL_APPEND(cdata->delayed_sends, sender_send); DL_APPEND(cdata->delayed_sends, sender_send);
continue; continue;
} }
@ -448,8 +492,10 @@ void *sender(void *arg)
ofs += ret; ofs += ret;
sender_send->len -= ret; sender_send->len -= ret;
} }
contfree:
free(sender_send->buf); free(sender_send->buf);
free(sender_send); free(sender_send);
dec_instance_ref(cdata, client);
} }
return NULL; return NULL;
@ -474,11 +520,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
return; return;
} }
ck_rlock(&cdata->lock); ck_ilock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client); HASH_FIND_I64(cdata->clients, &id, client);
if (likely(client)) if (likely(client)) {
ck_ulock(&cdata->lock);
fd = client->fd; fd = client->fd;
ck_runlock(&cdata->lock); /* Grab a reference to this client until the sender_send has
* completed processing. */
__inc_instance_ref(client);
ck_dwilock(&cdata->lock);
}
ck_uilock(&cdata->lock);
if (unlikely(fd == -1)) { if (unlikely(fd == -1)) {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
@ -506,13 +558,18 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
} }
static client_instance_t *client_by_id(cdata_t *cdata, int64_t id) static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{ {
client_instance_t *client; client_instance_t *client;
ck_rlock(&cdata->lock); ck_ilock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client); HASH_FIND_I64(cdata->clients, &id, client);
ck_runlock(&cdata->lock); if (client) {
ck_ulock(&cdata->lock);
__inc_instance_ref(client);
ck_dwilock(&cdata->lock);
}
ck_uilock(&cdata->lock);
return client; return client;
} }
@ -605,12 +662,13 @@ retry:
goto retry; goto retry;
} }
client_id = client_id64 & 0xffffffffll; client_id = client_id64 & 0xffffffffll;
client = client_by_id(cdata, client_id); client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to drop", client_id); LOGINFO("Connector failed to find client id %ld to drop", client_id);
goto retry; goto retry;
} }
ret = drop_client(cdata, client); ret = drop_client(cdata, client);
dec_instance_ref(cdata, client);
if (ret >= 0) if (ret >= 0)
LOGINFO("Connector dropped client id: %ld", client_id); LOGINFO("Connector dropped client id: %ld", client_id);
goto retry; goto retry;
@ -623,12 +681,13 @@ retry:
LOGDEBUG("Connector failed to parse passthrough command: %s", buf); LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry; goto retry;
} }
client = client_by_id(cdata, client_id); client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to pass through", client_id); LOGINFO("Connector failed to find client id %ld to pass through", client_id);
goto retry; goto retry;
} }
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client);
goto retry; goto retry;
} }
if (cmdmatch(buf, "getfd")) { if (cmdmatch(buf, "getfd")) {

Loading…
Cancel
Save