diff --git a/configure.ac b/configure.ac index 31c47cb0..e0efb34a 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.9.1, kernel@kolivas.org) +AC_INIT(ckpool, 0.9.2, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) diff --git a/src/ckpool.c b/src/ckpool.c index fe457b44..84a85b01 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -479,6 +479,10 @@ retry: msg = send_recv_proc(ckp->connector, "stats"); send_unix_msg(sockd, msg); dealloc(msg); + } else if (cmdmatch(buf, "ckdbflush")) { + LOGWARNING("Received ckdb flush message"); + send_procmsg(ckp->stratifier, buf); + send_unix_msg(sockd, "flushing"); } else { LOGINFO("Listener received unhandled message: %s", buf); send_unix_msg(sockd, "unknown"); diff --git a/src/connector.c b/src/connector.c index c93f6ffe..de81d2a7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -135,6 +135,12 @@ struct connector_data { int64_t client_id; + /* client message process queue */ + ckmsgq_t *cmpq; + + /* client message event process queue */ + ckmsgq_t *cevents; + /* For the linked list of pending sends */ sender_send_t *sender_sends; @@ -205,14 +211,14 @@ static client_instance_t *recruit_client(cdata_t *cdata) } else LOGDEBUG("Connector recycled client instance"); - client->buf = realloc(client->buf, PAGESIZE); - client->buf[0] = '\0'; + client->buf = ckzalloc(PAGESIZE); return client; } static void __recycle_client(cdata_t *cdata, client_instance_t *client) { + dealloc(client->buf); memset(client, 0, sizeof(client_instance_t)); client->id = -1; DL_APPEND(cdata->recycled_clients, client); @@ -298,7 +304,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; @@ -456,10 +462,10 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val) } } -/* Client is holding a reference count from being on the epoll list */ -static void parse_client_msg(cdata_t *cdata, client_instance_t *client) +/* Client is holding a reference count from being on the epoll list. Returns + * true if we will still be receiving messages from this client. */ +static bool parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - ckpool_t *ckp = cdata->ckp; int buflen, ret; json_t *val; char *eol; @@ -469,8 +475,7 @@ retry: if (!client->remote) { LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1)); } @@ -478,11 +483,10 @@ retry: ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE); if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) - return; + return true; LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, cdata, client); - return; + return false; } client->bufofs += ret; reparse: @@ -494,8 +498,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); - invalidate_client(ckp, cdata, client); - return; + return false; } if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) { @@ -503,8 +506,7 @@ reparse: LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf); send_client(cdata, client->id, buf); - invalidate_client(ckp, cdata, client); - return; + return false; } else { char *s; @@ -563,9 +565,65 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) return client; } +static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) +{ + const uint32_t events = event->events; + const uint64_t id = event->data.u64; + cdata_t *cdata = ckp->data; + client_instance_t *client; + + client = ref_client_by_id(cdata, id); + if (unlikely(!client)) { + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); + return; + } + if (unlikely(client->invalid)) + goto out; + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(events & EPOLLIN)) { + /* Rearm the client for epoll events if we have successfully + * parsed a message from it */ + if (parse_client_msg(ckp, cdata, client)) { + event->data.u64 = id; + event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); + } else + invalidate_client(ckp, cdata, client); + } + if (unlikely(client->invalid)) + goto out; + if (unlikely(events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } +out: + dec_instance_ref(cdata, client); +} + /* Waits on fds ready to read on from the list stored in conn_instance and * handles the incoming messages */ -void *receiver(void *arg) +static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; struct epoll_event event; @@ -596,7 +654,7 @@ void *receiver(void *arg) cksleep_ms(1); while (42) { - client_instance_t *client; + uint64_t edu64; while (unlikely(!cdata->accept)) cksleep_ms(10); @@ -609,53 +667,16 @@ void *receiver(void *arg) /* Nothing to service, still very unlikely */ continue; } - if (event.data.u64 < serverfds) { - ret = accept_client(cdata, epfd, event.data.u64); + edu64 = event.data.u64; + if (edu64 < serverfds) { + ret = accept_client(cdata, epfd, edu64); if (unlikely(ret < 0)) { LOGEMERG("FATAL: Failed to accept_client in receiver"); break; } continue; } - client = ref_client_by_id(cdata, event.data.u64); - if (unlikely(!client)) { - LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); - continue; - } - if (unlikely(client->invalid)) - goto noparse; - /* We can have both messages and read hang ups so process the - * message first. */ - if (likely(event.events & EPOLLIN)) - parse_client_msg(cdata, client); - if (unlikely(client->invalid)) - goto noparse; - if (unlikely(event.events & EPOLLERR)) { - socklen_t errlen = sizeof(int); - int error = 0; - - /* See what type of error this is and raise the log - * level of the message if it's unexpected. */ - getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error != 104) { - LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } else { - LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", - client->id, client->fd, error, strerror(error)); - } - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(event.events & EPOLLHUP)) { - /* Client connection reset by peer */ - LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else if (unlikely(event.events & EPOLLRDHUP)) { - /* Client disconnected by peer */ - LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } -noparse: - dec_instance_ref(cdata, client); + ckmsgq_add(cdata->cevents, &event); } out: /* We shouldn't get here unless there's an error */ @@ -688,7 +709,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende while (sender_send->len) { int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); - if (unlikely(ret < 1)) { + if (ret < 1) { /* Invalidate clients that block for more than 60 seconds */ if (unlikely(client->blocked_time && now_t - client->blocked_time >= 60)) { LOGNOTICE("Client id %"PRId64" fd %d blocked for >60 seconds, disconnecting", @@ -1200,18 +1221,11 @@ out: return ret; } -static void process_client_msg(cdata_t *cdata, const char *buf) +static void client_message_processor(ckpool_t *ckp, json_t *json_msg) { int64_t client_id; - json_t *json_msg; char *msg; - json_msg = json_loads(buf, 0, NULL); - if (unlikely(!json_msg)) { - LOGWARNING("Invalid json message in process_client_msg: %s", buf); - return; - } - /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); @@ -1221,10 +1235,22 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); - send_client(cdata, client_id, msg); + send_client(ckp->data, client_id, msg); json_decref(json_msg); } +static void process_client_msg(cdata_t *cdata, const char *buf) +{ + json_t *json_msg = json_loads(buf, 0, NULL); + + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message in process_client_msg: %s", buf); + return; + } + + ckmsgq_add(cdata->cmpq, json_msg); +} + /* Send the passthrough the terminate node.method */ static void drop_passthrough_client(cdata_t *cdata, const int64_t id) { @@ -1441,10 +1467,9 @@ out: int connector(proc_instance_t *pi) { cdata_t *cdata = ckzalloc(sizeof(cdata_t)); + int threads, sockd, ret = 0, i, tries = 0; ckpool_t *ckp = pi->ckp; - int sockd, ret = 0, i; const int on = 1; - int tries = 0; LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; @@ -1539,6 +1564,8 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); + cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); + if (ckp->remote && !setup_upstream(ckp, cdata)) { ret = 1; goto out; @@ -1552,6 +1579,8 @@ int connector(proc_instance_t *pi) mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + cdata->cevents = create_ckmsgqs(ckp, "cevent", &client_event_processor, threads); create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL); diff --git a/src/libckpool.c b/src/libckpool.c index 25593c08..e14ffae4 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -971,7 +971,7 @@ int wait_close(int sockd, int timeout) /* Emulate a select read wait for high fds that select doesn't support. */ int wait_read_select(int sockd, float timeout) { - struct epoll_event event; + struct epoll_event event = {0, {NULL}}; int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); @@ -1053,7 +1053,7 @@ out: /* Emulate a select write wait for high fds that select doesn't support */ int wait_write_select(int sockd, float timeout) { - struct epoll_event event; + struct epoll_event event = {0, {NULL}}; int epfd, ret; epfd = epoll_create1(EPOLL_CLOEXEC); diff --git a/src/stratifier.c b/src/stratifier.c index ecc92702..4a63b713 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3916,6 +3916,27 @@ static void get_poolstats(sdata_t *sdata, int *sockd) static void srecv_process(ckpool_t *ckp, char *buf); +/* For emergency use only, flushes all pending ckdbq messages */ +static void ckdbq_flush(sdata_t *sdata) +{ + ckmsgq_t *ckdbq = sdata->ckdbq; + int flushed = 0; + + mutex_lock(ckdbq->lock); + while (ckdbq->msgs) { + ckmsg_t *msg = ckdbq->msgs; + + DL_DELETE(ckdbq->msgs, msg); + free(msg->data); + free(msg); + ckdbq->messages--; + flushed++; + } + mutex_unlock(ckdbq->lock); + + LOGWARNING("Flushed %d messages from ckdb queue", flushed); +} + static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { sdata_t *sdata = ckp->data; @@ -4080,6 +4101,8 @@ retry: dead_proxy(sdata, buf); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); + } else if (cmdmatch(buf, "ckdbflush")) { + ckdbq_flush(sdata); } else LOGWARNING("Unhandled stratifier message: %s", buf); goto retry;