Browse Source

Listen for upstream messages and submit blocks locally

master
Con Kolivas 9 years ago
parent
commit
ba1d832813
  1. 69
      src/connector.c

69
src/connector.c

@ -1009,6 +1009,7 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
cksem_wait(&cs->sem);
cs->fd = connect_socket(cs->url, cs->port); cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port); LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port);
@ -1047,6 +1048,8 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port); LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port);
ret = true; ret = true;
out: out:
cksem_post(&cs->sem);
return ret; return ret;
} }
@ -1078,10 +1081,73 @@ out:
free(buf); free(buf);
} }
static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf)
{
const char *gbt_block = json_string_value(json_object_get(val, "submitblock"));
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to find submitblock data from upstream submitblock method %s",
buf);
return;
}
LOGWARNING("Submitting possible upstream block!");
send_proc(ckp->generator, gbt_block);
}
static void *urecv_process(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
cdata_t *cdata = ckp->data;
connsock_t *cs = &cdata->upstream_cs;
ckp->proxy = true;
rename_proc("ureceiver");
pthread_detach(pthread_self());
while (42) {
const char *method;
float timeout = 5;
json_t *val;
int ret;
cksem_wait(&cs->sem);
ret = read_socket_line(cs, &timeout);
if (ret < 1) {
if (likely(!ret))
LOGDEBUG("No message from upstream pool");
else
LOGNOTICE("Failed to read from upstream pool");
goto nomsg;
}
val = json_loads(cs->buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Received non-json msg from upstream pool %s",
cs->buf);
goto nomsg;
}
method = json_string_value(json_object_get(val, "method"));
if (unlikely(!method)) {
LOGWARNING("Failed to find method from upstream pool json %s",
cs->buf);
json_decref(val);
goto nomsg;
}
if (!safecmp(method, "submitblock"))
parse_remote_submitblock(ckp, val, cs->buf);
else
LOGWARNING("Unrecognised upstream method %s", method);
nomsg:
cksem_post(&cs->sem);
}
return NULL;
}
static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata)
{ {
connsock_t *cs = &cdata->upstream_cs; connsock_t *cs = &cdata->upstream_cs;
bool ret = false; bool ret = false;
pthread_t pth;
cs->ckp = ckp; cs->ckp = ckp;
if (!ckp->upstream) { if (!ckp->upstream) {
@ -1093,11 +1159,14 @@ static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata)
goto out; goto out;
} }
cksem_init(&cs->sem);
cksem_post(&cs->sem);
/* Must succeed on initial connect to upstream pool */ /* Must succeed on initial connect to upstream pool */
if (!connect_upstream(ckp, cs)) { if (!connect_upstream(ckp, cs)) {
LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port); LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port);
goto out; goto out;
} }
create_pthread(&pth, urecv_process, ckp);
cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process); cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process);
ret = true; ret = true;
out: out:

Loading…
Cancel
Save