diff --git a/src/connector.c b/src/connector.c index d3f6a442..de81d2a7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -138,6 +138,9 @@ struct connector_data { /* client message process queue */ ckmsgq_t *cmpq; + /* client message event process queue */ + ckmsgq_t *cevents; + /* For the linked list of pending sends */ sender_send_t *sender_sends; @@ -562,12 +565,14 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) return client; } -static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd, - struct epoll_event *event, const uint64_t id) +static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) { - uint32_t events = event->events; - client_instance_t *client = ref_client_by_id(cdata, id); + const uint32_t events = event->events; + const uint64_t id = event->data.u64; + cdata_t *cdata = ckp->data; + client_instance_t *client; + client = ref_client_by_id(cdata, id); if (unlikely(!client)) { LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); return; @@ -582,7 +587,7 @@ static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const in if (parse_client_msg(ckp, cdata, client)) { event->data.u64 = id; event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event); + epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); } else invalidate_client(ckp, cdata, client); } @@ -621,7 +626,6 @@ out: static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; - ckpool_t *ckp = cdata->ckp; struct epoll_event event; uint64_t serverfds, i; int ret, epfd; @@ -672,7 +676,7 @@ static void *receiver(void *arg) } continue; } - process_client_events(ckp, cdata, epfd, &event, edu64); + ckmsgq_add(cdata->cevents, &event); } out: /* We shouldn't get here unless there's an error */ @@ -1463,10 +1467,9 @@ out: int connector(proc_instance_t *pi) { cdata_t *cdata = ckzalloc(sizeof(cdata_t)); + int threads, sockd, ret = 0, i, tries = 0; ckpool_t *ckp = pi->ckp; - int sockd, ret = 0, i; const int on = 1; - int tries = 0; LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; @@ -1576,6 +1579,8 @@ int connector(proc_instance_t *pi) mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + cdata->cevents = create_ckmsgqs(ckp, "cevent", &client_event_processor, threads); create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL);