From 5bff2819da694dc30721dee52e6c00c5ac3a2eba Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 09:05:40 +1100 Subject: [PATCH] Send notify from each proxy as soon as we receive it --- src/generator.c | 85 +++++++++++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/src/generator.c b/src/generator.c index e2c58fb8..4a7bd6d4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -100,7 +100,6 @@ struct proxy_instance { bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ - bool notified; /* Received new template for work */ bool reconnect; /* We need to drop and reconnect */ bool alive; @@ -977,6 +976,41 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) free(buf); } +static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) +{ + json_t *json_msg, *merkle_arr; + notify_instance_t *ni; + char *msg, *buf; + int i; + + merkle_arr = json_array(); + + mutex_lock(&proxi->notify_lock); + ni = proxi->current_notify; + if (unlikely(!ni)) { + mutex_unlock(&proxi->notify_lock); + LOGNOTICE("Proxi %d not ready to send notify", proxi->id); + return; + } + for (i = 0; i < ni->merkles; i++) + json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); + /* Use our own jobid instead of the server's one for easy lookup */ + JSON_CPACK(json_msg, "{si,si,ss,si,ss,ss,so,ss,ss,ss,sb}", "proxy", proxi->id, + "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, + "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, + "merklehash", merkle_arr, "bbversion", ni->bbversion, + "nbit", ni->nbit, "ntime", ni->ntime, + "clean", ni->clean); + mutex_unlock(&proxi->notify_lock); + + msg = json_dumps(json_msg, JSON_NO_UTF8); + json_decref(json_msg); + ASPRINTF(&buf, "notify=%s", msg); + free(msg); + send_proc(ckp->stratifier, buf); + free(buf); +} + static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg) { json_t *val = NULL, *method, *err_val, *params; @@ -1023,10 +1057,9 @@ static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg } if (cmdmatch(buf, "mining.notify")) { - if (parse_notify(proxi, params)) - proxi->notified = ret = true; - else - proxi->notified = ret = false; + ret = parse_notify(proxi, params); + if (ret) + send_notify(ckp, proxi); goto out; } @@ -1166,41 +1199,6 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) free(buf); } -static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) -{ - json_t *json_msg, *merkle_arr; - notify_instance_t *ni; - char *msg, *buf; - int i; - - merkle_arr = json_array(); - - mutex_lock(&proxi->notify_lock); - ni = proxi->current_notify; - if (unlikely(!ni)) { - mutex_unlock(&proxi->notify_lock); - LOGNOTICE("Proxi %d not ready to send notify", proxi->id); - return; - } - for (i = 0; i < ni->merkles; i++) - json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); - /* Use our own jobid instead of the server's one for easy lookup */ - JSON_CPACK(json_msg, "{si,si,ss,si,ss,ss,so,ss,ss,ss,sb}", "proxy", proxi->id, - "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, - "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, - "merklehash", merkle_arr, "bbversion", ni->bbversion, - "nbit", ni->nbit, "ntime", ni->ntime, - "clean", ni->clean); - mutex_unlock(&proxi->notify_lock); - - msg = json_dumps(json_msg, JSON_NO_UTF8); - json_decref(json_msg); - ASPRINTF(&buf, "notify=%s", msg); - free(msg); - send_proc(ckp->stratifier, buf); - free(buf); -} - static void submit_share(proxy_instance_t *proxi, json_t *val) { stratum_msg_t *msg; @@ -1412,7 +1410,6 @@ out: } else { keep_sockalive(cs->fd); send_subscribe(ckp, proxi); - proxi->notified = false; } return ret; } @@ -1471,6 +1468,7 @@ static void *passthrough_recv(void *arg) static proxy_instance_t *best_proxy(ckpool_t *ckp, gdata_t *gdata); +#if 0 static proxy_instance_t *current_proxy(gdata_t *gdata) { proxy_instance_t *ret; @@ -1481,6 +1479,7 @@ static proxy_instance_t *current_proxy(gdata_t *gdata) return ret; } +#endif /* For receiving messages from the upstream proxy, also responsible for setting * up the connection and testing it's alive. */ @@ -1566,10 +1565,6 @@ static void *proxy_recv(void *arg) continue; } if (parse_method(ckp, proxi, cs->buf)) { - if (proxi->notified && proxi == current_proxy(gdata)) { - send_notify(ckp, proxi); - proxi->notified = false; - } if (proxi->reconnect) { /* Call this proxy dead to allow us to fail * over to a backup pool until the reconnect