diff --git a/src/generator.c b/src/generator.c index dd8182ec..a0a60718 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1512,7 +1512,7 @@ static proxy_instance_t *create_subproxy(proxy_instance_t *proxi) subproxy->ckp = proxi->ckp; subproxy->cs = ckzalloc(sizeof(connsock_t)); subproxy->si = proxi->si; - subproxy->id = proxi->subproxy_count++; + subproxy->id = proxi->subproxy_count; subproxy->auth = proxi->auth; subproxy->pass = proxi->pass; subproxy->proxy = proxi; @@ -1532,6 +1532,7 @@ static bool recruit_subproxy(proxy_instance_t *proxi, int epfd) } mutex_lock(&proxi->proxy_lock); + proxi->subproxy_count++; HASH_ADD_INT(proxi->subproxies, id, subproxy); proxi->client_headroom += proxi->clients_per_proxy; mutex_unlock(&proxi->proxy_lock); @@ -1808,11 +1809,13 @@ reconnect: if (proxi != cproxy) { proxi = cproxy; if (!ckp->passthrough) { + proxy_instance_t *proxy = proxi->proxy; + connsock_t *cs = proxi->cs; LOGWARNING("Successfully connected to proxy %d %s:%s as proxy", proxi->id, cs->url, cs->port); dealloc(buf); - ASPRINTF(&buf, "proxy=%d", proxi->id); + ASPRINTF(&buf, "proxy=%d:%d", proxy->id, proxi->id); send_proc(ckp->stratifier, buf); } } diff --git a/src/stratifier.c b/src/stratifier.c index c5569562..df2f15be 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1006,6 +1006,7 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata) memcpy(dsdata->donkeytxnbin, sdata->donkeytxnbin, 25); randomiser = ((int64_t)time(NULL)) << 32; dsdata->blockchange_id = dsdata->workbase_id = randomiser; + cklock_init(&dsdata->workbase_lock); return dsdata; } @@ -1105,10 +1106,10 @@ static proxy_t *current_proxy(sdata_t *sdata) static void update_subscribe(ckpool_t *ckp, const char *cmd) { sdata_t *sdata = ckp->data; + int id = 0, subid = 0; const char *buf; proxy_t *proxy; json_t *val; - int id = 0; if (unlikely(strlen(cmd) < 11)) { LOGWARNING("Received zero length string for subscribe in update_subscribe"); @@ -1122,11 +1123,13 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) return; } json_get_int(&id, val, "proxy"); + json_get_int(&subid, val, "subproxy"); - LOGNOTICE("Got updated subscribe for proxy %d", id); + LOGNOTICE("Got updated subscribe for proxy %d:%d", id, subid); - proxy = proxy_by_id(sdata, id); + proxy = subproxy_by_id(sdata, id, subid); proxy->notified = false; /* Reset this */ + sdata = proxy->sdata; ck_wlock(&sdata->workbase_lock); proxy->subscribed = true; @@ -1883,14 +1886,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) * first notify data comes from this proxy. */ static void set_proxy(sdata_t *sdata, const char *buf) { - proxy_t *proxy; - int id = 0; + proxy_t *proxy, *subproxy; + int id = 0, subid = 0; - sscanf(buf, "proxy=%d", &id); + sscanf(buf, "proxy=%d:%d", &id, &subid); mutex_lock(&sdata->proxy_lock); proxy = __proxy_by_id(sdata, id); - sdata->proxy = proxy; + subproxy = __subproxy_by_id(proxy, subid); + sdata->proxy = subproxy; mutex_unlock(&sdata->proxy_lock); /* We will receive a notification immediately after this and it should