kanoi 10 years ago
parent
commit
20397ff4a4
  1. 35
      src/connector.c

35
src/connector.c

@ -223,12 +223,17 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
cdata->nfds, fd, no_clients, client->address_name, port); cdata->nfds, fd, no_clients, client->address_name, port);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
client->fd = fd; client->fd = fd;
event.data.ptr = client; event.data.u64 = client->id;
event.events = EPOLLIN; event.events = EPOLLIN | EPOLLRDHUP;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
recycle_client(cdata, client);
return 0; return 0;
} }
@ -237,12 +242,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
* removes it automatically from the epoll list. */ * removes it automatically from the epoll list. */
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
return 1; return 1;
} }
@ -442,8 +441,7 @@ void *receiver(void *arg)
serverfds = cdata->ckp->serverurls; serverfds = cdata->ckp->serverurls;
/* Add all the serverfds to the epoll */ /* Add all the serverfds to the epoll */
for (i = 0; i < serverfds; i++) { for (i = 0; i < serverfds; i++) {
/* The small values will be easily identifiable compared to /* The small values will be less than the first client ids */
* pointers */
event.data.u64 = i; event.data.u64 = i;
event.events = EPOLLIN; event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
@ -489,13 +487,12 @@ void *receiver(void *arg)
} }
continue; continue;
} }
client = event.data.ptr; client = ref_client_by_id(cdata, event.data.u64);
/* Recheck this client still exists in the same form when it if (unlikely(!client)) {
* was queued. */ LOGWARNING("Failed to find client by id in receiver!");
client = ref_client_by_id(cdata, client->id);
if (unlikely(!client))
continue; continue;
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { }
if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
/* Client disconnected */ /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd); LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(cdata->pi->ckp, cdata, client); invalidate_client(cdata->pi->ckp, cdata, client);
@ -970,7 +967,9 @@ int connector(proc_instance_t *pi)
cklock_init(&cdata->lock); cklock_init(&cdata->lock);
cdata->pi = pi; cdata->pi = pi;
cdata->nfds = 0; cdata->nfds = 0;
cdata->client_id = 1; /* Set the client id to the highest serverurl count to distinguish
* them from the server fds in epoll. */
cdata->client_id = ckp->serverurls;
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond); cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);

Loading…
Cancel
Save