diff --git a/src/connector.c b/src/connector.c index fd7f1543..67d5ce9f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1059,9 +1059,10 @@ static void usend_process(ckpool_t *ckp, char *buf) if (cs->fd > 0) { LOGWARNING("Upstream pool failed, attempting reconnect while caching messages"); Close(cs->fd); - sleep(5); - connect_upstream(cs); } + do + sleep(5); + while (!connect_upstream(cs)); } out: free(buf); diff --git a/src/stratifier.c b/src/stratifier.c index 2e10c723..c4bc8c68 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -299,6 +299,8 @@ struct stratum_instance { proxy_t *proxy; /* Proxy this is bound to in proxy mode */ int proxyid; /* Which proxy id */ int subproxyid; /* Which subproxy */ + + bool remote; /* Is this a trusted remote server */ }; struct share { @@ -4434,7 +4436,7 @@ static double time_bias(const double tdiff, const double period) } /* Needs to be entered with client holding a ref count. */ -static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const int diff, const bool valid, +static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double diff, const bool valid, const bool submit) { sdata_t *ckp_sdata = ckp->data, *sdata = client->sdata; @@ -5254,7 +5256,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie snprintf(buf, 255, "remote=%"PRId64, client_id); send_proc(ckp->connector, buf); } - drop_client(ckp, sdata, client_id); + client->remote = true; return; } @@ -5426,6 +5428,58 @@ out: return ret; } +static void parse_remote_shares(sdata_t *sdata, json_t *val, const char *buf) +{ + json_t *workername_val = json_object_get(val, "workername"); + worker_instance_t *worker; + const char *workername; + user_instance_t *user; + int64_t diff; + tv_t now_t; + + workername = json_string_value(workername_val); + if (unlikely(!workername_val || !workername)) { + LOGWARNING("Failed to get workername from remote message %s", buf); + return; + } + if (unlikely(!json_get_int64(&diff, val, "diff") || diff < 1)) { + LOGWARNING("Unable to parse valid diff from remote message %s", buf); + return; + } + user = user_by_workername(sdata, workername); + worker = get_worker(sdata, user, workername); + + mutex_lock(&sdata->stats_lock); + sdata->stats.unaccounted_shares++; + sdata->stats.unaccounted_diff_shares += diff; + mutex_unlock(&sdata->stats_lock); + + worker->shares += diff; + user->shares += diff; + tv_time(&now_t); + + decay_worker(worker, diff, &now_t); + copy_tv(&worker->last_share, &now_t); + worker->idle = false; + + LOGDEBUG("Added %"PRId64" shares to worker %s", diff, workername); +} + +static void parse_trusted_msg(sdata_t *sdata, json_t *val, const char *buf) +{ + json_t *method_val = json_object_get(val, "method"); + const char *method; + + LOGDEBUG("Got remote message %s", buf); + method = json_string_value(method_val); + if (unlikely(!method_val || !method)) { + LOGWARNING("Failed to get method from remote message %s", buf); + return; + } + if (likely(!safecmp(method, "shares"))) + parse_remote_shares(sdata, val, buf); +} + /* Entered with client holding ref count */ static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client) { @@ -5620,7 +5674,9 @@ static void srecv_process(ckpool_t *ckp, char *buf) if (unlikely(noid)) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); - if (ckp->node) + if (client->remote) + parse_trusted_msg(sdata, msg->json_msg, buf); + else if (ckp->node) node_client_msg(ckp, msg->json_msg, buf, client); else parse_instance_msg(ckp, sdata, msg, client);