From 685af50a7e8fa3025b3fd50984a9fc21bececd3a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 12 Apr 2016 13:38:54 +1000 Subject: [PATCH] Add identifier for clients in the stratifier to know what node/passthrough they belong to --- src/stratifier.c | 176 +++++++++++++++++++++++++---------------------- 1 file changed, 95 insertions(+), 81 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 03358643..dbb4bf11 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -244,6 +244,9 @@ struct stratum_instance { stratum_instance_t *next; stratum_instance_t *prev; + /* Descriptive of ID number and passthrough if any */ + char identity[128]; + /* Reference count for when this instance is used outside of the * instance_lock */ int ref; @@ -2769,17 +2772,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil DL_DELETE(sdata->remote_instances, client); if (client->workername) { if (user) { - ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", - client->id, client->address, user->throttled ? "throttled " : "", + ASPRINTF(msg, "Dropped client %s %s %suser %s worker %s %s", + client->identity, client->address, user->throttled ? "throttled " : "", user->username, client->workername, lazily ? "lazily" : ""); } else { - ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s", - client->id, client->address, client->workername, + ASPRINTF(msg, "Dropped client %s %s no user worker %s %s", + client->identity, client->address, client->workername, lazily ? "lazily" : ""); } } else { - ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s", - client->id, client->address, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped workerless client %s %s %s", + client->identity, client->address, lazily ? "lazily" : ""); } __del_client(sdata, client); __kill_instance(sdata, client); @@ -2832,8 +2835,8 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) } /* Enter with write instance_lock held */ -static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id, - const char *address, int server) +static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address, + int server) { stratum_instance_t *client; sdata_t *sdata = ckp->sdata; @@ -2858,13 +2861,20 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i stratum_instance_t *passthrough; int64_t pass_id = id >> 32; + id &= 0xffffffffll; passthrough = __instance_by_id(sdata, pass_id); if (passthrough && passthrough->node) { client->latency = passthrough->latency; - LOGINFO("Client %"PRId64" inherited node latency of %d", - id, client->latency); + LOGINFO("Client %s inherited node latency of %d", + client->identity, client->latency); + sprintf(client->identity, "node:%"PRId64" subclient:%"PRId64, + pass_id, id); + } else { + sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64, + pass_id, id); } - } + } else + sprintf(client->identity, "%"PRId64, id); return client; } @@ -2888,7 +2898,7 @@ out_unlock: ck_wunlock(&sdata->instance_lock); if (ret) - LOGNOTICE("Reconnecting old instance %"PRId64" to instance %"PRId64, old_id, id); + LOGINFO("Reconnecting old instance %"PRId64" to instance %"PRId64, old_id, id); return ret; } @@ -4410,8 +4420,8 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_ client->sdata = sdata; if (ckp->proxy) { - LOGINFO("Current %d, selecting proxy %d:%d for client %"PRId64, ckp_sdata->proxy->id, - sdata->subproxy->id, sdata->subproxy->subid, client->id); + LOGINFO("Current %d, selecting proxy %d:%d for client %s", ckp_sdata->proxy->id, + sdata->subproxy->id, sdata->subproxy->subid, client->identity); } if (!old_match) { @@ -4421,11 +4431,11 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_ client->reject = 3; return json_string("proxy full"); } - LOGINFO("Set new subscription %"PRId64" to new enonce1 %lx string %s", client->id, + LOGINFO("Set new subscription %s to new enonce1 %lx string %s", client->identity, client->enonce1_64, client->enonce1); } else { - LOGINFO("Set new subscription %"PRId64" to old matched enonce1 %lx string %s", - client->id, client->enonce1_64, client->enonce1); + LOGINFO("Set new subscription %s to old matched enonce1 %lx string %s", + client->identity, client->enonce1_64, client->enonce1); } /* Workbases will exist if sdata->current_workbase is not NULL */ @@ -5051,11 +5061,11 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ if (now_t < user->failed_authtime + user->auth_backoff) { if (!user->throttled) { user->throttled = true; - LOGNOTICE("Client %"PRId64" %s worker %s rate limited due to failed auth attempts", - client->id, client->address, buf); + LOGNOTICE("Client %s %s worker %s rate limited due to failed auth attempts", + client->identity, client->address, buf); } else{ - LOGINFO("Client %"PRId64" %s worker %s rate limited due to failed auth attempts", - client->id, client->address, buf); + LOGINFO("Client %s %s worker %s rate limited due to failed auth attempts", + client->identity, client->address, buf); } client->dropped = true; goto out; @@ -5083,20 +5093,20 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ client->authorised = ret; user->authorised = ret; if (ckp->proxy) { - LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s", - client->id, client->proxyid, client->subproxyid, buf, user->username); + LOGNOTICE("Authorised client %s to proxy %d:%d, worker %s as user %s", + client->identity, client->proxyid, client->subproxyid, buf, user->username); if (ckp->userproxy) check_global_user(ckp, user, client); } else { - LOGNOTICE("Authorised client %"PRId64" worker %s as user %s", - client->id, buf, user->username); + LOGNOTICE("Authorised client %s worker %s as user %s", + client->identity, buf, user->username); } user->failed_authtime = 0; user->auth_backoff = DEFAULT_AUTH_BACKOFF; /* Reset auth backoff time */ user->throttled = false; } else { - LOGNOTICE("Client %"PRId64" %s worker %s failed to authorise as user %s", - client->id, client->address, buf,user->username); + LOGNOTICE("Client %s %s worker %s failed to authorise as user %s", + client->identity, client->address, buf,user->username); user->failed_authtime = time(NULL); user->auth_backoff <<= 1; /* Cap backoff time to 10 mins */ @@ -5276,8 +5286,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d client->ssdc = 0; - LOGINFO("Client %"PRId64" biased dsps %.2f dsps %.2f drr %.2f adjust diff from %"PRId64" to: %"PRId64" ", - client->id, dsps, client->dsps5, drr, client->diff, optimal); + LOGINFO("Client %s biased dsps %.2f dsps %.2f drr %.2f adjust diff from %"PRId64" to: %"PRId64" ", + client->identity, dsps, client->dsps5, drr, client->diff, optimal); copy_tv(&client->ldc, &now_t); client->diff_change_job_id = next_blockid; @@ -5546,8 +5556,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, worker_instance_t *worker = client->worker_instance; client->best_diff = sdiff; - LOGINFO("User %s worker %s client %"PRId64" new best diff %lf", user->username, - worker->workername, client->id, sdiff); + LOGINFO("User %s worker %s client %s new best diff %lf", user->username, + worker->workername, client->identity, sdiff); check_best_diff(ckp, sdata, user, worker, sdiff, client); } bswap_256(sharehash, hash); @@ -5563,7 +5573,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, ts_to_tv(&now_tv, &now); latency = ms_tvdiff(&now_tv, &wb->retired); if (latency < client->latency) { - LOGDEBUG("Accepting %dms late share from client %"PRId64, latency, client->id); + LOGDEBUG("Accepting %dms late share from client %s", + latency, client->identity); goto no_stale; } } @@ -5594,25 +5605,25 @@ out_unlock: suffix_string(wdiff, wdiffsuffix, 16, 0); if (sdiff >= diff) { if (new_share(sdata, hash, id)) { - LOGINFO("Accepted client %"PRId64" share diff %.1f/%.0f/%s: %s", - client->id, sdiff, diff, wdiffsuffix, hexhash); + LOGINFO("Accepted client %s share diff %.1f/%.0f/%s: %s", + client->identity, sdiff, diff, wdiffsuffix, hexhash); result = true; } else { err = SE_DUPE; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); - LOGINFO("Rejected client %"PRId64" dupe diff %.1f/%.0f/%s: %s", - client->id, sdiff, diff, wdiffsuffix, hexhash); + LOGINFO("Rejected client %s dupe diff %.1f/%.0f/%s: %s", + client->identity, sdiff, diff, wdiffsuffix, hexhash); submit = false; } } else { err = SE_HIGH_DIFF; - LOGINFO("Rejected client %"PRId64" high diff %.1f/%.0f/%s: %s", - client->id, sdiff, diff, wdiffsuffix, hexhash); + LOGINFO("Rejected client %s high diff %.1f/%.0f/%s: %s", + client->identity, sdiff, diff, wdiffsuffix, hexhash); json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); submit = false; } } else - LOGINFO("Rejected client %"PRId64" invalid share %s", client->id, SHARE_ERR(err)); + LOGINFO("Rejected client %s invalid share %s", client->identity, SHARE_ERR(err)); /* Submit share to upstream pool in proxy mode. We submit valid and * stale shares and filter out the rest. */ @@ -5667,19 +5678,19 @@ out: if (client->first_invalid < client->last_share.tv_sec || !client->first_invalid) client->first_invalid = now_t; else if (client->first_invalid && client->first_invalid < now_t - 180 && client->reject < 3) { - LOGNOTICE("Client %"PRId64" rejecting for 180s, disconnecting", client->id); + LOGNOTICE("Client %s rejecting for 180s, disconnecting", client->identity); if (ckp->node) connector_drop_client(ckp, client->id); else stratum_send_message(sdata, client, "Disconnecting for continuous invalid shares"); client->reject = 3; } else if (client->first_invalid && client->first_invalid < now_t - 120 && client->reject < 2) { - LOGNOTICE("Client %"PRId64" rejecting for 120s, reconnecting", client->id); + LOGNOTICE("Client %s rejecting for 120s, reconnecting", client->identity); stratum_send_message(sdata, client, "Reconnecting for continuous invalid shares"); reconnect_client(sdata, client); client->reject = 2; } else if (client->first_invalid && client->first_invalid < now_t - 60 && !client->reject) { - LOGNOTICE("Client %"PRId64" rejecting for 60s, sending update", client->id); + LOGNOTICE("Client %s rejecting for 60s, sending update", client->identity); update_client(client, client->id); client->reject = 1; } @@ -5705,7 +5716,7 @@ out: json_set_string(val, "createinet", ckp->serverurl[client->server]); ckdbq_add(ckp, ID_SHAREERR, val); } - LOGINFO("Invalid share from client %"PRId64": %s", client->id, client->workername); + LOGINFO("Invalid share from client %s: %s", client->identity, client->workername); } free(fname); return json_boolean(result); @@ -5804,13 +5815,13 @@ static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char * int64_t sdiff; if (unlikely(!client_active(client))) { - LOGNOTICE("Attempted to suggest diff on unauthorised client %"PRId64, client->id); + LOGNOTICE("Attempted to suggest diff on unauthorised client %s", client->identity); return; } if (arr_val && json_is_integer(arr_val)) sdiff = json_integer_value(arr_val); else if (sscanf(method, "mining.suggest_difficulty(%"PRId64, &sdiff) != 1) { - LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id); + LOGINFO("Failed to parse suggest_difficulty for client %s", client->identity); return; } /* Clamp suggest diff to global pool mindiff */ @@ -5857,7 +5868,7 @@ static void send_node_all_txns(sdata_t *sdata, const stratum_instance_t *client) msg->json_msg = val; msg->client_id = client->id; ckmsgq_add(sdata->ssends, msg); - LOGNOTICE("Sending new node client %"PRId64" all transactions", client->id); + LOGNOTICE("Sending new node client %s all transactions", client->identity); } static void *setup_node(void *arg) @@ -5867,7 +5878,7 @@ static void *setup_node(void *arg) pthread_detach(pthread_self()); client->latency = round_trip(client->address) / 2; - LOGNOTICE("Node client %"PRId64" %s latency set to %dms", client->id, + LOGNOTICE("Node client %s %s latency set to %dms", client->identity, client->address, client->latency); send_node_all_txns(client->sdata, client); dec_instance_ref(client->sdata, client); @@ -5888,7 +5899,7 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); - LOGWARNING("Added client %"PRId64" %s as mining node on server %d:%s", client->id, + LOGWARNING("Added client %s %s as mining node on server %d:%s", client->identity, client->address, client->server, ckp->serverurl[client->server]); create_pthread(&pth, setup_node, client); @@ -5921,7 +5932,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie } if (cmdmatch(method, "mining.term")) { - LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + LOGDEBUG("Mining terminate requested from %s %s", client->identity, client->address); drop_client(ckp, sdata, client_id); return; } @@ -5930,8 +5941,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie json_t *val, *result_val; if (unlikely(client->subscribed)) { - LOGNOTICE("Client %"PRId64" %s trying to subscribe twice", - client_id, client->address); + LOGNOTICE("Client %s %s trying to subscribe twice", + client->identity, client->address); return; } result_val = parse_subscribe(client, client_id, params_val); @@ -5956,14 +5967,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a trusted remote node in the connector and * drop the client in the stratifier */ if (!ckp->trusted[client->server] || ckp->proxy) { - LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as remote node on non trusted server %d", - client_id, client->address, client->server); + LOGNOTICE("Dropping client %s %s trying to authorise as remote node on non trusted server %d", + client->identity, client->address, client->server); connector_drop_client(ckp, client_id); } else { add_remote_server(sdata, client); snprintf(buf, 255, "remote=%"PRId64, client_id); send_proc(ckp->connector, buf); } + sprintf(client->identity, "remote:%"PRId64, client_id); return; } @@ -5973,14 +5985,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ if (!ckp->nodeserver[client->server] || ckp->proxy) { - LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as node on non node server %d", - client_id, client->address, client->server); + LOGNOTICE("Dropping client %s %s trying to authorise as node on non node server %d", + client->identity, client->address, client->server); connector_drop_client(ckp, client_id); drop_client(ckp, sdata, client_id); } else { snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); add_mining_node(ckp, sdata, client); + sprintf(client->identity, "node:%"PRId64, client_id); } return; } @@ -5989,8 +6002,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie char buf[256]; if (ckp->proxy) { - LOGNOTICE("Dropping client %"PRId64" %s trying to connect as passthrough on proxy server %d", - client_id, client->address, client->server); + LOGNOTICE("Dropping client %s %s trying to connect as passthrough on proxy server %d", + client->identity, client->address, client->server); connector_drop_client(ckp, client_id); drop_client(ckp, sdata, client_id); } else { @@ -5998,10 +6011,11 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie * is a passthrough and to manage its messages accordingly. No * data from this client id should ever come back to this * stratifier after this so drop the client in the stratifier. */ - LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); + LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address); snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); drop_client(ckp, sdata, client_id); + sprintf(client->identity, "passthrough:%"PRId64, client_id); } return; } @@ -6012,8 +6026,8 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie json_params_t *jp; if (unlikely(client->authorised)) { - LOGNOTICE("Client %"PRId64" %s trying to authorise twice", - client_id, client->address); + LOGNOTICE("Client %s %s trying to authorise twice", + client->identity, client->address); return; } jp = create_json_params(client_id, method_val, params_val, id_val); @@ -6024,16 +6038,16 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* We should only accept requests from subscribed and authed users here * on */ if (!client->subscribed) { - LOGINFO("Dropping %s from unsubscribed client %"PRId64" %s", method, - client_id, client->address); + LOGINFO("Dropping %s from unsubscribed client %s %s", method, + client->identity, client->address); connector_drop_client(ckp, client_id); return; } /* We should only accept authorised requests from here on */ if (!client->authorised) { - LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method, - client_id, client->address); + LOGINFO("Dropping %s from unauthorised client %s %s", method, + client->identity, client->address); return; } @@ -6051,7 +6065,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie } /* Unhandled message here */ - LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); + LOGINFO("Unhandled client %s %s method %s", client->identity, client->address, method); return; } @@ -6077,7 +6091,7 @@ static void parse_share_result(ckpool_t *ckp, stratum_instance_t *client, json_t if (client->upstream_invalid < client->last_share.tv_sec || !client->upstream_invalid) client->upstream_invalid = now_t; else if (client->upstream_invalid && client->upstream_invalid < now_t - 150) { - LOGNOTICE("Client %"PRId64" upstream rejects for 150s, disconnecting", client->id); + LOGNOTICE("Client %s upstream rejects for 150s, disconnecting", client->identity); connector_drop_client(ckp, client->id); client->reject = 3; } @@ -6087,7 +6101,7 @@ static void parse_diff(stratum_instance_t *client, json_t *val) { double diff = json_number_value(json_array_get(val, 0)); - LOGINFO("Set client %"PRId64" to diff %lf", client->id, diff); + LOGINFO("Set client %s to diff %lf", client->identity, diff); client->diff = diff; } @@ -6099,19 +6113,19 @@ static void parse_subscribe_result(stratum_instance_t *client, json_t *val) len = strlen(client->enonce1) / 2; hex2bin(client->enonce1bin, client->enonce1, len); memcpy(&client->enonce1_64, client->enonce1bin, 8); - LOGINFO("Client %"PRId64" got enonce1 %lx string %s", client->id, client->enonce1_64, client->enonce1); + LOGINFO("Client %s got enonce1 %lx string %s", client->identity, client->enonce1_64, client->enonce1); } static void parse_authorise_result(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, json_t *val) { if (!json_is_true(val)) { - LOGNOTICE("Client %"PRId64" was not authorised upstream, dropping", client->id); + LOGNOTICE("Client %s was not authorised upstream, dropping", client->identity); client->authorised = false; connector_drop_client(ckp, client->id); drop_client(ckp, sdata, client->id); } else - LOGINFO("Client %"PRId64" was authorised upstream", client->id); + LOGINFO("Client %s was authorised upstream", client->identity); } static int node_msg_type(json_t *val) @@ -6386,10 +6400,10 @@ static void node_client_msg(ckpool_t *ckp, json_t *val, stratum_instance_t *clie if (msg_type < 0) { buf = json_dumps(val, 0); - LOGERR("Missing client %"PRId64" node method from %s", client->id, buf); + LOGERR("Missing client %s node method from %s", client->identity, buf); goto out; } - LOGDEBUG("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]); + LOGDEBUG("Got client %s node method %d:%s", client->identity, msg_type, stratum_msgs[msg_type]); id_val = json_object_get(val, "id"); method = json_object_get(val, "method"); params = json_object_get(val, "params"); @@ -6419,8 +6433,8 @@ static void node_client_msg(ckpool_t *ckp, json_t *val, stratum_instance_t *clie break; case SM_NONE: buf = json_dumps(val, 0); - LOGNOTICE("Unrecognised method from client %"PRId64" :%s", - client->id, buf); + LOGNOTICE("Unrecognised method from client %s :%s", + client->identity, buf); break; default: break; @@ -6464,8 +6478,8 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat int delays = 0; if (client->reject == 3) { - LOGINFO("Dropping client %"PRId64" %s tagged for lazy invalidation", - client_id, client->address); + LOGINFO("Dropping client %s %s tagged for lazy invalidation", + client->identity, client->address); connector_drop_client(ckp, client_id); return; } @@ -6482,10 +6496,10 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat const char *result = json_string_value(res_val); if (!safecmp(result, "pong")) - LOGDEBUG("Received pong from client %"PRId64, client_id); + LOGDEBUG("Received pong from client %s", client->identity); else - LOGDEBUG("Received spurious response %s from client %"PRId64, - result ? result : "", client_id); + LOGDEBUG("Received spurious response %s from client %s", + result ? result : "", client->identity); return; } send_json_err(sdata, client_id, id_val, "-3:method not found"); @@ -6574,7 +6588,7 @@ static void srecv_process(ckpool_t *ckp, json_t *val) goto out; } if (unlikely(noid)) - LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); + LOGINFO("Stratifier added instance %s server %d", client->identity, server); if (client->remote) parse_trusted_msg(ckp, sdata, msg->json_msg, client); @@ -6642,7 +6656,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) goto out; } if (unlikely(!client->authorised)) { - LOGDEBUG("Client %"PRId64" no longer authorised to submit shares", client_id); + LOGDEBUG("Client %s no longer authorised to submit shares", client->identity); goto out_decref; } json_msg = json_object();