Browse Source

Add mechanism for parsing trusted remote messages and counting shares

master
Con Kolivas 9 years ago
parent
commit
c56a91cbce
  1. 5
      src/connector.c
  2. 62
      src/stratifier.c

5
src/connector.c

@ -1059,9 +1059,10 @@ static void usend_process(ckpool_t *ckp, char *buf)
if (cs->fd > 0) {
LOGWARNING("Upstream pool failed, attempting reconnect while caching messages");
Close(cs->fd);
sleep(5);
connect_upstream(cs);
}
do
sleep(5);
while (!connect_upstream(cs));
}
out:
free(buf);

62
src/stratifier.c

@ -299,6 +299,8 @@ struct stratum_instance {
proxy_t *proxy; /* Proxy this is bound to in proxy mode */
int proxyid; /* Which proxy id */
int subproxyid; /* Which subproxy */
bool remote; /* Is this a trusted remote server */
};
struct share {
@ -4434,7 +4436,7 @@ static double time_bias(const double tdiff, const double period)
}
/* Needs to be entered with client holding a ref count. */
static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const int diff, const bool valid,
static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double diff, const bool valid,
const bool submit)
{
sdata_t *ckp_sdata = ckp->data, *sdata = client->sdata;
@ -5254,7 +5256,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf);
}
drop_client(ckp, sdata, client_id);
client->remote = true;
return;
}
@ -5426,6 +5428,58 @@ out:
return ret;
}
static void parse_remote_shares(sdata_t *sdata, json_t *val, const char *buf)
{
json_t *workername_val = json_object_get(val, "workername");
worker_instance_t *worker;
const char *workername;
user_instance_t *user;
int64_t diff;
tv_t now_t;
workername = json_string_value(workername_val);
if (unlikely(!workername_val || !workername)) {
LOGWARNING("Failed to get workername from remote message %s", buf);
return;
}
if (unlikely(!json_get_int64(&diff, val, "diff") || diff < 1)) {
LOGWARNING("Unable to parse valid diff from remote message %s", buf);
return;
}
user = user_by_workername(sdata, workername);
worker = get_worker(sdata, user, workername);
mutex_lock(&sdata->stats_lock);
sdata->stats.unaccounted_shares++;
sdata->stats.unaccounted_diff_shares += diff;
mutex_unlock(&sdata->stats_lock);
worker->shares += diff;
user->shares += diff;
tv_time(&now_t);
decay_worker(worker, diff, &now_t);
copy_tv(&worker->last_share, &now_t);
worker->idle = false;
LOGDEBUG("Added %"PRId64" shares to worker %s", diff, workername);
}
static void parse_trusted_msg(sdata_t *sdata, json_t *val, const char *buf)
{
json_t *method_val = json_object_get(val, "method");
const char *method;
LOGDEBUG("Got remote message %s", buf);
method = json_string_value(method_val);
if (unlikely(!method_val || !method)) {
LOGWARNING("Failed to get method from remote message %s", buf);
return;
}
if (likely(!safecmp(method, "shares")))
parse_remote_shares(sdata, val, buf);
}
/* Entered with client holding ref count */
static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client)
{
@ -5620,7 +5674,9 @@ static void srecv_process(ckpool_t *ckp, char *buf)
if (unlikely(noid))
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
if (ckp->node)
if (client->remote)
parse_trusted_msg(sdata, msg->json_msg, buf);
else if (ckp->node)
node_client_msg(ckp, msg->json_msg, buf, client);
else
parse_instance_msg(ckp, sdata, msg, client);

Loading…
Cancel
Save