diff --git a/src/generator.c b/src/generator.c index 4c39f8ef..9de90674 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1120,6 +1120,7 @@ out: return ret; } +#if 0 static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) { proxy_instance_t *proxi; @@ -1130,20 +1131,17 @@ static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) return proxi; } - -static proxy_instance_t *current_proxy(gdata_t *gdata); +#endif static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) { - gdata_t *gdata = ckp->data; json_t *json_msg; char *msg, *buf; - JSON_CPACK(json_msg, "{sisssisb}", + JSON_CPACK(json_msg, "{sisssi}", "proxy", proxi->id, "enonce1", proxi->enonce1, - "nonce2len", proxi->nonce2len, - "reconnect", proxi == current_proxy(gdata) ? true : false); + "nonce2len", proxi->nonce2len); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); @@ -1152,28 +1150,21 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) free(buf); } -static void send_notify(gdata_t *gdata, int *sockd, const char *buf) +static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) { json_t *json_msg, *merkle_arr; - proxy_instance_t *proxi; notify_instance_t *ni; - int i, id = 0; - char *msg; + char *msg, *buf; + int i; - sscanf(buf, "getnotify=%d", &id); - proxi = proxy_by_id(gdata, id); - if (unlikely(!proxi)) { - ASPRINTF(&msg, "notready"); - goto out_send; - } merkle_arr = json_array(); mutex_lock(&proxi->notify_lock); ni = proxi->current_notify; if (unlikely(!ni)) { mutex_unlock(&proxi->notify_lock); - ASPRINTF(&msg, "notready"); - goto out_send; + LOGNOTICE("Proxi %d not ready to send notify", proxi->id); + return; } for (i = 0; i < ni->merkles; i++) json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); @@ -1188,10 +1179,10 @@ static void send_notify(gdata_t *gdata, int *sockd, const char *buf) msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); -out_send: - send_unix_msg(*sockd, msg); + ASPRINTF(&buf, "notify=%s", msg); free(msg); - _Close(sockd); + send_proc(ckp->stratifier, buf); + free(buf); } static void send_diff(proxy_instance_t *proxi, int *sockd) @@ -1511,7 +1502,6 @@ static void *proxy_recv(void *arg) notify_instance_t *ni, *tmp; share_msg_t *share, *tmpshare; int retries = 0, ret; - char buf[128]; time_t now; while (!proxy_alive(ckp, si, proxi, cs, true)) { @@ -1574,8 +1564,7 @@ static void *proxy_recv(void *arg) } if (parse_method(proxi, cs->buf)) { if (proxi->notified && proxi == current_proxy(gdata)) { - snprintf(buf, 127, "notify=%d", proxi->id); - send_proc(ckp->stratifier, buf); + send_notify(ckp, proxi); proxi->notified = false; } if (proxi->diffed) { @@ -1689,6 +1678,9 @@ reconnect: dealloc(buf); ASPRINTF(&buf, "proxy=%d", proxi->id); send_proc(ckp->stratifier, buf); + /* Send a notify for the new chosen proxy or the + * stratifier won't be able to switch. */ + send_notify(ckp, proxi); } } retry: @@ -1726,8 +1718,6 @@ retry: if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; - } else if (cmdmatch(buf, "getnotify")) { - send_notify(gdata, &sockd, buf); } else if (cmdmatch(buf, "getdiff")) { send_diff(proxi, &sockd); } else if (cmdmatch(buf, "reconnect")) { diff --git a/src/stratifier.c b/src/stratifier.c index 06f92b77..1e165940 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -297,6 +297,7 @@ struct proxy_base { int enonce2varlen; bool subscribed; + bool notified; }; typedef struct proxy_base proxy_t; @@ -1027,8 +1028,6 @@ static void reconnect_clients(sdata_t *sdata) ck_runlock(&sdata->instance_lock); } -static void _update_notify(ckpool_t *ckp, const int id); - static proxy_t *current_proxy(sdata_t *sdata) { proxy_t *proxy; @@ -1043,7 +1042,6 @@ static proxy_t *current_proxy(sdata_t *sdata) static void update_subscribe(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->data; - bool reconnect = true; const char *buf; proxy_t *proxy; json_t *val; @@ -1055,10 +1053,6 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) } buf = cmd + 10; LOGDEBUG("Update subscribe: %s", buf); - if (unlikely(!safecmp(buf, "notready"))) { - LOGNOTICE("Generator not ready to send subscribe for proxy %d", id); - return; - } val = json_loads(buf, 0, NULL); if (unlikely(!val)) { LOGWARNING("Failed to json decode subscribe response in update_subscribe"); @@ -1069,7 +1063,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) LOGNOTICE("Got updated subscribe for proxy %d", id); proxy = proxy_by_id(sdata, id); - json_get_bool(&reconnect, val, "reconnect"); + proxy->notified = false; /* Reset this */ ck_wlock(&sdata->workbase_lock); proxy->subscribed = true; @@ -1101,52 +1095,54 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8)); json_decref(val); - /* Notify implied required now too */ - _update_notify(ckp, id); - if (reconnect) - reconnect_clients(sdata); } static void update_diff(ckpool_t *ckp); -static void _update_notify(ckpool_t *ckp, const int id) +static void update_notify(ckpool_t *ckp, const char *cmd) { bool new_block = false, clean; sdata_t *sdata = ckp->data; char header[228]; - char *buf, *msg; + const char *buf; proxy_t *proxy; workbase_t *wb; + int i, id = 0; json_t *val; - int i; + if (unlikely(strlen(cmd) < 8)) { + LOGWARNING("Zero length string passed to update_notify"); + return; + } + buf = cmd + 7; + LOGDEBUG("Update notify: %s", buf); + + val = json_loads(buf, 0, NULL); + if (unlikely(!val)) { + LOGWARNING("Failed to json decode in update_notify"); + return; + } + json_get_int(&id, val, "proxy"); proxy = proxy_by_id(sdata, id); if (unlikely(!proxy->subscribed)) { - LOGINFO("No valid proxy subscription to update notify yet"); - return; + LOGNOTICE("No valid proxy %d subscription to update notify yet", id); + goto out; } + LOGNOTICE("Got updated notify for proxy %d", id); if (proxy != current_proxy(sdata)) { LOGINFO("Notify from backup proxy"); - return; + goto out; } - ASPRINTF(&msg, "getnotify=%d", id); - buf = send_recv_proc(ckp->generator, msg); - dealloc(msg); - if (unlikely(!buf)) { - LOGWARNING("Failed to get notify from generator in update_notify"); - return; - } - LOGDEBUG("Update notify: %s", buf); - if (unlikely(!safecmp(buf, "notready"))) { - LOGNOTICE("Generator not ready to send notify to stratifier"); - return; + if (!proxy->notified) { + /* This is the first notification from the current proxy, tell + * clients now to reconnect since we have enough information to + * switch. */ + proxy->notified = true; + reconnect_clients(sdata); } - LOGINFO("Got updated notify for proxy %d", id); wb = ckzalloc(sizeof(workbase_t)); - val = json_loads(buf, 0, NULL); - dealloc(buf); wb->ckp = ckp; wb->proxy = true; @@ -1173,7 +1169,6 @@ static void _update_notify(ckpool_t *ckp, const int id) json_strcpy(wb->ntime, val, "ntime"); sscanf(wb->ntime, "%x", &wb->ntime32); clean = json_is_true(json_object_get(val, "clean")); - json_decref(val); ts_realtime(&wb->gentime); snprintf(header, 225, "%s%s%s%s%s%s%s", wb->bbversion, wb->prevhash, @@ -1200,14 +1195,8 @@ static void _update_notify(ckpool_t *ckp, const int id) add_base(ckp, wb, &new_block); stratum_broadcast_update(sdata, new_block | clean); -} - -static void update_notify(ckpool_t *ckp, const char *cmd) -{ - int id = 0; - - sscanf(cmd, "notify=%d", &id); - _update_notify(ckp, id); +out: + json_decref(val); } static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client); @@ -1822,8 +1811,9 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) return buf; } -/* Sets the currently active proxy */ -static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) +/* Sets the currently active proxy. Clients will be told to reconnect once the + * first notify data comes from this proxy. */ +static void set_proxy(sdata_t *sdata, const char *buf) { proxy_t *proxy; int id = 0; @@ -1834,9 +1824,6 @@ static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) proxy = __proxy_by_id(sdata, id); sdata->proxy = proxy; mutex_unlock(&sdata->proxy_lock); - - _update_notify(ckp, id); - reconnect_clients(sdata); } static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) @@ -1946,7 +1933,7 @@ retry: } else if (cmdmatch(buf, "reconnect")) { request_reconnect(sdata, buf); } else if (cmdmatch(buf, "proxy")) { - set_proxy(ckp, sdata, buf); + set_proxy(sdata, buf); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else