Browse Source

Reconnect generator after a parent proxy has died or it has received its first notify only

master
Con Kolivas 10 years ago
parent
commit
5688225711
  1. 24
      src/generator.c

24
src/generator.c

@ -105,6 +105,7 @@ struct proxy_instance {
bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_sessionid; /* Doesn't support session id resume on subscribe */
bool no_params; /* Doesn't want any parameters on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */
bool notified; /* Has this proxy received any notifies yet */
bool disabled; /* Subproxy no longer to be used */ bool disabled; /* Subproxy no longer to be used */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool reconnecting; /* Testing in progress */ bool reconnecting; /* Testing in progress */
@ -795,6 +796,11 @@ out:
static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni); static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni);
static void reconnect_generator(const ckpool_t *ckp)
{
send_proc(ckp->generator, "reconnect");
}
static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val) static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val)
{ {
const char *prev_hash, *bbversion, *nbit, *ntime; const char *prev_hash, *bbversion, *nbit, *ntime;
@ -868,6 +874,12 @@ static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val)
mutex_unlock(&proxy->notify_lock); mutex_unlock(&proxy->notify_lock);
send_notify(ckp, proxi, ni); send_notify(ckp, proxi, ni);
/* We have all the ingredients necessary to switch to this proxy if
* it's the best one so reassess now. */
if (unlikely(parent_proxy(proxi) && !proxi->notified)) {
proxi->notified = true;
reconnect_generator(ckp);
}
out: out:
return ret; return ret;
} }
@ -1629,7 +1641,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
return true; return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return ret; goto out;
} }
if (!connect_proxy(cs)) { if (!connect_proxy(cs)) {
if (!pinging) { if (!pinging) {
@ -1673,6 +1685,8 @@ out:
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL);
Close(cs->fd); Close(cs->fd);
} }
if (parent_proxy(proxi))
reconnect_generator(ckp);
} else { } else {
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
event.events = EPOLLIN; event.events = EPOLLIN;
@ -1803,11 +1817,6 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi); create_pthread(&pth, proxy_reconnect, proxi);
} }
static void reconnect_generator(const ckpool_t *ckp)
{
send_proc(ckp->generator, "reconnect");
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -1922,7 +1931,6 @@ static void *proxy_recv(void *arg)
proxi->low_id, proxi->si->url); proxi->low_id, proxi->si->url);
} }
alive = proxi->alive; alive = proxi->alive;
reconnect_generator(ckp);
while (42) { while (42) {
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
@ -1939,7 +1947,6 @@ static void *proxy_recv(void *arg)
LOGWARNING("Proxy %d:%s failed, attempting reconnect", LOGWARNING("Proxy %d:%s failed, attempting reconnect",
proxi->low_id, proxi->si->url); proxi->low_id, proxi->si->url);
alive = false; alive = false;
reconnect_generator(ckp);
} }
sleep(5); sleep(5);
proxi->reconnect_time = time(NULL); proxi->reconnect_time = time(NULL);
@ -1950,7 +1957,6 @@ static void *proxy_recv(void *arg)
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) { if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->si->url); LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->si->url);
proxi->reconnect_time = 0; proxi->reconnect_time = 0;
reconnect_generator(ckp);
alive = true; alive = true;
} }

Loading…
Cancel
Save