Browse Source

Add connection mechanism and message workqueues for trusted remotes

master
Con Kolivas 9 years ago
parent
commit
75a9e3ec3c
  1. 72
      src/ckpool.c
  2. 2
      src/ckpool.h
  3. 133
      src/connector.c
  4. 72
      src/generator.c
  5. 17
      src/stratifier.c

72
src/ckpool.c

@ -891,6 +891,78 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid
quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid);
} }
/* This is for blocking sends of json messages */
bool send_json_msg(connsock_t *cs, const json_t *json_msg)
{
int len, sent;
char *s;
s = json_dumps(json_msg, JSON_ESCAPE_SLASH | JSON_EOL);
LOGDEBUG("Sending json msg: %s", s);
len = strlen(s);
sent = write_socket(cs->fd, s, len);
dealloc(s);
if (sent != len) {
LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent);
return false;
}
return true;
}
/* Decode a string that should have a json message and return just the contents
* of the result key or NULL. */
static json_t *json_result(json_t *val)
{
json_t *res_val = NULL, *err_val;
res_val = json_object_get(val, "result");
/* (null) is a valid result while no value is an error, so mask out
* (null) and only handle lack of result */
if (json_is_null(res_val))
res_val = NULL;
else if (!res_val) {
char *ss;
err_val = json_object_get(val, "error");
if (err_val)
ss = json_dumps(err_val, 0);
else
ss = strdup("(unknown reason)");
LOGNOTICE("JSON-RPC decode of json_result failed: %s", ss);
free(ss);
}
return res_val;
}
/* Return the error value if one exists */
static json_t *json_errval(json_t *val)
{
json_t *err_val = json_object_get(val, "error");
return err_val;
}
/* Parse a string and return the json value it contains, if any, and the
* result in res_val. Return NULL if no result key is found. */
json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val)
{
json_error_t err;
json_t *val;
*res_val = NULL;
val = json_loads(msg, 0, &err);
if (!val) {
LOGWARNING("Json decode failed(%d): %s", err.line, err.text);
goto out;
}
*res_val = json_result(val);
*err_val = json_errval(val);
out:
return val;
}
/* Open the file in path, check if there is a pid in there that still exists /* Open the file in path, check if there is a pid in there that still exists
* and if not, write the pid into that file. */ * and if not, write the pid into that file. */
static bool write_pid(ckpool_t *ckp, const char *path, proc_instance_t *pi, const pid_t pid, const pid_t oldpid) static bool write_pid(ckpool_t *ckp, const char *path, proc_instance_t *pi, const pid_t pid, const pid_t oldpid)

2
src/ckpool.h

@ -322,6 +322,8 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co
#define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__) #define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
bool send_json_msg(connsock_t *cs, const json_t *json_msg);
json_t *json_msg_result(const char *msg, json_t **res_val, json_t **err_val);
void childsighandler(const int sig); void childsighandler(const int sig);
int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret);

133
src/connector.c

@ -61,6 +61,9 @@ struct client_instance {
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
sender_send_t *sending; sender_send_t *sending;
/* Is this a trusted remote server */
bool remote;
/* Is this the parent passthrough client */ /* Is this the parent passthrough client */
bool passthrough; bool passthrough;
@ -146,6 +149,10 @@ struct connector_data {
redirect_t *redirects; redirect_t *redirects;
/* What redirect we're currently up to */ /* What redirect we're currently up to */
int redirect; int redirect;
/* Pending sends to the upstream server */
ckmsgq_t *upstream_sends;
connsock_t upstream_cs;
}; };
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
@ -979,6 +986,113 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }
static void remote_server(cdata_t *cdata, client_instance_t *client)
{
char *buf;
LOGWARNING("Connector adding client %"PRId64" %s as remote trusted server",
client->id, client->address_name);
client->remote = true;
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(cdata, client->id, buf);
}
static bool connect_upstream(connsock_t *cs)
{
json_t *req, *val = NULL, *res_val, *err_val;
bool res, ret = false;
float timeout = 10;
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) {
LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port);
goto out;
}
keep_sockalive(cs->fd);
JSON_CPACK(req, "{ss,s[s]}",
"method", "mining.remote",
"params", PACKAGE"/"VERSION);
res = send_json_msg(cs, req);
json_decref(req);
if (!res) {
LOGWARNING("Failed to send message in connect_upstream");
goto out;
}
if (read_socket_line(cs, &timeout) < 1) {
LOGWARNING("Failed to receive line in connect_upstream");
goto out;
}
val = json_msg_result(cs->buf, &res_val, &err_val);
if (!val || !res_val) {
LOGWARNING("Failed to get a json result in connect_upstream, got: %s",
cs->buf);
goto out;
}
ret = json_is_true(res_val);
if (!ret) {
LOGWARNING("Denied upstream trusted connection");
goto out;
}
LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port);
ret = true;
out:
return ret;
}
static void usend_process(ckpool_t *ckp, char *buf)
{
cdata_t *cdata = ckp->data;
connsock_t *cs = &cdata->upstream_cs;
int len, sent;
if (unlikely(!buf || !strlen(buf))) {
LOGERR("Send empty message to usend_process");
goto out;
}
LOGDEBUG("Sending upstream msg: %s", buf);
len = strlen(buf);
while (42) {
sent = write_socket(cs->fd, buf, len);
if (sent == len)
break;
if (cs->fd > 0) {
LOGWARNING("Upstream pool failed, attempting reconnect while caching messages");
Close(cs->fd);
sleep(5);
connect_upstream(cs);
}
}
out:
free(buf);
}
static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata)
{
connsock_t *cs = &cdata->upstream_cs;
bool ret = false;
cs->ckp = ckp;
if (!ckp->upstream) {
LOGEMERG("No upstream server set in remote trusted server mode");
goto out;
}
if (!extract_sockaddr(ckp->upstream, &cs->url, &cs->port)) {
LOGEMERG("Failed to extract upstream address from %s", ckp->upstream);
goto out;
}
/* Must succeed on initial connect to upstream pool */
if (!connect_upstream(cs)) {
LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port);
goto out;
}
cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process);
ret = true;
out:
return ret;
}
static void process_client_msg(cdata_t *cdata, const char *buf) static void process_client_msg(cdata_t *cdata, const char *buf)
{ {
int64_t client_id; int64_t client_id;
@ -1183,6 +1297,21 @@ retry:
} }
passthrough_client(cdata, client); passthrough_client(cdata, client);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "remote")) {
client_instance_t *client;
ret = sscanf(buf, "remote=%"PRId64, &client_id);
if (ret < 0) {
LOGDEBUG("Connector failed to parse remote command: %s", buf);
goto retry;
}
client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %"PRId64" to add as remote", client_id);
goto retry;
}
remote_server(cdata, client);
dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) { } else if (cmdmatch(buf, "getxfd")) {
int fdno = -1; int fdno = -1;
@ -1297,6 +1426,10 @@ int connector(proc_instance_t *pi)
if (tries) if (tries)
LOGWARNING("Connector successfully bound to socket"); LOGWARNING("Connector successfully bound to socket");
if (ckp->remote && !setup_upstream(ckp, cdata)) {
ret = 1;
goto out;
}
cklock_init(&cdata->lock); cklock_init(&cdata->lock);
cdata->pi = pi; cdata->pi = pi;
cdata->nfds = 0; cdata->nfds = 0;

72
src/generator.c

@ -421,24 +421,6 @@ out:
return ret; return ret;
} }
/* This is for blocking sends of json messages */
static bool send_json_msg(connsock_t *cs, const json_t *json_msg)
{
int len, sent;
char *s;
s = json_dumps(json_msg, JSON_ESCAPE_SLASH | JSON_EOL);
LOGDEBUG("Sending json msg: %s", s);
len = strlen(s);
sent = write_socket(cs->fd, s, len);
dealloc(s);
if (sent != len) {
LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent);
return false;
}
return true;
}
static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy) static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy)
{ {
if (cs->fd > 0) { if (cs->fd > 0) {
@ -467,60 +449,6 @@ static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy
return true; return true;
} }
/* Decode a string that should have a json message and return just the contents
* of the result key or NULL. */
static json_t *json_result(json_t *val)
{
json_t *res_val = NULL, *err_val;
res_val = json_object_get(val, "result");
/* (null) is a valid result while no value is an error, so mask out
* (null) and only handle lack of result */
if (json_is_null(res_val))
res_val = NULL;
else if (!res_val) {
char *ss;
err_val = json_object_get(val, "error");
if (err_val)
ss = json_dumps(err_val, 0);
else
ss = strdup("(unknown reason)");
LOGNOTICE("JSON-RPC decode of json_result failed: %s", ss);
free(ss);
}
return res_val;
}
/* Return the error value if one exists */
static json_t *json_errval(json_t *val)
{
json_t *err_val = json_object_get(val, "error");
return err_val;
}
/* Parse a string and return the json value it contains, if any, and the
* result in res_val. Return NULL if no result key is found. */
static json_t *json_msg_result(char *msg, json_t **res_val, json_t **err_val)
{
json_error_t err;
json_t *val;
*res_val = NULL;
val = json_loads(msg, 0, &err);
if (!val) {
LOGWARNING("Json decode failed(%d): %s", err.line, err.text);
goto out;
}
*res_val = json_result(val);
*err_val = json_errval(val);
out:
return val;
}
/* For some reason notify is buried at various different array depths so use /* For some reason notify is buried at various different array depths so use
* a reentrant function to try and find it. */ * a reentrant function to try and find it. */
static json_t *find_notify(json_t *val) static json_t *find_notify(json_t *val)

17
src/stratifier.c

@ -5241,6 +5241,23 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
return; return;
} }
if (unlikely(cmdmatch(method, "mining.remote"))) {
char buf[256];
/* Add this client as a trusted remote node in the connector and
* drop the client in the stratifier */
if (!ckp->trusted[client->server]) {
LOGNOTICE("Dropping client %"PRId64" %s trying to authorise as remote node on non trusted server %d",
client_id, client->address, client->server);
connector_drop_client(ckp, client_id);
} else {
snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf);
}
drop_client(ckp, sdata, client_id);
return;
}
if (unlikely(cmdmatch(method, "mining.node"))) { if (unlikely(cmdmatch(method, "mining.node"))) {
char buf[256]; char buf[256];

Loading…
Cancel
Save