Browse Source

Upstream messages directly from stratifier to connector in trusted mode.

master
Con Kolivas 8 years ago
parent
commit
e011d17321
  1. 14
      src/connector.c
  2. 1
      src/connector.h
  3. 30
      src/stratifier.c

14
src/connector.c

@ -167,6 +167,14 @@ struct connector_data {
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
void connector_upstream_msg(ckpool_t *ckp, char *msg)
{
cdata_t *cdata = ckp->cdata;
LOGDEBUG("Upstreaming %s", msg);
ckmsgq_add(cdata->upstream_sends, msg);
}
/* Increase the reference count of instance */ /* Increase the reference count of instance */
static void __inc_instance_ref(client_instance_t *client) static void __inc_instance_ref(client_instance_t *client)
{ {
@ -1374,12 +1382,6 @@ retry:
json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
ckmsgq_add(cdata->cmpq, val); ckmsgq_add(cdata->cmpq, val);
} else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9);
LOGDEBUG("Upstreaming %s", msg);
ckmsgq_add(cdata->upstream_sends, msg);
goto retry;
} else if (cmdmatch(buf, "dropclient")) { } else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client; client_instance_t *client;

1
src/connector.h

@ -10,6 +10,7 @@
#ifndef CONNECTOR_H #ifndef CONNECTOR_H
#define CONNECTOR_H #define CONNECTOR_H
void connector_upstream_msg(ckpool_t *ckp, char *msg);
void connector_add_message(ckpool_t *ckp, json_t *val); void connector_add_message(ckpool_t *ckp, json_t *val);
void *connector(void *arg); void *connector(void *arg);

30
src/stratifier.c

@ -891,13 +891,13 @@ static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_i
static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type) static void upstream_msgtype(ckpool_t *ckp, const json_t *val, const int msg_type)
{ {
json_t *json_msg = json_deep_copy(val); json_t *json_msg = json_deep_copy(val);
char *buf; char *msg;
json_set_string(json_msg, "method", stratum_msgs[msg_type]); json_set_string(json_msg, "method", stratum_msgs[msg_type]);
ASPRINTF(&buf, "upstream=%s", json_dumps(json_msg, JSON_EOL)); msg = json_dumps(json_msg, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL);
json_decref(json_msg); json_decref(json_msg);
send_proc(ckp->connector, buf); /* Connector absorbs and frees msg */
free(buf); connector_upstream_msg(ckp, msg);
} }
static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb) static void send_node_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb)
@ -3322,11 +3322,11 @@ static json_t *user_stats(const user_instance_t *user)
static void upstream_block(ckpool_t *ckp, const int height, const char *workername, static void upstream_block(ckpool_t *ckp, const int height, const char *workername,
const double diff) const double diff)
{ {
char buf[512]; char *msg;
snprintf(buf, 511, "upstream={\"method\":\"block\",\"workername\":\"%s\",\"diff\":%lf,\"height\":%d,\"name\":\"%s\"}\n", ASPRINTF(&msg, "{\"method\":\"block\",\"workername\":\"%s\",\"diff\":%lf,\"height\":%d,\"name\":\"%s\"}\n",
workername, diff, height, ckp->name); workername, diff, height, ckp->name);
send_proc(ckp->connector, buf); connector_upstream_msg(ckp, msg);
} }
static void block_solve(ckpool_t *ckp, const char *blockhash) static void block_solve(ckpool_t *ckp, const char *blockhash)
@ -5358,11 +5358,11 @@ static double time_bias(const double tdiff, const double period)
static void upstream_shares(ckpool_t *ckp, const char *workername, const int64_t diff, static void upstream_shares(ckpool_t *ckp, const char *workername, const int64_t diff,
const double sdiff) const double sdiff)
{ {
char buf[512]; char *msg;
snprintf(buf, 511, "upstream={\"method\":\"shares\",\"workername\":\"%s\",\"diff\":%"PRId64",\"sdiff\":%lf}\n", ASPRINTF(&msg, "{\"method\":\"shares\",\"workername\":\"%s\",\"diff\":%"PRId64",\"sdiff\":%lf}\n",
workername, diff, sdiff); workername, diff, sdiff);
send_proc(ckp->connector, buf); connector_upstream_msg(ckp, msg);
} }
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with client holding a ref count. */
@ -7323,11 +7323,11 @@ static void dump_log_entries(log_entry_t **entries)
static void upstream_workers(ckpool_t *ckp, user_instance_t *user) static void upstream_workers(ckpool_t *ckp, user_instance_t *user)
{ {
char buf[256]; char *msg;
snprintf(buf, 255, "upstream={\"method\":\"workers\",\"username\":\"%s\",\"workers\":%d}\n", ASPRINTF(&msg, "{\"method\":\"workers\",\"username\":\"%s\",\"workers\":%d}\n",
user->username, user->workers); user->username, user->workers);
send_proc(ckp->connector, buf); connector_upstream_msg(ckp, msg);
} }

Loading…
Cancel
Save