From 0f9e594dba9f69e1de1f5494e1d6c55d83ce2d3a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 00:10:46 +1100 Subject: [PATCH 01/15] Change dead instance linked list to doubly linked for O(1) removal --- src/stratifier.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 978551e6..38a3fb78 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -873,7 +873,7 @@ static void update_base(ckpool_t *ckp, int prio) static void __add_dead(sdata_t *sdata, stratum_instance_t *client) { LOGDEBUG("Adding dead instance %ld", client->id); - LL_PREPEND(sdata->dead_instances, client); + DL_APPEND(sdata->dead_instances, client); sdata->stats.dead++; sdata->dead_generated++; } @@ -881,7 +881,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client) static void __del_dead(sdata_t *sdata, stratum_instance_t *client) { LOGDEBUG("Deleting dead instance %ld", client->id); - LL_DELETE(sdata->dead_instances, client); + DL_DELETE(sdata->dead_instances, client); sdata->stats.dead--; } @@ -1333,7 +1333,7 @@ static void drop_client(sdata_t *sdata, int64_t id) /* Cull old unused clients lazily when there are no more reference * counts for them. */ - LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { + DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { if (!client->ref) { LOGINFO("Stratifier discarding dead instance %ld", client->id); __del_dead(sdata, client); From 49266178e5a28ba09ab3d3958f54bde425e82341 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 00:16:18 +1100 Subject: [PATCH 02/15] Add a DL_DELETE_INIT to delete items from a DL list and NULL the prev/next pointers --- src/utlist.h | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/utlist.h b/src/utlist.h index 48a8c7d0..b4558aee 100644 --- a/src/utlist.h +++ b/src/utlist.h @@ -572,6 +572,28 @@ do { } \ } while (0) +#define DL_DELETE_INIT(head,del) \ + DL_DELETE3(head,del,prev,next) + +#define DL_DELETE3(head,del,prev,next) \ +do { \ + assert((del)->prev != NULL); \ + if ((del)->prev == (del)) { \ + (head)=NULL; \ + } else if ((del)==(head)) { \ + (del)->next->prev = (del)->prev; \ + (head) = (del)->next; \ + } else { \ + (del)->prev->next = (del)->next; \ + if ((del)->next) { \ + (del)->next->prev = (del)->prev; \ + } else { \ + (head)->prev = (del)->prev; \ + } \ + } \ + (del)->prev = (del)->next = NULL; \ +} while (0) + #define DL_COUNT(head,el,counter) \ DL_COUNT2(head,el,counter,next) \ From 3c3621cc439b34bb7b8153efa4859bfd07be3edf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 00:23:38 +1100 Subject: [PATCH 03/15] Use delete_init on the DL lists used in stratifier for thoroughness --- src/stratifier.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 38a3fb78..bc20a054 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -881,7 +881,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client) static void __del_dead(sdata_t *sdata, stratum_instance_t *client) { LOGDEBUG("Deleting dead instance %ld", client->id); - DL_DELETE(sdata->dead_instances, client); + DL_DELETE_INIT(sdata->dead_instances, client); sdata->stats.dead--; } @@ -1122,7 +1122,7 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta HASH_DEL(sdata->stratum_instances, client); if (instance) - DL_DELETE(instance->instances, client); + DL_DELETE_INIT(instance->instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { @@ -1431,7 +1431,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE(sdata->block_solves, block); + DL_DELETE_INIT(sdata->block_solves, block); break; } dealloc(solvehash); @@ -1478,7 +1478,7 @@ static void block_reject(sdata_t *sdata, const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE(sdata->block_solves, block); + DL_DELETE_INIT(sdata->block_solves, block); break; } dealloc(solvehash); From b23dbab1a1a16ffc73972c8ffa2a6133036f03d0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 00:38:47 +1100 Subject: [PATCH 04/15] Use a doubly linked list for dead clients in the connector for O(1) removal --- src/connector.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index bd884e44..5cd81e22 100644 --- a/src/connector.c +++ b/src/connector.c @@ -36,6 +36,7 @@ struct client_instance { /* For dead_clients list */ struct client_instance *next; + struct client_instance *prev; struct sockaddr address; char address_name[INET6_ADDRSTRLEN]; @@ -211,7 +212,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { Close(client->fd); HASH_DEL(cdata->clients, client); - LL_PREPEND(cdata->dead_clients, client); + DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the * epoll list. */ __dec_instance_ref(client); @@ -249,9 +250,9 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c /* Cull old unused clients lazily when there are no more reference * counts for them. */ ck_wlock(&cdata->lock); - LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { + DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { if (!client->ref) { - LL_DELETE(cdata->dead_clients, client); + DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector discarding client %ld", client->id); dealloc(client); } From 94f6236d3c13ce623522039f37d5da2a13e56338 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 11:56:29 +1100 Subject: [PATCH 05/15] Optimise json malloc strategy to minimise memory allocations and avoid unnecessary strdups --- src/jansson-2.6/src/dump.c | 13 ++++--------- src/jansson-2.6/src/jansson_private.h | 3 ++- src/jansson-2.6/src/memory.c | 25 ++++++++++++------------- src/jansson-2.6/src/strbuffer.c | 5 +++-- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/jansson-2.6/src/dump.c b/src/jansson-2.6/src/dump.c index 776f294d..f1c3bc26 100644 --- a/src/jansson-2.6/src/dump.c +++ b/src/jansson-2.6/src/dump.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2009-2013 Petri Lehtinen + * Copyright (c) 2015 Con Kolivas * * Jansson is free software; you can redistribute it and/or modify * it under the terms of the MIT license. See LICENSE for details. @@ -427,13 +428,6 @@ static int do_dump(const json_t *json, size_t flags, int depth, } } -char *json_dump_dup(const char *str, size_t flags) -{ - if (flags & JSON_EOL) - return jsonp_eolstrdup(str); - return jsonp_strdup(str); -} - char *json_dumps(const json_t *json, size_t flags) { strbuffer_t strbuff; @@ -444,10 +438,11 @@ char *json_dumps(const json_t *json, size_t flags) if(json_dump_callback(json, dump_to_strbuffer, (void *)&strbuff, flags)) result = NULL; + else if (flags & JSON_EOL) + result = jsonp_eolstrsteal(&strbuff); else - result = json_dump_dup(strbuffer_value(&strbuff), flags); + result = jsonp_strsteal(&strbuff); - strbuffer_close(&strbuff); return result; } diff --git a/src/jansson-2.6/src/jansson_private.h b/src/jansson-2.6/src/jansson_private.h index 9de9c1eb..103f25bc 100644 --- a/src/jansson-2.6/src/jansson_private.h +++ b/src/jansson-2.6/src/jansson_private.h @@ -86,7 +86,8 @@ void _jsonp_free(void **ptr); char *jsonp_strndup(const char *str, size_t length); char *jsonp_strdup(const char *str); -char *jsonp_eolstrdup(const char *str); +char *jsonp_strsteal(strbuffer_t *strbuff); +char *jsonp_eolstrsteal(strbuffer_t *strbuff); /* Windows compatibility */ #ifdef _WIN32 diff --git a/src/jansson-2.6/src/memory.c b/src/jansson-2.6/src/memory.c index 5a00bafc..89c4671d 100644 --- a/src/jansson-2.6/src/memory.c +++ b/src/jansson-2.6/src/memory.c @@ -51,23 +51,22 @@ char *jsonp_strdup(const char *str) return new_str; } -char *jsonp_eolstrdup(const char *str) +char *jsonp_strsteal(strbuffer_t *strbuff) { - char *new_str; - size_t len; + size_t len = strbuff->length + 1; + char *ret = realloc(strbuff->value, len); - len = strlen(str); - if(len == (size_t)-1) - return NULL; + return ret; +} - new_str = jsonp_malloc(len + 2); - if(!new_str) - return NULL; +char *jsonp_eolstrsteal(strbuffer_t *strbuff) +{ + size_t len = strbuff->length + 2; + char *ret = realloc(strbuff->value, len); - memcpy(new_str, str, len); - new_str[len] = '\n'; - new_str[len + 1] = '\0'; - return new_str; + ret[strbuff->length] = '\n'; + ret[strbuff->length + 1] = '\0'; + return ret; } void json_set_alloc_funcs(json_malloc_t malloc_fn, json_free_t free_fn) diff --git a/src/jansson-2.6/src/strbuffer.c b/src/jansson-2.6/src/strbuffer.c index 86be2b38..fa26565f 100644 --- a/src/jansson-2.6/src/strbuffer.c +++ b/src/jansson-2.6/src/strbuffer.c @@ -16,7 +16,7 @@ #include "jansson_private.h" #include "strbuffer.h" -#define STRBUFFER_MIN_SIZE 16 +#define STRBUFFER_MIN_SIZE 4096 #define STRBUFFER_FACTOR 2 #define STRBUFFER_SIZE_MAX ((size_t)-1) @@ -74,7 +74,8 @@ int strbuffer_append_byte(strbuffer_t *strbuff, char byte) int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size) { - if(size >= strbuff->size - strbuff->length) + /* Leave room for EOL and NULL bytes */ + if(size + 2 > strbuff->size - strbuff->length) { int backoff = 1; size_t new_size; From e64db6bb405ba069bfe0e7d952471f69c512a522 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 12:41:06 +1100 Subject: [PATCH 06/15] Fix memleak on each reject reason message --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index bc20a054..59c889f3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2857,7 +2857,7 @@ out_unlock: json_set_double(val, "sdiff", sdiff); json_set_string(val, "hash", hexhash); json_set_bool(val, "result", result); - json_object_set(val, "reject-reason", json_object_dup(json_msg, "reject-reason")); + json_object_set(val, "reject-reason", json_object_get(json_msg, "reject-reason")); json_object_set(val, "error", *err_val); json_set_int(val, "errn", err); json_set_string(val, "createdate", cdfield); From d6c57c081754976a752f79374cefea947d397cc2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 13:00:13 +1100 Subject: [PATCH 07/15] Do not check for thread shutdown on every pass through the connector --- src/connector.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/connector.c b/src/connector.c index 5cd81e22..6eb9aa05 100644 --- a/src/connector.c +++ b/src/connector.c @@ -633,6 +633,7 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) int64_t client_id64, client_id; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; + uint8_t test_cycle = 0; char *buf = NULL; do { @@ -647,15 +648,18 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) LOGWARNING("%s connector ready", ckp->name); retry: - if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { - LOGEMERG("Connector sender thread shutdown, exiting"); - ret = 1; - goto out; - } - if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { - LOGEMERG("Connector receiver thread shutdown, exiting"); - ret = 1; - goto out; + if (!++test_cycle) { + /* Test for pthread join every 256 messages */ + if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { + LOGEMERG("Connector sender thread shutdown, exiting"); + ret = 1; + goto out; + } + if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { + LOGEMERG("Connector receiver thread shutdown, exiting"); + ret = 1; + goto out; + } } Close(sockd); From 3bbce633027d667b4ab71b0ddc3a864a0d06e74a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 13:28:33 +1100 Subject: [PATCH 08/15] Generate connector stats on command --- src/ckpool.c | 8 +++-- src/ckpool.h | 2 ++ src/connector.c | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ src/stratifier.c | 2 -- 4 files changed, 84 insertions(+), 5 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 82623475..c2ca2348 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -276,7 +276,7 @@ static void *listener(void *arg) proc_instance_t *pi = (proc_instance_t *)arg; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; - char *buf = NULL; + char *buf = NULL, *msg; int sockd; rename_proc(pi->sockname); @@ -351,11 +351,13 @@ retry: execv(ckp->initial_args[0], (char *const *)ckp->initial_args); } } else if (cmdmatch(buf, "stratifierstats")) { - char *msg; - LOGDEBUG("Listener received stratifierstats request"); msg = send_recv_proc(ckp->stratifier, "stats"); send_unix_msg(sockd, msg); + } else if (cmdmatch(buf, "connectorstats")) { + LOGDEBUG("Listener received connectorstats request"); + msg = send_recv_proc(ckp->connector, "stats"); + send_unix_msg(sockd, msg); } else { LOGINFO("Listener received unhandled message: %s", buf); send_unix_msg(sockd, "unknown"); diff --git a/src/ckpool.h b/src/ckpool.h index 27116b5c..891d21ef 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -190,6 +190,8 @@ struct ckpool_instance { #define CKP_STANDALONE(CKP) (true) #endif +#define SAFE_HASH_OVERHEAD(HASHLIST) (HASHLIST ? HASH_OVERHEAD(hh, HASHLIST) : 0) + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); diff --git a/src/connector.c b/src/connector.c index 6eb9aa05..be846e60 100644 --- a/src/connector.c +++ b/src/connector.c @@ -83,12 +83,18 @@ struct connector_data { /* Linked list of dead clients no longer in use but may still have references */ client_instance_t *dead_clients; + int clients_generated; + int dead_generated; + int64_t client_id; /* For the linked list of pending sends */ sender_send_t *sender_sends; sender_send_t *delayed_sends; + int64_t sends_generated; + int64_t sends_delayed; + /* For protecting the pending sends list */ pthread_mutex_t sender_lock; pthread_cond_t sender_cond; @@ -194,6 +200,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) __inc_instance_ref(client); ck_wlock(&cdata->lock); + cdata->clients_generated++; client->id = cdata->client_id++; HASH_ADD_I64(cdata->clients, id, client); cdata->nfds++; @@ -216,6 +223,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) /* This is the reference to this client's presence in the * epoll list. */ __dec_instance_ref(client); + cdata->dead_generated++; } ck_wunlock(&cdata->lock); @@ -495,6 +503,7 @@ void *sender(void *arg) * This is the only function that alters it so no * locking is required. Keep the client ref. */ DL_APPEND(cdata->delayed_sends, sender_send); + cdata->sends_delayed++; continue; } sent = true; @@ -568,6 +577,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) sender_send->len = len; mutex_lock(&cdata->sender_lock); + cdata->sends_generated++; DL_APPEND(cdata->sender_sends, sender_send); pthread_cond_signal(&cdata->sender_cond); mutex_unlock(&cdata->sender_lock); @@ -627,6 +637,67 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_decref(json_msg); } +static char *connector_stats(cdata_t *cdata) +{ + json_t *val = json_object(), *subval; + client_instance_t *client; + int objects, generated; + sender_send_t *send; + int64_t memsize; + char *buf; + + ck_rlock(&cdata->lock); + objects = HASH_COUNT(cdata->clients); + memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; + generated = cdata->clients_generated; + ck_runlock(&cdata->lock); + + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + json_set_object(val, "clients", subval); + + ck_rlock(&cdata->lock); + DL_COUNT(cdata->dead_clients, client, objects); + generated = cdata->dead_generated; + ck_runlock(&cdata->lock); + + memsize = objects * sizeof(client_instance_t); + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + json_set_object(val, "dead", subval); + + objects = 0; + memsize = 0; + + mutex_lock(&cdata->sender_lock); + generated = cdata->sends_generated; + DL_FOREACH(cdata->sender_sends, send) { + objects++; + memsize += sizeof(sender_send_t) + send->len + 1; + } + mutex_unlock(&cdata->sender_lock); + + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + json_set_object(val, "sends", subval); + + objects = 0; + memsize = 0; + + mutex_lock(&cdata->sender_lock); + generated = cdata->sends_delayed; + DL_FOREACH(cdata->delayed_sends, send) { + objects++; + memsize += sizeof(sender_send_t) + send->len + 1; + } + mutex_unlock(&cdata->sender_lock); + + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + json_set_object(val, "delays", subval); + + buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); + json_decref(val); + LOGNOTICE("Connector stats: %s", buf); + return buf; +} + static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { int sockd = -1, ret = 0, selret; @@ -709,6 +780,12 @@ retry: } else if (cmdmatch(buf, "reject")) { LOGDEBUG("Connector received reject signal"); cdata->accept = false; + } else if (cmdmatch(buf, "stats")) { + char *msg; + + LOGDEBUG("Connector received stats request"); + msg = connector_stats(cdata); + send_unix_msg(sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "shutdown")) { diff --git a/src/stratifier.c b/src/stratifier.c index 59c889f3..6c67651e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1512,8 +1512,6 @@ static void broadcast_ping(sdata_t *sdata) stratum_broadcast(sdata, json_msg); } -#define SAFE_HASH_OVERHEAD(HASHLIST) (HASHLIST ? HASH_OVERHEAD(hh, HASHLIST) : 0) - static void ckmsgq_stats(ckmsgq_t *ckmsgq, int size, json_t **val) { int objects, generated; From b40b47efb45ac6004c742d11fda80bf4e93eec1d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 25 Jan 2015 14:34:11 +1100 Subject: [PATCH 09/15] Age shares whose workbase id matches the one being aged --- src/stratifier.c | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 6c67651e..0603bc68 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -524,6 +524,7 @@ static void clear_workbase(workbase_t *wb) free(wb); } +/* Remove all shares with a workbase id less than wb_id for block changes */ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id) { share_t *share, *tmp; @@ -543,6 +544,26 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id) LOGINFO("Cleared %d shares from share hashtable", purged); } +/* Remove all shares with a workbase id == wb_id being discarded */ +static void age_share_hashtable(sdata_t *sdata, int64_t wb_id) +{ + share_t *share, *tmp; + int aged = 0; + + ck_wlock(&sdata->share_lock); + HASH_ITER(hh, sdata->shares, share, tmp) { + if (share->workbase_id == wb_id) { + HASH_DEL(sdata->shares, share); + dealloc(share); + aged++; + } + } + ck_wunlock(&sdata->share_lock); + + if (aged) + LOGINFO("Aged %d shares from share hashtable", aged); +} + static char *status_chars = "|/-\\"; /* Absorbs the json and generates a ckdb json message, logs it to the ckdb @@ -712,6 +733,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) * to prevent taking recursive locks */ if (aged) { send_ageworkinfo(ckp, aged->id); + age_share_hashtable(sdata, aged->id); clear_workbase(aged); } } From 61576761ba230d04b7ca5e8e2d672c86cb05e69c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 10:54:27 +1100 Subject: [PATCH 10/15] Avoid taking recursive locks in new_enonce1 in proxy mode --- src/stratifier.c | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 0603bc68..95ff63ce 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1761,7 +1761,8 @@ static void *blockupdate(void *arg) return NULL; } -static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1) +/* Enter holding instance_lock */ +static bool __enonce1_free(sdata_t *sdata, uint64_t enonce1) { stratum_instance_t *client, *tmp; bool ret = true; @@ -1771,14 +1772,12 @@ static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1) goto out; } - ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->enonce1_64 == enonce1) { ret = false; break; } } - ck_runlock(&sdata->instance_lock); out: return ret; } @@ -1801,13 +1800,19 @@ static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client) static bool new_enonce1(stratum_instance_t *client) { sdata_t *sdata = client->ckp->data; + int enonce1varlen, i; bool ret = false; - workbase_t *wb; - int i; - ck_wlock(&sdata->workbase_lock); - wb = sdata->current_workbase; - switch(wb->enonce1varlen) { + /* Extract the enonce1varlen from the current workbase which may be + * a different workbase to when we __fill_enonce1data but the value + * will not change and this avoids grabbing recursive locks */ + ck_rlock(&sdata->workbase_lock); + enonce1varlen = sdata->current_workbase->enonce1varlen; + ck_runlock(&sdata->workbase_lock); + + /* instance_lock protects sdata->enonce1u */ + ck_wlock(&sdata->instance_lock); + switch(enonce1varlen) { case 8: sdata->enonce1u.u64++; ret = true; @@ -1823,7 +1828,7 @@ static bool new_enonce1(stratum_instance_t *client) case 2: for (i = 0; i < 65536; i++) { sdata->enonce1u.u16++; - ret = enonce1_free(sdata, sdata->enonce1u.u64); + ret = __enonce1_free(sdata, sdata->enonce1u.u64); if (ret) break; } @@ -1831,18 +1836,21 @@ static bool new_enonce1(stratum_instance_t *client) case 1: for (i = 0; i < 256; i++) { sdata->enonce1u.u8++; - ret = enonce1_free(sdata, sdata->enonce1u.u64); + ret = __enonce1_free(sdata, sdata->enonce1u.u64); if (ret) break; } break; default: - quit(0, "Invalid enonce1varlen %d", wb->enonce1varlen); + quit(0, "Invalid enonce1varlen %d", enonce1varlen); } if (ret) client->enonce1_64 = sdata->enonce1u.u64; - __fill_enonce1data(wb, client); - ck_wunlock(&sdata->workbase_lock); + ck_wunlock(&sdata->instance_lock); + + ck_rlock(&sdata->workbase_lock); + __fill_enonce1data(sdata->current_workbase, client); + ck_runlock(&sdata->workbase_lock); if (unlikely(!ret)) LOGWARNING("Enonce1 space exhausted! Proxy rejecting clients"); From f8e644e95938135e0424f980e8dd222ffe584134 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 11:11:21 +1100 Subject: [PATCH 11/15] Optimise new_share for the common case where shares are new, minimising time under wlock --- src/stratifier.c | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 95ff63ce..ad42e193 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2652,24 +2652,26 @@ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const return ret; } +/* Optimised for the common case where shares are new */ static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id) { - share_t *share, *match = NULL; - bool ret = false; + share_t *share = ckzalloc(sizeof(share_t)), *match = NULL; + bool ret = true; - ck_wlock(&sdata->share_lock); - HASH_FIND(hh, sdata->shares, hash, 32, match); - if (match) - goto out_unlock; - share = ckzalloc(sizeof(share_t)); memcpy(share->hash, hash, 32); share->workbase_id = wb_id; + + ck_wlock(&sdata->share_lock); sdata->shares_generated++; - HASH_ADD(hh, sdata->shares, hash, 32, share); - ret = true; -out_unlock: + HASH_FIND(hh, sdata->shares, hash, 32, match); + if (likely(!match)) + HASH_ADD(hh, sdata->shares, hash, 32, share); ck_wunlock(&sdata->share_lock); + if (unlikely(match)) { + dealloc(share); + ret = false; + } return ret; } From aa0086079f10db1227abac029533d30d3601f27d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 11:15:57 +1100 Subject: [PATCH 12/15] Use HASH_ITER in update_diff for clarity --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index ad42e193..65849de0 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1058,7 +1058,7 @@ static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client); static void update_diff(ckpool_t *ckp) { - stratum_instance_t *client; + stratum_instance_t *client, *tmp; sdata_t *sdata = ckp->data; double old_diff, diff; json_t *val; @@ -1097,7 +1097,7 @@ static void update_diff(ckpool_t *ckp) /* If the diff has dropped, iterate over all the clients and check * they're at or below the new diff, and update it if not. */ ck_rlock(&sdata->instance_lock); - for (client = sdata->stratum_instances; client != NULL; client = client->hh.next) { + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->diff > diff) { client->diff = diff; stratum_send_diff(sdata, client); From 404e43ebe487bae27d627b02db4070fd7c57ecfd Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 12:56:04 +1100 Subject: [PATCH 13/15] Deauth client when asked to drop it --- src/stratifier.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stratifier.c b/src/stratifier.c index 65849de0..7ec8fa97 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1330,6 +1330,7 @@ static void drop_client(sdata_t *sdata, int64_t id) if (client) { instance = client->user_instance; if (client->authorised) { + client->authorised = false; dec = true; ckp = client->ckp; } From 7b450f16b6826e1ed6bd7ea7f2bc460c12aa163f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 13:10:29 +1100 Subject: [PATCH 14/15] Add client reference from moment we receive a message --- src/stratifier.c | 61 ++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 7ec8fa97..05c6ccd0 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3126,24 +3126,18 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t stratum_send_diff(sdata, client); } -static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val, - json_t *method_val, json_t *params_val, char *address) +/* Enter with client holding ref count */ +static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, + json_t *id_val, json_t *method_val, json_t *params_val, char *address) { - stratum_instance_t *client; const char *method; char buf[256]; - client = ref_instance_by_id(sdata, client_id); - if (unlikely(!client)) { - LOGINFO("Failed to find client id %ld in hashtable!", client_id); - return; - } - if (unlikely(client->reject == 2)) { LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); - snprintf(buf, 255, "dropclient=%ld", client->id); + snprintf(buf, 255, "dropclient=%ld", client_id); send_proc(client->ckp->connector, buf); - goto out; + return; } /* Random broken clients send something not an integer as the id so we copy @@ -3155,7 +3149,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val /* Shouldn't happen, sanity check */ if (unlikely(!result_val)) { LOGWARNING("parse_subscribe returned NULL result_val"); - goto out; + return; } val = json_object(); json_object_set_new_nocheck(val, "result", result_val); @@ -3164,10 +3158,11 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val stratum_add_send(sdata, val, client_id); if (likely(client->subscribed)) update_client(sdata, client, client_id); - goto out; + return; } if (unlikely(cmdmatch(method, "mining.passthrough"))) { + LOGNOTICE("Adding passthrough client %ld", client_id); /* We need to inform the connector process that this client * is a passthrough and to manage its messages accordingly. * Remove this instance since the client id may well be @@ -3178,17 +3173,16 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val __add_dead(sdata, client); ck_wunlock(&sdata->instance_lock); - LOGNOTICE("Adding passthrough client %ld", client->id); - snprintf(buf, 255, "passthrough=%ld", client->id); + snprintf(buf, 255, "passthrough=%ld", client_id); send_proc(client->ckp->connector, buf); - goto out; + return; } if (cmdmatch(method, "mining.auth") && client->subscribed) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->sauthq, jp); - goto out; + return; } /* We should only accept authorised requests from here on */ @@ -3196,22 +3190,22 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val /* Dropping unauthorised clients here also allows the * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ - LOGINFO("Dropping unauthorised client %ld", client->id); - snprintf(buf, 255, "dropclient=%ld", client->id); + LOGINFO("Dropping unauthorised client %ld", client_id); + snprintf(buf, 255, "dropclient=%ld", client_id); send_proc(client->ckp->connector, buf); - goto out; + return; } if (cmdmatch(method, "mining.submit")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->sshareq, jp); - goto out; + return; } if (cmdmatch(method, "mining.suggest")) { suggest_diff(client, method, params_val); - goto out; + return; } /* Covers both get_transactions and get_txnhashes */ @@ -3219,14 +3213,15 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->stxnq, jp); - goto out; + return; } /* Unhandled message here */ -out: - dec_instance_ref(sdata, client); + LOGINFO("Unhandled client %ld method %s", client_id, method); + return; } -static void parse_instance_msg(sdata_t *sdata, smsg_t *msg) +/* Entered with client holding ref count */ +static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client) { json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; @@ -3257,7 +3252,7 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg) send_json_err(sdata, client_id, id_val, "-1:params not found"); goto out; } - parse_method(sdata, client_id, id_val, method, params, msg->address); + parse_method(sdata, client, client_id, id_val, method, params, msg->address); out: json_decref(val); free(msg); @@ -3266,6 +3261,7 @@ out: static void srecv_process(ckpool_t *ckp, char *buf) { sdata_t *sdata = ckp->data; + stratum_instance_t *client; smsg_t *msg; json_t *val; int server; @@ -3310,12 +3306,15 @@ static void srecv_process(ckpool_t *ckp, char *buf) /* Parse the message here */ ck_wlock(&sdata->instance_lock); - /* client_id instance doesn't exist yet, create one */ - if (!__instance_by_id(sdata, msg->client_id)) - __stratum_add_instance(ckp, msg->client_id, server); + client = __instance_by_id(sdata, msg->client_id); + /* If client_id instance doesn't exist yet, create one */ + if (unlikely(!client)) + client = __stratum_add_instance(ckp, msg->client_id, server); + __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); - parse_instance_msg(sdata, msg); + parse_instance_msg(sdata, msg, client); + dec_instance_ref(sdata, client); out: free(buf); } From a102827cf92c6089ade21f5347ea763c1e994b2c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 26 Jan 2015 13:17:57 +1100 Subject: [PATCH 15/15] Sanity test --- src/stratifier.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 05c6ccd0..28b1cb53 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -713,6 +713,8 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) HASH_ITER(hh, sdata->workbases, tmp, tmpa) { if (HASH_COUNT(sdata->workbases) < 3) break; + if (wb == tmp) + continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->workbases, tmp);