Browse Source

Make client epoll event handling scalable ckmsgq threads

master
Con Kolivas 9 years ago
parent
commit
75923f6daa
  1. 23
      src/connector.c

23
src/connector.c

@ -138,6 +138,9 @@ struct connector_data {
/* client message process queue */ /* client message process queue */
ckmsgq_t *cmpq; ckmsgq_t *cmpq;
/* client message event process queue */
ckmsgq_t *cevents;
/* For the linked list of pending sends */ /* For the linked list of pending sends */
sender_send_t *sender_sends; sender_send_t *sender_sends;
@ -562,12 +565,14 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
return client; return client;
} }
static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd, static void client_event_processor(ckpool_t *ckp, struct epoll_event *event)
struct epoll_event *event, const uint64_t id)
{ {
uint32_t events = event->events; const uint32_t events = event->events;
client_instance_t *client = ref_client_by_id(cdata, id); const uint64_t id = event->data.u64;
cdata_t *cdata = ckp->data;
client_instance_t *client;
client = ref_client_by_id(cdata, id);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id);
return; return;
@ -582,7 +587,7 @@ static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const in
if (parse_client_msg(ckp, cdata, client)) { if (parse_client_msg(ckp, cdata, client)) {
event->data.u64 = id; event->data.u64 = id;
event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event); epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event);
} else } else
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
} }
@ -621,7 +626,6 @@ out:
static void *receiver(void *arg) static void *receiver(void *arg)
{ {
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
ckpool_t *ckp = cdata->ckp;
struct epoll_event event; struct epoll_event event;
uint64_t serverfds, i; uint64_t serverfds, i;
int ret, epfd; int ret, epfd;
@ -672,7 +676,7 @@ static void *receiver(void *arg)
} }
continue; continue;
} }
process_client_events(ckp, cdata, epfd, &event, edu64); ckmsgq_add(cdata->cevents, &event);
} }
out: out:
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */
@ -1463,10 +1467,9 @@ out:
int connector(proc_instance_t *pi) int connector(proc_instance_t *pi)
{ {
cdata_t *cdata = ckzalloc(sizeof(cdata_t)); cdata_t *cdata = ckzalloc(sizeof(cdata_t));
int threads, sockd, ret = 0, i, tries = 0;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int sockd, ret = 0, i;
const int on = 1; const int on = 1;
int tries = 0;
LOGWARNING("%s connector starting", ckp->name); LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata; ckp->data = cdata;
@ -1576,6 +1579,8 @@ int connector(proc_instance_t *pi)
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond); cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
cdata->cevents = create_ckmsgqs(ckp, "cevent", &client_event_processor, threads);
create_pthread(&cdata->pth_receiver, receiver, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata);
cdata->start_time = time(NULL); cdata->start_time = time(NULL);

Loading…
Cancel
Save