diff --git a/src/ckpmsg.c b/src/ckpmsg.c index d8635be8..206b9056 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -11,6 +11,7 @@ #include "config.h" #include +#include #include #include #include @@ -94,12 +95,46 @@ static struct option long_options[] = { {0, 0, 0, 0} }; +struct termios oldctrl; + +static void sighandler(const int sig) +{ + /* Return console to its previous state */ + tcsetattr(STDIN_FILENO, TCSANOW, &oldctrl); + + if (sig) { + signal (sig, SIG_DFL); + raise (sig); + } +} + int get_line(char **buf) { struct input_log *entry = NULL; int c, len = 0, ctl1, ctl2; + struct termios ctrl; *buf = NULL; + /* If we're not reading from a terminal, parse lines at a time allowing + * us to script usage of ckpmsg */ + if (!isatty(fileno((FILE *)stdin))) do { + size_t n; + + dealloc(*buf); + len = getline(buf, &n, stdin); + if (len == -1) { + dealloc(*buf); + goto out; + } + len = strlen(*buf); + (*buf)[--len] = '\0'; // Strip \n + goto out; + } while (42); + + tcgetattr(STDIN_FILENO, &ctrl); + ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo + tcsetattr(STDIN_FILENO, TCSANOW, &ctrl); + do { c = getchar(); if (c == EOF || c == '\n') @@ -140,6 +175,7 @@ int get_line(char **buf) if (*buf) len = strlen(*buf); printf("\n"); +out: return len; } @@ -149,9 +185,11 @@ int main(int argc, char **argv) bool proxy = false, counter = false; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; + struct sigaction handler; int c, count, i = 0, j; char stamp[128]; - struct termios ctrl; + + tcgetattr(STDIN_FILENO, &oldctrl); while ((c = getopt_long(argc, argv, "chl:N:n:ps:t:T:", long_options, &i)) != -1) { switch(c) { @@ -224,22 +262,30 @@ int main(int argc, char **argv) trail_slash(&socket_dir); realloc_strcat(&socket_dir, sockname); - tcgetattr(STDIN_FILENO, &ctrl); - ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo - tcsetattr(STDIN_FILENO, TCSANOW, &ctrl); + signal(SIGPIPE, SIG_IGN); + handler.sa_handler = &sighandler; + handler.sa_flags = 0; + sigemptyset(&handler.sa_mask); + sigaction(SIGTERM, &handler, NULL); + sigaction(SIGINT, &handler, NULL); + sigaction(SIGQUIT, &handler, NULL); + sigaction(SIGKILL, &handler, NULL); + sigaction(SIGHUP, &handler, NULL); count = 0; while (42) { struct input_log *log_entry; int sockd, len; + char *buf2; - dealloc(buf); len = get_line(&buf); - if (len < 2) { + if (len == -1) + break; + mkstamp(stamp, sizeof(stamp)); + if (len < 1) { LOGERR("%s No message", stamp); continue; } - mkstamp(stamp, sizeof(stamp)); if (buf[0] == '#') { LOGDEBUG("%s Got comment: %s", stamp, buf); continue; @@ -258,15 +304,15 @@ int main(int argc, char **argv) LOGERR("Failed to send unix msg: %s", buf); break; } - buf = NULL; - buf = recv_unix_msg_tmo2(sockd, tmo1, tmo2); + buf2 = recv_unix_msg_tmo2(sockd, tmo1, tmo2); close(sockd); - if (!buf) { + if (!buf2) { LOGERR("Received empty reply"); continue; } mkstamp(stamp, sizeof(stamp)); - LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf); + LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf2); + dealloc(buf2); if (counter) { if ((++count % 100) == 0) { @@ -276,7 +322,8 @@ int main(int argc, char **argv) } } - dealloc(buf); dealloc(socket_dir); + sighandler(0); + return 0; } diff --git a/src/generator.c b/src/generator.c index 9ed5132a..5eb71418 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2888,7 +2888,7 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id proxy->id = id; proxy->url = strdup(ckp->proxyurl[id]); proxy->auth = strdup(ckp->proxyauth[id]); - if (proxy->pass) + if (ckp->proxypass[id]) proxy->pass = strdup(ckp->proxypass[id]); else proxy->pass = strdup(""); diff --git a/src/stratifier.c b/src/stratifier.c index 304a5928..0b29b90c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2081,19 +2081,25 @@ static int64_t prio_sort(proxy_t *a, proxy_t *b) return (a->priority - b->priority); } +/* Masked increment */ +static int64_t masked_inc(int64_t value, int64_t mask) +{ + value &= ~mask; + value++; + value |= mask; + return value; +} + /* Priority values can be sparse, they do not need to be sequential */ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) { proxy_t *tmpa, *tmpb, *exists = NULL; - int64_t next_prio = 0; + int64_t mask, next_prio = 0; /* Encode the userid as the high bits in priority */ - if (!proxy->global) { - int64_t high_bits = proxy->userid; - - high_bits <<= 32; - priority |= high_bits; - } + mask = proxy->userid; + mask <<= 32; + priority |= mask; /* See if the priority is already in use */ HASH_ITER(hh, sdata->proxies, tmpa, tmpb) { @@ -2101,7 +2107,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) break; if (tmpa->priority == priority) { exists = tmpa; - next_prio = exists->priority + 1; + next_prio = masked_inc(priority, mask); break; } } @@ -2109,7 +2115,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) HASH_ITER(hh, exists, tmpa, tmpb) { if (tmpa->priority > next_prio) break; - tmpa->priority++; + tmpa->priority = masked_inc(tmpa->priority, mask); next_prio++; } proxy->priority = priority; @@ -2225,11 +2231,16 @@ static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid) return subproxy; } +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid); + static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority) { mutex_lock(&sdata->proxy_lock); __set_proxy_prio(sdata, proxy, priority); mutex_unlock(&sdata->proxy_lock); + + if (!proxy->global) + check_userproxies(sdata, proxy, proxy->userid); } /* Set proxy to the current proxy and calculate how much headroom it has */ @@ -2290,7 +2301,7 @@ static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int /* Find how much headroom we have and connect up to that many clients that are * not currently on this pool, recruiting more slots to switch more clients * later on lazily. Only reconnect clients bound to global proxies. */ -static void reconnect_clients(sdata_t *sdata) +static void reconnect_global_clients(sdata_t *sdata) { stratum_instance_t *client, *tmpclient; int reconnects = 0; @@ -2372,6 +2383,32 @@ static void check_bestproxy(sdata_t *sdata) LOGNOTICE("Stratifier setting active proxy to %d", changed_id); } +static proxy_t *best_proxy(sdata_t *sdata) +{ + proxy_t *proxy; + + mutex_lock(&sdata->proxy_lock); + proxy = sdata->proxy; + mutex_unlock(&sdata->proxy_lock); + + return proxy; +} + +static void check_globalproxies(sdata_t *sdata, proxy_t *proxy) +{ + check_bestproxy(sdata); + if (proxy->parent == best_proxy(sdata)->parent) + reconnect_global_clients(sdata); +} + +static void check_proxy(sdata_t *sdata, proxy_t *proxy) +{ + if (proxy->global) + check_globalproxies(sdata, proxy); + else + check_userproxies(sdata, proxy, proxy->userid); +} + static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted) { stratum_instance_t *client, *tmp; @@ -2383,6 +2420,7 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bo if (proxy) { proxy->dead = true; proxy->deleted = deleted; + set_proxy_prio(sdata, proxy, 0xFFFF); if (!replaced && proxy->global) check_bestproxy(sdata); } @@ -2516,6 +2554,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) LOGWARNING("Only able to set nonce2len %d of requested %d on proxy %d:%d", proxy->enonce2varlen, ckp->nonce2length, id, subid); json_decref(val); + + check_proxy(sdata, proxy); } /* Find the highest priority alive proxy belonging to userid and recruit extra @@ -2544,7 +2584,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r /* Check how much headroom the userid proxies have and reconnect any clients * that are not bound to it that should be */ -static void check_userproxies(sdata_t *sdata, const int userid) +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid) { int64_t headroom = proxy_headroom(sdata, userid); stratum_instance_t *client, *tmpclient; @@ -2558,8 +2598,10 @@ static void check_userproxies(sdata_t *sdata, const int userid) continue; if (client->user_id != userid) continue; - /* Is this client bound to a dead proxy? */ - if (!client->reconnect && client->proxy->userid == userid) + /* Is the client already bound to a proxy of its own userid of + * a higher priority than this one. */ + if (client->proxy->userid == userid && + client->proxy->parent->priority <= proxy->parent->priority) continue; if (headroom-- < 1) continue; @@ -2576,17 +2618,6 @@ static void check_userproxies(sdata_t *sdata, const int userid) recruit_best_userproxy(sdata, userid, -headroom); } -static proxy_t *best_proxy(sdata_t *sdata) -{ - proxy_t *proxy; - - mutex_lock(&sdata->proxy_lock); - proxy = sdata->proxy; - mutex_unlock(&sdata->proxy_lock); - - return proxy; -} - static void update_notify(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->sdata, *dsdata; @@ -2676,12 +2707,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd) LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash); } - if (proxy->global) { - check_bestproxy(sdata); - if (proxy->parent != best_proxy(sdata)->parent) - reconnect_clients(sdata); - } else - check_userproxies(sdata, proxy->userid); + check_proxy(sdata, proxy); clean |= new_block; LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, subid, clean ? "" : "out"); @@ -4355,19 +4381,16 @@ static proxy_t *__best_subproxy(proxy_t *proxy) * running out of room. */ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) { - proxy_t *current, *proxy, *tmp, *best = NULL; + proxy_t *global, *proxy, *tmp, *best = NULL; if (!ckp->proxy || ckp->passthrough) return ckp_sdata; - current = ckp_sdata->proxy; - if (!current) { - LOGWARNING("No proxy available yet to generate subscribes"); - return NULL; - } /* Proxies are ordered by priority so first available will be the best * priority */ mutex_lock(&ckp_sdata->proxy_lock); + best = global = ckp_sdata->proxy; + HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { if (proxy->userid < userid) continue; @@ -4381,12 +4404,14 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int if (!best) { if (!userid) - LOGWARNING("Temporarily insufficient subproxies to accept more clients"); + LOGWARNING("Temporarily insufficient proxies to accept more clients"); + else + LOGNOTICE("Temporarily insufficient proxies for userid %d to accept more clients", userid); return NULL; } if (!userid) { - if (best->id != current->id || current_headroom(ckp_sdata, &proxy) < 2) - generator_recruit(ckp, current->id, 1); + if (best->id != global->id || current_headroom(ckp_sdata, &proxy) < 2) + generator_recruit(ckp, global->id, 1); } else { if (proxy_headroom(ckp_sdata, userid) < 2) generator_recruit(ckp, best->id, 1);