From 5688225711620cd754b54e7d7c153d654d6f682b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 25 Feb 2015 21:44:17 +1100 Subject: [PATCH] Reconnect generator after a parent proxy has died or it has received its first notify only --- src/generator.c | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/generator.c b/src/generator.c index 512dac29..1634b68f 100644 --- a/src/generator.c +++ b/src/generator.c @@ -105,6 +105,7 @@ struct proxy_instance { bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ + bool notified; /* Has this proxy received any notifies yet */ bool disabled; /* Subproxy no longer to be used */ bool reconnect; /* We need to drop and reconnect */ bool reconnecting; /* Testing in progress */ @@ -795,6 +796,11 @@ out: static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni); +static void reconnect_generator(const ckpool_t *ckp) +{ + send_proc(ckp->generator, "reconnect"); +} + static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val) { const char *prev_hash, *bbversion, *nbit, *ntime; @@ -868,6 +874,12 @@ static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val) mutex_unlock(&proxy->notify_lock); send_notify(ckp, proxi, ni); + /* We have all the ingredients necessary to switch to this proxy if + * it's the best one so reassess now. */ + if (unlikely(parent_proxy(proxi) && !proxi->notified)) { + proxi->notified = true; + reconnect_generator(ckp); + } out: return ret; } @@ -1629,7 +1641,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); - return ret; + goto out; } if (!connect_proxy(cs)) { if (!pinging) { @@ -1673,6 +1685,8 @@ out: epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); Close(cs->fd); } + if (parent_proxy(proxi)) + reconnect_generator(ckp); } else { keep_sockalive(cs->fd); event.events = EPOLLIN; @@ -1803,11 +1817,6 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } -static void reconnect_generator(const ckpool_t *ckp) -{ - send_proc(ckp->generator, "reconnect"); -} - /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -1922,7 +1931,6 @@ static void *proxy_recv(void *arg) proxi->low_id, proxi->si->url); } alive = proxi->alive; - reconnect_generator(ckp); while (42) { share_msg_t *share, *tmpshare; @@ -1939,7 +1947,6 @@ static void *proxy_recv(void *arg) LOGWARNING("Proxy %d:%s failed, attempting reconnect", proxi->low_id, proxi->si->url); alive = false; - reconnect_generator(ckp); } sleep(5); proxi->reconnect_time = time(NULL); @@ -1950,7 +1957,6 @@ static void *proxy_recv(void *arg) if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->si->url); proxi->reconnect_time = 0; - reconnect_generator(ckp); alive = true; }