diff --git a/src/connector.c b/src/connector.c index 28ff5359..e469e690 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -1210,6 +1210,8 @@ static void *urecv_process(void *arg) parse_upstream_txns(ckp, val); else if (!safecmp(method, stratum_msgs[SM_AUTHRESULT])) parse_upstream_auth(ckp, val); + else if (!safecmp(method, stratum_msgs[SM_WORKINFO])) + parse_upstream_workinfo(ckp, val); else if (!safecmp(method, "submitblock")) parse_remote_submitblock(ckp, val, cs->buf); else if (!safecmp(method, "pong")) diff --git a/src/libckpool.h b/src/libckpool.h index a0a00023..478794e9 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -369,7 +369,7 @@ static inline void json_intcpy(int *i, json_t *val, const char *key) static inline void json_strdup(char **buf, json_t *val, const char *key) { - *buf = strdup(json_string_value(json_object_get(val, key))); + *buf = strdup(json_string_value(json_object_get(val, key)) ? : ""); } /* Helpers for setting a field will check for valid entry and print an error diff --git a/src/stratifier.c b/src/stratifier.c index d68b4826..19b912d1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -85,6 +85,8 @@ struct workbase { /* The id a remote workinfo is mapped to locally */ int64_t mapped_id; + /* The client id this remote workinfo came from */ + int64_t client_id; ts_t gentime; tv_t retired; @@ -955,8 +957,6 @@ static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t * json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "height", wb->height); json_set_string(wb_val, "flags", wb->flags); - /* Set to zero to be backwards compat with older node code */ - json_set_int(wb_val, "transactions", 0); json_set_int(wb_val, "txns", wb->txns); json_set_string(wb_val, "txn_hashes", wb->txn_hashes); json_set_int(wb_val, "merkles", wb->merkles); @@ -982,6 +982,20 @@ static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t * DL_APPEND(bulk_send, client_msg); messages++; } + DL_FOREACH(sdata->remote_instances, client) { + ckmsg_t *client_msg; + smsg_t *msg; + json_t *json_msg = json_deep_copy(wb_val); + + json_set_string(json_msg, "method", stratum_msgs[SM_WORKINFO]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } ck_runlock(&sdata->instance_lock); if (ckp->remote) @@ -1567,10 +1581,54 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) val = generate_workinfo(ckp, wb, __func__); /* Replace workinfoid with mapped id */ json_set_int64(val, "workinfoid", wb->mapped_id); + + /* If this is the upstream pool, send a copy of this to all OTHER remote + * trusted servers as well */ + if (!ckp->remote) { + json_t *wb_val = json_deep_copy(val); + stratum_instance_t *client; + ckmsg_t *bulk_send = NULL; + int messages = 0; + + /* Strip unnecessary fields and add extra fields needed */ + strip_fields(ckp, wb_val); + json_set_int(wb_val, "txns", wb->txns); + json_set_string(wb_val, "txn_hashes", wb->txn_hashes); + json_set_int(wb_val, "merkles", wb->merkles); + + ck_rlock(&sdata->instance_lock); + DL_FOREACH(sdata->remote_instances, client) { + ckmsg_t *client_msg; + json_t *json_msg; + smsg_t *msg; + + /* Don't send remote workinfo back to same remote */ + if (client->id == wb->client_id) + continue; + json_msg = json_deep_copy(wb_val); + json_set_string(json_msg, "method", stratum_msgs[SM_WORKINFO]); + client_msg = ckalloc(sizeof(ckmsg_t)); + msg = ckzalloc(sizeof(smsg_t)); + msg->json_msg = json_msg; + msg->client_id = client->id; + client_msg->data = msg; + DL_APPEND(bulk_send, client_msg); + messages++; + } + ck_runlock(&sdata->instance_lock); + + json_decref(wb_val); + + if (bulk_send) { + LOGINFO("Sending remote workinfo to %d other remote servers", messages); + ssend_bulk_postpone(sdata, bulk_send, messages); + } + } + ckdbq_add(ckp, ID_WORKINFO, val); } -static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) +static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted, int64_t client_id) { workbase_t *wb = ckzalloc(sizeof(workbase_t)); sdata_t *sdata = ckp->sdata; @@ -1579,6 +1637,12 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) char header[228]; wb->ckp = ckp; + /* This is the client id if this workbase came from a remote trusted + * server. */ + wb->client_id = client_id; + + /* Some of these fields are empty when running as a remote trusted + * server receiving other workinfos from the upstream pool */ json_int64cpy(&wb->id, val, "jobid"); json_strcpy(wb->target, val, "target"); json_dblcpy(&wb->diff, val, "diff"); @@ -1592,8 +1656,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); json_intcpy(&wb->height, val, "height"); json_strdup(&wb->flags, val, "flags"); - /* First see if the server uses the old communication format */ - json_intcpy(&wb->txns, val, "transactions"); if (!ckp->proxy) { /* This is a workbase from a trusted remote */ json_intcpy(&wb->txns, val, "txns"); @@ -1606,16 +1668,6 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) wb->incomplete = true; wb->txns = 0; } - } else if (wb->txns) { - int i; - - json_strdup(&wb->txn_data, val, "txn_data"); - json_intcpy(&wb->merkles, val, "merkles"); - wb->merkle_array = json_object_dup(val, "merklehash"); - for (i = 0; i < wb->merkles; i++) { - strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(wb->merkle_array, i))); - hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); - } } else { json_intcpy(&wb->txns, val, "txns"); txnhashes = json_object_get(val, "txn_hashes"); @@ -1653,12 +1705,13 @@ static void add_node_base(ckpool_t *ckp, json_t *val, bool trusted) LOGDEBUG("Header: %s", header); hex2bin(wb->headerbin, header, 112); - /* If this is from a remote trusted server, add it to the - * remote_workbases hashtable */ + /* If this is from a remote trusted server or an upstream server, add + * it to the remote_workbases hashtable */ if (trusted) add_remote_base(ckp, sdata, wb); else add_base(ckp, sdata, wb, &new_block); + if (new_block) LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); } @@ -6591,6 +6644,11 @@ out: } } +void parse_upstream_workinfo(ckpool_t *ckp, json_t *val) +{ + add_node_base(ckp, val, true, 0); +} + /* Remap the remote client id to the local one and submit to ckdb */ static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t remote_id) { @@ -6604,7 +6662,7 @@ static void parse_remote_workerstats(ckpool_t *ckp, json_t *val, const int64_t r ckdbq_add(ckp, ID_WORKERSTATS, val); } -#define parse_remote_workinfo(ckp, val) add_node_base(ckp, val, true) +#define parse_remote_workinfo(ckp, val, client_id) add_node_base(ckp, val, true, client_id) static void parse_remote_auth(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratum_instance_t *remote, const int64_t remote_id) @@ -6830,7 +6888,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS])) parse_remote_workerstats(ckp, val, client->id); else if (!safecmp(method, stratum_msgs[SM_WORKINFO])) - parse_remote_workinfo(ckp, val); + parse_remote_workinfo(ckp, val, client->id); else if (!safecmp(method, stratum_msgs[SM_AUTH])) parse_remote_auth(ckp, sdata, val, client, client->id); else if (!safecmp(method, stratum_msgs[SM_SHAREERR])) @@ -6921,7 +6979,7 @@ static void parse_node_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val) add_node_txns(ckp, sdata, val); break; case SM_WORKINFO: - add_node_base(ckp, val, false); + add_node_base(ckp, val, false, 0); break; case SM_BLOCK: submit_node_block(ckp, sdata, val); diff --git a/src/stratifier.h b/src/stratifier.h index a7dca212..6e32e6ff 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 Con Kolivas + * Copyright 2014-2017 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -13,6 +13,7 @@ void parse_remote_txns(ckpool_t *ckp, const json_t *val); #define parse_upstream_txns(ckp, val) parse_remote_txns(ckp, val) void parse_upstream_auth(ckpool_t *ckp, json_t *val); +void parse_upstream_workinfo(ckpool_t *ckp, json_t *val); char *stratifier_stats(ckpool_t *ckp, void *data); void stratifier_add_recv(ckpool_t *ckp, json_t *val); void stratifier_block_solve(ckpool_t *ckp, const char *blockhash);