Browse Source

Handle proxy failover correctly for remainder of upstream pools when a reconnect is issued and don't lose original pool details, demoting it to last instead

master
Con Kolivas 10 years ago
parent
commit
416b7a02eb
  1. 1
      src/ckpool.h
  2. 146
      src/generator.c

1
src/ckpool.h

@ -183,7 +183,6 @@ struct ckpool_instance {
char **serverurl; // Array of URLs to bind our server/proxy to char **serverurl; // Array of URLs to bind our server/proxy to
int serverurls; // Number of server bindings int serverurls; // Number of server bindings
int update_interval; // Seconds between stratum updates int update_interval; // Seconds between stratum updates
int chosen_server; // Chosen server for next connection
/* Proxy options */ /* Proxy options */
int proxies; int proxies;

146
src/generator.c

@ -71,12 +71,18 @@ struct pass_msg {
typedef struct pass_msg pass_msg_t; typedef struct pass_msg pass_msg_t;
typedef struct proxy_instance proxy_instance_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
proxy_instance_t *next;
proxy_instance_t *prev;
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
server_instance_t *si; server_instance_t *si;
bool passthrough; bool passthrough;
int id; /* Proxy server id */
const char *auth; const char *auth;
const char *pass; const char *pass;
@ -92,14 +98,13 @@ struct proxy_instance {
double diff; double diff;
tv_t last_share; tv_t last_share;
int id; /* Message id for sending stratum messages */ int msg_id; /* Message id for sending stratum messages */
bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_sessionid; /* Doesn't support session id resume on subscribe */
bool no_params; /* Doesn't want any parameters on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */
bool notified; /* Received new template for work */ bool notified; /* Received new template for work */
bool diffed; /* Received new diff */ bool diffed; /* Received new diff */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool replaced; /* This proxy has issued a reconnect with new data */
pthread_mutex_t notify_lock; pthread_mutex_t notify_lock;
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
@ -121,10 +126,10 @@ struct proxy_instance {
char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ char_entry_t *recvd_lines; /* Linked list of unprocessed messages */
}; };
typedef struct proxy_instance proxy_instance_t;
/* Private data for the generator */ /* Private data for the generator */
struct generator_data { struct generator_data {
pthread_mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxy_list; /* Linked list of all active proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
}; };
@ -653,19 +658,19 @@ retry:
/* Attempt to reconnect if the pool supports resuming */ /* Attempt to reconnect if the pool supports resuming */
if (proxi->sessionid) { if (proxi->sessionid) {
JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", JSON_CPACK(req, "{s:i,s:s,s:[s,s]}",
"id", proxi->id++, "id", proxi->msg_id++,
"method", "mining.subscribe", "method", "mining.subscribe",
"params", PACKAGE"/"VERSION, proxi->sessionid); "params", PACKAGE"/"VERSION, proxi->sessionid);
/* Then attempt to connect with just the client description */ /* Then attempt to connect with just the client description */
} else if (!proxi->no_params) { } else if (!proxi->no_params) {
JSON_CPACK(req, "{s:i,s:s,s:[s]}", JSON_CPACK(req, "{s:i,s:s,s:[s]}",
"id", proxi->id++, "id", proxi->msg_id++,
"method", "mining.subscribe", "method", "mining.subscribe",
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
/* Then try without any parameters */ /* Then try without any parameters */
} else { } else {
JSON_CPACK(req, "{s:i,s:s,s:[]}", JSON_CPACK(req, "{s:i,s:s,s:[]}",
"id", proxi->id++, "id", proxi->msg_id++,
"method", "mining.subscribe", "method", "mining.subscribe",
"params"); "params");
} }
@ -875,7 +880,9 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val)
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{ {
server_instance_t *newsi, *si = proxi->si; server_instance_t *newsi, *si = proxi->si;
proxy_instance_t *newproxi;
ckpool_t *ckp = proxi->ckp; ckpool_t *ckp = proxi->ckp;
gdata_t *gdata = ckp->data;
const char *new_url; const char *new_url;
bool ret = false; bool ret = false;
int new_port; int new_port;
@ -920,27 +927,25 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
ret = true; ret = true;
newsi = ckzalloc(sizeof(server_instance_t)); newsi = ckzalloc(sizeof(server_instance_t));
newsi->id = ckp->proxies;
ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * (ckp->proxies + 1)); mutex_lock(&gdata->lock);
newsi->id = si->id; /* Inherit the old connection's id */
si->id = ckp->proxies++; /* Give the old connection the lowest id */
ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies);
ckp->servers[newsi->id] = newsi; ckp->servers[newsi->id] = newsi;
newsi->url = url; newsi->url = url;
newsi->auth = strdup(si->auth); newsi->auth = strdup(si->auth);
newsi->pass = strdup(si->pass); newsi->pass = strdup(si->pass);
proxi->reconnect = true; proxi->reconnect = true;
proxi->replaced = true;
/* Reuse variable on a new proxy instance */
proxi = ckzalloc(sizeof(proxy_instance_t));
newsi->data = proxi;
proxi->auth = newsi->auth;
proxi->pass = newsi->pass;
proxi->si = newsi;
proxi->ckp = ckp;
proxi->cs = &newsi->cs;
/* Set chosen server only once all new proxy data exists */ newproxi = ckzalloc(sizeof(proxy_instance_t));
ckp->proxies++; newsi->data = newproxi;
ckp->chosen_server = newsi->id; newproxi->auth = newsi->auth;
newproxi->pass = newsi->pass;
newproxi->si = newsi;
newproxi->ckp = ckp;
newproxi->cs = &newsi->cs;
mutex_unlock(&gdata->lock);
out: out:
return ret; return ret;
} }
@ -1035,7 +1040,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
bool ret; bool ret;
JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", JSON_CPACK(req, "{s:i,s:s,s:[s,s]}",
"id", proxi->id++, "id", proxi->msg_id++,
"method", "mining.authorize", "method", "mining.authorize",
"params", proxi->auth, proxi->pass); "params", proxi->auth, proxi->pass);
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
@ -1430,9 +1435,6 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
{ {
bool ret = false; bool ret = false;
if (proxi->replaced)
return false;
/* Has this proxy already been reconnected? */ /* Has this proxy already been reconnected? */
if (cs->fd > 0) if (cs->fd > 0)
return true; return true;
@ -1484,21 +1486,29 @@ out:
/* Cycle through the available proxies and find the first alive one */ /* Cycle through the available proxies and find the first alive one */
static proxy_instance_t *live_proxy(ckpool_t *ckp) static proxy_instance_t *live_proxy(ckpool_t *ckp)
{ {
int i, start_from = ckp->chosen_server;
proxy_instance_t *alive = NULL; proxy_instance_t *alive = NULL;
gdata_t *gdata = ckp->data;
connsock_t *cs; connsock_t *cs;
int i, srvs;
LOGDEBUG("Attempting to connect to proxy"); LOGDEBUG("Attempting to connect to proxy");
retry: retry:
if (!ping_main(ckp)) if (!ping_main(ckp))
goto out; goto out;
for (i = start_from; i < ckp->proxies; i++) { mutex_lock(&gdata->lock);
srvs = ckp->proxies;
mutex_unlock(&gdata->lock);
for (i = 0; i < srvs; i++) {
proxy_instance_t *proxi; proxy_instance_t *proxi;
server_instance_t *si; server_instance_t *si;
mutex_lock(&gdata->lock);
si = ckp->servers[i]; si = ckp->servers[i];
proxi = si->data; proxi = si->data;
mutex_unlock(&gdata->lock);
cs = proxi->cs; cs = proxi->cs;
if (proxy_alive(ckp, si, proxi, cs, false)) { if (proxy_alive(ckp, si, proxi, cs, false)) {
alive = proxi; alive = proxi;
@ -1508,13 +1518,11 @@ retry:
if (!alive) { if (!alive) {
send_proc(ckp->connector, "reject"); send_proc(ckp->connector, "reject");
send_proc(ckp->stratifier, "dropall"); send_proc(ckp->stratifier, "dropall");
if (!start_from) {
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5); sleep(5);
}
goto retry; goto retry;
} }
start_from = 0;
cs = alive->cs; cs = alive->cs;
LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port,
ckp->passthrough ? " in passthrough mode" : ""); ckp->passthrough ? " in passthrough mode" : "");
@ -1579,10 +1587,10 @@ reconnect:
if (!proxi) if (!proxi)
goto out; goto out;
if (reconnecting) { if (reconnecting) {
reconnecting = false;
connsock_t *cs = proxi->cs; connsock_t *cs = proxi->cs;
LOGWARNING("Successfully reconnected to %s:%s as proxy", LOGWARNING("Successfully reconnected to %s:%s as proxy",
cs->url, cs->port); cs->url, cs->port);
reconnecting = false;
} }
/* We've just subscribed and authorised so tell the stratifier to /* We've just subscribed and authorised so tell the stratifier to
@ -1690,10 +1698,13 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
gdata_t *gdata = ckp->data;
proxy_instance_t *proxi; proxy_instance_t *proxi;
server_instance_t *si; server_instance_t *si;
int i, ret; int i, ret;
mutex_init(&gdata->lock);
/* Create all our proxy structures and pointers */ /* Create all our proxy structures and pointers */
ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies);
for (i = 0; i < ckp->proxies; i++) { for (i = 0; i < ckp->proxies; i++) {
@ -1718,6 +1729,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
ret = proxy_loop(pi); ret = proxy_loop(pi);
mutex_lock(&gdata->lock);
for (i = 0; i < ckp->proxies; i++) { for (i = 0; i < ckp->proxies; i++) {
si = ckp->servers[i]; si = ckp->servers[i];
Close(si->cs.fd); Close(si->cs.fd);
@ -1735,6 +1747,8 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
dealloc(si->pass); dealloc(si->pass);
dealloc(si); dealloc(si);
} }
mutex_unlock(&gdata->lock);
dealloc(ckp->servers); dealloc(ckp->servers);
return ret; return ret;
} }
@ -1743,6 +1757,40 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
* should check to see if the higher priority servers are alive and fallback */ * should check to see if the higher priority servers are alive and fallback */
static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{ {
static time_t last_t = 0;
bool alive = false;
time_t now_t;
int i;
/* Rate limit to checking only once every 5 seconds */
now_t = time(NULL);
if (now_t <= last_t + 5)
return;
last_t = now_t;
/* Is this the highest priority server already? */
if (!cursi->id)
return;
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
/* Have we reached the current server? */
if (si == cursi)
return;
alive = server_alive(ckp, si, true);
if (alive)
break;
}
if (alive)
send_proc(ckp->generator, "reconnect");
}
static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{
gdata_t *gdata = ckp->data;
static time_t last_t = 0; static time_t last_t = 0;
bool alive = false; bool alive = false;
time_t now_t; time_t now_t;
@ -1756,27 +1804,29 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
last_t = now_t; last_t = now_t;
/* Is this the highest priority server already? */ /* Is this the highest priority server already? */
if (cursi == ckp->servers[ckp->chosen_server]) if (!cursi->id)
return; return;
if (ckp->proxy) mutex_lock(&gdata->lock);
srvs = ckp->proxies; srvs = ckp->proxies;
else mutex_unlock(&gdata->lock);
srvs = ckp->btcds;
for (i = 0; i < srvs; i++) { for (i = 0; i < srvs; i++) {
server_instance_t *si = ckp->servers[i]; proxy_instance_t *proxi;
server_instance_t *si;
connsock_t *cs;
mutex_lock(&gdata->lock);
si = ckp->servers[i];
proxi = si->data;
mutex_unlock(&gdata->lock);
/* Have we reached the current server? */ /* Have we reached the current server? */
if (si == cursi) if (si == cursi)
return; return;
if (ckp->proxy) { cs = proxi->cs;
proxy_instance_t *proxi = si->data;
connsock_t *cs = proxi->cs;
alive = proxy_alive(ckp, si, proxi, cs, true); alive = proxy_alive(ckp, si, proxi, cs, true);
} else
alive = server_alive(ckp, si, true);
if (alive) if (alive)
break; break;
} }
@ -1784,6 +1834,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
send_proc(ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
} }
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)
{ {
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
@ -1793,12 +1844,13 @@ int generator(proc_instance_t *pi)
LOGWARNING("%s generator starting", ckp->name); LOGWARNING("%s generator starting", ckp->name);
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); if (ckp->proxy) {
gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog);
if (ckp->proxy)
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);
else } else {
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);
ret = server_mode(ckp, pi); ret = server_mode(ckp, pi);
}
dealloc(ckp->data); dealloc(ckp->data);
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);

Loading…
Cancel
Save