Browse Source

Fall over to the first working stratum proxy whenever we are disconnected

master
Con Kolivas 11 years ago
parent
commit
ca176c8e1d
  1. 313
      src/generator.c
  2. 6
      src/stratifier.c

313
src/generator.c

@ -759,64 +759,6 @@ static void submit_share(proxy_instance_t *proxi, json_t *val)
mutex_unlock(&proxi->psend_lock); mutex_unlock(&proxi->psend_lock);
} }
static int proxy_loop(proc_instance_t *pi, proxy_instance_t *proxi)
{
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
char *buf = NULL;
/* We're not subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */
send_proc(ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify");
proxi->notified = false;
retry:
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
if (interrupted())
goto retry;
LOGERR("Failed to accept on proxy socket");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in proxy_loop");
close(sockd);
goto retry;
}
LOGDEBUG("Proxy received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
ret = 0;
goto out;
} else if (!strncasecmp(buf, "getsubscribe", 12)) {
send_subscribe(proxi, sockd);
} else if (!strncasecmp(buf, "getnotify", 9)) {
send_notify(proxi, sockd);
} else if (!strncasecmp(buf, "getdiff", 7)) {
send_diff(proxi, sockd);
} else if (!strncasecmp(buf, "ping", 4)) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
} else {
/* Anything remaining should be share submissions */
json_t *val = json_loads(buf, 0, NULL);
if (!val)
LOGWARNING("Received unrecognised message: %s", buf);
else
submit_share(proxi, val);
}
close(sockd);
goto retry;
out:
close(sockd);
return ret;
}
static void clear_notify(notify_instance_t *ni) static void clear_notify(notify_instance_t *ni)
{ {
free(ni->jobid); free(ni->jobid);
@ -825,35 +767,8 @@ static void clear_notify(notify_instance_t *ni)
free(ni); free(ni);
} }
static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
notify_instance_t *ni, *tmp;
bool ret = true;
/* All our notify data is invalid if we reconnect so discard them */
mutex_lock(&proxi->notify_lock);
HASH_ITER(hh, proxi->notify_instances, ni, tmp) {
HASH_DEL(proxi->notify_instances, ni);
clear_notify(ni);
}
mutex_unlock(&proxi->notify_lock);
do {
if (!ret)
sleep(5);
close(cs->fd);
ret = connect_proxy(cs);
if (!ret)
continue;
ret = subscribe_stratum(cs, proxi);
if (!ret)
continue;
ret = auth_stratum(cs, proxi);
} while (!ret);
}
/* FIXME: Return something useful to the stratifier based on this result */ /* FIXME: Return something useful to the stratifier based on this result */
static bool parse_share(ckpool_t *ckp, proxy_instance_t *proxi, const char *buf) static bool parse_share(proxy_instance_t *proxi, const char *buf)
{ {
json_t *val = NULL, *idval; json_t *val = NULL, *idval;
share_msg_t *share; share_msg_t *share;
@ -899,6 +814,9 @@ static void *proxy_recv(void *arg)
ckpool_t *ckp = proxi->ckp; ckpool_t *ckp = proxi->ckp;
rename_proc("proxyrecv"); rename_proc("proxyrecv");
/* We don't wait for them to pthread_join in case it takes a while, so
* we detach the threads instead to clean up themselves */
pthread_detach(pthread_self());
while (42) { while (42) {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
@ -940,11 +858,10 @@ static void *proxy_recv(void *arg)
} while (ret == 0 && ++retries < 24); } while (ret == 0 && ++retries < 24);
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
reconnect_stratum(cs, proxi); send_proc(ckp->generator, "reconnect");
send_proc(ckp->stratifier, "subscribe"); break;
send_proc(ckp->stratifier, "notify");
continue;
} }
if (parse_method(proxi, cs->buf)) { if (parse_method(proxi, cs->buf)) {
if (proxi->notified) { if (proxi->notified) {
@ -957,7 +874,7 @@ static void *proxy_recv(void *arg)
} }
continue; continue;
} }
if (parse_share(ckp, proxi, cs->buf)) { if (parse_share(proxi, cs->buf)) {
continue; continue;
} }
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
@ -973,6 +890,7 @@ static void *proxy_send(void *arg)
connsock_t *cs = proxi->cs; connsock_t *cs = proxi->cs;
rename_proc("proxysend"); rename_proc("proxysend");
pthread_detach(pthread_self());
while (42) { while (42) {
notify_instance_t *ni; notify_instance_t *ni;
@ -1024,61 +942,144 @@ static void *proxy_send(void *arg)
return NULL; return NULL;
} }
#if 0 /* Cycle through the available proxies and find the first alive one */
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, static proxy_instance_t *live_proxy(ckpool_t *ckp)
const char *auth, const char *pass)
{ {
proxy_instance_t proxi; proxy_instance_t *alive = NULL;
int ret = 1; int i;
memset(&proxi, 0, sizeof(proxi)); LOGDEBUG("Attempting to connect to proxy");
proxi.ckp = ckp; retry:
proxi.cs = cs; for (i = 0; i < ckp->proxies; i++) {
proxy_instance_t *proxi;
server_instance_t *si;
connsock_t *cs;
si = ckp->servers[i];
proxi = si->data;
cs = proxi->cs;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
continue;
}
if (!connect_proxy(cs)) { if (!connect_proxy(cs)) {
LOGWARNING("FATAL: Failed to connect to %s:%s in proxy_mode!", LOGINFO("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port); cs->url, cs->port);
goto out; continue;
} }
/* Test we can connect, authorise and get stratum information */ /* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, &proxi)) { if (!subscribe_stratum(cs, proxi)) {
LOGWARNING("FATAL: Failed initial subscribe to %s:%s !", LOGINFO("Failed initial subscribe to %s:%s !",
cs->url, cs->port); cs->url, cs->port);
goto out; continue;
}
if (!auth_stratum(cs, proxi)) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass);
continue;
}
alive = proxi;
break;
}
if (!alive) {
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5);
goto retry;
}
LOGNOTICE("Connected to upstream server %s:%s as proxy", alive->cs->url, alive->cs->port);
mutex_init(&alive->notify_lock);
create_pthread(&alive->pth_precv, proxy_recv, alive);
mutex_init(&alive->psend_lock);
cond_init(&alive->psend_cond);
create_pthread(&alive->pth_psend, proxy_send, alive);
return alive;
} }
proxi.auth = auth; static void kill_proxy(proxy_instance_t *proxi)
proxi.pass = pass; {
if (!auth_stratum(cs, &proxi)) { notify_instance_t *ni, *tmp;
LOGWARNING("FATAL: Failed initial authorise to %s:%s with %s:%s !", connsock_t *cs = proxi->cs;
cs->url, cs->port, auth, pass);
goto out; close(cs->fd);
/* All our notify data is invalid if we reconnect so discard them */
mutex_lock(&proxi->notify_lock);
HASH_ITER(hh, proxi->notify_instances, ni, tmp) {
HASH_DEL(proxi->notify_instances, ni);
clear_notify(ni);
} }
mutex_unlock(&proxi->notify_lock);
mutex_init(&proxi.notify_lock); pthread_cancel(proxi->pth_precv);
create_pthread(&proxi.pth_precv, proxy_recv, &proxi); pthread_cancel(proxi->pth_psend);
mutex_init(&proxi.psend_lock); }
cond_init(&proxi.psend_cond);
create_pthread(&proxi.pth_psend, proxy_send, &proxi);
ret = proxy_loop(pi, &proxi); static int proxy_loop(proc_instance_t *pi)
{
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
proxy_instance_t *proxi;
int sockd, ret = 0;
char *buf = NULL;
/* Return from the proxy loop means we have received a shutdown reconnect:
* request */ proxi = live_proxy(ckp);
pthread_cancel(proxi.pth_precv); /* We've just subscribed and authorised so tell the stratifier to
pthread_cancel(proxi.pth_psend); * retrieve the first subscription. */
join_pthread(proxi.pth_precv); send_proc(ckp->stratifier, "subscribe");
join_pthread(proxi.pth_psend); send_proc(ckp->stratifier, "notify");
out: proxi->notified = false;
close(cs->fd);
free(proxi.enonce1);
free(proxi.enonce1bin);
free(proxi.sessionid);
retry:
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
if (interrupted())
goto retry;
LOGERR("Failed to accept on proxy socket");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in proxy_loop");
close(sockd);
goto retry;
}
LOGDEBUG("Proxy received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
ret = 0;
goto out;
} else if (!strncasecmp(buf, "getsubscribe", 12)) {
send_subscribe(proxi, sockd);
} else if (!strncasecmp(buf, "getnotify", 9)) {
send_notify(proxi, sockd);
} else if (!strncasecmp(buf, "getdiff", 7)) {
send_diff(proxi, sockd);
} else if (!strncasecmp(buf, "reconnect", 9)) {
kill_proxy(proxi);
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
goto reconnect;
} else if (!strncasecmp(buf, "ping", 4)) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
} else {
/* Anything remaining should be share submissions */
json_t *val = json_loads(buf, 0, NULL);
if (!val)
LOGWARNING("Received unrecognised message: %s", buf);
else
submit_share(proxi, val);
}
close(sockd);
goto retry;
out:
kill_proxy(proxi);
close(sockd);
return ret; return ret;
} }
#endif
/* FIXME: Make these use multiple BTCDs instead of just first alive. */ /* FIXME: Make these use multiple BTCDs instead of just first alive. */
static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
@ -1153,74 +1154,30 @@ out:
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
int i, ret = 1, alive = 0;
proxy_instance_t *proxi; proxy_instance_t *proxi;
server_instance_t *si; server_instance_t *si;
connsock_t *cs; int i, ret = 1;
/* Create all our proxy structures and pointers */
ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies); ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies);
for (i = 0; i < ckp->proxies; i++) { for (i = 0; i < ckp->proxies; i++) {
ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[i]; si = ckp->servers[i];
cs = &si->cs;
si->url = ckp->proxyurl[i]; si->url = ckp->proxyurl[i];
si->auth = ckp->proxyauth[i]; si->auth = ckp->proxyauth[i];
si->pass = ckp->proxypass[i]; si->pass = ckp->proxypass[i];
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
continue;
}
if (!connect_proxy(cs)) {
LOGWARNING("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port);
continue;
}
proxi = ckzalloc(sizeof(proxy_instance_t)); proxi = ckzalloc(sizeof(proxy_instance_t));
si->data = proxi; si->data = proxi;
/* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) {
LOGWARNING("Failed initial subscribe to %s:%s !",
cs->url, cs->port);
continue;
}
proxi->auth = si->auth; proxi->auth = si->auth;
proxi->pass = si->pass; proxi->pass = si->pass;
if (!auth_stratum(cs, proxi)) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass);
continue;
}
proxi->si = si; proxi->si = si;
proxi->ckp = ckp; proxi->ckp = ckp;
proxi->cs = cs; proxi->cs = &si->cs;
si->alive = true;
alive++;
}
if (!alive) {
LOGEMERG("FATAL: No proxied servers active!");
goto out;
} }
for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) {
if (si->alive)
break;
}
proxi = si->data;
mutex_init(&proxi->notify_lock);
create_pthread(&proxi->pth_precv, proxy_recv, proxi);
mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi);
ret = proxy_loop(pi, proxi); ret = proxy_loop(pi);
/* Return from the proxy loop means we have received a shutdown
* request */
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
join_pthread(proxi->pth_precv);
join_pthread(proxi->pth_psend);
out:
for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) { for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) {
close(si->cs.fd); close(si->cs.fd);
proxi = si->data; proxi = si->data;

6
src/stratifier.c

@ -2043,6 +2043,7 @@ int stratifier(proc_instance_t *pi)
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate; pthread_t pth_statsupdate;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf;
int ret; int ret;
/* Store this for use elsewhere */ /* Store this for use elsewhere */
@ -2054,6 +2055,11 @@ int stratifier(proc_instance_t *pi)
* id on restarts */ * id on restarts */
blockchange_id = workbase_id = ((int64_t)time(NULL)) << 32; blockchange_id = workbase_id = ((int64_t)time(NULL)) << 32;
/* Wait for the generator to have something for us */
do {
buf = send_recv_proc(ckp->generator, "ping");
} while (!buf);
cklock_init(&instance_lock); cklock_init(&instance_lock);
mutex_init(&stratum_recv_lock); mutex_init(&stratum_recv_lock);

Loading…
Cancel
Save