diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 092a0121..52235247 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -54,15 +54,20 @@ void mkstamp(char *stamp, size_t siz) int main(int argc, char **argv) { - char *name = NULL, *socket_dir = NULL, *buf = NULL; + char *name = NULL, *socket_dir = NULL, *buf = NULL, *sockname = "listener"; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; bool proxy = false; char stamp[128]; int c; - while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) { + while ((c = getopt(argc, argv, "N:n:s:pt:T:")) != -1) { switch(c) { + /* Allows us to specify which process or socket to + * talk to. */ + case 'N': + sockname = strdup(optarg); + break; case 'n': name = strdup(optarg); break; @@ -92,7 +97,7 @@ int main(int argc, char **argv) realloc_strcat(&socket_dir, name); dealloc(name); trail_slash(&socket_dir); - realloc_strcat(&socket_dir, "listener"); + realloc_strcat(&socket_dir, sockname); while (42) { int sockd, len; diff --git a/src/ckpool.c b/src/ckpool.c index 218139de..9b9396e6 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -202,8 +202,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } -static void childsighandler(const int sig); - /* Create a standalone thread that queues received unix messages for a proc * instance and adds them to linked list of received messages with their * associated receive socket, then signal the associated rmsg_cond for the @@ -808,7 +806,7 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid return; LOGWARNING("Old process %s pid %d failed to respond to terminate request, killing", pi->processname, oldpid); - if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 500)) + if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 3000)) quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); } @@ -879,7 +877,7 @@ static void rm_namepid(const proc_instance_t *pi) /* Disable signal handlers for child processes, but simply pass them onto the * parent process to shut down cleanly. */ -static void childsighandler(const int sig) +void childsighandler(const int sig) { signal(sig, SIG_IGN); signal(SIGTERM, SIG_IGN); diff --git a/src/ckpool.h b/src/ckpool.h index 2388aedd..656fc5fa 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); +void childsighandler(const int sig); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); diff --git a/src/connector.c b/src/connector.c index 221b808f..0bd6fac7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,12 +29,17 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; + + /* fd cannot be changed while a ref is held */ int fd; /* Reference count for when this instance is used outside of the * connector_data lock */ int ref; + /* Have we disabled this client to be removed when there are no refs? */ + bool invalid; + /* For dead_clients list */ client_instance_t *next; client_instance_t *prev; @@ -58,6 +63,7 @@ struct sender_send { client_instance_t *client; char *buf; int len; + int ofs; }; typedef struct sender_send sender_send_t; @@ -68,6 +74,8 @@ struct connector_data { cklock_t lock; proc_instance_t *pi; + time_t start_time; + /* Array of server fds */ int *serverfd; /* All time count of clients connected */ @@ -93,10 +101,11 @@ struct connector_data { /* For the linked list of pending sends */ sender_send_t *sender_sends; - sender_send_t *delayed_sends; int64_t sends_generated; int64_t sends_delayed; + int64_t sends_queued; + int64_t sends_size; /* For protecting the pending sends list */ mutex_t sender_lock; @@ -219,6 +228,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } keep_sockalive(fd); + noblock_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); @@ -229,7 +239,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - client->fd = fd; event.data.u64 = client->id; event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { @@ -241,6 +250,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ __inc_instance_ref(client); + client->fd = fd; return 1; } @@ -249,16 +259,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int drop_client(cdata_t *cdata, client_instance_t *client) { int64_t client_id = 0; - int fd; + int fd = -1; ck_wlock(&cdata->lock); - fd = client->fd; - if (fd != -1) { + if (!client->invalid) { + client->invalid = true; client_id = client->id; - + fd = client->fd; epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); - nolinger_socket(fd); - Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the @@ -274,7 +282,22 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } -static void stratifier_drop_client(ckpool_t *ckp, int64_t id) +/* For sending the drop command to the upstream pool in passthrough mode */ +static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + json_t *val; + char *s; + + JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", + client->address_name, "server", client->server, "method", "mining.term", + "params"); + s = json_dumps(val, 0); + json_decref(val); + send_proc(ckp->generator, s); + free(s); +} + +static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) { char buf[256]; @@ -282,6 +305,11 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) send_proc(ckp->stratifier, buf); } +static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + stratifier_drop_id(ckp, client->id); +} + /* Invalidate this instance. Remove them from the hashtables we look up * regularly but keep the instances in a linked list until their ref count * drops to zero when we can remove them lazily. Client must hold a reference @@ -292,9 +320,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - if (ckp->passthrough) - goto out; - stratifier_drop_client(ckp, client->id); + if (!ckp->passthrough && !client->passthrough) + stratifier_drop_client(ckp, client); + else if (ckp->passthrough) + generator_drop_client(ckp, client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -303,12 +332,16 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c if (!client->ref) { DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector recycling client %"PRId64, client->id); + /* We only close the client fd once we're sure there + * are no references to it left to prevent fds being + * reused on new and old clients. */ + nolinger_socket(client->fd); + Close(client->fd); __recycle_client(cdata, client); } } ck_wunlock(&cdata->lock); -out: return ret; } @@ -323,41 +356,33 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) json_t *val; retry: - ret = wait_read_select(client->fd, 0); - if (ret < 1) { - if (!ret) - return; - LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + if (unlikely(client->bufofs > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", + client->id, client->fd); invalidate_client(ckp, cdata, client); return; } buflen = PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); + /* This read call is non-blocking since the socket is set to O_NOBLOCK */ + ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - if (!ret) + if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } client->bufofs += ret; reparse: eol = memchr(client->buf, '\n', client->bufofs); - if (!eol) { - if (unlikely(client->bufofs > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, cdata, client); - return; - } + if (!eol) goto retry; - } /* Do something useful with this message now */ buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); + LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); return; } @@ -382,16 +407,17 @@ reparse: json_object_del(val, "client_id"); passthrough_id = (client->id << 32) | passthrough_id; json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); - } else + } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); - json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + } json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ - if (likely(client->fd != -1)) { + if (likely(!client->invalid)) { if (ckp->passthrough) send_proc(ckp->generator, s); else @@ -413,8 +439,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client) - __inc_instance_ref(client); + if (client) { + if (!client->invalid) + __inc_instance_ref(client); + else + client = NULL; + } ck_wunlock(&cdata->lock); return client; @@ -425,10 +455,8 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; - bool dropped_backlog = false; struct epoll_event event; uint64_t serverfds, i; - time_t start_t; int ret, epfd; rename_proc("creceiver"); @@ -436,7 +464,7 @@ void *receiver(void *arg) epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); - return NULL; + goto out; } serverfds = cdata->ckp->serverurls; /* Add all the serverfds to the epoll */ @@ -447,27 +475,16 @@ void *receiver(void *arg) ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); - return NULL; + goto out; } } while (!cdata->accept) cksleep_ms(1); - start_t = time(NULL); while (42) { client_instance_t *client; - if (unlikely(!dropped_backlog && time(NULL) - start_t > 90)) { - /* When we first start we listen to as many connections - * as possible. After the first minute we drop the - * listen to the minimum to effectively ratelimit how - * fast we can receive new connections. */ - dropped_backlog = true; - LOGNOTICE("Dropping server listen backlog to 0"); - for (i = 0; i < serverfds; i++) - listen(cdata->serverfd[i], 0); - } while (unlikely(!cdata->accept)) cksleep_ms(10); ret = epoll_wait(epfd, &event, 1, 1000); @@ -489,41 +506,115 @@ void *receiver(void *arg) } client = ref_client_by_id(cdata, event.data.u64); if (unlikely(!client)) { - LOGWARNING("Failed to find client by id in receiver!"); + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } - if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { - /* Client disconnected */ - LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else + if (unlikely(client->invalid)) + goto noparse; + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(client->invalid)) + goto noparse; + if (unlikely(event.events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } +noparse: dec_instance_ref(cdata, client); } +out: + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } -/* Use a thread to send queued messages, using select() to only send to sockets - * ready for writing immediately to not delay other messages. */ -void *sender(void *arg) +/* Send a sender_send message and return true if we've finished sending it or + * are unable to send any more. */ +static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) +{ + client_instance_t *client = sender_send->client; + + if (unlikely(client->invalid)) + return true; + + while (sender_send->len) { + int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); + + if (unlikely(ret < 1)) { + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) + return false; + LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", + client->id, client->fd, errno, strerror(errno)); + invalidate_client(ckp, cdata, client); + return true; + } + sender_send->ofs += ret; + sender_send->len -= ret; + } + return true; +} + +static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) +{ + dec_instance_ref(cdata, sender_send->client); + free(sender_send->buf); + free(sender_send); +} + +/* Use a thread to send queued messages, appending them to the sends list and + * iterating over all of them, attempting to send them all non-blocking to + * only send to those clients ready to receive data. */ +static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; + sender_send_t *sends = NULL; ckpool_t *ckp = cdata->ckp; - bool sent = false; rename_proc("csender"); while (42) { - sender_send_t *sender_send, *delayed; - client_instance_t *client; - int ret = 0, fd, ofs = 0; + int64_t sends_queued = 0, sends_size = 0; + sender_send_t *sending, *tmp; + + /* Check all sends to see if they can be written out */ + DL_FOREACH_SAFE(sends, sending, tmp) { + if (send_sender_send(ckp, cdata, sending)) { + DL_DELETE(sends, sending); + clear_sender_send(sending, cdata); + } else { + sends_queued++; + sends_size += sizeof(sender_send_t) + sending->len + 1; + } + } mutex_lock(&cdata->sender_lock); - /* Poll every 10ms if there are no new sends. Re-examine - * delayed sends immediately after a successful send in case - * endless new sends more frequently end up starving the - * delayed sends. */ - if (!cdata->sender_sends && !sent) { + cdata->sends_delayed += sends_queued; + cdata->sends_queued = sends_queued; + cdata->sends_size = sends_size; + /* Poll every 10ms if there are no new sends. */ + if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; ts_t timeout_ts; @@ -531,86 +622,25 @@ void *sender(void *arg) timeraddspec(&timeout_ts, &polltime); cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } - sender_send = cdata->sender_sends; - if (sender_send) - DL_DELETE(cdata->sender_sends, sender_send); - mutex_unlock(&cdata->sender_lock); - - sent = false; - - /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing or have just - * serviced another message successfully. */ - if (!sender_send) { - /* Find a delayed client that needs servicing and set - * ret accordingly. We do not need to use FOREACH_SAFE - * as we break out of the loop as soon as we manipuate - * the list. */ - DL_FOREACH(cdata->delayed_sends, delayed) { - if ((ret = wait_write_select(delayed->client->fd, 0))) { - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); - break; - } - } - /* None found ? */ - if (!sender_send) - continue; - } - client = sender_send->client; - - /* If this socket is not ready to receive data from us, put the - * send back on the tail of the list and decrease the timeout - * to poll to either look for a client that is ready or poll - * select on this one */ - ck_rlock(&cdata->lock); - fd = client->fd; - if (!ret) - ret = wait_write_select(fd, 0); - ck_runlock(&cdata->lock); - - if (ret < 1) { - if (ret < 0) { - LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); - invalidate_client(ckp, cdata, client); - goto contfree; - } - LOGDEBUG("Client %"PRId64" not ready for writes", client->id); - - /* Append it to the tail of the delayed sends list. - * This is the only function that alters it so no - * locking is required. Keep the client ref. */ - DL_APPEND(cdata->delayed_sends, sender_send); - cdata->sends_delayed++; - continue; - } - while (sender_send->len) { - ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); - if (unlikely(ret < 0)) { - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); - invalidate_client(ckp, cdata, client); - break; - } - ofs += ret; - sender_send->len -= ret; + if (cdata->sender_sends) { + DL_CONCAT(sends, cdata->sender_sends); + cdata->sender_sends = NULL; } -contfree: - sent = true; - free(sender_send->buf); - free(sender_send); - dec_instance_ref(cdata, client); + mutex_unlock(&cdata->sender_lock); } - + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(cdata_t *cdata, int64_t id, char *buf) +static void send_client(cdata_t *cdata, const int64_t id, char *buf) { + ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; - int fd = -1, len; + int len; if (unlikely(!buf)) { LOGWARNING("Connector send_client sent a null buffer"); @@ -623,28 +653,36 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) return; } - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) { - fd = client->fd; - /* Grab a reference to this client until the sender_send has - * completed processing. */ - __inc_instance_ref(client); - } - ck_wunlock(&cdata->lock); - - if (unlikely(fd == -1)) { - ckpool_t *ckp = cdata->ckp; + /* Grab a reference to this client until the sender_send has + * completed processing. Is this a passthrough subclient ? */ + if (id > 0xffffffffll) { + int64_t client_id, pass_id; - if (client) { - /* This shouldn't happen */ - LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); - invalidate_client(ckp, cdata, client); - } else { + client_id = id & 0xffffffffll; + pass_id = id >> 32; + /* Make sure the passthrough exists for passthrough subclients */ + client = ref_client_by_id(cdata, pass_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", + pass_id, client_id); + /* Now see if the subclient exists */ + client = ref_client_by_id(cdata, client_id); + if (client) { + invalidate_client(ckp, cdata, client); + dec_instance_ref(cdata, client); + } else + stratifier_drop_id(ckp, id); + free(buf); + return; + } + } else { + client = ref_client_by_id(cdata, id); + if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); + free(buf); + return; } - free(buf); - return; } sender_send = ckzalloc(sizeof(sender_send_t)); @@ -671,7 +709,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void process_client_msg(cdata_t *cdata, const char *buf) { - int64_t client_id64, client_id; + int64_t client_id; json_t *json_msg; char *msg; @@ -682,22 +720,18 @@ static void process_client_msg(cdata_t *cdata, const char *buf) } /* Extract the client id from the json message and remove its entry */ - client_id64 = json_integer_value(json_object_get(json_msg, "client_id")); + client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); - if (client_id64 > 0xffffffffll) { - int64_t passthrough_id; - - passthrough_id = client_id64 & 0xffffffffll; - client_id = client_id64 >> 32; - json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); - } else - client_id = client_id64; + /* Put client_id back in for a passthrough subclient, passing its + * upstream client_id instead of the passthrough's. */ + if (client_id > 0xffffffffll) + json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); json_decref(json_msg); } -static char *connector_stats(cdata_t *cdata) +static char *connector_stats(cdata_t *cdata, const int runtime) { json_t *val = json_object(), *subval; client_instance_t *client; @@ -706,6 +740,10 @@ static char *connector_stats(cdata_t *cdata) int64_t memsize; char *buf; + /* If called in passthrough mode we log stats instead of the stratifier */ + if (runtime) + json_set_int(val, "runtime", runtime); + ck_rlock(&cdata->lock); objects = HASH_COUNT(cdata->clients); memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; @@ -728,68 +766,57 @@ static char *connector_stats(cdata_t *cdata) memsize = 0; mutex_lock(&cdata->sender_lock); - generated = cdata->sends_generated; DL_FOREACH(cdata->sender_sends, send) { objects++; memsize += sizeof(sender_send_t) + send->len + 1; } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); json_set_object(val, "sends", subval); - objects = 0; - memsize = 0; - - mutex_lock(&cdata->sender_lock); - generated = cdata->sends_delayed; - DL_FOREACH(cdata->delayed_sends, send) { - objects++; - memsize += sizeof(sender_send_t) + send->len + 1; - } + JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); mutex_unlock(&cdata->sender_lock); - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); - LOGNOTICE("Connector stats: %s", buf); + if (runtime) + LOGNOTICE("Passthrough:%s", buf); + else + LOGNOTICE("Connector stats: %s", buf); return buf; } static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int64_t client_id64, client_id; unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; - uint8_t test_cycle = 0; - char *buf; + time_t last_stats; + int64_t client_id; int ret = 0; + char *buf; LOGWARNING("%s connector ready", ckp->name); + last_stats = cdata->start_time; retry: + if (ckp->passthrough) { + time_t diff = time(NULL); + + if (diff - last_stats >= 60) { + last_stats = diff; + diff -= cdata->start_time; + buf = connector_stats(cdata, diff); + dealloc(buf); + } + } + if (umsg) { Close(umsg->sockd); free(umsg->buf); dealloc(umsg); } - if (!++test_cycle) { - /* Test for pthread join every 256 messages */ - if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { - LOGEMERG("Connector sender thread shutdown, exiting"); - ret = 1; - goto out; - } - if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { - LOGEMERG("Connector receiver thread shutdown, exiting"); - ret = 1; - goto out; - } - } - do { umsg = get_unix_msg(pi); } while (!umsg); @@ -803,12 +830,14 @@ retry: } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; - ret = sscanf(buf, "dropclient=%"PRId64, &client_id64); + ret = sscanf(buf, "dropclient=%"PRId64, &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse dropclient command: %s", buf); goto retry; } - client_id = client_id64 & 0xffffffffll; + /* A passthrough client, we can't drop this yet */ + if (client_id > 0xffffffffll) + goto retry; client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id); @@ -831,7 +860,7 @@ retry: char *msg; LOGDEBUG("Connector received stats request"); - msg = connector_stats(cdata); + msg = connector_stats(cdata, 0); send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -910,7 +939,9 @@ int connector(proc_instance_t *pi) Close(sockd); goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + /* Set listen backlog to larger than SOMAXCONN in case the + * system configuration supports it */ + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -952,7 +983,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -974,6 +1005,7 @@ int connector(proc_instance_t *pi) cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + cdata->start_time = time(NULL); create_unix_receiver(pi); diff --git a/src/libckpool.c b/src/libckpool.c index 84e8ce8c..07a1727a 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -893,13 +893,13 @@ int wait_close(int sockd, int timeout) if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); if (ret < 1) return 0; - return sfd.revents & POLLHUP; + return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } /* Emulate a select read wait for high fds that select doesn't support */ @@ -911,7 +911,7 @@ int wait_read_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLIN | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); @@ -993,7 +993,7 @@ int wait_write_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLOUT; + sfd.events = POLLOUT | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); diff --git a/src/stratifier.c b/src/stratifier.c index 500cb835..106344dc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1289,17 +1289,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (client->workername) { if (user) { - ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s", - client->id, client->address, user->throttled ? "throttled " : "", - user->username, client->workername, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", + client->id, client->address, user->throttled ? "throttled " : "", + user->username, client->workername, lazily ? "lazily" : ""); } else { - ASPRINTF(msg, "Client %"PRId64" %s no user worker %s dropped %s", - client->id, client->address, client->workername, - lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s", + client->id, client->address, client->workername, + lazily ? "lazily" : ""); } } else { - ASPRINTF(msg, "Workerless client %"PRId64" %s dropped %s", - client->id, client->address, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s", + client->id, client->address, lazily ? "lazily" : ""); } __del_client(sdata, client); __kill_instance(sdata, client); @@ -1348,7 +1348,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id, - const char *address, const int server) + const char *address, int server) { stratum_instance_t *client; sdata_t *sdata = ckp->data; @@ -1357,6 +1357,9 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i client->id = id; client->session_id = ++sdata->session_id; strcpy(client->address, address); + /* Sanity check to not overflow lookup in ckp->serverurl[] */ + if (server >= ckp->serverurls) + server = 0; client->server = server; client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; @@ -1509,11 +1512,8 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) if (port) url = strsep(&port, ","); if (url && port) { - int port_no; - - port_no = strtol(port, NULL, 10); - JSON_CPACK(json_msg, "{sosss[sii]}", "id", json_null(), "method", "client.reconnect", - "params", url, port_no, 0); + JSON_CPACK(json_msg, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect", + "params", url, port, 0); } else JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); @@ -2428,8 +2428,7 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); client->suggest_diff = worker->mindiff; - if (!user->auth_time) - user->auth_time = time(NULL); + user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || !safecmp(response, "ok.addrauth"))) { @@ -3265,7 +3264,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va { json_t *val; - JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); + JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); stratum_add_send(sdata, val, client_id); } @@ -3409,14 +3408,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 } if (unlikely(cmdmatch(method, "mining.passthrough"))) { - LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); /* We need to inform the connector process that this client - * is a passthrough and to manage its messages accordingly. - * The client_id stays on the list but we won't send anything - * to it since it's unauthorised. Set the flag just in case. */ - client->authorised = false; + * is a passthrough and to manage its messages accordingly. No + * data from this client id should ever come back to this + * stratifier after this so drop the client in the stratifier. */ + LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(client->ckp->connector, buf); + drop_client(sdata, client_id); return; } @@ -3443,12 +3442,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 /* We should only accept authorised requests from here on */ if (!client->authorised) { - /* Dropping unauthorised clients here also allows the - * stratifier process to restart since it will have lost all - * the stratum instance data. Clients will just reconnect. */ - LOGINFO("Dropping unauthorised client %"PRId64" %s", client_id, client->address); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method, + client_id, client->address); return; } @@ -3464,6 +3459,13 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 ckmsgq_add(sdata->stxnq, jp); return; } + + if (cmdmatch(method, "mining.term")) { + LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + drop_client(sdata, client_id); + return; + } + /* Unhandled message here */ LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); return; @@ -3625,10 +3627,18 @@ static void discard_json_params(json_params_t *jp) { json_decref(jp->method); json_decref(jp->params); - json_decref(jp->id_val); + if (jp->id_val) + json_decref(jp->id_val); free(jp); } +static void steal_json_id(json_t *val, json_params_t *jp) +{ + /* Steal the id_val as is to avoid a copy */ + json_object_set_new_nocheck(val, "id", jp->id_val); + jp->id_val = NULL; +} + static void sshare_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; @@ -3651,7 +3661,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) result_val = parse_submit(client, json_msg, jp->params, &err_val); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); out_decref: dec_instance_ref(sdata, client); @@ -3713,7 +3723,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); if (!json_is_true(result_val) || !client->suggest_diff) @@ -3877,7 +3887,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out; } val = json_object(); - json_object_set_nocheck(val, "id", jp->id_val); + steal_json_id(val, jp); if (cmdmatch(msg, "mining.get_transactions")) { int txns; @@ -4521,7 +4531,8 @@ int stratifier(proc_instance_t *pi) create_pthread(&pth_blockupdate, blockupdate, ckp); mutex_init(&sdata->stats_lock); - create_pthread(&pth_statsupdate, statsupdate, ckp); + if (!ckp->passthrough) + create_pthread(&pth_statsupdate, statsupdate, ckp); mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock);