From da636b26dfc43e94c3756f66f7b1d4f503a411cf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 25 Jan 2016 11:47:38 +1100 Subject: [PATCH] Bkey add WIP --- src/connector.c | 13 +++++++++++-- src/libckpool.c | 45 +++++++++++++++++++++++++++++++++++++-------- src/libckpool.h | 7 ++++++- src/stratifier.c | 34 ++++++++++++++++++++++++++-------- 4 files changed, 80 insertions(+), 19 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2b69e170..daf5eb59 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1104,17 +1104,26 @@ out: return ret; } -static void process_client_msg(cdata_t *cdata, const char *buf) +static void process_client_msg(cdata_t *cdata, char *buf) { + char *msg, *bkey = NULL; int64_t client_id; json_t *json_msg; - char *msg; + int len; + len = strlen(buf); + if (len > 4 && !strncmp((buf + len - 4 - 1), "bkey", 4)) { + LOGWARNING("Bkey found in process_client_msg"); + buf[len - 4 - 1] = '\0'; + bkey = buf + len + 1; + } json_msg = json_loads(buf, 0, NULL); if (unlikely(!json_msg)) { LOGWARNING("Invalid json message in process_client_msg: %s", buf); return; } + if (unlikely(bkey)) + json_append_bkeys(json_msg, bkey); /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); diff --git a/src/libckpool.c b/src/libckpool.c index 2eca670c..79939eb9 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1260,10 +1260,7 @@ out: * append further key:len:data combinations */ -#define BKEY_LENOFS 6 -#define BKEY_LENLEN 4 - -static inline uint32_t *bkey_lenptr(char *bkey) +static inline uint32_t *bkey_lenptr(const char *bkey) { return (uint32_t *)(bkey + BKEY_LENOFS); } @@ -1281,7 +1278,7 @@ char *bkey_object(void) } /* Extract bkey length */ -uint32_t bkey_len(char *bkey) +uint32_t bkey_len(const char *bkey) { uint32_t *lenptr = bkey_lenptr(bkey); return le32toh(*lenptr); @@ -1339,7 +1336,8 @@ void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *fi } void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line) -{ uint32_t msglen, *lenptr, newlen; +{ + uint32_t msglen, *lenptr, newlen; int len; if (unlikely(!*bkey || !key || !bin)) { @@ -1362,7 +1360,7 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen len += 1; /* Get current message length */ - lenptr = (uint32_t *)*bkey; + lenptr = bkey_lenptr(*bkey); msglen = le32toh(*lenptr); /* Add $key+length+bin */ @@ -1382,10 +1380,41 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen memcpy(*bkey + msglen, bin, blen); /* Adjust message length header */ - lenptr = (uint32_t *)*bkey; + lenptr = bkey_lenptr(*bkey); *lenptr = htole32(newlen); } +void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const char *func, const int line) +{ + uint32_t ofs = BKEY_LENOFS + BKEY_LENLEN; + uint32_t msglen; + + msglen = bkey_len(bkey); + if (unlikely(!msglen || msglen > 0x80000000)) { + LOGWARNING("Invalid msglen %u sent to json_append_bkey from %s %s:%d", + msglen, file, func, line); + return; + } + while (ofs < msglen) { + uint32_t binlen, *lenptr; + const char *key; + char *hex; + + key = bkey + ofs; + LOGWARNING("Found key %s", key); + ofs += strlen(key) + 1; + lenptr = (uint32_t *)(bkey + ofs); + binlen = le32toh(*lenptr); + ofs += BKEY_LENLEN; + LOGWARNING("Found binlen %u", binlen); + hex = bin2hex(bkey + ofs, binlen); + LOGWARNING("Found hex %s", hex); + json_set_string(val, key, hex); + free(hex); + ofs += binlen; + } +} + void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line) { diff --git a/src/libckpool.h b/src/libckpool.h index 2a0cbc66..097b2751 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -319,12 +319,17 @@ struct unixsock { typedef struct unixsock unixsock_t; +#define BKEY_LENOFS 6 +#define BKEY_LENLEN 4 + char *bkey_object(void); -uint32_t bkey_len(char *bkey); +uint32_t bkey_len(const char *bkey); void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *file, const char *func, const int line); #define bkey_add_hex(bkey, key, hex) _bkey_add_hex(&(bkey), key, hex, __FILE__, __func__, __LINE__) void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line); #define bkey_add_bin(bkey, key, bin) _bkey_add_bin(&(bkey), key, bin, __FILE__, __func__, __LINE__) +void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const char *func, const int line); +#define json_append_bkeys(val, bkey) _json_append_bkeys(val, bkey, __FILE__, __func__, __LINE__) void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); diff --git a/src/stratifier.c b/src/stratifier.c index edd8b5b9..043392bc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -154,6 +154,10 @@ typedef struct json_params json_params_t; struct smsg { json_t *json_msg; int64_t client_id; + + /* bkey data if any */ + char *bkey; + uint32_t bkeylen; }; typedef struct smsg smsg_t; @@ -863,31 +867,34 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) ck_rlock(&sdata->instance_lock); if (sdata->node_instances) { json_t *wb_val = json_object(); + char *bkey = bkey_object(); + uint32_t bkeylen; json_set_int(wb_val, "jobid", wb->id); - json_set_string(wb_val, "target", wb->target); + bkey_add_hex(bkey, "target", wb->target); json_set_double(wb_val, "diff", wb->diff); json_set_int(wb_val, "version", wb->version); json_set_int(wb_val, "curtime", wb->curtime); - json_set_string(wb_val, "prevhash", wb->prevhash); - json_set_string(wb_val, "ntime", wb->ntime); - json_set_string(wb_val, "bbversion", wb->bbversion); - json_set_string(wb_val, "nbit", wb->nbit); + bkey_add_hex(bkey, "prevhash", wb->prevhash); + bkey_add_hex(bkey, "ntime", wb->ntime); + bkey_add_hex(bkey, "bbversion", wb->bbversion); + bkey_add_hex(bkey, "nbit", wb->nbit); json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "height", wb->height); json_set_string(wb_val, "flags", wb->flags); json_set_int(wb_val, "transactions", wb->transactions); if (likely(wb->transactions)) - json_set_string(wb_val, "txn_data", wb->txn_data); + bkey_add_hex(bkey, "txn_data", wb->txn_data); /* We don't need txn_hashes */ json_set_int(wb_val, "merkles", wb->merkles); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); - json_set_string(wb_val, "coinb1", wb->coinb1); + bkey_add_hex(bkey, "coinb1", wb->coinb1); json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); json_set_int(wb_val, "coinb1len", wb->coinb1len); json_set_int(wb_val, "coinb2len", wb->coinb2len); - json_set_string(wb_val, "coinb2", wb->coinb2); + bkey_add_hex(bkey, "coinb2", wb->coinb2); + bkeylen = bkey_len(bkey); DL_FOREACH(sdata->node_instances, client) { ckmsg_t *client_msg; @@ -899,11 +906,15 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb) msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = json_msg; msg->client_id = client->id; + msg->bkey = ckalloc(bkeylen); + memcpy(msg->bkey, bkey, bkeylen); + msg->bkeylen = bkeylen; client_msg->data = msg; DL_APPEND(bulk_send, client_msg); messages++; } json_decref(wb_val); + free(bkey); } ck_runlock(&sdata->instance_lock); @@ -6161,6 +6172,13 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, JSON_COMPACT); + if (msg->bkeylen) { + int len = strlen(s); + + s = realloc(s, len + msg->bkeylen); + memcpy(s + len, msg->bkey, msg->bkeylen); + free(msg->bkey); + } send_proc(ckp->connector, s); free(s); free_smsg(msg);