From 6986ba95fc58de4ac11d8f6d124896f83509ef0a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 09:34:05 +1100 Subject: [PATCH 01/10] Make client message processing a ckmsq to minimise connector_loop holdup --- src/connector.c | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/connector.c b/src/connector.c index c93f6ffe..25000977 100644 --- a/src/connector.c +++ b/src/connector.c @@ -135,6 +135,9 @@ struct connector_data { int64_t client_id; + /* client message process queue */ + ckmsgq_t *cmpq; + /* For the linked list of pending sends */ sender_send_t *sender_sends; @@ -1200,18 +1203,11 @@ out: return ret; } -static void process_client_msg(cdata_t *cdata, const char *buf) +static void client_message_processor(ckpool_t *ckp, json_t *json_msg) { int64_t client_id; - json_t *json_msg; char *msg; - json_msg = json_loads(buf, 0, NULL); - if (unlikely(!json_msg)) { - LOGWARNING("Invalid json message in process_client_msg: %s", buf); - return; - } - /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); @@ -1221,10 +1217,22 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); - send_client(cdata, client_id, msg); + send_client(ckp->data, client_id, msg); json_decref(json_msg); } +static void process_client_msg(cdata_t *cdata, const char *buf) +{ + json_t *json_msg = json_loads(buf, 0, NULL); + + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message in process_client_msg: %s", buf); + return; + } + + ckmsgq_add(cdata->cmpq, json_msg); +} + /* Send the passthrough the terminate node.method */ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) { @@ -1539,6 +1547,8 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); + cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); + if (ckp->remote && !setup_upstream(ckp, cdata)) { ret = 1; goto out; From 8007f6524bb59eb789dd7bd8cdaa93cf339b95b1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 09:47:15 +1100 Subject: [PATCH 02/10] Writes blocking in send_sender_send are not an unlikely event --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 25000977..1ea69dc7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -691,7 +691,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende while (sender_send->len) { int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); - if (unlikely(ret < 1)) { + if (ret < 1) { /* Invalidate clients that block for more than 60 seconds */ if (unlikely(client->blocked_time && now_t - client->blocked_time >= 60)) { LOGNOTICE("Client id %"PRId64" fd %d blocked for >60 seconds, disconnecting", From f89ace7a3d62c4639ef5527eda8ad3c2d721c0dc Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 10:10:00 +1100 Subject: [PATCH 03/10] Silence uninitialised byte warnings in valgrind --- src/libckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 25593c08..e14ffae4 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -971,7 +971,7 @@ int wait_close(int sockd, int timeout) /* Emulate a select read wait for high fds that select doesn't support. */ int wait_read_select(int sockd, float timeout) { - struct epoll_event event; + struct epoll_event event = {0, {NULL}}; int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); @@ -1053,7 +1053,7 @@ out: /* Emulate a select write wait for high fds that select doesn't support */ int wait_write_select(int sockd, float timeout) { - struct epoll_event event; + struct epoll_event event = {0, {NULL}}; int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); From 0cb8d0af486d8e5614832749f03a25a6c1801794 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 10:21:25 +1100 Subject: [PATCH 04/10] Fix client buffer leak in connector --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 1ea69dc7..781ae906 100644 --- a/src/connector.c +++ b/src/connector.c @@ -208,14 +208,14 @@ static client_instance_t *recruit_client(cdata_t *cdata) } else LOGDEBUG("Connector recycled client instance"); - client->buf = realloc(client->buf, PAGESIZE); - client->buf[0] = '\0'; + client->buf = ckzalloc(PAGESIZE); return client; } static void __recycle_client(cdata_t *cdata, client_instance_t *client) { + dealloc(client->buf); memset(client, 0, sizeof(client_instance_t)); client->id = -1; DL_APPEND(cdata->recycled_clients, client); From db42a137e1c9ac5e08fabd2990eeae16e8a585db Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 10:41:45 +1100 Subject: [PATCH 05/10] Implement a ckdbflush message for emergency use only --- src/ckpool.c | 4 ++++ src/stratifier.c | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index fe457b44..84a85b01 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -479,6 +479,10 @@ retry: msg = send_recv_proc(ckp->connector, "stats"); send_unix_msg(sockd, msg); dealloc(msg); + } else if (cmdmatch(buf, "ckdbflush")) { + LOGWARNING("Received ckdb flush message"); + send_procmsg(ckp->stratifier, buf); + send_unix_msg(sockd, "flushing"); } else { LOGINFO("Listener received unhandled message: %s", buf); send_unix_msg(sockd, "unknown"); diff --git a/src/stratifier.c b/src/stratifier.c index ecc92702..4a63b713 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3916,6 +3916,27 @@ static void get_poolstats(sdata_t *sdata, int *sockd) static void srecv_process(ckpool_t *ckp, char *buf); +/* For emergency use only, flushes all pending ckdbq messages */ +static void ckdbq_flush(sdata_t *sdata) +{ + ckmsgq_t *ckdbq = sdata->ckdbq; + int flushed = 0; + + mutex_lock(ckdbq->lock); + while (ckdbq->msgs) { + ckmsg_t *msg = ckdbq->msgs; + + DL_DELETE(ckdbq->msgs, msg); + free(msg->data); + free(msg); + ckdbq->messages--; + flushed++; + } + mutex_unlock(ckdbq->lock); + + LOGWARNING("Flushed %d messages from ckdb queue", flushed); +} + static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { sdata_t *sdata = ckp->data; @@ -4080,6 +4101,8 @@ retry: dead_proxy(sdata, buf); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); + } else if (cmdmatch(buf, "ckdbflush")) { + ckdbq_flush(sdata); } else LOGWARNING("Unhandled stratifier message: %s", buf); goto retry; From abe0aef095c63b1ce263e3b90d7dc2ff7ce9bc9f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 13 Feb 2016 14:34:09 +1100 Subject: [PATCH 06/10] Bump ckpool version to 0.9.2 signifying stable code point --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index 31c47cb0..e0efb34a 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.9.1, kernel@kolivas.org) +AC_INIT(ckpool, 0.9.2, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) From ebedb77629cf97510d86e7030e1533e82b1da81c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 18 Feb 2016 11:30:01 +1100 Subject: [PATCH 07/10] Cache reused variables in creceiver --- src/connector.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/connector.c b/src/connector.c index 781ae906..e2820fe7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -600,6 +600,8 @@ void *receiver(void *arg) while (42) { client_instance_t *client; + uint32_t events; + uint64_t edu64; while (unlikely(!cdata->accept)) cksleep_ms(10); @@ -612,28 +614,30 @@ void *receiver(void *arg) /* Nothing to service, still very unlikely */ continue; } - if (event.data.u64 < serverfds) { - ret = accept_client(cdata, epfd, event.data.u64); + edu64 = event.data.u64; + if (edu64 < serverfds) { + ret = accept_client(cdata, epfd, edu64); if (unlikely(ret < 0)) { LOGEMERG("FATAL: Failed to accept_client in receiver"); break; } continue; } - client = ref_client_by_id(cdata, event.data.u64); + client = ref_client_by_id(cdata, edu64); if (unlikely(!client)) { - LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", edu64); continue; } if (unlikely(client->invalid)) goto noparse; /* We can have both messages and read hang ups so process the * message first. */ - if (likely(event.events & EPOLLIN)) + events = event.events; + if (likely(events & EPOLLIN)) parse_client_msg(cdata, client); if (unlikely(client->invalid)) goto noparse; - if (unlikely(event.events & EPOLLERR)) { + if (unlikely(events & EPOLLERR)) { socklen_t errlen = sizeof(int); int error = 0; @@ -648,11 +652,11 @@ void *receiver(void *arg) client->id, client->fd, error, strerror(error)); } invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(event.events & EPOLLHUP)) { + } else if (unlikely(events & EPOLLHUP)) { /* Client connection reset by peer */ LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(event.events & EPOLLRDHUP)) { + } else if (unlikely(events & EPOLLRDHUP)) { /* Client disconnected by peer */ LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); invalidate_client(cdata->pi->ckp, cdata, client); From 39070094cf8d3997affa5f7230759274d9e1fc25 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 18 Feb 2016 11:39:05 +1100 Subject: [PATCH 08/10] Invalidate clients in common location after parse_client_msg --- src/connector.c | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/connector.c b/src/connector.c index e2820fe7..984c8f03 100644 --- a/src/connector.c +++ b/src/connector.c @@ -459,10 +459,10 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) } } -/* Client is holding a reference count from being on the epoll list */ -static void parse_client_msg(cdata_t *cdata, client_instance_t *client) +/* Client is holding a reference count from being on the epoll list. Returns + * true if we will still be receiving messages from this client. */ +static bool __parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - ckpool_t *ckp = cdata->ckp; int buflen, ret; json_t *val; char *eol; @@ -472,8 +472,7 @@ retry: if (!client->remote) { LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1)); } @@ -481,11 +480,10 @@ retry: ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE); if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) - return; + return true; LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, cdata, client); - return; + return false; } client->bufofs += ret; reparse: @@ -497,8 +495,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) { @@ -506,8 +503,7 @@ reparse: LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf); send_client(cdata, client->id, buf); - invalidate_client(ckp, cdata, client); - return; + return false; } else { char *s; @@ -549,6 +545,12 @@ reparse: goto retry; } +static void parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) +{ + if (unlikely(!__parse_client_msg(ckp, cdata, client))) + invalidate_client(ckp, cdata, client); +} + static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; @@ -571,6 +573,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; + ckpool_t *ckp = cdata->ckp; struct epoll_event event; uint64_t serverfds, i; int ret, epfd; @@ -634,7 +637,7 @@ void *receiver(void *arg) * message first. */ events = event.events; if (likely(events & EPOLLIN)) - parse_client_msg(cdata, client); + parse_client_msg(ckp, cdata, client); if (unlikely(client->invalid)) goto noparse; if (unlikely(events & EPOLLERR)) { From f0e07c24a4fcd66095cdbcc382be0e8b78dc5cf5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 18 Feb 2016 13:41:02 +1100 Subject: [PATCH 09/10] Make client event handling a oneshot event that is rearmed to allow us to thread the work --- src/connector.c | 109 ++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/src/connector.c b/src/connector.c index 984c8f03..d3f6a442 100644 --- a/src/connector.c +++ b/src/connector.c @@ -301,7 +301,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; @@ -461,7 +461,7 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) /* Client is holding a reference count from being on the epoll list. Returns * true if we will still be receiving messages from this client. */ -static bool __parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) +static bool parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { int buflen, ret; json_t *val; @@ -545,12 +545,6 @@ reparse: goto retry; } -static void parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) -{ - if (unlikely(!__parse_client_msg(ckp, cdata, client))) - invalidate_client(ckp, cdata, client); -} - static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) { client_instance_t *client; @@ -568,9 +562,63 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) return client; } +static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd, + struct epoll_event *event, const uint64_t id) +{ + uint32_t events = event->events; + client_instance_t *client = ref_client_by_id(cdata, id); + + if (unlikely(!client)) { + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); + return; + } + if (unlikely(client->invalid)) + goto out; + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(events & EPOLLIN)) { + /* Rearm the client for epoll events if we have successfully + * parsed a message from it */ + if (parse_client_msg(ckp, cdata, client)) { + event->data.u64 = id; + event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event); + } else + invalidate_client(ckp, cdata, client); + } + if (unlikely(client->invalid)) + goto out; + if (unlikely(events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } +out: + dec_instance_ref(cdata, client); +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ -void *receiver(void *arg) +static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; ckpool_t *ckp = cdata->ckp; @@ -602,8 +650,6 @@ void *receiver(void *arg) cksleep_ms(1); while (42) { - client_instance_t *client; - uint32_t events; uint64_t edu64; while (unlikely(!cdata->accept)) @@ -626,46 +672,7 @@ void *receiver(void *arg) } continue; } - client = ref_client_by_id(cdata, edu64); - if (unlikely(!client)) { - LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", edu64); - continue; - } - if (unlikely(client->invalid)) - goto noparse; - /* We can have both messages and read hang ups so process the - * message first. */ - events = event.events; - if (likely(events & EPOLLIN)) - parse_client_msg(ckp, cdata, client); - if (unlikely(client->invalid)) - goto noparse; - if (unlikely(events & EPOLLERR)) { - socklen_t errlen = sizeof(int); - int error = 0; - - /* See what type of error this is and raise the log - * level of the message if it's unexpected. */ - getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error != 104) { - LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } else { - LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(events & EPOLLHUP)) { - /* Client connection reset by peer */ - LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(events & EPOLLRDHUP)) { - /* Client disconnected by peer */ - LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } -noparse: - dec_instance_ref(cdata, client); + process_client_events(ckp, cdata, epfd, &event, edu64); } out: /* We shouldn't get here unless there's an error */ From 75923f6daad268e02b04b57c155f779c83766d7e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 18 Feb 2016 13:57:46 +1100 Subject: [PATCH 10/10] Make client epoll event handling scalable ckmsgq threads --- src/connector.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/connector.c b/src/connector.c index d3f6a442..de81d2a7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -138,6 +138,9 @@ struct connector_data { /* client message process queue */ ckmsgq_t *cmpq; + /* client message event process queue */ + ckmsgq_t *cevents; + /* For the linked list of pending sends */ sender_send_t *sender_sends; @@ -562,12 +565,14 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) return client; } -static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const int epfd, - struct epoll_event *event, const uint64_t id) +static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) { - uint32_t events = event->events; - client_instance_t *client = ref_client_by_id(cdata, id); + const uint32_t events = event->events; + const uint64_t id = event->data.u64; + cdata_t *cdata = ckp->data; + client_instance_t *client; + client = ref_client_by_id(cdata, id); if (unlikely(!client)) { LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); return; @@ -582,7 +587,7 @@ static inline void process_client_events(ckpool_t *ckp, cdata_t *cdata, const in if (parse_client_msg(ckp, cdata, client)) { event->data.u64 = id; event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - epoll_ctl(epfd, EPOLL_CTL_MOD, client->fd, event); + epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); } else invalidate_client(ckp, cdata, client); } @@ -621,7 +626,6 @@ out: static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; - ckpool_t *ckp = cdata->ckp; struct epoll_event event; uint64_t serverfds, i; int ret, epfd; @@ -672,7 +676,7 @@ static void *receiver(void *arg) } continue; } - process_client_events(ckp, cdata, epfd, &event, edu64); + ckmsgq_add(cdata->cevents, &event); } out: /* We shouldn't get here unless there's an error */ @@ -1463,10 +1467,9 @@ out: int connector(proc_instance_t *pi) { cdata_t *cdata = ckzalloc(sizeof(cdata_t)); + int threads, sockd, ret = 0, i, tries = 0; ckpool_t *ckp = pi->ckp; - int sockd, ret = 0, i; const int on = 1; - int tries = 0; LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; @@ -1576,6 +1579,8 @@ int connector(proc_instance_t *pi) mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + cdata->cevents = create_ckmsgqs(ckp, "cevent", &client_event_processor, threads); create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL);