diff --git a/src/connector.c b/src/connector.c index 0c353ecf..3ce2bb2d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -870,7 +870,8 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) } if (unlikely(ckp->node && !id)) { - LOGWARNING("Message for node: %s", buf); + LOGDEBUG("Message for node: %s", buf); + send_proc(ckp->stratifier, buf); free(buf); return; } diff --git a/src/stratifier.c b/src/stratifier.c index 39f3160a..0378b2cf 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -829,11 +829,19 @@ static void send_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb) json_set_string(wb_val, "nbit", wb->nbit); json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "height", wb->height); + json_set_string(wb_val, "flags", wb->flags); json_set_int(wb_val, "transactions", wb->transactions); - json_set_string(wb_val, "txn_data", wb->txn_data); + if (likely(wb->transactions)) + json_set_string(wb_val, "txn_data", wb->txn_data); /* We don't need txn_hashes */ json_set_int(wb_val, "merkles", wb->merkles); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); + json_set_string(wb_val, "coinb1", wb->coinb1); + json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); + json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); + json_set_int(wb_val, "coinb1len", wb->coinb1len); + json_set_int(wb_val, "coinb2len", wb->coinb2len); + json_set_string(wb_val, "coinb2", wb->coinb2); DL_FOREACH(sdata->node_instances, client) { ckmsg_t *client_msg; @@ -949,7 +957,8 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl if (*new_block) purge_share_hashtable(sdata, wb->id); - send_workinfo(ckp, sdata, wb); + if (!ckp->passthrough) + send_workinfo(ckp, sdata, wb); /* Send the aged work message to ckdb once we have dropped the workbase lock * to prevent taking recursive locks */ @@ -1123,6 +1132,67 @@ out: return NULL; } +static void add_node_base(ckpool_t *ckp, json_t *val) +{ + workbase_t *wb = ckzalloc(sizeof(workbase_t)); + sdata_t *sdata = ckp->data; + bool new_block = false; + char header[228]; + + wb->ckp = ckp; + json_strcpy(wb->target, val, "target"); + json_dblcpy(&wb->diff, val, "diff"); + json_uintcpy(&wb->version, val, "version"); + json_uintcpy(&wb->curtime, val, "curtime"); + json_strcpy(wb->prevhash, val, "prevhash"); + json_strcpy(wb->ntime, val, "ntime"); + sscanf(wb->ntime, "%x", &wb->ntime32); + json_strcpy(wb->bbversion, val, "bbversion"); + json_strcpy(wb->nbit, val, "nbit"); + json_uint64cpy(&wb->coinbasevalue, val, "coinbasevalue"); + json_intcpy(&wb->height, val, "height"); + json_strdup(&wb->flags, val, "flags"); + json_intcpy(&wb->transactions, val, "transactions"); + if (wb->transactions) + json_strdup(&wb->txn_data, val, "txn_data"); + json_intcpy(&wb->merkles, val, "merkles"); + wb->merkle_array = json_array(); + if (wb->merkles) { + json_t *arr; + int i; + + arr = json_object_get(val, "merklehash"); + for (i = 0; i < wb->merkles; i++) { + strcpy(&wb->merklehash[i][0], json_string_value(json_array_get(arr, i))); + hex2bin(&wb->merklebin[i][0], &wb->merklehash[i][0], 32); + json_array_append_new(wb->merkle_array, json_string(&wb->merklehash[i][0])); + } + } + json_strdup(&wb->coinb1, val, "coinb1"); + json_intcpy(&wb->coinb1len, val, "coinb1len"); + wb->coinb1bin = ckzalloc(wb->coinb1len); + hex2bin(wb->coinb1bin, wb->coinb1, wb->coinb1len); + json_strdup(&wb->coinb2, val, "coinb2"); + json_intcpy(&wb->coinb2len, val, "coinb2len"); + wb->coinb2bin = ckzalloc(wb->coinb2len); + hex2bin(wb->coinb2bin, wb->coinb2, wb->coinb2len); + json_intcpy(&wb->enonce1varlen, val, "enonce1varlen"); + json_intcpy(&wb->enonce2varlen, val, "enonce2varlen"); + + snprintf(header, 225, "%s%s%s%s%s%s%s", + wb->bbversion, wb->prevhash, + "0000000000000000000000000000000000000000000000000000000000000000", + wb->ntime, wb->nbit, + "00000000", /* nonce */ + workpadding); + LOGDEBUG("Header: %s", header); + hex2bin(wb->headerbin, header, 112); + + add_base(ckp, sdata, wb, &new_block); + if (new_block) + LOGWARNING("Block hash changed to %s", sdata->lastswaphash); +} + static void update_base(ckpool_t *ckp, const int prio) { struct update_req *ur = ckalloc(sizeof(struct update_req)); @@ -3610,7 +3680,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_ } sdata = select_sdata(ckp, ckp_sdata, 0); - if (unlikely(!sdata || !sdata->current_workbase)) { + if (unlikely(!ckp->node && (!sdata || !sdata->current_workbase))) { LOGWARNING("Failed to provide subscription due to no %s", sdata ? "current workbase" : "sdata"); stratum_send_message(ckp_sdata, client, "Pool Initialising"); return json_string("Initialising"); @@ -3647,6 +3717,10 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_ } else client->useragent = ckzalloc(1); + /* We got what we needed */ + if (ckp->node) + return NULL; + if (ckp->proxy) { /* Use the session_id to tell us which user this was. * If it's not available, see if there's an IP address @@ -4306,6 +4380,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ /* We can set this outside of lock safely */ client->authorising = false; out: + LOGWARNING("Parsed %d", ret); return json_boolean(ret); } @@ -5075,7 +5150,7 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const stratum_send_update(sdata, client_id, true); } -static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client) +static void add_mining_node(sdata_t *sdata, stratum_instance_t *client) { client->node = true; @@ -5139,7 +5214,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie /* Add this client as a passthrough in the connector and * add it to the list of mining nodes in the stratifier */ - add_mining_node(ckp, sdata, client); + add_mining_node(sdata, client); snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(ckp->connector, buf); return; @@ -5210,6 +5285,27 @@ static void free_smsg(smsg_t *msg) free(msg); } +static void parse_diff(stratum_instance_t *client, json_t *val) +{ + double diff = json_number_value(json_array_get(val, 0)); + + LOGINFO("Set client %"PRId64" to diff %lf", client->id, diff); + client->diff = diff; +} + +static void parse_subscribe_result(stratum_instance_t *client, json_t *val) +{ + strncpy(client->enonce1, json_string_value(json_array_get(val, 1)), 16); + LOGDEBUG("Client %"PRId64" got enonce1 %s", client->id, client->enonce1); + sprintf(client->enonce1, "%016lx", client->enonce1_64); +} + +static void parse_authorise_result(stratum_instance_t *client, json_t *val) +{ + client->authorised = json_is_true(val); + LOGDEBUG("Client %"PRId64" is %sauthorised", client->id, client->authorised ? "" : "not "); +} + static int node_msg_type(json_t *val) { int i, ret = -1; @@ -5243,14 +5339,64 @@ out: } /* Entered with client holding ref count */ -static void parse_node_msg(json_t *val, const char *buf, stratum_instance_t *client) +static void node_client_msg(ckpool_t *ckp, json_t *val, const char *buf, stratum_instance_t *client) { + json_t *params, *method, *res_val, *id_val, *err_val = NULL; int msg_type = node_msg_type(val); + sdata_t *sdata = ckp->data; + json_params_t *jp; + int errnum; - if (msg_type > -1) - LOGWARNING("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]); - else - LOGWARNING("Missing client %"PRId64" node method from %s", client->id, buf); + if (msg_type < 0) { + LOGERR("Missing client %"PRId64" node method from %s", client->id, buf); + return; + } + LOGWARNING("Got client %"PRId64" node method %d:%s", client->id, msg_type, stratum_msgs[msg_type]); + id_val = json_object_get(val, "id"); + method = json_object_get(val, "method"); + params = json_object_get(val, "params"); + res_val = json_object_get(val, "result"); + switch (msg_type) { + case SM_SHARE: + jp = create_json_params(client->id, method, params, id_val); + ckmsgq_add(sdata->sshareq, jp); + break; + case SM_DIFF: + parse_diff(client, params); + break; + case SM_SUBSCRIBE: + parse_subscribe(client, client->id, params); + break; + case SM_SUBSCRIBERESULT: + parse_subscribe_result(client, res_val); + break; + case SM_AUTH: + parse_authorise(client, params, &err_val, &errnum); + break; + case SM_AUTHRESULT: + parse_authorise_result(client, res_val); + break; + default: + break; + } +} + +static void parse_node_msg(ckpool_t *ckp, json_t *val, const char *buf) +{ + int msg_type = node_msg_type(val); + + if (msg_type < 0) { + LOGERR("Missing node method from %s", buf); + return; + } + LOGDEBUG("Got node method %d:%s", msg_type, stratum_msgs[msg_type]); + switch (msg_type) { + case SM_WORKINFO: + add_node_base(ckp, val); + break; + default: + break; + } } /* Entered with client holding ref count */ @@ -5320,7 +5466,10 @@ static void srecv_process(ckpool_t *ckp, char *buf) msg->json_msg = val; val = json_object_get(msg->json_msg, "client_id"); if (unlikely(!val)) { - LOGWARNING("Failed to extract client_id from connector json smsg %s", buf); + if (ckp->node) + parse_node_msg(ckp, msg->json_msg, buf); + else + LOGWARNING("Failed to extract client_id from connector json smsg %s", buf); json_decref(msg->json_msg); free(msg); goto out; @@ -5374,7 +5523,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); if (ckp->node) - parse_node_msg(msg->json_msg, buf, client); + node_client_msg(ckp, msg->json_msg, buf, client); else parse_instance_msg(ckp, sdata, msg, client); dec_instance_ref(sdata, client);