From 6f12bd51a2e0251e592c5bd1406091b6b1cbdea6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 17 Aug 2014 14:28:46 +1000 Subject: [PATCH] Process mining.passthrough as a subclass of proxy and set up handling of clients in preparation for passthrough routing --- src/ckpool.c | 4 ++-- src/connector.c | 30 ++++++++++++++++++++++++ src/generator.c | 59 ++++++++++++++++++++++++++++++++++++++++++++---- src/stratifier.c | 21 +++++++++++++++-- 4 files changed, 106 insertions(+), 8 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index c66e727e..6e8d8110 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1042,7 +1042,7 @@ int main(int argc, char **argv) ckp.initial_args[ckp.args] = strdup(argv[ckp.args]); ckp.initial_args[ckp.args] = NULL; - while ((c = getopt_long(argc, argv, "Ac:d:g:Hhkl:n:pS:s:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "Ac:d:g:Hhkl:n:PpS:s:", long_options, &i)) != -1) { switch (c) { case 'A': ckp.standalone = true; @@ -1093,7 +1093,7 @@ int main(int argc, char **argv) case 'P': if (ckp.proxy) quit(1, "Cannot set both proxy and passthrough mode"); - ckp.passthrough = true; + ckp.proxy = ckp.passthrough = true; break; case 'p': if (ckp.passthrough) diff --git a/src/connector.c b/src/connector.c index bcd15ef4..697f1834 100644 --- a/src/connector.c +++ b/src/connector.c @@ -51,6 +51,9 @@ struct client_instance { char buf[PAGESIZE]; int bufofs; + + bool passthrough; + int passthrough_id; }; typedef struct client_instance client_instance_t; @@ -464,6 +467,16 @@ static client_instance_t *client_by_id(conn_instance_t *ci, int id) return client; } +static void passthrough_client(conn_instance_t *ci, client_instance_t *client) +{ + char *buf; + + LOGINFO("Connector adding passthrough client %d", client->id); + client->passthrough = true; + ASPRINTF(&buf, "{\"result\": true}\n"); + send_client(ci, client->id, buf); +} + static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) { int sockd = -1, client_id, ret = 0, selret; @@ -540,6 +553,23 @@ retry: LOGINFO("Connector dropped client id: %d", client_id); goto retry; } + if (cmdmatch(buf, "passthrough")) { + client_instance_t *client; + int client_id; + + ret = sscanf(buf, "passthrough=%d", &client_id); + if (ret < 0) { + LOGDEBUG("Connector failed to parse passthrough command: %s", buf); + goto retry; + } + client = client_by_id(ci, client_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find client id %d to pass through", client_id); + goto retry; + } + passthrough_client(ci, client); + goto retry; + } if (cmdmatch(buf, "getfd")) { send_fd(ci->serverfd, sockd); goto retry; diff --git a/src/generator.c b/src/generator.c index 5acc3227..ac4ec045 100644 --- a/src/generator.c +++ b/src/generator.c @@ -68,6 +68,7 @@ struct proxy_instance { ckpool_t *ckp; connsock_t *cs; server_instance_t *si; + bool passthrough; const char *auth; const char *pass; @@ -565,6 +566,44 @@ out: return ret; } +static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) +{ + json_t *req, *val = NULL, *res_val; + bool ret = false; + + req = json_pack("{s:s,s:[s]}", + "method", "mining.passthrough", + "params", PACKAGE"/"VERSION); + ret = send_json_msg(cs, req); + json_decref(req); + if (!ret) { + LOGWARNING("Failed to send message in passthrough_stratum"); + goto out; + } + if (read_socket_line(cs, 5) < 1) { + LOGWARNING("Failed to receive line in passthrough_stratum"); + goto out; + } + val = json_msg_result(cs->buf, &res_val); + if (!val) { + LOGWARNING("Failed to get a json result in passthrough_stratum, got: %s", + cs->buf); + goto out; + } + ret = json_is_true(res_val); + if (!ret) { + LOGWARNING("Denied passthrough for stratum"); + goto out; + } + proxi->passthrough = true; +out: + if (val) + json_decref(val); + if (!ret) + close(cs->fd); + return ret; +} + static bool parse_notify(proxy_instance_t *proxi, json_t *val) { const char *prev_hash, *bbversion, *nbit, *ntime; @@ -1158,6 +1197,15 @@ retry: cs->url, cs->port); continue; } + if (ckp->passthrough) { + if (!passthrough_stratum(cs, proxi)) { + LOGWARNING("Failed initial passthrough to %s:%s !", + cs->url, cs->port); + continue; + } + alive = proxi; + break; + } /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { LOGINFO("Failed initial subscribe to %s:%s !", @@ -1181,7 +1229,8 @@ retry: } ckp->chosen_server = 0; cs = alive->cs; - LOGNOTICE("Connected to upstream server %s:%s as proxy", cs->url, cs->port); + LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, + ckp->passthrough ? " in passthrough mode" : ""); mutex_init(&alive->notify_lock); create_pthread(&alive->pth_precv, proxy_recv, alive); mutex_init(&alive->psend_lock); @@ -1229,9 +1278,11 @@ reconnect: /* We've just subscribed and authorised so tell the stratifier to * retrieve the first subscription. */ - send_proc(ckp->stratifier, "subscribe"); - send_proc(ckp->stratifier, "notify"); - proxi->notified = false; + if (!ckp->passthrough) { + send_proc(ckp->stratifier, "subscribe"); + send_proc(ckp->stratifier, "notify"); + proxi->notified = false; + } do { selret = wait_read_select(us->sockd, 5); diff --git a/src/stratifier.c b/src/stratifier.c index 6fd372d7..307c7e86 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -234,6 +234,7 @@ struct stratum_instance { int user_id; ckpool_t *ckp; + bool passthrough; }; typedef struct stratum_instance stratum_instance_t; @@ -1951,6 +1952,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val { stratum_instance_t *client; const char *method; + char buf[256]; /* Random broken clients send something not an integer as the id so we copy * the json item for id_val as is for the response. */ @@ -1978,6 +1980,23 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val return; } + if (unlikely(cmdmatch(method, "mining.passthrough"))) { + /* We need to inform the connector process that this client + * is a passthrough and to manage its messages accordingly */ + client->passthrough = true; + LOGINFO("Adding passthrough client %d", client->id); + snprintf(buf, 255, "passthrough=%d", client->id); + send_proc(client->ckp->connector, buf); + return; + + } + + /* No passthrough messages should get through from here on */ + if (unlikely(client->passthrough)) { + LOGWARNING("Passthrough client messages falling through to stratifier"); + return; + } + if (cmdmatch(method, "mining.auth")) { json_params_t *jp = create_json_params(client_id, params_val, id_val, address); @@ -1987,8 +2006,6 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val /* We should only accept authorised requests from here on */ if (!client->authorised) { - char buf[256]; - /* Dropping unauthorised clients here also allows the * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */