Browse Source

Use a virtualid to reference subclients in remote servers to be able to speak to the upstream pool, allowing them to work with passthroughs.

master
Con Kolivas 8 years ago
parent
commit
137f0b7842
  1. 20
      src/connector.c
  2. 1
      src/connector.h
  3. 101
      src/stratifier.c

20
src/connector.c

@ -137,7 +137,7 @@ struct connector_data {
int clients_generated; int clients_generated;
int dead_generated; int dead_generated;
int64_t client_id; int64_t client_ids;
/* client message process queue */ /* client message process queue */
ckmsgq_t *cmpq; ckmsgq_t *cmpq;
@ -243,6 +243,20 @@ static void recycle_client(cdata_t *cdata, client_instance_t *client)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
} }
/* Allows the stratifier to get a unique local virtualid for subclients */
int64_t connector_newclientid(ckpool_t *ckp)
{
int64_t ret;
cdata_t *cdata = ckp->cdata;
ck_wlock(&cdata->lock);
ret = cdata->client_ids++;
ck_wunlock(&cdata->lock);
return ret;
}
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
@ -310,7 +324,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
cdata->nfds, fd, no_clients, client->address_name, port); cdata->nfds, fd, no_clients, client->address_name, port);
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
client->id = cdata->client_id++; client->id = cdata->client_ids++;
HASH_ADD_I64(cdata->clients, id, client); HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++; cdata->nfds++;
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
@ -1654,7 +1668,7 @@ void *connector(void *arg)
cdata->nfds = 0; cdata->nfds = 0;
/* Set the client id to the highest serverurl count to distinguish /* Set the client id to the highest serverurl count to distinguish
* them from the server fds in epoll. */ * them from the server fds in epoll. */
cdata->client_id = ckp->serverurls; cdata->client_ids = ckp->serverurls;
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond); cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);

1
src/connector.h

@ -10,6 +10,7 @@
#ifndef CONNECTOR_H #ifndef CONNECTOR_H
#define CONNECTOR_H #define CONNECTOR_H
int64_t connector_newclientid(ckpool_t *ckp);
void connector_upstream_msg(ckpool_t *ckp, char *msg); void connector_upstream_msg(ckpool_t *ckp, char *msg);
void connector_add_message(ckpool_t *ckp, json_t *val); void connector_add_message(ckpool_t *ckp, json_t *val);
char *connector_stats(void *data, const int runtime); char *connector_stats(void *data, const int runtime);

101
src/stratifier.c

@ -253,6 +253,9 @@ struct stratum_instance {
UT_hash_handle hh; UT_hash_handle hh;
int64_t id; int64_t id;
/* Virtualid used as unique local id for passthrough clients */
int64_t virtualid;
stratum_instance_t *next; stratum_instance_t *next;
stratum_instance_t *prev; stratum_instance_t *prev;
@ -3357,7 +3360,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata)
return client; return client;
} }
/* Enter with write instance_lock held */ /* Enter with write instance_lock held, drops and grabs it again */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address, static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address,
int server) int server)
{ {
@ -3366,6 +3369,8 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
int64_t pass_id; int64_t pass_id;
client = __recruit_stratum_instance(sdata); client = __recruit_stratum_instance(sdata);
ck_wunlock(&sdata->instance_lock);
client->start_time = time(NULL); client->start_time = time(NULL);
client->id = id; client->id = id;
client->session_id = ++sdata->session_id; client->session_id = ++sdata->session_id;
@ -3377,7 +3382,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
client->diff = client->old_diff = ckp->startdiff; client->diff = client->old_diff = ckp->startdiff;
client->ckp = ckp; client->ckp = ckp;
tv_time(&client->ldc); tv_time(&client->ldc);
HASH_ADD_I64(sdata->stratum_instances, id, client);
/* Points to ckp sdata in ckpool mode, but is changed later in proxy /* Points to ckp sdata in ckpool mode, but is changed later in proxy
* mode . */ * mode . */
client->sdata = sdata; client->sdata = sdata;
@ -3398,8 +3402,14 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64, sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64,
pass_id, id); pass_id, id);
} }
} else client->virtualid = connector_newclientid(ckp);
} else {
sprintf(client->identity, "%"PRId64, id); sprintf(client->identity, "%"PRId64, id);
client->virtualid = id;
}
ck_wlock(&sdata->instance_lock);
HASH_ADD_I64(sdata->stratum_instances, id, client);
return client; return client;
} }
@ -4453,8 +4463,6 @@ static void get_uptime(sdata_t *sdata, int *sockd)
send_api_response(val, *sockd); send_api_response(val, *sockd);
} }
static void srecv_process(ckpool_t *ckp, json_t *val);
/* For emergency use only, flushes all pending ckdbq messages */ /* For emergency use only, flushes all pending ckdbq messages */
static void ckdbq_flush(sdata_t *sdata) static void ckdbq_flush(sdata_t *sdata)
{ {
@ -5862,23 +5870,28 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc
send_node_block(sdata, client->enonce1, nonce, nonce2, ntime32, wb->id, send_node_block(sdata, client->enonce1, nonce, nonce2, ntime32, wb->id,
diff, client->id); diff, client->id);
JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}", val = json_object();
"height", wb->height, // JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}",
"blockhash", blockhash, json_set_int(val, "height", wb->height);
"confirmed", "n", json_set_string(val,"blockhash", blockhash);
"workinfoid", wb->id, json_set_string(val,"confirmed", "n");
"username", client->user_instance->username, json_set_int64(val, "workinfoid", wb->id);
"workername", client->workername, json_set_string(val, "username", client->user_instance->username);
"clientid", client->id, json_set_string(val, "workername", client->workername);
"enonce1", client->enonce1, if (ckp->remote)
"nonce2", nonce2, json_set_int64(val, "clientid", client->virtualid);
"nonce", nonce, else
"reward", wb->coinbasevalue, json_set_int64(val, "clientid", client->id);
"diff", diff, json_set_string(val, "enonce1", client->enonce1);
"createdate", cdfield, json_set_string(val, "nonce2", nonce2);
"createby", "code", json_set_string(val, "nonce", nonce);
"createcode", __func__, json_set_int64(val, "reward", wb->coinbasevalue);
"createinet", ckp->serverurl[client->server]); json_set_double(val, "diff", diff);
json_set_string(val, "createdate", cdfield);
json_set_string(val, "createby", "code");
json_set_string(val, "createcode", __func__);
json_set_string(val, "createinet", ckp->serverurl[client->server]);
val_copy = json_deep_copy(val); val_copy = json_deep_copy(val);
if (ckp->remote) { if (ckp->remote) {
@ -6180,7 +6193,10 @@ out_nowb:
/* Now write to the pool's sharelog. */ /* Now write to the pool's sharelog. */
val = json_object(); val = json_object();
json_set_int(val, "workinfoid", id); json_set_int(val, "workinfoid", id);
json_set_int(val, "clientid", client->id); if (ckp->remote)
json_set_int64(val, "clientid", client->virtualid);
else
json_set_int64(val, "clientid", client->id);
json_set_string(val, "enonce1", client->enonce1); json_set_string(val, "enonce1", client->enonce1);
if (!CKP_STANDALONE(ckp)) if (!CKP_STANDALONE(ckp))
json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "secondaryuserid", user->secondaryuserid);
@ -6250,7 +6266,10 @@ out:
if (!share) { if (!share) {
if (!CKP_STANDALONE(ckp) || ckp->remote) { if (!CKP_STANDALONE(ckp) || ckp->remote) {
val = json_object(); val = json_object();
json_set_int(val, "clientid", client->id); if (ckp->remote)
json_set_int64(val, "clientid", client->virtualid);
else
json_set_int64(val, "clientid", client->id);
if (user->secondaryuserid) if (user->secondaryuserid)
json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "secondaryuserid", user->secondaryuserid);
json_set_string(val, "enonce1", client->enonce1); json_set_string(val, "enonce1", client->enonce1);
@ -6564,13 +6583,13 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
if (unlikely(cmdmatch(method, "mining.passthrough"))) { if (unlikely(cmdmatch(method, "mining.passthrough"))) {
char buf[256]; char buf[256];
if (ckp->proxy || ckp->node || ckp->remote) { if (ckp->proxy || ckp->node ) {
LOGNOTICE("Dropping client %s %s trying to connect as passthrough on unsupported server %d", LOGNOTICE("Dropping client %s %s trying to connect as passthrough on unsupported server %d",
client->identity, client->address, client->server); client->identity, client->address, client->server);
connector_drop_client(ckp, client_id); connector_drop_client(ckp, client_id);
drop_client(ckp, sdata, client_id); drop_client(ckp, sdata, client_id);
} else { } else {
/* We need is a passthrough and to manage its messages /*Flag this as a passthrough and manage its messages
* accordingly. No data from this client id should ever * accordingly. No data from this client id should ever
* come directly back to this stratifier. */ * come directly back to this stratifier. */
LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address); LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address);
@ -6861,6 +6880,31 @@ static void send_auth_failure(sdata_t *sdata, stratum_instance_t *client)
stratum_send_message(sdata, client, "Failed authorisation :("); stratum_send_message(sdata, client, "Failed authorisation :(");
} }
/* For finding a client by its virtualid instead of client->id. This is an
* inefficient lookup but only occurs once on parsing a remote auth from the
* upstream pool on passthrough subclients. */
static stratum_instance_t *ref_instance_by_virtualid(sdata_t *sdata, int64_t *client_id)
{
stratum_instance_t *client, *ret = NULL;
ck_wlock(&sdata->instance_lock);
for (client = sdata->stratum_instances; client; client = client->hh.next) {
if (likely(client->virtualid != *client_id))
continue;
if (likely(!client->dropped)) {
ret = client;
__inc_instance_ref(ret);
/* Replace the client_id with the correct one, allowing
* us to send the response to the correct client */
*client_id = client->id;
}
break;
}
ck_wunlock(&sdata->instance_lock);
return ret;
}
void parse_upstream_auth(ckpool_t *ckp, json_t *val) void parse_upstream_auth(ckpool_t *ckp, json_t *val)
{ {
json_t *id_val = NULL, *err_val = NULL; json_t *id_val = NULL, *err_val = NULL;
@ -6878,6 +6922,9 @@ void parse_upstream_auth(ckpool_t *ckp, json_t *val)
goto out; goto out;
err_val = json_object_get(val, "error"); err_val = json_object_get(val, "error");
client = ref_instance_by_id(sdata, client_id); client = ref_instance_by_id(sdata, client_id);
/* Is this client_id a virtualid from a passthrough subclient */
if (!client)
client = ref_instance_by_virtualid(sdata, &client_id);
if (!client) { if (!client) {
LOGINFO("Failed to find client id %"PRId64" in parse_upstream_auth", LOGINFO("Failed to find client id %"PRId64" in parse_upstream_auth",
client_id); client_id);
@ -7545,7 +7592,7 @@ static void upstream_auth(ckpool_t *ckp, stratum_instance_t *client, json_params
json_set_string(val, "useragent", client->useragent ? : ""); json_set_string(val, "useragent", client->useragent ? : "");
json_set_string(val, "enonce1", client->enonce1 ? : ""); json_set_string(val, "enonce1", client->enonce1 ? : "");
json_set_string(val, "address", client->address); json_set_string(val, "address", client->address);
json_set_int(val, "clientid", client->id); json_set_int64(val, "clientid", client->virtualid);
msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL);
json_decref(val); json_decref(val);
connector_upstream_msg(ckp, msg); connector_upstream_msg(ckp, msg);

Loading…
Cancel
Save