From 26f1173e74fdd453aedd531b2e67c109a285f1ef Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 29 Jan 2016 12:10:05 +1100 Subject: [PATCH] Change upstream submit to use bkeys, sending hash and data separately WIP --- src/connector.c | 41 +++++++++++++++++++++++++++++++++++------ src/generator.c | 2 +- src/stratifier.c | 42 +++++++++++++++++++++++++++--------------- 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/src/connector.c b/src/connector.c index d1ecc2af..be124b2a 100644 --- a/src/connector.c +++ b/src/connector.c @@ -525,7 +525,7 @@ reparse: client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); client->buf[client->bufofs] = '\0'; - if (!(val = json_loads(msg, 0, NULL))) { + if (!(val = json_loads(msg, JSON_DISABLE_EOF_CHECK, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg); @@ -1150,6 +1150,38 @@ static void ping_upstream(cdata_t *cdata) ckmsgq_add(cdata->upstream_sends, buf); } +static json_t *urecv_loads(const char *buf, const int len) +{ + json_t *val = NULL; + char *bkey = NULL; + int slen; + + slen = strlen(buf); + if (unlikely(!slen)) { + LOGWARNING("Received empty message from upstream pool"); + goto out; + } + val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); + if (unlikely(!val)) { + LOGWARNING("Received non-json msg from upstream pool %s", + buf); + goto out; + } + if (len == slen) + goto out; + bkey = strstr(buf + slen - 5, "bkey\n"); + if (likely(bkey)) { + int blen; + + LOGDEBUG("Bkey found in upstream pool msg"); + blen = len - (bkey - buf); + json_append_bkeys(val, bkey, blen); + } else + LOGWARNING("Non-bkey extranous data from upstream pool msg %s", buf); +out: + return val; +} + static void *urecv_process(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; @@ -1182,12 +1214,9 @@ static void *urecv_process(void *arg) goto nomsg; } alive = true; - val = json_loads(cs->buf, 0, NULL); - if (unlikely(!val)) { - LOGWARNING("Received non-json msg from upstream pool %s", - cs->buf); + val = urecv_loads(cs->buf, ret); + if (unlikely(!val)) goto nomsg; - } method = json_string_value(json_object_get(val, "method")); if (unlikely(!method)) { LOGWARNING("Failed to find method from upstream pool json %s", diff --git a/src/generator.c b/src/generator.c index 95112d14..7f25eccc 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1928,7 +1928,7 @@ static void forward_passthrough_msg(ckpool_t *ckp, char *buf, int len) char *bkey = NULL; if (unlikely(len > slen)) { - bkey = strstr(buf + slen - 5, "bkey"); + bkey = strstr(buf + slen - 5, "bkey\n"); if (bkey) { json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); int blen; diff --git a/src/stratifier.c b/src/stratifier.c index 89a75c8e..386fbaa7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1391,13 +1391,20 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non } } -static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block) -{ - char *buf; - - ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n", - gbt_block); - send_proc(ckp->connector, buf); +static void upstream_blocksubmit(ckpool_t *ckp, const char *hash, const char *data) +{ + char *buf, *bkey = bkey_object(); + uint32_t bkeylen, len; + + bkey_add_hex(bkey, "hash", hash); + bkey_add_hex(bkey, "data", data); + bkeylen = bkey_len(bkey); + ASPRINTF(&buf, "upstream={\"method\":\"submitblock\"}"); + len = strlen(buf); + buf = realloc(buf, round_up_page(len + 1 + bkeylen)); + memcpy(buf + len, bkey, bkeylen); + free(bkey); + send_proc_data(ckp->connector, buf, len + bkeylen); free(buf); } @@ -1414,7 +1421,7 @@ static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const JSON_CPACK(val, "{ss,ss}", "method", "submitblock", - "submitblock", gbt_block); + "submitdata", gbt_block); DL_FOREACH(sdata->remote_instances, client) { ckmsg_t *client_msg; smsg_t *msg; @@ -1478,7 +1485,7 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i realloc_strcat(&gbt_block, wb->txn_data); send_generator(ckp, gbt_block, GEN_PRIORITY); if (ckp->remote) - upstream_blocksubmit(ckp, gbt_block); + upstream_blocksubmit(ckp, blockhash, gbt_block + 12 + 64 + 1); else downstream_blocksubmits(ckp, gbt_block, NULL); free(gbt_block); @@ -5985,18 +5992,23 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf, const stratum_instance_t *client) { - json_t *submitblock_val; - const char *gbt_block; + json_t *hash_val, *data_val; + const char *hash, *data; + char *gbt_block; - submitblock_val = json_object_get(val, "submitblock"); - gbt_block = json_string_value(submitblock_val); - if (unlikely(!gbt_block)) { - LOGWARNING("Failed to get submitblock data from remote message %s", buf); + hash_val = json_object_get(val, "hash"); + hash = json_string_value(hash_val); + data_val = json_object_get(val, "data"); + data = json_string_value(data_val); + if (unlikely(!hash || !data)) { + LOGWARNING("Failed to extract hash and data from remote submitblock msg %s", buf); return; } + ASPRINTF(&gbt_block, "submitblock:%s,%s", hash, data); LOGWARNING("Submitting possible downstream block!"); send_generator(ckp, gbt_block, GEN_PRIORITY); downstream_blocksubmits(ckp, gbt_block, client); + free(gbt_block); } static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)