From 137f0b784228f8ae60395bd0c2de79e79ab2811c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 31 Jan 2017 19:46:11 +1100 Subject: [PATCH] Use a virtualid to reference subclients in remote servers to be able to speak to the upstream pool, allowing them to work with passthroughs. --- src/connector.c | 20 ++++++++-- src/connector.h | 1 + src/stratifier.c | 101 ++++++++++++++++++++++++++++++++++------------- 3 files changed, 92 insertions(+), 30 deletions(-) diff --git a/src/connector.c b/src/connector.c index 17683f8b..b7973d5a 100644 --- a/src/connector.c +++ b/src/connector.c @@ -137,7 +137,7 @@ struct connector_data { int clients_generated; int dead_generated; - int64_t client_id; + int64_t client_ids; /* client message process queue */ ckmsgq_t *cmpq; @@ -243,6 +243,20 @@ static void recycle_client(cdata_t *cdata, client_instance_t *client) ck_wunlock(&cdata->lock); } +/* Allows the stratifier to get a unique local virtualid for subclients */ +int64_t connector_newclientid(ckpool_t *ckp) +{ + int64_t ret; + + cdata_t *cdata = ckp->cdata; + + ck_wlock(&cdata->lock); + ret = cdata->client_ids++; + ck_wunlock(&cdata->lock); + + return ret; +} + /* Accepts incoming connections on the server socket and generates client * instances */ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) @@ -310,7 +324,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds, fd, no_clients, client->address_name, port); ck_wlock(&cdata->lock); - client->id = cdata->client_id++; + client->id = cdata->client_ids++; HASH_ADD_I64(cdata->clients, id, client); cdata->nfds++; ck_wunlock(&cdata->lock); @@ -1654,7 +1668,7 @@ void *connector(void *arg) cdata->nfds = 0; /* Set the client id to the highest serverurl count to distinguish * them from the server fds in epoll. */ - cdata->client_id = ckp->serverurls; + cdata->client_ids = ckp->serverurls; mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); diff --git a/src/connector.h b/src/connector.h index 4bb79bd0..be945ef3 100644 --- a/src/connector.h +++ b/src/connector.h @@ -10,6 +10,7 @@ #ifndef CONNECTOR_H #define CONNECTOR_H +int64_t connector_newclientid(ckpool_t *ckp); void connector_upstream_msg(ckpool_t *ckp, char *msg); void connector_add_message(ckpool_t *ckp, json_t *val); char *connector_stats(void *data, const int runtime); diff --git a/src/stratifier.c b/src/stratifier.c index 36a7bcac..93669bef 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -253,6 +253,9 @@ struct stratum_instance { UT_hash_handle hh; int64_t id; + /* Virtualid used as unique local id for passthrough clients */ + int64_t virtualid; + stratum_instance_t *next; stratum_instance_t *prev; @@ -3357,7 +3360,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) return client; } -/* Enter with write instance_lock held */ +/* Enter with write instance_lock held, drops and grabs it again */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address, int server) { @@ -3366,6 +3369,8 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con int64_t pass_id; client = __recruit_stratum_instance(sdata); + ck_wunlock(&sdata->instance_lock); + client->start_time = time(NULL); client->id = id; client->session_id = ++sdata->session_id; @@ -3377,7 +3382,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; tv_time(&client->ldc); - HASH_ADD_I64(sdata->stratum_instances, id, client); /* Points to ckp sdata in ckpool mode, but is changed later in proxy * mode . */ client->sdata = sdata; @@ -3398,8 +3402,14 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64, pass_id, id); } - } else + client->virtualid = connector_newclientid(ckp); + } else { sprintf(client->identity, "%"PRId64, id); + client->virtualid = id; + } + + ck_wlock(&sdata->instance_lock); + HASH_ADD_I64(sdata->stratum_instances, id, client); return client; } @@ -4453,8 +4463,6 @@ static void get_uptime(sdata_t *sdata, int *sockd) send_api_response(val, *sockd); } -static void srecv_process(ckpool_t *ckp, json_t *val); - /* For emergency use only, flushes all pending ckdbq messages */ static void ckdbq_flush(sdata_t *sdata) { @@ -5862,23 +5870,28 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc send_node_block(sdata, client->enonce1, nonce, nonce2, ntime32, wb->id, diff, client->id); - JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}", - "height", wb->height, - "blockhash", blockhash, - "confirmed", "n", - "workinfoid", wb->id, - "username", client->user_instance->username, - "workername", client->workername, - "clientid", client->id, - "enonce1", client->enonce1, - "nonce2", nonce2, - "nonce", nonce, - "reward", wb->coinbasevalue, - "diff", diff, - "createdate", cdfield, - "createby", "code", - "createcode", __func__, - "createinet", ckp->serverurl[client->server]); + val = json_object(); + // JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}", + json_set_int(val, "height", wb->height); + json_set_string(val,"blockhash", blockhash); + json_set_string(val,"confirmed", "n"); + json_set_int64(val, "workinfoid", wb->id); + json_set_string(val, "username", client->user_instance->username); + json_set_string(val, "workername", client->workername); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); + json_set_string(val, "enonce1", client->enonce1); + json_set_string(val, "nonce2", nonce2); + json_set_string(val, "nonce", nonce); + json_set_int64(val, "reward", wb->coinbasevalue); + json_set_double(val, "diff", diff); + json_set_string(val, "createdate", cdfield); + json_set_string(val, "createby", "code"); + json_set_string(val, "createcode", __func__); + json_set_string(val, "createinet", ckp->serverurl[client->server]); + val_copy = json_deep_copy(val); if (ckp->remote) { @@ -6180,7 +6193,10 @@ out_nowb: /* Now write to the pool's sharelog. */ val = json_object(); json_set_int(val, "workinfoid", id); - json_set_int(val, "clientid", client->id); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); json_set_string(val, "enonce1", client->enonce1); if (!CKP_STANDALONE(ckp)) json_set_string(val, "secondaryuserid", user->secondaryuserid); @@ -6250,7 +6266,10 @@ out: if (!share) { if (!CKP_STANDALONE(ckp) || ckp->remote) { val = json_object(); - json_set_int(val, "clientid", client->id); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); if (user->secondaryuserid) json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "enonce1", client->enonce1); @@ -6564,13 +6583,13 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie if (unlikely(cmdmatch(method, "mining.passthrough"))) { char buf[256]; - if (ckp->proxy || ckp->node || ckp->remote) { + if (ckp->proxy || ckp->node ) { LOGNOTICE("Dropping client %s %s trying to connect as passthrough on unsupported server %d", client->identity, client->address, client->server); connector_drop_client(ckp, client_id); drop_client(ckp, sdata, client_id); } else { - /* We need is a passthrough and to manage its messages + /*Flag this as a passthrough and manage its messages * accordingly. No data from this client id should ever * come directly back to this stratifier. */ LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address); @@ -6861,6 +6880,31 @@ static void send_auth_failure(sdata_t *sdata, stratum_instance_t *client) stratum_send_message(sdata, client, "Failed authorisation :("); } +/* For finding a client by its virtualid instead of client->id. This is an + * inefficient lookup but only occurs once on parsing a remote auth from the + * upstream pool on passthrough subclients. */ +static stratum_instance_t *ref_instance_by_virtualid(sdata_t *sdata, int64_t *client_id) +{ + stratum_instance_t *client, *ret = NULL; + + ck_wlock(&sdata->instance_lock); + for (client = sdata->stratum_instances; client; client = client->hh.next) { + if (likely(client->virtualid != *client_id)) + continue; + if (likely(!client->dropped)) { + ret = client; + __inc_instance_ref(ret); + /* Replace the client_id with the correct one, allowing + * us to send the response to the correct client */ + *client_id = client->id; + } + break; + } + ck_wunlock(&sdata->instance_lock); + + return ret; +} + void parse_upstream_auth(ckpool_t *ckp, json_t *val) { json_t *id_val = NULL, *err_val = NULL; @@ -6878,6 +6922,9 @@ void parse_upstream_auth(ckpool_t *ckp, json_t *val) goto out; err_val = json_object_get(val, "error"); client = ref_instance_by_id(sdata, client_id); + /* Is this client_id a virtualid from a passthrough subclient */ + if (!client) + client = ref_instance_by_virtualid(sdata, &client_id); if (!client) { LOGINFO("Failed to find client id %"PRId64" in parse_upstream_auth", client_id); @@ -7545,7 +7592,7 @@ static void upstream_auth(ckpool_t *ckp, stratum_instance_t *client, json_params json_set_string(val, "useragent", client->useragent ? : ""); json_set_string(val, "enonce1", client->enonce1 ? : ""); json_set_string(val, "address", client->address); - json_set_int(val, "clientid", client->id); + json_set_int64(val, "clientid", client->virtualid); msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); json_decref(val); connector_upstream_msg(ckp, msg);