Browse Source

Reference clients in the epoll list by their client id to avoid double lookup and possible wrong client selection

master
Con Kolivas 10 years ago
parent
commit
75d24d1d07
  1. 31
      src/connector.c

31
src/connector.c

@ -223,12 +223,17 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
cdata->nfds, fd, no_clients, client->address_name, port); cdata->nfds, fd, no_clients, client->address_name, port);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
client->fd = fd; client->fd = fd;
event.data.ptr = client; event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
recycle_client(cdata, client);
return 0; return 0;
} }
@ -237,12 +242,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
* removes it automatically from the epoll list. */ * removes it automatically from the epoll list. */
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
return 1; return 1;
} }
@ -442,8 +441,7 @@ void *receiver(void *arg)
serverfds = cdata->ckp->serverurls; serverfds = cdata->ckp->serverurls;
/* Add all the serverfds to the epoll */ /* Add all the serverfds to the epoll */
for (i = 0; i < serverfds; i++) { for (i = 0; i < serverfds; i++) {
/* The small values will be easily identifiable compared to /* The small values will be less than the first client ids */
* pointers */
event.data.u64 = i; event.data.u64 = i;
event.events = EPOLLIN; event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
@ -489,12 +487,11 @@ void *receiver(void *arg)
} }
continue; continue;
} }
client = event.data.ptr; client = ref_client_by_id(cdata, event.data.u64);
/* Recheck this client still exists in the same form when it if (unlikely(!client)) {
* was queued. */ LOGWARNING("Failed to find client by id in receiver!");
client = ref_client_by_id(cdata, client->id);
if (unlikely(!client))
continue; continue;
}
if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
/* Client disconnected */ /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd); LOGDEBUG("Client fd %d HUP in epoll", client->fd);
@ -970,7 +967,9 @@ int connector(proc_instance_t *pi)
cklock_init(&cdata->lock); cklock_init(&cdata->lock);
cdata->pi = pi; cdata->pi = pi;
cdata->nfds = 0; cdata->nfds = 0;
cdata->client_id = 1; /* Set the client id to the highest serverurl count to distinguish
* them from the server fds in epoll. */
cdata->client_id = ckp->serverurls;
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond); cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);

Loading…
Cancel
Save