diff --git a/src/ckpool.c b/src/ckpool.c index 6f3eb99e..5d43d96a 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1542,6 +1542,7 @@ static struct option long_options[] = { {"node", no_argument, 0, 'N'}, {"passthrough", no_argument, 0, 'P'}, {"proxy", no_argument, 0, 'p'}, + {"quiet", no_argument, 0, 'q'}, {"redirector", no_argument, 0, 'R'}, {"ckdb-sockdir",required_argument, 0, 'S'}, {"sockdir", required_argument, 0, 's'}, @@ -1563,6 +1564,7 @@ static struct option long_options[] = { {"node", no_argument, 0, 'N'}, {"passthrough", no_argument, 0, 'P'}, {"proxy", no_argument, 0, 'p'}, + {"quiet", no_argument, 0, 'q'}, {"redirector", no_argument, 0, 'R'}, {"sockdir", required_argument, 0, 's'}, {"trusted", no_argument, 0, 't'}, @@ -1610,7 +1612,7 @@ int main(int argc, char **argv) ckp.initial_args[ckp.args] = strdup(argv[ckp.args]); ckp.initial_args[ckp.args] = NULL; - while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:Nn:PpRS:s:tu", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:Nn:PpqRS:s:tu", long_options, &i)) != -1) { switch (c) { case 'A': ckp.standalone = true; @@ -1679,6 +1681,9 @@ int main(int argc, char **argv) quit(1, "Cannot set another proxy type or redirector and proxy mode"); ckp.proxy = true; break; + case 'q': + ckp.quiet = true; + break; case 'R': if (ckp.proxy || ckp.passthrough || ckp.userproxy || ckp.node) quit(1, "Cannot set a proxy type or passthrough and redirector modes"); diff --git a/src/ckpool.h b/src/ckpool.h index 8596c8ec..520c6eb3 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -209,6 +209,9 @@ struct ckpool_instance { /* Should we daemonise the ckpool process */ bool daemon; + /* Should we disable the throbber */ + bool quiet; + /* Have we given warnings about the inability to raise buf sizes */ bool wmem_warn; bool rmem_warn; diff --git a/src/generator.c b/src/generator.c index 5d21ed03..2908421d 100644 --- a/src/generator.c +++ b/src/generator.c @@ -969,6 +969,9 @@ static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy) mutex_unlock(&gdata->lock); } +/* The difference between a dead proxy and a deleted one is the parent proxy entry + * is not removed from the stratifier as it assumes it is down whereas a deleted + * proxy has had its entry removed from the generator. */ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid) { char buf[256]; @@ -979,6 +982,16 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int sub send_proc(ckp->stratifier, buf); } +static void send_stratifier_delproxy(ckpool_t *ckp, const int id, const int subid) +{ + char buf[256]; + + if (ckp->passthrough) + return; + sprintf(buf, "delproxy=%d:%d", id, subid); + send_proc(ckp->stratifier, buf); +} + /* Remove the subproxy from the proxi list and put it on the dead list. * Further use of the subproxy pointer may point to a new proxy but will not * dereference. This will only disable subproxies so parent proxies need to @@ -2446,9 +2459,11 @@ static void delete_proxy(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxy) HASH_DELETE(sh, proxy->subproxies, subproxy); mutex_unlock(&proxy->proxy_lock); - send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid); - if (subproxy && proxy != subproxy) - store_proxy(gdata, subproxy); + if (subproxy) { + send_stratifier_delproxy(ckp, subproxy->id, subproxy->subid); + if (proxy != subproxy) + store_proxy(gdata, subproxy); + } } while (subproxy); /* Recycle the proxy itself */ diff --git a/src/stratifier.c b/src/stratifier.c index d3efb0f2..a037da91 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -366,6 +366,7 @@ struct proxy_base { proxy_t *subproxies; /* Hashlist of subproxies sorted by subid */ sdata_t *sdata; /* Unique stratifer data for each subproxy */ bool dead; + bool deleted; }; typedef struct session session_t; @@ -777,30 +778,31 @@ out: static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, const char *func, const int line) { - static time_t time_counter; sdata_t *sdata = ckp->sdata; + static time_t time_counter; static int counter = 0; char *json_msg; - time_t now_t; - char ch; if (unlikely(!val)) { LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line); return; } - now_t = time(NULL); - if (now_t != time_counter) { - pool_stats_t *stats = &sdata->stats; - char hashrate[16]; + if (!ckp->quiet) { + time_t now_t = time(NULL); - /* Rate limit to 1 update per second */ - time_counter = now_t; - suffix_string(stats->dsps1 * nonces, hashrate, 16, 3); - ch = status_chars[(counter++) & 0x3]; - fprintf(stdout, "\33[2K\r%c %sH/s %.1f SPS %d users %d workers", - ch, hashrate, stats->sps1, stats->users, stats->workers); - fflush(stdout); + if (now_t != time_counter) { + pool_stats_t *stats = &sdata->stats; + char hashrate[16], ch; + + /* Rate limit to 1 update per second */ + time_counter = now_t; + suffix_string(stats->dsps1 * nonces, hashrate, 16, 3); + ch = status_chars[(counter++) & 0x3]; + fprintf(stdout, "\33[2K\r%c %sH/s %.1f SPS %d users %d workers", + ch, hashrate, stats->sps1, stats->users, stats->workers); + fflush(stdout); + } } if (CKP_STANDALONE(ckp)) @@ -2282,7 +2284,7 @@ static void check_bestproxy(sdata_t *sdata) LOGNOTICE("Stratifier setting active proxy to %d", changed_id); } -static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced) +static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted) { stratum_instance_t *client, *tmp; int reconnects = 0, proxyid = 0; @@ -2292,6 +2294,7 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bo proxy = existing_subproxy(sdata, id, subid); if (proxy) { proxy->dead = true; + proxy->deleted = deleted; if (!replaced && proxy->global) check_bestproxy(sdata); } @@ -2373,7 +2376,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) /* Is this a replacement for an existing proxy id? */ old = existing_subproxy(sdata, id, subid); if (old) { - dead_proxyid(sdata, id, subid, true); + dead_proxyid(sdata, id, subid, true, false); proxy = old; proxy->dead = false; } else @@ -2727,6 +2730,11 @@ static void reap_proxies(ckpool_t *ckp, sdata_t *sdata) proxy->subproxy_count--; free_proxy(subproxy); } + /* Should we reap the parent proxy too?*/ + if (!proxy->deleted || proxy->subproxy_count > 1 || proxy->bound_clients) + continue; + HASH_DELETE(hh, sdata->proxies, proxy); + free_proxy(proxy); } mutex_unlock(&sdata->proxy_lock); @@ -2752,7 +2760,7 @@ static void __inc_instance_ref(stratum_instance_t *client) /* Find an __instance_by_id and increase its reference count allowing us to * use this instance outside of instance_lock without fear of it being * dereferenced. Does not return dropped clients still on the list. */ -static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, const int64_t id) +static inline stratum_instance_t *ref_instance_by_id(sdata_t *sdata, const int64_t id) { stratum_instance_t *client; @@ -2795,6 +2803,11 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil __kill_instance(sdata, client); } +static int __dec_instance_ref(stratum_instance_t *client) +{ + return --client->ref; +} + /* Decrease the reference count of instance. */ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file, const char *func, const int line) @@ -2805,7 +2818,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const int ref; ck_wlock(&sdata->instance_lock); - ref = --client->ref; + ref = __dec_instance_ref(client); /* See if there are any instances that were dropped that could not be * moved due to holding a reference and drop them now. */ if (unlikely(client->dropped && !ref)) { @@ -3414,12 +3427,22 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client) stratum_add_send(sdata, json_msg, client->id, SM_RECONNECT); } -static void dead_proxy(sdata_t *sdata, const char *buf) +static void dead_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) { int id = 0, subid = 0; sscanf(buf, "deadproxy=%d:%d", &id, &subid); - dead_proxyid(sdata, id, subid, false); + dead_proxyid(sdata, id, subid, false, false); + reap_proxies(ckp, sdata); +} + +static void del_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) +{ + int id = 0, subid = 0; + + sscanf(buf, "delproxy=%d:%d", &id, &subid); + dead_proxyid(sdata, id, subid, false, true); + reap_proxies(ckp, sdata); } static void reconnect_client_id(sdata_t *sdata, const int64_t client_id) @@ -4108,7 +4131,9 @@ retry: } else if (cmdmatch(buf, "reconnect")) { request_reconnect(sdata, buf); } else if (cmdmatch(buf, "deadproxy")) { - dead_proxy(sdata, buf); + dead_proxy(ckp, sdata, buf); + } else if (cmdmatch(buf, "delproxy")) { + del_proxy(ckp, sdata, buf); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ckdbflush")) { @@ -5481,7 +5506,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, *err_val = JSON_ERR(err); goto out; } - if (unlikely(json_array_size(params_val) != 5)) { + if (unlikely(json_array_size(params_val) < 5)) { err = SE_INVALID_SIZE; *err_val = JSON_ERR(err); goto out; @@ -7083,6 +7108,35 @@ static void upstream_workers(ckpool_t *ckp, user_instance_t *user) send_proc(ckp->connector, buf); } + +/* To iterate over all users, if user is initially NULL, this will return the first entry, + * otherwise it will return the entry after user, and NULL if there are no more entries. + * Allows us to grab and drop the lock on each iteration. */ +static user_instance_t *next_user(sdata_t *sdata, user_instance_t *user) +{ + ck_rlock(&sdata->instance_lock); + if (unlikely(!user)) + user = sdata->user_instances; + else + user = user->hh.next; + ck_runlock(&sdata->instance_lock); + + return user; +} + +/* Ditto for worker */ +static worker_instance_t *next_worker(sdata_t *sdata, user_instance_t *user, worker_instance_t *worker) +{ + ck_rlock(&sdata->instance_lock); + if (!worker) + worker = user->worker_instances; + else + worker = worker->next; + ck_runlock(&sdata->instance_lock); + + return worker; +} + static void *statsupdate(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; @@ -7100,10 +7154,10 @@ static void *statsupdate(void *arg) double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, ghs10080, per_tdiff; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; char suffix360[16], suffix1440[16], suffix10080[16]; - stratum_instance_t *client, *tmp; log_entry_t *log_entries = NULL; - user_instance_t *user, *tmpuser; char_entry_t *char_list = NULL; + stratum_instance_t *client; + user_instance_t *user; int idle_workers = 0; char *fname, *s, *sp; tv_t now, diff; @@ -7115,53 +7169,69 @@ static void *statsupdate(void *arg) tv_time(&now); timersub(&now, &stats->start_time, &diff); - /* Use this locking as an opportunity to test clients. */ - ck_rlock(&sdata->instance_lock); - HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + ck_wlock(&sdata->instance_lock); + /* Grab the first entry */ + client = sdata->stratum_instances; + if (likely(client)) + __inc_instance_ref(client); + ck_wunlock(&sdata->instance_lock); + + while (client) { + tv_time(&now); /* Look for clients that may have been dropped which the * stratifier has not been informed about and ask the * connector if they still exist */ - if (client->dropped) { + if (client->dropped) connector_test_client(ckp, client->id); - continue; - } - - if (client->node || client->remote) - continue; - - /* Test for clients that haven't authed in over a minute - * and drop them lazily */ - if (!client->authorised) { + else if (client->node || client->remote) { + /* Do nothing to these */ + } else if (!client->authorised) { + /* Test for clients that haven't authed in over a minute + * and drop them lazily */ if (now.tv_sec > client->start_time + 60) { client->dropped = true; connector_drop_client(ckp, client->id); } - continue; + } else { + per_tdiff = tvdiff(&now, &client->last_share); + /* Decay times per connected instance */ + if (per_tdiff > 60) { + /* No shares for over a minute, decay to 0 */ + decay_client(client, 0, &now); + idle_workers++; + if (per_tdiff > 600) + client->idle = true; + /* Test idle clients are still connected */ + connector_test_client(ckp, client->id); + } } - per_tdiff = tvdiff(&now, &client->last_share); - /* Decay times per connected instance */ - if (per_tdiff > 60) { - /* No shares for over a minute, decay to 0 */ - decay_client(client, 0, &now); - idle_workers++; - if (per_tdiff > 600) - client->idle = true; - /* Test idle clients are still connected */ - connector_test_client(ckp, client->id); - continue; - } + ck_wlock(&sdata->instance_lock); + /* Drop the reference of the last entry we examined, + * then grab the next client. */ + __dec_instance_ref(client); + client = client->hh.next; + /* Grab a reference to this client allowing us to examine + * it without holding the lock */ + if (likely(client)) + __inc_instance_ref(client); + ck_wunlock(&sdata->instance_lock); } - HASH_ITER(hh, sdata->user_instances, user, tmpuser) { + user = NULL; + + while ((user = next_user(sdata, user)) != NULL) { worker_instance_t *worker; bool idle = false; if (!user->authorised) continue; + worker = NULL; + tv_time(&now); + /* Decay times per worker */ - DL_FOREACH(user->worker_instances, worker) { + while ((worker = next_worker(sdata, user, worker)) != NULL) { per_tdiff = tvdiff(&now, &worker->last_share); if (per_tdiff > 60) { decay_worker(worker, 0, &now); @@ -7250,7 +7320,6 @@ static void *statsupdate(void *arg) if (ckp->remote) upstream_workers(ckp, user); } - ck_runlock(&sdata->instance_lock); /* Dump log entries out of instance_lock */ dump_log_entries(&log_entries);