Browse Source

Convert proxy id to int64

master
Con Kolivas 10 years ago
parent
commit
14873b44b7
  1. 126
      src/generator.c
  2. 98
      src/stratifier.c

126
src/generator.c

@ -84,7 +84,7 @@ struct proxy_instance {
connsock_t *cs; connsock_t *cs;
server_instance_t *si; server_instance_t *si;
bool passthrough; bool passthrough;
int id; /* Proxy server id*/ int64_t id; /* Proxy server id*/
int subid; /* Subproxy id */ int subid; /* Subproxy id */
const char *auth; const char *auth;
@ -575,7 +575,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
retry: retry:
parsed = true; parsed = true;
if (!(buf = new_proxy_line(cs))) { if (!(buf = new_proxy_line(cs))) {
LOGNOTICE("Proxy %d:%d %s failed to receive line in parse_subscribe", LOGNOTICE("Proxy %ld:%d %s failed to receive line in parse_subscribe",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
goto out; goto out;
} }
@ -608,7 +608,7 @@ retry:
buf = NULL; buf = NULL;
goto retry; goto retry;
} }
LOGNOTICE("Proxy %d:%d %s failed to parse subscribe response in parse_subscribe", LOGNOTICE("Proxy %ld:%d %s failed to parse subscribe response in parse_subscribe",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
goto out; goto out;
} }
@ -653,10 +653,10 @@ retry:
} }
if (size < 3) { if (size < 3) {
if (!proxi->subid) { if (!proxi->subid) {
LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners", LOGWARNING("Proxy %ld %s Nonce2 length %d too small for fast miners",
proxi->id, proxi->si->url, size); proxi->id, proxi->si->url, size);
} else { } else {
LOGNOTICE("Proxy %d:%d Nonce2 length %d too small for fast miners", LOGNOTICE("Proxy %ld:%d Nonce2 length %d too small for fast miners",
proxi->id, proxi->subid, size); proxi->id, proxi->subid, size);
} }
} }
@ -665,11 +665,11 @@ retry:
/* Set the number of clients per proxy on the parent proxy */ /* Set the number of clients per proxy on the parent proxy */
int64_t clients_per_proxy = 1ll << ((size - 3) * 8); int64_t clients_per_proxy = 1ll << ((size - 3) * 8);
LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url, LOGNOTICE("Proxy %ld:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url,
clients_per_proxy); clients_per_proxy);
} }
LOGINFO("Found notify for proxy %d:%d with enonce %s nonce2len %d", proxi->id, LOGINFO("Found notify for proxy %ld:%d with enonce %s nonce2len %d", proxi->id,
proxi->subid, proxi->enonce1, proxi->nonce2len); proxi->subid, proxi->enonce1, proxi->nonce2len);
ret = true; ret = true;
@ -708,7 +708,7 @@ retry:
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to send message in subscribe_stratum", LOGNOTICE("Proxy %ld:%d %s failed to send message in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
goto out; goto out;
} }
@ -717,23 +717,23 @@ retry:
goto out; goto out;
if (proxi->no_params) { if (proxi->no_params) {
LOGNOTICE("Proxy %d:%d %s failed all subscription options in subscribe_stratum", LOGNOTICE("Proxy %ld:%d %s failed all subscription options in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
goto out; goto out;
} }
if (proxi->sessionid) { if (proxi->sessionid) {
LOGINFO("Proxy %d:%d %s failed sessionid reconnect in subscribe_stratum, retrying without", LOGINFO("Proxy %ld:%d %s failed sessionid reconnect in subscribe_stratum, retrying without",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
proxi->no_sessionid = true; proxi->no_sessionid = true;
dealloc(proxi->sessionid); dealloc(proxi->sessionid);
} else { } else {
LOGINFO("Proxy %d:%d %s failed connecting with parameters in subscribe_stratum, retrying without", LOGINFO("Proxy %ld:%d %s failed connecting with parameters in subscribe_stratum, retrying without",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
proxi->no_params = true; proxi->no_params = true;
} }
ret = connect_proxy(cs); ret = connect_proxy(cs);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to reconnect in subscribe_stratum", LOGNOTICE("Proxy %ld:%d %s failed to reconnect in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
goto out; goto out;
} }
@ -822,7 +822,7 @@ static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val)
goto out; goto out;
} }
LOGDEBUG("Received new notify from proxy %d:%d", proxi->id, proxi->subid); LOGDEBUG("Received new notify from proxy %ld:%d", proxi->id, proxi->subid);
ni = ckzalloc(sizeof(notify_instance_t)); ni = ckzalloc(sizeof(notify_instance_t));
ni->jobid = job_id; ni->jobid = job_id;
jobidbuf = json_string_value(job_id); jobidbuf = json_string_value(job_id);
@ -934,18 +934,18 @@ static proxy_instance_t *__subproxy_by_id(proxy_instance_t *proxy, const int sub
/* Add to the dead list to be recycled if possible */ /* Add to the dead list to be recycled if possible */
static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy) static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy)
{ {
LOGINFO("Recycling data from proxy %d:%d", proxy->id, proxy->subid); LOGINFO("Recycling data from proxy %ld:%d", proxy->id, proxy->subid);
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
DL_APPEND(gdata->dead_proxies, proxy); DL_APPEND(gdata->dead_proxies, proxy);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
} }
static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid) static void send_stratifier_deadproxy(ckpool_t *ckp, const int64_t id, const int subid)
{ {
char buf[256]; char buf[256];
sprintf(buf, "deadproxy=%d:%d", id, subid); sprintf(buf, "deadproxy=%ld:%d", id, subid);
async_send_proc(ckp, ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
} }
@ -1065,7 +1065,7 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
newproxi->cs->ckp = ckp; newproxi->cs->ckp = ckp;
newproxi->id = newsi->id; newproxi->id = newsi->id;
newproxi->subproxy_count = ++proxi->subproxy_count; newproxi->subproxy_count = ++proxi->subproxy_count;
HASH_REPLACE_INT(gdata->proxies, id, newproxi, proxi); HASH_REPLACE_I64(gdata->proxies, id, newproxi, proxi);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
/* Old proxy memory is basically lost here */ /* Old proxy memory is basically lost here */
@ -1085,7 +1085,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
if (!proxi->diff) if (!proxi->diff)
return; return;
JSON_CPACK(json_msg, "{sisisf}", JSON_CPACK(json_msg, "{sIsisf}",
"proxy", proxy->id, "proxy", proxy->id,
"subproxy", proxi->subid, "subproxy", proxi->subid,
"diff", proxi->diff); "diff", proxi->diff);
@ -1109,7 +1109,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_
for (i = 0; i < ni->merkles; i++) for (i = 0; i < ni->merkles; i++)
json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0]));
/* Use our own jobid instead of the server's one for easy lookup */ /* Use our own jobid instead of the server's one for easy lookup */
JSON_CPACK(json_msg, "{sisisisssisssssosssssssb}", JSON_CPACK(json_msg, "{sIsisisssisssssosssssssb}",
"proxy", proxy->id, "subproxy", proxi->subid, "proxy", proxy->id, "subproxy", proxi->subid,
"jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len,
"coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2,
@ -1174,7 +1174,7 @@ static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg
goto out; goto out;
} }
LOGDEBUG("Proxy %d:%d received method %s", proxi->id, proxi->subid, buf); LOGDEBUG("Proxy %ld:%d received method %s", proxi->id, proxi->subid, buf);
if (cmdmatch(buf, "mining.notify")) { if (cmdmatch(buf, "mining.notify")) {
ret = parse_notify(ckp, proxi, params); ret = parse_notify(ckp, proxi, params);
goto out; goto out;
@ -1225,7 +1225,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to send message in auth_stratum", LOGNOTICE("Proxy %ld:%d %s failed to send message in auth_stratum",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
Close(cs->fd); Close(cs->fd);
goto out; goto out;
@ -1237,7 +1237,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
free(buf); free(buf);
buf = next_proxy_line(cs, proxi); buf = next_proxy_line(cs, proxi);
if (!buf) { if (!buf) {
LOGNOTICE("Proxy %d:%d %s failed to receive line in auth_stratum", LOGNOTICE("Proxy %ld:%d %s failed to receive line in auth_stratum",
proxi->id, proxi->subid, proxi->si->url); proxi->id, proxi->subid, proxi->si->url);
ret = false; ret = false;
goto out; goto out;
@ -1247,20 +1247,20 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
val = json_msg_result(buf, &res_val, &err_val); val = json_msg_result(buf, &res_val, &err_val);
if (!val) { if (!val) {
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", LOGWARNING("Proxy %ld:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf); proxi->id, proxi->subid, proxi->si->url, buf);
goto out; goto out;
} }
if (err_val && !json_is_null(err_val)) { if (err_val && !json_is_null(err_val)) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s", LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum due to err_val, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf); proxi->id, proxi->subid, proxi->si->url, buf);
goto out; goto out;
} }
if (res_val) { if (res_val) {
ret = json_is_true(res_val); ret = json_is_true(res_val);
if (!ret) { if (!ret) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s", LOGWARNING("Proxy %ld:%d %s failed to authorise in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->si->url, buf); proxi->id, proxi->subid, proxi->si->url, buf);
goto out; goto out;
} }
@ -1268,7 +1268,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
/* No result and no error but successful val means auth success */ /* No result and no error but successful val means auth success */
ret = true; ret = true;
} }
LOGINFO("Proxy %d:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->si->url); LOGINFO("Proxy %ld:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->si->url);
out: out:
if (val) if (val)
json_decref(val); json_decref(val);
@ -1286,12 +1286,12 @@ out:
return ret; return ret;
} }
static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int64_t id)
{ {
proxy_instance_t *proxi; proxy_instance_t *proxi;
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_FIND_INT(gdata->proxies, &id, proxi); HASH_FIND_I64(gdata->proxies, &id, proxi);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
return proxi; return proxi;
@ -1302,8 +1302,8 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_t *json_msg; json_t *json_msg;
char *msg, *buf; char *msg, *buf;
JSON_CPACK(json_msg, "{sisisssi}", JSON_CPACK(json_msg, "{sIsisssi}",
"proxy", proxi->parent->id, "proxy", proxi->id,
"subproxy", proxi->subid, "subproxy", proxi->subid,
"enonce1", proxi->enonce1, "enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len); "nonce2len", proxi->nonce2len);
@ -1331,24 +1331,25 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int subid
static void drop_proxy(gdata_t *gdata, const char *buf) static void drop_proxy(gdata_t *gdata, const char *buf)
{ {
proxy_instance_t *proxy, *subproxy; proxy_instance_t *proxy, *subproxy;
int id = 0, subid = 0; int64_t id = 0;
int subid = 0;
sscanf(buf, "dropproxy=%d:%d", &id, &subid); sscanf(buf, "dropproxy=%ld:%d", &id, &subid);
if (unlikely(!subid)) { if (unlikely(!subid)) {
LOGWARNING("Generator asked to drop parent proxy %d", id); LOGWARNING("Generator asked to drop parent proxy %ld", id);
return; return;
} }
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGINFO("Generator asked to drop subproxy from non-existent parent %d", id); LOGINFO("Generator asked to drop subproxy from non-existent parent %ld", id);
return; return;
} }
subproxy = subproxy_by_id(proxy, subid); subproxy = subproxy_by_id(proxy, subid);
if (!subproxy) { if (!subproxy) {
LOGINFO("Generator asked to drop non-existent subproxy %d:%d", id, subid); LOGINFO("Generator asked to drop non-existent subproxy %ld:%d", id, subid);
return; return;
} }
LOGNOTICE("Generator asked to drop proxy %d:%d", id, subid); LOGNOTICE("Generator asked to drop proxy %ld:%d", id, subid);
disable_subproxy(gdata, proxy, subproxy); disable_subproxy(gdata, proxy, subproxy);
} }
@ -1364,33 +1365,33 @@ static void submit_share(gdata_t *gdata, json_t *val)
{ {
proxy_instance_t *proxy, *proxi; proxy_instance_t *proxy, *proxi;
ckpool_t *ckp = gdata->ckp; ckpool_t *ckp = gdata->ckp;
int64_t client_id, id;
stratum_msg_t *msg; stratum_msg_t *msg;
share_msg_t *share; share_msg_t *share;
int64_t client_id; int subid;
int id, subid;
/* Get the client id so we can tell the stratifier to drop it if the /* Get the client id so we can tell the stratifier to drop it if the
* proxy it's bound to is not functional */ * proxy it's bound to is not functional */
json_get_int64(&client_id, val, "client_id"); json_get_int64(&client_id, val, "client_id");
json_get_int(&id, val, "proxy"); json_get_int64(&id, val, "proxy");
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGNOTICE("Client %"PRId64" sending shares to non existent proxy %d, dropping", LOGNOTICE("Client %"PRId64" sending shares to non existent proxy %ld, dropping",
client_id, id); client_id, id);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
proxi = subproxy_by_id(proxy, subid); proxi = subproxy_by_id(proxy, subid);
if (unlikely(!proxi)) { if (unlikely(!proxi)) {
LOGNOTICE("Client %"PRId64" sending shares to non existent subproxy %d:%d, dropping", LOGNOTICE("Client %"PRId64" sending shares to non existent subproxy %ld:%d, dropping",
client_id, id, subid); client_id, id, subid);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
if (!proxi->alive) { if (!proxi->alive) {
LOGNOTICE("Client %"PRId64" sending shares to dead subproxy %d:%d, dropping", LOGNOTICE("Client %"PRId64" sending shares to dead subproxy %ld:%d, dropping",
client_id, id, subid); client_id, id, subid);
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
return json_decref(val); return json_decref(val);
@ -1456,11 +1457,11 @@ static bool parse_share(proxy_instance_t *proxi, const char *buf)
* so long as we recognised it as a share response */ * so long as we recognised it as a share response */
ret = true; ret = true;
if (!share) { if (!share) {
LOGINFO("Proxy %d:%d failed to find matching share to result: %s", LOGINFO("Proxy %ld:%d failed to find matching share to result: %s",
proxi->id, proxi->subid, buf); proxi->id, proxi->subid, buf);
goto out; goto out;
} }
LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid, LOGINFO("Proxy %ld:%d share result %s from client %d", proxi->id, proxi->subid,
buf, share->client_id); buf, share->client_id);
free(share); free(share);
out: out:
@ -1480,20 +1481,19 @@ static void *proxy_send(void *arg)
rename_proc("proxysend"); rename_proc("proxysend");
while (42) { while (42) {
int64_t proxyid = 0, client_id = 0, id;
proxy_instance_t *subproxy; proxy_instance_t *subproxy;
int proxyid = 0, subid = 0;
notify_instance_t *ni; notify_instance_t *ni;
int64_t client_id = 0;
json_t *jobid = NULL; json_t *jobid = NULL;
stratum_msg_t *msg; stratum_msg_t *msg;
bool ret = true; bool ret = true;
int subid = 0;
json_t *val; json_t *val;
int64_t id;
tv_t now; tv_t now;
ts_t abs; ts_t abs;
if (unlikely(proxy->reconnect)) { if (unlikely(proxy->reconnect)) {
LOGINFO("Shutting down proxy_send thread for proxy %d to reconnect", LOGINFO("Shutting down proxy_send thread for proxy %ld to reconnect",
proxy->id); proxy->id);
break; break;
} }
@ -1515,10 +1515,10 @@ static void *proxy_send(void *arg)
json_get_int(&subid, msg->json_msg, "subproxy"); json_get_int(&subid, msg->json_msg, "subproxy");
json_get_int64(&id, msg->json_msg, "jobid"); json_get_int64(&id, msg->json_msg, "jobid");
json_get_int(&proxyid, msg->json_msg, "proxy"); json_get_int64(&proxyid, msg->json_msg, "proxy");
json_get_int64(&client_id, msg->json_msg, "client_id"); json_get_int64(&client_id, msg->json_msg, "client_id");
if (unlikely(proxyid != proxy->id)) { if (unlikely(proxyid != proxy->id)) {
LOGWARNING("Proxysend for proxy %d got message for proxy %d!", LOGWARNING("Proxysend for proxy %ld got message for proxy %ld!",
proxy->id, proxyid); proxy->id, proxyid);
} }
@ -1542,17 +1542,17 @@ static void *proxy_send(void *arg)
json_decref(val); json_decref(val);
} else if (!jobid) { } else if (!jobid) {
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Proxy %d:%s failed to find matching jobid for %sknown subproxy in proxysend", LOGNOTICE("Proxy %ld:%s failed to find matching jobid for %sknown subproxy in proxysend",
proxy->id, proxy->si->url, subproxy ? "" : "un"); proxy->id, proxy->si->url, subproxy ? "" : "un");
} else { } else {
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Failed to find subproxy %d:%d to send message to", LOGNOTICE("Failed to find subproxy %ld:%d to send message to",
proxy->id, subid); proxy->id, subid);
} }
json_decref(msg->json_msg); json_decref(msg->json_msg);
free(msg); free(msg);
if (!ret && subproxy) { if (!ret && subproxy) {
LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect", LOGNOTICE("Proxy %ld:%d %s failed to send msg in proxy_send, dropping to reconnect",
proxy->id, proxy->subid, proxy->si->url); proxy->id, proxy->subid, proxy->si->url);
disable_subproxy(gdata, proxy, subproxy); disable_subproxy(gdata, proxy, subproxy);
} }
@ -1764,7 +1764,7 @@ static void *passthrough_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
reconnect_generator(ckp); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %ld:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
alive = proxi->alive; alive = proxi->alive;
@ -1787,7 +1787,7 @@ static void *passthrough_recv(void *arg)
if (likely(ret > 0)) if (likely(ret > 0))
ret = read_socket_line(cs, 60); ret = read_socket_line(cs, 60);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %ld:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
alive = proxi->alive = false; alive = proxi->alive = false;
reconnect_generator(ckp); reconnect_generator(ckp);
@ -1851,7 +1851,7 @@ static void *proxy_recv(void *arg)
} }
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) { if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %ld:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
alive = proxi->alive; alive = proxi->alive;
@ -1869,7 +1869,7 @@ static void *proxy_recv(void *arg)
while (!subproxies_alive(proxi)) { while (!subproxies_alive(proxi)) {
reconnect_proxy(proxi); reconnect_proxy(proxi);
if (alive) { if (alive) {
LOGWARNING("Proxy %d:%s failed, attempting reconnect", LOGWARNING("Proxy %ld:%s failed, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
alive = false; alive = false;
reconnect_generator(ckp); reconnect_generator(ckp);
@ -1881,7 +1881,7 @@ static void *proxy_recv(void *arg)
/* Wait 30 seconds before declaring this upstream pool alive /* Wait 30 seconds before declaring this upstream pool alive
* to prevent switching to unstable pools. */ * to prevent switching to unstable pools. */
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); LOGWARNING("Proxy %ld:%s recovered", proxi->id, proxi->si->url);
proxi->reconnect_time = 0; proxi->reconnect_time = 0;
reconnect_generator(ckp); reconnect_generator(ckp);
alive = true; alive = true;
@ -1919,7 +1919,7 @@ static void *proxy_recv(void *arg)
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, 5);
} }
if (ret < 1) { if (ret < 1) {
LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", LOGNOTICE("Proxy %ld:%d %s failed to epoll/read_socket_line in proxy_recv",
proxi->id, subproxy->subid, subproxy->si->url); proxi->id, subproxy->subid, subproxy->si->url);
disable_subproxy(gdata, proxi, subproxy); disable_subproxy(gdata, proxi, subproxy);
continue; continue;
@ -1931,7 +1931,7 @@ static void *proxy_recv(void *arg)
* pool is up */ * pool is up */
disable_subproxy(gdata, proxi, subproxy); disable_subproxy(gdata, proxi, subproxy);
if (parent_proxy(subproxy)) { if (parent_proxy(subproxy)) {
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", LOGWARNING("Proxy %ld:%s reconnect issue, dropping existing connection",
subproxy->id, subproxy->si->url); subproxy->id, subproxy->si->url);
break; break;
} else } else
@ -1971,7 +1971,7 @@ static void setup_proxies(ckpool_t *ckp, gdata_t *gdata)
si = ckp->servers[i]; si = ckp->servers[i];
proxi = si->data; proxi = si->data;
proxi->id = i; proxi->id = i;
HASH_ADD_INT(gdata->proxies, id, proxi); HASH_ADD_I64(gdata->proxies, id, proxi);
if (ckp->passthrough) { if (ckp->passthrough) {
create_pthread(&proxi->pth_precv, passthrough_recv, proxi); create_pthread(&proxi->pth_precv, passthrough_recv, proxi);
proxi->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); proxi->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
@ -2029,10 +2029,10 @@ reconnect:
proxi = cproxy; proxi = cproxy;
if (!ckp->passthrough) { if (!ckp->passthrough) {
connsock_t *cs = proxi->cs; connsock_t *cs = proxi->cs;
LOGWARNING("Successfully connected to proxy %d %s:%s as proxy", LOGWARNING("Successfully connected to proxy %ld %s:%s as proxy",
proxi->id, cs->url, cs->port); proxi->id, cs->url, cs->port);
dealloc(buf); dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id); ASPRINTF(&buf, "proxy=%ld", proxi->id);
async_send_proc(ckp, ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
} }
} }

98
src/stratifier.c

@ -279,7 +279,7 @@ struct stratum_instance {
sdata_t *sdata; /* Which sdata this client is bound to */ sdata_t *sdata; /* Which sdata this client is bound to */
proxy_t *proxy; /* Proxy this is bound to in proxy mode */ proxy_t *proxy; /* Proxy this is bound to in proxy mode */
int proxyid; /* Which proxy id */ int64_t proxyid; /* Which proxy id */
int subproxyid; /* Which subproxy */ int subproxyid; /* Which subproxy */
int64_t notify_id; /* Which notify_id from the subproxy did we join */ int64_t notify_id; /* Which notify_id from the subproxy did we join */
}; };
@ -305,7 +305,7 @@ struct proxy_base {
UT_hash_handle sh; /* For subproxy hashlist */ UT_hash_handle sh; /* For subproxy hashlist */
proxy_t *next; /* For retired subproxies */ proxy_t *next; /* For retired subproxies */
proxy_t *prev; proxy_t *prev;
int id; int64_t id;
int subid; int subid;
double diff; double diff;
@ -1022,7 +1022,7 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata)
return dsdata; return dsdata;
} }
static proxy_t *__generate_proxy(sdata_t *sdata, const int id) static proxy_t *__generate_proxy(sdata_t *sdata, const int64_t id)
{ {
proxy_t *proxy = ckzalloc(sizeof(proxy_t)); proxy_t *proxy = ckzalloc(sizeof(proxy_t));
@ -1033,7 +1033,7 @@ static proxy_t *__generate_proxy(sdata_t *sdata, const int id)
proxy->parent = proxy; proxy->parent = proxy;
/* subid == 0 on parent proxy */ /* subid == 0 on parent proxy */
HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy); HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy);
HASH_ADD_INT(sdata->proxies, id, proxy); HASH_ADD_I64(sdata->proxies, id, proxy);
sdata->proxy_count++; sdata->proxy_count++;
return proxy; return proxy;
} }
@ -1051,22 +1051,22 @@ static proxy_t *__generate_subproxy(sdata_t *sdata, proxy_t *proxy, const int su
return subproxy; return subproxy;
} }
static proxy_t *__existing_proxy(const sdata_t *sdata, const int id) static proxy_t *__existing_proxy(const sdata_t *sdata, const int64_t id)
{ {
proxy_t *proxy; proxy_t *proxy;
HASH_FIND_INT(sdata->proxies, &id, proxy); HASH_FIND_I64(sdata->proxies, &id, proxy);
return proxy; return proxy;
} }
/* Find proxy by id number, generate one if none exist yet by that id */ /* Find proxy by id number, generate one if none exist yet by that id */
static proxy_t *__proxy_by_id(sdata_t *sdata, const int id) static proxy_t *__proxy_by_id(sdata_t *sdata, const int64_t id)
{ {
proxy_t *proxy = __existing_proxy(sdata, id); proxy_t *proxy = __existing_proxy(sdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
proxy = __generate_proxy(sdata, id); proxy = __generate_proxy(sdata, id);
LOGNOTICE("Stratifier added new proxy %d", id); LOGNOTICE("Stratifier added new proxy %ld", id);
} }
return proxy; return proxy;
@ -1086,12 +1086,12 @@ static proxy_t *__subproxy_by_id(sdata_t *sdata, proxy_t *proxy, const int subid
if (!subproxy) { if (!subproxy) {
subproxy = __generate_subproxy(sdata, proxy, subid); subproxy = __generate_subproxy(sdata, proxy, subid);
LOGINFO("Stratifier added new subproxy %d:%d", proxy->id, subid); LOGINFO("Stratifier added new subproxy %ld:%d", proxy->id, subid);
} }
return subproxy; return subproxy;
} }
static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid) static proxy_t *subproxy_by_id(sdata_t *sdata, const int64_t id, const int subid)
{ {
proxy_t *proxy, *subproxy; proxy_t *proxy, *subproxy;
@ -1103,7 +1103,7 @@ static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid)
return subproxy; return subproxy;
} }
static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid) static proxy_t *existing_subproxy(sdata_t *sdata, const int64_t id, const int subid)
{ {
proxy_t *proxy, *subproxy = NULL; proxy_t *proxy, *subproxy = NULL;
@ -1173,7 +1173,7 @@ static void reconnect_clients(sdata_t *sdata)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged for reconnect to proxy %d", reconnects, LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects,
proxy->id); proxy->id);
if (headroom < 42) if (headroom < 42)
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp);
@ -1191,7 +1191,7 @@ static proxy_t *current_proxy(sdata_t *sdata)
return proxy; return proxy;
} }
static void dead_parent_proxy(sdata_t *sdata, const int id) static void dead_parent_proxy(sdata_t *sdata, const int64_t id)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int reconnects = 0; int reconnects = 0;
@ -1216,20 +1216,20 @@ static void dead_parent_proxy(sdata_t *sdata, const int id)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %d", LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld",
reconnects, id); reconnects, id);
if (headroom < 42) if (headroom < 42)
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp);
} }
} }
static void new_proxy(sdata_t *sdata, const int id) static void new_proxy(sdata_t *sdata, const int64_t id)
{ {
proxy_t *proxy, *subproxy, *tmp, *proxy_list = NULL; proxy_t *proxy, *subproxy, *tmp, *proxy_list = NULL;
bool exists = false, current = false; bool exists = false, current = false;
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
HASH_FIND_INT(sdata->proxies, &id, proxy); HASH_FIND_I64(sdata->proxies, &id, proxy);
if (proxy) { if (proxy) {
exists = true; exists = true;
HASH_DEL(sdata->proxies, proxy); HASH_DEL(sdata->proxies, proxy);
@ -1255,7 +1255,7 @@ static void new_proxy(sdata_t *sdata, const int id)
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
if (exists) { if (exists) {
LOGNOTICE("Stratifier replaced old proxy instance %d", id); LOGNOTICE("Stratifier replaced old proxy %ld", id);
dead_parent_proxy(sdata, id); dead_parent_proxy(sdata, id);
} }
} }
@ -1263,9 +1263,10 @@ static void new_proxy(sdata_t *sdata, const int id)
static void update_subscribe(ckpool_t *ckp, const char *cmd) static void update_subscribe(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
int id = 0, subid = 0;
const char *buf; const char *buf;
proxy_t *proxy; proxy_t *proxy;
int64_t id = 0;
int subid = 0;
json_t *val; json_t *val;
if (unlikely(strlen(cmd) < 11)) { if (unlikely(strlen(cmd) < 11)) {
@ -1279,7 +1280,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode subscribe response in update_subscribe %s", buf); LOGWARNING("Failed to json decode subscribe response in update_subscribe %s", buf);
return; return;
} }
if (unlikely(!json_get_int(&id, val, "proxy"))) { if (unlikely(!json_get_int64(&id, val, "proxy"))) {
LOGWARNING("Failed to json decode proxy value in update_subscribe %s", buf); LOGWARNING("Failed to json decode proxy value in update_subscribe %s", buf);
return; return;
} }
@ -1290,9 +1291,9 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
if (!subid) { if (!subid) {
new_proxy(sdata, id); new_proxy(sdata, id);
LOGNOTICE("Got updated subscribe for proxy %d", id); LOGNOTICE("Got updated subscribe for proxy %ld", id);
} else } else
LOGINFO("Got updated subscribe for proxy %d:%d", id, subid); LOGINFO("Got updated subscribe for proxy %ld:%d", id, subid);
proxy = subproxy_by_id(sdata, id, subid); proxy = subproxy_by_id(sdata, id, subid);
dsdata = proxy->sdata; dsdata = proxy->sdata;
@ -1323,10 +1324,10 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
ck_wunlock(&dsdata->workbase_lock); ck_wunlock(&dsdata->workbase_lock);
if (subid) { if (subid) {
LOGINFO("Upstream pool %d:%d extranonce2 length %d, max proxy clients %"PRId64, LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64,
id, subid, proxy->nonce2len, proxy->max_clients); id, subid, proxy->nonce2len, proxy->max_clients);
} else { } else {
LOGNOTICE("Upstream pool %d extranonce2 length %d, max proxy clients %"PRId64, LOGNOTICE("Upstream pool %ld extranonce2 length %d, max proxy clients %"PRId64,
id, proxy->nonce2len, proxy->max_clients); id, proxy->nonce2len, proxy->max_clients);
} }
@ -1342,9 +1343,10 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
bool new_block = false, clean; bool new_block = false, clean;
int i, id = 0, subid = 0; int i, subid = 0;
char header[228]; char header[228];
const char *buf; const char *buf;
int64_t id = 0;
proxy_t *proxy; proxy_t *proxy;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
@ -1361,17 +1363,17 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode in update_notify"); LOGWARNING("Failed to json decode in update_notify");
return; return;
} }
json_get_int(&id, val, "proxy"); json_get_int64(&id, val, "proxy");
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (unlikely(!proxy || !proxy->subscribed)) { if (unlikely(!proxy || !proxy->subscribed)) {
LOGINFO("No valid proxy %d:%d subscription to update notify yet", id, subid); LOGINFO("No valid proxy %ld:%d subscription to update notify yet", id, subid);
goto out; goto out;
} }
if (!subid) if (!subid)
LOGNOTICE("Got updated notify for proxy %d", id); LOGNOTICE("Got updated notify for proxy %ld", id);
else else
LOGINFO("Got updated notify for proxy %d:%d", id, subid); LOGINFO("Got updated notify for proxy %ld:%d", id, subid);
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
wb->ckp = ckp; wb->ckp = ckp;
@ -1425,15 +1427,15 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
add_base(ckp, dsdata, wb, &new_block); add_base(ckp, dsdata, wb, &new_block);
if (new_block) { if (new_block) {
if (subid) if (subid)
LOGINFO("Block hash on proxy %d:%d changed to %s", id, subid, dsdata->lastswaphash); LOGINFO("Block hash on proxy %ld:%d changed to %s", id, subid, dsdata->lastswaphash);
else else
LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash); LOGNOTICE("Block hash on proxy %ld changed to %s", id, dsdata->lastswaphash);
} }
if (parent_proxy(proxy) && proxy == current_proxy(sdata)) if (parent_proxy(proxy) && proxy == current_proxy(sdata))
reconnect_clients(sdata); reconnect_clients(sdata);
clean |= new_block; clean |= new_block;
LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, LOGINFO("Proxy %ld:%d broadcast updated stratum notify with%s clean", id,
subid, clean ? "" : "out"); subid, clean ? "" : "out");
stratum_broadcast_update(dsdata, wb, clean); stratum_broadcast_update(dsdata, wb, clean);
out: out:
@ -1447,9 +1449,10 @@ static void update_diff(ckpool_t *ckp, const char *cmd)
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
double old_diff, diff; double old_diff, diff;
int id = 0, subid = 0;
const char *buf; const char *buf;
int64_t id = 0;
proxy_t *proxy; proxy_t *proxy;
int subid = 0;
json_t *val; json_t *val;
if (unlikely(strlen(cmd) < 6)) { if (unlikely(strlen(cmd) < 6)) {
@ -1464,15 +1467,15 @@ static void update_diff(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode in update_diff"); LOGWARNING("Failed to json decode in update_diff");
return; return;
} }
json_get_int(&id, val, "proxy"); json_get_int64(&id, val, "proxy");
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
json_dblcpy(&diff, val, "diff"); json_dblcpy(&diff, val, "diff");
json_decref(val); json_decref(val);
LOGINFO("Got updated diff for proxy %d:%d", id, subid); LOGINFO("Got updated diff for proxy %ld:%d", id, subid);
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (!proxy) { if (!proxy) {
LOGINFO("No existing subproxy %d:%d to update diff", id, subid); LOGINFO("No existing subproxy %ld:%d to update diff", id, subid);
return; return;
} }
@ -1512,11 +1515,11 @@ static void update_diff(ckpool_t *ckp, const char *cmd)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
} }
static void generator_drop_proxy(ckpool_t *ckp, const int id, const int subid) static void generator_drop_proxy(ckpool_t *ckp, const int64_t id, const int subid)
{ {
char msg[256]; char msg[256];
sprintf(msg, "dropproxy=%d:%d", id, subid); sprintf(msg, "dropproxy=%ld:%d", id, subid);
send_generator(ckp, msg, GEN_LAX); send_generator(ckp, msg, GEN_LAX);
} }
@ -2226,17 +2229,17 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
* this proxy we are only given this message if all clients must move. */ * this proxy we are only given this message if all clients must move. */
static void set_proxy(sdata_t *sdata, const char *buf) static void set_proxy(sdata_t *sdata, const char *buf)
{ {
int64_t id = 0;
proxy_t *proxy; proxy_t *proxy;
int id = 0;
sscanf(buf, "proxy=%d", &id); sscanf(buf, "proxy=%ld", &id);
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
proxy = __proxy_by_id(sdata, id); proxy = __proxy_by_id(sdata, id);
sdata->proxy = proxy; sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
LOGNOTICE("Stratifier setting active proxy to %d", id); LOGNOTICE("Stratifier setting active proxy to %ld", id);
if (proxy->notify_id != -1) if (proxy->notify_id != -1)
reconnect_clients(sdata); reconnect_clients(sdata);
} }
@ -2244,16 +2247,16 @@ static void set_proxy(sdata_t *sdata, const char *buf)
static void dead_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int id = 0, subid = 0; int64_t headroom, id = 0;
int reconnects = 0; int reconnects = 0;
int64_t headroom;
proxy_t *proxy; proxy_t *proxy;
int subid = 0;
sscanf(buf, "deadproxy=%d:%d", &id, &subid); sscanf(buf, "deadproxy=%ld:%d", &id, &subid);
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (proxy) if (proxy)
proxy->dead = true; proxy->dead = true;
LOGNOTICE("Stratifier dropping clients from proxy %d:%d", id, subid); LOGNOTICE("Stratifier dropping clients from proxy %ld:%d", id, subid);
headroom = current_headroom(sdata, &proxy); headroom = current_headroom(sdata, &proxy);
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
@ -2270,7 +2273,7 @@ static void dead_proxy(sdata_t *sdata, const char *buf)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %d:%d", reconnects, LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects,
id, subid); id, subid);
if (headroom < 42) if (headroom < 42)
generator_recruit(sdata->ckp); generator_recruit(sdata->ckp);
@ -2578,7 +2581,8 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien
static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata) static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata)
{ {
proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub; proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub;
int best_id, best_subid = 0; int best_subid = 0;
int64_t best_id;
if (!ckp->proxy || ckp->passthrough) if (!ckp->proxy || ckp->passthrough)
return ckp_sdata; return ckp_sdata;
@ -3521,7 +3525,7 @@ static void submit_share(stratum_instance_t *client, const int64_t jobid, const
char *msg; char *msg;
sprintf(enonce2, "%s%s", client->enonce1var, nonce2); sprintf(enonce2, "%s%s", client->enonce1var, nonce2);
JSON_CPACK(json_msg, "{sIsssssssIsisi}", "jobid", jobid, "nonce2", enonce2, JSON_CPACK(json_msg, "{sIsssssssIsIsi}", "jobid", jobid, "nonce2", enonce2,
"ntime", ntime, "nonce", nonce, "client_id", client->id, "ntime", ntime, "nonce", nonce, "client_id", client->id,
"proxy", client->proxyid, "subproxy", client->subproxyid); "proxy", client->proxyid, "subproxy", client->subproxyid);
msg = json_dumps(json_msg, 0); msg = json_dumps(json_msg, 0);

Loading…
Cancel
Save