From 55ff1389eeb996715346a837ddf51a6140788a1e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 2 Oct 2014 20:30:16 +1000 Subject: [PATCH 01/35] Make ckpool built without ckdb support imply standalone and remove ckdb options when build support is disabled --- README | 3 +++ configure.ac | 1 + src/ckpool.c | 19 ++++++++++++++++++- src/ckpool.h | 6 ++++++ src/stratifier.c | 8 ++++---- 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/README b/README index 17f428a7..15d47c46 100644 --- a/README +++ b/README @@ -138,6 +138,7 @@ ckpool supports the following options: -A Standalone mode tells ckpool not to try to communicate with ckdb or log any ckdb requests in the rotating ckdb logs it would otherwise store. All users are automatically accepted without any attempt to authorise users in any way. +This option is explicitly enabled when built without ckdb support. -c tells ckpool to override its default configuration filename and load the specified one. If -c is not specified, ckpool looks for ckpool.conf @@ -145,6 +146,7 @@ whereas in proxy or passthrough modes it will look for ckproxy.conf -d tells ckpool what the name of the ckdb process is that it should speak to, otherwise it will look for ckdb. +This option does not exist when built without ckdb support. -g will start ckpool as the group ID specified. @@ -179,6 +181,7 @@ it to scale to large hashrates. Standalone mode is Optional. -S tells ckpool which directory to look for the ckdb socket to talk to. +This option does not exist when built without ckdb support. -s tells ckpool which directory to place its own communication sockets (/tmp by default) diff --git a/configure.ac b/configure.ac index 3f646444..218a2392 100644 --- a/configure.ac +++ b/configure.ac @@ -59,6 +59,7 @@ AC_ARG_WITH([ckdb], if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) + AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required]) PQ_LIBS="-lpq" else PQ_LIBS="" diff --git a/src/ckpool.c b/src/ckpool.c index 6c2b02ec..dee10716 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1058,6 +1058,7 @@ static void *watchdog(void *arg) return NULL; } +#ifdef USE_CKDB static struct option long_options[] = { {"standalone", no_argument, 0, 'A'}, {"config", required_argument, 0, 'c'}, @@ -1075,6 +1076,22 @@ static struct option long_options[] = { {"sockdir", required_argument, 0, 's'}, {0, 0, 0, 0} }; +#else +static struct option long_options[] = { + {"config", required_argument, 0, 'c'}, + {"group", required_argument, 0, 'g'}, + {"handover", no_argument, 0, 'H'}, + {"help", no_argument, 0, 'h'}, + {"killold", no_argument, 0, 'k'}, + {"log-shares", no_argument, 0, 'L'}, + {"loglevel", required_argument, 0, 'l'}, + {"name", required_argument, 0, 'n'}, + {"passthrough", no_argument, 0, 'P'}, + {"proxy", no_argument, 0, 'p'}, + {"sockdir", required_argument, 0, 's'}, + {0, 0, 0, 0} +}; +#endif int main(int argc, char **argv) { @@ -1193,7 +1210,7 @@ int main(int argc, char **argv) } trail_slash(&ckp.socket_dir); - if (!ckp.standalone) { + if (!CKP_STANDALONE(&ckp)) { if (!ckp.ckdb_name) ckp.ckdb_name = "ckdb"; if (!ckp.ckdb_sockdir) { diff --git a/src/ckpool.h b/src/ckpool.h index e7715d47..a29798d3 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -171,6 +171,12 @@ struct ckpool_instance { server_instance_t *btcdbackup; }; +#ifdef USE_CKDB +#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) +#else +#define CKP_STANDALONE(CKP) (true) +#endif + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); diff --git a/src/stratifier.c b/src/stratifier.c index f473c2a6..55b84346 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -539,7 +539,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char fflush(stdout); } - if (ckp->standalone) + if (CKP_STANDALONE(ckp)) return json_decref(val); json_msg = ckdb_msg(ckp, val, idtype); @@ -1683,7 +1683,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, user_instance->username); client->workername = strdup(buf); - if (client->ckp->standalone) + if (CKP_STANDALONE(client->ckp)) ret = true; else { *errnum = send_recv_auth(client); @@ -2203,7 +2203,7 @@ out_unlock: json_set_int(val, "workinfoid", id); json_set_int(val, "clientid", client->id); json_set_string(val, "enonce1", client->enonce1); - if (!ckp->standalone) + if (!CKP_STANDALONE(ckp)) json_set_string(val, "secondaryuserid", user_instance->secondaryuserid); json_set_string(val, "nonce2", nonce2); json_set_string(val, "nonce", nonce); @@ -3197,7 +3197,7 @@ int stratifier(proc_instance_t *pi) sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); - if (!ckp->standalone) + if (!CKP_STANDALONE(ckp)) create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); cklock_init(&workbase_lock); From 857462e16a882724594280692fd2c82575990938 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 2 Oct 2014 23:20:06 +1000 Subject: [PATCH 02/35] Simplify the complicated sequential locking to pure choice of write or read in the stratifer --- src/stratifier.c | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 55b84346..b33f7ab9 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1103,7 +1103,7 @@ static void drop_client(int64_t id) LOGINFO("Stratifier dropping client %ld", id); - ck_ilock(&instance_lock); + ck_wlock(&instance_lock); client = __instance_by_id(id); if (client) { stratum_instance_t *old_client = NULL; @@ -1113,15 +1113,13 @@ static void drop_client(int64_t id) client->authorised = false; } - ck_ulock(&instance_lock); HASH_DEL(stratum_instances, client); HASH_FIND(hh, disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64) HASH_ADD(hh, disconnected_instances, enonce1_64, sizeof(uint64_t), client); - ck_dwilock(&instance_lock); } - ck_uilock(&instance_lock); + ck_wunlock(&instance_lock); if (dec) dec_worker(client->user_instance); @@ -2004,19 +2002,17 @@ static bool new_share(const uchar *hash, int64_t wb_id) share_t *share, *match = NULL; bool ret = false; - ck_ilock(&share_lock); + ck_wlock(&share_lock); HASH_FIND(hh, shares, hash, 32, match); if (match) goto out_unlock; share = ckzalloc(sizeof(share_t)); memcpy(share->hash, hash, 32); share->workbase_id = wb_id; - ck_ulock(&share_lock); HASH_ADD(hh, shares, hash, 32, share); - ck_dwilock(&share_lock); ret = true; out_unlock: - ck_uilock(&share_lock); + ck_wunlock(&share_lock); return ret; } @@ -2543,15 +2539,13 @@ static void srecv_process(ckpool_t *ckp, smsg_t *msg) json_object_clear(val); /* Parse the message here */ - ck_ilock(&instance_lock); + ck_wlock(&instance_lock); instance = __instance_by_id(msg->client_id); if (!instance) { /* client_id instance doesn't exist yet, create one */ - ck_ulock(&instance_lock); instance = __stratum_add_instance(ckp, msg->client_id); - ck_dwilock(&instance_lock); } - ck_uilock(&instance_lock); + ck_wunlock(&instance_lock); parse_instance_msg(msg); From 255bab88fe2f11a0eddef8af760a5dca338c9c1e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 00:07:43 +1000 Subject: [PATCH 03/35] Add a maximum diff option --- README | 3 +++ ckpool.conf | 1 + ckproxy.conf | 1 + src/ckpool.c | 1 + src/ckpool.h | 1 + src/stratifier.c | 2 ++ 6 files changed, 9 insertions(+) diff --git a/README b/README index 15d47c46..01f32aa1 100644 --- a/README +++ b/README @@ -252,4 +252,7 @@ and 3334 in proxy mode. "startdiff" : Starting diff that new clients are given. Default 42 +"maxdiff" : Optional maximum diff that vardiff will clamp to where zero is no +maximum. + "logdir" : Which directory to store pool and client logs. Default "logs" diff --git a/ckpool.conf b/ckpool.conf index 62424d30..340556bd 100644 --- a/ckpool.conf +++ b/ckpool.conf @@ -20,6 +20,7 @@ "serverurl" : "ckpool.org:3333", "mindiff" : 1, "startdiff" : 42, +"maxdiff" : 0, "logdir" : "logs" } Comments from here on are ignored. diff --git a/ckproxy.conf b/ckproxy.conf index b4e3f2bf..6754198b 100644 --- a/ckproxy.conf +++ b/ckproxy.conf @@ -15,6 +15,7 @@ "serverurl" : "192.168.1.100:3334", "mindiff" : 1, "startdiff" : 42, +"maxdiff" : 0, "logdir" : "logs" } Comments from here on are ignored. diff --git a/src/ckpool.c b/src/ckpool.c index dee10716..25dab571 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -983,6 +983,7 @@ static void parse_config(ckpool_t *ckp) json_get_string(&ckp->serverurl, json_conf, "serverurl"); json_get_int64(&ckp->mindiff, json_conf, "mindiff"); json_get_int64(&ckp->startdiff, json_conf, "startdiff"); + json_get_int64(&ckp->maxdiff, json_conf, "maxdiff"); json_get_string(&ckp->logdir, json_conf, "logdir"); arr_val = json_object_get(json_conf, "proxy"); if (arr_val && json_is_array(arr_val)) { diff --git a/src/ckpool.h b/src/ckpool.h index a29798d3..9e3311cb 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -150,6 +150,7 @@ struct ckpool_instance { /* Difficulty settings */ int64_t mindiff; // Default 1 int64_t startdiff; // Default 42 + int64_t maxdiff; // No default /* Coinbase data */ char *btcaddress; // Address to mine to diff --git a/src/stratifier.c b/src/stratifier.c index b33f7ab9..264352f9 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1838,6 +1838,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool optimal = client->suggest_diff; } else if (optimal < worker->mindiff) optimal = worker->mindiff; + if (ckp->maxdiff && optimal > ckp->maxdiff) + optimal = ckp->maxdiff; if (optimal > network_diff) optimal = network_diff; if (client->diff == optimal) From 17d14552673813ddce60445389fbc85aa719567e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 00:52:09 +1000 Subject: [PATCH 04/35] Add more info to disconnects --- src/connector.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 9548f599..4994f0a7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -204,7 +204,8 @@ retry: /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ - LOGINFO("Client fd %d disconnected", client->fd); + LOGINFO("Client fd %d disconnected with ret %d errno %d: %s", client->fd, + ret, errno, strerror(errno)); invalidate_client(ckp, ci, client); return; } From a493539a752f982e87708fa9b63eba9e324a80bf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 00:54:21 +1000 Subject: [PATCH 05/35] Only add strerro if errno != 0 --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 4994f0a7..b282189f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -204,8 +204,8 @@ retry: /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ - LOGINFO("Client fd %d disconnected with ret %d errno %d: %s", client->fd, - ret, errno, strerror(errno)); + LOGINFO("Client fd %d disconnected with ret %d errno %d %s", client->fd, + ret, errno, errno ? strerror(errno) : ""); invalidate_client(ckp, ci, client); return; } From 2d6d982bb1b08c2d2dbed3bf97813dde390765b1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 01:01:31 +1000 Subject: [PATCH 06/35] Add buf offset to disconnect message and reset flags on looping --- src/connector.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index b282189f..bc9e2232 100644 --- a/src/connector.c +++ b/src/connector.c @@ -186,8 +186,8 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { ckpool_t *ckp = ci->pi->ckp; - int buflen, ret, flags = 0; char msg[PAGESIZE], *eol; + int buflen, ret, flags; bool moredata = false; json_t *val; @@ -195,6 +195,8 @@ retry: buflen = PAGESIZE - client->bufofs; if (moredata) flags = MSG_DONTWAIT; + else + flags = 0; ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); if (ret < 1) { /* Nothing else ready to be read */ @@ -204,8 +206,8 @@ retry: /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ - LOGINFO("Client fd %d disconnected with ret %d errno %d %s", client->fd, - ret, errno, errno ? strerror(errno) : ""); + LOGINFO("Client fd %d disconnected with bufofs %d ret %d errno %d %s", + client->fd, client->bufofs, ret, errno, errno ? strerror(errno) : ""); invalidate_client(ckp, ci, client); return; } From 0f7daecac4b812f3b5dfb829bce1146f231b60b0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 08:38:22 +1000 Subject: [PATCH 07/35] Stats on 1st reject were being missed --- src/stratifier.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 264352f9..c4019032 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1748,6 +1748,14 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool int64_t next_blockid, optimal; tv_t now_t; + mutex_lock(&stats_lock); + if (valid) { + stats.unaccounted_shares++; + stats.unaccounted_diff_shares += diff; + } else + stats.unaccounted_rejects += diff; + mutex_unlock(&stats_lock); + /* Ignore successive rejects in count if they haven't submitted a valid * share yet. */ if (unlikely(!client->ssdc && !valid)) @@ -1795,14 +1803,6 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool bias = time_bias(bdiff, 300); tdiff = sane_tdiff(&now_t, &client->ldc); - mutex_lock(&stats_lock); - if (valid) { - stats.unaccounted_shares++; - stats.unaccounted_diff_shares += diff; - } else - stats.unaccounted_rejects += diff; - mutex_unlock(&stats_lock); - /* Check the difficulty every 240 seconds or as many shares as we * should have had in that time, whichever comes first. */ if (client->ssdc < 72 && tdiff < 240) From 600e6923fc6d38054cc3aa4a4018a2292a49f929 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 08:45:39 +1000 Subject: [PATCH 08/35] Count only accepted and stale rejects in diff calculation --- src/stratifier.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c4019032..e4cdf79c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1756,9 +1756,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool stats.unaccounted_rejects += diff; mutex_unlock(&stats_lock); - /* Ignore successive rejects in count if they haven't submitted a valid - * share yet. */ - if (unlikely(!client->ssdc && !valid)) + /* Count only accepted and stale rejects in diff calculation. */ + if (!valid) return; tv_time(&now_t); @@ -2153,7 +2152,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, } invalid = false; out_submit: - if (wb->proxy && sdiff >= wdiff) + if (sdiff >= wdiff) submit = true; out_unlock: ck_runlock(&workbase_lock); @@ -2189,12 +2188,12 @@ out_unlock: /* Submit share to upstream pool in proxy mode. We submit valid and * stale shares and filter out the rest. */ - if (submit) { + if (wb->proxy && submit) { LOGINFO("Submitting share upstream: %s", hexhash); submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); } - add_submit(ckp, client, diff, result); + add_submit(ckp, client, diff, submit); /* Now write to the pool's sharelog. */ val = json_object(); From 53641a9c242d6b26b4527042dc4c68af42587aa3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 09:04:15 +1000 Subject: [PATCH 09/35] Prevent wb dereference in parse_submit --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e4cdf79c..bd2252e2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2050,9 +2050,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, char *fname = NULL, *s, *nonce2; enum share_err err = SE_NONE; ckpool_t *ckp = client->ckp; + workbase_t *wb = NULL; char idstring[20]; uint32_t ntime32; - workbase_t *wb; uchar hash[32]; int64_t id; json_t *val; @@ -2188,7 +2188,7 @@ out_unlock: /* Submit share to upstream pool in proxy mode. We submit valid and * stale shares and filter out the rest. */ - if (wb->proxy && submit) { + if (wb && wb->proxy && submit) { LOGINFO("Submitting share upstream: %s", hexhash); submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); } From 70393442a364ecad325ca2024530074bb6d4f0be Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 09:10:05 +1000 Subject: [PATCH 10/35] Zero bufofs in connector loop for completeness and make sure to retry if there is more data to read --- src/connector.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index bc9e2232..4090a567 100644 --- a/src/connector.c +++ b/src/connector.c @@ -237,9 +237,10 @@ reparse: return; } memcpy(msg, client->buf, buflen); - msg[buflen] = 0; + msg[buflen] = '\0'; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); + client->buf[client->bufofs] = '\0'; if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); @@ -270,6 +271,8 @@ reparse: if (client->bufofs) goto reparse; + if (moredata) + goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and From 1e175d851c6b87e12142462b453531cf632a050d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 09:57:27 +1000 Subject: [PATCH 11/35] Missing valid but not submitted count for proxy --- src/stratifier.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index bd2252e2..2a5090b1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1740,7 +1740,8 @@ static double sane_tdiff(tv_t *end, tv_t *start) return tdiff; } -static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid) +static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid, + bool submit) { worker_instance_t *worker = client->worker_instance; double tdiff, bdiff, dsps, drr, network_diff, bias; @@ -1757,7 +1758,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool mutex_unlock(&stats_lock); /* Count only accepted and stale rejects in diff calculation. */ - if (!valid) + if (!valid && !submit) return; tv_time(&now_t); @@ -2193,7 +2194,7 @@ out_unlock: submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); } - add_submit(ckp, client, diff, submit); + add_submit(ckp, client, diff, result, submit); /* Now write to the pool's sharelog. */ val = json_object(); From e3b4d969d5d4092bb30d852ccaa1427403c0e26b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 10:31:52 +1000 Subject: [PATCH 12/35] Always re-read with dontwait when parsing a client message --- src/connector.c | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/src/connector.c b/src/connector.c index 4090a567..9439ec0d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -186,17 +186,12 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { ckpool_t *ckp = ci->pi->ckp; + int buflen, ret, flags = 0; char msg[PAGESIZE], *eol; - int buflen, ret, flags; - bool moredata = false; json_t *val; retry: buflen = PAGESIZE - client->bufofs; - if (moredata) - flags = MSG_DONTWAIT; - else - flags = 0; ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); if (ret < 1) { /* Nothing else ready to be read */ @@ -212,10 +207,7 @@ retry: return; } client->bufofs += ret; - if (client->bufofs == PAGESIZE) - moredata = true; - else - moredata = false; + flags = MSG_DONTWAIT; reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) { @@ -224,9 +216,7 @@ reparse: invalidate_client(ckp, ci, client); return; } - if (moredata) - goto retry; - return; + goto retry; } /* Do something useful with this message now */ @@ -271,8 +261,7 @@ reparse: if (client->bufofs) goto reparse; - if (moredata) - goto retry; + goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and From ae1f9827acbad33400ebb7285df3de49e8f87338 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 10:34:16 +1000 Subject: [PATCH 13/35] Revert "Always re-read with dontwait when parsing a client message" This reverts commit e3b4d969d5d4092bb30d852ccaa1427403c0e26b. --- src/connector.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 9439ec0d..4090a567 100644 --- a/src/connector.c +++ b/src/connector.c @@ -186,12 +186,17 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { ckpool_t *ckp = ci->pi->ckp; - int buflen, ret, flags = 0; char msg[PAGESIZE], *eol; + int buflen, ret, flags; + bool moredata = false; json_t *val; retry: buflen = PAGESIZE - client->bufofs; + if (moredata) + flags = MSG_DONTWAIT; + else + flags = 0; ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); if (ret < 1) { /* Nothing else ready to be read */ @@ -207,7 +212,10 @@ retry: return; } client->bufofs += ret; - flags = MSG_DONTWAIT; + if (client->bufofs == PAGESIZE) + moredata = true; + else + moredata = false; reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) { @@ -216,7 +224,9 @@ reparse: invalidate_client(ckp, ci, client); return; } - goto retry; + if (moredata) + goto retry; + return; } /* Do something useful with this message now */ @@ -261,7 +271,8 @@ reparse: if (client->bufofs) goto reparse; - goto retry; + if (moredata) + goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and From 48b2e72661e25ed2b43718c0d9c266ed95dcd5ef Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 10:36:33 +1000 Subject: [PATCH 14/35] Rereading with dontwait is pointless and adds complexity --- src/connector.c | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/connector.c b/src/connector.c index 4090a567..13933dd0 100644 --- a/src/connector.c +++ b/src/connector.c @@ -187,22 +187,12 @@ static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { ckpool_t *ckp = ci->pi->ckp; char msg[PAGESIZE], *eol; - int buflen, ret, flags; - bool moredata = false; + int buflen, ret; json_t *val; -retry: buflen = PAGESIZE - client->bufofs; - if (moredata) - flags = MSG_DONTWAIT; - else - flags = 0; - ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); + ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); if (ret < 1) { - /* Nothing else ready to be read */ - if (!ret && flags) - return; - /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ @@ -212,10 +202,6 @@ retry: return; } client->bufofs += ret; - if (client->bufofs == PAGESIZE) - moredata = true; - else - moredata = false; reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) { @@ -224,8 +210,6 @@ reparse: invalidate_client(ckp, ci, client); return; } - if (moredata) - goto retry; return; } @@ -271,8 +255,6 @@ reparse: if (client->bufofs) goto reparse; - if (moredata) - goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and From f6f61e88702697a3e6a457f0c445472efc1f8b85 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 11:55:16 +1000 Subject: [PATCH 15/35] Close polled fds that are ready for reads but no longer have a client associated with them --- src/connector.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 13933dd0..8ed1634f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -308,8 +308,9 @@ retry: if (!client) { /* Probably already removed */ - LOGDEBUG("Failed to find client with polled fd %d in hashtable", - fd); + LOGINFO("Failed to find client with polled fd %d in hashtable, closing", + fd); + close(fd); } else parse_client_msg(ci, client); From f0f948171abd5d8cc5242d349a29500d969895a3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 12:04:58 +1000 Subject: [PATCH 16/35] Invalidate interrupted clients and increase verbosity of messages logged --- src/connector.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 8ed1634f..16513097 100644 --- a/src/connector.c +++ b/src/connector.c @@ -387,7 +387,8 @@ void *sender(void *arg) ret = wait_write_select(fd, 0); if (ret < 1) { if (ret < 0) { - LOGDEBUG("Discarding message sent to interrupted client"); + LOGINFO("Client id %d fd %d interrupted", client->id, fd); + invalidate_client(ckp, ci, client); free(sender_send->buf); free(sender_send); continue; @@ -404,7 +405,7 @@ void *sender(void *arg) while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { - LOGINFO("Client id %d disconnected", client->id); + LOGINFO("Client id %d fd %d disconnected", client->id, fd); invalidate_client(ckp, ci, client); break; } From 6ba4cf50a5a058891b83b31ef391be87d51a35b1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 12:15:11 +1000 Subject: [PATCH 17/35] There is no need to bias 60 second times, and we should bias worker stats --- src/stratifier.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 2a5090b1..8ad02163 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2892,10 +2892,9 @@ static void *statsupdate(void *arg) timersub(&now, &stats.start_time, &diff); tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000; - bias = time_bias(tdiff, 60); - ghs1 = stats.dsps1 * nonces / bias; + ghs1 = stats.dsps1 * nonces; suffix_string(ghs1, suffix1, 16, 0); - sps1 = stats.sps1 / bias; + sps1 = stats.sps1; bias = time_bias(tdiff, 300); ghs5 = stats.dsps5 * nonces / bias; @@ -3028,11 +3027,17 @@ static void *statsupdate(void *arg) } ghs = instance->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = instance->dsps5 * nonces; + + bias = time_bias(tdiff, 300); + ghs = instance->dsps5 * nonces / bias; suffix_string(ghs, suffix5, 16, 0); - ghs = instance->dsps60 * nonces; + + bias = time_bias(tdiff, 3600); + ghs = instance->dsps60 * nonces / bias; suffix_string(ghs, suffix60, 16, 0); - ghs = instance->dsps1440 * nonces; + + bias = time_bias(tdiff, 86400); + ghs = instance->dsps1440 * nonces / bias; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss,si}", From 6acb482e7aae125c2bb128103b87a249e08b412a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 12:23:34 +1000 Subject: [PATCH 18/35] Bias workers as well as users --- src/stratifier.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 8ad02163..e40935cc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2991,11 +2991,17 @@ static void *statsupdate(void *arg) } ghs = worker->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = worker->dsps5 * nonces; + + bias = time_bias(tdiff, 300); + ghs = worker->dsps5 * nonces / bias; suffix_string(ghs, suffix5, 16, 0); - ghs = worker->dsps60 * nonces; + + bias = time_bias(tdiff, 3600); + ghs = worker->dsps60 * nonces / bias; suffix_string(ghs, suffix60, 16, 0); - ghs = worker->dsps1440 * nonces; + + bias = time_bias(tdiff, 86400); + ghs = worker->dsps1440 * nonces / bias; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss}", From 8d625b736b7d6d65363a749a7220602da1c711bb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 12:45:52 +1000 Subject: [PATCH 19/35] Revert "Close polled fds that are ready for reads but no longer have a client associated with them" This reverts commit f6f61e88702697a3e6a457f0c445472efc1f8b85. --- src/connector.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 16513097..671c21f4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -308,9 +308,8 @@ retry: if (!client) { /* Probably already removed */ - LOGINFO("Failed to find client with polled fd %d in hashtable, closing", - fd); - close(fd); + LOGDEBUG("Failed to find client with polled fd %d in hashtable", + fd); } else parse_client_msg(ci, client); From 86f72ff8896d03f05c392a41f6a72469e2c9901e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 14:10:26 +1000 Subject: [PATCH 20/35] Keep rereading in parse_client_msg if the socket indicates it is still ready for further reads --- src/connector.c | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 671c21f4..71b85fce 100644 --- a/src/connector.c +++ b/src/connector.c @@ -185,19 +185,33 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { + int buflen, ret, selfail = 0; ckpool_t *ckp = ci->pi->ckp; char msg[PAGESIZE], *eol; - int buflen, ret; json_t *val; +retry: + /* Select should always return positive after poll unless we have + * been disconnected. On retries, decide whether we should do further + * reads based on select readiness and only fail if we get an error. */ + ret = wait_read_select(client->fd, 0); + if (ret < 1) { + if (ret > selfail) + return; + LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", + client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + invalidate_client(ckp, ci, client); + return; + } + selfail = -1; buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); if (ret < 1) { /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ - LOGINFO("Client fd %d disconnected with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, errno ? strerror(errno) : ""); + LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, ci, client); return; } @@ -210,7 +224,7 @@ reparse: invalidate_client(ckp, ci, client); return; } - return; + goto retry; } /* Do something useful with this message now */ @@ -255,6 +269,7 @@ reparse: if (client->bufofs) goto reparse; + goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and From 070a1a3b3eddc076dffac81f3470d9268cb78d11 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 15:28:49 +1000 Subject: [PATCH 21/35] Cache reused bias calculations --- src/stratifier.c | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e40935cc..332d0e62 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2874,8 +2874,8 @@ static void *statsupdate(void *arg) sleep(1); while (42) { + double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias, bias5, bias60, bias1440; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; - double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias; char suffix360[16], suffix1440[16]; user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; @@ -2896,27 +2896,27 @@ static void *statsupdate(void *arg) suffix_string(ghs1, suffix1, 16, 0); sps1 = stats.sps1; - bias = time_bias(tdiff, 300); - ghs5 = stats.dsps5 * nonces / bias; + bias5 = time_bias(tdiff, 300); + ghs5 = stats.dsps5 * nonces / bias5; suffix_string(ghs5, suffix5, 16, 0); - sps5 = stats.sps5 / bias; + sps5 = stats.sps5 / bias5; bias = time_bias(tdiff, 900); ghs15 = stats.dsps15 * nonces / bias; suffix_string(ghs15, suffix15, 16, 0); sps15 = stats.sps15 / bias; - bias = time_bias(tdiff, 3600); - ghs60 = stats.dsps60 * nonces / bias; + bias60 = time_bias(tdiff, 3600); + ghs60 = stats.dsps60 * nonces / bias60; suffix_string(ghs60, suffix60, 16, 0); - sps60 = stats.sps60 / bias; + sps60 = stats.sps60 / bias60; bias = time_bias(tdiff, 21600); ghs360 = stats.dsps360 * nonces / bias; suffix_string(ghs360, suffix360, 16, 0); - bias = time_bias(tdiff, 86400); - ghs1440 = stats.dsps1440 * nonces / bias; + bias1440 = time_bias(tdiff, 86400); + ghs1440 = stats.dsps1440 * nonces / bias1440; suffix_string(ghs1440, suffix1440, 16, 0); snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); @@ -2992,16 +2992,13 @@ static void *statsupdate(void *arg) ghs = worker->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - bias = time_bias(tdiff, 300); - ghs = worker->dsps5 * nonces / bias; + ghs = worker->dsps5 * nonces / bias5; suffix_string(ghs, suffix5, 16, 0); - bias = time_bias(tdiff, 3600); - ghs = worker->dsps60 * nonces / bias; + ghs = worker->dsps60 * nonces / bias60; suffix_string(ghs, suffix60, 16, 0); - bias = time_bias(tdiff, 86400); - ghs = worker->dsps1440 * nonces / bias; + ghs = worker->dsps1440 * nonces / bias1440; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss}", @@ -3034,16 +3031,13 @@ static void *statsupdate(void *arg) ghs = instance->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - bias = time_bias(tdiff, 300); - ghs = instance->dsps5 * nonces / bias; + ghs = instance->dsps5 * nonces / bias5; suffix_string(ghs, suffix5, 16, 0); - bias = time_bias(tdiff, 3600); - ghs = instance->dsps60 * nonces / bias; + ghs = instance->dsps60 * nonces / bias60; suffix_string(ghs, suffix60, 16, 0); - bias = time_bias(tdiff, 86400); - ghs = instance->dsps1440 * nonces / bias; + ghs = instance->dsps1440 * nonces / bias1440; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss,si}", From bae2493346a0153b0a8753c8d87b3b987ed2d1d9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:01:11 +1000 Subject: [PATCH 22/35] Add a helper for close which invalidates the file handle as well --- src/libckpool.c | 33 ++++++++++++++++++--------------- src/libckpool.h | 5 ++++- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 3c749880..49232894 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -432,6 +432,15 @@ void block_socket(int fd) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); } +void _Close(int *fd) +{ + if (*fd < 0) + return; + LOGDEBUG("Closing file handle %d", *fd); + close(*fd); + *fd = -1; +} + int bind_socket(char *url, char *port) { struct addrinfo servinfobase, *servinfo, hints, *p; @@ -460,8 +469,7 @@ int bind_socket(char *url, char *port) ret = bind(sockd, p->ai_addr, p->ai_addrlen); if (ret < 0) { LOGWARNING("Failed to bind socket for %s:%s", url, port); - close(sockd); - sockd = -1; + Close(sockd); goto out; } @@ -500,7 +508,7 @@ int connect_socket(char *url, char *port) int selret; if (!sock_connecting()) { - close(sockd); + Close(sockd); LOGDEBUG("Failed sock connect"); continue; } @@ -517,8 +525,7 @@ int connect_socket(char *url, char *port) break; } } - close(sockd); - sockd = -1; + Close(sockd); LOGDEBUG("Select timeout/failed connect"); continue; } @@ -575,13 +582,10 @@ void empty_socket(int fd) } while (ret > 0); } -void close_unix_socket(const int sockd, const char *server_path) +void _close_unix_socket(int *sockd, const char *server_path) { - int ret; - - ret = close(sockd); - if (unlikely(ret < 0)) - LOGERR("Failed to close sock %d %s", sockd, server_path); + LOGDEBUG("Closing unix socket %d %s", *sockd, server_path); + _Close(sockd); } int _open_unix_server(const char *server_path, const char *file, const char *func, const int line) @@ -681,8 +685,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun ret = connect(sockd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); if (unlikely(ret < 0)) { LOGERR("Failed to bind to socket in open_unix_client"); - close(sockd); - sockd = -1; + Close(sockd); goto out; } @@ -951,7 +954,7 @@ int _get_fd(int sockd, const char *file, const char *func, const int line) goto out; } out: - close(sockd); + Close(sockd); cm = (int *)CMSG_DATA(cmptr); newfd = *cm; free(cmptr); @@ -1032,7 +1035,7 @@ bool rotating_log(const char *path, const char *msg) } fp = fdopen(fd, "ae"); if (unlikely(!fp)) { - close(fd); + Close(fd); LOGERR("Failed to fdopen %s in rotating_log!", filename); goto stageleft; } diff --git a/src/libckpool.h b/src/libckpool.h index c56f4c24..ce1c8b3a 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -415,11 +415,14 @@ bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); void keep_sockalive(int fd); void noblock_socket(int fd); void block_socket(int fd); +void _Close(int *fd); +#define Close(FD) _Close(&FD) int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); void empty_socket(int fd); -void close_unix_socket(const int sockd, const char *server_path); +void _close_unix_socket(int *sockd, const char *server_path); +#define close_unix_socket(sockd, server_path) _close_unix_socket(&sockd, server_path) int _open_unix_server(const char *server_path, const char *file, const char *func, const int line); #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); From e683fa42b636560f23e65d9cd1c47d3f655edd57 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:07:19 +1000 Subject: [PATCH 23/35] Use the Close handler in ckpool.c --- src/ckpool.c | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 25dab571..b350291d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -280,10 +280,10 @@ retry: if (newfd > 0) { LOGDEBUG("Sending new fd %d", newfd); send_fd(newfd, sockd); - close(newfd); + Close(newfd); } else LOGWARNING("Failed to get_fd"); - close(connfd); + Close(connfd); } else LOGWARNING("Failed to send_procmsg to connector"); } else if (cmdmatch(buf, "restart")) { @@ -300,7 +300,7 @@ retry: LOGINFO("Listener received unhandled message: %s", buf); send_unix_msg(sockd, "unknown"); } - close(sockd); + Close(sockd); goto retry; out: dealloc(buf); @@ -386,10 +386,7 @@ int read_socket_line(connsock_t *cs, int timeout) out: if (ret < 0) { dealloc(cs->buf); - if (cs->fd > 0) { - close(cs->fd); - cs->fd = -1; - } + Close(cs->fd); } return ret; } @@ -447,7 +444,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - close(sockd); + Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); @@ -484,7 +481,7 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGWARNING("Failed to send %s to socket %s", msg, path); else buf = recv_unix_msg(sockd); - close(sockd); + Close(sockd); out: if (unlikely(!buf)) LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line); @@ -515,7 +512,7 @@ char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, co LOGWARNING("Failed to send %s to ckdb", msg); else buf = recv_unix_msg(sockd); - close(sockd); + Close(sockd); out: if (unlikely(!buf)) LOGERR("Failure in send_recv_ckdb from %s %s:%d", file, func, line); @@ -608,7 +605,7 @@ out_empty: /* Assume that a failed request means the socket will be closed * and reopen it */ LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); - close(cs->fd); + Close(cs->fd); cs->fd = connect_socket(cs->url, cs->port); } out: @@ -1313,12 +1310,12 @@ int main(int argc, char **argv) if (sockd > 0 && send_unix_msg(sockd, "getfd")) { ckp.oldconnfd = get_fd(sockd); - close(sockd); + Close(sockd); sockd = open_unix_client(ckp.main.us.path); send_unix_msg(sockd, "shutdown"); if (ckp.oldconnfd > 0) LOGWARNING("Inherited old socket with new file descriptor %d!", ckp.oldconnfd); - close(sockd); + Close(sockd); } } From 7da6c0b2a07f9438e74cee5fcc6c8fd5b19e1617 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:09:38 +1000 Subject: [PATCH 24/35] Use the Close handler in connector.c --- src/connector.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 71b85fce..e6bf60d7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -123,7 +123,7 @@ retry: default: LOGWARNING("Unknown INET type for client %d on socket %d", ci->nfds, fd); - close(fd); + Close(fd); free(client); goto retry; } @@ -153,11 +153,10 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) ck_wlock(&ci->lock); fd = client->fd; if (fd != -1) { - close(fd); + Close(client->fd); HASH_DEL(clients, client); HASH_DELETE(fdhh, fdclients, client); LL_PREPEND(dead_clients, client); - client->fd = -1; } ck_wunlock(&ci->lock); @@ -521,7 +520,7 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) LOGWARNING("%s connector ready", ckp->name); retry: - close(sockd); + Close(sockd); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on connector socket, exiting"); @@ -625,7 +624,7 @@ retry: goto retry; out: - close(sockd); + Close(sockd); dealloc(buf); return ret; } @@ -688,7 +687,7 @@ int connector(proc_instance_t *pi) } while (++tries < 25); if (ret < 0) { LOGERR("Connector failed to bind to socket for 2 minutes"); - close(sockd); + Close(sockd); goto out; } } @@ -698,7 +697,7 @@ int connector(proc_instance_t *pi) ret = listen(sockd, 10); if (ret < 0) { LOGERR("Connector failed to listen on socket"); - close(sockd); + Close(sockd); goto out; } From 5a9eb7648893def743fb4db92cbe922ec4b0b758 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:14:36 +1000 Subject: [PATCH 25/35] Use the Close handler in the generator --- src/generator.c | 61 ++++++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/src/generator.c b/src/generator.c index 748d6f05..79a30681 100644 --- a/src/generator.c +++ b/src/generator.c @@ -177,8 +177,7 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) out: if (!ret) { /* Close and invalidate the file handle */ - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } else keep_sockalive(cs->fd); return ret; @@ -225,8 +224,7 @@ static void kill_server(server_instance_t *si) LOGNOTICE("Killing server"); cs = &si->cs; - close(cs->fd); - cs->fd = -1; + Close(cs->fd); dealloc(cs->url); dealloc(cs->port); dealloc(cs->auth); @@ -290,7 +288,7 @@ retry: buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in gen_loop"); - close(sockd); + Close(sockd); goto retry; } LOGDEBUG("Generator received request: %s", buf); @@ -365,7 +363,7 @@ retry: LOGDEBUG("Generator received ping request"); send_unix_msg(sockd, "pong"); } - close(sockd); + Close(sockd); goto retry; out: @@ -627,10 +625,8 @@ retry: goto retry; out: - if (!ret) { - close(cs->fd); - cs->fd = -1; - } + if (!ret) + Close(cs->fd); return ret; } @@ -670,7 +666,7 @@ out: if (val) json_decref(val); if (!ret) - close(cs->fd); + Close(cs->fd); return ret; } @@ -934,7 +930,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) json_decref(req); if (!ret) { LOGWARNING("Failed to send message in auth_stratum"); - close(cs->fd); + Close(cs->fd); goto out; } @@ -979,7 +975,7 @@ out: return ret; } -static void send_subscribe(proxy_instance_t *proxi, int sockd) +static void send_subscribe(proxy_instance_t *proxi, int *sockd) { json_t *json_msg; char *msg; @@ -988,12 +984,12 @@ static void send_subscribe(proxy_instance_t *proxi, int sockd) "nonce2len", proxi->nonce2len); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } -static void send_notify(proxy_instance_t *proxi, int sockd) +static void send_notify(proxy_instance_t *proxi, int *sockd) { json_t *json_msg, *merkle_arr; notify_instance_t *ni; @@ -1017,12 +1013,12 @@ static void send_notify(proxy_instance_t *proxi, int sockd) msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } -static void send_diff(proxy_instance_t *proxi, int sockd) +static void send_diff(proxy_instance_t *proxi, int *sockd) { json_t *json_msg; char *msg; @@ -1030,9 +1026,9 @@ static void send_diff(proxy_instance_t *proxi, int sockd) JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } static void submit_share(proxy_instance_t *proxi, json_t *val) @@ -1242,8 +1238,7 @@ static void *proxy_send(void *arg) free(msg); if (!ret && cs->fd > 0) { LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect"); - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } } return NULL; @@ -1350,8 +1345,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * out: if (!ret) { /* Close and invalidate the file handle */ - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } else keep_sockalive(cs->fd); return ret; @@ -1421,8 +1415,7 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) LOGNOTICE("Killing proxy"); cs = proxi->cs; - close(cs->fd); - cs->fd = -1; + Close(cs->fd); /* All our notify data is invalid if we reconnect so discard them */ mutex_lock(&proxi->notify_lock); @@ -1495,7 +1488,7 @@ retry: buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in proxy_loop"); - close(sockd); + Close(sockd); goto retry; } LOGDEBUG("Proxy received request: %s", buf); @@ -1503,11 +1496,11 @@ retry: ret = 0; goto out; } else if (cmdmatch(buf, "getsubscribe")) { - send_subscribe(proxi, sockd); + send_subscribe(proxi, &sockd); } else if (cmdmatch(buf, "getnotify")) { - send_notify(proxi, sockd); + send_notify(proxi, &sockd); } else if (cmdmatch(buf, "getdiff")) { - send_diff(proxi, sockd); + send_diff(proxi, &sockd); } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { @@ -1536,11 +1529,11 @@ retry: else submit_share(proxi, val); } - close(sockd); + Close(sockd); goto retry; out: kill_proxy(ckp, proxi); - close(sockd); + Close(sockd); return ret; } @@ -1653,7 +1646,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) for (i = 0; i < ckp->proxies; i++) { si = ckp->servers[i]; - close(si->cs.fd); + Close(si->cs.fd); proxi = si->data; free(proxi->enonce1); free(proxi->enonce1bin); From 1f68a5a98b2a4b901aac8d400dedb16bceb13e2f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:16:30 +1000 Subject: [PATCH 26/35] Use the Close handler in the stratifier --- src/stratifier.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 332d0e62..4c6fb620 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1238,18 +1238,18 @@ retry: dealloc(buf); buf = recv_unix_msg(sockd); if (!buf) { - close(sockd); + Close(sockd); LOGWARNING("Failed to get message in stratum_loop"); goto retry; } if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); send_unix_msg(sockd, "pong"); - close(sockd); + Close(sockd); goto retry; } - close(sockd); + Close(sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; From 3bf4e0de8d5388712a9e38190868a83b2ce99c8a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:38:16 +1000 Subject: [PATCH 27/35] Always send dropclient to the stratifier on every invalidate client call --- src/connector.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index e6bf60d7..7c9e28b1 100644 --- a/src/connector.c +++ b/src/connector.c @@ -169,11 +169,8 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client) { char buf[256]; - int fd; - fd = drop_client(ci, client); - if (fd == -1) - return; + drop_client(ci, client); if (ckp->passthrough) return; sprintf(buf, "dropclient=%ld", client->id); @@ -459,7 +456,8 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf) if (unlikely(fd == -1)) { if (client) { - LOGINFO("Client id %ld disconnected", id); + /* This shouldn't happen */ + LOGWARNING("Client id %ld disconnected but fd already invalidated!", id); invalidate_client(ci->pi->ckp, ci, client); } else LOGINFO("Connector failed to find client id %ld to send to", id); From 26b4037effb2d66f102574bed40c51007d61b52c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:41:32 +1000 Subject: [PATCH 28/35] Log a warning if a client with an invalidated fd is still in the fdclients hashtable --- src/connector.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/connector.c b/src/connector.c index 7c9e28b1..083e6b9e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -286,6 +286,11 @@ retry: ck_rlock(&ci->lock); HASH_ITER(fdhh, fdclients, client, tmp) { + if (unlikely(client->fd == -1)) { + LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!", + client->id); + continue; + } fds[nfds].fd = client->fd; fds[nfds].events = POLLIN; fds[nfds].revents = 0; From 5c6e048a2e1afae1ac035a96de8f15c3b40e57cf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 22:57:27 +1000 Subject: [PATCH 29/35] Poll every 100ms instead of every second in the connector to rapidly pick up new clients --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 083e6b9e..9538ce6c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -302,7 +302,7 @@ retry: cksleep_ms(100); goto retry; } - ret = poll(fds, nfds, 1000); + ret = poll(fds, nfds, 100); if (unlikely(ret < 0)) { LOGERR("Failed to poll in receiver"); goto out; From 146e3140f312345a8f5dd49502e22d7369b5ca3e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Oct 2014 23:07:55 +1000 Subject: [PATCH 30/35] Add client source port to verbose logging --- src/connector.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 9538ce6c..69ec4825 100644 --- a/src/connector.c +++ b/src/connector.c @@ -92,7 +92,7 @@ void *acceptor(void *arg) conn_instance_t *ci = (conn_instance_t *)arg; client_instance_t *client, *old_client; socklen_t address_len; - int fd; + int fd, port; rename_proc("acceptor"); @@ -115,10 +115,12 @@ retry: case AF_INET: inet4_in = (struct sockaddr_in *)&client->address; inet_ntop(AF_INET, &inet4_in->sin_addr, client->address_name, INET6_ADDRSTRLEN); + port = htons(inet4_in->sin_port); break; case AF_INET6: inet6_in = (struct sockaddr_in6 *)&client->address; inet_ntop(AF_INET6, &inet6_in->sin6_addr, client->address_name, INET6_ADDRSTRLEN); + port = htons(inet6_in->sin6_port); break; default: LOGWARNING("Unknown INET type for client %d on socket %d", @@ -130,7 +132,7 @@ retry: keep_sockalive(fd); - LOGINFO("Connected new client %d on socket %d from %s", ci->nfds, fd, client->address_name); + LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); client->fd = fd; From 7d878bd22318d6a8a5abca40ce732d538e133826 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 4 Oct 2014 03:13:04 +1000 Subject: [PATCH 31/35] Force close when dropping a client with a reset using SO_LINGER and give a warning on close failure --- src/connector.c | 4 ++++ src/libckpool.c | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 69ec4825..71941848 100644 --- a/src/connector.c +++ b/src/connector.c @@ -155,6 +155,10 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) ck_wlock(&ci->lock); fd = client->fd; if (fd != -1) { + const struct linger so_linger = { 1, 0 }; + + if (unlikely(setsockopt(client->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)))) + LOGWARNING("setsockopt failed with errno %d:%s", errno, strerror(errno)); Close(client->fd); HASH_DEL(clients, client); HASH_DELETE(fdhh, fdclients, client); diff --git a/src/libckpool.c b/src/libckpool.c index 49232894..731132ae 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -437,7 +437,8 @@ void _Close(int *fd) if (*fd < 0) return; LOGDEBUG("Closing file handle %d", *fd); - close(*fd); + if (unlikely(close(*fd))) + LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno)); *fd = -1; } From 8e29c957e0f699f343387047fae92f816f6b0204 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 4 Oct 2014 10:06:19 +1000 Subject: [PATCH 32/35] Add helper function to set no linger --- src/libckpool.c | 7 +++++++ src/libckpool.h | 1 + 2 files changed, 8 insertions(+) diff --git a/src/libckpool.c b/src/libckpool.c index 731132ae..32d08a98 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -418,6 +418,13 @@ void keep_sockalive(int fd) setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &tcp_keepintvl, sizeof(tcp_keepintvl)); } +void nolinger_socket(int fd) +{ + const struct linger so_linger = { 1, 0 }; + + setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); +} + void noblock_socket(int fd) { int flags = fcntl(fd, F_GETFL, 0); diff --git a/src/libckpool.h b/src/libckpool.h index ce1c8b3a..a769a0ed 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -413,6 +413,7 @@ static inline bool sock_timeout(void) bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); void keep_sockalive(int fd); +void nolinger_socket(int fd); void noblock_socket(int fd); void block_socket(int fd); void _Close(int *fd); From 0c16e193ceef70eaf8ff8e05ce124bb4bcf3eb46 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 4 Oct 2014 10:09:28 +1000 Subject: [PATCH 33/35] Set no linger on all connected clients --- src/connector.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 71941848..97f51418 100644 --- a/src/connector.c +++ b/src/connector.c @@ -131,6 +131,7 @@ retry: } keep_sockalive(fd); + nolinger_socket(fd); LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); @@ -155,10 +156,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) ck_wlock(&ci->lock); fd = client->fd; if (fd != -1) { - const struct linger so_linger = { 1, 0 }; - - if (unlikely(setsockopt(client->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)))) - LOGWARNING("setsockopt failed with errno %d:%s", errno, strerror(errno)); Close(client->fd); HASH_DEL(clients, client); HASH_DELETE(fdhh, fdclients, client); From 1b45079802f584d95258e560bd6a7a0975b1f4a1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 4 Oct 2014 14:51:36 +1000 Subject: [PATCH 34/35] Keep track of clients that submit a run of continuous rejects and send them a diff update if they do it for longer than 1 minute and then lazily drop them if it goes beyond 2 minutes --- src/stratifier.c | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 4c6fb620..df99c571 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -272,6 +272,7 @@ struct stratum_instance { int ssdc; /* Shares since diff change */ tv_t first_share; tv_t last_share; + time_t first_invalid; /* Time of first invalid in run of non stale rejects */ time_t start_time; char address[INET6_ADDRSTRLEN]; @@ -279,6 +280,9 @@ struct stratum_instance { bool authorised; bool idle; bool notified_idle; + int reject; /* Indicator that this client is having a run of rejects + * or other problem and should be dropped lazily if + * this is set to 2 */ user_instance_t *user_instance; worker_instance_t *worker_instance; @@ -2055,13 +2059,15 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, char idstring[20]; uint32_t ntime32; uchar hash[32]; - int64_t id; + time_t now_t; json_t *val; + int64_t id; ts_t now; FILE *fp; int len; ts_realtime(&now); + now_t = now.tv_sec; sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); if (unlikely(!json_is_array(params_val))) { @@ -2235,6 +2241,26 @@ out_unlock: } ckdbq_add(ckp, ID_SHARES, val); out: + if ((!result && !submit) || !share) { + /* Is this the first in a run of invalids? */ + if (client->first_invalid < client->last_share.tv_sec || !client->first_invalid) + client->first_invalid = now_t; + else if (client->first_invalid && client->first_invalid < now_t - 120) { + LOGNOTICE("Client %d rejecting for 120s, disconnecting", client->id); + stratum_send_message(client, "Disconnecting for continuous invalid shares"); + client->reject = 2; + } else if (client->first_invalid && client->first_invalid < now_t - 60) { + if (!client->reject) { + LOGINFO("Client %d rejecting for 60s, sending diff", client->id); + stratum_send_diff(client); + client->reject = 1; + } + } + } else { + client->first_invalid = 0; + client->reject = 0; + } + if (!share) { JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", "clientid", client->id, @@ -2401,6 +2427,13 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method return; } + if (unlikely(client->reject == 2)) { + LOGINFO("Dropping client %d tagged for lazy invalidation", client_id); + snprintf(buf, 255, "dropclient=%ld", client->id); + send_proc(client->ckp->connector, buf); + return; + } + /* Random broken clients send something not an integer as the id so we copy * the json item for id_val as is for the response. */ method = json_string_value(method_val); From b08f4bcb6950110509fe3e22e8a95f2bea174e7a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 4 Oct 2014 14:57:42 +1000 Subject: [PATCH 35/35] Drop clients lazily when the proxy is full --- src/stratifier.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stratifier.c b/src/stratifier.c index df99c571..14a30911 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1443,6 +1443,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js /* Create a new extranonce1 based on a uint64_t pointer */ if (!new_enonce1(client)) { stratum_send_message(client, "Pool full of clients"); + client->reject = 2; return json_string("proxy full"); } LOGINFO("Set new subscription %ld to new enonce1 %s", client->id,