Browse Source

Process mining.passthrough as a subclass of proxy and set up handling of clients in preparation for passthrough routing

master
Con Kolivas 10 years ago
parent
commit
6f12bd51a2
  1. 4
      src/ckpool.c
  2. 30
      src/connector.c
  3. 53
      src/generator.c
  4. 21
      src/stratifier.c

4
src/ckpool.c

@ -1042,7 +1042,7 @@ int main(int argc, char **argv)
ckp.initial_args[ckp.args] = strdup(argv[ckp.args]); ckp.initial_args[ckp.args] = strdup(argv[ckp.args]);
ckp.initial_args[ckp.args] = NULL; ckp.initial_args[ckp.args] = NULL;
while ((c = getopt_long(argc, argv, "Ac:d:g:Hhkl:n:pS:s:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "Ac:d:g:Hhkl:n:PpS:s:", long_options, &i)) != -1) {
switch (c) { switch (c) {
case 'A': case 'A':
ckp.standalone = true; ckp.standalone = true;
@ -1093,7 +1093,7 @@ int main(int argc, char **argv)
case 'P': case 'P':
if (ckp.proxy) if (ckp.proxy)
quit(1, "Cannot set both proxy and passthrough mode"); quit(1, "Cannot set both proxy and passthrough mode");
ckp.passthrough = true; ckp.proxy = ckp.passthrough = true;
break; break;
case 'p': case 'p':
if (ckp.passthrough) if (ckp.passthrough)

30
src/connector.c

@ -51,6 +51,9 @@ struct client_instance {
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
bool passthrough;
int passthrough_id;
}; };
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
@ -464,6 +467,16 @@ static client_instance_t *client_by_id(conn_instance_t *ci, int id)
return client; return client;
} }
static void passthrough_client(conn_instance_t *ci, client_instance_t *client)
{
char *buf;
LOGINFO("Connector adding passthrough client %d", client->id);
client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(ci, client->id, buf);
}
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
{ {
int sockd = -1, client_id, ret = 0, selret; int sockd = -1, client_id, ret = 0, selret;
@ -540,6 +553,23 @@ retry:
LOGINFO("Connector dropped client id: %d", client_id); LOGINFO("Connector dropped client id: %d", client_id);
goto retry; goto retry;
} }
if (cmdmatch(buf, "passthrough")) {
client_instance_t *client;
int client_id;
ret = sscanf(buf, "passthrough=%d", &client_id);
if (ret < 0) {
LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry;
}
client = client_by_id(ci, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %d to pass through", client_id);
goto retry;
}
passthrough_client(ci, client);
goto retry;
}
if (cmdmatch(buf, "getfd")) { if (cmdmatch(buf, "getfd")) {
send_fd(ci->serverfd, sockd); send_fd(ci->serverfd, sockd);
goto retry; goto retry;

53
src/generator.c

@ -68,6 +68,7 @@ struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
server_instance_t *si; server_instance_t *si;
bool passthrough;
const char *auth; const char *auth;
const char *pass; const char *pass;
@ -565,6 +566,44 @@ out:
return ret; return ret;
} }
static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
json_t *req, *val = NULL, *res_val;
bool ret = false;
req = json_pack("{s:s,s:[s]}",
"method", "mining.passthrough",
"params", PACKAGE"/"VERSION);
ret = send_json_msg(cs, req);
json_decref(req);
if (!ret) {
LOGWARNING("Failed to send message in passthrough_stratum");
goto out;
}
if (read_socket_line(cs, 5) < 1) {
LOGWARNING("Failed to receive line in passthrough_stratum");
goto out;
}
val = json_msg_result(cs->buf, &res_val);
if (!val) {
LOGWARNING("Failed to get a json result in passthrough_stratum, got: %s",
cs->buf);
goto out;
}
ret = json_is_true(res_val);
if (!ret) {
LOGWARNING("Denied passthrough for stratum");
goto out;
}
proxi->passthrough = true;
out:
if (val)
json_decref(val);
if (!ret)
close(cs->fd);
return ret;
}
static bool parse_notify(proxy_instance_t *proxi, json_t *val) static bool parse_notify(proxy_instance_t *proxi, json_t *val)
{ {
const char *prev_hash, *bbversion, *nbit, *ntime; const char *prev_hash, *bbversion, *nbit, *ntime;
@ -1158,6 +1197,15 @@ retry:
cs->url, cs->port); cs->url, cs->port);
continue; continue;
} }
if (ckp->passthrough) {
if (!passthrough_stratum(cs, proxi)) {
LOGWARNING("Failed initial passthrough to %s:%s !",
cs->url, cs->port);
continue;
}
alive = proxi;
break;
}
/* Test we can connect, authorise and get stratum information */ /* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) { if (!subscribe_stratum(cs, proxi)) {
LOGINFO("Failed initial subscribe to %s:%s !", LOGINFO("Failed initial subscribe to %s:%s !",
@ -1181,7 +1229,8 @@ retry:
} }
ckp->chosen_server = 0; ckp->chosen_server = 0;
cs = alive->cs; cs = alive->cs;
LOGNOTICE("Connected to upstream server %s:%s as proxy", cs->url, cs->port); LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port,
ckp->passthrough ? " in passthrough mode" : "");
mutex_init(&alive->notify_lock); mutex_init(&alive->notify_lock);
create_pthread(&alive->pth_precv, proxy_recv, alive); create_pthread(&alive->pth_precv, proxy_recv, alive);
mutex_init(&alive->psend_lock); mutex_init(&alive->psend_lock);
@ -1229,9 +1278,11 @@ reconnect:
/* We've just subscribed and authorised so tell the stratifier to /* We've just subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */ * retrieve the first subscription. */
if (!ckp->passthrough) {
send_proc(ckp->stratifier, "subscribe"); send_proc(ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify"); send_proc(ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
}
do { do {
selret = wait_read_select(us->sockd, 5); selret = wait_read_select(us->sockd, 5);

21
src/stratifier.c

@ -234,6 +234,7 @@ struct stratum_instance {
int user_id; int user_id;
ckpool_t *ckp; ckpool_t *ckp;
bool passthrough;
}; };
typedef struct stratum_instance stratum_instance_t; typedef struct stratum_instance stratum_instance_t;
@ -1951,6 +1952,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
{ {
stratum_instance_t *client; stratum_instance_t *client;
const char *method; const char *method;
char buf[256];
/* Random broken clients send something not an integer as the id so we copy /* Random broken clients send something not an integer as the id so we copy
* the json item for id_val as is for the response. */ * the json item for id_val as is for the response. */
@ -1978,6 +1980,23 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
return; return;
} }
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
/* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly */
client->passthrough = true;
LOGINFO("Adding passthrough client %d", client->id);
snprintf(buf, 255, "passthrough=%d", client->id);
send_proc(client->ckp->connector, buf);
return;
}
/* No passthrough messages should get through from here on */
if (unlikely(client->passthrough)) {
LOGWARNING("Passthrough client messages falling through to stratifier");
return;
}
if (cmdmatch(method, "mining.auth")) { if (cmdmatch(method, "mining.auth")) {
json_params_t *jp = create_json_params(client_id, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, params_val, id_val, address);
@ -1987,8 +2006,6 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
/* We should only accept authorised requests from here on */ /* We should only accept authorised requests from here on */
if (!client->authorised) { if (!client->authorised) {
char buf[256];
/* Dropping unauthorised clients here also allows the /* Dropping unauthorised clients here also allows the
* stratifier process to restart since it will have lost all * stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */ * the stratum instance data. Clients will just reconnect. */

Loading…
Cancel
Save