Browse Source

Upstream workerstats from remote trusted servers.

master
Con Kolivas 8 years ago
parent
commit
e9f7074483
  1. 2
      src/ckpool.h
  2. 66
      src/stratifier.c

2
src/ckpool.h

@ -291,6 +291,7 @@ enum stratum_msgtype {
SM_PONG, SM_PONG,
SM_TRANSACTIONS, SM_TRANSACTIONS,
SM_SHAREERR, SM_SHAREERR,
SM_WORKERSTATS,
SM_NONE SM_NONE
}; };
@ -315,6 +316,7 @@ static const char __maybe_unused *stratum_msgs[] = {
"pong", "pong",
"transactions", "transactions",
"shareerr", "shareerr",
"workerstats",
"" ""
}; };

66
src/stratifier.c

@ -891,16 +891,31 @@ static void send_postponed(sdata_t *sdata)
static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id, static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id,
const int msg_type); const int msg_type);
/* Send a json msg to an upstream trusted remote server */
static void upstream_json(ckpool_t *ckp, const json_t *val)
{
char *msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL);
/* Connector absorbs and frees msg */
connector_upstream_msg(ckp, msg);
}
/* Upstream a json msgtype, absorbing the json in the process */
static void upstream_json_msgtype(ckpool_t *ckp, json_t *val, const int msg_type)
{
json_set_string(val, "method", stratum_msgs[msg_type]);
upstream_json(ckp, val);
json_decref(val);
}
/* Upstream a json msgtype, duplicating the json */
static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type) static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type)
{ {
json_t *json_msg = json_deep_copy(val); json_t *json_msg = json_deep_copy(val);
char *msg;
json_set_string(json_msg, "method", stratum_msgs[msg_type]); json_set_string(json_msg, "method", stratum_msgs[msg_type]);
msg = json_dumps(json_msg, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); upstream_json(ckp, json_msg);
json_decref(json_msg); json_decref(json_msg);
/* Connector absorbs and frees msg */
connector_upstream_msg(ckp, msg);
} }
static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb) static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb)
@ -3443,10 +3458,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
json_set_string(val, "createcode", __func__); json_set_string(val, "createcode", __func__);
json_get_int(&height, val, "height"); json_get_int(&height, val, "height");
json_get_double(&diff, val, "diff"); json_get_double(&diff, val, "diff");
if (ckp->remote) { if (ckp->remote)
upstream_msgtype(ckp, val, SM_BLOCK); upstream_json_msgtype(ckp, val, SM_BLOCK);
json_decref(val); else
} else
ckdbq_add(ckp, ID_BLOCK, val); ckdbq_add(ckp, ID_BLOCK, val);
free(found); free(found);
} }
@ -5627,8 +5641,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc
if (ckp->remote) { if (ckp->remote) {
json_set_string(val, "name", ckp->name); json_set_string(val, "name", ckp->name);
upstream_msgtype(ckp, val, SM_BLOCK); upstream_json_msgtype(ckp, val, SM_BLOCK);
json_decref(val);
} else } else
ckdbq_add(ckp, ID_BLOCK, val); ckdbq_add(ckp, ID_BLOCK, val);
} }
@ -5950,10 +5963,9 @@ out_unlock:
} else } else
LOGERR("Failed to fopen %s", fname); LOGERR("Failed to fopen %s", fname);
} }
if (ckp->remote) { if (ckp->remote)
upstream_msgtype(ckp, val, SM_SHARE); upstream_json_msgtype(ckp, val, SM_SHARE);
json_decref(val); else
} else
ckdbq_add(ckp, ID_SHARES, val); ckdbq_add(ckp, ID_SHARES, val);
out: out:
if (!sdata->wbincomplete && ((!result && !submit) || !share)) { if (!sdata->wbincomplete && ((!result && !submit) || !share)) {
@ -5998,10 +6010,9 @@ out:
json_set_string(val, "createby", "code"); json_set_string(val, "createby", "code");
json_set_string(val, "createcode", __func__); json_set_string(val, "createcode", __func__);
json_set_string(val, "createinet", ckp->serverurl[client->server]); json_set_string(val, "createinet", ckp->serverurl[client->server]);
if (ckp->remote) { if (ckp->remote)
upstream_msgtype(ckp, val, SM_SHAREERR); upstream_json_msgtype(ckp, val, SM_SHAREERR);
json_decref(val); else
} else
ckdbq_add(ckp, ID_SHAREERR, val); ckdbq_add(ckp, ID_SHAREERR, val);
} }
LOGINFO("Invalid share from client %s: %s", client->identity, client->workername); LOGINFO("Invalid share from client %s: %s", client->identity, client->workername);
@ -6638,6 +6649,18 @@ out:
} }
} }
/* Remap the remote client id to the local one and submit to ckdb */
static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t remote_id)
{
int64_t client_id;
json_get_int64(&client_id, val, "clientid");
/* Encode remote server client_id into remote client's id */
client_id = (remote_id << 32) | (client_id & 0xffffffffll);
json_set_int64(val, "clientid", client_id);
ckdbq_add(ckp, ID_WORKERSTATS, val);
}
#define parse_remote_workinfo(ckp, val) add_node_base(ckp, val, true) #define parse_remote_workinfo(ckp, val) add_node_base(ckp, val, true)
static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote, static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote,
@ -6819,6 +6842,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
parse_remote_share(ckp, sdata, val, buf); parse_remote_share(ckp, sdata, val, buf);
else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS]))
add_node_txns(ckp, sdata, val); add_node_txns(ckp, sdata, val);
else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS]))
parse_remote_workerstats(ckp, val, client->id);
else if (!safecmp(method, stratum_msgs[SM_WORKINFO])) else if (!safecmp(method, stratum_msgs[SM_WORKINFO]))
parse_remote_workinfo(ckp, val); parse_remote_workinfo(ckp, val);
else if (!safecmp(method, stratum_msgs[SM_AUTH])) else if (!safecmp(method, stratum_msgs[SM_AUTH]))
@ -7512,7 +7537,10 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
/* Add all entries outside of the instance lock */ /* Add all entries outside of the instance lock */
DL_FOREACH_SAFE(json_list, entry, tmpentry) { DL_FOREACH_SAFE(json_list, entry, tmpentry) {
ckdbq_add(ckp, ID_WORKERSTATS, entry->val); if (ckp->remote)
upstream_json_msgtype(ckp, entry->val, SM_WORKERSTATS);
else
ckdbq_add(ckp, ID_WORKERSTATS, entry->val);
DL_DELETE(json_list, entry); DL_DELETE(json_list, entry);
free(entry); free(entry);
} }

Loading…
Cancel
Save