Browse Source

Do initial parsing of subscribe/auth in node mode

master
Con Kolivas 9 years ago
parent
commit
3dc070fcc2
  1. 3
      src/connector.c
  2. 173
      src/stratifier.c

3
src/connector.c

@ -870,7 +870,8 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
}
if (unlikely(ckp->node && !id)) {
LOGWARNING("Message for node: %s", buf);
LOGDEBUG("Message for node: %s", buf);
send_proc(ckp->stratifier, buf);
free(buf);
return;
}

173
src/stratifier.c

@ -829,11 +829,19 @@ static void send_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb)
json_set_string(wb_val, "nbit", wb->nbit);
json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue);
json_set_int(wb_val, "height", wb->height);
json_set_string(wb_val, "flags", wb->flags);
json_set_int(wb_val, "transactions", wb->transactions);
json_set_string(wb_val, "txn_data", wb->txn_data);
if (likely(wb->transactions))
json_set_string(wb_val, "txn_data", wb->txn_data);
/* We don't need txn_hashes */
json_set_int(wb_val, "merkles", wb->merkles);
json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array));
json_set_string(wb_val, "coinb1", wb->coinb1);
json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen);
json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen);
json_set_int(wb_val, "coinb1len", wb->coinb1len);
json_set_int(wb_val, "coinb2len", wb->coinb2len);
json_set_string(wb_val, "coinb2", wb->coinb2);
DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg;
@ -949,7 +957,8 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
if (*new_block)
purge_share_hashtable(sdata, wb->id);
send_workinfo(ckp, sdata, wb);
if (!ckp->passthrough)
send_workinfo(ckp, sdata, wb);
/* Send the aged work message to ckdb once we have dropped the workbase lock
* to prevent taking recursive locks */
@ -1123,6 +1132,67 @@ out:
return NULL;
}
static void add_node_base(ckpool_t *ckp, json_t *val)
{
workbase_t *wb = ckzalloc(sizeof(workbase_t));
sdata_t *sdata = ckp->data;
bool new_block = false;
char header[228];
wb->ckp = ckp;
json_strcpy(wb->target, val, "target");
json_dblcpy(&wb->diff, val, "diff");
json_uintcpy(&wb->version, val, "version");
json_uintcpy(&wb->curtime, val, "curtime");
json_strcpy(wb->prevhash, val, "prevhash");
json_strcpy(wb->ntime, val, "ntime");
sscanf(wb->ntime, "%x", &wb->ntime32);
json_strcpy(wb->bbversion, val, "bbversion");
json_strcpy(wb->nbit, val, "nbit");
json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue");
json_intcpy(&wb->height, val, "height");
json_strdup(&wb->flags, val, "flags");
json_intcpy(&wb->transactions, val, "transactions");
if (wb->transactions)
json_strdup(&wb->txn_data, val, "txn_data");
json_intcpy(&wb->merkles, val, "merkles");
wb->merkle_array = json_array();
if (wb->merkles) {
json_t *arr;
int i;
arr = json_object_get(val, "merklehash");
for (i = 0; i < wb->merkles; i++) {
strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(arr, i)));
hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32);
json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[i][0]));
}
}
json_strdup(&wb->coinb1, val, "coinb1");
json_intcpy(&wb->coinb1len, val, "coinb1len");
wb->coinb1bin = ckzalloc(wb->coinb1len);
hex2bin(wb->coinb1bin, wb->coinb1, wb->coinb1len);
json_strdup(&wb->coinb2, val, "coinb2");
json_intcpy(&wb->coinb2len, val, "coinb2len");
wb->coinb2bin = ckzalloc(wb->coinb2len);
hex2bin(wb->coinb2bin, wb->coinb2, wb->coinb2len);
json_intcpy(&wb->enonce1varlen, val, "enonce1varlen");
json_intcpy(&wb->enonce2varlen, val, "enonce2varlen");
snprintf(header, 225, "%s%s%s%s%s%s%s",
wb->bbversion, wb->prevhash,
"0000000000000000000000000000000000000000000000000000000000000000",
wb->ntime, wb->nbit,
"00000000", /* nonce */
workpadding);
LOGDEBUG("Header: %s", header);
hex2bin(wb->headerbin, header, 112);
add_base(ckp, sdata, wb, &new_block);
if (new_block)
LOGWARNING("Block hash changed to %s", sdata->lastswaphash);
}
static void update_base(ckpool_t *ckp, const int prio)
{
struct update_req *ur = ckalloc(sizeof(struct update_req));
@ -3610,7 +3680,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
}
sdata = select_sdata(ckp, ckp_sdata, 0);
if (unlikely(!sdata || !sdata->current_workbase)) {
if (unlikely(!ckp->node && (!sdata || !sdata->current_workbase))) {
LOGWARNING("Failed to provide subscription due to no %s", sdata ? "current workbase" : "sdata");
stratum_send_message(ckp_sdata, client, "Pool Initialising");
return json_string("Initialising");
@ -3647,6 +3717,10 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
} else
client->useragent = ckzalloc(1);
/* We got what we needed */
if (ckp->node)
return NULL;
if (ckp->proxy) {
/* Use the session_id to tell us which user this was.
* If it's not available, see if there's an IP address
@ -4306,6 +4380,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
/* We can set this outside of lock safely */
client->authorising = false;
out:
LOGWARNING("Parsed %d", ret);
return json_boolean(ret);
}
@ -5075,7 +5150,7 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const
stratum_send_update(sdata, client_id, true);
}
static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client)
static void add_mining_node(sdata_t *sdata, stratum_instance_t *client)
{
client->node = true;
@ -5139,7 +5214,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */
add_mining_node(ckp, sdata, client);
add_mining_node(sdata, client);
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf);
return;
@ -5210,6 +5285,27 @@ static void free_smsg(smsg_t *msg)
free(msg);
}
static void parse_diff(stratum_instance_t *client, json_t *val)
{
double diff = json_number_value(json_array_get(val, 0));
LOGINFO("Set client %"PRId64" to diff %lf", client->id, diff);
client->diff = diff;
}
static void parse_subscribe_result(stratum_instance_t *client, json_t *val)
{
strncpy(client->enonce1, json_string_value(json_array_get(val, 1)), 16);
LOGDEBUG("Client %"PRId64" got enonce1 %s", client->id, client->enonce1);
sprintf(client->enonce1, "%016lx", client->enonce1_64);
}
static void parse_authorise_result(stratum_instance_t *client, json_t *val)
{
client->authorised = json_is_true(val);
LOGDEBUG("Client %"PRId64" is %sauthorised", client->id, client->authorised ? "" : "not ");
}
static int node_msg_type(json_t *val)
{
int i, ret = -1;
@ -5243,14 +5339,64 @@ out:
}
/* Entered with client holding ref count */
static void parse_node_msg(json_t *val, const char *buf, stratum_instance_t *client)
static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client)
{
json_t *params, *method, *res_val, *id_val, *err_val = NULL;
int msg_type = node_msg_type(val);
sdata_t *sdata = ckp->data;
json_params_t *jp;
int errnum;
if (msg_type > -1)
LOGWARNING("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]);
else
LOGWARNING("Missing client %"PRId64" node method from %s", client->id, buf);
if (msg_type < 0) {
LOGERR("Missing client %"PRId64" node method from %s", client->id, buf);
return;
}
LOGWARNING("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]);
id_val = json_object_get(val, "id");
method = json_object_get(val, "method");
params = json_object_get(val, "params");
res_val = json_object_get(val, "result");
switch (msg_type) {
case SM_SHARE:
jp = create_json_params(client->id, method, params, id_val);
ckmsgq_add(sdata->sshareq, jp);
break;
case SM_DIFF:
parse_diff(client, params);
break;
case SM_SUBSCRIBE:
parse_subscribe(client, client->id, params);
break;
case SM_SUBSCRIBERESULT:
parse_subscribe_result(client, res_val);
break;
case SM_AUTH:
parse_authorise(client, params, &err_val, &errnum);
break;
case SM_AUTHRESULT:
parse_authorise_result(client, res_val);
break;
default:
break;
}
}
static void parse_node_msg(ckpool_t *ckp, json_t *val, const char *buf)
{
int msg_type = node_msg_type(val);
if (msg_type < 0) {
LOGERR("Missing node method from %s", buf);
return;
}
LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]);
switch (msg_type) {
case SM_WORKINFO:
add_node_base(ckp, val);
break;
default:
break;
}
}
/* Entered with client holding ref count */
@ -5320,7 +5466,10 @@ static void srecv_process(ckpool_t *ckp, char *buf)
msg->json_msg = val;
val = json_object_get(msg->json_msg, "client_id");
if (unlikely(!val)) {
LOGWARNING("Failed to extract client_id from connector json smsg %s", buf);
if (ckp->node)
parse_node_msg(ckp, msg->json_msg, buf);
else
LOGWARNING("Failed to extract client_id from connector json smsg %s", buf);
json_decref(msg->json_msg);
free(msg);
goto out;
@ -5374,7 +5523,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
if (ckp->node)
parse_node_msg(msg->json_msg, buf, client);
node_client_msg(ckp, msg->json_msg, buf, client);
else
parse_instance_msg(ckp, sdata, msg, client);
dec_instance_ref(sdata, client);

Loading…
Cancel
Save