diff --git a/src/connector.c b/src/connector.c index 2b69e170..cbeb4535 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1009,6 +1009,7 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs) bool res, ret = false; float timeout = 10; + cksem_wait(&cs->sem); cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port); @@ -1047,6 +1048,8 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs) LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port); ret = true; out: + cksem_post(&cs->sem); + return ret; } @@ -1078,10 +1081,73 @@ out: free(buf); } +static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf) +{ + const char *gbt_block = json_string_value(json_object_get(val, "submitblock")); + + if (unlikely(!gbt_block)) { + LOGWARNING("Failed to find submitblock data from upstream submitblock method %s", + buf); + return; + } + LOGWARNING("Submitting possible upstream block!"); + send_proc(ckp->generator, gbt_block); +} + +static void *urecv_process(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + cdata_t *cdata = ckp->data; + connsock_t *cs = &cdata->upstream_cs; + ckp->proxy = true; + + rename_proc("ureceiver"); + + pthread_detach(pthread_self()); + + while (42) { + const char *method; + float timeout = 5; + json_t *val; + int ret; + + cksem_wait(&cs->sem); + ret = read_socket_line(cs, &timeout); + if (ret < 1) { + if (likely(!ret)) + LOGDEBUG("No message from upstream pool"); + else + LOGNOTICE("Failed to read from upstream pool"); + goto nomsg; + } + val = json_loads(cs->buf, 0, NULL); + if (unlikely(!val)) { + LOGWARNING("Received non-json msg from upstream pool %s", + cs->buf); + goto nomsg; + } + method = json_string_value(json_object_get(val, "method")); + if (unlikely(!method)) { + LOGWARNING("Failed to find method from upstream pool json %s", + cs->buf); + json_decref(val); + goto nomsg; + } + if (!safecmp(method, "submitblock")) + parse_remote_submitblock(ckp, val, cs->buf); + else + LOGWARNING("Unrecognised upstream method %s", method); +nomsg: + cksem_post(&cs->sem); + } + return NULL; +} + static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) { connsock_t *cs = &cdata->upstream_cs; bool ret = false; + pthread_t pth; cs->ckp = ckp; if (!ckp->upstream) { @@ -1093,11 +1159,14 @@ static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) goto out; } + cksem_init(&cs->sem); + cksem_post(&cs->sem); /* Must succeed on initial connect to upstream pool */ if (!connect_upstream(ckp, cs)) { LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port); goto out; } + create_pthread(&pth, urecv_process, ckp); cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process); ret = true; out: