kanoi 8 years ago
parent
commit
6fcc818cbb
  1. 71
      src/ckpmsg.c
  2. 2
      src/generator.c
  3. 103
      src/stratifier.c

71
src/ckpmsg.c

@ -11,6 +11,7 @@
#include "config.h" #include "config.h"
#include <getopt.h> #include <getopt.h>
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <termios.h> #include <termios.h>
@ -94,12 +95,46 @@ static struct option long_options[] = {
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
struct termios oldctrl;
static void sighandler(const int sig)
{
/* Return console to its previous state */
tcsetattr(STDIN_FILENO, TCSANOW, &oldctrl);
if (sig) {
signal (sig, SIG_DFL);
raise (sig);
}
}
int get_line(char **buf) int get_line(char **buf)
{ {
struct input_log *entry = NULL; struct input_log *entry = NULL;
int c, len = 0, ctl1, ctl2; int c, len = 0, ctl1, ctl2;
struct termios ctrl;
*buf = NULL; *buf = NULL;
/* If we're not reading from a terminal, parse lines at a time allowing
* us to script usage of ckpmsg */
if (!isatty(fileno((FILE *)stdin))) do {
size_t n;
dealloc(*buf);
len = getline(buf, &n, stdin);
if (len == -1) {
dealloc(*buf);
goto out;
}
len = strlen(*buf);
(*buf)[--len] = '\0'; // Strip \n
goto out;
} while (42);
tcgetattr(STDIN_FILENO, &ctrl);
ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo
tcsetattr(STDIN_FILENO, TCSANOW, &ctrl);
do { do {
c = getchar(); c = getchar();
if (c == EOF || c == '\n') if (c == EOF || c == '\n')
@ -140,6 +175,7 @@ int get_line(char **buf)
if (*buf) if (*buf)
len = strlen(*buf); len = strlen(*buf);
printf("\n"); printf("\n");
out:
return len; return len;
} }
@ -149,9 +185,11 @@ int main(int argc, char **argv)
bool proxy = false, counter = false; bool proxy = false, counter = false;
int tmo1 = RECV_UNIX_TIMEOUT1; int tmo1 = RECV_UNIX_TIMEOUT1;
int tmo2 = RECV_UNIX_TIMEOUT2; int tmo2 = RECV_UNIX_TIMEOUT2;
struct sigaction handler;
int c, count, i = 0, j; int c, count, i = 0, j;
char stamp[128]; char stamp[128];
struct termios ctrl;
tcgetattr(STDIN_FILENO, &oldctrl);
while ((c = getopt_long(argc, argv, "chl:N:n:ps:t:T:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "chl:N:n:ps:t:T:", long_options, &i)) != -1) {
switch(c) { switch(c) {
@ -224,22 +262,30 @@ int main(int argc, char **argv)
trail_slash(&socket_dir); trail_slash(&socket_dir);
realloc_strcat(&socket_dir, sockname); realloc_strcat(&socket_dir, sockname);
tcgetattr(STDIN_FILENO, &ctrl); signal(SIGPIPE, SIG_IGN);
ctrl.c_lflag &= ~(ICANON | ECHO); // turn off canonical mode and echo handler.sa_handler = &sighandler;
tcsetattr(STDIN_FILENO, TCSANOW, &ctrl); handler.sa_flags = 0;
sigemptyset(&handler.sa_mask);
sigaction(SIGTERM, &handler, NULL);
sigaction(SIGINT, &handler, NULL);
sigaction(SIGQUIT, &handler, NULL);
sigaction(SIGKILL, &handler, NULL);
sigaction(SIGHUP, &handler, NULL);
count = 0; count = 0;
while (42) { while (42) {
struct input_log *log_entry; struct input_log *log_entry;
int sockd, len; int sockd, len;
char *buf2;
dealloc(buf);
len = get_line(&buf); len = get_line(&buf);
if (len < 2) { if (len == -1)
break;
mkstamp(stamp, sizeof(stamp));
if (len < 1) {
LOGERR("%s No message", stamp); LOGERR("%s No message", stamp);
continue; continue;
} }
mkstamp(stamp, sizeof(stamp));
if (buf[0] == '#') { if (buf[0] == '#') {
LOGDEBUG("%s Got comment: %s", stamp, buf); LOGDEBUG("%s Got comment: %s", stamp, buf);
continue; continue;
@ -258,15 +304,15 @@ int main(int argc, char **argv)
LOGERR("Failed to send unix msg: %s", buf); LOGERR("Failed to send unix msg: %s", buf);
break; break;
} }
buf = NULL; buf2 = recv_unix_msg_tmo2(sockd, tmo1, tmo2);
buf = recv_unix_msg_tmo2(sockd, tmo1, tmo2);
close(sockd); close(sockd);
if (!buf) { if (!buf2) {
LOGERR("Received empty reply"); LOGERR("Received empty reply");
continue; continue;
} }
mkstamp(stamp, sizeof(stamp)); mkstamp(stamp, sizeof(stamp));
LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf); LOGMSGSIZ(65536, LOG_NOTICE, "%s Received response: %s", stamp, buf2);
dealloc(buf2);
if (counter) { if (counter) {
if ((++count % 100) == 0) { if ((++count % 100) == 0) {
@ -276,7 +322,8 @@ int main(int argc, char **argv)
} }
} }
dealloc(buf);
dealloc(socket_dir); dealloc(socket_dir);
sighandler(0);
return 0; return 0;
} }

2
src/generator.c

@ -2888,7 +2888,7 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy->id = id; proxy->id = id;
proxy->url = strdup(ckp->proxyurl[id]); proxy->url = strdup(ckp->proxyurl[id]);
proxy->auth = strdup(ckp->proxyauth[id]); proxy->auth = strdup(ckp->proxyauth[id]);
if (proxy->pass) if (ckp->proxypass[id])
proxy->pass = strdup(ckp->proxypass[id]); proxy->pass = strdup(ckp->proxypass[id]);
else else
proxy->pass = strdup(""); proxy->pass = strdup("");

103
src/stratifier.c

@ -2081,19 +2081,25 @@ static int64_t prio_sort(proxy_t *a, proxy_t *b)
return (a->priority - b->priority); return (a->priority - b->priority);
} }
/* Masked increment */
static int64_t masked_inc(int64_t value, int64_t mask)
{
value &= ~mask;
value++;
value |= mask;
return value;
}
/* Priority values can be sparse, they do not need to be sequential */ /* Priority values can be sparse, they do not need to be sequential */
static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority)
{ {
proxy_t *tmpa, *tmpb, *exists = NULL; proxy_t *tmpa, *tmpb, *exists = NULL;
int64_t next_prio = 0; int64_t mask, next_prio = 0;
/* Encode the userid as the high bits in priority */ /* Encode the userid as the high bits in priority */
if (!proxy->global) { mask = proxy->userid;
int64_t high_bits = proxy->userid; mask <<= 32;
priority |= mask;
high_bits <<= 32;
priority |= high_bits;
}
/* See if the priority is already in use */ /* See if the priority is already in use */
HASH_ITER(hh, sdata->proxies, tmpa, tmpb) { HASH_ITER(hh, sdata->proxies, tmpa, tmpb) {
@ -2101,7 +2107,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority)
break; break;
if (tmpa->priority == priority) { if (tmpa->priority == priority) {
exists = tmpa; exists = tmpa;
next_prio = exists->priority + 1; next_prio = masked_inc(priority, mask);
break; break;
} }
} }
@ -2109,7 +2115,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority)
HASH_ITER(hh, exists, tmpa, tmpb) { HASH_ITER(hh, exists, tmpa, tmpb) {
if (tmpa->priority > next_prio) if (tmpa->priority > next_prio)
break; break;
tmpa->priority++; tmpa->priority = masked_inc(tmpa->priority, mask);
next_prio++; next_prio++;
} }
proxy->priority = priority; proxy->priority = priority;
@ -2225,11 +2231,16 @@ static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid)
return subproxy; return subproxy;
} }
static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid);
static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority) static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority)
{ {
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
__set_proxy_prio(sdata, proxy, priority); __set_proxy_prio(sdata, proxy, priority);
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
if (!proxy->global)
check_userproxies(sdata, proxy, proxy->userid);
} }
/* Set proxy to the current proxy and calculate how much headroom it has */ /* Set proxy to the current proxy and calculate how much headroom it has */
@ -2290,7 +2301,7 @@ static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int
/* Find how much headroom we have and connect up to that many clients that are /* Find how much headroom we have and connect up to that many clients that are
* not currently on this pool, recruiting more slots to switch more clients * not currently on this pool, recruiting more slots to switch more clients
* later on lazily. Only reconnect clients bound to global proxies. */ * later on lazily. Only reconnect clients bound to global proxies. */
static void reconnect_clients(sdata_t *sdata) static void reconnect_global_clients(sdata_t *sdata)
{ {
stratum_instance_t *client, *tmpclient; stratum_instance_t *client, *tmpclient;
int reconnects = 0; int reconnects = 0;
@ -2372,6 +2383,32 @@ static void check_bestproxy(sdata_t *sdata)
LOGNOTICE("Stratifier setting active proxy to %d", changed_id); LOGNOTICE("Stratifier setting active proxy to %d", changed_id);
} }
static proxy_t *best_proxy(sdata_t *sdata)
{
proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
proxy = sdata->proxy;
mutex_unlock(&sdata->proxy_lock);
return proxy;
}
static void check_globalproxies(sdata_t *sdata, proxy_t *proxy)
{
check_bestproxy(sdata);
if (proxy->parent == best_proxy(sdata)->parent)
reconnect_global_clients(sdata);
}
static void check_proxy(sdata_t *sdata, proxy_t *proxy)
{
if (proxy->global)
check_globalproxies(sdata, proxy);
else
check_userproxies(sdata, proxy, proxy->userid);
}
static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted) static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bool replaced, const bool deleted)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
@ -2383,6 +2420,7 @@ static void dead_proxyid(sdata_t *sdata, const int id, const int subid, const bo
if (proxy) { if (proxy) {
proxy->dead = true; proxy->dead = true;
proxy->deleted = deleted; proxy->deleted = deleted;
set_proxy_prio(sdata, proxy, 0xFFFF);
if (!replaced && proxy->global) if (!replaced && proxy->global)
check_bestproxy(sdata); check_bestproxy(sdata);
} }
@ -2516,6 +2554,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
LOGWARNING("Only able to set nonce2len %d of requested %d on proxy %d:%d", LOGWARNING("Only able to set nonce2len %d of requested %d on proxy %d:%d",
proxy->enonce2varlen, ckp->nonce2length, id, subid); proxy->enonce2varlen, ckp->nonce2length, id, subid);
json_decref(val); json_decref(val);
check_proxy(sdata, proxy);
} }
/* Find the highest priority alive proxy belonging to userid and recruit extra /* Find the highest priority alive proxy belonging to userid and recruit extra
@ -2544,7 +2584,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r
/* Check how much headroom the userid proxies have and reconnect any clients /* Check how much headroom the userid proxies have and reconnect any clients
* that are not bound to it that should be */ * that are not bound to it that should be */
static void check_userproxies(sdata_t *sdata, const int userid) static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid)
{ {
int64_t headroom = proxy_headroom(sdata, userid); int64_t headroom = proxy_headroom(sdata, userid);
stratum_instance_t *client, *tmpclient; stratum_instance_t *client, *tmpclient;
@ -2558,8 +2598,10 @@ static void check_userproxies(sdata_t *sdata, const int userid)
continue; continue;
if (client->user_id != userid) if (client->user_id != userid)
continue; continue;
/* Is this client bound to a dead proxy? */ /* Is the client already bound to a proxy of its own userid of
if (!client->reconnect && client->proxy->userid == userid) * a higher priority than this one. */
if (client->proxy->userid == userid &&
client->proxy->parent->priority <= proxy->parent->priority)
continue; continue;
if (headroom-- < 1) if (headroom-- < 1)
continue; continue;
@ -2576,17 +2618,6 @@ static void check_userproxies(sdata_t *sdata, const int userid)
recruit_best_userproxy(sdata, userid, -headroom); recruit_best_userproxy(sdata, userid, -headroom);
} }
static proxy_t *best_proxy(sdata_t *sdata)
{
proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
proxy = sdata->proxy;
mutex_unlock(&sdata->proxy_lock);
return proxy;
}
static void update_notify(ckpool_t *ckp, const char *cmd) static void update_notify(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->sdata, *dsdata; sdata_t *sdata = ckp->sdata, *dsdata;
@ -2676,12 +2707,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash); LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash);
} }
if (proxy->global) { check_proxy(sdata, proxy);
check_bestproxy(sdata);
if (proxy->parent != best_proxy(sdata)->parent)
reconnect_clients(sdata);
} else
check_userproxies(sdata, proxy->userid);
clean |= new_block; clean |= new_block;
LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id,
subid, clean ? "" : "out"); subid, clean ? "" : "out");
@ -4355,19 +4381,16 @@ static proxy_t *__best_subproxy(proxy_t *proxy)
* running out of room. */ * running out of room. */
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid)
{ {
proxy_t *current, *proxy, *tmp, *best = NULL; proxy_t *global, *proxy, *tmp, *best = NULL;
if (!ckp->proxy || ckp->passthrough) if (!ckp->proxy || ckp->passthrough)
return ckp_sdata; return ckp_sdata;
current = ckp_sdata->proxy;
if (!current) {
LOGWARNING("No proxy available yet to generate subscribes");
return NULL;
}
/* Proxies are ordered by priority so first available will be the best /* Proxies are ordered by priority so first available will be the best
* priority */ * priority */
mutex_lock(&ckp_sdata->proxy_lock); mutex_lock(&ckp_sdata->proxy_lock);
best = global = ckp_sdata->proxy;
HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) {
if (proxy->userid < userid) if (proxy->userid < userid)
continue; continue;
@ -4381,12 +4404,14 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int
if (!best) { if (!best) {
if (!userid) if (!userid)
LOGWARNING("Temporarily insufficient subproxies to accept more clients"); LOGWARNING("Temporarily insufficient proxies to accept more clients");
else
LOGNOTICE("Temporarily insufficient proxies for userid %d to accept more clients", userid);
return NULL; return NULL;
} }
if (!userid) { if (!userid) {
if (best->id != current->id || current_headroom(ckp_sdata, &proxy) < 2) if (best->id != global->id || current_headroom(ckp_sdata, &proxy) < 2)
generator_recruit(ckp, current->id, 1); generator_recruit(ckp, global->id, 1);
} else { } else {
if (proxy_headroom(ckp_sdata, userid) < 2) if (proxy_headroom(ckp_sdata, userid) < 2)
generator_recruit(ckp, best->id, 1); generator_recruit(ckp, best->id, 1);

Loading…
Cancel
Save