Browse Source

Pass through message to relevant processes with separate threads in passthrough mode

master
Con Kolivas 10 years ago
parent
commit
b1ae420fd3
  1. 10
      src/connector.c
  2. 78
      src/generator.c

10
src/connector.c

@ -176,6 +176,8 @@ static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instanc
fd = drop_client(ci, client);
if (fd == -1)
return;
if (ckp->passthrough)
return;
sprintf(buf, "dropclient=%d", client->id);
send_proc(ckp->stratifier, buf);
}
@ -236,8 +238,7 @@ reparse:
msg[buflen] = 0;
client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs);
val = json_loads(msg, 0, NULL);
if (!val) {
if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %d sent invalid json message %s", client->id, msg);
@ -250,7 +251,10 @@ reparse:
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
s = json_dumps(val, 0);
send_proc(ckp->stratifier, s);
if (ckp->passthrough)
send_proc(ckp->generator, s);
else
send_proc(ckp->stratifier, s);
free(s);
json_decref(val);
}

78
src/generator.c

@ -63,6 +63,13 @@ struct stratum_msg {
typedef struct stratum_msg stratum_msg_t;
struct pass_msg {
connsock_t *cs;
char *msg;
};
typedef struct pass_msg pass_msg_t;
/* Per proxied pool instance data */
struct proxy_instance {
ckpool_t *ckp;
@ -109,6 +116,8 @@ struct proxy_instance {
pthread_mutex_t share_lock;
share_msg_t *shares;
int share_id;
ckmsgq_t *passsends; // passthrough sends
};
typedef struct proxy_instance proxy_instance_t;
@ -1169,6 +1178,59 @@ static void *proxy_send(void *arg)
return NULL;
}
/* For receiving messages from an upstream pool to pass downstream */
static void *passthrough_recv(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
rename_proc("passrecv");
while (42) {
int ret;
do {
ret = read_socket_line(cs, 60);
} while (ret == 0);
if (ret < 1) {
/* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
send_proc(ckp->generator, "reconnect");
break;
}
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
send_proc(ckp->connector, cs->buf);
}
return NULL;
}
static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm)
{
int len, sent;
LOGDEBUG("Sending upstream json msg: %s", pm->msg);
len = strlen(pm->msg);
sent = write_socket(pm->cs->fd, pm->msg, len);
if (sent != len) {
LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg);
}
free(pm->msg);
free(pm);
}
static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
{
pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t));
pm->cs = proxi->cs;
pm->msg = strdup(msg);
ckmsgq_add(proxi->passsends, pm);
}
/* Cycle through the available proxies and find the first alive one */
static proxy_instance_t *live_proxy(ckpool_t *ckp)
{
@ -1232,10 +1294,15 @@ retry:
LOGNOTICE("Connected to upstream server %s:%s as proxy%s", cs->url, cs->port,
ckp->passthrough ? " in passthrough mode" : "");
mutex_init(&alive->notify_lock);
create_pthread(&alive->pth_precv, proxy_recv, alive);
mutex_init(&alive->psend_lock);
cond_init(&alive->psend_cond);
create_pthread(&alive->pth_psend, proxy_send, alive);
if (ckp->passthrough) {
create_pthread(&alive->pth_precv, passthrough_recv, alive);
alive->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
} else {
create_pthread(&alive->pth_precv, proxy_recv, alive);
mutex_init(&alive->psend_lock);
cond_init(&alive->psend_cond);
create_pthread(&alive->pth_psend, proxy_send, alive);
}
out:
send_proc(ckp->connector, alive ? "accept" : "reject");
return alive;
@ -1326,6 +1393,9 @@ retry:
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
} else if (ckp->passthrough) {
/* Anything remaining should be stratum messages */
passthrough_add_send(proxi, buf);
} else {
/* Anything remaining should be share submissions */
json_t *val = json_loads(buf, 0, NULL);

Loading…
Cancel
Save