Browse Source

Propagate workinfos to all remote trusted servers to be able to submit all blocks at all remote servers.

master
Con Kolivas 8 years ago
parent
commit
bd367364c1
  1. 4
      src/connector.c
  2. 4
      src/libckpool.h
  3. 100
      src/stratifier.c
  4. 3
      src/stratifier.h

4
src/connector.c

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2016 Con Kolivas * Copyright 2014-2017 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -1210,6 +1210,8 @@ static void *urecv_process(void *arg)
parse_upstream_txns(ckp, val); parse_upstream_txns(ckp, val);
else if (!safecmp(method, stratum_msgs[SM_AUTHRESULT])) else if (!safecmp(method, stratum_msgs[SM_AUTHRESULT]))
parse_upstream_auth(ckp, val); parse_upstream_auth(ckp, val);
else if (!safecmp(method, stratum_msgs[SM_WORKINFO]))
parse_upstream_workinfo(ckp, val);
else if (!safecmp(method, "submitblock")) else if (!safecmp(method, "submitblock"))
parse_remote_submitblock(ckp, val, cs->buf); parse_remote_submitblock(ckp, val, cs->buf);
else if (!safecmp(method, "pong")) else if (!safecmp(method, "pong"))

4
src/libckpool.h

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2016 Con Kolivas * Copyright 2014-2017 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -369,7 +369,7 @@ static inline void json_intcpy(int *i, json_t *val, const char *key)
static inline void json_strdup(char **buf, json_t *val, const char *key) static inline void json_strdup(char **buf, json_t *val, const char *key)
{ {
*buf = strdup(json_string_value(json_object_get(val, key))); *buf = strdup(json_string_value(json_object_get(val, key)) ? : "");
} }
/* Helpers for setting a field will check for valid entry and print an error /* Helpers for setting a field will check for valid entry and print an error

100
src/stratifier.c

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2016 Con Kolivas * Copyright 2014-2017 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -85,6 +85,8 @@ struct workbase {
/* The id a remote workinfo is mapped to locally */ /* The id a remote workinfo is mapped to locally */
int64_t mapped_id; int64_t mapped_id;
/* The client id this remote workinfo came from */
int64_t client_id;
ts_t gentime; ts_t gentime;
tv_t retired; tv_t retired;
@ -955,8 +957,6 @@ static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *
json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue);
json_set_int(wb_val, "height", wb->height); json_set_int(wb_val, "height", wb->height);
json_set_string(wb_val, "flags", wb->flags); json_set_string(wb_val, "flags", wb->flags);
/* Set to zero to be backwards compat with older node code */
json_set_int(wb_val, "transactions", 0);
json_set_int(wb_val, "txns", wb->txns); json_set_int(wb_val, "txns", wb->txns);
json_set_string(wb_val, "txn_hashes", wb->txn_hashes); json_set_string(wb_val, "txn_hashes", wb->txn_hashes);
json_set_int(wb_val, "merkles", wb->merkles); json_set_int(wb_val, "merkles", wb->merkles);
@ -982,6 +982,20 @@ static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *
DL_APPEND(bulk_send, client_msg); DL_APPEND(bulk_send, client_msg);
messages++; messages++;
} }
DL_FOREACH(sdata->remote_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg = json_deep_copy(wb_val);
json_set_string(json_msg, "method", stratum_msgs[SM_WORKINFO]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (ckp->remote) if (ckp->remote)
@ -1567,10 +1581,54 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
val = generate_workinfo(ckp, wb, __func__); val = generate_workinfo(ckp, wb, __func__);
/* Replace workinfoid with mapped id */ /* Replace workinfoid with mapped id */
json_set_int64(val, "workinfoid", wb->mapped_id); json_set_int64(val, "workinfoid", wb->mapped_id);
/* If this is the upstream pool, send a copy of this to all OTHER remote
* trusted servers as well */
if (!ckp->remote) {
json_t *wb_val = json_deep_copy(val);
stratum_instance_t *client;
ckmsg_t *bulk_send = NULL;
int messages = 0;
/* Strip unnecessary fields and add extra fields needed */
strip_fields(ckp, wb_val);
json_set_int(wb_val, "txns", wb->txns);
json_set_string(wb_val, "txn_hashes", wb->txn_hashes);
json_set_int(wb_val, "merkles", wb->merkles);
ck_rlock(&sdata->instance_lock);
DL_FOREACH(sdata->remote_instances, client) {
ckmsg_t *client_msg;
json_t *json_msg;
smsg_t *msg;
/* Don't send remote workinfo back to same remote */
if (client->id == wb->client_id)
continue;
json_msg = json_deep_copy(wb_val);
json_set_string(json_msg, "method", stratum_msgs[SM_WORKINFO]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&sdata->instance_lock);
json_decref(wb_val);
if (bulk_send) {
LOGINFO("Sending remote workinfo to %d other remote servers", messages);
ssend_bulk_postpone(sdata, bulk_send, messages);
}
}
ckdbq_add(ckp, ID_WORKINFO, val); ckdbq_add(ckp, ID_WORKINFO, val);
} }
static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t client_id)
{ {
workbase_t *wb = ckzalloc(sizeof(workbase_t)); workbase_t *wb = ckzalloc(sizeof(workbase_t));
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
@ -1579,6 +1637,12 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted)
char header[228]; char header[228];
wb->ckp = ckp; wb->ckp = ckp;
/* This is the client id if this workbase came from a remote trusted
* server. */
wb->client_id = client_id;
/* Some of these fields are empty when running as a remote trusted
* server receiving other workinfos from the upstream pool */
json_int64cpy(&wb->id, val, "jobid"); json_int64cpy(&wb->id, val, "jobid");
json_strcpy(wb->target, val, "target"); json_strcpy(wb->target, val, "target");
json_dblcpy(&wb->diff, val, "diff"); json_dblcpy(&wb->diff, val, "diff");
@ -1592,8 +1656,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted)
json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue");
json_intcpy(&wb->height, val, "height"); json_intcpy(&wb->height, val, "height");
json_strdup(&wb->flags, val, "flags"); json_strdup(&wb->flags, val, "flags");
/* First see if the server uses the old communication format */
json_intcpy(&wb->txns, val, "transactions");
if (!ckp->proxy) { if (!ckp->proxy) {
/* This is a workbase from a trusted remote */ /* This is a workbase from a trusted remote */
json_intcpy(&wb->txns, val, "txns"); json_intcpy(&wb->txns, val, "txns");
@ -1606,16 +1668,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted)
wb->incomplete = true; wb->incomplete = true;
wb->txns = 0; wb->txns = 0;
} }
} else if (wb->txns) {
int i;
json_strdup(&wb->txn_data, val, "txn_data");
json_intcpy(&wb->merkles, val, "merkles");
wb->merkle_array = json_object_dup(val, "merklehash");
for (i = 0; i < wb->merkles; i++) {
strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i)));
hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32);
}
} else { } else {
json_intcpy(&wb->txns, val, "txns"); json_intcpy(&wb->txns, val, "txns");
txnhashes = json_object_get(val, "txn_hashes"); txnhashes = json_object_get(val, "txn_hashes");
@ -1653,12 +1705,13 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted)
LOGDEBUG("Header: %s", header); LOGDEBUG("Header: %s", header);
hex2bin(wb->headerbin, header, 112); hex2bin(wb->headerbin, header, 112);
/* If this is from a remote trusted server, add it to the /* If this is from a remote trusted server or an upstream server, add
* remote_workbases hashtable */ * it to the remote_workbases hashtable */
if (trusted) if (trusted)
add_remote_base(ckp, sdata, wb); add_remote_base(ckp, sdata, wb);
else else
add_base(ckp, sdata, wb, &new_block); add_base(ckp, sdata, wb, &new_block);
if (new_block) if (new_block)
LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); LOGNOTICE("Block hash changed to %s", sdata->lastswaphash);
} }
@ -6591,6 +6644,11 @@ out:
} }
} }
void parse_upstream_workinfo(ckpool_t *ckp, json_t *val)
{
add_node_base(ckp, val, true, 0);
}
/* Remap the remote client id to the local one and submit to ckdb */ /* Remap the remote client id to the local one and submit to ckdb */
static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t remote_id) static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t remote_id)
{ {
@ -6604,7 +6662,7 @@ static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t r
ckdbq_add(ckp, ID_WORKERSTATS, val); ckdbq_add(ckp, ID_WORKERSTATS, val);
} }
#define parse_remote_workinfo(ckp, val) add_node_base(ckp, val, true) #define parse_remote_workinfo(ckp, val, client_id) add_node_base(ckp, val, true, client_id)
static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote, static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote,
const int64_t remote_id) const int64_t remote_id)
@ -6830,7 +6888,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS])) else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS]))
parse_remote_workerstats(ckp, val, client->id); parse_remote_workerstats(ckp, val, client->id);
else if (!safecmp(method, stratum_msgs[SM_WORKINFO])) else if (!safecmp(method, stratum_msgs[SM_WORKINFO]))
parse_remote_workinfo(ckp, val); parse_remote_workinfo(ckp, val, client->id);
else if (!safecmp(method, stratum_msgs[SM_AUTH])) else if (!safecmp(method, stratum_msgs[SM_AUTH]))
parse_remote_auth(ckp, sdata, val, client, client->id); parse_remote_auth(ckp, sdata, val, client, client->id);
else if (!safecmp(method, stratum_msgs[SM_SHAREERR])) else if (!safecmp(method, stratum_msgs[SM_SHAREERR]))
@ -6921,7 +6979,7 @@ static void parse_node_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val)
add_node_txns(ckp, sdata, val); add_node_txns(ckp, sdata, val);
break; break;
case SM_WORKINFO: case SM_WORKINFO:
add_node_base(ckp, val, false); add_node_base(ckp, val, false, 0);
break; break;
case SM_BLOCK: case SM_BLOCK:
submit_node_block(ckp, sdata, val); submit_node_block(ckp, sdata, val);

3
src/stratifier.h

@ -1,5 +1,5 @@
/* /*
* Copyright 2014-2016 Con Kolivas * Copyright 2014-2017 Con Kolivas
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -13,6 +13,7 @@
void parse_remote_txns(ckpool_t *ckp, const json_t *val); void parse_remote_txns(ckpool_t *ckp, const json_t *val);
#define parse_upstream_txns(ckp, val) parse_remote_txns(ckp, val) #define parse_upstream_txns(ckp, val) parse_remote_txns(ckp, val)
void parse_upstream_auth(ckpool_t *ckp, json_t *val); void parse_upstream_auth(ckpool_t *ckp, json_t *val);
void parse_upstream_workinfo(ckpool_t *ckp, json_t *val);
char *stratifier_stats(ckpool_t *ckp, void *data); char *stratifier_stats(ckpool_t *ckp, void *data);
void stratifier_add_recv(ckpool_t *ckp, json_t *val); void stratifier_add_recv(ckpool_t *ckp, json_t *val);
void stratifier_block_solve(ckpool_t *ckp, const char *blockhash); void stratifier_block_solve(ckpool_t *ckp, const char *blockhash);

Loading…
Cancel
Save