diff --git a/src/connector.c b/src/connector.c index 6e3451d4..281a4a87 100644 --- a/src/connector.c +++ b/src/connector.c @@ -167,6 +167,14 @@ struct connector_data { typedef struct connector_data cdata_t; +void connector_upstream_msg(ckpool_t *ckp, char *msg) +{ + cdata_t *cdata = ckp->cdata; + + LOGDEBUG("Upstreaming %s", msg); + ckmsgq_add(cdata->upstream_sends, msg); +} + /* Increase the reference count of instance */ static void __inc_instance_ref(client_instance_t *client) { @@ -1374,12 +1382,6 @@ retry: json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); ckmsgq_add(cdata->cmpq, val); - } else if (cmdmatch(buf, "upstream=")) { - char *msg = strdup(buf + 9); - - LOGDEBUG("Upstreaming %s", msg); - ckmsgq_add(cdata->upstream_sends, msg); - goto retry; } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; diff --git a/src/connector.h b/src/connector.h index 6a3be5f1..03146e63 100644 --- a/src/connector.h +++ b/src/connector.h @@ -10,6 +10,7 @@ #ifndef CONNECTOR_H #define CONNECTOR_H +void connector_upstream_msg(ckpool_t *ckp, char *msg); void connector_add_message(ckpool_t *ckp, json_t *val); void *connector(void *arg); diff --git a/src/stratifier.c b/src/stratifier.c index a5379af1..251d474c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -891,13 +891,13 @@ static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_i static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type) { json_t *json_msg = json_deep_copy(val); - char *buf; + char *msg; json_set_string(json_msg, "method", stratum_msgs[msg_type]); - ASPRINTF(&buf, "upstream=%s", json_dumps(json_msg, JSON_EOL)); + msg = json_dumps(json_msg, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); json_decref(json_msg); - send_proc(ckp->connector, buf); - free(buf); + /* Connector absorbs and frees msg */ + connector_upstream_msg(ckp, msg); } static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb) @@ -3322,11 +3322,11 @@ static json_t *user_stats(const user_instance_t *user) static void upstream_block(ckpool_t *ckp, const int height, const char *workername, const double diff) { - char buf[512]; + char *msg; - snprintf(buf, 511, "upstream={\"method\":\"block\",\"workername\":\"%s\",\"diff\":%lf,\"height\":%d,\"name\":\"%s\"}\n", + ASPRINTF(&msg, "{\"method\":\"block\",\"workername\":\"%s\",\"diff\":%lf,\"height\":%d,\"name\":\"%s\"}\n", workername, diff, height, ckp->name); - send_proc(ckp->connector, buf); + connector_upstream_msg(ckp, msg); } static void block_solve(ckpool_t *ckp, const char *blockhash) @@ -5358,11 +5358,11 @@ static double time_bias(const double tdiff, const double period) static void upstream_shares(ckpool_t *ckp, const char *workername, const int64_t diff, const double sdiff) { - char buf[512]; + char *msg; - snprintf(buf, 511, "upstream={\"method\":\"shares\",\"workername\":\"%s\",\"diff\":%"PRId64",\"sdiff\":%lf}\n", - workername, diff, sdiff); - send_proc(ckp->connector, buf); + ASPRINTF(&msg, "{\"method\":\"shares\",\"workername\":\"%s\",\"diff\":%"PRId64",\"sdiff\":%lf}\n", + workername, diff, sdiff); + connector_upstream_msg(ckp, msg); } /* Needs to be entered with client holding a ref count. */ @@ -7323,11 +7323,11 @@ static void dump_log_entries(log_entry_t **entries) static void upstream_workers(ckpool_t *ckp, user_instance_t *user) { - char buf[256]; + char *msg; - snprintf(buf, 255, "upstream={\"method\":\"workers\",\"username\":\"%s\",\"workers\":%d}\n", - user->username, user->workers); - send_proc(ckp->connector, buf); + ASPRINTF(&msg, "{\"method\":\"workers\",\"username\":\"%s\",\"workers\":%d}\n", + user->username, user->workers); + connector_upstream_msg(ckp, msg); }