Browse Source

Explicitly set the proxy in the stratifier when we switch and avoid creating workbases from backup proxies

master
Con Kolivas 10 years ago
parent
commit
32b88de36c
  1. 8
      src/generator.c
  2. 48
      src/stratifier.c

8
src/generator.c

@ -1472,6 +1472,7 @@ static void *passthrough_recv(void *arg)
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect");
continue; continue;
} }
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
@ -1561,7 +1562,6 @@ static void *proxy_recv(void *arg)
} while (ret == 0 && ++retries < 120); } while (ret == 0 && ++retries < 120);
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
continue; continue;
@ -1577,12 +1577,10 @@ static void *proxy_recv(void *arg)
proxi->diffed = false; proxi->diffed = false;
} }
if (proxi->reconnect) { if (proxi->reconnect) {
proxi->alive = false;
proxi->reconnect = false; proxi->reconnect = false;
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
Close(cs->fd); Close(cs->fd);
send_proc(ckp->generator, "reconnect");
break; break;
} }
continue; continue;
@ -1677,7 +1675,9 @@ reconnect:
connsock_t *cs = proxi->cs; connsock_t *cs = proxi->cs;
LOGWARNING("Successfully connected to %s:%s as proxy", LOGWARNING("Successfully connected to %s:%s as proxy",
cs->url, cs->port); cs->url, cs->port);
send_proc(ckp->stratifier, "reconnect"); dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf);
} }
} }
retry: retry:

48
src/stratifier.c

@ -987,21 +987,26 @@ static proxy_t *__generate_proxy(sdata_t *sdata, const int id)
} }
/* Find proxy by id number, generate one if none exist yet by that id */ /* Find proxy by id number, generate one if none exist yet by that id */
static proxy_t *proxy_by_id(sdata_t *sdata, const int id) static proxy_t *__proxy_by_id(sdata_t *sdata, const int id)
{ {
bool new_proxy = false;
proxy_t *proxy; proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
HASH_FIND_INT(sdata->proxies, &id, proxy); HASH_FIND_INT(sdata->proxies, &id, proxy);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
new_proxy = true;
proxy = __generate_proxy(sdata, id); proxy = __generate_proxy(sdata, id);
LOGINFO("Stratifier added new proxy %d", id);
} }
mutex_unlock(&sdata->proxy_lock);
if (unlikely(new_proxy)) return proxy;
LOGINFO("Stratifier added new proxy %d", id); }
static proxy_t *proxy_by_id(sdata_t *sdata, const int id)
{
proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
proxy = __proxy_by_id(sdata, id);
mutex_unlock(&sdata->proxy_lock);
return proxy; return proxy;
} }
@ -1055,11 +1060,6 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
proxy = proxy_by_id(sdata, id); proxy = proxy_by_id(sdata, id);
json_get_bool(&reconnect, val, "reconnect"); json_get_bool(&reconnect, val, "reconnect");
mutex_lock(&sdata->proxy_lock);
if (sdata->proxy != proxy)
sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock);
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
proxy->subscribed = true; proxy->subscribed = true;
proxy->diff = ckp->startdiff; proxy->diff = ckp->startdiff;
@ -1092,7 +1092,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
json_decref(val); json_decref(val);
/* Notify implied required now too */ /* Notify implied required now too */
_update_notify(ckp, id); _update_notify(ckp, id);
if (reconnect || proxy == current_proxy(sdata)) if (reconnect)
reconnect_clients(sdata, ""); reconnect_clients(sdata, "");
} }
@ -1114,6 +1114,10 @@ static void _update_notify(ckpool_t *ckp, const int id)
LOGINFO("No valid proxy subscription to update notify yet"); LOGINFO("No valid proxy subscription to update notify yet");
return; return;
} }
if (proxy != current_proxy(sdata)) {
LOGINFO("Notify from backup proxy");
return;
}
ASPRINTF(&msg, "getnotify=%d", id); ASPRINTF(&msg, "getnotify=%d", id);
buf = send_recv_proc(ckp->generator, msg); buf = send_recv_proc(ckp->generator, msg);
@ -1809,6 +1813,22 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
return buf; return buf;
} }
/* Sets the currently active proxy */
static void set_proxy(sdata_t *sdata, const char *buf)
{
proxy_t *proxy;
int id = 0;
sscanf(buf, "proxy=%d", &id);
mutex_lock(&sdata->proxy_lock);
proxy = __proxy_by_id(sdata, id);
sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock);
reconnect_clients(sdata, "");
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret = 0; int sockd, ret = 0, selret = 0;
@ -1915,6 +1935,8 @@ retry:
block_reject(sdata, buf + 8); block_reject(sdata, buf + 8);
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
reconnect_clients(sdata, buf); reconnect_clients(sdata, buf);
} else if (cmdmatch(buf, "proxy")) {
set_proxy(sdata, buf);
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else } else

Loading…
Cancel
Save