Browse Source

Bkey add WIP

master
Con Kolivas 9 years ago
parent
commit
da636b26df
  1. 13
      src/connector.c
  2. 45
      src/libckpool.c
  3. 7
      src/libckpool.h
  4. 34
      src/stratifier.c

13
src/connector.c

@ -1104,17 +1104,26 @@ out:
return ret; return ret;
} }
static void process_client_msg(cdata_t *cdata, const char *buf) static void process_client_msg(cdata_t *cdata, char *buf)
{ {
char *msg, *bkey = NULL;
int64_t client_id; int64_t client_id;
json_t *json_msg; json_t *json_msg;
char *msg; int len;
len = strlen(buf);
if (len > 4 && !strncmp((buf + len - 4 - 1), "bkey", 4)) {
LOGWARNING("Bkey found in process_client_msg");
buf[len - 4 - 1] = '\0';
bkey = buf + len + 1;
}
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }
if (unlikely(bkey))
json_append_bkeys(json_msg, bkey);
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));

45
src/libckpool.c

@ -1260,10 +1260,7 @@ out:
* append further key:len:data combinations * append further key:len:data combinations
*/ */
#define BKEY_LENOFS 6 static inline uint32_t *bkey_lenptr(const char *bkey)
#define BKEY_LENLEN 4
static inline uint32_t *bkey_lenptr(char *bkey)
{ {
return (uint32_t *)(bkey + BKEY_LENOFS); return (uint32_t *)(bkey + BKEY_LENOFS);
} }
@ -1281,7 +1278,7 @@ char *bkey_object(void)
} }
/* Extract bkey length */ /* Extract bkey length */
uint32_t bkey_len(char *bkey) uint32_t bkey_len(const char *bkey)
{ {
uint32_t *lenptr = bkey_lenptr(bkey); uint32_t *lenptr = bkey_lenptr(bkey);
return le32toh(*lenptr); return le32toh(*lenptr);
@ -1339,7 +1336,8 @@ void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *fi
} }
void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line) void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line)
{ uint32_t msglen, *lenptr, newlen; {
uint32_t msglen, *lenptr, newlen;
int len; int len;
if (unlikely(!*bkey || !key || !bin)) { if (unlikely(!*bkey || !key || !bin)) {
@ -1362,7 +1360,7 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen
len += 1; len += 1;
/* Get current message length */ /* Get current message length */
lenptr = (uint32_t *)*bkey; lenptr = bkey_lenptr(*bkey);
msglen = le32toh(*lenptr); msglen = le32toh(*lenptr);
/* Add $key+length+bin */ /* Add $key+length+bin */
@ -1382,10 +1380,41 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen
memcpy(*bkey + msglen, bin, blen); memcpy(*bkey + msglen, bin, blen);
/* Adjust message length header */ /* Adjust message length header */
lenptr = (uint32_t *)*bkey; lenptr = bkey_lenptr(*bkey);
*lenptr = htole32(newlen); *lenptr = htole32(newlen);
} }
void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const char *func, const int line)
{
uint32_t ofs = BKEY_LENOFS + BKEY_LENLEN;
uint32_t msglen;
msglen = bkey_len(bkey);
if (unlikely(!msglen || msglen > 0x80000000)) {
LOGWARNING("Invalid msglen %u sent to json_append_bkey from %s %s:%d",
msglen, file, func, line);
return;
}
while (ofs < msglen) {
uint32_t binlen, *lenptr;
const char *key;
char *hex;
key = bkey + ofs;
LOGWARNING("Found key %s", key);
ofs += strlen(key) + 1;
lenptr = (uint32_t *)(bkey + ofs);
binlen = le32toh(*lenptr);
ofs += BKEY_LENLEN;
LOGWARNING("Found binlen %u", binlen);
hex = bin2hex(bkey + ofs, binlen);
LOGWARNING("Found hex %s", hex);
json_set_string(val, key, hex);
free(hex);
ofs += binlen;
}
}
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line) void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line)
{ {

7
src/libckpool.h

@ -319,12 +319,17 @@ struct unixsock {
typedef struct unixsock unixsock_t; typedef struct unixsock unixsock_t;
#define BKEY_LENOFS 6
#define BKEY_LENLEN 4
char *bkey_object(void); char *bkey_object(void);
uint32_t bkey_len(char *bkey); uint32_t bkey_len(const char *bkey);
void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *file, const char *func, const int line); void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *file, const char *func, const int line);
#define bkey_add_hex(bkey, key, hex) _bkey_add_hex(&(bkey), key, hex, __FILE__, __func__, __LINE__) #define bkey_add_hex(bkey, key, hex) _bkey_add_hex(&(bkey), key, hex, __FILE__, __func__, __LINE__)
void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line); void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen, const char *file, const char *func, const int line);
#define bkey_add_bin(bkey, key, bin) _bkey_add_bin(&(bkey), key, bin, __FILE__, __func__, __LINE__) #define bkey_add_bin(bkey, key, bin) _bkey_add_bin(&(bkey), key, bin, __FILE__, __func__, __LINE__)
void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const char *func, const int line);
#define json_append_bkeys(val, bkey) _json_append_bkeys(val, bkey, __FILE__, __func__, __LINE__)
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line);

34
src/stratifier.c

@ -154,6 +154,10 @@ typedef struct json_params json_params_t;
struct smsg { struct smsg {
json_t *json_msg; json_t *json_msg;
int64_t client_id; int64_t client_id;
/* bkey data if any */
char *bkey;
uint32_t bkeylen;
}; };
typedef struct smsg smsg_t; typedef struct smsg smsg_t;
@ -863,31 +867,34 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
if (sdata->node_instances) { if (sdata->node_instances) {
json_t *wb_val = json_object(); json_t *wb_val = json_object();
char *bkey = bkey_object();
uint32_t bkeylen;
json_set_int(wb_val, "jobid", wb->id); json_set_int(wb_val, "jobid", wb->id);
json_set_string(wb_val, "target", wb->target); bkey_add_hex(bkey, "target", wb->target);
json_set_double(wb_val, "diff", wb->diff); json_set_double(wb_val, "diff", wb->diff);
json_set_int(wb_val, "version", wb->version); json_set_int(wb_val, "version", wb->version);
json_set_int(wb_val, "curtime", wb->curtime); json_set_int(wb_val, "curtime", wb->curtime);
json_set_string(wb_val, "prevhash", wb->prevhash); bkey_add_hex(bkey, "prevhash", wb->prevhash);
json_set_string(wb_val, "ntime", wb->ntime); bkey_add_hex(bkey, "ntime", wb->ntime);
json_set_string(wb_val, "bbversion", wb->bbversion); bkey_add_hex(bkey, "bbversion", wb->bbversion);
json_set_string(wb_val, "nbit", wb->nbit); bkey_add_hex(bkey, "nbit", wb->nbit);
json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue); json_set_int(wb_val, "coinbasevalue", wb->coinbasevalue);
json_set_int(wb_val, "height", wb->height); json_set_int(wb_val, "height", wb->height);
json_set_string(wb_val, "flags", wb->flags); json_set_string(wb_val, "flags", wb->flags);
json_set_int(wb_val, "transactions", wb->transactions); json_set_int(wb_val, "transactions", wb->transactions);
if (likely(wb->transactions)) if (likely(wb->transactions))
json_set_string(wb_val, "txn_data", wb->txn_data); bkey_add_hex(bkey, "txn_data", wb->txn_data);
/* We don't need txn_hashes */ /* We don't need txn_hashes */
json_set_int(wb_val, "merkles", wb->merkles); json_set_int(wb_val, "merkles", wb->merkles);
json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array)); json_object_set_new_nocheck(wb_val, "merklehash", json_deep_copy(wb->merkle_array));
json_set_string(wb_val, "coinb1", wb->coinb1); bkey_add_hex(bkey, "coinb1", wb->coinb1);
json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen); json_set_int(wb_val, "enonce1varlen", wb->enonce1varlen);
json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen); json_set_int(wb_val, "enonce2varlen", wb->enonce2varlen);
json_set_int(wb_val, "coinb1len", wb->coinb1len); json_set_int(wb_val, "coinb1len", wb->coinb1len);
json_set_int(wb_val, "coinb2len", wb->coinb2len); json_set_int(wb_val, "coinb2len", wb->coinb2len);
json_set_string(wb_val, "coinb2", wb->coinb2); bkey_add_hex(bkey, "coinb2", wb->coinb2);
bkeylen = bkey_len(bkey);
DL_FOREACH(sdata->node_instances, client) { DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg; ckmsg_t *client_msg;
@ -899,11 +906,15 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
msg = ckzalloc(sizeof(smsg_t)); msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg; msg->json_msg = json_msg;
msg->client_id = client->id; msg->client_id = client->id;
msg->bkey = ckalloc(bkeylen);
memcpy(msg->bkey, bkey, bkeylen);
msg->bkeylen = bkeylen;
client_msg->data = msg; client_msg->data = msg;
DL_APPEND(bulk_send, client_msg); DL_APPEND(bulk_send, client_msg);
messages++; messages++;
} }
json_decref(wb_val); json_decref(wb_val);
free(bkey);
} }
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
@ -6161,6 +6172,13 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */ * connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, JSON_COMPACT); s = json_dumps(msg->json_msg, JSON_COMPACT);
if (msg->bkeylen) {
int len = strlen(s);
s = realloc(s, len + msg->bkeylen);
memcpy(s + len, msg->bkey, msg->bkeylen);
free(msg->bkey);
}
send_proc(ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
free_smsg(msg); free_smsg(msg);

Loading…
Cancel
Save