From b1ae420fd35461adaedd87c9d0382f6a2a4cc93d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 17 Aug 2014 17:22:17 +1000 Subject: [PATCH] Pass through message to relevant processes with separate threads in passthrough mode --- src/connector.c | 10 +++++-- src/generator.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 697f1834..90d87c41 100644 --- a/src/connector.c +++ b/src/connector.c @@ -176,6 +176,8 @@ static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instanc fd = drop_client(ci, client); if (fd == -1) return; + if (ckp->passthrough) + return; sprintf(buf, "dropclient=%d", client->id); send_proc(ckp->stratifier, buf); } @@ -236,8 +238,7 @@ reparse: msg[buflen] = 0; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); - val = json_loads(msg, 0, NULL); - if (!val) { + if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); LOGINFO("Client id %d sent invalid json message %s", client->id, msg); @@ -250,7 +251,10 @@ reparse: json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); s = json_dumps(val, 0); - send_proc(ckp->stratifier, s); + if (ckp->passthrough) + send_proc(ckp->generator, s); + else + send_proc(ckp->stratifier, s); free(s); json_decref(val); } diff --git a/src/generator.c b/src/generator.c index ac4ec045..2fc4639c 100644 --- a/src/generator.c +++ b/src/generator.c @@ -63,6 +63,13 @@ struct stratum_msg { typedef struct stratum_msg stratum_msg_t; +struct pass_msg { + connsock_t *cs; + char *msg; +}; + +typedef struct pass_msg pass_msg_t; + /* Per proxied pool instance data */ struct proxy_instance { ckpool_t *ckp; @@ -109,6 +116,8 @@ struct proxy_instance { pthread_mutex_t share_lock; share_msg_t *shares; int share_id; + + ckmsgq_t *passsends; // passthrough sends }; typedef struct proxy_instance proxy_instance_t; @@ -1169,6 +1178,59 @@ static void *proxy_send(void *arg) return NULL; } +/* For receiving messages from an upstream pool to pass downstream */ +static void *passthrough_recv(void *arg) +{ + proxy_instance_t *proxi = (proxy_instance_t *)arg; + connsock_t *cs = proxi->cs; + ckpool_t *ckp = proxi->ckp; + + rename_proc("passrecv"); + + while (42) { + int ret; + + do { + ret = read_socket_line(cs, 60); + } while (ret == 0); + + if (ret < 1) { + /* Send ourselves a reconnect message */ + LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); + send_proc(ckp->generator, "reconnect"); + break; + } + /* Simply forward the message on, as is, to the connector to + * process. Possibly parse parameters sent by upstream pool + * here */ + send_proc(ckp->connector, cs->buf); + } + return NULL; +} + +static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) +{ + int len, sent; + + LOGDEBUG("Sending upstream json msg: %s", pm->msg); + len = strlen(pm->msg); + sent = write_socket(pm->cs->fd, pm->msg, len); + if (sent != len) { + LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg); + } + free(pm->msg); + free(pm); +} + +static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) +{ + pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); + + pm->cs = proxi->cs; + pm->msg = strdup(msg); + ckmsgq_add(proxi->passsends, pm); +} + /* Cycle through the available proxies and find the first alive one */ static proxy_instance_t *live_proxy(ckpool_t *ckp) { @@ -1232,10 +1294,15 @@ retry: LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port, ckp->passthrough ? " in passthrough mode" : ""); mutex_init(&alive->notify_lock); - create_pthread(&alive->pth_precv, proxy_recv, alive); - mutex_init(&alive->psend_lock); - cond_init(&alive->psend_cond); - create_pthread(&alive->pth_psend, proxy_send, alive); + if (ckp->passthrough) { + create_pthread(&alive->pth_precv, passthrough_recv, alive); + alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); + } else { + create_pthread(&alive->pth_precv, proxy_recv, alive); + mutex_init(&alive->psend_lock); + cond_init(&alive->psend_cond); + create_pthread(&alive->pth_psend, proxy_send, alive); + } out: send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; @@ -1326,6 +1393,9 @@ retry: } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); + } else if (ckp->passthrough) { + /* Anything remaining should be stratum messages */ + passthrough_add_send(proxi, buf); } else { /* Anything remaining should be share submissions */ json_t *val = json_loads(buf, 0, NULL);