diff --git a/src/ckpool.c b/src/ckpool.c index cdbfbe39..7f2f0ff5 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1024,7 +1024,7 @@ out: return ret; } -static bool json_get_bool(bool *store, const json_t *val, const char *res) +bool json_get_bool(bool *store, const json_t *val, const char *res) { json_t *entry = json_object_get(val, res); bool ret = false; diff --git a/src/ckpool.h b/src/ckpool.h index b2f59333..f2d8bb3b 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -229,5 +229,6 @@ bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); bool json_get_int(int *store, const json_t *val, const char *res); bool json_get_double(double *store, const json_t *val, const char *res); +bool json_get_bool(bool *store, const json_t *val, const char *res); #endif /* CKPOOL_H */ diff --git a/src/generator.c b/src/generator.c index 75e55dff..53908353 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1116,29 +1116,57 @@ out: return ret; } -static void send_subscribe(proxy_instance_t *proxi, int *sockd) +static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) { + proxy_instance_t *proxi; + + mutex_lock(&gdata->lock); + HASH_FIND_INT(gdata->proxies, &id, proxi); + mutex_unlock(&gdata->lock); + + return proxi; +} + +static void send_subscribe(gdata_t *gdata, proxy_instance_t *cproxy, int *sockd, const char *buf) +{ + proxy_instance_t *proxi; json_t *json_msg; + int id = 0; char *msg; - JSON_CPACK(json_msg, "{sisssi}", + sscanf(buf, "getsubscribe=%d", &id); + proxi = proxy_by_id(gdata, id); + if (unlikely(!proxi || !proxi->nonce2len)) { + ASPRINTF(&msg, "notready"); + goto out_send; + } + JSON_CPACK(json_msg, "{sisssisb}", "proxy", proxi->id, "enonce1", proxi->enonce1, - "nonce2len", proxi->nonce2len); + "nonce2len", proxi->nonce2len, + "reconnect", proxi == cproxy ? true : false); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); +out_send: send_unix_msg(*sockd, msg); free(msg); _Close(sockd); } -static void send_notify(proxy_instance_t *proxi, int *sockd) +static void send_notify(gdata_t *gdata, int *sockd, const char *buf) { json_t *json_msg, *merkle_arr; + proxy_instance_t *proxi; notify_instance_t *ni; + int i, id = 0; char *msg; - int i; + sscanf(buf, "getnotify=%d", &id); + proxi = proxy_by_id(gdata, id); + if (unlikely(!proxi)) { + ASPRINTF(&msg, "notready"); + goto out_send; + } merkle_arr = json_array(); mutex_lock(&proxi->notify_lock); @@ -1618,7 +1646,9 @@ reconnect: LOGWARNING("Successfully connected to %s:%s as proxy", cs->url, cs->port); /* Sending subscribe implies stratifier will also do a notify */ - send_proc(ckp->stratifier, "subscribe"); + dealloc(buf); + ASPRINTF(&buf, "subscribe=%d", proxi->id); + send_proc(ckp->stratifier, buf); proxi->notified = false; } } @@ -1656,9 +1686,9 @@ retry: ret = 0; goto out; } else if (cmdmatch(buf, "getsubscribe")) { - send_subscribe(proxi, &sockd); + send_subscribe(gdata, proxi, &sockd, buf); } else if (cmdmatch(buf, "getnotify")) { - send_notify(proxi, &sockd); + send_notify(gdata, &sockd, buf); } else if (cmdmatch(buf, "getdiff")) { send_diff(proxi, &sockd); } else if (cmdmatch(buf, "reconnect")) { diff --git a/src/stratifier.c b/src/stratifier.c index bd194998..199895b8 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1006,24 +1006,32 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id) return proxy; } -static void update_notify(ckpool_t *ckp); static void reconnect_clients(sdata_t *sdata, const char *cmd); +static void _update_notify(ckpool_t *ckp, const int id); -static void update_subscribe(ckpool_t *ckp) +static void update_subscribe(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->data; + bool reconnect = true; + char *buf, *msg; proxy_t *proxy; json_t *val; int id = 0; - char *buf; - buf = send_recv_proc(ckp->generator, "getsubscribe"); + sscanf(cmd, "subscribe=%d", &id); + ASPRINTF(&msg, "getsubscribe=%d", id); + buf = send_recv_proc(ckp->generator, msg); + dealloc(msg); if (unlikely(!buf)) { LOGWARNING("Failed to get subscribe from generator in update_subscribe"); drop_allclients(ckp); return; } LOGDEBUG("Update subscribe: %s", buf); + if (unlikely(!safecmp(buf, "notready"))) { + LOGNOTICE("Generator not ready to send subscribe for proxy %d", id); + return; + } val = json_loads(buf, 0, NULL); free(buf); if (unlikely(!val)) { @@ -1031,8 +1039,8 @@ static void update_subscribe(ckpool_t *ckp) return; } - json_get_int(&id, val, "proxy"); proxy = proxy_by_id(sdata, id); + json_get_bool(&reconnect, val, "reconnect"); mutex_lock(&sdata->proxy_lock); if (sdata->proxy != proxy) @@ -1070,12 +1078,13 @@ static void update_subscribe(ckpool_t *ckp) json_decref(val); /* Notify implied required now too */ - update_notify(ckp); - reconnect_clients(sdata, ""); + _update_notify(ckp, id); + if (reconnect) + reconnect_clients(sdata, ""); } static void update_diff(ckpool_t *ckp); - +#if 0 static proxy_t *current_proxy(sdata_t *sdata) { proxy_t *proxy; @@ -1086,19 +1095,28 @@ static proxy_t *current_proxy(sdata_t *sdata) return proxy; } +#endif -static void update_notify(ckpool_t *ckp) +static void _update_notify(ckpool_t *ckp, const int id) { bool new_block = false, clean; sdata_t *sdata = ckp->data; char header[228]; + char *buf, *msg; proxy_t *proxy; workbase_t *wb; json_t *val; - char *buf; int i; - buf = send_recv_proc(ckp->generator, "getnotify"); + proxy = proxy_by_id(sdata, id); + if (unlikely(!proxy->subscribed)) { + LOGINFO("No valid proxy subscription to update notify yet"); + return; + } + + ASPRINTF(&msg, "getnotify=%d", id); + buf = send_recv_proc(ckp->generator, msg); + dealloc(msg); if (unlikely(!buf)) { LOGWARNING("Failed to get notify from generator in update_notify"); return; @@ -1108,12 +1126,6 @@ static void update_notify(ckpool_t *ckp) return; } - proxy = current_proxy(sdata); - if (unlikely(!proxy || !proxy->subscribed)) { - LOGINFO("No valid proxy subscription to update notify yet"); - return; - } - LOGDEBUG("Update notify: %s", buf); wb = ckzalloc(sizeof(workbase_t)); val = json_loads(buf, 0, NULL); @@ -1173,6 +1185,14 @@ static void update_notify(ckpool_t *ckp) stratum_broadcast_update(sdata, new_block | clean); } +static void update_notify(ckpool_t *ckp, const char *cmd) +{ + int id = 0; + + sscanf(cmd, "notify=%d", &id); + _update_notify(ckp, id); +} + static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client); static void update_diff(ckpool_t *ckp) @@ -1866,10 +1886,10 @@ retry: update_base(ckp, GEN_PRIORITY); } else if (cmdmatch(buf, "subscribe")) { /* Proxifier has a new subscription */ - update_subscribe(ckp); + update_subscribe(ckp, buf); } else if (cmdmatch(buf, "notify")) { /* Proxifier has a new notify ready */ - update_notify(ckp); + update_notify(ckp, buf); } else if (cmdmatch(buf, "diff")) { update_diff(ckp); } else if (cmdmatch(buf, "dropclient")) {