kanoi 10 years ago
parent
commit
8ccaedaf42
  1. 105
      src/generator.c
  2. 3
      src/stratifier.c

105
src/generator.c

@ -104,7 +104,6 @@ struct proxy_instance {
pthread_mutex_t notify_lock; pthread_mutex_t notify_lock;
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
notify_instance_t *current_notify; notify_instance_t *current_notify;
int notify_id;
pthread_t pth_precv; pthread_t pth_precv;
pthread_t pth_psend; pthread_t pth_psend;
@ -122,6 +121,8 @@ struct proxy_instance {
typedef struct proxy_instance proxy_instance_t; typedef struct proxy_instance proxy_instance_t;
static int proxy_notify_id; // Globally increasing notify id
static ckmsgq_t *srvchk; // Server check message queue static ckmsgq_t *srvchk; // Server check message queue
static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
@ -132,6 +133,9 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
gbtbase_t *gbt; gbtbase_t *gbt;
cs = &si->cs; cs = &si->cs;
/* Has this server already been reconnected? */
if (cs->fd > 0)
return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return ret; return ret;
@ -161,18 +165,20 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
LOGINFO("Failed to get test block template from %s:%s!", LOGINFO("Failed to get test block template from %s:%s!",
cs->url, cs->port); cs->url, cs->port);
} }
goto out_close; goto out;
} }
clear_gbtbase(gbt); clear_gbtbase(gbt);
if (!validate_address(cs, ckp->btcaddress)) { if (!validate_address(cs, ckp->btcaddress)) {
LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress);
goto out_close; goto out;
} }
ret = true; ret = true;
out_close: out:
if (!ret) if (!ret) {
/* Close and invalidate the file handle */
close(cs->fd); close(cs->fd);
else cs->fd = -1;
} else
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
return ret; return ret;
} }
@ -213,8 +219,10 @@ static void kill_server(server_instance_t *si)
{ {
connsock_t *cs; connsock_t *cs;
if (!si) if (!si) // This shouldn't happen
return; return;
LOGNOTICE("Killing server");
cs = &si->cs; cs = &si->cs;
close(cs->fd); close(cs->fd);
cs->fd = -1; cs->fd = -1;
@ -348,7 +356,7 @@ retry:
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
else else
send_unix_msg(sockd, "false"); send_unix_msg(sockd, "false");
} else if (cmdmatch(buf, "fallback")) { } else if (cmdmatch(buf, "reconnect")) {
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
@ -388,7 +396,7 @@ static bool connect_proxy(connsock_t *cs)
{ {
cs->fd = connect_socket(cs->url, cs->port); cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect socket to %s:%s in connect_proxy", LOGINFO("Failed to connect socket to %s:%s in connect_proxy",
cs->url, cs->port); cs->url, cs->port);
return false; return false;
} }
@ -482,6 +490,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Failed to receive line in parse_subscribe"); LOGWARNING("Failed to receive line in parse_subscribe");
goto out; goto out;
} }
LOGDEBUG("parse_subscribe received %s", cs->buf);
val = json_msg_result(cs->buf, &res_val); val = json_msg_result(cs->buf, &res_val);
if (!val) { if (!val) {
LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf); LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf);
@ -588,14 +597,12 @@ retry:
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGWARNING("Failed to send message in subscribe_stratum"); LOGWARNING("Failed to send message in subscribe_stratum");
close(cs->fd);
goto out; goto out;
} }
ret = parse_subscribe(cs, proxi); ret = parse_subscribe(cs, proxi);
if (ret) if (ret)
goto out; goto out;
close(cs->fd);
if (proxi->no_params) { if (proxi->no_params) {
LOGWARNING("Failed all subscription options in subscribe_stratum"); LOGWARNING("Failed all subscription options in subscribe_stratum");
goto out; goto out;
@ -616,10 +623,10 @@ retry:
goto retry; goto retry;
out: out:
/* Only keep any downstream connections if we're successfully resuming if (!ret) {
* to the existing stratum sessionid */ close(cs->fd);
if (!ret || !proxi->sessionid) cs->fd = -1;
send_proc(proxi->ckp->stratifier, "dropall"); }
return ret; return ret;
} }
@ -723,7 +730,7 @@ static bool parse_notify(proxy_instance_t *proxi, json_t *val)
ni->notify_time = time(NULL); ni->notify_time = time(NULL);
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
ni->id = proxi->notify_id++; ni->id = proxy_notify_id++;
HASH_ADD_INT(proxi->notify_instances, id, ni); HASH_ADD_INT(proxi->notify_instances, id, ni);
proxi->current_notify = ni; proxi->current_notify = ni;
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxi->notify_lock);
@ -1283,24 +1290,30 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging) connsock_t *cs, bool pinging)
{ {
bool ret = false;
/* Has this proxy already been reconnected? */
if (cs->fd > 0)
return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return false; return ret;
} }
if (!connect_proxy(cs)) { if (!connect_proxy(cs)) {
if (!pinging) { if (!pinging) {
LOGINFO("Failed to connect to %s:%s in proxy_mode!", LOGINFO("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port); cs->url, cs->port);
} }
return false; return ret;
} }
if (ckp->passthrough) { if (ckp->passthrough) {
if (!passthrough_stratum(cs, proxi)) { if (!passthrough_stratum(cs, proxi)) {
LOGWARNING("Failed initial passthrough to %s:%s !", LOGWARNING("Failed initial passthrough to %s:%s !",
cs->url, cs->port); cs->url, cs->port);
return false; goto out;
} }
return true; ret = true;
goto out;
} }
/* Test we can connect, authorise and get stratum information */ /* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) { if (!subscribe_stratum(cs, proxi)) {
@ -1308,16 +1321,24 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
LOGINFO("Failed initial subscribe to %s:%s !", LOGINFO("Failed initial subscribe to %s:%s !",
cs->url, cs->port); cs->url, cs->port);
} }
return false; goto out;
} }
if (!auth_stratum(cs, proxi)) { if (!auth_stratum(cs, proxi)) {
if (!pinging) { if (!pinging) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass); cs->url, cs->port, si->auth, si->pass);
} }
return false; goto out;
} }
return true; ret = true;
out:
if (!ret) {
/* Close and invalidate the file handle */
close(cs->fd);
cs->fd = -1;
} else
keep_sockalive(cs->fd);
return ret;
} }
/* Cycle through the available proxies and find the first alive one */ /* Cycle through the available proxies and find the first alive one */
@ -1345,8 +1366,8 @@ retry:
} }
} }
if (!alive) { if (!alive) {
send_proc(ckp->stratifier, "dropall");
send_proc(ckp->connector, "reject"); send_proc(ckp->connector, "reject");
send_proc(ckp->stratifier, "dropall");
if (!ckp->chosen_server) { if (!ckp->chosen_server) {
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5); sleep(5);
@ -1357,7 +1378,6 @@ retry:
cs = alive->cs; cs = alive->cs;
LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port,
ckp->passthrough ? " in passthrough mode" : ""); ckp->passthrough ? " in passthrough mode" : "");
mutex_init(&alive->notify_lock);
if (ckp->passthrough) { if (ckp->passthrough) {
create_pthread(&alive->pth_precv, passthrough_recv, alive); create_pthread(&alive->pth_precv, passthrough_recv, alive);
alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
@ -1372,15 +1392,21 @@ out:
return alive; return alive;
} }
static void kill_proxy(proxy_instance_t *proxi) static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi)
{ {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; connsock_t *cs;
if (!proxi) send_proc(ckp->connector, "reject");
send_proc(ckp->stratifier, "dropall");
if (!proxi) // This shouldn't happen
return; return;
LOGNOTICE("Killing proxy");
cs = proxi->cs; cs = proxi->cs;
close(cs->fd); close(cs->fd);
cs->fd = -1;
/* All our notify data is invalid if we reconnect so discard them */ /* All our notify data is invalid if we reconnect so discard them */
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
@ -1404,8 +1430,10 @@ static int proxy_loop(proc_instance_t *pi)
char *buf = NULL; char *buf = NULL;
reconnect: reconnect:
if (proxi) if (proxi) {
kill_proxy(ckp, proxi);
reconnecting = true; reconnecting = true;
}
proxi = live_proxy(ckp); proxi = live_proxy(ckp);
if (!proxi) if (!proxi)
goto out; goto out;
@ -1424,6 +1452,9 @@ reconnect:
proxi->notified = false; proxi->notified = false;
} }
retry:
ckmsgq_add(srvchk, proxi->si);
do { do {
selret = wait_read_select(us->sockd, 5); selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) { if (!selret && !ping_main(ckp)) {
@ -1432,7 +1463,12 @@ reconnect:
goto out; goto out;
} }
} while (selret < 1); } while (selret < 1);
retry:
if (unlikely(proxi->cs->fd < 0)) {
LOGWARNING("Upstream socket invalidated, will attempt failover");
goto reconnect;
}
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGEMERG("Failed to accept on proxy socket"); LOGEMERG("Failed to accept on proxy socket");
@ -1457,9 +1493,6 @@ retry:
} else if (cmdmatch(buf, "getdiff")) { } else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, sockd); send_diff(proxi, sockd);
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
kill_proxy(proxi);
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "submitblock:")) { } else if (cmdmatch(buf, "submitblock:")) {
if (ckp->btcdbackup) { if (ckp->btcdbackup) {
@ -1470,8 +1503,6 @@ retry:
LOGWARNING("Block rejected locally."); LOGWARNING("Block rejected locally.");
} else } else
LOGNOTICE("No backup btcd to send block to ourselves"); LOGNOTICE("No backup btcd to send block to ourselves");
} else if (cmdmatch(buf, "fallback")) {
goto reconnect;
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "ping")) { } else if (cmdmatch(buf, "ping")) {
@ -1492,7 +1523,7 @@ retry:
close(sockd); close(sockd);
goto retry; goto retry;
out: out:
kill_proxy(proxi); kill_proxy(ckp, proxi);
close(sockd); close(sockd);
return ret; return ret;
} }
@ -1542,7 +1573,6 @@ static bool alive_btcd(server_instance_t *si)
LOGWARNING("Failed to create base64 auth from btcd %s", userpass); LOGWARNING("Failed to create base64 auth from btcd %s", userpass);
return false; return false;
} }
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port); LOGWARNING("Failed to connect socket to btcd %s:%s !", cs->url, cs->port);
return false; return false;
@ -1581,6 +1611,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
proxi->si = si; proxi->si = si;
proxi->ckp = ckp; proxi->ckp = ckp;
proxi->cs = &si->cs; proxi->cs = &si->cs;
mutex_init(&proxi->notify_lock);
} }
if (ckp->btcds) { if (ckp->btcds) {
@ -1667,7 +1698,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "fallback"); send_proc(ckp->generator, "reconnect");
} }
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)

3
src/stratifier.c

@ -744,8 +744,7 @@ static bool update_subscribe(ckpool_t *ckp)
free(buf); free(buf);
ck_wlock(&workbase_lock); ck_wlock(&workbase_lock);
if (!proxy_base.diff) proxy_base.diff = ckp->startdiff;
proxy_base.diff = 1;
/* Length is checked by generator */ /* Length is checked by generator */
strcpy(proxy_base.enonce1, json_string_value(json_object_get(val, "enonce1"))); strcpy(proxy_base.enonce1, json_string_value(json_object_get(val, "enonce1")));
proxy_base.enonce1constlen = strlen(proxy_base.enonce1) / 2; proxy_base.enonce1constlen = strlen(proxy_base.enonce1) / 2;

Loading…
Cancel
Save