From d6955d334544151b5250bb889059e4e55a196e29 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 11:38:00 +1100 Subject: [PATCH 01/21] Remove lower limit on suggest-diff since it is usually only set on initial client connect --- src/stratifier.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 52073773..1f7acc5e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2541,10 +2541,7 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t client->suggest_diff = sdiff; if (client->diff == sdiff) return; - if (sdiff < client->diff * 2 / 3) - client->diff = client->diff * 2 / 3; - else - client->diff = sdiff; + client->diff = sdiff; stratum_send_diff(client); } From c0aba3449f685b54854c2265e8fcb334ebf475c5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 12:44:56 +1100 Subject: [PATCH 02/21] Add support for job_id being in the params for get_transactions while maintaining backward support for job_id being embedded in the method. Convert get_txnhashes to using only params for the job_id. --- src/stratifier.c | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1f7acc5e..708ef31e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -173,6 +173,7 @@ static int64_t blockchange_id; static char lasthash[68], lastswaphash[68]; struct json_params { + json_t *method; json_t *params; json_t *id_val; int64_t client_id; @@ -2452,10 +2453,13 @@ static void update_client(stratum_instance_t *client, const int64_t client_id) stratum_send_diff(client); } -static json_params_t *create_json_params(const int64_t client_id, const json_t *params, const json_t *id_val, const char *address) +static json_params_t +*create_json_params(const int64_t client_id, json_t *method, const json_t *params, + const json_t *id_val, const char *address) { json_params_t *jp = ckalloc(sizeof(json_params_t)); + jp->method = json_copy(method); jp->params = json_deep_copy(params); jp->id_val = json_deep_copy(id_val); jp->client_id = client_id; @@ -2606,7 +2610,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method } if (cmdmatch(method, "mining.auth") && client->subscribed) { - json_params_t *jp = create_json_params(client_id, params_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sauthq, jp); return; @@ -2624,7 +2628,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method } if (cmdmatch(method, "mining.submit")) { - json_params_t *jp = create_json_params(client_id, params_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sshareq, jp); return; @@ -2637,7 +2641,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method /* Covers both get_transactions and get_txnhashes */ if (cmdmatch(method, "mining.get")) { - json_params_t *jp = create_json_params(client_id, method_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(stxnq, jp); return; @@ -2748,6 +2752,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) static void discard_json_params(json_params_t **jp) { + json_decref((*jp)->method); json_decref((*jp)->params); json_decref((*jp)->id_val); free(*jp); @@ -2936,7 +2941,8 @@ static json_t *txnhashes_by_jobid(int64_t id) static void send_transactions(ckpool_t *ckp, json_params_t *jp) { - const char *msg = json_string_value(jp->params); + const char *msg = json_string_value(jp->method), + *params = json_string_value(json_array_get(jp->params, 0)); stratum_instance_t *client; json_t *val, *hashes; int64_t job_id = 0; @@ -2953,8 +2959,12 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) /* We don't actually send the transactions as that would use * up huge bandwidth, so we just return the number of - * transactions :) */ - sscanf(msg, "mining.get_transactions(%lx", &job_id); + * transactions :) . Support both forms of encoding the + * request in method name and as a parameter. */ + if (params && strlen(params) > 0) + sscanf(params, "%lx", &job_id); + else + sscanf(msg, "mining.get_transactions(%lx", &job_id); txns = transactions_by_jobid(job_id); if (txns != -1) { json_set_int(val, "result", txns); @@ -2986,7 +2996,11 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out_send; } client->last_txns = now_t; - sscanf(msg, "mining.get_txnhashes(%lx", &job_id); + if (!params || !strlen(params)) { + json_set_string(val, "error", "Invalid params"); + goto out_send; + } + sscanf(params, "%lx", &job_id); hashes = txnhashes_by_jobid(job_id); if (hashes) { json_object_set_new_nocheck(val, "result", hashes); From 242f30ad56cbd4d5a17be0728af8b3852397f75e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 13:49:17 +1100 Subject: [PATCH 03/21] Document maxclients --- README | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README b/README index 01f32aa1..3d8c90ea 100644 --- a/README +++ b/README @@ -256,3 +256,6 @@ and 3334 in proxy mode. maximum. "logdir" : Which directory to store pool and client logs. Default "logs" + +"maxclients" : Optional upper limit on the number of clients ckpool will +accept before rejecting further clients. From 069e52a302a338d6a4ed1c26ac98d4c5d4004a58 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 13:52:01 +1100 Subject: [PATCH 04/21] Add sample configs to distribution --- Makefile.am | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile.am b/Makefile.am index 0a7b9299..329e98e1 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,2 +1,3 @@ ACLOCAL_AMFLAGS = -I m4 SUBDIRS = src +EXTRA_DIST = ckpool.conf ckproxy.conf From 9dae34e3fa00740b6e9435bf98022cf8601d1acf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 14:34:32 +1100 Subject: [PATCH 05/21] Make update_base calls asynchronous to not hold up stratum loop messages and ping miners if we get no base to keep them mining --- src/stratifier.c | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 708ef31e..4d3cf5f3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -721,24 +721,39 @@ static void send_generator(ckpool_t *ckp, const char *msg, int prio) gen_priority = 0; } +struct update_req { + pthread_t *pth; + ckpool_t *ckp; + int prio; +}; + +static void broadcast_ping(void); + /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void update_base(ckpool_t *ckp, int prio) +static void *do_update(void *arg) { + struct update_req *ur = (struct update_req *)arg; + ckpool_t *ckp = ur->ckp; bool new_block = false; + int prio = ur->prio; + bool ret = false; workbase_t *wb; json_t *val; char *buf; + pthread_detach(pthread_self()); + rename_proc("updater"); + buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGWARNING("Failed to get base from generator in update_base"); - return; + goto out; } if (unlikely(cmdmatch(buf, "failed"))) { LOGWARNING("Generator returned failure in update_base"); - return; + goto out; } wb = ckzalloc(sizeof(workbase_t)); @@ -783,6 +798,29 @@ static void update_base(ckpool_t *ckp, int prio) add_base(ckp, wb, &new_block); stratum_broadcast_update(new_block); + ret = true; + LOGINFO("Broadcasted updated stratum base"); +out: + /* Send a ping to miners if we fail to get a base to keep them + * connected while bitcoind recovers(?) */ + if (!ret) { + LOGWARNING("Broadcasted ping due to failed stratum base update"); + broadcast_ping(); + } + free(ur->pth); + free(ur); + return NULL; +} + +static void update_base(ckpool_t *ckp, int prio) +{ + struct update_req *ur = ckalloc(sizeof(struct update_req)); + pthread_t *pth = ckalloc(sizeof(pthread_t)); + + ur->pth = pth; + ur->ckp = ckp; + ur->prio = prio; + create_pthread(pth, do_update, ur); } static void drop_allclients(ckpool_t *ckp) From f84c0aa2cdfcf8d21cb0c5840f6886e2b3a1a6c4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 14:51:53 +1100 Subject: [PATCH 06/21] Make number of clients we can connect to unlimited --- src/connector.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2acaf19c..35289338 100644 --- a/src/connector.c +++ b/src/connector.c @@ -293,12 +293,13 @@ void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; client_instance_t *client, *tmp; - struct pollfd fds[65536]; - int ret, nfds, i; + int ret, nfds, i, maxfds = 1; + struct pollfd *fds; bool update; rename_proc("creceiver"); + fds = ckalloc(sizeof(struct pollfd)); /* First fd is reserved for the accepting socket */ fds[0].fd = ci->serverfd; fds[0].events = POLLIN; @@ -314,6 +315,14 @@ rebuild_fds: client->id); continue; } + if (nfds >= maxfds) { + maxfds = nfds + 1; + fds = realloc(fds, sizeof(struct pollfd) * maxfds); + if (unlikely(!fds)) { + LOGEMERG("FATAL: Failed to realloc fds in receiver!"); + goto out; + } + } fds[nfds].fd = client->fd; fds[nfds].events = POLLIN; fds[nfds].revents = 0; @@ -372,6 +381,7 @@ repoll: goto rebuild_fds; goto repoll; out: + free(fds); return NULL; } From c8627c0d1b866b567b52a1540b313d6e4e448842 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 14:58:59 +1100 Subject: [PATCH 07/21] Listen with the maximum backlog when we first start for more seamless restarts and then drop to the minimum afterwards to effectively ratelimit new connections --- src/connector.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 35289338..d79500e6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -294,8 +294,8 @@ void *receiver(void *arg) conn_instance_t *ci = (conn_instance_t *)arg; client_instance_t *client, *tmp; int ret, nfds, i, maxfds = 1; + bool update, maxconn = true; struct pollfd *fds; - bool update; rename_proc("creceiver"); @@ -379,6 +379,14 @@ repoll: if (update) goto rebuild_fds; + else if (unlikely(maxconn)) { + /* When we first start we listen to as many connections as + * possible. Once we stop receiving connections we drop the + * listen to the minimum to effectively ratelimit how fast we + * can receive connections. */ + maxconn = false; + listen(ci->serverfd, 0); + } goto repoll; out: free(fds); @@ -748,7 +756,7 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); - ret = listen(sockd, 10); + ret = listen(sockd, SOMAXCONN); if (ret < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); From ad4e959a80ffb085c41c3c1680cf27067412ff72 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 16:21:52 +1100 Subject: [PATCH 08/21] Clamp suggest diff to pool mindiff directive --- src/stratifier.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 4d3cf5f3..87dcc1b7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2583,7 +2583,10 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t client->suggest_diff = sdiff; if (client->diff == sdiff) return; - client->diff = sdiff; + if (sdiff < client->ckp->mindiff) + client->diff = client->ckp->mindiff; + else + client->diff = sdiff; stratum_send_diff(client); } From 0fa0bbef4d4d5e4ab79bab9c21d18bdf8daaf05c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 17:11:12 +1100 Subject: [PATCH 09/21] Add 7 day stats to pool and users --- src/stratifier.c | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 87dcc1b7..6b0222a0 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -71,6 +71,7 @@ struct pool_stats { double dsps60; double dsps360; double dsps1440; + double dsps10080; }; typedef struct pool_stats pool_stats_t; @@ -227,6 +228,7 @@ struct user_instance { double dsps5; /* ... 5 minute ... */ double dsps60;/* etc */ double dsps1440; + double dsps10080; tv_t last_share; }; @@ -269,6 +271,7 @@ struct stratum_instance { double dsps5; /* ... 5 minute ... */ double dsps60;/* etc */ double dsps1440; + double dsps10080; tv_t ldc; /* Last diff change */ int ssdc; /* Shares since diff change */ tv_t first_share; @@ -1920,6 +1923,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool decay_time(&client->dsps5, diff, tdiff, 300); decay_time(&client->dsps60, diff, tdiff, 3600); decay_time(&client->dsps1440, diff, tdiff, 86400); + decay_time(&client->dsps10080, diff, tdiff, 604800); copy_tv(&client->last_share, &now_t); tdiff = sane_tdiff(&now_t, &worker->last_share); @@ -1934,6 +1938,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool decay_time(&instance->dsps5, diff, tdiff, 300); decay_time(&instance->dsps60, diff, tdiff, 3600); decay_time(&instance->dsps1440, diff, tdiff, 86400); + decay_time(&instance->dsps10080, diff, tdiff, 604800); copy_tv(&instance->last_share, &now_t); client->idle = false; @@ -3141,11 +3146,11 @@ static void *statsupdate(void *arg) sleep(1); while (42) { - double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440; + double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, ghs10080; double bias, bias5, bias60, bias1440; double tdiff, per_tdiff; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; - char suffix360[16], suffix1440[16]; + char suffix360[16], suffix1440[16], suffix10080[16]; user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; double sps1, sps5, sps15, sps60; @@ -3188,6 +3193,10 @@ static void *statsupdate(void *arg) ghs1440 = stats.dsps1440 * nonces / bias1440; suffix_string(ghs1440, suffix1440, 16, 0); + bias = time_bias(tdiff, 604800); + ghs10080 = stats.dsps10080 * nonces / bias; + suffix_string(ghs10080, suffix10080, 16, 0); + snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); fp = fopen(fname, "we"); if (unlikely(!fp)) @@ -3203,13 +3212,14 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss,ss}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate15m", suffix15, "hashrate1hr", suffix60, "hashrate6hr", suffix360, - "hashrate1d", suffix1440); + "hashrate1d", suffix1440, + "hashrate7d", suffix10080); s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); LOGNOTICE("Pool:%s", s); @@ -3241,6 +3251,7 @@ static void *statsupdate(void *arg) decay_time(&client->dsps5, 0, per_tdiff, 300); decay_time(&client->dsps60, 0, per_tdiff, 3600); decay_time(&client->dsps1440, 0, per_tdiff, 86400); + decay_time(&client->dsps10080, 0, per_tdiff, 604800); if (per_tdiff > 600) client->idle = true; continue; @@ -3298,6 +3309,7 @@ static void *statsupdate(void *arg) decay_time(&instance->dsps5, 0, per_tdiff, 300); decay_time(&instance->dsps60, 0, per_tdiff, 3600); decay_time(&instance->dsps1440, 0, per_tdiff, 86400); + decay_time(&instance->dsps10080, 0, per_tdiff, 604800); idle = true; } ghs = instance->dsps1 * nonces; @@ -3311,12 +3323,15 @@ static void *statsupdate(void *arg) ghs = instance->dsps1440 * nonces / bias1440; suffix_string(ghs, suffix1440, 16, 0); + ghs = instance->dsps10080 * nonces; + suffix_string(ghs, suffix10080, 16, 0); - JSON_CPACK(val, "{ss,ss,ss,ss,si}", + JSON_CPACK(val, "{ss,ss,ss,ss,ss,si}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate1hr", suffix60, "hashrate1d", suffix1440, + "hashrate7d", suffix10080, "workers", instance->workers); snprintf(fname, 511, "%s/users/%s", ckp->logdir, instance->username); @@ -3375,6 +3390,7 @@ static void *statsupdate(void *arg) decay_time(&stats.dsps60, stats.unaccounted_diff_shares, 20, 3600); decay_time(&stats.dsps360, stats.unaccounted_diff_shares, 20, 21600); decay_time(&stats.dsps1440, stats.unaccounted_diff_shares, 20, 86400); + decay_time(&stats.dsps10080, stats.unaccounted_diff_shares, 20, 604800); stats.unaccounted_shares = stats.unaccounted_diff_shares = From f11f957cdae66843977425705e4171ab356d809e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 17:25:51 +1100 Subject: [PATCH 10/21] Read off worker and user stats from saved logs if they exist --- src/stratifier.c | 119 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 6b0222a0..d2a2088c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1603,6 +1603,108 @@ static bool test_address(ckpool_t *ckp, const char *address) return ret; } +static const double nonces = 4294967296; + +static double dsps_from_key(json_t *val, const char *key) +{ + char *string, *endptr; + double ret = 0; + + json_get_string(&string, val, key); + if (!string) + return ret; + ret = strtod(string, &endptr) / nonces; + if (endptr) { + switch (endptr[0]) { + case 'E': + ret *= (double)1000; + case 'P': + ret *= (double)1000; + case 'T': + ret *= (double)1000; + case 'G': + ret *= (double)1000; + case 'M': + ret *= (double)1000; + case 'K': + ret *= (double)1000; + default: + break; + } + } + free(string); + return ret; +} + +static void read_userstats(ckpool_t *ckp, user_instance_t *instance) +{ + char s[512]; + json_t *val; + FILE *fp; + int ret; + + snprintf(s, 511, "%s/users/%s", ckp->logdir, instance->username); + fp = fopen(s, "re"); + if (!fp) { + LOGINFO("User %s does not have a logfile to read", instance->username); + return; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGINFO("Failed to read user %s logfile", instance->username); + return; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGINFO("Failed to json decode user %s logfile: %s", instance->username, s); + return; + } + + instance->dsps1 = dsps_from_key(val, "hashrate1m"); + instance->dsps5 = dsps_from_key(val, "hashrate5m"); + instance->dsps60 = dsps_from_key(val, "hashrate1hr"); + instance->dsps1440 = dsps_from_key(val, "hashrate1d"); + instance->dsps10080 = dsps_from_key(val, "hashrate7d"); + LOGINFO("Successfully read user %s stats", instance->username); + json_decref(val); +} + +static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) +{ + char s[512]; + json_t *val; + FILE *fp; + int ret; + + snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername); + fp = fopen(s, "re"); + if (!fp) { + LOGINFO("Worker %s does not have a logfile to read"); + return; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGINFO("Failed to read worker %s logfile", worker->workername); + return; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s); + return; + } + worker->dsps1 = dsps_from_key(val, "hashrate1m"); + worker->dsps5 = dsps_from_key(val, "hashrate5m"); + worker->dsps60 = dsps_from_key(val, "hashrate1d"); + worker->dsps1440 = dsps_from_key(val, "hashrate1d"); + LOGINFO("Successfully read worker %s stats", worker->workername); + json_decref(val); +} + + /* This simply strips off the first part of the workername and matches it to a * user or creates a new one. */ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, @@ -1631,6 +1733,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, instance->id = user_instance_id++; HASH_ADD_STR(user_instances, username, instance); + read_userstats(ckp, instance); } DL_FOREACH(instance->instances, tmp) { if (!safecmp(workername, tmp->workername)) { @@ -1645,6 +1748,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, client->worker_instance->workername = strdup(workername); client->worker_instance->instance = instance; DL_APPEND(instance->worker_instances, client->worker_instance); + read_workerstats(ckp, client->worker_instance); } DL_APPEND(instance->instances, client); ck_wunlock(&instance_lock); @@ -3059,8 +3163,6 @@ out: discard_json_params(&jp); } -static const double nonces = 4294967296; - /* Called every 20 seconds, we send the updated stats to ckdb of those users * who have gone 10 minutes between updates. This ends up staggering stats to * avoid floods of stat data coming at once. */ @@ -3274,13 +3376,13 @@ static void *statsupdate(void *arg) ghs = worker->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = worker->dsps5 * nonces / bias5; + ghs = worker->dsps5 * nonces; suffix_string(ghs, suffix5, 16, 0); - ghs = worker->dsps60 * nonces / bias60; + ghs = worker->dsps60 * nonces; suffix_string(ghs, suffix60, 16, 0); - ghs = worker->dsps1440 * nonces / bias1440; + ghs = worker->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss}", @@ -3315,14 +3417,15 @@ static void *statsupdate(void *arg) ghs = instance->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = instance->dsps5 * nonces / bias5; + ghs = instance->dsps5 * nonces; suffix_string(ghs, suffix5, 16, 0); - ghs = instance->dsps60 * nonces / bias60; + ghs = instance->dsps60 * nonces; suffix_string(ghs, suffix60, 16, 0); - ghs = instance->dsps1440 * nonces / bias1440; + ghs = instance->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); + ghs = instance->dsps10080 * nonces; suffix_string(ghs, suffix10080, 16, 0); From 2b77a6957c5bc59fb7bf4d2a2fca75d21047069b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 17:37:27 +1100 Subject: [PATCH 11/21] Add output of sps loaded from user and worker stats --- src/stratifier.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index d2a2088c..e09f08b0 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1667,7 +1667,9 @@ static void read_userstats(ckpool_t *ckp, user_instance_t *instance) instance->dsps60 = dsps_from_key(val, "hashrate1hr"); instance->dsps1440 = dsps_from_key(val, "hashrate1d"); instance->dsps10080 = dsps_from_key(val, "hashrate7d"); - LOGINFO("Successfully read user %s stats", instance->username); + LOGINFO("Successfully read user %s stats %f %f %f %f %f", instance->username, + instance->dsps1, instance->dsps5, instance->dsps60, instance->dsps1440, + instance->dsps10080); json_decref(val); } @@ -1700,7 +1702,8 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) worker->dsps5 = dsps_from_key(val, "hashrate5m"); worker->dsps60 = dsps_from_key(val, "hashrate1d"); worker->dsps1440 = dsps_from_key(val, "hashrate1d"); - LOGINFO("Successfully read worker %s stats", worker->workername); + LOGINFO("Successfully read worker %s stats %f %f %f %f", worker->workername, + worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440); json_decref(val); } From 9f8237909f45848dfbb32653071cdec2c9672962 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 30 Oct 2014 20:18:59 +1100 Subject: [PATCH 12/21] Fix apparent dropping of hashrate on restart due to zero last share time --- src/stratifier.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index e09f08b0..69499cc9 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1662,6 +1662,7 @@ static void read_userstats(ckpool_t *ckp, user_instance_t *instance) return; } + tv_time(&instance->last_share); instance->dsps1 = dsps_from_key(val, "hashrate1m"); instance->dsps5 = dsps_from_key(val, "hashrate5m"); instance->dsps60 = dsps_from_key(val, "hashrate1hr"); @@ -1683,7 +1684,7 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername); fp = fopen(s, "re"); if (!fp) { - LOGINFO("Worker %s does not have a logfile to read"); + LOGINFO("Worker %s does not have a logfile to read", worker->workername); return; } memset(s, 0, 512); @@ -1698,6 +1699,8 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s); return; } + + tv_time(&worker->last_share); worker->dsps1 = dsps_from_key(val, "hashrate1m"); worker->dsps5 = dsps_from_key(val, "hashrate5m"); worker->dsps60 = dsps_from_key(val, "hashrate1d"); From 42fc9ca8f6443f160ab97229e0d0f6b92219e7f0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 11:57:39 +1100 Subject: [PATCH 13/21] Rewrite connector receive thread to use epoll and remove associated now unnecessary fd hashtable --- configure.ac | 2 +- src/connector.c | 159 +++++++++++++++++++----------------------------- 2 files changed, 64 insertions(+), 97 deletions(-) diff --git a/configure.ac b/configure.ac index 218a2392..65d311c0 100644 --- a/configure.ac +++ b/configure.ac @@ -37,7 +37,7 @@ AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) -AC_CHECK_HEADERS(libpq-fe.h postgresql/libpq-fe.h grp.h) +AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) PTHREAD_LIBS="-lpthread" MATH_LIBS="-lm" diff --git a/src/connector.c b/src/connector.c index d79500e6..ba338860 100644 --- a/src/connector.c +++ b/src/connector.c @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -38,9 +38,6 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; - - /* For fdclients hashtable */ - UT_hash_handle fdhh; int fd; /* For dead_clients list */ @@ -59,8 +56,6 @@ typedef struct client_instance client_instance_t; /* For the hashtable of all clients */ static client_instance_t *clients; -/* A hashtable of the clients sorted by fd */ -static client_instance_t *fdclients; /* Linked list of dead clients no longer in use but may still have references */ static client_instance_t *dead_clients; @@ -85,12 +80,15 @@ static sender_send_t *delayed_sends; static pthread_mutex_t sender_lock; static pthread_cond_t sender_cond; +static void parse_client_msg(conn_instance_t *ci, client_instance_t *client); + /* Accepts incoming connections on the server socket and generates client * instances */ -static int accept_client(conn_instance_t *ci) +static int accept_client(conn_instance_t *ci, int epfd) { - client_instance_t *client, *old_client; ckpool_t *ckp = ci->pi->ckp; + client_instance_t *client; + struct epoll_event event; int fd, port, no_clients; socklen_t address_len; @@ -98,7 +96,7 @@ static int accept_client(conn_instance_t *ci) no_clients = HASH_COUNT(clients); ck_runlock(&ci->lock); - if (ckp->maxclients && no_clients >= ckp->maxclients) { + if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) { LOGWARNING("Server full with %d clients", no_clients); return 0; } @@ -143,17 +141,26 @@ static int accept_client(conn_instance_t *ci) keep_sockalive(fd); nolinger_socket(fd); - LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); + LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", + ci->nfds, fd, no_clients, client->address_name, port); client->fd = fd; + event.data.ptr = client; + event.events = EPOLLIN; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + LOGERR("Failed to epoll_ctl add in accept_client"); + free(client); + return 0; + } ck_wlock(&ci->lock); client->id = client_id++; HASH_ADD_I64(clients, id, client); - HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); ci->nfds++; ck_wunlock(&ci->lock); + parse_client_msg(ci, client); + return 1; } @@ -166,7 +173,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) if (fd != -1) { Close(client->fd); HASH_DEL(clients, client); - HASH_DELETE(fdhh, fdclients, client); LL_PREPEND(dead_clients, client); } ck_wunlock(&ci->lock); @@ -292,104 +298,65 @@ reparse: void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - client_instance_t *client, *tmp; - int ret, nfds, i, maxfds = 1; - bool update, maxconn = true; - struct pollfd *fds; + struct epoll_event *events, event; + client_instance_t *client; + bool maxconn = true; + int ret, epfd; rename_proc("creceiver"); - fds = ckalloc(sizeof(struct pollfd)); - /* First fd is reserved for the accepting socket */ - fds[0].fd = ci->serverfd; - fds[0].events = POLLIN; - fds[0].revents = 0; -rebuild_fds: - update = false; - nfds = 1; - - ck_rlock(&ci->lock); - HASH_ITER(fdhh, fdclients, client, tmp) { - if (unlikely(client->fd == -1)) { - LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!", - client->id); - continue; - } - if (nfds >= maxfds) { - maxfds = nfds + 1; - fds = realloc(fds, sizeof(struct pollfd) * maxfds); - if (unlikely(!fds)) { - LOGEMERG("FATAL: Failed to realloc fds in receiver!"); - goto out; - } - } - fds[nfds].fd = client->fd; - fds[nfds].events = POLLIN; - fds[nfds].revents = 0; - nfds++; + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0) { + LOGEMERG("FATAL: Failed to create epoll in receiver"); + return NULL; + } + event.data.fd = ci->serverfd; + event.events = EPOLLIN; + events = ckalloc(sizeof(struct epoll_event)); + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event); + if (ret < 0) { + LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); + goto out; } - ck_runlock(&ci->lock); - repoll: while (!ci->accept) cksleep_ms(100); - - ret = poll(fds, nfds, 1000); - if (unlikely(ret < 0)) { - LOGERR("Failed to poll in receiver"); + ret = epoll_wait(epfd, events, 1, 1000); + if (unlikely(ret == -1)) { + LOGEMERG("FATAL: Failed to epoll_wait in receiver"); goto out; } - - for (i = 0; i < nfds && ret > 0; i++) { - int fd, accepted; - - if (!fds[i].revents) - continue; - - /* Reset for the next poll pass */ - fds[i].events = POLLIN; - fds[i].revents = 0; - --ret; - - /* Is this the listening server socket? */ - if (i == 0) { - accepted = accept_client(ci); - if (unlikely(accepted < 0)) - goto out; - if (accepted) - update = true; - continue; + if (unlikely(!ret)) { + if (unlikely(maxconn)) { + /* When we first start we listen to as many connections as + * possible. Once we stop receiving connections we drop the + * listen to the minimum to effectively ratelimit how fast we + * can receive connections. */ + LOGDEBUG("Dropping listen backlog to 0"); + maxconn = false; + listen(ci->serverfd, 0); } - - client = NULL; - fd = fds[i].fd; - - ck_rlock(&ci->lock); - HASH_FIND(fdhh, fdclients, &fd, SOI, client); - ck_runlock(&ci->lock); - - if (!client) { - /* Probably already removed, remove lazily */ - LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable", - i, fd); - update = true; - } else - parse_client_msg(ci, client); + goto repoll; } - - if (update) - goto rebuild_fds; - else if (unlikely(maxconn)) { - /* When we first start we listen to as many connections as - * possible. Once we stop receiving connections we drop the - * listen to the minimum to effectively ratelimit how fast we - * can receive connections. */ - maxconn = false; - listen(ci->serverfd, 0); + if (events->data.fd == ci->serverfd) { + ret = accept_client(ci, epfd); + if (unlikely(ret < 0)) { + LOGEMERG("FATAL: Failed to accept_client in receiver"); + goto out; + } + goto repoll; + } + client = events->data.ptr; + if ((events->events & EPOLLERR) || (events->events & EPOLLHUP)) { + /* Client disconnected */ + LOGDEBUG("Client fd %d HUP in epoll", client->fd); + invalidate_client(ci->pi->ckp, ci, client); + goto repoll; } + parse_client_msg(ci, client); goto repoll; out: - free(fds); + free(events); return NULL; } From 0887d993fd115664dd2bad82db96f3834523dc79 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 12:31:24 +1100 Subject: [PATCH 14/21] Remove parse client msg from accept client --- src/connector.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index ba338860..dc9ad6b5 100644 --- a/src/connector.c +++ b/src/connector.c @@ -159,8 +159,6 @@ static int accept_client(conn_instance_t *ci, int epfd) ci->nfds++; ck_wunlock(&ci->lock); - parse_client_msg(ci, client); - return 1; } From d42185a5eb7a263b35734afdf952e313b4625e19 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 15:47:02 +1100 Subject: [PATCH 15/21] Tidy up receiver loop --- src/connector.c | 79 ++++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/src/connector.c b/src/connector.c index dc9ad6b5..08e95161 100644 --- a/src/connector.c +++ b/src/connector.c @@ -296,8 +296,7 @@ reparse: void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - struct epoll_event *events, event; - client_instance_t *client; + struct epoll_event event; bool maxconn = true; int ret, epfd; @@ -310,51 +309,51 @@ void *receiver(void *arg) } event.data.fd = ci->serverfd; event.events = EPOLLIN; - events = ckalloc(sizeof(struct epoll_event)); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); - goto out; - } -repoll: - while (!ci->accept) - cksleep_ms(100); - ret = epoll_wait(epfd, events, 1, 1000); - if (unlikely(ret == -1)) { - LOGEMERG("FATAL: Failed to epoll_wait in receiver"); - goto out; + return NULL; } - if (unlikely(!ret)) { - if (unlikely(maxconn)) { - /* When we first start we listen to as many connections as - * possible. Once we stop receiving connections we drop the - * listen to the minimum to effectively ratelimit how fast we - * can receive connections. */ - LOGDEBUG("Dropping listen backlog to 0"); - maxconn = false; - listen(ci->serverfd, 0); + + while (42) { + client_instance_t *client; + + while (unlikely(!ci->accept)) + cksleep_ms(100); + ret = epoll_wait(epfd, &event, 1, 1000); + if (unlikely(ret == -1)) { + LOGEMERG("FATAL: Failed to epoll_wait in receiver"); + break; } - goto repoll; - } - if (events->data.fd == ci->serverfd) { - ret = accept_client(ci, epfd); - if (unlikely(ret < 0)) { - LOGEMERG("FATAL: Failed to accept_client in receiver"); - goto out; + if (unlikely(!ret)) { + if (unlikely(maxconn)) { + /* When we first start we listen to as many connections as + * possible. Once we stop receiving connections we drop the + * listen to the minimum to effectively ratelimit how fast we + * can receive connections. */ + LOGDEBUG("Dropping listen backlog to 0"); + maxconn = false; + listen(ci->serverfd, 0); + } + continue; } - goto repoll; - } - client = events->data.ptr; - if ((events->events & EPOLLERR) || (events->events & EPOLLHUP)) { - /* Client disconnected */ - LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(ci->pi->ckp, ci, client); - goto repoll; + if (event.data.fd == ci->serverfd) { + ret = accept_client(ci, epfd); + if (unlikely(ret < 0)) { + LOGEMERG("FATAL: Failed to accept_client in receiver"); + break; + } + continue; + } + client = event.data.ptr; + if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { + /* Client disconnected */ + LOGDEBUG("Client fd %d HUP in epoll", client->fd); + invalidate_client(ci->pi->ckp, ci, client); + continue; + } + parse_client_msg(ci, client); } - parse_client_msg(ci, client); - goto repoll; -out: - free(events); return NULL; } From 1fe5373c87f88912b15ab1c06fd4cdb8d87f71e5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 16:04:53 +1100 Subject: [PATCH 16/21] Handle early exiting of connector receiver and sender threads --- src/connector.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 08e95161..85e4edc8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,6 +30,8 @@ struct connector_instance { int serverfd; int nfds; bool accept; + pthread_t pth_sender; + pthread_t pth_receiver; }; typedef struct connector_instance conn_instance_t; @@ -546,6 +548,17 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) LOGWARNING("%s connector ready", ckp->name); retry: + if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) { + LOGEMERG("Connector sender thread shutdown, exiting"); + ret = 1; + goto out; + } + if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) { + LOGEMERG("Connector receiver thread shutdown, exiting"); + ret = 1; + goto out; + } + Close(sockd); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { @@ -657,7 +670,6 @@ out: int connector(proc_instance_t *pi) { - pthread_t pth_sender, pth_receiver; char *url = NULL, *port = NULL; ckpool_t *ckp = pi->ckp; int sockd, ret = 0; @@ -734,8 +746,8 @@ int connector(proc_instance_t *pi) ci.nfds = 0; mutex_init(&sender_lock); cond_init(&sender_cond); - create_pthread(&pth_sender, sender, &ci); - create_pthread(&pth_receiver, receiver, &ci); + create_pthread(&ci.pth_sender, sender, &ci); + create_pthread(&ci.pth_receiver, receiver, &ci); ret = connector_loop(pi, &ci); out: From 995fce28c1edbe8b3b9048572808bf9c2eb50652 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 16:43:45 +1100 Subject: [PATCH 17/21] Make it possible to create many threads associated with a ckmsgq --- src/ckpool.c | 46 ++++++++++++++++++++++++++++++++++++---------- src/ckpool.h | 5 +++-- src/stratifier.c | 6 +++--- 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 8c522d46..e1d1186b 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -115,16 +115,16 @@ static void *ckmsg_queue(void *arg) tv_t now; ts_t abs; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs); + pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); msg = ckmsgq->msgs; if (msg) DL_DELETE(ckmsgq->msgs, msg); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); if (!msg) continue; @@ -141,13 +141,39 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) strncpy(ckmsgq->name, name, 15); ckmsgq->func = func; ckmsgq->ckp = ckp; - mutex_init(&ckmsgq->lock); - cond_init(&ckmsgq->cond); + ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); + ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(ckmsgq->lock); + cond_init(ckmsgq->cond); create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); return ckmsgq; } +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); + pthread_mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(pthread_mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckmsgq[i].name, 15, "%.8s%x", name, i); + ckmsgq[i].func = func; + ckmsgq[i].ckp = ckp; + ckmsgq[i].lock = lock; + ckmsgq[i].cond = cond; + create_pthread(&ckmsgq[i].pth, ckmsg_queue, &ckmsgq[i]); + } + + return ckmsgq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -156,10 +182,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) msg->data = data; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(&ckmsgq->cond); - mutex_unlock(&ckmsgq->lock); + pthread_cond_signal(ckmsgq->cond); + mutex_unlock(ckmsgq->lock); } /* Return whether there are any messages queued in the ckmsgq linked list. */ @@ -167,10 +193,10 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { bool ret = true; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); if (ckmsgq->msgs) ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 9284165d..d246e97f 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -34,8 +34,8 @@ struct ckmsgq { ckpool_t *ckp; char name[16]; pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_mutex_t *lock; + pthread_cond_t *cond; ckmsg_t *msgs; void (*func)(ckpool_t *, void *); }; @@ -181,6 +181,7 @@ struct ckpool_instance { #endif ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); diff --git a/src/stratifier.c b/src/stratifier.c index 69499cc9..c79a30a2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1110,13 +1110,13 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&ssends->lock); + mutex_lock(ssends->lock); if (ssends->msgs) DL_CONCAT(ssends->msgs, bulk_send); else ssends->msgs = bulk_send; - pthread_cond_signal(&ssends->cond); - mutex_unlock(&ssends->lock); + pthread_cond_signal(ssends->cond); + mutex_unlock(ssends->lock); } static void stratum_add_send(json_t *val, int64_t client_id) From f517a593c451046be94b11c8af7c31ef5276075e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 16:47:52 +1100 Subject: [PATCH 18/21] Create half as many share processing threads as there are CPUs --- src/stratifier.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c79a30a2..d77d2def 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3547,7 +3547,7 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; ckpool_t *ckp = pi->ckp; - int ret = 1; + int ret = 1, threads; char *buf; LOGWARNING("%s stratifier starting", ckp->name); @@ -3591,7 +3591,9 @@ int stratifier(proc_instance_t *pi) mutex_init(&ckdb_lock); ssends = create_ckmsgq(ckp, "ssender", &ssend_process); srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); - sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); + /* Create half as many share processing threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); From 329afb56ac137a1a3c06634b76b053677d298fed Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 17:01:23 +1100 Subject: [PATCH 19/21] Cosmetic tidy --- src/connector.c | 2 -- src/stratifier.c | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 85e4edc8..401f70a4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -82,8 +82,6 @@ static sender_send_t *delayed_sends; static pthread_mutex_t sender_lock; static pthread_cond_t sender_cond; -static void parse_client_msg(conn_instance_t *ci, client_instance_t *client); - /* Accepts incoming connections on the server socket and generates client * instances */ static int accept_client(conn_instance_t *ci, int epfd) diff --git a/src/stratifier.c b/src/stratifier.c index d77d2def..d9e6c4f1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -802,12 +802,12 @@ static void *do_update(void *arg) stratum_broadcast_update(new_block); ret = true; - LOGINFO("Broadcasted updated stratum base"); + LOGINFO("Broadcast updated stratum base"); out: /* Send a ping to miners if we fail to get a base to keep them * connected while bitcoind recovers(?) */ if (!ret) { - LOGWARNING("Broadcasted ping due to failed stratum base update"); + LOGWARNING("Broadcast ping due to failed stratum base update"); broadcast_ping(); } free(ur->pth); From d1e08ac9c266f0fccf94be1ad8819d28b115129b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 17:12:06 +1100 Subject: [PATCH 20/21] Add source debugging to close failure --- src/libckpool.c | 5 +++-- src/libckpool.h | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index bb34d79f..8cdd4ef1 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -440,13 +440,14 @@ void block_socket(int fd) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); } -void _Close(int *fd) +void _close(int *fd, const char *file, const char *func, const int line) { if (*fd < 0) return; LOGDEBUG("Closing file handle %d", *fd); if (unlikely(close(*fd))) - LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno)); + LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d", + *fd, errno, strerror(errno), file, func, line); *fd = -1; } diff --git a/src/libckpool.h b/src/libckpool.h index 26f6e670..b01967e5 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -426,8 +426,9 @@ void keep_sockalive(int fd); void nolinger_socket(int fd); void noblock_socket(int fd); void block_socket(int fd); -void _Close(int *fd); -#define Close(FD) _Close(&FD) +void _close(int *fd, const char *file, const char *func, const int line); +#define _Close(FD) _close(FD, __FILE__, __func__, __LINE__) +#define Close(FD) _close(&FD, __FILE__, __func__, __LINE__) int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); From c87ad0b2390f95c9c09fe1460c68a8d3d713b378 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 1 Nov 2014 17:15:37 +1100 Subject: [PATCH 21/21] Do not double close socket sent to get_fd --- src/libckpool.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 8cdd4ef1..dab6799e 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -970,7 +970,6 @@ int _get_fd(int sockd, const char *file, const char *func, const int line) goto out; } out: - Close(sockd); cm = (int *)CMSG_DATA(cmptr); newfd = *cm; free(cmptr);