From 52cd0665637eb22ef3bf48100be855d0b9c4949b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 6 Feb 2015 14:40:22 +1100 Subject: [PATCH] Add proxies to the stratifier as its notified of their existence by the generator and issue reconnects without rejecting connections when a new subscribe is discovered --- src/generator.c | 15 +++-- src/stratifier.c | 142 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 109 insertions(+), 48 deletions(-) diff --git a/src/generator.c b/src/generator.c index e5e82962..77782403 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1113,7 +1113,9 @@ static void send_subscribe(proxy_instance_t *proxi, int *sockd) json_t *json_msg; char *msg; - JSON_CPACK(json_msg, "{sssi}", "enonce1", proxi->enonce1, + JSON_CPACK(json_msg, "{sisssi}", + "proxy", proxi->id, + "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); @@ -1140,7 +1142,7 @@ static void send_notify(proxy_instance_t *proxi, int *sockd) for (i = 0; i < ni->merkles; i++) json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); /* Use our own jobid instead of the server's one for easy lookup */ - JSON_CPACK(json_msg, "{si,ss,si,ss,ss,so,ss,ss,ss,sb}", + JSON_CPACK(json_msg, "{si,si,ss,si,ss,ss,so,ss,ss,ss,sb}", "proxy", proxi->id, "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "merklehash", merkle_arr, "bbversion", ni->bbversion, @@ -1546,14 +1548,11 @@ out: return alive; } -static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) +static void kill_proxy(proxy_instance_t *proxi) { notify_instance_t *ni, *tmp; connsock_t *cs; - send_proc(ckp->stratifier, "reconnect"); - send_proc(ckp->connector, "reject"); - if (!proxi) // This shouldn't happen return; @@ -1586,7 +1585,7 @@ static int proxy_loop(proc_instance_t *pi) reconnect: if (proxi) { - kill_proxy(ckp, proxi); + kill_proxy(proxi); reconnecting = true; } proxi = live_proxy(ckp); @@ -1671,7 +1670,7 @@ retry: Close(sockd); goto retry; out: - kill_proxy(ckp, proxi); + kill_proxy(proxi); Close(sockd); return ret; } diff --git a/src/stratifier.c b/src/stratifier.c index 3a80ff6c..dba09bc2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -937,12 +937,20 @@ static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instan DL_DELETE(user->clients, client); } +static void connector_drop_client(ckpool_t *ckp, const int64_t id) +{ + char buf[256]; + + LOGWARNING("Stratifier requesting connector drop client %"PRId64, id); + snprintf(buf, 255, "dropclient=%"PRId64, id); + send_proc(ckp->connector, buf); +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; int disconnects = 0, kills = 0; sdata_t *sdata = ckp->data; - char buf[128]; ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { @@ -954,8 +962,7 @@ static void drop_allclients(ckpool_t *ckp) } else client->dropped = true; kills++; - sprintf(buf, "dropclient=%"PRId64, client_id); - send_proc(ckp->connector, buf); + connector_drop_client(ckp, client_id); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { disconnects++; @@ -970,62 +977,119 @@ static void drop_allclients(ckpool_t *ckp) LOGNOTICE("Dropped %d instances", kills); } +static proxy_t *__generate_proxy(sdata_t *sdata, const int id) +{ + proxy_t *proxy = ckzalloc(sizeof(proxy_t)); + + proxy->id = id; + HASH_ADD_INT(sdata->proxies, id, proxy); + return proxy; +} + +/* Find proxy by id number, generate one if none exist yet by that id */ +static proxy_t *proxy_by_id(sdata_t *sdata, const int id) +{ + bool new_proxy = false; + proxy_t *proxy; + + mutex_lock(&sdata->proxy_lock); + HASH_FIND_INT(sdata->proxies, &id, proxy); + if (unlikely(!proxy)) { + new_proxy = true; + proxy = __generate_proxy(sdata, id); + } + mutex_unlock(&sdata->proxy_lock); + + if (unlikely(new_proxy)) + LOGINFO("Stratifier added new proxy %d", id); + + return proxy; +} + +static void reconnect_clients(sdata_t *sdata, const char *cmd); + static void update_subscribe(ckpool_t *ckp) { sdata_t *sdata = ckp->data; + proxy_t *proxy; json_t *val; + int id = 0; char *buf; buf = send_recv_proc(ckp->generator, "getsubscribe"); if (unlikely(!buf)) { - LOGWARNING("Failed to get subscribe from generator in update_notify"); + LOGWARNING("Failed to get subscribe from generator in update_subscribe"); drop_allclients(ckp); return; } LOGDEBUG("Update subscribe: %s", buf); val = json_loads(buf, 0, NULL); free(buf); + if (unlikely(!val)) { + LOGWARNING("Failed to json decode getsubscribe response in update_subscribe"); + return; + } + + json_get_int(&id, val, "proxy"); + proxy = proxy_by_id(sdata, id); + + mutex_lock(&sdata->proxy_lock); + if (sdata->proxy != proxy) + sdata->proxy = proxy; + mutex_unlock(&sdata->proxy_lock); ck_wlock(&sdata->workbase_lock); - sdata->proxy->subscribed = true; - sdata->proxy->diff = ckp->startdiff; + proxy->subscribed = true; + proxy->diff = ckp->startdiff; /* Length is checked by generator */ - strcpy(sdata->proxy->enonce1, json_string_value(json_object_get(val, "enonce1"))); - sdata->proxy->enonce1constlen = strlen(sdata->proxy->enonce1) / 2; - hex2bin(sdata->proxy->enonce1bin, sdata->proxy->enonce1, sdata->proxy->enonce1constlen); - sdata->proxy->nonce2len = json_integer_value(json_object_get(val, "nonce2len")); + strcpy(proxy->enonce1, json_string_value(json_object_get(val, "enonce1"))); + proxy->enonce1constlen = strlen(proxy->enonce1) / 2; + hex2bin(proxy->enonce1bin, proxy->enonce1, proxy->enonce1constlen); + proxy->nonce2len = json_integer_value(json_object_get(val, "nonce2len")); if (ckp->clientsvspeed) { - if (sdata->proxy->nonce2len > 5) - sdata->proxy->enonce1varlen = 4; - else if (sdata->proxy->nonce2len > 3) - sdata->proxy->enonce1varlen = 2; + if (proxy->nonce2len > 5) + proxy->enonce1varlen = 4; + else if (proxy->nonce2len > 3) + proxy->enonce1varlen = 2; else - sdata->proxy->enonce1varlen = 1; + proxy->enonce1varlen = 1; } else { - if (sdata->proxy->nonce2len > 7) - sdata->proxy->enonce1varlen = 4; - else if (sdata->proxy->nonce2len > 5) - sdata->proxy->enonce1varlen = 2; + if (proxy->nonce2len > 7) + proxy->enonce1varlen = 4; + else if (proxy->nonce2len > 5) + proxy->enonce1varlen = 2; else - sdata->proxy->enonce1varlen = 1; + proxy->enonce1varlen = 1; } - sdata->proxy->enonce2varlen = sdata->proxy->nonce2len - sdata->proxy->enonce1varlen; + proxy->enonce2varlen = proxy->nonce2len - proxy->enonce1varlen; ck_wunlock(&sdata->workbase_lock); LOGNOTICE("Upstream pool extranonce2 length %d, max proxy clients %lld", - sdata->proxy->nonce2len, 1ll << (sdata->proxy->enonce1varlen * 8)); + proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8)); json_decref(val); - drop_allclients(ckp); + reconnect_clients(sdata, ""); } static void update_diff(ckpool_t *ckp); +static proxy_t *current_proxy(sdata_t *sdata) +{ + proxy_t *proxy; + + mutex_lock(&sdata->proxy_lock); + proxy = sdata->proxy; + mutex_unlock(&sdata->proxy_lock); + + return proxy; +} + static void update_notify(ckpool_t *ckp) { bool new_block = false, clean; sdata_t *sdata = ckp->data; char header[228]; + proxy_t *proxy; workbase_t *wb; json_t *val; char *buf; @@ -1037,7 +1101,8 @@ static void update_notify(ckpool_t *ckp) return; } - if (unlikely(!sdata->proxy->subscribed)) { + proxy = current_proxy(sdata); + if (unlikely(!proxy || !proxy->subscribed)) { LOGINFO("No valid proxy subscription to update notify yet"); return; } @@ -1088,12 +1153,12 @@ static void update_notify(ckpool_t *ckp) update_diff(ckp); ck_rlock(&sdata->workbase_lock); - strcpy(wb->enonce1const, sdata->proxy->enonce1); - wb->enonce1constlen = sdata->proxy->enonce1constlen; - memcpy(wb->enonce1constbin, sdata->proxy->enonce1bin, wb->enonce1constlen); - wb->enonce1varlen = sdata->proxy->enonce1varlen; - wb->enonce2varlen = sdata->proxy->enonce2varlen; - wb->diff = sdata->proxy->diff; + strcpy(wb->enonce1const, proxy->enonce1); + wb->enonce1constlen = proxy->enonce1constlen; + memcpy(wb->enonce1constbin, proxy->enonce1bin, wb->enonce1constlen); + wb->enonce1varlen = proxy->enonce1varlen; + wb->enonce2varlen = proxy->enonce2varlen; + wb->diff = proxy->diff; ck_runlock(&sdata->workbase_lock); add_base(ckp, wb, &new_block); @@ -3291,7 +3356,6 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { const char *method; - char buf[256]; /* Random broken clients send something not an integer as the id so we * copy the json item for id_val as is for the response. By far the @@ -3328,6 +3392,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 } if (unlikely(cmdmatch(method, "mining.passthrough"))) { + char buf[256]; + LOGNOTICE("Adding passthrough client %"PRId64, client_id); /* We need to inform the connector process that this client * is a passthrough and to manage its messages accordingly. @@ -3342,8 +3408,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 /* We should only accept subscribed requests from here on */ if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + connector_drop_client(client->ckp, client_id); return; } @@ -3365,8 +3430,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + connector_drop_client(client->ckp, client_id); return; } @@ -3507,6 +3571,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) /* Client may be NULL here */ LOGNOTICE("Stratifier skipped dropped instance %"PRId64" message from server %d", msg->client_id, server); + connector_drop_client(ckp, msg->client_id); free_smsg(msg); goto out; } @@ -4339,7 +4404,6 @@ static void read_poolstats(ckpool_t *ckp) int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; - proxy_t *proxy, *tmpproxy; ckpool_t *ckp = pi->ckp; int ret = 1, threads; int64_t randomiser; @@ -4410,11 +4474,7 @@ int stratifier(proc_instance_t *pi) if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp); else { - /* Generate one proxy for now */ - proxy = ckzalloc(sizeof(proxy_t)); mutex_init(&sdata->proxy_lock); - sdata->proxy = proxy; - HASH_ADD_INT(sdata->proxies, id, proxy); } mutex_init(&sdata->stats_lock); @@ -4428,6 +4488,8 @@ int stratifier(proc_instance_t *pi) ret = stratum_loop(ckp, pi); out: if (ckp->proxy) { + proxy_t *proxy, *tmpproxy; + mutex_lock(&sdata->proxy_lock); HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) { HASH_DEL(sdata->proxies, proxy);