diff --git a/src/ckpool.h b/src/ckpool.h index 13788226..59c0c9c7 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -291,6 +291,7 @@ enum stratum_msgtype { SM_PONG, SM_TRANSACTIONS, SM_SHAREERR, + SM_WORKERSTATS, SM_NONE }; @@ -315,6 +316,7 @@ static const char __maybe_unused *stratum_msgs[] = { "pong", "transactions", "shareerr", + "workerstats", "" }; diff --git a/src/stratifier.c b/src/stratifier.c index 56a38202..9f891ccf 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -891,16 +891,31 @@ static void send_postponed(sdata_t *sdata) static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id, const int msg_type); +/* Send a json msg to an upstream trusted remote server */ +static void upstream_json(ckpool_t *ckp, const json_t *val) +{ + char *msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); + + /* Connector absorbs and frees msg */ + connector_upstream_msg(ckp, msg); +} + +/* Upstream a json msgtype, absorbing the json in the process */ +static void upstream_json_msgtype(ckpool_t *ckp, json_t *val, const int msg_type) +{ + json_set_string(val, "method", stratum_msgs[msg_type]); + upstream_json(ckp, val); + json_decref(val); +} + +/* Upstream a json msgtype, duplicating the json */ static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type) { json_t *json_msg = json_deep_copy(val); - char *msg; json_set_string(json_msg, "method", stratum_msgs[msg_type]); - msg = json_dumps(json_msg, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); + upstream_json(ckp, json_msg); json_decref(json_msg); - /* Connector absorbs and frees msg */ - connector_upstream_msg(ckp, msg); } static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb) @@ -3443,10 +3458,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) json_set_string(val, "createcode", __func__); json_get_int(&height, val, "height"); json_get_double(&diff, val, "diff"); - if (ckp->remote) { - upstream_msgtype(ckp, val, SM_BLOCK); - json_decref(val); - } else + if (ckp->remote) + upstream_json_msgtype(ckp, val, SM_BLOCK); + else ckdbq_add(ckp, ID_BLOCK, val); free(found); } @@ -5627,8 +5641,7 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc if (ckp->remote) { json_set_string(val, "name", ckp->name); - upstream_msgtype(ckp, val, SM_BLOCK); - json_decref(val); + upstream_json_msgtype(ckp, val, SM_BLOCK); } else ckdbq_add(ckp, ID_BLOCK, val); } @@ -5950,10 +5963,9 @@ out_unlock: } else LOGERR("Failed to fopen %s", fname); } - if (ckp->remote) { - upstream_msgtype(ckp, val, SM_SHARE); - json_decref(val); - } else + if (ckp->remote) + upstream_json_msgtype(ckp, val, SM_SHARE); + else ckdbq_add(ckp, ID_SHARES, val); out: if (!sdata->wbincomplete && ((!result && !submit) || !share)) { @@ -5998,10 +6010,9 @@ out: json_set_string(val, "createby", "code"); json_set_string(val, "createcode", __func__); json_set_string(val, "createinet", ckp->serverurl[client->server]); - if (ckp->remote) { - upstream_msgtype(ckp, val, SM_SHAREERR); - json_decref(val); - } else + if (ckp->remote) + upstream_json_msgtype(ckp, val, SM_SHAREERR); + else ckdbq_add(ckp, ID_SHAREERR, val); } LOGINFO("Invalid share from client %s: %s", client->identity, client->workername); @@ -6638,6 +6649,18 @@ out: } } +/* Remap the remote client id to the local one and submit to ckdb */ +static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t remote_id) +{ + int64_t client_id; + + json_get_int64(&client_id, val, "clientid"); + /* Encode remote server client_id into remote client's id */ + client_id = (remote_id << 32) | (client_id & 0xffffffffll); + json_set_int64(val, "clientid", client_id); + ckdbq_add(ckp, ID_WORKERSTATS, val); +} + #define parse_remote_workinfo(ckp, val) add_node_base(ckp, val, true) static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote, @@ -6819,6 +6842,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu parse_remote_share(ckp, sdata, val, buf); else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) add_node_txns(ckp, sdata, val); + else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS])) + parse_remote_workerstats(ckp, val, client->id); else if (!safecmp(method, stratum_msgs[SM_WORKINFO])) parse_remote_workinfo(ckp, val); else if (!safecmp(method, stratum_msgs[SM_AUTH])) @@ -7512,7 +7537,10 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) /* Add all entries outside of the instance lock */ DL_FOREACH_SAFE(json_list, entry, tmpentry) { - ckdbq_add(ckp, ID_WORKERSTATS, entry->val); + if (ckp->remote) + upstream_json_msgtype(ckp, entry->val, SM_WORKERSTATS); + else + ckdbq_add(ckp, ID_WORKERSTATS, entry->val); DL_DELETE(json_list, entry); free(entry); }