@ -1512,19 +1512,115 @@ out:
free ( prio ) ;
free ( prio ) ;
}
}
# define SSEND_PREPEND 0
# define SSEND_APPEND 1
# define SSEND_POSTPONE 2
/* Downstream a json message to all remote servers except for the one matching
* client_id */
static void downstream_json ( sdata_t * sdata , const json_t * val , const int64_t client_id ,
const int prio )
{
stratum_instance_t * client ;
ckmsg_t * bulk_send = NULL ;
int messages = 0 ;
ck_rlock ( & sdata - > instance_lock ) ;
DL_FOREACH ( sdata - > remote_instances , client ) {
ckmsg_t * client_msg ;
json_t * json_msg ;
smsg_t * msg ;
/* Don't send remote workinfo back to same remote */
if ( client - > id = = client_id )
continue ;
json_msg = json_deep_copy ( val ) ;
client_msg = ckalloc ( sizeof ( ckmsg_t ) ) ;
msg = ckzalloc ( sizeof ( smsg_t ) ) ;
msg - > json_msg = json_msg ;
msg - > client_id = client - > id ;
client_msg - > data = msg ;
DL_APPEND ( bulk_send , client_msg ) ;
messages + + ;
}
ck_runlock ( & sdata - > instance_lock ) ;
if ( bulk_send ) {
LOGINFO ( " Sending json to %d remote servers " , messages ) ;
switch ( prio ) {
case SSEND_PREPEND :
ssend_bulk_prepend ( sdata , bulk_send , messages ) ;
break ;
case SSEND_APPEND :
ssend_bulk_append ( sdata , bulk_send , messages ) ;
break ;
case SSEND_POSTPONE :
ssend_bulk_postpone ( sdata , bulk_send , messages ) ;
break ;
}
}
}
/* Find any transactions that are missing from our transaction table during
* rebuild_txns by requesting their data from our bitcoind or requesting them
* from another server . */
static void request_txns ( ckpool_t * ckp , sdata_t * sdata , json_t * txns )
{
json_t * val , * arr_val ;
size_t index ;
/* See if our bitcoind has the transactions first to avoid requesting
* them from another server . */
json_array_foreach ( txns , index , arr_val ) {
const char * hash = json_string_value ( arr_val ) ;
char * data = generator_get_txn ( ckp , hash ) ;
txntable_t * txn ;
if ( ! data )
continue ;
ck_wlock ( & sdata - > workbase_lock ) ;
HASH_FIND_STR ( sdata - > txns , hash , txn ) ;
if ( likely ( ! txn ) ) {
txn = ckzalloc ( sizeof ( txntable_t ) ) ;
memcpy ( txn - > hash , hash , 65 ) ;
txn - > data = data ;
txn - > refcount = 100 ;
HASH_ADD_STR ( sdata - > txns , hash , txn ) ;
} else
free ( data ) ;
ck_wunlock ( & sdata - > workbase_lock ) ;
/* We're removing this object so decrement index for next pass */
json_array_remove ( txns , index ) ;
index - - ;
}
JSON_CPACK ( val , " {so} " , " hash " , txns ) ;
if ( ckp - > remote )
upstream_msgtype ( ckp , val , SM_REQTXNS ) ;
else if ( ckp - > node ) {
/* Nodes have no way to signal upstream pool yet */
} else {
/* We don't know which remote sent the transaction hash so ask
* all of them for it */
json_set_string ( val , " method " , stratum_msgs [ SM_REQTXNS ] ) ;
downstream_json ( sdata , val , 0 , SSEND_POSTPONE ) ;
}
}
/* Rebuilds transactions from txnhashes to be able to construct wb_merkle_bins */
/* Rebuilds transactions from txnhashes to be able to construct wb_merkle_bins */
static bool rebuild_txns ( ckpool_t * ckp , sdata_t * sdata , workbase_t * wb , json_t * txnhashes )
static bool rebuild_txns ( ckpool_t * ckp , sdata_t * sdata , workbase_t * wb , json_t * txnhashes )
{
{
const char * hashes = json_string_value ( txnhashes ) ;
const char * hashes = json_string_value ( txnhashes ) ;
json_t * txn_array ;
json_t * txn_array , * missing_txns ;
char hash [ 68 ] = { } ;
bool ret = false ;
bool ret = false ;
txntable_t * txn ;
txntable_t * txn ;
int i , len ;
int i , len = 0 ;
if ( likely ( hashes ) )
if ( likely ( hashes ) )
len = strlen ( hashes ) ;
len = strlen ( hashes ) ;
else
len = 0 ;
if ( ! hashes | | ! len )
if ( ! hashes | | ! len )
return ret ;
return ret ;
@ -1534,22 +1630,20 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *
}
}
ret = true ;
ret = true ;
txn_array = json_array ( ) ;
txn_array = json_array ( ) ;
missing_txns = json_array ( ) ;
ck_rlock ( & sdata - > workbase_lock ) ;
ck_rlock ( & sdata - > workbase_lock ) ;
for ( i = 0 ; i < wb - > txns ; i + + ) {
for ( i = 0 ; i < wb - > txns ; i + + ) {
json_t * txn_val ;
json_t * txn_val ;
char hash [ 68 ] ;
memcpy ( hash , hashes + i * 65 , 64 ) ;
memcpy ( hash , hashes + i * 65 , 64 ) ;
hash [ 64 ] = ' \0 ' ;
HASH_FIND_STR ( sdata - > txns , hash , txn ) ;
HASH_FIND_STR ( sdata - > txns , hash , txn ) ;
if ( unlikely ( ! txn ) ) {
if ( unlikely ( ! txn ) ) {
txn_val = json_string ( hash ) ;
json_array_append_new ( missing_txns , txn_val ) ;
ret = false ;
ret = false ;
continue ;
continue ;
}
}
/* This is unnecessary most of the time since it will be set
* in rebuild_txns as well but helps in the case we can ' t call
* rebuild_txns due to missing some txns */
txn - > refcount = 100 ;
txn - > refcount = 100 ;
txn - > seen = true ;
txn - > seen = true ;
JSON_CPACK ( txn_val , " {ss,ss} " ,
JSON_CPACK ( txn_val , " {ss,ss} " ,
@ -1561,9 +1655,13 @@ static bool rebuild_txns(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, json_t *
if ( ret ) {
if ( ret ) {
LOGINFO ( " Rebuilt txns into workbase with %d transactions " , ( int ) i ) ;
LOGINFO ( " Rebuilt txns into workbase with %d transactions " , ( int ) i ) ;
wb_merkle_bins ( ckp , sdata , wb , txn_array , false ) ;
wb_merkle_bins ( ckp , sdata , wb , txn_array , false ) ;
} else
} else {
LOGNOTICE ( " Failed to find all txns in rebuild_txns " ) ;
LOGNOTICE ( " Failed to find all txns in rebuild_txns " ) ;
request_txns ( ckp , sdata , missing_txns ) ;
}
json_decref ( txn_array ) ;
json_decref ( txn_array ) ;
json_decref ( missing_txns ) ;
return ret ;
return ret ;
}
}
@ -5619,38 +5717,6 @@ static void add_remote_blockdata(ckpool_t *ckp, json_t *val, const int cblen, co
free ( buf ) ;
free ( buf ) ;
}
}
static void downstream_blockdata ( sdata_t * sdata , const json_t * val , int64_t client_id )
{
stratum_instance_t * client ;
ckmsg_t * bulk_send = NULL ;
int messages = 0 ;
ck_rlock ( & sdata - > instance_lock ) ;
DL_FOREACH ( sdata - > remote_instances , client ) {
ckmsg_t * client_msg ;
json_t * json_msg ;
smsg_t * msg ;
/* Don't send remote workinfo back to same remote */
if ( client - > id = = client_id )
continue ;
json_msg = json_deep_copy ( val ) ;
client_msg = ckalloc ( sizeof ( ckmsg_t ) ) ;
msg = ckzalloc ( sizeof ( smsg_t ) ) ;
msg - > json_msg = json_msg ;
msg - > client_id = client - > id ;
client_msg - > data = msg ;
DL_APPEND ( bulk_send , client_msg ) ;
messages + + ;
}
ck_runlock ( & sdata - > instance_lock ) ;
if ( bulk_send ) {
LOGINFO ( " Sending block to %d remote servers " , messages ) ;
ssend_bulk_postpone ( sdata , bulk_send , messages ) ;
}
}
static void
static void
downstream_block ( ckpool_t * ckp , sdata_t * sdata , const json_t * val , const int cblen ,
downstream_block ( ckpool_t * ckp , sdata_t * sdata , const json_t * val , const int cblen ,
const char * coinbase , const uchar * data )
const char * coinbase , const uchar * data )
@ -5661,7 +5727,7 @@ downstream_block(ckpool_t *ckp, sdata_t *sdata, const json_t *val, const int cbl
strip_fields ( ckp , block_val ) ;
strip_fields ( ckp , block_val ) ;
json_set_string ( block_val , " method " , stratum_msgs [ SM_BLOCK ] ) ;
json_set_string ( block_val , " method " , stratum_msgs [ SM_BLOCK ] ) ;
add_remote_blockdata ( ckp , block_val , cblen , coinbase , data ) ;
add_remote_blockdata ( ckp , block_val , cblen , coinbase , data ) ;
downstream_blockdata ( sdata , block_val , 0 ) ;
downstream_json ( sdata , block_val , 0 , SSEND_PREPEND ) ;
json_decref ( block_val ) ;
json_decref ( block_val ) ;
}
}
@ -6887,7 +6953,7 @@ out_add:
val = json_deep_copy ( val ) ;
val = json_deep_copy ( val ) ;
remap_workinfo_id ( sdata , val ) ;
remap_workinfo_id ( sdata , val ) ;
if ( ! ckp - > remote )
if ( ! ckp - > remote )
downstream_blockdata ( sdata , val , client_id ) ;
downstream_json ( sdata , val , client_id , SSEND_PREPEND ) ;
ckdbq_add ( ckp , ID_BLOCK , val ) ;
ckdbq_add ( ckp , ID_BLOCK , val ) ;
}
}