Browse Source

Remove clients from the epoll list when invalidating them to avoid a receiver event and check they still exist in their original form on getting a receiver event

master
Con Kolivas 10 years ago
parent
commit
655f82f09d
  1. 43
      src/connector.c

43
src/connector.c

@ -72,6 +72,8 @@ struct connector_data {
int *serverfd; int *serverfd;
/* All time count of clients connected */ /* All time count of clients connected */
int nfds; int nfds;
/* The epoll fd */
int epfd;
bool accept; bool accept;
pthread_t pth_sender; pthread_t pth_sender;
@ -147,6 +149,7 @@ static client_instance_t *recruit_client(cdata_t *cdata)
static void __recycle_client(cdata_t *cdata, client_instance_t *client) static void __recycle_client(cdata_t *cdata, client_instance_t *client)
{ {
memset(client, 0, sizeof(client_instance_t)); memset(client, 0, sizeof(client_instance_t));
client->id = -1;
DL_APPEND(cdata->recycled_clients, client); DL_APPEND(cdata->recycled_clients, client);
} }
@ -255,6 +258,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
if (fd != -1) { if (fd != -1) {
client_id = client->id; client_id = client->id;
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL);
Close(client->fd); Close(client->fd);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
@ -409,6 +413,19 @@ reparse:
goto retry; goto retry;
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */ * handles the incoming messages */
void *receiver(void *arg) void *receiver(void *arg)
@ -422,7 +439,7 @@ void *receiver(void *arg)
rename_proc("creceiver"); rename_proc("creceiver");
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0) { if (epfd < 0) {
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL; return NULL;
@ -478,13 +495,18 @@ void *receiver(void *arg)
continue; continue;
} }
client = event.data.ptr; client = event.data.ptr;
/* Recheck this client still exists in the same form when it
* was queued. */
client = ref_client_by_id(cdata, client->id);
if (unlikely(!client))
continue;
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
/* Client disconnected */ /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd); LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(cdata->pi->ckp, cdata, client); invalidate_client(cdata->pi->ckp, cdata, client);
continue; } else
} parse_client_msg(cdata, client);
parse_client_msg(cdata, client); dec_instance_ref(cdata, client);
} }
return NULL; return NULL;
} }
@ -645,19 +667,6 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
mutex_unlock(&cdata->sender_lock); mutex_unlock(&cdata->sender_lock);
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{ {
char *buf; char *buf;

Loading…
Cancel
Save