From cc6e431b13f8c2b852e8a9184c0da339f7461fe9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 13:04:19 +1000 Subject: [PATCH 1/6] Properly handle userproxy priorities --- src/stratifier.c | 55 +++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 304a5928..1d8592fe 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2081,19 +2081,25 @@ static int64_t prio_sort(proxy_t *a, proxy_t *b) return (a->priority - b->priority); } +/* Masked increment */ +static int64_t masked_inc(int64_t value, int64_t mask) +{ + value &= ~mask; + value++; + value |= mask; + return value; +} + /* Priority values can be sparse, they do not need to be sequential */ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) { proxy_t *tmpa, *tmpb, *exists = NULL; - int64_t next_prio = 0; + int64_t mask, next_prio = 0; /* Encode the userid as the high bits in priority */ - if (!proxy->global) { - int64_t high_bits = proxy->userid; - - high_bits <<= 32; - priority |= high_bits; - } + mask = proxy->userid; + mask <<= 32; + priority |= mask; /* See if the priority is already in use */ HASH_ITER(hh, sdata->proxies, tmpa, tmpb) { @@ -2101,7 +2107,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) break; if (tmpa->priority == priority) { exists = tmpa; - next_prio = exists->priority + 1; + next_prio = masked_inc(priority, mask); break; } } @@ -2109,7 +2115,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) HASH_ITER(hh, exists, tmpa, tmpb) { if (tmpa->priority > next_prio) break; - tmpa->priority++; + tmpa->priority = masked_inc(tmpa->priority, mask); next_prio++; } proxy->priority = priority; @@ -2225,11 +2231,15 @@ static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid) return subproxy; } +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid); + static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority) { mutex_lock(&sdata->proxy_lock); __set_proxy_prio(sdata, proxy, priority); mutex_unlock(&sdata->proxy_lock); + + check_userproxies(sdata, proxy, proxy->userid); } /* Set proxy to the current proxy and calculate how much headroom it has */ @@ -2544,7 +2554,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r /* Check how much headroom the userid proxies have and reconnect any clients * that are not bound to it that should be */ -static void check_userproxies(sdata_t *sdata, const int userid) +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid) { int64_t headroom = proxy_headroom(sdata, userid); stratum_instance_t *client, *tmpclient; @@ -2558,8 +2568,10 @@ static void check_userproxies(sdata_t *sdata, const int userid) continue; if (client->user_id != userid) continue; - /* Is this client bound to a dead proxy? */ - if (!client->reconnect && client->proxy->userid == userid) + /* Is the client already bound to a proxy of its own userid of + * a higher priority than this one. */ + if (client->proxy->userid == userid && + client->proxy->parent->priority <= proxy->parent->priority) continue; if (headroom-- < 1) continue; @@ -2681,7 +2693,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd) if (proxy->parent != best_proxy(sdata)->parent) reconnect_clients(sdata); } else - check_userproxies(sdata, proxy->userid); + check_userproxies(sdata, proxy, proxy->userid); clean |= new_block; LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, subid, clean ? "" : "out"); @@ -4355,19 +4367,16 @@ static proxy_t *__best_subproxy(proxy_t *proxy) * running out of room. */ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) { - proxy_t *current, *proxy, *tmp, *best = NULL; + proxy_t *global, *proxy, *tmp, *best = NULL; if (!ckp->proxy || ckp->passthrough) return ckp_sdata; - current = ckp_sdata->proxy; - if (!current) { - LOGWARNING("No proxy available yet to generate subscribes"); - return NULL; - } /* Proxies are ordered by priority so first available will be the best * priority */ mutex_lock(&ckp_sdata->proxy_lock); + best = global = ckp_sdata->proxy; + HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { if (proxy->userid < userid) continue; @@ -4381,12 +4390,14 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int if (!best) { if (!userid) - LOGWARNING("Temporarily insufficient subproxies to accept more clients"); + LOGWARNING("Temporarily insufficient proxies to accept more clients"); + else + LOGNOTICE("Temporarily insufficient proxies for userid %d to accept more clients", userid); return NULL; } if (!userid) { - if (best->id != current->id || current_headroom(ckp_sdata, &proxy) < 2) - generator_recruit(ckp, current->id, 1); + if (best->id != global->id || current_headroom(ckp_sdata, &proxy) < 2) + generator_recruit(ckp, global->id, 1); } else { if (proxy_headroom(ckp_sdata, userid) < 2) generator_recruit(ckp, best->id, 1); From a58e4ee46a5aa328d0dae0d64078e3852ac29b34 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 13:52:53 +1000 Subject: [PATCH 2/6] Better handle deleted global proxies and switch more rapidly --- src/stratifier.c | 52 ++++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1d8592fe..0b29b90c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2239,7 +2239,8 @@ static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority) __set_proxy_prio(sdata, proxy, priority); mutex_unlock(&sdata->proxy_lock); - check_userproxies(sdata, proxy, proxy->userid); + if (!proxy->global) + check_userproxies(sdata, proxy, proxy->userid); } /* Set proxy to the current proxy and calculate how much headroom it has */ @@ -2300,7 +2301,7 @@ static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int /* Find how much headroom we have and connect up to that many clients that are * not currently on this pool, recruiting more slots to switch more clients * later on lazily. Only reconnect clients bound to global proxies. */ -static void reconnect_clients(sdata_t *sdata) +static void reconnect_global_clients(sdata_t *sdata) { stratum_instance_t *client, *tmpclient; int reconnects = 0; @@ -2382,6 +2383,32 @@ static void check_bestproxy(sdata_t *sdata) LOGNOTICE("Stratifier setting active proxy to %d", changed_id); } +static proxy_t *best_proxy(sdata_t *sdata) +{ + proxy_t *proxy; + + mutex_lock(&sdata->proxy_lock); + proxy = sdata->proxy; + mutex_unlock(&sdata->proxy_lock); + + return proxy; +} + +static void check_globalproxies(sdata_t *sdata, proxy_t *proxy) +{ + check_bestproxy(sdata); + if (proxy->parent == best_proxy(sdata)->parent) + reconnect_global_clients(sdata); +} + +static void check_proxy(sdata_t *sdata, proxy_t *proxy) +{ + if (proxy->global) + check_globalproxies(sdata, proxy); + else + check_userproxies(sdata, proxy, proxy->userid); +} + static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted) { stratum_instance_t *client, *tmp; @@ -2393,6 +2420,7 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bo if (proxy) { proxy->dead = true; proxy->deleted = deleted; + set_proxy_prio(sdata, proxy, 0xFFFF); if (!replaced && proxy->global) check_bestproxy(sdata); } @@ -2526,6 +2554,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) LOGWARNING("Only able to set nonce2len %d of requested %d on proxy %d:%d", proxy->enonce2varlen, ckp->nonce2length, id, subid); json_decref(val); + + check_proxy(sdata, proxy); } /* Find the highest priority alive proxy belonging to userid and recruit extra @@ -2588,17 +2618,6 @@ static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid) recruit_best_userproxy(sdata, userid, -headroom); } -static proxy_t *best_proxy(sdata_t *sdata) -{ - proxy_t *proxy; - - mutex_lock(&sdata->proxy_lock); - proxy = sdata->proxy; - mutex_unlock(&sdata->proxy_lock); - - return proxy; -} - static void update_notify(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->sdata, *dsdata; @@ -2688,12 +2707,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd) LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash); } - if (proxy->global) { - check_bestproxy(sdata); - if (proxy->parent != best_proxy(sdata)->parent) - reconnect_clients(sdata); - } else - check_userproxies(sdata, proxy, proxy->userid); + check_proxy(sdata, proxy); clean |= new_block; LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, subid, clean ? "" : "out"); From 6cc8d57982fb5ecd09212b738214f26d421a0a9f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 16:29:01 +1000 Subject: [PATCH 3/6] If we're not reading from a terminal, parse lines at a time in ckpsmg allowing us to script its usage --- src/ckpmsg.c | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/ckpmsg.c b/src/ckpmsg.c index d8635be8..8b049865 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -98,8 +98,31 @@ int get_line(char **buf) { struct input_log *entry = NULL; int c, len = 0, ctl1, ctl2; + struct termios ctrl; *buf = NULL; + /* If we're not reading from a terminal, parse lines at a time allowing + * us to script usage of ckpmsg */ + if (!isatty(fileno((FILE *)stdin))) do { + size_t n; + + dealloc(*buf); + len = getline(buf, &n, stdin); + if (len < 2) { + if (len == -1) + dealloc(*buf); + LOGNOTICE("Failed to get a valid line"); + return 0; + } + len = strlen(*buf); + (*buf)[len - 1] = '\0'; // Strip \n + goto out; + } while (42); + + tcgetattr(STDIN_FILENO, &ctrl); + ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo + tcsetattr(STDIN_FILENO, TCSANOW, &ctrl); + do { c = getchar(); if (c == EOF || c == '\n') @@ -140,6 +163,7 @@ int get_line(char **buf) if (*buf) len = strlen(*buf); printf("\n"); +out: return len; } @@ -151,7 +175,6 @@ int main(int argc, char **argv) int tmo2 = RECV_UNIX_TIMEOUT2; int c, count, i = 0, j; char stamp[128]; - struct termios ctrl; while ((c = getopt_long(argc, argv, "chl:N:n:ps:t:T:", long_options, &i)) != -1) { switch(c) { @@ -224,18 +247,16 @@ int main(int argc, char **argv) trail_slash(&socket_dir); realloc_strcat(&socket_dir, sockname); - tcgetattr(STDIN_FILENO, &ctrl); - ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo - tcsetattr(STDIN_FILENO, TCSANOW, &ctrl); - count = 0; while (42) { struct input_log *log_entry; int sockd, len; + char *buf2; - dealloc(buf); len = get_line(&buf); - if (len < 2) { + if (!buf) + break; + if (len < 1) { LOGERR("%s No message", stamp); continue; } @@ -258,15 +279,15 @@ int main(int argc, char **argv) LOGERR("Failed to send unix msg: %s", buf); break; } - buf = NULL; - buf = recv_unix_msg_tmo2(sockd, tmo1, tmo2); + buf2 = recv_unix_msg_tmo2(sockd, tmo1, tmo2); close(sockd); - if (!buf) { + if (!buf2) { LOGERR("Received empty reply"); continue; } mkstamp(stamp, sizeof(stamp)); - LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf); + LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf2); + dealloc(buf2); if (counter) { if ((++count % 100) == 0) { @@ -276,7 +297,7 @@ int main(int argc, char **argv) } } - dealloc(buf); dealloc(socket_dir); + return 0; } From 05dc708744d5808cf56399c4b43c2b1487317dce Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 17:02:31 +1000 Subject: [PATCH 4/6] Add signal handler to ckpmsg to return console to normal on exit --- src/ckpmsg.c | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 8b049865..0765e2cc 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -11,6 +11,7 @@ #include "config.h" #include +#include #include #include #include @@ -94,6 +95,16 @@ static struct option long_options[] = { {0, 0, 0, 0} }; +struct termios oldctrl; + +static void sighandler(const int __maybe_unused sig) +{ + /* Return console to its previous state */ + tcsetattr(STDIN_FILENO, TCSANOW, &oldctrl); + + exit(0); +} + int get_line(char **buf) { struct input_log *entry = NULL; @@ -108,14 +119,12 @@ int get_line(char **buf) dealloc(*buf); len = getline(buf, &n, stdin); - if (len < 2) { - if (len == -1) - dealloc(*buf); - LOGNOTICE("Failed to get a valid line"); - return 0; + if (len == -1) { + dealloc(*buf); + goto out; } len = strlen(*buf); - (*buf)[len - 1] = '\0'; // Strip \n + (*buf)[--len] = '\0'; // Strip \n goto out; } while (42); @@ -173,9 +182,12 @@ int main(int argc, char **argv) bool proxy = false, counter = false; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; + struct sigaction handler; int c, count, i = 0, j; char stamp[128]; + tcgetattr(STDIN_FILENO, &oldctrl); + while ((c = getopt_long(argc, argv, "chl:N:n:ps:t:T:", long_options, &i)) != -1) { switch(c) { /* You'd normally disable most logmsg with -l 3 to @@ -247,6 +259,16 @@ int main(int argc, char **argv) trail_slash(&socket_dir); realloc_strcat(&socket_dir, sockname); + signal(SIGPIPE, SIG_IGN); + handler.sa_handler = &sighandler; + handler.sa_flags = 0; + sigemptyset(&handler.sa_mask); + sigaction(SIGTERM, &handler, NULL); + sigaction(SIGINT, &handler, NULL); + sigaction(SIGQUIT, &handler, NULL); + sigaction(SIGKILL, &handler, NULL); + sigaction(SIGHUP, &handler, NULL); + count = 0; while (42) { struct input_log *log_entry; @@ -254,13 +276,13 @@ int main(int argc, char **argv) char *buf2; len = get_line(&buf); - if (!buf) + if (len == -1) break; + mkstamp(stamp, sizeof(stamp)); if (len < 1) { LOGERR("%s No message", stamp); continue; } - mkstamp(stamp, sizeof(stamp)); if (buf[0] == '#') { LOGDEBUG("%s Got comment: %s", stamp, buf); continue; @@ -298,6 +320,7 @@ int main(int argc, char **argv) } dealloc(socket_dir); + sighandler(0); return 0; } From 88d36209f31efcfbc61a9d2c313529bf8e58467f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 17:09:57 +1000 Subject: [PATCH 5/6] Fix propagation of global proxy passwords --- src/generator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/generator.c b/src/generator.c index 9ed5132a..5eb71418 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2888,7 +2888,7 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id proxy->id = id; proxy->url = strdup(ckp->proxyurl[id]); proxy->auth = strdup(ckp->proxyauth[id]); - if (proxy->pass) + if (ckp->proxypass[id]) proxy->pass = strdup(ckp->proxypass[id]); else proxy->pass = strdup(""); From a8cdc4470aa1ce774da3f321630b376432a14c4a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 17:13:45 +1000 Subject: [PATCH 6/6] Raise the original signal on exiting signal handler in ckpmsg --- src/ckpmsg.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 0765e2cc..206b9056 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -97,12 +97,15 @@ static struct option long_options[] = { struct termios oldctrl; -static void sighandler(const int __maybe_unused sig) +static void sighandler(const int sig) { /* Return console to its previous state */ tcsetattr(STDIN_FILENO, TCSANOW, &oldctrl); - exit(0); + if (sig) { + signal (sig, SIG_DFL); + raise (sig); + } } int get_line(char **buf)