Browse Source

Allow url to be unique per proxy and subproxy allowing us to not have to recruit a full new parent proxy with a complicated id

master
Con Kolivas 10 years ago
parent
commit
2cee823d6c
  1. 285
      src/generator.c

285
src/generator.c

@ -84,8 +84,7 @@ struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;
connsock_t cs; connsock_t cs;
bool passthrough; bool passthrough;
int64_t id; /* Proxy server id*/ int id; /* Proxy server id*/
int low_id; /* Low bits of id */
int subid; /* Subproxy id */ int subid; /* Subproxy id */
char *url; char *url;
@ -107,8 +106,7 @@ struct proxy_instance {
bool notified; /* Has this proxy received any notifies yet */ bool notified; /* Has this proxy received any notifies yet */
bool disabled; /* Subproxy no longer to be used */ bool disabled; /* Subproxy no longer to be used */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool reconnecting; /* Testing in progress */ bool reconnecting; /* Testing of parent in progress */
bool redirecting; /* Children have received a reconnect */
int64_t recruit; /* No of recruiting requests in progress */ int64_t recruit; /* No of recruiting requests in progress */
bool alive; bool alive;
@ -580,7 +578,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
retry: retry:
parsed = true; parsed = true;
if (!(buf = new_proxy_line(cs))) { if (!(buf = new_proxy_line(cs))) {
LOGNOTICE("Proxy %ld:%d %s failed to receive line in parse_subscribe", LOGNOTICE("Proxy %d:%d %s failed to receive line in parse_subscribe",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
goto out; goto out;
} }
@ -613,7 +611,7 @@ retry:
buf = NULL; buf = NULL;
goto retry; goto retry;
} }
LOGNOTICE("Proxy %ld:%d %s failed to parse subscribe response in parse_subscribe", LOGNOTICE("Proxy %d:%d %s failed to parse subscribe response in parse_subscribe",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
goto out; goto out;
} }
@ -649,9 +647,9 @@ retry:
if (size < 3) { if (size < 3) {
if (!proxi->subid) { if (!proxi->subid) {
LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners", LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners",
proxi->low_id, proxi->url, size); proxi->id, proxi->url, size);
} else { } else {
LOGNOTICE("Proxy %ld:%d Nonce2 length %d too small for fast miners", LOGNOTICE("Proxy %d:%d Nonce2 length %d too small for fast miners",
proxi->id, proxi->subid, size); proxi->id, proxi->subid, size);
} }
} }
@ -665,7 +663,7 @@ retry:
parent->recruit = 0; parent->recruit = 0;
mutex_unlock(&parent->proxy_lock); mutex_unlock(&parent->proxy_lock);
LOGNOTICE("Found notify for new proxy %ld:%d with enonce %s nonce2len %d", proxi->id, LOGNOTICE("Found notify for new proxy %d:%d with enonce %s nonce2len %d", proxi->id,
proxi->subid, proxi->enonce1, proxi->nonce2len); proxi->subid, proxi->enonce1, proxi->nonce2len);
ret = true; ret = true;
@ -698,7 +696,7 @@ retry:
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to send message in subscribe_stratum", LOGNOTICE("Proxy %d:%d %s failed to send message in subscribe_stratum",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
goto out; goto out;
} }
@ -707,16 +705,16 @@ retry:
goto out; goto out;
if (proxi->no_params) { if (proxi->no_params) {
LOGNOTICE("Proxy %ld:%d %s failed all subscription options in subscribe_stratum", LOGNOTICE("Proxy %d:%d %s failed all subscription options in subscribe_stratum",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
goto out; goto out;
} }
LOGINFO("Proxy %ld:%d %s failed connecting with parameters in subscribe_stratum, retrying without", LOGINFO("Proxy %d:%d %s failed connecting with parameters in subscribe_stratum, retrying without",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
proxi->no_params = true; proxi->no_params = true;
ret = connect_proxy(cs); ret = connect_proxy(cs);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to reconnect in subscribe_stratum", LOGNOTICE("Proxy %d:%d %s failed to reconnect in subscribe_stratum",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
goto out; goto out;
} }
@ -812,7 +810,7 @@ static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val)
goto out; goto out;
} }
LOGDEBUG("Received new notify from proxy %ld:%d", proxi->id, proxi->subid); LOGDEBUG("Received new notify from proxy %d:%d", proxi->id, proxi->subid);
ni = ckzalloc(sizeof(notify_instance_t)); ni = ckzalloc(sizeof(notify_instance_t));
ni->jobid = job_id; ni->jobid = job_id;
jobidbuf = json_string_value(job_id); jobidbuf = json_string_value(job_id);
@ -908,7 +906,39 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val)
} }
static void prepare_proxy(proxy_instance_t *proxi); static void prepare_proxy(proxy_instance_t *proxi);
static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi);
/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring
* fields we don't use in the subproxy. */
static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi, const char *url)
{
proxy_instance_t *subproxy;
mutex_lock(&gdata->lock);
if (gdata->dead_proxies) {
/* Recycle an old proxy instance if one exists */
subproxy = gdata->dead_proxies;
DL_DELETE(gdata->dead_proxies, subproxy);
subproxy->disabled = false;
} else {
subproxy = ckzalloc(sizeof(proxy_instance_t));
mutex_init(&subproxy->share_lock);
}
mutex_unlock(&gdata->lock);
subproxy->cs.ckp = subproxy->ckp = proxi->ckp;
mutex_lock(&proxi->proxy_lock);
subproxy->subid = ++proxi->subproxy_count;
mutex_unlock(&proxi->proxy_lock);
subproxy->id = proxi->id;
subproxy->url = strdup(url);
subproxy->auth = strdup(proxi->auth);
subproxy->pass = strdup(proxi->pass);
subproxy->parent = proxi;
subproxy->epfd = proxi->epfd;
return subproxy;
}
static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy) static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy)
{ {
@ -928,7 +958,7 @@ static proxy_instance_t *__subproxy_by_id(proxy_instance_t *proxy, const int sub
/* Add to the dead list to be recycled if possible */ /* Add to the dead list to be recycled if possible */
static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy) static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy)
{ {
LOGINFO("Recycling data from proxy %ld:%d", proxy->id, proxy->subid); LOGINFO("Recycling data from proxy %d:%d", proxy->id, proxy->subid);
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
dealloc(proxy->url); dealloc(proxy->url);
@ -938,11 +968,11 @@ static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy)
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
} }
static void send_stratifier_deadproxy(ckpool_t *ckp, const int64_t id, const int subid) static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid)
{ {
char buf[256]; char buf[256];
sprintf(buf, "deadproxy=%ld:%d", id, subid); sprintf(buf, "deadproxy=%d:%d", id, subid);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
} }
@ -972,13 +1002,12 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
store_proxy(gdata, subproxy); store_proxy(gdata, subproxy);
} }
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxy, json_t *val)
{ {
proxy_instance_t *parent, *newproxi;
bool sameurl = false, ret = false; bool sameurl = false, ret = false;
int64_t high_id, low_id, new_id; ckpool_t *ckp = proxy->ckp;
ckpool_t *ckp = proxi->ckp;
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
proxy_instance_t *parent;
const char *new_url; const char *new_url;
int new_port; int new_port;
char *url; char *url;
@ -997,10 +1026,10 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
char *dot_pool, *dot_reconnect; char *dot_pool, *dot_reconnect;
int len; int len;
dot_pool = strchr(proxi->url, '.'); dot_pool = strchr(proxy->url, '.');
if (!dot_pool) { if (!dot_pool) {
LOGWARNING("Denied stratum reconnect request from server without domain %s", LOGWARNING("Denied stratum reconnect request from server without domain %s",
proxi->url); proxy->url);
goto out; goto out;
} }
dot_reconnect = strchr(new_url, '.'); dot_reconnect = strchr(new_url, '.');
@ -1012,61 +1041,37 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
len = strlen(dot_reconnect); len = strlen(dot_reconnect);
if (strncmp(dot_pool, dot_reconnect, len)) { if (strncmp(dot_pool, dot_reconnect, len)) {
LOGWARNING("Denied stratum reconnect request from %s to non-matching domain %s", LOGWARNING("Denied stratum reconnect request from %s to non-matching domain %s",
proxi->url, new_url); proxy->url, new_url);
goto out; goto out;
} }
ASPRINTF(&url, "%s:%d", new_url, new_port); ASPRINTF(&url, "%s:%d", new_url, new_port);
} else { } else {
url = strdup(proxi->url); url = strdup(proxy->url);
sameurl = true; sameurl = true;
} }
LOGINFO("Processing reconnect request to %s", url); LOGINFO("Processing reconnect request to %s", url);
ret = true; ret = true;
parent = proxi->parent; parent = proxy->parent;
/* If this is the same url we don't need to replace any existing disable_subproxy(gdata, parent, proxy);
* parents, just drop the connection and allow a new one to be if (parent != proxy) {
* recruited. */ /* If this is a subproxy we only need to create a new one if
if (sameurl) { * the url has changed. Otherwise automated recruiting will
disable_subproxy(gdata, parent, proxi); * take care of creating one if needed. */
if (!sameurl)
create_subproxy(gdata, parent, url);
goto out; goto out;
} }
/* If this isn't a parent proxy, recruit a new parent! */ proxy->reconnect = true;
if (parent != proxi) { LOGWARNING("Proxy %d:%s reconnect issue to %s, dropping existing connection",
proxi->reconnect = true; proxy->id, proxy->url, url);
/* Do we already know this proxy is redirecting? */ if (!sameurl) {
if (parent->redirecting) char *oldurl = proxy->url;
goto out;
proxi = parent;
proxi->redirecting = true;
} else
proxi->redirecting = false;
mutex_lock(&gdata->lock); proxy->url = url;
high_id = proxi->id >> 32; /* Use the high bits for the reconnect id */ free(oldurl);
high_id++;
high_id <<= 32;
low_id = proxi->id & 0x00000000FFFFFFFFll; /* Use the low bits for the master id */
new_id = high_id | low_id;
newproxi = ckzalloc(sizeof(proxy_instance_t));
newproxi->url = url;
newproxi->auth = strdup(proxi->auth);
newproxi->pass = strdup(proxi->pass);
newproxi->ckp = ckp;
newproxi->cs.ckp = ckp;
newproxi->low_id = low_id;
newproxi->id = new_id;
newproxi->subproxy_count = ++proxi->subproxy_count;
HASH_ADD_I64(gdata->proxies, id, newproxi);
if (!proxi->redirecting) {
proxi->disabled = true;
HASH_DEL(gdata->proxies, proxi);
proxi->reconnect = true;
} }
mutex_unlock(&gdata->lock);
prepare_proxy(newproxi);
out: out:
return ret; return ret;
} }
@ -1170,7 +1175,7 @@ static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg
goto out; goto out;
} }
LOGDEBUG("Proxy %ld:%d received method %s", proxi->id, proxi->subid, buf); LOGDEBUG("Proxy %d:%d received method %s", proxi->id, proxi->subid, buf);
if (cmdmatch(buf, "mining.notify")) { if (cmdmatch(buf, "mining.notify")) {
ret = parse_notify(ckp, proxi, params); ret = parse_notify(ckp, proxi, params);
goto out; goto out;
@ -1221,7 +1226,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to send message in auth_stratum", LOGNOTICE("Proxy %d:%d %s failed to send message in auth_stratum",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
if (cs->fd > 0) { if (cs->fd > 0) {
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL);
@ -1236,7 +1241,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
free(buf); free(buf);
buf = next_proxy_line(cs, proxi); buf = next_proxy_line(cs, proxi);
if (!buf) { if (!buf) {
LOGNOTICE("Proxy %ld:%d %s failed to receive line in auth_stratum", LOGNOTICE("Proxy %d:%d %s failed to receive line in auth_stratum",
proxi->id, proxi->subid, proxi->url); proxi->id, proxi->subid, proxi->url);
ret = false; ret = false;
goto out; goto out;
@ -1247,27 +1252,27 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
val = json_msg_result(buf, &res_val, &err_val); val = json_msg_result(buf, &res_val, &err_val);
if (!val) { if (!val) {
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->url, buf); proxi->id, proxi->subid, proxi->url, buf);
goto out; goto out;
} }
if (err_val && !json_is_null(err_val)) { if (err_val && !json_is_null(err_val)) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s", LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s",
proxi->low_id, proxi->subid, proxi->url, buf); proxi->id, proxi->subid, proxi->url, buf);
goto out; goto out;
} }
if (res_val) { if (res_val) {
ret = json_is_true(res_val); ret = json_is_true(res_val);
if (!ret) { if (!ret) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s", LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->url, buf); proxi->id, proxi->subid, proxi->url, buf);
goto out; goto out;
} }
} else { } else {
/* No result and no error but successful val means auth success */ /* No result and no error but successful val means auth success */
ret = true; ret = true;
} }
LOGINFO("Proxy %ld:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->url); LOGINFO("Proxy %d:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->url);
out: out:
if (val) if (val)
json_decref(val); json_decref(val);
@ -1330,25 +1335,24 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int subid
static void drop_proxy(gdata_t *gdata, const char *buf) static void drop_proxy(gdata_t *gdata, const char *buf)
{ {
proxy_instance_t *proxy, *subproxy; proxy_instance_t *proxy, *subproxy;
int64_t id = 0; int id = -1, subid = -1;
int subid = 0;
sscanf(buf, "dropproxy=%ld:%d", &id, &subid); sscanf(buf, "dropproxy=%d:%d", &id, &subid);
if (unlikely(!subid)) { if (unlikely(!subid)) {
LOGWARNING("Generator asked to drop parent proxy %ld", id); LOGWARNING("Generator asked to drop parent proxy %d", id);
return; return;
} }
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGINFO("Generator asked to drop subproxy from non-existent parent %ld", id); LOGINFO("Generator asked to drop subproxy from non-existent parent %d", id);
return; return;
} }
subproxy = subproxy_by_id(proxy, subid); subproxy = subproxy_by_id(proxy, subid);
if (!subproxy) { if (!subproxy) {
LOGINFO("Generator asked to drop non-existent subproxy %ld:%d", id, subid); LOGINFO("Generator asked to drop non-existent subproxy %d:%d", id, subid);
return; return;
} }
LOGNOTICE("Generator asked to drop proxy %ld:%d", id, subid); LOGNOTICE("Generator asked to drop proxy %d:%d", id, subid);
disable_subproxy(gdata, proxy, subproxy); disable_subproxy(gdata, proxy, subproxy);
} }
@ -1364,11 +1368,11 @@ static void submit_share(gdata_t *gdata, json_t *val)
{ {
proxy_instance_t *proxy, *proxi; proxy_instance_t *proxy, *proxi;
ckpool_t *ckp = gdata->ckp; ckpool_t *ckp = gdata->ckp;
int64_t client_id, id;
bool success = false; bool success = false;
stratum_msg_t *msg; stratum_msg_t *msg;
share_msg_t *share; share_msg_t *share;
int subid; int64_t client_id;
int id, subid;
/* Get the client id so we can tell the stratifier to drop it if the /* Get the client id so we can tell the stratifier to drop it if the
* proxy it's bound to is not functional */ * proxy it's bound to is not functional */
@ -1376,7 +1380,7 @@ static void submit_share(gdata_t *gdata, json_t *val)
LOGWARNING("Got no client_id in share"); LOGWARNING("Got no client_id in share");
goto out; goto out;
} }
if (unlikely(!json_get_int64(&id, val, "proxy"))) { if (unlikely(!json_get_int(&id, val, "proxy"))) {
LOGWARNING("Got no proxy in share"); LOGWARNING("Got no proxy in share");
goto out; goto out;
} }
@ -1386,20 +1390,20 @@ static void submit_share(gdata_t *gdata, json_t *val)
} }
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGNOTICE("Client %"PRId64" sending shares to non existent proxy %ld, dropping", LOGNOTICE("Client %"PRId64" sending shares to non existent proxy %d, dropping",
client_id, id); client_id, id);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
goto out; goto out;
} }
proxi = subproxy_by_id(proxy, subid); proxi = subproxy_by_id(proxy, subid);
if (unlikely(!proxi)) { if (unlikely(!proxi)) {
LOGNOTICE("Client %"PRId64" sending shares to non existent subproxy %ld:%d, dropping", LOGNOTICE("Client %"PRId64" sending shares to non existent subproxy %d:%d, dropping",
client_id, id, subid); client_id, id, subid);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
goto out; goto out;
} }
if (!proxi->alive) { if (!proxi->alive) {
LOGNOTICE("Client %"PRId64" sending shares to dead subproxy %ld:%d, dropping", LOGNOTICE("Client %"PRId64" sending shares to dead subproxy %d:%d, dropping",
client_id, id, subid); client_id, id, subid);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
goto out; goto out;
@ -1470,11 +1474,11 @@ static bool parse_share(proxy_instance_t *proxi, const char *buf)
* so long as we recognised it as a share response */ * so long as we recognised it as a share response */
ret = true; ret = true;
if (!share) { if (!share) {
LOGINFO("Proxy %ld:%d failed to find matching share to result: %s", LOGINFO("Proxy %d:%d failed to find matching share to result: %s",
proxi->id, proxi->subid, buf); proxi->id, proxi->subid, buf);
goto out; goto out;
} }
LOGINFO("Proxy %ld:%d share result %s from client %d", proxi->id, proxi->subid, LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid,
buf, share->client_id); buf, share->client_id);
free(share); free(share);
out: out:
@ -1495,18 +1499,18 @@ static void *proxy_send(void *arg)
rename_proc("proxysend"); rename_proc("proxysend");
while (42) { while (42) {
int64_t proxyid = 0, client_id = 0, id;
proxy_instance_t *subproxy; proxy_instance_t *subproxy;
int proxyid = 0, subid = 0;
int64_t client_id = 0, id;
notify_instance_t *ni; notify_instance_t *ni;
json_t *jobid = NULL; json_t *jobid = NULL;
bool ret = true; bool ret = true;
int subid = 0;
json_t *val; json_t *val;
tv_t now; tv_t now;
ts_t abs; ts_t abs;
if (unlikely(proxy->reconnect)) { if (unlikely(proxy->reconnect)) {
LOGINFO("Shutting down proxy_send thread for proxy %ld to reconnect", LOGINFO("Shutting down proxy_send thread for proxy %d to reconnect",
proxy->id); proxy->id);
break; break;
} }
@ -1539,7 +1543,7 @@ static void *proxy_send(void *arg)
LOGWARNING("Failed to find jobid in proxy_send msg"); LOGWARNING("Failed to find jobid in proxy_send msg");
continue; continue;
} }
if (unlikely(!json_get_int64(&proxyid, msg->json_msg, "proxy"))) { if (unlikely(!json_get_int(&proxyid, msg->json_msg, "proxy"))) {
LOGWARNING("Failed to find proxy in proxy_send msg"); LOGWARNING("Failed to find proxy in proxy_send msg");
continue; continue;
} }
@ -1548,7 +1552,7 @@ static void *proxy_send(void *arg)
continue; continue;
} }
if (unlikely(proxyid != proxy->id)) { if (unlikely(proxyid != proxy->id)) {
LOGWARNING("Proxysend for proxy %ld got message for proxy %ld!", LOGWARNING("Proxysend for proxy %d got message for proxy %d!",
proxy->id, proxyid); proxy->id, proxyid);
} }
@ -1572,16 +1576,16 @@ static void *proxy_send(void *arg)
json_decref(val); json_decref(val);
} else if (!jobid) { } else if (!jobid) {
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Proxy %ld:%s failed to find matching jobid for %sknown subproxy in proxysend", LOGNOTICE("Proxy %d:%s failed to find matching jobid for %sknown subproxy in proxysend",
proxy->id, proxy->url, subproxy ? "" : "un"); proxy->id, proxy->url, subproxy ? "" : "un");
} else { } else {
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Failed to find subproxy %ld:%d to send message to", LOGNOTICE("Failed to find subproxy %d:%d to send message to",
proxy->id, subid); proxy->id, subid);
} }
if (!ret && subproxy) { if (!ret && subproxy) {
LOGNOTICE("Proxy %ld:%d %s failed to send msg in proxy_send, dropping to reconnect", LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect",
id, subid, proxy->url); proxy->id, subid, proxy->url);
disable_subproxy(gdata, proxy, subproxy); disable_subproxy(gdata, proxy, subproxy);
} }
} }
@ -1684,39 +1688,6 @@ out:
return ret; return ret;
} }
/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring
* fields we don't use in the subproxy. */
static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi)
{
proxy_instance_t *subproxy;
mutex_lock(&gdata->lock);
if (gdata->dead_proxies) {
/* Recycle an old proxy instance if one exists */
subproxy = gdata->dead_proxies;
DL_DELETE(gdata->dead_proxies, subproxy);
subproxy->disabled = false;
} else {
subproxy = ckzalloc(sizeof(proxy_instance_t));
mutex_init(&subproxy->share_lock);
}
mutex_unlock(&gdata->lock);
subproxy->cs.ckp = subproxy->ckp = proxi->ckp;
mutex_lock(&proxi->proxy_lock);
subproxy->subid = ++proxi->subproxy_count;
mutex_unlock(&proxi->proxy_lock);
subproxy->id = proxi->id;
subproxy->url = strdup(proxi->url);
subproxy->auth = strdup(proxi->auth);
subproxy->pass = strdup(proxi->pass);
subproxy->parent = proxi;
subproxy->epfd = proxi->epfd;
return subproxy;
}
static void *proxy_recruit(void *arg) static void *proxy_recruit(void *arg)
{ {
proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg; proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg;
@ -1728,7 +1699,7 @@ static void *proxy_recruit(void *arg)
retry: retry:
recruit = false; recruit = false;
proxy = create_subproxy(gdata, parent); proxy = create_subproxy(gdata, parent, parent->url);
alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd); alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd);
if (!alive) { if (!alive) {
LOGNOTICE("Subproxy failed proxy_alive testing"); LOGNOTICE("Subproxy failed proxy_alive testing");
@ -1818,8 +1789,7 @@ static void *passthrough_recv(void *arg)
if (proxy_alive(ckp, proxi, cs, false, epfd)) { if (proxy_alive(ckp, proxi, cs, false, epfd)) {
reconnect_generator(ckp); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url);
proxi->low_id, proxi->url);
} }
alive = proxi->alive; alive = proxi->alive;
@ -1841,7 +1811,7 @@ static void *passthrough_recv(void *arg)
if (likely(ret > 0)) if (likely(ret > 0))
ret = read_socket_line(cs, 60); ret = read_socket_line(cs, 60);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Proxy %ld:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->url); proxi->id, proxi->url);
alive = proxi->alive = false; alive = proxi->alive = false;
reconnect_generator(ckp); reconnect_generator(ckp);
@ -1905,8 +1875,7 @@ static void *proxy_recv(void *arg)
} }
if (proxy_alive(ckp, proxi, cs, false, epfd)) { if (proxy_alive(ckp, proxi, cs, false, epfd)) {
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url);
proxi->low_id, proxi->url);
} }
alive = proxi->alive; alive = proxi->alive;
@ -1923,7 +1892,7 @@ static void *proxy_recv(void *arg)
reconnect_proxy(proxi); reconnect_proxy(proxi);
if (alive) { if (alive) {
LOGWARNING("Proxy %d:%s failed, attempting reconnect", LOGWARNING("Proxy %d:%s failed, attempting reconnect",
proxi->low_id, proxi->url); proxi->id, proxi->url);
alive = false; alive = false;
} }
sleep(5); sleep(5);
@ -1934,7 +1903,7 @@ static void *proxy_recv(void *arg)
* to prevent switching to unstable pools. */ * to prevent switching to unstable pools. */
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
reconnect_generator(ckp); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->url); LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->url);
proxi->reconnect_time = 0; proxi->reconnect_time = 0;
alive = true; alive = true;
} }
@ -1974,33 +1943,23 @@ static void *proxy_recv(void *arg)
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, 5);
} }
if (ret < 1) { if (ret < 1) {
LOGNOTICE("Proxy %ld:%d %s failed to epoll/read_socket_line in proxy_recv", LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv",
proxi->id, subproxy->subid, subproxy->url); proxi->id, subproxy->subid, subproxy->url);
disable_subproxy(gdata, proxi, subproxy); disable_subproxy(gdata, proxi, subproxy);
continue; continue;
} }
do { do {
if (parse_method(ckp, subproxy, cs->buf)) { /* subproxy may have been recycled here if it is not a
if (subproxy->reconnect) { * parent and reconnect was issued */
/* Call this proxy dead to allow us to fail if (parse_method(ckp, subproxy, cs->buf))
* over to a backup pool until the reconnect
* pool is up */
disable_subproxy(gdata, proxi, subproxy);
if (parent_proxy(subproxy)) {
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
subproxy->low_id, subproxy->url);
goto out;
}
}
continue; continue;
}
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
if (!parse_share(subproxy, cs->buf)) if (!parse_share(subproxy, cs->buf))
LOGWARNING("Unhandled stratum message: %s", cs->buf); LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
subproxy->id, subproxy->subid, cs->buf);
} while ((ret = read_socket_line(cs, 0)) > 0); } while ((ret = read_socket_line(cs, 0)) > 0);
} }
out:
mutex_lock(&proxi->proxy_lock);
HASH_ITER(sh, proxi->subproxies, subproxy, tmp) { HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
subproxy->disabled = true; subproxy->disabled = true;
send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid); send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid);
@ -2039,9 +1998,7 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
if (proxi->disabled) if (proxi->disabled)
continue; continue;
if (proxi->alive || subproxies_alive(proxi)) { if (proxi->alive || subproxies_alive(proxi)) {
if ((!ret) || if (!ret || proxi->id < ret->id)
(proxi->low_id < ret->low_id) ||
(proxi->low_id == ret->low_id && proxi->id > ret->id))
ret = proxi; ret = proxi;
} }
} }
@ -2066,8 +2023,8 @@ static void send_list(gdata_t *gdata, const int sockd)
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->proxies, proxy, tmp) { HASH_ITER(hh, gdata->proxies, proxy, tmp) {
JSON_CPACK(val, "{sI,si,ss,ss,sf,sb,sb,sb,si}", JSON_CPACK(val, "{si,ss,ss,sf,sb,sb,sb,si}",
"id", proxy->id, "low_id", proxy->low_id, "id", proxy->id,
"auth", proxy->auth, "pass", proxy->pass, "auth", proxy->auth, "pass", proxy->pass,
"diff", proxy->diff, "notified", proxy->notified, "diff", proxy->diff, "notified", proxy->notified,
"disabled", proxy->disabled, "alive", proxy->alive, "disabled", proxy->disabled, "alive", proxy->alive,
@ -2191,10 +2148,10 @@ reconnect:
if (proxi != cproxy) { if (proxi != cproxy) {
proxi = cproxy; proxi = cproxy;
if (!ckp->passthrough) { if (!ckp->passthrough) {
LOGWARNING("Successfully connected to proxy %d %s:%s as proxy", LOGWARNING("Successfully connected to proxy %d %s as proxy",
proxi->low_id, proxi->url, proxi->cs.port); proxi->id, proxi->url);
dealloc(buf); dealloc(buf);
ASPRINTF(&buf, "proxy=%ld", proxi->id); ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
} }
} }
@ -2296,7 +2253,7 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy_instance_t *proxy; proxy_instance_t *proxy;
proxy = ckzalloc(sizeof(proxy_instance_t)); proxy = ckzalloc(sizeof(proxy_instance_t));
proxy->id = proxy->low_id = id; proxy->id = id;
proxy->url = strdup(ckp->proxyurl[id]); proxy->url = strdup(ckp->proxyurl[id]);
proxy->auth = strdup(ckp->proxyauth[id]); proxy->auth = strdup(ckp->proxyauth[id]);
proxy->pass = strdup(ckp->proxypass[id]); proxy->pass = strdup(ckp->proxypass[id]);
@ -2304,7 +2261,7 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy->cs.ckp = ckp; proxy->cs.ckp = ckp;
mutex_init(&proxy->notify_lock); mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock); mutex_init(&proxy->share_lock);
HASH_ADD_I64(gdata->proxies, id, proxy); HASH_ADD_INT(gdata->proxies, id, proxy);
return proxy; return proxy;
} }

Loading…
Cancel
Save