diff --git a/src/generator.c b/src/generator.c index 608771b2..73a73194 100644 --- a/src/generator.c +++ b/src/generator.c @@ -123,6 +123,8 @@ struct proxy_instance { ckmsgq_t *passsends; // passthrough sends char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ + + time_t reconnect_time; }; /* Private data for the generator */ @@ -1475,6 +1477,8 @@ static void *passthrough_recv(void *arg) return NULL; } +static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata); + /* For receiving messages from the upstream proxy, also responsible for setting * up the connection and testing it's alive. */ static void *proxy_recv(void *arg) @@ -1497,6 +1501,7 @@ static void *proxy_recv(void *arg) notify_instance_t *ni, *tmp; share_msg_t *share, *tmpshare; int retries = 0, ret; + char buf[128]; time_t now; while (!proxy_alive(ckp, si, proxi, cs, true)) { @@ -1505,9 +1510,14 @@ static void *proxy_recv(void *arg) send_proc(ckp->generator, "reconnect"); } sleep(5); + proxi->reconnect_time = time(NULL); } - if (!proxi->alive) { + /* Wait 90 seconds before declaring this upstream pool alive + * to prevent switching to unstable pools. */ + if (!proxi->alive && (!current_proxy(ckp, ckp->data) || + time(NULL) - proxi->reconnect_time > 90)) { proxi->alive = true; + proxi->reconnect_time = 0; send_proc(ckp->generator, "reconnect"); } @@ -1551,7 +1561,8 @@ static void *proxy_recv(void *arg) } if (parse_method(proxi, cs->buf)) { if (proxi->notified) { - send_proc(ckp->stratifier, "notify"); + snprintf(buf, 127, "notify=%d", proxi->id); + send_proc(ckp->stratifier, buf); proxi->notified = false; } if (proxi->diffed) { @@ -1612,6 +1623,7 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata) while (42) { if (!ping_main(ckp)) break; + mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { if (proxi->alive) { @@ -1625,6 +1637,7 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata) } gdata->proxy = ret; mutex_unlock(&gdata->lock); + if (ret) break; sleep(1);