kanoi   commit 0e917aed1a   (8 years ago)

Changed files:
  1. src/ckpool.c      (7 changed lines)
  2. src/ckpool.h      (3 changed lines)
  3. src/generator.c   (21 changed lines)
  4. src/stratifier.c  (175 changed lines)

src/ckpool.c  (7 changed lines)

@@ -1542,6 +1542,7 @@ static struct option long_options[] = {
 	{"node",	no_argument,		0,	'N'},
 	{"passthrough",	no_argument,		0,	'P'},
 	{"proxy",	no_argument,		0,	'p'},
+	{"quiet",	no_argument,		0,	'q'},
 	{"redirector",	no_argument,		0,	'R'},
 	{"ckdb-sockdir",required_argument,	0,	'S'},
 	{"sockdir",	required_argument,	0,	's'},
@@ -1563,6 +1564,7 @@ static struct option long_options[] = {
 	{"node",	no_argument,		0,	'N'},
 	{"passthrough",	no_argument,		0,	'P'},
 	{"proxy",	no_argument,		0,	'p'},
+	{"quiet",	no_argument,		0,	'q'},
 	{"redirector",	no_argument,		0,	'R'},
 	{"sockdir",	required_argument,	0,	's'},
 	{"trusted",	no_argument,		0,	't'},
@@ -1610,7 +1612,7 @@ int main(int argc, char **argv)
 		ckp.initial_args[ckp.args] = strdup(argv[ckp.args]);
 	ckp.initial_args[ckp.args] = NULL;
 
-	while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:Nn:PpRS:s:tu", long_options, &i)) != -1) {
+	while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:Nn:PpqRS:s:tu", long_options, &i)) != -1) {
 		switch (c) {
 		case 'A':
 			ckp.standalone = true;
@@ -1679,6 +1681,9 @@ int main(int argc, char **argv)
 				quit(1, "Cannot set another proxy type or redirector and proxy mode");
 			ckp.proxy = true;
 			break;
+		case 'q':
+			ckp.quiet = true;
+			break;
 		case 'R':
 			if (ckp.proxy || ckp.passthrough || ckp.userproxy || ckp.node)
 				quit(1, "Cannot set a proxy type or passthrough and redirector modes");

src/ckpool.h  (3 changed lines)

@@ -209,6 +209,9 @@ struct ckpool_instance {
 	/* Should we daemonise the ckpool process */
 	bool daemon;
 
+	/* Should we disable the throbber */
+	bool quiet;
+
 	/* Have we given warnings about the inability to raise buf sizes */
 	bool wmem_warn;
 	bool rmem_warn;

src/generator.c  (21 changed lines)

@@ -969,6 +969,9 @@ static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy)
 	mutex_unlock(&gdata->lock);
 }
 
+/* The difference between a dead proxy and a deleted one is that the parent proxy
+ * entry is not removed from the stratifier, as it assumes it is down, whereas a
+ * deleted proxy has had its entry removed from the generator. */
 static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid)
 {
 	char buf[256];
@@ -979,6 +982,16 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid)
 	send_proc(ckp->stratifier, buf);
 }
 
+static void send_stratifier_delproxy(ckpool_t *ckp, const int id, const int subid)
+{
+	char buf[256];
+
+	if (ckp->passthrough)
+		return;
+	sprintf(buf, "delproxy=%d:%d", id, subid);
+	send_proc(ckp->stratifier, buf);
+}
+
 /* Remove the subproxy from the proxy list and put it on the dead list.
  * Further use of the subproxy pointer may point to a new proxy but will not
  * dereference. This will only disable subproxies so parent proxies need to
@@ -2446,9 +2459,11 @@ static void delete_proxy(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxy)
 			HASH_DELETE(sh, proxy->subproxies, subproxy);
 		mutex_unlock(&proxy->proxy_lock);
 
-		send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid);
-		if (subproxy && proxy != subproxy)
-			store_proxy(gdata, subproxy);
+		if (subproxy) {
+			send_stratifier_delproxy(ckp, subproxy->id, subproxy->subid);
+			if (proxy != subproxy)
+				store_proxy(gdata, subproxy);
+		}
 	} while (subproxy);
 
 	/* Recycle the proxy itself */
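
Together with the stratifier changes below, the generator now has two distinct commands: "deadproxy" marks a subproxy down but leaves the parent entry in place, while the new "delproxy" tells the stratifier the entry has been removed from the generator so the parent can be reaped. Both travel as plain text over ckpool's inter-process sockets in the form name=id:subid. A hedged sketch of formatting and parsing such a message with ordinary snprintf/sscanf, outside ckpool's send_proc plumbing (the helper names here are made up):

	#include <stdio.h>
	#include <string.h>

	/* Build a proxy-control message as the generator would (illustrative only) */
	static void format_delproxy(char *buf, size_t len, int id, int subid)
	{
		snprintf(buf, len, "delproxy=%d:%d", id, subid);
	}

	/* Parse it as the stratifier's message loop would */
	static int parse_delproxy(const char *buf, int *id, int *subid)
	{
		if (strncmp(buf, "delproxy=", 9))
			return -1;	/* not a delproxy command */
		*id = *subid = 0;
		sscanf(buf, "delproxy=%d:%d", id, subid);
		return 0;
	}

	int main(void)
	{
		char buf[256];
		int id, subid;

		format_delproxy(buf, sizeof(buf), 3, 7);
		if (!parse_delproxy(buf, &id, &subid))
			printf("delete proxy %d:%d\n", id, subid);
		return 0;
	}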

src/stratifier.c  (175 changed lines)

@@ -366,6 +366,7 @@ struct proxy_base {
 	proxy_t *subproxies; /* Hashlist of subproxies sorted by subid */
 	sdata_t *sdata; /* Unique stratifer data for each subproxy */
 	bool dead;
+	bool deleted;
 };
 
 typedef struct session session_t;
@@ -777,30 +778,31 @@ out:
 static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file,
 		       const char *func, const int line)
 {
-	static time_t time_counter;
 	sdata_t *sdata = ckp->sdata;
+	static time_t time_counter;
 	static int counter = 0;
 	char *json_msg;
-	time_t now_t;
-	char ch;
 
 	if (unlikely(!val)) {
 		LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line);
 		return;
 	}
 
-	now_t = time(NULL);
-	if (now_t != time_counter) {
-		pool_stats_t *stats = &sdata->stats;
-		char hashrate[16];
+	if (!ckp->quiet) {
+		time_t now_t = time(NULL);
 
-		/* Rate limit to 1 update per second */
-		time_counter = now_t;
-		suffix_string(stats->dsps1 * nonces, hashrate, 16, 3);
-		ch = status_chars[(counter++) & 0x3];
-		fprintf(stdout, "\33[2K\r%c %sH/s %.1f SPS %d users %d workers",
-			ch, hashrate, stats->sps1, stats->users, stats->workers);
-		fflush(stdout);
+		if (now_t != time_counter) {
+			pool_stats_t *stats = &sdata->stats;
+			char hashrate[16], ch;
+
+			/* Rate limit to 1 update per second */
+			time_counter = now_t;
+			suffix_string(stats->dsps1 * nonces, hashrate, 16, 3);
+			ch = status_chars[(counter++) & 0x3];
+			fprintf(stdout, "\33[2K\r%c %sH/s %.1f SPS %d users %d workers",
+				ch, hashrate, stats->sps1, stats->users, stats->workers);
+			fflush(stdout);
+		}
 	}
 
 	if (CKP_STANDALONE(ckp))
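
The wrapped block is the console throbber: at most once per second it redraws a single status line using the ANSI erase-line sequence \33[2K plus a carriage return, and the new quiet flag now skips it entirely. A stripped-down, standalone sketch of that rate-limited redraw (the user and worker numbers are made up):

	#include <stdio.h>
	#include <time.h>
	#include <unistd.h>

	static const char status_chars[] = "|/-\\";

	int main(void)
	{
		time_t time_counter = 0;
		int counter = 0, i;

		for (i = 0; i < 50; i++) {
			time_t now_t = time(NULL);

			/* Rate limit to one redraw per second */
			if (now_t != time_counter) {
				char ch = status_chars[(counter++) & 0x3];

				time_counter = now_t;
				/* \33[2K erases the line, \r returns to column 0 */
				fprintf(stdout, "\33[2K\r%c %d users %d workers", ch, 10, 42);
				fflush(stdout);
			}
			usleep(200000);
		}
		fputc('\n', stdout);
		return 0;
	}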
@@ -2282,7 +2284,7 @@ static void check_bestproxy(sdata_t *sdata)
 	LOGNOTICE("Stratifier setting active proxy to %d", changed_id);
 }
 
-static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced)
+static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted)
 {
 	stratum_instance_t *client, *tmp;
 	int reconnects = 0, proxyid = 0;
@@ -2292,6 +2294,7 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bo
 	proxy = existing_subproxy(sdata, id, subid);
 	if (proxy) {
 		proxy->dead = true;
+		proxy->deleted = deleted;
 		if (!replaced && proxy->global)
 			check_bestproxy(sdata);
 	}
@@ -2373,7 +2376,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
 	/* Is this a replacement for an existing proxy id? */
 	old = existing_subproxy(sdata, id, subid);
 	if (old) {
-		dead_proxyid(sdata, id, subid, true);
+		dead_proxyid(sdata, id, subid, true, false);
 		proxy = old;
 		proxy->dead = false;
 	} else
@@ -2727,6 +2730,11 @@ static void reap_proxies(ckpool_t *ckp, sdata_t *sdata)
 			proxy->subproxy_count--;
 			free_proxy(subproxy);
 		}
+		/* Should we reap the parent proxy too? */
+		if (!proxy->deleted || proxy->subproxy_count > 1 || proxy->bound_clients)
+			continue;
+		HASH_DELETE(hh, sdata->proxies, proxy);
+		free_proxy(proxy);
 	}
 	mutex_unlock(&sdata->proxy_lock);
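
reap_proxies deletes entries from a uthash hash list while iterating over it, which is safe because HASH_ITER keeps a temporary pointer to the next element before the current one is removed. A self-contained sketch of that idiom, assuming the header-only uthash library (the proxy struct below is a stand-in, not ckpool's):

	#include <stdlib.h>
	#include "uthash.h"

	struct proxy {
		int id;
		int deleted;		/* marked for removal elsewhere */
		UT_hash_handle hh;
	};

	/* Free every proxy that has been flagged as deleted */
	static void reap(struct proxy **proxies)
	{
		struct proxy *proxy, *tmp;

		/* tmp lets us delete the current entry and keep iterating safely */
		HASH_ITER(hh, *proxies, proxy, tmp) {
			if (!proxy->deleted)
				continue;
			HASH_DELETE(hh, *proxies, proxy);
			free(proxy);
		}
	}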
@@ -2752,7 +2760,7 @@ static void __inc_instance_ref(stratum_instance_t *client)
 /* Find an __instance_by_id and increase its reference count allowing us to
  * use this instance outside of instance_lock without fear of it being
  * dereferenced. Does not return dropped clients still on the list. */
-static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, const int64_t id)
+static inline stratum_instance_t *ref_instance_by_id(sdata_t *sdata, const int64_t id)
 {
 	stratum_instance_t *client;
@@ -2795,6 +2803,11 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
 	__kill_instance(sdata, client);
 }
 
+static int __dec_instance_ref(stratum_instance_t *client)
+{
+	return --client->ref;
+}
+
 /* Decrease the reference count of instance. */
 static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file,
 			      const char *func, const int line)
@@ -2805,7 +2818,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const
 	int ref;
 
 	ck_wlock(&sdata->instance_lock);
-	ref = --client->ref;
+	ref = __dec_instance_ref(client);
 	/* See if there are any instances that were dropped that could not be
 	 * moved due to holding a reference and drop them now. */
 	if (unlikely(client->dropped && !ref)) {
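
Factoring the decrement into __dec_instance_ref mirrors the existing __inc_instance_ref: the double-underscore prefix appears to mark helpers that expect instance_lock to already be held, while the single-underscore wrapper takes the lock itself. A simplified sketch of that split using a plain pthread mutex instead of ckpool's ck_wlock (struct and field names are placeholders):

	#include <pthread.h>

	struct client {
		int ref;		/* references held outside the lock */
		int dropped;		/* flagged for removal once ref hits 0 */
	};

	static pthread_mutex_t instance_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Callers must already hold instance_lock */
	static int __dec_ref(struct client *client)
	{
		return --client->ref;
	}

	static void __inc_ref(struct client *client)
	{
		client->ref++;
	}

	/* Locking wrappers: safe to call from anywhere */
	static void inc_ref(struct client *client)
	{
		pthread_mutex_lock(&instance_lock);
		__inc_ref(client);
		pthread_mutex_unlock(&instance_lock);
	}

	static void dec_ref(struct client *client)
	{
		int ref;

		pthread_mutex_lock(&instance_lock);
		ref = __dec_ref(client);
		/* A dropped client can only be reclaimed once nobody references it */
		if (client->dropped && !ref) {
			/* ...remove client from the instance list here... */
		}
		pthread_mutex_unlock(&instance_lock);
	}

	int main(void)
	{
		struct client c = { .ref = 0, .dropped = 0 };

		inc_ref(&c);
		dec_ref(&c);
		return 0;
	}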
@@ -3414,12 +3427,22 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
 	stratum_add_send(sdata, json_msg, client->id, SM_RECONNECT);
 }
 
-static void dead_proxy(sdata_t *sdata, const char *buf)
+static void dead_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf)
 {
 	int id = 0, subid = 0;
 
 	sscanf(buf, "deadproxy=%d:%d", &id, &subid);
-	dead_proxyid(sdata, id, subid, false);
+	dead_proxyid(sdata, id, subid, false, false);
+	reap_proxies(ckp, sdata);
+}
+
+static void del_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf)
+{
+	int id = 0, subid = 0;
+
+	sscanf(buf, "delproxy=%d:%d", &id, &subid);
+	dead_proxyid(sdata, id, subid, false, true);
+	reap_proxies(ckp, sdata);
 }
 
 static void reconnect_client_id(sdata_t *sdata, const int64_t client_id)
@@ -4108,7 +4131,9 @@ retry:
 	} else if (cmdmatch(buf, "reconnect")) {
 		request_reconnect(sdata, buf);
 	} else if (cmdmatch(buf, "deadproxy")) {
-		dead_proxy(sdata, buf);
+		dead_proxy(ckp, sdata, buf);
+	} else if (cmdmatch(buf, "delproxy")) {
+		del_proxy(ckp, sdata, buf);
 	} else if (cmdmatch(buf, "loglevel")) {
 		sscanf(buf, "loglevel=%d", &ckp->loglevel);
 	} else if (cmdmatch(buf, "ckdbflush")) {
@@ -5481,7 +5506,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
 		*err_val = JSON_ERR(err);
 		goto out;
 	}
-	if (unlikely(json_array_size(params_val) != 5)) {
+	if (unlikely(json_array_size(params_val) < 5)) {
 		err = SE_INVALID_SIZE;
 		*err_val = JSON_ERR(err);
 		goto out;
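
Relaxing the check from exactly five parameters to at least five means mining.submit requests carrying extra trailing fields (some miners append additional values, for example a version-bits entry) are no longer rejected; only the first five are used. A hedged illustration of the same validation with jansson, outside ckpool's parse_submit:

	#include <jansson.h>
	#include <stdio.h>

	/* Accept a mining.submit params array with five or more entries */
	static int validate_submit_params(json_t *params_val)
	{
		if (!json_is_array(params_val) || json_array_size(params_val) < 5)
			return -1;
		/* Only the first five entries are required:
		 * worker, job_id, extranonce2, ntime, nonce */
		return 0;
	}

	int main(void)
	{
		/* Six entries: the trailing one is simply ignored */
		json_t *params = json_pack("[s,s,s,s,s,s]",
					   "worker.1", "jobid", "0a0a0a0a",
					   "5e9f1c2b", "deadbeef", "00400000");

		printf("valid=%d\n", validate_submit_params(params) == 0);
		json_decref(params);
		return 0;
	}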
@@ -7083,6 +7108,35 @@ static void upstream_workers(ckpool_t *ckp, user_instance_t *user)
 	send_proc(ckp->connector, buf);
 }
 
+/* To iterate over all users, if user is initially NULL, this will return the first entry,
+ * otherwise it will return the entry after user, and NULL if there are no more entries.
+ * Allows us to grab and drop the lock on each iteration. */
+static user_instance_t *next_user(sdata_t *sdata, user_instance_t *user)
+{
+	ck_rlock(&sdata->instance_lock);
+	if (unlikely(!user))
+		user = sdata->user_instances;
+	else
+		user = user->hh.next;
+	ck_runlock(&sdata->instance_lock);
+
+	return user;
+}
+
+/* Ditto for worker */
+static worker_instance_t *next_worker(sdata_t *sdata, user_instance_t *user, worker_instance_t *worker)
+{
+	ck_rlock(&sdata->instance_lock);
+	if (!worker)
+		worker = user->worker_instances;
+	else
+		worker = worker->next;
+	ck_runlock(&sdata->instance_lock);
+
+	return worker;
+}
+
 static void *statsupdate(void *arg)
 {
 	ckpool_t *ckp = (ckpool_t *)arg;
@@ -7100,10 +7154,10 @@ static void *statsupdate(void *arg)
 		double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, ghs10080, per_tdiff;
 		char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64];
 		char suffix360[16], suffix1440[16], suffix10080[16];
-		stratum_instance_t *client, *tmp;
 		log_entry_t *log_entries = NULL;
-		user_instance_t *user, *tmpuser;
 		char_entry_t *char_list = NULL;
+		stratum_instance_t *client;
+		user_instance_t *user;
 		int idle_workers = 0;
 		char *fname, *s, *sp;
 		tv_t now, diff;
@@ -7115,53 +7169,69 @@ static void *statsupdate(void *arg)
 		tv_time(&now);
 		timersub(&now, &stats->start_time, &diff);
 
-		/* Use this locking as an opportunity to test clients. */
-		ck_rlock(&sdata->instance_lock);
-		HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
+		ck_wlock(&sdata->instance_lock);
+		/* Grab the first entry */
+		client = sdata->stratum_instances;
+		if (likely(client))
+			__inc_instance_ref(client);
+		ck_wunlock(&sdata->instance_lock);
+
+		while (client) {
+			tv_time(&now);
 			/* Look for clients that may have been dropped which the
 			 * stratifier has not been informed about and ask the
 			 * connector if they still exist */
-			if (client->dropped) {
+			if (client->dropped)
 				connector_test_client(ckp, client->id);
-				continue;
-			}
-
-			if (client->node || client->remote)
-				continue;
-
-			/* Test for clients that haven't authed in over a minute
-			 * and drop them lazily */
-			if (!client->authorised) {
+			else if (client->node || client->remote) {
+				/* Do nothing to these */
+			} else if (!client->authorised) {
+				/* Test for clients that haven't authed in over a minute
+				 * and drop them lazily */
 				if (now.tv_sec > client->start_time + 60) {
 					client->dropped = true;
 					connector_drop_client(ckp, client->id);
 				}
-				continue;
-			}
-
-			per_tdiff = tvdiff(&now, &client->last_share);
-			/* Decay times per connected instance */
-			if (per_tdiff > 60) {
-				/* No shares for over a minute, decay to 0 */
-				decay_client(client, 0, &now);
-				idle_workers++;
-				if (per_tdiff > 600)
-					client->idle = true;
-				/* Test idle clients are still connected */
-				connector_test_client(ckp, client->id);
-				continue;
+			} else {
+				per_tdiff = tvdiff(&now, &client->last_share);
+				/* Decay times per connected instance */
+				if (per_tdiff > 60) {
+					/* No shares for over a minute, decay to 0 */
+					decay_client(client, 0, &now);
+					idle_workers++;
+					if (per_tdiff > 600)
+						client->idle = true;
+					/* Test idle clients are still connected */
+					connector_test_client(ckp, client->id);
+				}
 			}
+
+			ck_wlock(&sdata->instance_lock);
+			/* Drop the reference of the last entry we examined,
+			 * then grab the next client. */
+			__dec_instance_ref(client);
+			client = client->hh.next;
+			/* Grab a reference to this client allowing us to examine
+			 * it without holding the lock */
+			if (likely(client))
+				__inc_instance_ref(client);
+			ck_wunlock(&sdata->instance_lock);
 		}
 
-		HASH_ITER(hh, sdata->user_instances, user, tmpuser) {
+		user = NULL;
+		while ((user = next_user(sdata, user)) != NULL) {
 			worker_instance_t *worker;
 			bool idle = false;
 
 			if (!user->authorised)
 				continue;
 
+			worker = NULL;
+			tv_time(&now);
 			/* Decay times per worker */
-			DL_FOREACH(user->worker_instances, worker) {
+			while ((worker = next_worker(sdata, user, worker)) != NULL) {
 				per_tdiff = tvdiff(&now, &worker->last_share);
 				if (per_tdiff > 60) {
 					decay_worker(worker, 0, &now);
@@ -7250,7 +7320,6 @@ static void *statsupdate(void *arg)
 			if (ckp->remote)
 				upstream_workers(ckp, user);
 		}
-		ck_runlock(&sdata->instance_lock);
 
 		/* Dump log entries out of instance_lock */
 		dump_log_entries(&log_entries);
