diff --git a/src/ckpool.c b/src/ckpool.c index a89fbdd8..b3259661 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -891,6 +891,78 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); } +/* This is for blocking sends of json messages */ +bool send_json_msg(connsock_t *cs, const json_t *json_msg) +{ + int len, sent; + char *s; + + s = json_dumps(json_msg, JSON_ESCAPE_SLASH | JSON_EOL); + LOGDEBUG("Sending json msg: %s", s); + len = strlen(s); + sent = write_socket(cs->fd, s, len); + dealloc(s); + if (sent != len) { + LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent); + return false; + } + return true; +} + +/* Decode a string that should have a json message and return just the contents + * of the result key or NULL. */ +static json_t *json_result(json_t *val) +{ + json_t *res_val = NULL, *err_val; + + res_val = json_object_get(val, "result"); + /* (null) is a valid result while no value is an error, so mask out + * (null) and only handle lack of result */ + if (json_is_null(res_val)) + res_val = NULL; + else if (!res_val) { + char *ss; + + err_val = json_object_get(val, "error"); + if (err_val) + ss = json_dumps(err_val, 0); + else + ss = strdup("(unknown reason)"); + + LOGNOTICE("JSON-RPC decode of json_result failed: %s", ss); + free(ss); + } + return res_val; +} + +/* Return the error value if one exists */ +static json_t *json_errval(json_t *val) +{ + json_t *err_val = json_object_get(val, "error"); + + return err_val; +} + +/* Parse a string and return the json value it contains, if any, and the + * result in res_val. Return NULL if no result key is found. */ +json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val) +{ + json_error_t err; + json_t *val; + + *res_val = NULL; + val = json_loads(msg, 0, &err); + if (!val) { + LOGWARNING("Json decode failed(%d): %s", err.line, err.text); + goto out; + } + *res_val = json_result(val); + *err_val = json_errval(val); + +out: + return val; +} + /* Open the file in path, check if there is a pid in there that still exists * and if not, write the pid into that file. */ static bool write_pid(ckpool_t *ckp, const char *path, proc_instance_t *pi, const pid_t pid, const pid_t oldpid) diff --git a/src/ckpool.h b/src/ckpool.h index 16781320..4a5dad9d 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -322,6 +322,8 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co #define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); +bool send_json_msg(connsock_t *cs, const json_t *json_msg); +json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val); void childsighandler(const int sig); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); diff --git a/src/connector.c b/src/connector.c index a440e5a2..fd7f1543 100644 --- a/src/connector.c +++ b/src/connector.c @@ -61,6 +61,9 @@ struct client_instance { /* Are we currently sending a blocked message from this client */ sender_send_t *sending; + /* Is this a trusted remote server */ + bool remote; + /* Is this the parent passthrough client */ bool passthrough; @@ -146,6 +149,10 @@ struct connector_data { redirect_t *redirects; /* What redirect we're currently up to */ int redirect; + + /* Pending sends to the upstream server */ + ckmsgq_t *upstream_sends; + connsock_t upstream_cs; }; typedef struct connector_data cdata_t; @@ -979,6 +986,113 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) send_client(cdata, client->id, buf); } +static void remote_server(cdata_t *cdata, client_instance_t *client) +{ + char *buf; + + LOGWARNING("Connector adding client %"PRId64" %s as remote trusted server", + client->id, client->address_name); + client->remote = true; + ASPRINTF(&buf, "{\"result\": true}\n"); + send_client(cdata, client->id, buf); +} + +static bool connect_upstream(connsock_t *cs) +{ + json_t *req, *val = NULL, *res_val, *err_val; + bool res, ret = false; + float timeout = 10; + + cs->fd = connect_socket(cs->url, cs->port); + if (cs->fd < 0) { + LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port); + goto out; + } + keep_sockalive(cs->fd); + + JSON_CPACK(req, "{ss,s[s]}", + "method", "mining.remote", + "params", PACKAGE"/"VERSION); + res = send_json_msg(cs, req); + json_decref(req); + if (!res) { + LOGWARNING("Failed to send message in connect_upstream"); + goto out; + } + if (read_socket_line(cs, &timeout) < 1) { + LOGWARNING("Failed to receive line in connect_upstream"); + goto out; + } + val = json_msg_result(cs->buf, &res_val, &err_val); + if (!val || !res_val) { + LOGWARNING("Failed to get a json result in connect_upstream, got: %s", + cs->buf); + goto out; + } + ret = json_is_true(res_val); + if (!ret) { + LOGWARNING("Denied upstream trusted connection"); + goto out; + } + LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port); + ret = true; +out: + return ret; +} + +static void usend_process(ckpool_t *ckp, char *buf) +{ + cdata_t *cdata = ckp->data; + connsock_t *cs = &cdata->upstream_cs; + int len, sent; + + if (unlikely(!buf || !strlen(buf))) { + LOGERR("Send empty message to usend_process"); + goto out; + } + LOGDEBUG("Sending upstream msg: %s", buf); + len = strlen(buf); + while (42) { + sent = write_socket(cs->fd, buf, len); + if (sent == len) + break; + if (cs->fd > 0) { + LOGWARNING("Upstream pool failed, attempting reconnect while caching messages"); + Close(cs->fd); + sleep(5); + connect_upstream(cs); + } + } +out: + free(buf); +} + +static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) +{ + connsock_t *cs = &cdata->upstream_cs; + bool ret = false; + + cs->ckp = ckp; + if (!ckp->upstream) { + LOGEMERG("No upstream server set in remote trusted server mode"); + goto out; + } + if (!extract_sockaddr(ckp->upstream, &cs->url, &cs->port)) { + LOGEMERG("Failed to extract upstream address from %s", ckp->upstream); + goto out; + } + + /* Must succeed on initial connect to upstream pool */ + if (!connect_upstream(cs)) { + LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port); + goto out; + } + cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process); + ret = true; +out: + return ret; +} + static void process_client_msg(cdata_t *cdata, const char *buf) { int64_t client_id; @@ -1183,6 +1297,21 @@ retry: } passthrough_client(cdata, client); dec_instance_ref(cdata, client); + } else if (cmdmatch(buf, "remote")) { + client_instance_t *client; + + ret = sscanf(buf, "remote=%"PRId64, &client_id); + if (ret < 0) { + LOGDEBUG("Connector failed to parse remote command: %s", buf); + goto retry; + } + client = ref_client_by_id(cdata, client_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find client id %"PRId64" to add as remote", client_id); + goto retry; + } + remote_server(cdata, client); + dec_instance_ref(cdata, client); } else if (cmdmatch(buf, "getxfd")) { int fdno = -1; @@ -1297,6 +1426,10 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); + if (ckp->remote && !setup_upstream(ckp, cdata)) { + ret = 1; + goto out; + } cklock_init(&cdata->lock); cdata->pi = pi; cdata->nfds = 0; diff --git a/src/generator.c b/src/generator.c index da221781..e55500ab 100644 --- a/src/generator.c +++ b/src/generator.c @@ -421,24 +421,6 @@ out: return ret; } -/* This is for blocking sends of json messages */ -static bool send_json_msg(connsock_t *cs, const json_t *json_msg) -{ - int len, sent; - char *s; - - s = json_dumps(json_msg, JSON_ESCAPE_SLASH | JSON_EOL); - LOGDEBUG("Sending json msg: %s", s); - len = strlen(s); - sent = write_socket(cs->fd, s, len); - dealloc(s); - if (sent != len) { - LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent); - return false; - } - return true; -} - static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy) { if (cs->fd > 0) { @@ -467,60 +449,6 @@ static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy return true; } -/* Decode a string that should have a json message and return just the contents - * of the result key or NULL. */ -static json_t *json_result(json_t *val) -{ - json_t *res_val = NULL, *err_val; - - res_val = json_object_get(val, "result"); - /* (null) is a valid result while no value is an error, so mask out - * (null) and only handle lack of result */ - if (json_is_null(res_val)) - res_val = NULL; - else if (!res_val) { - char *ss; - - err_val = json_object_get(val, "error"); - if (err_val) - ss = json_dumps(err_val, 0); - else - ss = strdup("(unknown reason)"); - - LOGNOTICE("JSON-RPC decode of json_result failed: %s", ss); - free(ss); - } - return res_val; -} - -/* Return the error value if one exists */ -static json_t *json_errval(json_t *val) -{ - json_t *err_val = json_object_get(val, "error"); - - return err_val; -} - -/* Parse a string and return the json value it contains, if any, and the - * result in res_val. Return NULL if no result key is found. */ -static json_t *json_msg_result(char *msg, json_t **res_val, json_t **err_val) -{ - json_error_t err; - json_t *val; - - *res_val = NULL; - val = json_loads(msg, 0, &err); - if (!val) { - LOGWARNING("Json decode failed(%d): %s", err.line, err.text); - goto out; - } - *res_val = json_result(val); - *err_val = json_errval(val); - -out: - return val; -} - /* For some reason notify is buried at various different array depths so use * a reentrant function to try and find it. */ static json_t *find_notify(json_t *val) diff --git a/src/stratifier.c b/src/stratifier.c index 223e5f34..2e10c723 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5241,6 +5241,23 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie return; } + if (unlikely(cmdmatch(method, "mining.remote"))) { + char buf[256]; + + /* Add this client as a trusted remote node in the connector and + * drop the client in the stratifier */ + if (!ckp->trusted[client->server]) { + LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as remote node on non trusted server %d", + client_id, client->address, client->server); + connector_drop_client(ckp, client_id); + } else { + snprintf(buf, 255, "remote=%"PRId64, client_id); + send_proc(ckp->connector, buf); + } + drop_client(ckp, sdata, client_id); + return; + } + if (unlikely(cmdmatch(method, "mining.node"))) { char buf[256];