From 70379428d6f528ae51d3cd1fbd02a75a4e13f845 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:51:20 +1000 Subject: [PATCH 01/46] Steal copied id_vals in the stratifier to avoid needing to copy them again, thus allowing opaque objects to be used as id values --- src/stratifier.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 500cb835..e5f704a3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3265,7 +3265,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va { json_t *val; - JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); + JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); stratum_add_send(sdata, val, client_id); } @@ -3625,10 +3625,18 @@ static void discard_json_params(json_params_t *jp) { json_decref(jp->method); json_decref(jp->params); - json_decref(jp->id_val); + if (jp->id_val) + json_decref(jp->id_val); free(jp); } +static void steal_json_id(json_t *val, json_params_t *jp) +{ + /* Steal the id_val as is to avoid a copy */ + json_object_set_new_nocheck(val, "id", jp->id_val); + jp->id_val = NULL; +} + static void sshare_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; @@ -3651,7 +3659,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) result_val = parse_submit(client, json_msg, jp->params, &err_val); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); out_decref: dec_instance_ref(sdata, client); @@ -3713,7 +3721,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); if (!json_is_true(result_val) || !client->suggest_diff) @@ -3877,7 +3885,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out; } val = json_object(); - json_object_set_nocheck(val, "id", jp->id_val); + steal_json_id(val, jp); if (cmdmatch(msg, "mining.get_transactions")) { int txns; From bf72ebbee6f0fe9767bcce50d50cda2ab8853046 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:56:24 +1000 Subject: [PATCH 02/46] Reinstate checking for zero as a return from recv for cleanly disconnected clients --- src/connector.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 221b808f..e2a66f36 100644 --- a/src/connector.c +++ b/src/connector.c @@ -335,8 +335,9 @@ retry: buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); if (ret < 1) { - if (!ret) - return; + /* We should have something to read if called since poll set + * this fd's revents status so if there's nothing it means the + * client has disconnected. */ LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); From ae2b00842b57924da1f7aa1971572afb5e6bd1d4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 11:07:33 +1000 Subject: [PATCH 03/46] Allow unauthorised clients to send other messages till they've authorised but silently ignore them --- src/stratifier.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e5f704a3..25f9cdd5 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3443,12 +3443,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 /* We should only accept authorised requests from here on */ if (!client->authorised) { - /* Dropping unauthorised clients here also allows the - * stratifier process to restart since it will have lost all - * the stratum instance data. Clients will just reconnect. */ - LOGINFO("Dropping unauthorised client %"PRId64" %s", client_id, client->address); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method, + client_id, client->address); return; } From da2f62aea96c0a472070c2c39376121e4ddca5c2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 11:44:17 +1000 Subject: [PATCH 04/46] Pass through the downstream clients' address and server --- src/connector.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index e2a66f36..7b930ea2 100644 --- a/src/connector.c +++ b/src/connector.c @@ -383,10 +383,11 @@ reparse: json_object_del(val, "client_id"); passthrough_id = (client->id << 32) | passthrough_id; json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); - } else + } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); - json_object_set_new_nocheck(val, "address", json_string(client->address_name)); - json_object_set_new_nocheck(val, "server", json_integer(client->server)); + json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "server", json_integer(client->server)); + } s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. We From 24d89a3e72fc4bca6a72d5e55b3d7f7862c02d35 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:04:49 +1000 Subject: [PATCH 05/46] Don't use MSG_WAITALL on unix sockets --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 84e8ce8c..f3d218d1 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -932,7 +932,7 @@ int read_length(int sockd, void *buf, int len) if (unlikely(sockd < 0)) return -1; while (len) { - ret = recv(sockd, buf + ofs, len, MSG_WAITALL); + ret = recv(sockd, buf + ofs, len, 0); if (unlikely(ret < 1)) return -1; ofs += ret; From 2cc42376bdce1177326fc28a50f34d0efe653eff Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:06:27 +1000 Subject: [PATCH 06/46] Check also for POLLRDHUP in wait/read write select helpers --- src/libckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index f3d218d1..24c66f10 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -911,7 +911,7 @@ int wait_read_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLIN | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); @@ -993,7 +993,7 @@ int wait_write_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLOUT; + sfd.events = POLLOUT | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); From 6e7f39321df599b3b05ca721f0096c3837d39556 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:08:58 +1000 Subject: [PATCH 07/46] We are not interested in POLLIN in wait_close but any mode of a closed socket --- src/libckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 24c66f10..011194fc 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -893,13 +893,13 @@ int wait_close(int sockd, int timeout) if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); if (ret < 1) return 0; - return sfd.revents & POLLHUP; + return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } /* Emulate a select read wait for high fds that select doesn't support */ From 0b96be01443ff79db8b6e2984f591ffd871599e7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:12:29 +1000 Subject: [PATCH 08/46] Use non blocking receive in parse_client_msg as we check for read readiness with wait_read_select first, and there is the unlikely event the state changes --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 7b930ea2..b141f205 100644 --- a/src/connector.c +++ b/src/connector.c @@ -333,7 +333,7 @@ retry: return; } buflen = PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); + ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); if (ret < 1) { /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the From 8bb50fa05861aa2dfac08001aac0839e057f4543 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 15:04:41 +1000 Subject: [PATCH 09/46] Process epoll read messages before hangups and errors and add more info about the type of hang up, increasing verbosity when it's an unexpected error --- src/connector.c | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index b141f205..44dbc76d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -494,12 +494,34 @@ void *receiver(void *arg) LOGWARNING("Failed to find client by id in receiver!"); continue; } - if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { - /* Client disconnected */ - LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(event.events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } dec_instance_ref(cdata, client); } return NULL; From e9441f2e1af300b3afef10678031c514b7d2f462 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 19:28:11 +1000 Subject: [PATCH 10/46] Drop passthrough proxy connection from stratifier --- src/connector.c | 12 ++++++------ src/stratifier.c | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/connector.c b/src/connector.c index 44dbc76d..ed68155d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -274,11 +274,14 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } -static void stratifier_drop_client(ckpool_t *ckp, int64_t id) +static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) { char buf[256]; - sprintf(buf, "dropclient=%"PRId64, id); + /* The stratifier is not in use in passthrough mode */ + if (ckp->passthrough || client->passthrough) + return; + sprintf(buf, "dropclient=%"PRId64, client->id); send_proc(ckp->stratifier, buf); } @@ -292,9 +295,7 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - if (ckp->passthrough) - goto out; - stratifier_drop_client(ckp, client->id); + stratifier_drop_client(ckp, client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -308,7 +309,6 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c } ck_wunlock(&cdata->lock); -out: return ret; } diff --git a/src/stratifier.c b/src/stratifier.c index 25f9cdd5..5d483fcb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3409,14 +3409,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 } if (unlikely(cmdmatch(method, "mining.passthrough"))) { - LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); /* We need to inform the connector process that this client - * is a passthrough and to manage its messages accordingly. - * The client_id stays on the list but we won't send anything - * to it since it's unauthorised. Set the flag just in case. */ - client->authorised = false; + * is a passthrough and to manage its messages accordingly. No + * data from this client id should ever come back to this + * stratifier after this so drop the client in the stratifier. */ + LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(client->ckp->connector, buf); + drop_client(sdata, client_id); return; } From 3f3dc7f4f30591ec2ba57f4860e30e817cf8dc44 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 19:58:40 +1000 Subject: [PATCH 11/46] Implement a stratum mining.term call which notifies the stratifier this client is terminating and use it to signal upstream pools when passthrough subclients have disconnected --- src/connector.c | 31 ++++++++++++++++++++++++------- src/stratifier.c | 7 +++++++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index ed68155d..35dd0bd2 100644 --- a/src/connector.c +++ b/src/connector.c @@ -274,13 +274,25 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } +/* For sending the drop command to the upstream pool in passthrough mode */ +static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + json_t *val; + char *s; + + JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", + client->address_name, "server", client->server, "method", "mining.term", + "params"); + s = json_dumps(val, 0); + json_decref(val); + send_proc(ckp->generator, s); + free(s); +} + static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) { char buf[256]; - /* The stratifier is not in use in passthrough mode */ - if (ckp->passthrough || client->passthrough) - return; sprintf(buf, "dropclient=%"PRId64, client->id); send_proc(ckp->stratifier, buf); } @@ -295,7 +307,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - stratifier_drop_client(ckp, client); + if (!ckp->passthrough && !client->passthrough) + stratifier_drop_client(ckp, client); + else if (ckp->passthrough) + generator_drop_client(ckp, client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -784,10 +799,10 @@ static char *connector_stats(cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int64_t client_id64, client_id; unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; uint8_t test_cycle = 0; + int64_t client_id; char *buf; int ret = 0; @@ -827,12 +842,14 @@ retry: } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; - ret = sscanf(buf, "dropclient=%"PRId64, &client_id64); + ret = sscanf(buf, "dropclient=%"PRId64, &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse dropclient command: %s", buf); goto retry; } - client_id = client_id64 & 0xffffffffll; + /* A passthrough client, we can't drop this yet */ + if (client_id > 0xffffffffll) + goto retry; client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id); diff --git a/src/stratifier.c b/src/stratifier.c index 5d483fcb..1b50a2cb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3460,6 +3460,13 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 ckmsgq_add(sdata->stxnq, jp); return; } + + if (cmdmatch(method, "mining.term")) { + LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + drop_client(sdata, client_id); + return; + } + /* Unhandled message here */ LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); return; From e095ba4b49c49d502749f74ce8dfc5f2fda955fa Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:03:30 +1000 Subject: [PATCH 12/46] Mask out error 0 in socket error messages --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 35dd0bd2..b57bf048 100644 --- a/src/connector.c +++ b/src/connector.c @@ -520,7 +520,7 @@ void *receiver(void *arg) /* See what type of error this is and raise the log * level of the message if it's unexpected. */ getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error != 104) { + if (error && error != 104) { LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", client->id, client->fd, error, strerror(error)); } else { From 93d5760b2d52b99da4608d4f2fcf31f81091029f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:13:51 +1000 Subject: [PATCH 13/46] Set server to the passthrough server client in passthrough mode --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index b57bf048..650e4921 100644 --- a/src/connector.c +++ b/src/connector.c @@ -401,8 +401,8 @@ reparse: } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); - json_object_set_new_nocheck(val, "server", json_integer(client->server)); } + json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. We From 5633f8365df0082920d99ff669d0ca2a2c91812a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:17:24 +1000 Subject: [PATCH 14/46] Prevent theoretical read out of bounds --- src/stratifier.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1b50a2cb..00c9b000 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1348,7 +1348,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id, - const char *address, const int server) + const char *address, int server) { stratum_instance_t *client; sdata_t *sdata = ckp->data; @@ -1357,6 +1357,9 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i client->id = id; client->session_id = ++sdata->session_id; strcpy(client->address, address); + /* Sanity check to not overflow lookup in ckp->serverurl[] */ + if (server >= ckp->serverurls) + server = 0; client->server = server; client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; From 84ca31fcca398b432a151702c0c71f3d13d4a0bc Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 09:13:20 +1000 Subject: [PATCH 15/46] Drop stratifier id when client id is not found in connector --- src/connector.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 650e4921..b74e53fe 100644 --- a/src/connector.c +++ b/src/connector.c @@ -289,14 +289,19 @@ static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client free(s); } -static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) { char buf[256]; - sprintf(buf, "dropclient=%"PRId64, client->id); + sprintf(buf, "dropclient=%"PRId64, id); send_proc(ckp->stratifier, buf); } +static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + stratifier_drop_id(ckp, client->id); +} + /* Invalidate this instance. Remove them from the hashtables we look up * regularly but keep the instances in a linked list until their ref count * drops to zero when we can remove them lazily. Client must hold a reference @@ -681,6 +686,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) invalidate_client(ckp, cdata, client); } else { LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); } free(buf); return; From 6c0c7dd7c43df51f50591845c42d8e0cd8c544b6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 09:51:12 +1000 Subject: [PATCH 16/46] Rework parse_client_msg to use the non blocking recv return values avoiding the need for an extra wait_read_select --- src/connector.c | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/connector.c b/src/connector.c index b74e53fe..65c77bd6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -343,21 +343,13 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) json_t *val; retry: - ret = wait_read_select(client->fd, 0); - if (ret < 1) { - if (!ret) - return; - LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, cdata, client); - return; - } buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); if (ret < 1) { - /* We should have something to read if called since poll set - * this fd's revents status so if there's nothing it means the - * client has disconnected. */ + if (!ret) + return; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); From 22ece7b96cc08081d8c332ed73ae3851d6c21ffb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 10:13:48 +1000 Subject: [PATCH 17/46] Revert "Don't use MSG_WAITALL on unix sockets" This reverts commit 24d89a3e72fc4bca6a72d5e55b3d7f7862c02d35. Do use it... --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 011194fc..07a1727a 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -932,7 +932,7 @@ int read_length(int sockd, void *buf, int len) if (unlikely(sockd < 0)) return -1; while (len) { - ret = recv(sockd, buf + ofs, len, 0); + ret = recv(sockd, buf + ofs, len, MSG_WAITALL); if (unlikely(ret < 1)) return -1; ofs += ret; From 94dbd802a262e7d41b127b73fe6bb92530187c83 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 10:20:54 +1000 Subject: [PATCH 18/46] Wait longer after kill message --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 218139de..4bd99f22 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -808,7 +808,7 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid return; LOGWARNING("Old process %s pid %d failed to respond to terminate request, killing", pi->processname, oldpid); - if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 500)) + if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 3000)) quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); } From fd73ebe3aeea3cd454bba9c2ece9eb9e28aed0ab Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 15:30:44 +1000 Subject: [PATCH 19/46] Send port number as a string on reconnect as most clients will be expecting it --- src/stratifier.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 00c9b000..f1cdf034 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1512,11 +1512,8 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) if (port) url = strsep(&port, ","); if (url && port) { - int port_no; - - port_no = strtol(port, NULL, 10); - JSON_CPACK(json_msg, "{sosss[sii]}", "id", json_null(), "method", "client.reconnect", - "params", url, port_no, 0); + JSON_CPACK(json_msg, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect", + "params", url, port, 0); } else JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); From 4f6776a4c3501977e4af0ef48c9edde5557379f5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 15:40:55 +1000 Subject: [PATCH 20/46] Allow specifying of the socket name to use in ckpmsg --- src/ckpmsg.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 092a0121..52235247 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -54,15 +54,20 @@ void mkstamp(char *stamp, size_t siz) int main(int argc, char **argv) { - char *name = NULL, *socket_dir = NULL, *buf = NULL; + char *name = NULL, *socket_dir = NULL, *buf = NULL, *sockname = "listener"; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; bool proxy = false; char stamp[128]; int c; - while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) { + while ((c = getopt(argc, argv, "N:n:s:pt:T:")) != -1) { switch(c) { + /* Allows us to specify which process or socket to + * talk to. */ + case 'N': + sockname = strdup(optarg); + break; case 'n': name = strdup(optarg); break; @@ -92,7 +97,7 @@ int main(int argc, char **argv) realloc_strcat(&socket_dir, name); dealloc(name); trail_slash(&socket_dir); - realloc_strcat(&socket_dir, "listener"); + realloc_strcat(&socket_dir, sockname); while (42) { int sockd, len; From d0f7ec7c35f26b04dae5cb32867f770df1552115 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 16:27:20 +1000 Subject: [PATCH 21/46] Decrease verbosity of missing client warning in receiver, adding id number to the output --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 65c77bd6..b1a3742d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -503,7 +503,7 @@ void *receiver(void *arg) } client = ref_client_by_id(cdata, event.data.u64); if (unlikely(!client)) { - LOGWARNING("Failed to find client by id in receiver!"); + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } /* We can have both messages and read hang ups so process the From 2561eff1cbc5f2bb2298d55e90f3f6a6d708e725 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 26 Apr 2015 08:52:29 +1000 Subject: [PATCH 22/46] Update user auth time on each successful auth --- src/stratifier.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index f1cdf034..6b4ee37b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2428,8 +2428,7 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); client->suggest_diff = worker->mindiff; - if (!user->auth_time) - user->auth_time = time(NULL); + user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || !safecmp(response, "ok.addrauth"))) { From 2bde0bbe3d51a86e857bbc8e3f11393a0723d028 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 17:38:36 +1000 Subject: [PATCH 23/46] Avoid trying to parse messages or test the client's error if their fd has already been invalidated --- src/connector.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/connector.c b/src/connector.c index b1a3742d..38052a23 100644 --- a/src/connector.c +++ b/src/connector.c @@ -506,10 +506,14 @@ void *receiver(void *arg) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } + if (unlikely(client->fd == -1)) + goto noparse; /* We can have both messages and read hang ups so process the * message first. */ if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(client->fd == -1)) + goto noparse; if (unlikely(event.events & EPOLLERR)) { socklen_t errlen = sizeof(int); int error = 0; @@ -534,6 +538,7 @@ void *receiver(void *arg) LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); invalidate_client(cdata->pi->ckp, cdata, client); } +noparse: dec_instance_ref(cdata, client); } return NULL; From 62d1ec3f8a351d906842f772d92305ab506649fd Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 17:47:23 +1000 Subject: [PATCH 24/46] Use signal handlers from sender and receiver threads in the connector avoid needing to pthread tryjoin on every message --- src/ckpool.c | 4 +--- src/ckpool.h | 1 + src/connector.c | 25 +++++++------------------ 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 4bd99f22..9b9396e6 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -202,8 +202,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } -static void childsighandler(const int sig); - /* Create a standalone thread that queues received unix messages for a proc * instance and adds them to linked list of received messages with their * associated receive socket, then signal the associated rmsg_cond for the @@ -879,7 +877,7 @@ static void rm_namepid(const proc_instance_t *pi) /* Disable signal handlers for child processes, but simply pass them onto the * parent process to shut down cleanly. */ -static void childsighandler(const int sig) +void childsighandler(const int sig) { signal(sig, SIG_IGN); signal(SIGTERM, SIG_IGN); diff --git a/src/ckpool.h b/src/ckpool.h index 2388aedd..656fc5fa 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); +void childsighandler(const int sig); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); diff --git a/src/connector.c b/src/connector.c index 38052a23..6e469e29 100644 --- a/src/connector.c +++ b/src/connector.c @@ -450,7 +450,7 @@ void *receiver(void *arg) epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); - return NULL; + goto out; } serverfds = cdata->ckp->serverurls; /* Add all the serverfds to the epoll */ @@ -461,7 +461,7 @@ void *receiver(void *arg) ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); - return NULL; + goto out; } } @@ -541,6 +541,9 @@ void *receiver(void *arg) noparse: dec_instance_ref(cdata, client); } +out: + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } @@ -641,7 +644,8 @@ contfree: free(sender_send); dec_instance_ref(cdata, client); } - + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } @@ -804,7 +808,6 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; - uint8_t test_cycle = 0; int64_t client_id; char *buf; int ret = 0; @@ -818,20 +821,6 @@ retry: dealloc(umsg); } - if (!++test_cycle) { - /* Test for pthread join every 256 messages */ - if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { - LOGEMERG("Connector sender thread shutdown, exiting"); - ret = 1; - goto out; - } - if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { - LOGEMERG("Connector receiver thread shutdown, exiting"); - ret = 1; - goto out; - } - } - do { umsg = get_unix_msg(pi); } while (!umsg); From 6a159bf5dc0ebc0f06f18ca170ac7378fe3bc037 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 17:50:36 +1000 Subject: [PATCH 25/46] We won't get no error if we are not testing fd -1 sockets any more, and use errno for consistent parsing of socket close errors --- src/connector.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 6e469e29..2c1fb215 100644 --- a/src/connector.c +++ b/src/connector.c @@ -521,11 +521,11 @@ void *receiver(void *arg) /* See what type of error this is and raise the log * level of the message if it's unexpected. */ getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error && error != 104) { - LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", client->id, client->fd, error, strerror(error)); } else { - LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", client->id, client->fd, error, strerror(error)); } invalidate_client(cdata->pi->ckp, cdata, client); From 8f1336986f91358786d4c4c9cfa88ec0058ab7ac Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 18:06:20 +1000 Subject: [PATCH 26/46] Use a unique event structure for each client added to the epoll list --- src/connector.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2c1fb215..2f0840f6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,6 +30,7 @@ struct client_instance { UT_hash_handle hh; int64_t id; int fd; + struct epoll_event event; /* Reference count for when this instance is used outside of the * connector_data lock */ @@ -167,7 +168,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) int fd, port, no_clients, sockd; ckpool_t *ckp = cdata->ckp; client_instance_t *client; - struct epoll_event event; socklen_t address_len; ck_rlock(&cdata->lock); @@ -230,9 +230,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); client->fd = fd; - event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + client->event.data.u64 = client->id; + client->event.events = EPOLLIN | EPOLLRDHUP; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &client->event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; } @@ -256,7 +256,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, &client->event); nolinger_socket(fd); Close(client->fd); HASH_DEL(cdata->clients, client); From 93753620440e8456e7ebaaffce22516bd2f36b97 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 18:53:30 +1000 Subject: [PATCH 27/46] Revert "Use a unique event structure for each client added to the epoll list" This reverts commit 8f1336986f91358786d4c4c9cfa88ec0058ab7ac. Unnecessary. --- src/connector.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2f0840f6..2c1fb215 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,7 +30,6 @@ struct client_instance { UT_hash_handle hh; int64_t id; int fd; - struct epoll_event event; /* Reference count for when this instance is used outside of the * connector_data lock */ @@ -168,6 +167,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) int fd, port, no_clients, sockd; ckpool_t *ckp = cdata->ckp; client_instance_t *client; + struct epoll_event event; socklen_t address_len; ck_rlock(&cdata->lock); @@ -230,9 +230,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); client->fd = fd; - client->event.data.u64 = client->id; - client->event.events = EPOLLIN | EPOLLRDHUP; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &client->event) < 0)) { + event.data.u64 = client->id; + event.events = EPOLLIN | EPOLLRDHUP; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; } @@ -256,7 +256,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, &client->event); + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); nolinger_socket(fd); Close(client->fd); HASH_DEL(cdata->clients, client); From 0395dd052f3ed7ac1680f67c7a2e8b1341079254 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 20:03:17 +1000 Subject: [PATCH 28/46] Use the client reference count in the connector to protect the client fd, closing it only once there are no more references to it --- src/connector.c | 59 +++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2c1fb215..a94ee304 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,12 +29,17 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; + + /* fd cannot be changed while a ref is held */ int fd; /* Reference count for when this instance is used outside of the * connector_data lock */ int ref; + /* Have we disabled this client to be removed when there are no refs? */ + bool invalid; + /* For dead_clients list */ client_instance_t *next; client_instance_t *prev; @@ -229,7 +234,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - client->fd = fd; event.data.u64 = client->id; event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { @@ -241,6 +245,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ __inc_instance_ref(client); + client->fd = fd; return 1; } @@ -249,16 +254,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int drop_client(cdata_t *cdata, client_instance_t *client) { int64_t client_id = 0; - int fd; + int fd = -1; ck_wlock(&cdata->lock); - fd = client->fd; - if (fd != -1) { + if (!client->invalid) { + client->invalid = true; client_id = client->id; - + fd = client->fd; epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); - nolinger_socket(fd); - Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the @@ -324,6 +327,11 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c if (!client->ref) { DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector recycling client %"PRId64, client->id); + /* We only close the client fd once we're sure there + * are no references to it left to prevent fds being + * reused on new and old clients. */ + nolinger_socket(client->fd); + Close(client->fd); __recycle_client(cdata, client); } } @@ -405,7 +413,7 @@ reparse: /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ - if (likely(client->fd != -1)) { + if (likely(!client->invalid)) { if (ckp->passthrough) send_proc(ckp->generator, s); else @@ -427,7 +435,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client) + if (client && !client->invalid) __inc_instance_ref(client); ck_wunlock(&cdata->lock); @@ -506,13 +514,13 @@ void *receiver(void *arg) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; /* We can have both messages and read hang ups so process the * message first. */ if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; if (unlikely(event.events & EPOLLERR)) { socklen_t errlen = sizeof(int); @@ -602,17 +610,18 @@ void *sender(void *arg) continue; } client = sender_send->client; + if (unlikely(client->invalid)) { + LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); + goto contfree; + } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ck_rlock(&cdata->lock); fd = client->fd; if (!ret) ret = wait_write_select(fd, 0); - ck_runlock(&cdata->lock); - if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -655,7 +664,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) { sender_send_t *sender_send; client_instance_t *client; - int fd = -1, len; + int len; if (unlikely(!buf)) { LOGWARNING("Connector send_client sent a null buffer"); @@ -670,25 +679,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) { - fd = client->fd; - /* Grab a reference to this client until the sender_send has - * completed processing. */ + /* Grab a reference to this client until the sender_send has + * completed processing. */ + if (likely(client)) __inc_instance_ref(client); - } ck_wunlock(&cdata->lock); - if (unlikely(fd == -1)) { + if (unlikely(!client)) { ckpool_t *ckp = cdata->ckp; - if (client) { - /* This shouldn't happen */ - LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); - invalidate_client(ckp, cdata, client); - } else { - LOGINFO("Connector failed to find client id %"PRId64" to send to", id); - stratifier_drop_id(ckp, id); - } + LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); free(buf); return; } From ec39117a1c7d7bf2467c91a53053ade6b9b5ddcb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 20:17:55 +1000 Subject: [PATCH 29/46] Don't return client if it is invalid in ref_client_by_id in the connector --- src/connector.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index a94ee304..e748808e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -435,8 +435,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client && !client->invalid) - __inc_instance_ref(client); + if (client) { + if (!client->invalid) + __inc_instance_ref(client); + else + client = NULL; + } ck_wunlock(&cdata->lock); return client; From 1efe7120c49b20be51cc0cbc657125f5e38a1133 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 21:39:53 +1000 Subject: [PATCH 30/46] Use ref_client_by_id in send_client instead of open coding it --- src/connector.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/connector.c b/src/connector.c index e748808e..3e020871 100644 --- a/src/connector.c +++ b/src/connector.c @@ -681,14 +681,9 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) return; } - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); /* Grab a reference to this client until the sender_send has * completed processing. */ - if (likely(client)) - __inc_instance_ref(client); - ck_wunlock(&cdata->lock); - + client = ref_client_by_id(cdata, id); if (unlikely(!client)) { ckpool_t *ckp = cdata->ckp; From 3919d182607ad39ec8256ceaeef5d8e84730e2c0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 07:17:51 +1000 Subject: [PATCH 31/46] Rework write path to have no potentially blocking calls and be able to send partial messages --- src/connector.c | 135 +++++++++++++++++++----------------------------- 1 file changed, 52 insertions(+), 83 deletions(-) diff --git a/src/connector.c b/src/connector.c index 3e020871..2ac6bb40 100644 --- a/src/connector.c +++ b/src/connector.c @@ -63,6 +63,7 @@ struct sender_send { client_instance_t *client; char *buf; int len; + int ofs; }; typedef struct sender_send sender_send_t; @@ -98,7 +99,6 @@ struct connector_data { /* For the linked list of pending sends */ sender_send_t *sender_sends; - sender_send_t *delayed_sends; int64_t sends_generated; int64_t sends_delayed; @@ -559,27 +559,65 @@ out: return NULL; } +/* Send a sender_send message and return true if we've finished sending it or + * are unable to send any more. */ +static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) +{ + client_instance_t *client = sender_send->client; + + if (unlikely(client->invalid)) + return true; + + while (sender_send->len) { + int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT); + + if (unlikely(ret < 1)) { + if (!ret) + return false; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return false; + LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); + invalidate_client(ckp, cdata, client); + return true; + } + sender_send->ofs += ret; + sender_send->len -= ret; + } + return true; +} + +static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) +{ + dec_instance_ref(cdata, sender_send->client); + free(sender_send->buf); + free(sender_send); +} + /* Use a thread to send queued messages, using select() to only send to sockets * ready for writing immediately to not delay other messages. */ -void *sender(void *arg) +static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; + sender_send_t *sends = NULL; ckpool_t *ckp = cdata->ckp; - bool sent = false; rename_proc("csender"); while (42) { - sender_send_t *sender_send, *delayed; - client_instance_t *client; - int ret = 0, fd, ofs = 0; + sender_send_t *sender_send, *sending, *tmp; + + /* Check all sends to see if they can be written out */ + DL_FOREACH_SAFE(sends, sending, tmp) { + if (send_sender_send(ckp, cdata, sending)) { + DL_DELETE(sends, sending); + clear_sender_send(sending, cdata); + } else + cdata->sends_delayed++; + } mutex_lock(&cdata->sender_lock); - /* Poll every 10ms if there are no new sends. Re-examine - * delayed sends immediately after a successful send in case - * endless new sends more frequently end up starving the - * delayed sends. */ - if (!cdata->sender_sends && !sent) { + /* Poll every 10ms if there are no new sends. */ + if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; ts_t timeout_ts; @@ -592,70 +630,8 @@ void *sender(void *arg) DL_DELETE(cdata->sender_sends, sender_send); mutex_unlock(&cdata->sender_lock); - sent = false; - - /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing or have just - * serviced another message successfully. */ - if (!sender_send) { - /* Find a delayed client that needs servicing and set - * ret accordingly. We do not need to use FOREACH_SAFE - * as we break out of the loop as soon as we manipuate - * the list. */ - DL_FOREACH(cdata->delayed_sends, delayed) { - if ((ret = wait_write_select(delayed->client->fd, 0))) { - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); - break; - } - } - /* None found ? */ - if (!sender_send) - continue; - } - client = sender_send->client; - if (unlikely(client->invalid)) { - LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); - goto contfree; - } - - /* If this socket is not ready to receive data from us, put the - * send back on the tail of the list and decrease the timeout - * to poll to either look for a client that is ready or poll - * select on this one */ - fd = client->fd; - if (!ret) - ret = wait_write_select(fd, 0); - if (ret < 1) { - if (ret < 0) { - LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); - invalidate_client(ckp, cdata, client); - goto contfree; - } - LOGDEBUG("Client %"PRId64" not ready for writes", client->id); - - /* Append it to the tail of the delayed sends list. - * This is the only function that alters it so no - * locking is required. Keep the client ref. */ - DL_APPEND(cdata->delayed_sends, sender_send); - cdata->sends_delayed++; - continue; - } - while (sender_send->len) { - ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); - if (unlikely(ret < 0)) { - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); - invalidate_client(ckp, cdata, client); - break; - } - ofs += ret; - sender_send->len -= ret; - } -contfree: - sent = true; - free(sender_send->buf); - free(sender_send); - dec_instance_ref(cdata, client); + if (sender_send) + DL_APPEND(sends, sender_send); } /* We shouldn't get here unless there's an error */ childsighandler(15); @@ -787,15 +763,8 @@ static char *connector_stats(cdata_t *cdata) objects = 0; memsize = 0; - mutex_lock(&cdata->sender_lock); generated = cdata->sends_delayed; - DL_FOREACH(cdata->delayed_sends, send) { - objects++; - memsize += sizeof(sender_send_t) + send->len + 1; - } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si}", "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); From 883d870060217449fc8b9f2dcb19a11f23f32195 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 17:34:38 +1000 Subject: [PATCH 32/46] Use the simpler read/write calls and make all client sockets non-blocking --- src/connector.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2ac6bb40..fc4637b3 100644 --- a/src/connector.c +++ b/src/connector.c @@ -224,6 +224,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } keep_sockalive(fd); + noblock_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); @@ -352,7 +353,8 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) retry: buflen = PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); + /* This read call is non-blocking since the socket is set to O_NOBLOCK */ + ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { if (!ret) return; @@ -569,7 +571,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende return true; while (sender_send->len) { - int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT); + int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); if (unlikely(ret < 1)) { if (!ret) @@ -593,8 +595,9 @@ static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) free(sender_send); } -/* Use a thread to send queued messages, using select() to only send to sockets - * ready for writing immediately to not delay other messages. */ +/* Use a thread to send queued messages, appending them to the sends list and + * iterating over all of them, attempting to send them all non-blocking to + * only send to those clients ready to receive data. */ static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; From f580a82a06e6e3c9d9ec40b06fdb0b54cc5d66a6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 21:46:00 +1000 Subject: [PATCH 33/46] Drop the subclients of passthroughs that no longer exist --- src/connector.c | 57 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/src/connector.c b/src/connector.c index fc4637b3..e26626d0 100644 --- a/src/connector.c +++ b/src/connector.c @@ -643,8 +643,9 @@ static void *sender(void *arg) /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(cdata_t *cdata, int64_t id, char *buf) +static void send_client(cdata_t *cdata, const int64_t id, char *buf) { + ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; int len; @@ -661,15 +662,35 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) } /* Grab a reference to this client until the sender_send has - * completed processing. */ - client = ref_client_by_id(cdata, id); - if (unlikely(!client)) { - ckpool_t *ckp = cdata->ckp; - - LOGINFO("Connector failed to find client id %"PRId64" to send to", id); - stratifier_drop_id(ckp, id); - free(buf); - return; + * completed processing. Is this a passthrough subclient ? */ + if (id > 0xffffffffll) { + int64_t client_id, pass_id; + + client_id = id & 0xffffffffll; + pass_id = id >> 32; + /* Make sure the passthrough exists for passthrough subclients */ + client = ref_client_by_id(cdata, pass_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", + pass_id, client_id); + /* Now see if the subclient exists */ + client = ref_client_by_id(cdata, client_id); + if (client) { + invalidate_client(ckp, cdata, client); + dec_instance_ref(cdata, client); + } else + stratifier_drop_id(ckp, id); + free(buf); + return; + } + } else { + client = ref_client_by_id(cdata, id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); + free(buf); + return; + } } sender_send = ckzalloc(sizeof(sender_send_t)); @@ -696,7 +717,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void process_client_msg(cdata_t *cdata, const char *buf) { - int64_t client_id64, client_id; + int64_t client_id; json_t *json_msg; char *msg; @@ -707,16 +728,12 @@ static void process_client_msg(cdata_t *cdata, const char *buf) } /* Extract the client id from the json message and remove its entry */ - client_id64 = json_integer_value(json_object_get(json_msg, "client_id")); + client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); - if (client_id64 > 0xffffffffll) { - int64_t passthrough_id; - - passthrough_id = client_id64 & 0xffffffffll; - client_id = client_id64 >> 32; - json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); - } else - client_id = client_id64; + /* Put client_id back in for a passthrough subclient, passing its + * upstream client_id instead of the passthrough's. */ + if (client_id > 0xffffffffll) + json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); json_decref(json_msg); From 83b23d864fda7c0ad6f9aab29c230d1a24fd3e04 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 22:08:41 +1000 Subject: [PATCH 34/46] Remove decrease of listen backlog which is of questionable utility --- src/connector.c | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/connector.c b/src/connector.c index e26626d0..dac54f34 100644 --- a/src/connector.c +++ b/src/connector.c @@ -453,10 +453,8 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; - bool dropped_backlog = false; struct epoll_event event; uint64_t serverfds, i; - time_t start_t; int ret, epfd; rename_proc("creceiver"); @@ -481,21 +479,10 @@ void *receiver(void *arg) while (!cdata->accept) cksleep_ms(1); - start_t = time(NULL); while (42) { client_instance_t *client; - if (unlikely(!dropped_backlog && time(NULL) - start_t > 90)) { - /* When we first start we listen to as many connections - * as possible. After the first minute we drop the - * listen to the minimum to effectively ratelimit how - * fast we can receive new connections. */ - dropped_backlog = true; - LOGNOTICE("Dropping server listen backlog to 0"); - for (i = 0; i < serverfds; i++) - listen(cdata->serverfd[i], 0); - } while (unlikely(!cdata->accept)) cksleep_ms(10); ret = epoll_wait(epfd, &event, 1, 1000); From 7c624502979d94942fc10737466726ad303fee17 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 22:20:52 +1000 Subject: [PATCH 35/46] Increase listen backlog to speed up reconnects up to the system config limits --- src/connector.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index dac54f34..17b4274d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -919,7 +919,9 @@ int connector(proc_instance_t *pi) Close(sockd); goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + /* Set listen backlog to larger than SOMAXCONN in case the + * system configuration supports it */ + if (listen(sockd, 1024) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -961,7 +963,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + if (listen(sockd, 1024) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; From f8356971658120168d77cebe3729e6369833fed0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 02:06:24 +1000 Subject: [PATCH 36/46] Microoptimisation --- src/connector.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/connector.c b/src/connector.c index 17b4274d..217946eb 100644 --- a/src/connector.c +++ b/src/connector.c @@ -356,9 +356,7 @@ retry: /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - if (!ret) - return; - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); @@ -561,9 +559,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); if (unlikely(ret < 1)) { - if (!ret) - return false; - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return false; LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); invalidate_client(ckp, cdata, client); From ed44843e3745b6418bb789d7e4a87d274f1b90c3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:26:54 +1000 Subject: [PATCH 37/46] Append all new sender_ends in csender instead of only the first --- src/connector.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 217946eb..b64b578d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -590,7 +590,7 @@ static void *sender(void *arg) rename_proc("csender"); while (42) { - sender_send_t *sender_send, *sending, *tmp; + sender_send_t *sending, *tmp; /* Check all sends to see if they can be written out */ DL_FOREACH_SAFE(sends, sending, tmp) { @@ -611,13 +611,11 @@ static void *sender(void *arg) timeraddspec(&timeout_ts, &polltime); cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } - sender_send = cdata->sender_sends; - if (sender_send) - DL_DELETE(cdata->sender_sends, sender_send); + if (cdata->sender_sends) { + DL_CONCAT(sends, cdata->sender_sends); + cdata->sender_sends = NULL; + } mutex_unlock(&cdata->sender_lock); - - if (sender_send) - DL_APPEND(sends, sender_send); } /* We shouldn't get here unless there's an error */ childsighandler(15); From 27d68200eca362cbdc18a12689f32eebb518893f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:42:12 +1000 Subject: [PATCH 38/46] Add better stats about queued sends consisten with the list changes --- src/connector.c | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/connector.c b/src/connector.c index b64b578d..89017590 100644 --- a/src/connector.c +++ b/src/connector.c @@ -102,6 +102,8 @@ struct connector_data { int64_t sends_generated; int64_t sends_delayed; + int64_t sends_queued; + int64_t sends_size; /* For protecting the pending sends list */ mutex_t sender_lock; @@ -590,6 +592,7 @@ static void *sender(void *arg) rename_proc("csender"); while (42) { + int64_t sends_queued = 0, sends_size = 0; sender_send_t *sending, *tmp; /* Check all sends to see if they can be written out */ @@ -597,11 +600,16 @@ static void *sender(void *arg) if (send_sender_send(ckp, cdata, sending)) { DL_DELETE(sends, sending); clear_sender_send(sending, cdata); - } else - cdata->sends_delayed++; + } else { + sends_queued++; + sends_size += sizeof(sender_send_t) + sending->len + 1; + } } mutex_lock(&cdata->sender_lock); + cdata->sends_delayed += sends_queued; + cdata->sends_queued = sends_queued; + cdata->sends_size = sends_size; /* Poll every 10ms if there are no new sends. */ if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; @@ -751,21 +759,16 @@ static char *connector_stats(cdata_t *cdata) memsize = 0; mutex_lock(&cdata->sender_lock); - generated = cdata->sends_generated; DL_FOREACH(cdata->sender_sends, send) { objects++; memsize += sizeof(sender_send_t) + send->len + 1; } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); json_set_object(val, "sends", subval); - objects = 0; - memsize = 0; + JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); + mutex_unlock(&cdata->sender_lock); - generated = cdata->sends_delayed; - JSON_CPACK(subval, "{si}", "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); From 8447e3ff554675c3e24c5816e60dd0ee88a9ccb2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:44:21 +1000 Subject: [PATCH 39/46] Microoptimise --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 89017590..ae768f8b 100644 --- a/src/connector.c +++ b/src/connector.c @@ -358,7 +358,7 @@ retry: /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) + if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); From 68e44be3ce3d8777760c2e828e8c094fcf13dce6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:52:16 +1000 Subject: [PATCH 40/46] Check for oversized client message before doing any reads to avoid possibility of exactly the wrong size buffer to ever invalidate the client, adding more info to a downgraded message --- src/connector.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index ae768f8b..224fc5b8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -354,6 +354,12 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) json_t *val; retry: + if (unlikely(client->bufofs > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", + client->id, client->fd); + invalidate_client(ckp, cdata, client); + return; + } buflen = PAGESIZE - client->bufofs; /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); @@ -368,14 +374,8 @@ retry: client->bufofs += ret; reparse: eol = memchr(client->buf, '\n', client->bufofs); - if (!eol) { - if (unlikely(client->bufofs > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, cdata, client); - return; - } + if (!eol) goto retry; - } /* Do something useful with this message now */ buflen = eol - client->buf + 1; From 806d1c76a2d057f2d9e0f105e95b4ae29b69a799 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:02:52 +1000 Subject: [PATCH 41/46] Do not create a stratifier statsupdate thread in passthrough mode since the stats always read zero --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 6b4ee37b..356f032d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4531,7 +4531,8 @@ int stratifier(proc_instance_t *pi) create_pthread(&pth_blockupdate, blockupdate, ckp); mutex_init(&sdata->stats_lock); - create_pthread(&pth_statsupdate, statsupdate, ckp); + if (!ckp->passthrough) + create_pthread(&pth_statsupdate, statsupdate, ckp); mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); From 146d164053a164b150ef83b425df2b14fb045d6b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:03:21 +1000 Subject: [PATCH 42/46] Put dropped at the start of the stratifier messages for easier parsing --- src/stratifier.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 356f032d..106344dc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1289,17 +1289,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (client->workername) { if (user) { - ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s", - client->id, client->address, user->throttled ? "throttled " : "", - user->username, client->workername, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", + client->id, client->address, user->throttled ? "throttled " : "", + user->username, client->workername, lazily ? "lazily" : ""); } else { - ASPRINTF(msg, "Client %"PRId64" %s no user worker %s dropped %s", - client->id, client->address, client->workername, - lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s", + client->id, client->address, client->workername, + lazily ? "lazily" : ""); } } else { - ASPRINTF(msg, "Workerless client %"PRId64" %s dropped %s", - client->id, client->address, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s", + client->id, client->address, lazily ? "lazily" : ""); } __del_client(sdata, client); __kill_instance(sdata, client); From 882bd693ddefe17d49324494e9d7ba598a3688ba Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:26:34 +1000 Subject: [PATCH 43/46] Display connector stats every ~60s in passthrough mode --- src/connector.c | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 224fc5b8..fd03991f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -74,6 +74,8 @@ struct connector_data { cklock_t lock; proc_instance_t *pi; + time_t start_time; + /* Array of server fds */ int *serverfd; /* All time count of clients connected */ @@ -728,7 +730,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_decref(json_msg); } -static char *connector_stats(cdata_t *cdata) +static char *connector_stats(cdata_t *cdata, const int runtime) { json_t *val = json_object(), *subval; client_instance_t *client; @@ -737,6 +739,10 @@ static char *connector_stats(cdata_t *cdata) int64_t memsize; char *buf; + /* If called in passthrough mode we log stats instead of the stratifier */ + if (runtime) + json_set_int(val, "runtime", runtime); + ck_rlock(&cdata->lock); objects = HASH_COUNT(cdata->clients); memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; @@ -773,7 +779,10 @@ static char *connector_stats(cdata_t *cdata) buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); - LOGNOTICE("Connector stats: %s", buf); + if (runtime) + LOGNOTICE("Passthrough:%s", buf); + else + LOGNOTICE("Connector stats: %s", buf); return buf; } @@ -781,13 +790,26 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; + time_t last_stats; int64_t client_id; - char *buf; int ret = 0; + char *buf; LOGWARNING("%s connector ready", ckp->name); + last_stats = cdata->start_time; retry: + if (ckp->passthrough) { + time_t diff = time(NULL); + + if (diff - last_stats >= 60) { + last_stats = diff; + diff -= cdata->start_time; + buf = connector_stats(cdata, diff); + dealloc(buf); + } + } + if (umsg) { Close(umsg->sockd); free(umsg->buf); @@ -837,7 +859,7 @@ retry: char *msg; LOGDEBUG("Connector received stats request"); - msg = connector_stats(cdata); + msg = connector_stats(cdata, 0); send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -982,6 +1004,7 @@ int connector(proc_instance_t *pi) cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + cdata->start_time = time(NULL); create_unix_receiver(pi); From 5365f1a420f498f741933964b214c5620272809c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:29:33 +1000 Subject: [PATCH 44/46] Increase listen backlog to 8k --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index fd03991f..2f4972a4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -940,7 +940,7 @@ int connector(proc_instance_t *pi) } /* Set listen backlog to larger than SOMAXCONN in case the * system configuration supports it */ - if (listen(sockd, 1024) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -982,7 +982,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, 1024) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; From b73f44f00cf9453ee55b22e98c3a61234b4511d6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:50:09 +1000 Subject: [PATCH 45/46] Tidy up and demote oversize message, adding client id --- src/connector.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2f4972a4..f01b0c2c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -368,8 +368,8 @@ retry: if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } @@ -382,7 +382,7 @@ reparse: /* Do something useful with this message now */ buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); + LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); return; } From dcfcae17799fa4ca6606728bcf90cf3c413e67c2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:53:17 +1000 Subject: [PATCH 46/46] Add errno details to write based close --- src/connector.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index f01b0c2c..0bd6fac7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -565,7 +565,8 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende if (unlikely(ret < 1)) { if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return false; - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); + LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", + client->id, client->fd, errno, strerror(errno)); invalidate_client(ckp, cdata, client); return true; }