Browse Source

Send transactions up and downstream in trusted mode to be submitted to the local btcd.

master
Con Kolivas 8 years ago
parent
commit
d9e5cf03f1
  1. 8
      src/connector.c
  2. 94
      src/stratifier.c
  3. 1
      src/stratifier.h

8
src/connector.c

@ -1186,14 +1186,18 @@ static void *urecv_process(void *arg)
LOGWARNING("Failed to find method from upstream pool json %s", LOGWARNING("Failed to find method from upstream pool json %s",
cs->buf); cs->buf);
json_decref(val); json_decref(val);
goto nomsg; goto decref;
} }
if (!safecmp(method, "submitblock")) if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS]))
parse_remote_txns(ckp, val);
else if (!safecmp(method, "submitblock"))
parse_remote_submitblock(ckp, val, cs->buf); parse_remote_submitblock(ckp, val, cs->buf);
else if (!safecmp(method, "pong")) else if (!safecmp(method, "pong"))
LOGDEBUG("Received upstream pong"); LOGDEBUG("Received upstream pong");
else else
LOGWARNING("Unrecognised upstream method %s", method); LOGWARNING("Unrecognised upstream method %s", method);
decref:
json_decref(val);
nomsg: nomsg:
cksem_post(&cs->sem); cksem_post(&cs->sem);

94
src/stratifier.c

@ -1168,18 +1168,30 @@ static void add_txn(ckpool_t *ckp, sdata_t *sdata, txntable_t **txns, const char
HASH_ADD_STR(*txns, hash, txn); HASH_ADD_STR(*txns, hash, txn);
} }
static void send_node_transactions(sdata_t *sdata, const json_t *txn_val) static void upstream_txns(ckpool_t *ckp, const json_t *txn_val)
{
json_t *json_msg = json_deep_copy(txn_val);
char *buf;
json_set_string(json_msg, "method", stratum_msgs[SM_TRANSACTIONS]);
ASPRINTF(&buf, "upstream=%s", json_dumps(json_msg, JSON_EOL));
json_decref(json_msg);
send_proc(ckp->connector, buf);
free(buf);
}
static void send_node_transactions(ckpool_t *ckp, sdata_t *sdata, const json_t *txn_val)
{ {
stratum_instance_t *client; stratum_instance_t *client;
ckmsg_t *bulk_send = NULL; ckmsg_t *bulk_send = NULL;
ckmsg_t *client_msg;
int messages = 0; int messages = 0;
json_t *json_msg;
smsg_t *msg;
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
DL_FOREACH(sdata->node_instances, client) { DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg; json_msg = json_deep_copy(txn_val);
smsg_t *msg;
json_t *json_msg = json_deep_copy(txn_val);
json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]); json_set_string(json_msg, "node.method", stratum_msgs[SM_TRANSACTIONS]);
client_msg = ckalloc(sizeof(ckmsg_t)); client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t)); msg = ckzalloc(sizeof(smsg_t));
@ -1189,8 +1201,22 @@ static void send_node_transactions(sdata_t *sdata, const json_t *txn_val)
DL_APPEND(bulk_send, client_msg); DL_APPEND(bulk_send, client_msg);
messages++; messages++;
} }
DL_FOREACH(sdata->remote_instances, client) {
json_msg = json_deep_copy(txn_val);
json_set_string(json_msg, "method", stratum_msgs[SM_TRANSACTIONS]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (ckp->remote)
upstream_txns(ckp, txn_val);
if (bulk_send) { if (bulk_send) {
LOGINFO("Sending transactions to mining nodes"); LOGINFO("Sending transactions to mining nodes");
ssend_bulk_append(sdata, bulk_send, messages); ssend_bulk_append(sdata, bulk_send, messages);
@ -1310,7 +1336,7 @@ static void wb_merkle_bins(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
JSON_CPACK(val, "{so}", "transaction", txn_array); JSON_CPACK(val, "{so}", "transaction", txn_array);
send_node_transactions(sdata, val); send_node_transactions(ckp, sdata, val);
json_decref(val); json_decref(val);
if (added || purged) if (added || purged)
@ -6356,6 +6382,18 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna
return user; return user;
} }
/* Submit the transactions in node/remote mode so the local btcd has all the
* transactions that will go into the next blocksolve. */
static void submit_transaction(ckpool_t *ckp, const char *hash)
{
char *buf;
if (unlikely(!ckp->generator_ready))
return;
ASPRINTF(&buf, "submittxn:%s", hash);
send_generator(ckp, buf, GEN_LAX);
free(buf);
}
static void parse_remote_shares(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_shares(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf)
{ {
@ -6401,6 +6439,35 @@ static void parse_remote_shares(ckpool_t *ckp, sdata_t *sdata, json_t *val, cons
LOGINFO("Added %"PRId64" remote shares to worker %s", diff, workername); LOGINFO("Added %"PRId64" remote shares to worker %s", diff, workername);
} }
/* In remote mode we simply submit them to our bitcoind to guarantee fast
* block change and compact block propagation between pools. */
void parse_remote_txns(ckpool_t *ckp, const json_t *val)
{
json_t *txn_array, *txn_val, *data_val;
int i, arr_size;
int added = 0;
txn_array = json_object_get(val, "transaction");
arr_size = json_array_size(txn_array);
for (i = 0; i < arr_size; i++) {
const char *data;
txn_val = json_array_get(txn_array, i);
data_val = json_object_get(txn_val, "data");
data = json_string_value(data_val);
if (unlikely(!data)) {
LOGERR("Failed to get hash/data in parse_remote_txns");
continue;
}
submit_transaction(ckp, data);
added++;
}
if (added)
LOGNOTICE("Submitted %d remote transactions", added);
}
/* Get the remote worker count once per minute from all the remote servers */ /* Get the remote worker count once per minute from all the remote servers */
static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf)
{ {
@ -6491,6 +6558,8 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
} }
if (likely(!safecmp(method, "shares"))) if (likely(!safecmp(method, "shares")))
parse_remote_shares(ckp, sdata, val, buf); parse_remote_shares(ckp, sdata, val, buf);
else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS]))
parse_remote_txns(ckp, val);
else if (!safecmp(method, "workers")) else if (!safecmp(method, "workers"))
parse_remote_workers(sdata, val, buf); parse_remote_workers(sdata, val, buf);
else if (!safecmp(method, "submitblock")) else if (!safecmp(method, "submitblock"))
@ -6505,19 +6574,6 @@ out:
free(buf); free(buf);
} }
/* Submit the transactions in node mode so the local btcd has all the
* transactions that will go into the next blocksolve. */
static void submit_transaction(ckpool_t *ckp, const char *hash)
{
char *buf;
if (unlikely(!ckp->generator_ready))
return;
ASPRINTF(&buf, "submittxn:%s", hash);
send_generator(ckp, buf, GEN_LAX);
free(buf);
}
static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val) static void add_node_txns(ckpool_t *ckp, sdata_t *sdata, const json_t *val)
{ {
json_t *txn_array, *txn_val, *data_val, *hash_val; json_t *txn_array, *txn_val, *data_val, *hash_val;

1
src/stratifier.h

@ -10,6 +10,7 @@
#ifndef STRATIFIER_H #ifndef STRATIFIER_H
#define STRATIFIER_H #define STRATIFIER_H
void parse_remote_txns(ckpool_t *ckp, const json_t *val);
void stratifier_add_recv(ckpool_t *ckp, json_t *val); void stratifier_add_recv(ckpool_t *ckp, json_t *val);
void *stratifier(void *arg); void *stratifier(void *arg);

Loading…
Cancel
Save