Browse Source

Attach to every upstream pool in proxy mode all the time and fail over and back as needed

master
Con Kolivas 10 years ago
parent
commit
b7e71e1be7
  1. 438
      src/generator.c
  2. 3
      src/stratifier.c

438
src/generator.c

@ -75,8 +75,7 @@ typedef struct proxy_instance proxy_instance_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
proxy_instance_t *next; UT_hash_handle hh;
proxy_instance_t *prev;
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
@ -104,6 +103,7 @@ struct proxy_instance {
bool notified; /* Received new template for work */ bool notified; /* Received new template for work */
bool diffed; /* Received new diff */ bool diffed; /* Received new diff */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool alive;
pthread_mutex_t notify_lock; pthread_mutex_t notify_lock;
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
@ -128,7 +128,7 @@ struct proxy_instance {
/* Private data for the generator */ /* Private data for the generator */
struct generator_data { struct generator_data {
pthread_mutex_t lock; /* Lock protecting linked lists */ pthread_mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxy_list; /* Linked list of all active proxies */ proxy_instance_t *proxies; /* Hash list of all proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
}; };
@ -878,6 +878,8 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val)
return ret; return ret;
} }
static void prepare_proxy(proxy_instance_t *proxi);
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{ {
server_instance_t *newsi, *si = proxi->si; server_instance_t *newsi, *si = proxi->si;
@ -930,6 +932,7 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
newsi = ckzalloc(sizeof(server_instance_t)); newsi = ckzalloc(sizeof(server_instance_t));
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_DEL(gdata->proxies, proxi);
newsi->id = si->id; /* Inherit the old connection's id */ newsi->id = si->id; /* Inherit the old connection's id */
si->id = ckp->proxies++; /* Give the old connection the lowest id */ si->id = ckp->proxies++; /* Give the old connection the lowest id */
ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies); ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * ckp->proxies);
@ -946,7 +949,12 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
newproxi->si = newsi; newproxi->si = newsi;
newproxi->ckp = ckp; newproxi->ckp = ckp;
newproxi->cs = &newsi->cs; newproxi->cs = &newsi->cs;
newproxi->id = newsi->id;
HASH_ADD_INT(gdata->proxies, id, proxi);
HASH_ADD_INT(gdata->proxies, id, newproxi);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
prepare_proxy(newproxi);
out: out:
return ret; return ret;
} }
@ -1248,85 +1256,6 @@ out:
return ret; return ret;
} }
static void *proxy_recv(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
rename_proc("proxyrecv");
while (42) {
notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare;
int retries = 0, ret;
time_t now;
now = time(NULL);
/* Age old notifications older than 10 mins old */
mutex_lock(&proxi->notify_lock);
HASH_ITER(hh, proxi->notify_instances, ni, tmp) {
if (HASH_COUNT(proxi->notify_instances) < 3)
break;
if (ni->notify_time < now - 600) {
HASH_DEL(proxi->notify_instances, ni);
clear_notify(ni);
}
}
mutex_unlock(&proxi->notify_lock);
/* Similary with shares older than 2 mins without response */
mutex_lock(&proxi->share_lock);
HASH_ITER(hh, proxi->shares, share, tmpshare) {
if (share->submit_time < now - 120) {
HASH_DEL(proxi->shares, share);
}
}
mutex_unlock(&proxi->share_lock);
/* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */
do {
if (cs->fd == -1) {
ret = -1;
break;
}
ret = read_socket_line(cs, 5);
} while (ret == 0 && ++retries < 120);
if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
send_proc(ckp->generator, "reconnect");
break;
}
if (parse_method(proxi, cs->buf)) {
if (proxi->notified) {
send_proc(ckp->stratifier, "notify");
proxi->notified = false;
}
if (proxi->diffed) {
send_proc(ckp->stratifier, "diff");
proxi->diffed = false;
}
if (proxi->reconnect) {
proxi->reconnect = false;
LOGWARNING("Reconnect issue, dropping existing connection");
send_proc(ckp->generator, "reconnect");
break;
}
continue;
}
if (parse_share(proxi, cs->buf)) {
continue;
}
/* If it's not a method it should be a share result */
LOGWARNING("Unhandled stratum message: %s", cs->buf);
}
return NULL;
}
/* For processing and sending shares */ /* For processing and sending shares */
static void *proxy_send(void *arg) static void *proxy_send(void *arg)
{ {
@ -1384,36 +1313,6 @@ static void *proxy_send(void *arg)
return NULL; return NULL;
} }
/* For receiving messages from an upstream pool to pass downstream */
static void *passthrough_recv(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
rename_proc("passrecv");
while (42) {
int ret;
do {
ret = read_socket_line(cs, 60);
} while (ret == 0);
if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
send_proc(ckp->generator, "reconnect");
break;
}
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
send_proc(ckp->connector, cs->buf);
}
return NULL;
}
static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm)
{ {
int len, sent; int len, sent;
@ -1491,124 +1390,238 @@ out:
return ret; return ret;
} }
/* Cycle through the available proxies and find the first alive one */ /* For receiving messages from an upstream pool to pass downstream. Responsible
static proxy_instance_t *live_proxy(ckpool_t *ckp) * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg)
{ {
proxy_instance_t *alive = NULL; proxy_instance_t *proxi = (proxy_instance_t *)arg;
gdata_t *gdata = ckp->data; server_instance_t *si = proxi->si;
connsock_t *cs; connsock_t *cs = proxi->cs;
int i, srvs; ckpool_t *ckp = proxi->ckp;
LOGDEBUG("Attempting to connect to proxy");
retry:
if (!ping_main(ckp))
goto out;
mutex_lock(&gdata->lock);
srvs = ckp->proxies;
mutex_unlock(&gdata->lock);
for (i = 0; i < srvs; i++) {
proxy_instance_t *proxi;
server_instance_t *si;
mutex_lock(&gdata->lock); rename_proc("passrecv");
si = ckp->servers[i];
proxi = si->data;
mutex_unlock(&gdata->lock);
cs = proxi->cs;
if (proxy_alive(ckp, si, proxi, cs, false)) { if (proxy_alive(ckp, si, proxi, cs, false)) {
alive = proxi; proxi->alive = true;
break; send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url);
} }
while (42) {
int ret;
while (!proxy_alive(ckp, si, proxi, cs, true)) {
if (proxi->alive) {
proxi->alive = false;
send_proc(ckp->generator, "reconnect");
} }
if (!alive) {
send_proc(ckp->connector, "reject");
send_proc(ckp->stratifier, "dropall");
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5); sleep(5);
goto retry; }
if (!proxi->alive) {
proxi->alive = true;
send_proc(ckp->generator, "reconnect");
} }
cs = alive->cs; do {
LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ret = read_socket_line(cs, 60);
ckp->passthrough ? " in passthrough mode" : ""); } while (ret == 0);
if (ckp->passthrough) {
create_pthread(&alive->pth_precv, passthrough_recv, alive); if (ret < 1) {
alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
} else { continue;
create_pthread(&alive->pth_precv, proxy_recv, alive);
mutex_init(&alive->psend_lock);
cond_init(&alive->psend_cond);
create_pthread(&alive->pth_psend, proxy_send, alive);
} }
out: /* Simply forward the message on, as is, to the connector to
send_proc(ckp->connector, alive ? "accept" : "reject"); * process. Possibly parse parameters sent by upstream pool
return alive; * here */
send_proc(ckp->connector, cs->buf);
}
return NULL;
} }
static void kill_proxy(proxy_instance_t *proxi) /* For receiving messages from the upstream proxy, also responsible for setting
* up the connection and testing it's alive. */
static void *proxy_recv(void *arg)
{ {
proxy_instance_t *proxi = (proxy_instance_t *)arg;
server_instance_t *si = proxi->si;
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
rename_proc("proxyrecv");
if (proxy_alive(ckp, si, proxi, cs, false)) {
proxi->alive = true;
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url);
}
while (42) {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; share_msg_t *share, *tmpshare;
int retries = 0, ret;
time_t now;
if (!proxi) // This shouldn't happen while (!proxy_alive(ckp, si, proxi, cs, true)) {
return; if (proxi->alive) {
proxi->alive = false;
send_proc(ckp->generator, "reconnect");
}
sleep(5);
}
if (!proxi->alive) {
proxi->alive = true;
send_proc(ckp->generator, "reconnect");
}
LOGNOTICE("Killing proxy connection to %s", proxi->si->url); now = time(NULL);
cs = proxi->cs;
Close(cs->fd);
empty_buffer(cs);
/* All our notify data is invalid if we reconnect so discard them */ /* Age old notifications older than 10 mins old */
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
HASH_ITER(hh, proxi->notify_instances, ni, tmp) { HASH_ITER(hh, proxi->notify_instances, ni, tmp) {
if (HASH_COUNT(proxi->notify_instances) < 3)
break;
if (ni->notify_time < now - 600) {
HASH_DEL(proxi->notify_instances, ni); HASH_DEL(proxi->notify_instances, ni);
clear_notify(ni); clear_notify(ni);
} }
}
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxi->notify_lock);
pthread_cancel(proxi->pth_precv); /* Similary with shares older than 2 mins without response */
pthread_cancel(proxi->pth_psend); mutex_lock(&proxi->share_lock);
HASH_ITER(hh, proxi->shares, share, tmpshare) {
if (share->submit_time < now - 120) {
HASH_DEL(proxi->shares, share);
}
}
mutex_unlock(&proxi->share_lock);
/* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */
do {
if (cs->fd == -1) {
ret = -1;
break;
}
ret = read_socket_line(cs, 5);
} while (ret == 0 && ++retries < 120);
if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
continue;
}
if (parse_method(proxi, cs->buf)) {
if (proxi->notified) {
send_proc(ckp->stratifier, "notify");
proxi->notified = false;
}
if (proxi->diffed) {
send_proc(ckp->stratifier, "diff");
proxi->diffed = false;
}
if (proxi->reconnect) {
proxi->reconnect = false;
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
proxi->id, proxi->si->url);
send_proc(ckp->generator, "reconnect");
break;
}
continue;
}
if (parse_share(proxi, cs->buf)) {
continue;
}
/* If it's not a method it should be a share result */
LOGWARNING("Unhandled stratum message: %s", cs->buf);
}
return NULL;
}
static void prepare_proxy(proxy_instance_t *proxi)
{
mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi);
create_pthread(&proxi->pth_precv, proxy_recv, proxi);
}
static void setup_proxies(ckpool_t *ckp, gdata_t *gdata)
{
int i;
for (i = 0; i < ckp->proxies; i++) {
proxy_instance_t *proxi;
server_instance_t *si;
si = ckp->servers[i];
proxi = si->data;
proxi->id = i;
HASH_ADD_INT(gdata->proxies, id, proxi);
if (ckp->passthrough) {
create_pthread(&proxi->pth_precv, passthrough_recv, proxi);
proxi->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
} else {
prepare_proxy(proxi);
}
}
}
static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata)
{
proxy_instance_t *ret = NULL, *proxi, *tmp;
while (42) {
if (!ping_main(ckp))
break;
HASH_ITER(hh, gdata->proxies, proxi, tmp) {
if (proxi->alive) {
if (!ret) {
ret = proxi;
continue;
}
if (proxi->id < ret->id)
ret = proxi;
}
}
if (ret)
break;
sleep(1);
}
send_proc(ckp->connector, ret ? "accept" : "reject");
return ret;
} }
static int proxy_loop(proc_instance_t *pi) static int proxy_loop(proc_instance_t *pi)
{ {
proxy_instance_t *proxi = NULL, *cproxy;
int sockd = -1, ret = 0, selret; int sockd = -1, ret = 0, selret;
proxy_instance_t *proxi = NULL;
bool reconnecting = false;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
char *buf = NULL; char *buf = NULL;
setup_proxies(ckp, gdata);
reconnect: reconnect:
if (proxi) { /* This does not necessarily mean we reconnect, but a change has
kill_proxy(proxi); * occurred and we need to reexamine the proxies. */
reconnecting = true; cproxy = current_proxy(ckp, gdata);
} if (!cproxy)
proxi = live_proxy(ckp);
if (!proxi)
goto out; goto out;
if (reconnecting) { if (proxi != cproxy) {
reconnecting = false; proxi = cproxy;
if (!ckp->passthrough) {
connsock_t *cs = proxi->cs; connsock_t *cs = proxi->cs;
LOGWARNING("Successfully reconnected to %s:%s as proxy", LOGWARNING("Successfully connected to %s:%s as proxy",
cs->url, cs->port); cs->url, cs->port);
} /* Sending subscribe implies stratifier will also do a notify */
/* We've just subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */
if (!ckp->passthrough) {
send_proc(ckp->stratifier, "subscribe"); send_proc(ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
} }
}
retry: retry:
ckmsgq_add(gdata->srvchk, proxi->si);
do { do {
selret = wait_read_select(us->sockd, 5); selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) { if (!selret && !ping_main(ckp)) {
@ -1670,7 +1683,6 @@ retry:
Close(sockd); Close(sockd);
goto retry; goto retry;
out: out:
kill_proxy(proxi);
Close(sockd); Close(sockd);
return ret; return ret;
} }
@ -1793,53 +1805,6 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
send_proc(ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
} }
static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{
gdata_t *gdata = ckp->data;
static time_t last_t = 0;
bool alive = false;
time_t now_t;
int i, srvs;
/* Rate limit to checking only once every 5 seconds */
now_t = time(NULL);
if (now_t <= last_t + 5)
return;
last_t = now_t;
/* Is this the highest priority server already? */
if (!cursi->id)
return;
mutex_lock(&gdata->lock);
srvs = ckp->proxies;
mutex_unlock(&gdata->lock);
for (i = 0; i < srvs; i++) {
proxy_instance_t *proxi;
server_instance_t *si;
connsock_t *cs;
mutex_lock(&gdata->lock);
si = ckp->servers[i];
proxi = si->data;
mutex_unlock(&gdata->lock);
/* Have we reached the current server? */
if (si == cursi)
return;
cs = proxi->cs;
alive = proxy_alive(ckp, si, proxi, cs, true);
if (alive)
break;
}
if (alive)
send_proc(ckp->generator, "reconnect");
}
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)
{ {
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
@ -1850,7 +1815,6 @@ int generator(proc_instance_t *pi)
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
if (ckp->proxy) { if (ckp->proxy) {
gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog);
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);
} else { } else {
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);

3
src/stratifier.c

@ -1006,6 +1006,7 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id)
return proxy; return proxy;
} }
static void update_notify(ckpool_t *ckp);
static void reconnect_clients(sdata_t *sdata, const char *cmd); static void reconnect_clients(sdata_t *sdata, const char *cmd);
static void update_subscribe(ckpool_t *ckp) static void update_subscribe(ckpool_t *ckp)
@ -1068,6 +1069,8 @@ static void update_subscribe(ckpool_t *ckp)
proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8)); proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8));
json_decref(val); json_decref(val);
/* Notify implied required now too */
update_notify(ckp);
reconnect_clients(sdata, ""); reconnect_clients(sdata, "");
} }

Loading…
Cancel
Save