Browse Source

Add proxies to the stratifier as its notified of their existence by the generator and issue reconnects without rejecting connections when a new subscribe is discovered

master
Con Kolivas 10 years ago
parent
commit
52cd066563
  1. 15
      src/generator.c
  2. 142
      src/stratifier.c

15
src/generator.c

@ -1113,7 +1113,9 @@ static void send_subscribe(proxy_instance_t *proxi, int *sockd)
json_t *json_msg; json_t *json_msg;
char *msg; char *msg;
JSON_CPACK(json_msg, "{sssi}", "enonce1", proxi->enonce1, JSON_CPACK(json_msg, "{sisssi}",
"proxy", proxi->id,
"enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len); "nonce2len", proxi->nonce2len);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
@ -1140,7 +1142,7 @@ static void send_notify(proxy_instance_t *proxi, int *sockd)
for (i = 0; i < ni->merkles; i++) for (i = 0; i < ni->merkles; i++)
json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0]));
/* Use our own jobid instead of the server's one for easy lookup */ /* Use our own jobid instead of the server's one for easy lookup */
JSON_CPACK(json_msg, "{si,ss,si,ss,ss,so,ss,ss,ss,sb}", JSON_CPACK(json_msg, "{si,si,ss,si,ss,ss,so,ss,ss,ss,sb}", "proxy", proxi->id,
"jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len,
"coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2,
"merklehash", merkle_arr, "bbversion", ni->bbversion, "merklehash", merkle_arr, "bbversion", ni->bbversion,
@ -1546,14 +1548,11 @@ out:
return alive; return alive;
} }
static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) static void kill_proxy(proxy_instance_t *proxi)
{ {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; connsock_t *cs;
send_proc(ckp->stratifier, "reconnect");
send_proc(ckp->connector, "reject");
if (!proxi) // This shouldn't happen if (!proxi) // This shouldn't happen
return; return;
@ -1586,7 +1585,7 @@ static int proxy_loop(proc_instance_t *pi)
reconnect: reconnect:
if (proxi) { if (proxi) {
kill_proxy(ckp, proxi); kill_proxy(proxi);
reconnecting = true; reconnecting = true;
} }
proxi = live_proxy(ckp); proxi = live_proxy(ckp);
@ -1671,7 +1670,7 @@ retry:
Close(sockd); Close(sockd);
goto retry; goto retry;
out: out:
kill_proxy(ckp, proxi); kill_proxy(proxi);
Close(sockd); Close(sockd);
return ret; return ret;
} }

142
src/stratifier.c

@ -937,12 +937,20 @@ static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instan
DL_DELETE(user->clients, client); DL_DELETE(user->clients, client);
} }
static void connector_drop_client(ckpool_t *ckp, const int64_t id)
{
char buf[256];
LOGWARNING("Stratifier requesting connector drop client %"PRId64, id);
snprintf(buf, 255, "dropclient=%"PRId64, id);
send_proc(ckp->connector, buf);
}
static void drop_allclients(ckpool_t *ckp) static void drop_allclients(ckpool_t *ckp)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int disconnects = 0, kills = 0; int disconnects = 0, kills = 0;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
char buf[128];
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
@ -954,8 +962,7 @@ static void drop_allclients(ckpool_t *ckp)
} else } else
client->dropped = true; client->dropped = true;
kills++; kills++;
sprintf(buf, "dropclient=%"PRId64, client_id); connector_drop_client(ckp, client_id);
send_proc(ckp->connector, buf);
} }
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
disconnects++; disconnects++;
@ -970,62 +977,119 @@ static void drop_allclients(ckpool_t *ckp)
LOGNOTICE("Dropped %d instances", kills); LOGNOTICE("Dropped %d instances", kills);
} }
static proxy_t *__generate_proxy(sdata_t *sdata, const int id)
{
proxy_t *proxy = ckzalloc(sizeof(proxy_t));
proxy->id = id;
HASH_ADD_INT(sdata->proxies, id, proxy);
return proxy;
}
/* Find proxy by id number, generate one if none exist yet by that id */
static proxy_t *proxy_by_id(sdata_t *sdata, const int id)
{
bool new_proxy = false;
proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
HASH_FIND_INT(sdata->proxies, &id, proxy);
if (unlikely(!proxy)) {
new_proxy = true;
proxy = __generate_proxy(sdata, id);
}
mutex_unlock(&sdata->proxy_lock);
if (unlikely(new_proxy))
LOGINFO("Stratifier added new proxy %d", id);
return proxy;
}
static void reconnect_clients(sdata_t *sdata, const char *cmd);
static void update_subscribe(ckpool_t *ckp) static void update_subscribe(ckpool_t *ckp)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
proxy_t *proxy;
json_t *val; json_t *val;
int id = 0;
char *buf; char *buf;
buf = send_recv_proc(ckp->generator, "getsubscribe"); buf = send_recv_proc(ckp->generator, "getsubscribe");
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Failed to get subscribe from generator in update_notify"); LOGWARNING("Failed to get subscribe from generator in update_subscribe");
drop_allclients(ckp); drop_allclients(ckp);
return; return;
} }
LOGDEBUG("Update subscribe: %s", buf); LOGDEBUG("Update subscribe: %s", buf);
val = json_loads(buf, 0, NULL); val = json_loads(buf, 0, NULL);
free(buf); free(buf);
if (unlikely(!val)) {
LOGWARNING("Failed to json decode getsubscribe response in update_subscribe");
return;
}
json_get_int(&id, val, "proxy");
proxy = proxy_by_id(sdata, id);
mutex_lock(&sdata->proxy_lock);
if (sdata->proxy != proxy)
sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock);
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
sdata->proxy->subscribed = true; proxy->subscribed = true;
sdata->proxy->diff = ckp->startdiff; proxy->diff = ckp->startdiff;
/* Length is checked by generator */ /* Length is checked by generator */
strcpy(sdata->proxy->enonce1, json_string_value(json_object_get(val, "enonce1"))); strcpy(proxy->enonce1, json_string_value(json_object_get(val, "enonce1")));
sdata->proxy->enonce1constlen = strlen(sdata->proxy->enonce1) / 2; proxy->enonce1constlen = strlen(proxy->enonce1) / 2;
hex2bin(sdata->proxy->enonce1bin, sdata->proxy->enonce1, sdata->proxy->enonce1constlen); hex2bin(proxy->enonce1bin, proxy->enonce1, proxy->enonce1constlen);
sdata->proxy->nonce2len = json_integer_value(json_object_get(val, "nonce2len")); proxy->nonce2len = json_integer_value(json_object_get(val, "nonce2len"));
if (ckp->clientsvspeed) { if (ckp->clientsvspeed) {
if (sdata->proxy->nonce2len > 5) if (proxy->nonce2len > 5)
sdata->proxy->enonce1varlen = 4; proxy->enonce1varlen = 4;
else if (sdata->proxy->nonce2len > 3) else if (proxy->nonce2len > 3)
sdata->proxy->enonce1varlen = 2; proxy->enonce1varlen = 2;
else else
sdata->proxy->enonce1varlen = 1; proxy->enonce1varlen = 1;
} else { } else {
if (sdata->proxy->nonce2len > 7) if (proxy->nonce2len > 7)
sdata->proxy->enonce1varlen = 4; proxy->enonce1varlen = 4;
else if (sdata->proxy->nonce2len > 5) else if (proxy->nonce2len > 5)
sdata->proxy->enonce1varlen = 2; proxy->enonce1varlen = 2;
else else
sdata->proxy->enonce1varlen = 1; proxy->enonce1varlen = 1;
} }
sdata->proxy->enonce2varlen = sdata->proxy->nonce2len - sdata->proxy->enonce1varlen; proxy->enonce2varlen = proxy->nonce2len - proxy->enonce1varlen;
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
LOGNOTICE("Upstream pool extranonce2 length %d, max proxy clients %lld", LOGNOTICE("Upstream pool extranonce2 length %d, max proxy clients %lld",
sdata->proxy->nonce2len, 1ll << (sdata->proxy->enonce1varlen * 8)); proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8));
json_decref(val); json_decref(val);
drop_allclients(ckp); reconnect_clients(sdata, "");
} }
static void update_diff(ckpool_t *ckp); static void update_diff(ckpool_t *ckp);
static proxy_t *current_proxy(sdata_t *sdata)
{
proxy_t *proxy;
mutex_lock(&sdata->proxy_lock);
proxy = sdata->proxy;
mutex_unlock(&sdata->proxy_lock);
return proxy;
}
static void update_notify(ckpool_t *ckp) static void update_notify(ckpool_t *ckp)
{ {
bool new_block = false, clean; bool new_block = false, clean;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
char header[228]; char header[228];
proxy_t *proxy;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
char *buf; char *buf;
@ -1037,7 +1101,8 @@ static void update_notify(ckpool_t *ckp)
return; return;
} }
if (unlikely(!sdata->proxy->subscribed)) { proxy = current_proxy(sdata);
if (unlikely(!proxy || !proxy->subscribed)) {
LOGINFO("No valid proxy subscription to update notify yet"); LOGINFO("No valid proxy subscription to update notify yet");
return; return;
} }
@ -1088,12 +1153,12 @@ static void update_notify(ckpool_t *ckp)
update_diff(ckp); update_diff(ckp);
ck_rlock(&sdata->workbase_lock); ck_rlock(&sdata->workbase_lock);
strcpy(wb->enonce1const, sdata->proxy->enonce1); strcpy(wb->enonce1const, proxy->enonce1);
wb->enonce1constlen = sdata->proxy->enonce1constlen; wb->enonce1constlen = proxy->enonce1constlen;
memcpy(wb->enonce1constbin, sdata->proxy->enonce1bin, wb->enonce1constlen); memcpy(wb->enonce1constbin, proxy->enonce1bin, wb->enonce1constlen);
wb->enonce1varlen = sdata->proxy->enonce1varlen; wb->enonce1varlen = proxy->enonce1varlen;
wb->enonce2varlen = sdata->proxy->enonce2varlen; wb->enonce2varlen = proxy->enonce2varlen;
wb->diff = sdata->proxy->diff; wb->diff = proxy->diff;
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
add_base(ckp, wb, &new_block); add_base(ckp, wb, &new_block);
@ -3291,7 +3356,6 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
json_t *id_val, json_t *method_val, json_t *params_val, const char *address) json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
{ {
const char *method; const char *method;
char buf[256];
/* Random broken clients send something not an integer as the id so we /* Random broken clients send something not an integer as the id so we
* copy the json item for id_val as is for the response. By far the * copy the json item for id_val as is for the response. By far the
@ -3328,6 +3392,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
} }
if (unlikely(cmdmatch(method, "mining.passthrough"))) { if (unlikely(cmdmatch(method, "mining.passthrough"))) {
char buf[256];
LOGNOTICE("Adding passthrough client %"PRId64, client_id); LOGNOTICE("Adding passthrough client %"PRId64, client_id);
/* We need to inform the connector process that this client /* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly. * is a passthrough and to manage its messages accordingly.
@ -3342,8 +3408,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
/* We should only accept subscribed requests from here on */ /* We should only accept subscribed requests from here on */
if (!client->subscribed) { if (!client->subscribed) {
LOGINFO("Dropping unsubscribed client %"PRId64, client_id); LOGINFO("Dropping unsubscribed client %"PRId64, client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id); connector_drop_client(client->ckp, client_id);
send_proc(client->ckp->connector, buf);
return; return;
} }
@ -3365,8 +3430,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* stratifier process to restart since it will have lost all * stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */ * the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64, client_id); LOGINFO("Dropping unauthorised client %"PRId64, client_id);
snprintf(buf, 255, "dropclient=%"PRId64, client_id); connector_drop_client(client->ckp, client_id);
send_proc(client->ckp->connector, buf);
return; return;
} }
@ -3507,6 +3571,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
/* Client may be NULL here */ /* Client may be NULL here */
LOGNOTICE("Stratifier skipped dropped instance %"PRId64" message from server %d", LOGNOTICE("Stratifier skipped dropped instance %"PRId64" message from server %d",
msg->client_id, server); msg->client_id, server);
connector_drop_client(ckp, msg->client_id);
free_smsg(msg); free_smsg(msg);
goto out; goto out;
} }
@ -4339,7 +4404,6 @@ static void read_poolstats(ckpool_t *ckp)
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
proxy_t *proxy, *tmpproxy;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 1, threads; int ret = 1, threads;
int64_t randomiser; int64_t randomiser;
@ -4410,11 +4474,7 @@ int stratifier(proc_instance_t *pi)
if (!ckp->proxy) if (!ckp->proxy)
create_pthread(&pth_blockupdate, blockupdate, ckp); create_pthread(&pth_blockupdate, blockupdate, ckp);
else { else {
/* Generate one proxy for now */
proxy = ckzalloc(sizeof(proxy_t));
mutex_init(&sdata->proxy_lock); mutex_init(&sdata->proxy_lock);
sdata->proxy = proxy;
HASH_ADD_INT(sdata->proxies, id, proxy);
} }
mutex_init(&sdata->stats_lock); mutex_init(&sdata->stats_lock);
@ -4428,6 +4488,8 @@ int stratifier(proc_instance_t *pi)
ret = stratum_loop(ckp, pi); ret = stratum_loop(ckp, pi);
out: out:
if (ckp->proxy) { if (ckp->proxy) {
proxy_t *proxy, *tmpproxy;
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) { HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) {
HASH_DEL(sdata->proxies, proxy); HASH_DEL(sdata->proxies, proxy);

Loading…
Cancel
Save