Browse Source

Up and downstream submitblocks in trusted remote mode

master
Con Kolivas 9 years ago
parent
commit
74352a0e32
  1. 111
      src/stratifier.c

111
src/stratifier.c

@ -472,6 +472,7 @@ struct stratifier_data {
stratum_instance_t *stratum_instances; stratum_instance_t *stratum_instances;
stratum_instance_t *recycled_instances; stratum_instance_t *recycled_instances;
stratum_instance_t *node_instances; stratum_instance_t *node_instances;
stratum_instance_t *remote_instances;
int stratum_generated; int stratum_generated;
int disconnected_generated; int disconnected_generated;
@ -1378,6 +1379,56 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
} }
} }
static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block)
{
char *buf;
ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n",
gbt_block);
send_proc(ckp->connector, buf);
free(buf);
}
static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const stratum_instance_t *source)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->data;
ckmsg_t *bulk_send = NULL;
int messages = 0;
ck_rlock(&sdata->instance_lock);
if (sdata->remote_instances) {
json_t *val = json_object();
JSON_CPACK(val, "{ss,ss}",
"method", "submitblock",
"submitblock", gbt_block);
DL_FOREACH(sdata->remote_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg;
if (client == source)
continue;
json_msg = json_copy(val);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(val);
}
ck_runlock(&sdata->instance_lock);
if (bulk_send) {
LOGNOTICE("Sending submitblock to downstream servers");
ssend_bulk_prepend(sdata, bulk_send, messages);
}
}
static void static void
process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen,
const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) const uchar *data, const uchar *hash, uchar *swap32, char *blockhash)
@ -1414,9 +1465,11 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i
if (wb->transactions) if (wb->transactions)
realloc_strcat(&gbt_block, wb->txn_data); realloc_strcat(&gbt_block, wb->txn_data);
send_generator(ckp, gbt_block, GEN_PRIORITY); send_generator(ckp, gbt_block, GEN_PRIORITY);
if (ckp->remote)
upstream_blocksubmit(ckp, gbt_block);
else
downstream_blocksubmits(ckp, gbt_block, NULL);
free(gbt_block); free(gbt_block);
} }
static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
@ -2444,6 +2497,8 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
if (unlikely(client->node)) if (unlikely(client->node))
DL_DELETE(sdata->node_instances, client); DL_DELETE(sdata->node_instances, client);
if (unlikely(client->remote))
DL_DELETE(sdata->remote_instances, client);
if (client->workername) { if (client->workername) {
if (user) { if (user) {
ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s",
@ -2602,11 +2657,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type)
return; return;
} }
if (ckp->node) {
json_decref(val);
return;
}
/* Use this locking as an opportunity to test other clients. */ /* Use this locking as an opportunity to test other clients. */
ck_rlock(&ckp_sdata->instance_lock); ck_rlock(&ckp_sdata->instance_lock);
HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) { HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) {
@ -2892,8 +2942,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
} }
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
if (unlikely(!found)) { if (!found) {
LOGERR("Failed to find blockhash %s in block_solve!", blockhash); LOGINFO("Failed to find blockhash %s in block_solve, possibly from downstream",
blockhash);
return; return;
} }
@ -2971,8 +3022,9 @@ static void block_reject(sdata_t *sdata, const char *blockhash)
} }
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
if (unlikely(!found)) { if (!found) {
LOGERR("Failed to find blockhash %s in block_reject!", blockhash); LOGINFO("Failed to find blockhash %s in block_reject, possibly from downstream",
blockhash);
return; return;
} }
val = found->data; val = found->data;
@ -5539,9 +5591,8 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
{ {
pthread_t pth; pthread_t pth;
client->node = true;
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
client->node = true;
DL_APPEND(sdata->node_instances, client); DL_APPEND(sdata->node_instances, client);
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
@ -5552,6 +5603,14 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
create_pthread(&pth, set_node_latency, client); create_pthread(&pth, set_node_latency, client);
} }
static void add_remote_server(sdata_t *sdata, stratum_instance_t *client)
{
ck_wlock(&sdata->instance_lock);
client->remote = true;
DL_APPEND(sdata->remote_instances, client);
ck_wunlock(&sdata->instance_lock);
}
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *id_val, json_t *method_val,
@ -5610,10 +5669,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
client_id, client->address, client->server); client_id, client->address, client->server);
connector_drop_client(ckp, client_id); connector_drop_client(ckp, client_id);
} else { } else {
add_remote_server(sdata, client);
snprintf(buf, 255, "remote=%"PRId64, client_id); snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
} }
client->remote = true;
return; return;
} }
@ -5893,6 +5952,23 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf)
LOGDEBUG("Adding %d remote workers to user %s", workers, username); LOGDEBUG("Adding %d remote workers to user %s", workers, username);
} }
static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf,
const stratum_instance_t *client)
{
json_t *submitblock_val;
const char *gbt_block;
submitblock_val = json_object_get(val, "submitblock");
gbt_block = json_string_value(submitblock_val);
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to get submitblock data from remote message %s", buf);
return;
}
LOGWARNING("Submitting possible downstream block!");
send_generator(ckp, gbt_block, GEN_PRIORITY);
downstream_blocksubmits(ckp, gbt_block, client);
}
static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)
{ {
json_t *workername_val = json_object_get(val, "workername"), json_t *workername_val = json_object_get(val, "workername"),
@ -5923,7 +5999,8 @@ static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)
reset_bestshares(sdata); reset_bestshares(sdata);
} }
static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
stratum_instance_t *client)
{ {
json_t *method_val = json_object_get(val, "method"); json_t *method_val = json_object_get(val, "method");
const char *method; const char *method;
@ -5938,6 +6015,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
parse_remote_shares(ckp, sdata, val, buf); parse_remote_shares(ckp, sdata, val, buf);
else if (!safecmp(method, "workers")) else if (!safecmp(method, "workers"))
parse_remote_workers(sdata, val, buf); parse_remote_workers(sdata, val, buf);
else if (!safecmp(method, "submitblock"))
parse_remote_blocksubmit(ckp, val, buf, client);
else if (!safecmp(method, "block")) else if (!safecmp(method, "block"))
parse_remote_block(sdata, val, buf); parse_remote_block(sdata, val, buf);
else else
@ -6135,7 +6214,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
if (client->remote) if (client->remote)
parse_trusted_msg(ckp, sdata, msg->json_msg, buf); parse_trusted_msg(ckp, sdata, msg->json_msg, buf, client);
else if (ckp->node) else if (ckp->node)
node_client_msg(ckp, msg->json_msg, buf, client); node_client_msg(ckp, msg->json_msg, buf, client);
else else

Loading…
Cancel
Save