Browse Source

Add identifier for clients in the stratifier to know what node/passthrough they belong to

master
Con Kolivas 9 years ago
parent
commit
685af50a7e
  1. 176
      src/stratifier.c

176
src/stratifier.c

@ -244,6 +244,9 @@ struct stratum_instance {
stratum_instance_t *next;
stratum_instance_t *prev;
/* Descriptive of ID number and passthrough if any */
char identity[128];
/* Reference count for when this instance is used outside of the
* instance_lock */
int ref;
@ -2769,17 +2772,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
DL_DELETE(sdata->remote_instances, client);
if (client->workername) {
if (user) {
ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s",
client->id, client->address, user->throttled ? "throttled " : "",
ASPRINTF(msg, "Dropped client %s %s %suser %s worker %s %s",
client->identity, client->address, user->throttled ? "throttled " : "",
user->username, client->workername, lazily ? "lazily" : "");
} else {
ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s",
client->id, client->address, client->workername,
ASPRINTF(msg, "Dropped client %s %s no user worker %s %s",
client->identity, client->address, client->workername,
lazily ? "lazily" : "");
}
} else {
ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s",
client->id, client->address, lazily ? "lazily" : "");
ASPRINTF(msg, "Dropped workerless client %s %s %s",
client->identity, client->address, lazily ? "lazily" : "");
}
__del_client(sdata, client);
__kill_instance(sdata, client);
@ -2832,8 +2835,8 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata)
}
/* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id,
const char *address, int server)
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address,
int server)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->sdata;
@ -2858,13 +2861,20 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i
stratum_instance_t *passthrough;
int64_t pass_id = id >> 32;
id &= 0xffffffffll;
passthrough = __instance_by_id(sdata, pass_id);
if (passthrough && passthrough->node) {
client->latency = passthrough->latency;
LOGINFO("Client %"PRId64" inherited node latency of %d",
id, client->latency);
LOGINFO("Client %s inherited node latency of %d",
client->identity, client->latency);
sprintf(client->identity, "node:%"PRId64" subclient:%"PRId64,
pass_id, id);
} else {
sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64,
pass_id, id);
}
}
} else
sprintf(client->identity, "%"PRId64, id);
return client;
}
@ -2888,7 +2898,7 @@ out_unlock:
ck_wunlock(&sdata->instance_lock);
if (ret)
LOGNOTICE("Reconnecting old instance %"PRId64" to instance %"PRId64, old_id, id);
LOGINFO("Reconnecting old instance %"PRId64" to instance %"PRId64, old_id, id);
return ret;
}
@ -4410,8 +4420,8 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
client->sdata = sdata;
if (ckp->proxy) {
LOGINFO("Current %d, selecting proxy %d:%d for client %"PRId64, ckp_sdata->proxy->id,
sdata->subproxy->id, sdata->subproxy->subid, client->id);
LOGINFO("Current %d, selecting proxy %d:%d for client %s", ckp_sdata->proxy->id,
sdata->subproxy->id, sdata->subproxy->subid, client->identity);
}
if (!old_match) {
@ -4421,11 +4431,11 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
client->reject = 3;
return json_string("proxy full");
}
LOGINFO("Set new subscription %"PRId64" to new enonce1 %lx string %s", client->id,
LOGINFO("Set new subscription %s to new enonce1 %lx string %s", client->identity,
client->enonce1_64, client->enonce1);
} else {
LOGINFO("Set new subscription %"PRId64" to old matched enonce1 %lx string %s",
client->id, client->enonce1_64, client->enonce1);
LOGINFO("Set new subscription %s to old matched enonce1 %lx string %s",
client->identity, client->enonce1_64, client->enonce1);
}
/* Workbases will exist if sdata->current_workbase is not NULL */
@ -5051,11 +5061,11 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
if (now_t < user->failed_authtime + user->auth_backoff) {
if (!user->throttled) {
user->throttled = true;
LOGNOTICE("Client %"PRId64" %s worker %s rate limited due to failed auth attempts",
client->id, client->address, buf);
LOGNOTICE("Client %s %s worker %s rate limited due to failed auth attempts",
client->identity, client->address, buf);
} else{
LOGINFO("Client %"PRId64" %s worker %s rate limited due to failed auth attempts",
client->id, client->address, buf);
LOGINFO("Client %s %s worker %s rate limited due to failed auth attempts",
client->identity, client->address, buf);
}
client->dropped = true;
goto out;
@ -5083,20 +5093,20 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
client->authorised = ret;
user->authorised = ret;
if (ckp->proxy) {
LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s",
client->id, client->proxyid, client->subproxyid, buf, user->username);
LOGNOTICE("Authorised client %s to proxy %d:%d, worker %s as user %s",
client->identity, client->proxyid, client->subproxyid, buf, user->username);
if (ckp->userproxy)
check_global_user(ckp, user, client);
} else {
LOGNOTICE("Authorised client %"PRId64" worker %s as user %s",
client->id, buf, user->username);
LOGNOTICE("Authorised client %s worker %s as user %s",
client->identity, buf, user->username);
}
user->failed_authtime = 0;
user->auth_backoff = DEFAULT_AUTH_BACKOFF; /* Reset auth backoff time */
user->throttled = false;
} else {
LOGNOTICE("Client %"PRId64" %s worker %s failed to authorise as user %s",
client->id, client->address, buf,user->username);
LOGNOTICE("Client %s %s worker %s failed to authorise as user %s",
client->identity, client->address, buf,user->username);
user->failed_authtime = time(NULL);
user->auth_backoff <<= 1;
/* Cap backoff time to 10 mins */
@ -5276,8 +5286,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d
client->ssdc = 0;
LOGINFO("Client %"PRId64" biased dsps %.2f dsps %.2f drr %.2f adjust diff from %"PRId64" to: %"PRId64" ",
client->id, dsps, client->dsps5, drr, client->diff, optimal);
LOGINFO("Client %s biased dsps %.2f dsps %.2f drr %.2f adjust diff from %"PRId64" to: %"PRId64" ",
client->identity, dsps, client->dsps5, drr, client->diff, optimal);
copy_tv(&client->ldc, &now_t);
client->diff_change_job_id = next_blockid;
@ -5546,8 +5556,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
worker_instance_t *worker = client->worker_instance;
client->best_diff = sdiff;
LOGINFO("User %s worker %s client %"PRId64" new best diff %lf", user->username,
worker->workername, client->id, sdiff);
LOGINFO("User %s worker %s client %s new best diff %lf", user->username,
worker->workername, client->identity, sdiff);
check_best_diff(ckp, sdata, user, worker, sdiff, client);
}
bswap_256(sharehash, hash);
@ -5563,7 +5573,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
ts_to_tv(&now_tv, &now);
latency = ms_tvdiff(&now_tv, &wb->retired);
if (latency < client->latency) {
LOGDEBUG("Accepting %dms late share from client %"PRId64, latency, client->id);
LOGDEBUG("Accepting %dms late share from client %s",
latency, client->identity);
goto no_stale;
}
}
@ -5594,25 +5605,25 @@ out_unlock:
suffix_string(wdiff, wdiffsuffix, 16, 0);
if (sdiff >= diff) {
if (new_share(sdata, hash, id)) {
LOGINFO("Accepted client %"PRId64" share diff %.1f/%.0f/%s: %s",
client->id, sdiff, diff, wdiffsuffix, hexhash);
LOGINFO("Accepted client %s share diff %.1f/%.0f/%s: %s",
client->identity, sdiff, diff, wdiffsuffix, hexhash);
result = true;
} else {
err = SE_DUPE;
json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
LOGINFO("Rejected client %"PRId64" dupe diff %.1f/%.0f/%s: %s",
client->id, sdiff, diff, wdiffsuffix, hexhash);
LOGINFO("Rejected client %s dupe diff %.1f/%.0f/%s: %s",
client->identity, sdiff, diff, wdiffsuffix, hexhash);
submit = false;
}
} else {
err = SE_HIGH_DIFF;
LOGINFO("Rejected client %"PRId64" high diff %.1f/%.0f/%s: %s",
client->id, sdiff, diff, wdiffsuffix, hexhash);
LOGINFO("Rejected client %s high diff %.1f/%.0f/%s: %s",
client->identity, sdiff, diff, wdiffsuffix, hexhash);
json_set_string(json_msg, "reject-reason", SHARE_ERR(err));
submit = false;
}
} else
LOGINFO("Rejected client %"PRId64" invalid share %s", client->id, SHARE_ERR(err));
LOGINFO("Rejected client %s invalid share %s", client->identity, SHARE_ERR(err));
/* Submit share to upstream pool in proxy mode. We submit valid and
* stale shares and filter out the rest. */
@ -5667,19 +5678,19 @@ out:
if (client->first_invalid < client->last_share.tv_sec || !client->first_invalid)
client->first_invalid = now_t;
else if (client->first_invalid && client->first_invalid < now_t - 180 && client->reject < 3) {
LOGNOTICE("Client %"PRId64" rejecting for 180s, disconnecting", client->id);
LOGNOTICE("Client %s rejecting for 180s, disconnecting", client->identity);
if (ckp->node)
connector_drop_client(ckp, client->id);
else
stratum_send_message(sdata, client, "Disconnecting for continuous invalid shares");
client->reject = 3;
} else if (client->first_invalid && client->first_invalid < now_t - 120 && client->reject < 2) {
LOGNOTICE("Client %"PRId64" rejecting for 120s, reconnecting", client->id);
LOGNOTICE("Client %s rejecting for 120s, reconnecting", client->identity);
stratum_send_message(sdata, client, "Reconnecting for continuous invalid shares");
reconnect_client(sdata, client);
client->reject = 2;
} else if (client->first_invalid && client->first_invalid < now_t - 60 && !client->reject) {
LOGNOTICE("Client %"PRId64" rejecting for 60s, sending update", client->id);
LOGNOTICE("Client %s rejecting for 60s, sending update", client->identity);
update_client(client, client->id);
client->reject = 1;
}
@ -5705,7 +5716,7 @@ out:
json_set_string(val, "createinet", ckp->serverurl[client->server]);
ckdbq_add(ckp, ID_SHAREERR, val);
}
LOGINFO("Invalid share from client %"PRId64": %s", client->id, client->workername);
LOGINFO("Invalid share from client %s: %s", client->identity, client->workername);
}
free(fname);
return json_boolean(result);
@ -5804,13 +5815,13 @@ static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char *
int64_t sdiff;
if (unlikely(!client_active(client))) {
LOGNOTICE("Attempted to suggest diff on unauthorised client %"PRId64, client->id);
LOGNOTICE("Attempted to suggest diff on unauthorised client %s", client->identity);
return;
}
if (arr_val && json_is_integer(arr_val))
sdiff = json_integer_value(arr_val);
else if (sscanf(method, "mining.suggest_difficulty(%"PRId64, &sdiff) != 1) {
LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id);
LOGINFO("Failed to parse suggest_difficulty for client %s", client->identity);
return;
}
/* Clamp suggest diff to global pool mindiff */
@ -5857,7 +5868,7 @@ static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client)
msg->json_msg = val;
msg->client_id = client->id;
ckmsgq_add(sdata->ssends, msg);
LOGNOTICE("Sending new node client %"PRId64" all transactions", client->id);
LOGNOTICE("Sending new node client %s all transactions", client->identity);
}
static void *setup_node(void *arg)
@ -5867,7 +5878,7 @@ static void *setup_node(void *arg)
pthread_detach(pthread_self());
client->latency = round_trip(client->address) / 2;
LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id,
LOGNOTICE("Node client %s %s latency set to %dms", client->identity,
client->address, client->latency);
send_node_all_txns(client->sdata, client);
dec_instance_ref(client->sdata, client);
@ -5888,7 +5899,7 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
__inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock);
LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id,
LOGWARNING("Added client %s %s as mining node on server %d:%s", client->identity,
client->address, client->server, ckp->serverurl[client->server]);
create_pthread(&pth, setup_node, client);
@ -5921,7 +5932,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
}
if (cmdmatch(method, "mining.term")) {
LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address);
LOGDEBUG("Mining terminate requested from %s %s", client->identity, client->address);
drop_client(ckp, sdata, client_id);
return;
}
@ -5930,8 +5941,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
json_t *val, *result_val;
if (unlikely(client->subscribed)) {
LOGNOTICE("Client %"PRId64" %s trying to subscribe twice",
client_id, client->address);
LOGNOTICE("Client %s %s trying to subscribe twice",
client->identity, client->address);
return;
}
result_val = parse_subscribe(client, client_id, params_val);
@ -5956,14 +5967,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a trusted remote node in the connector and
* drop the client in the stratifier */
if (!ckp->trusted[client->server] || ckp->proxy) {
LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as remote node on non trusted server %d",
client_id, client->address, client->server);
LOGNOTICE("Dropping client %s %s trying to authorise as remote node on non trusted server %d",
client->identity, client->address, client->server);
connector_drop_client(ckp, client_id);
} else {
add_remote_server(sdata, client);
snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf);
}
sprintf(client->identity, "remote:%"PRId64, client_id);
return;
}
@ -5973,14 +5985,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */
if (!ckp->nodeserver[client->server] || ckp->proxy) {
LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as node on non node server %d",
client_id, client->address, client->server);
LOGNOTICE("Dropping client %s %s trying to authorise as node on non node server %d",
client->identity, client->address, client->server);
connector_drop_client(ckp, client_id);
drop_client(ckp, sdata, client_id);
} else {
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
add_mining_node(ckp, sdata, client);
sprintf(client->identity, "node:%"PRId64, client_id);
}
return;
}
@ -5989,8 +6002,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
char buf[256];
if (ckp->proxy) {
LOGNOTICE("Dropping client %"PRId64" %s trying to connect as passthrough on proxy server %d",
client_id, client->address, client->server);
LOGNOTICE("Dropping client %s %s trying to connect as passthrough on proxy server %d",
client->identity, client->address, client->server);
connector_drop_client(ckp, client_id);
drop_client(ckp, sdata, client_id);
} else {
@ -5998,10 +6011,11 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
* is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address);
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
drop_client(ckp, sdata, client_id);
sprintf(client->identity, "passthrough:%"PRId64, client_id);
}
return;
}
@ -6012,8 +6026,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
json_params_t *jp;
if (unlikely(client->authorised)) {
LOGNOTICE("Client %"PRId64" %s trying to authorise twice",
client_id, client->address);
LOGNOTICE("Client %s %s trying to authorise twice",
client->identity, client->address);
return;
}
jp = create_json_params(client_id, method_val, params_val, id_val);
@ -6024,16 +6038,16 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* We should only accept requests from subscribed and authed users here
* on */
if (!client->subscribed) {
LOGINFO("Dropping %s from unsubscribed client %"PRId64" %s", method,
client_id, client->address);
LOGINFO("Dropping %s from unsubscribed client %s %s", method,
client->identity, client->address);
connector_drop_client(ckp, client_id);
return;
}
/* We should only accept authorised requests from here on */
if (!client->authorised) {
LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method,
client_id, client->address);
LOGINFO("Dropping %s from unauthorised client %s %s", method,
client->identity, client->address);
return;
}
@ -6051,7 +6065,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
}
/* Unhandled message here */
LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method);
LOGINFO("Unhandled client %s %s method %s", client->identity, client->address, method);
return;
}
@ -6077,7 +6091,7 @@ static void parse_share_result(ckpool_t *ckp, stratum_instance_t *client, json_t
if (client->upstream_invalid < client->last_share.tv_sec || !client->upstream_invalid)
client->upstream_invalid = now_t;
else if (client->upstream_invalid && client->upstream_invalid < now_t - 150) {
LOGNOTICE("Client %"PRId64" upstream rejects for 150s, disconnecting", client->id);
LOGNOTICE("Client %s upstream rejects for 150s, disconnecting", client->identity);
connector_drop_client(ckp, client->id);
client->reject = 3;
}
@ -6087,7 +6101,7 @@ static void parse_diff(stratum_instance_t *client, json_t *val)
{
double diff = json_number_value(json_array_get(val, 0));
LOGINFO("Set client %"PRId64" to diff %lf", client->id, diff);
LOGINFO("Set client %s to diff %lf", client->identity, diff);
client->diff = diff;
}
@ -6099,19 +6113,19 @@ static void parse_subscribe_result(stratum_instance_t *client, json_t *val)
len = strlen(client->enonce1) / 2;
hex2bin(client->enonce1bin, client->enonce1, len);
memcpy(&client->enonce1_64, client->enonce1bin, 8);
LOGINFO("Client %"PRId64" got enonce1 %lx string %s", client->id, client->enonce1_64, client->enonce1);
LOGINFO("Client %s got enonce1 %lx string %s", client->identity, client->enonce1_64, client->enonce1);
}
static void parse_authorise_result(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
json_t *val)
{
if (!json_is_true(val)) {
LOGNOTICE("Client %"PRId64" was not authorised upstream, dropping", client->id);
LOGNOTICE("Client %s was not authorised upstream, dropping", client->identity);
client->authorised = false;
connector_drop_client(ckp, client->id);
drop_client(ckp, sdata, client->id);
} else
LOGINFO("Client %"PRId64" was authorised upstream", client->id);
LOGINFO("Client %s was authorised upstream", client->identity);
}
static int node_msg_type(json_t *val)
@ -6386,10 +6400,10 @@ static void node_client_msg(ckpool_t *ckp, json_t *val, stratum_instance_t *clie
if (msg_type < 0) {
buf = json_dumps(val, 0);
LOGERR("Missing client %"PRId64" node method from %s", client->id, buf);
LOGERR("Missing client %s node method from %s", client->identity, buf);
goto out;
}
LOGDEBUG("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]);
LOGDEBUG("Got client %s node method %d:%s", client->identity, msg_type, stratum_msgs[msg_type]);
id_val = json_object_get(val, "id");
method = json_object_get(val, "method");
params = json_object_get(val, "params");
@ -6419,8 +6433,8 @@ static void node_client_msg(ckpool_t *ckp, json_t *val, stratum_instance_t *clie
break;
case SM_NONE:
buf = json_dumps(val, 0);
LOGNOTICE("Unrecognised method from client %"PRId64" :%s",
client->id, buf);
LOGNOTICE("Unrecognised method from client %s :%s",
client->identity, buf);
break;
default:
break;
@ -6464,8 +6478,8 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
int delays = 0;
if (client->reject == 3) {
LOGINFO("Dropping client %"PRId64" %s tagged for lazy invalidation",
client_id, client->address);
LOGINFO("Dropping client %s %s tagged for lazy invalidation",
client->identity, client->address);
connector_drop_client(ckp, client_id);
return;
}
@ -6482,10 +6496,10 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
const char *result = json_string_value(res_val);
if (!safecmp(result, "pong"))
LOGDEBUG("Received pong from client %"PRId64, client_id);
LOGDEBUG("Received pong from client %s", client->identity);
else
LOGDEBUG("Received spurious response %s from client %"PRId64,
result ? result : "", client_id);
LOGDEBUG("Received spurious response %s from client %s",
result ? result : "", client->identity);
return;
}
send_json_err(sdata, client_id, id_val, "-3:method not found");
@ -6574,7 +6588,7 @@ static void srecv_process(ckpool_t *ckp, json_t *val)
goto out;
}
if (unlikely(noid))
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
LOGINFO("Stratifier added instance %s server %d", client->identity, server);
if (client->remote)
parse_trusted_msg(ckp, sdata, msg->json_msg, client);
@ -6642,7 +6656,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp)
goto out;
}
if (unlikely(!client->authorised)) {
LOGDEBUG("Client %"PRId64" no longer authorised to submit shares", client_id);
LOGDEBUG("Client %s no longer authorised to submit shares", client->identity);
goto out_decref;
}
json_msg = json_object();

Loading…
Cancel
Save