diff --git a/src/generator.c b/src/generator.c index 5807e9a9..9419a6bd 100644 --- a/src/generator.c +++ b/src/generator.c @@ -130,7 +130,8 @@ struct proxy_instance { struct generator_data { mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */ - int proxy_notify_id; // Globally increasing notify id + int proxy_notify_id; /* Globally increasing notify id */ + server_instance_t *si; /* Current server instance */ ckmsgq_t *srvchk; // Server check message queue }; @@ -264,7 +265,6 @@ static int gen_loop(proc_instance_t *pi) int sockd = -1, ret = 0, selret; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; - gdata_t *gdata = ckp->data; bool started = false; char *buf = NULL; connsock_t *cs; @@ -291,7 +291,6 @@ reconnect: retry: Close(sockd); - ckmsgq_add(gdata->srvchk, si); do { selret = wait_recv_select(us->sockd, 5); @@ -1692,8 +1691,38 @@ out: return ret; } +/* Check which servers are alive, maintaining a connection with them and + * reconnect if a higher priority one is available. */ +static void *server_watchdog(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + gdata_t *gdata = ckp->data; + + while (42) { + server_instance_t *best = NULL; + ts_t timer_t; + int i; + + cksleep_prepare_r(&timer_t); + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + /* Have we reached the current server? */ + if (server_alive(ckp, si, true) && !best) + best = si; + } + if (best && best != gdata->si) { + gdata->si = best; + send_proc(ckp->generator, "reconnect"); + } + cksleep_ms_r(&timer_t, 5000); + } + return NULL; +} + static int server_mode(ckpool_t *ckp, proc_instance_t *pi) { + pthread_t pth_watchdog; server_instance_t *si; int i, ret; @@ -1710,6 +1739,7 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) cksem_post(&si->cs.sem); } + create_pthread(&pth_watchdog, server_watchdog, ckp); ret = gen_loop(pi); for (i = 0; i < ckp->btcds; i++) { @@ -1778,34 +1808,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) return ret; } -/* Tell the watchdog what the current server instance is, check which servers - * are alive, maintaining a connection with them and reconnect if a higher - * priority one is available. */ -static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) -{ - server_instance_t *best = NULL; - static time_t last_t = 0; - time_t now_t; - int i; - - /* Rate limit to checking only once every 5 seconds */ - now_t = time(NULL); - if (now_t <= last_t + 5) - return; - - last_t = now_t; - - for (i = 0; i < ckp->btcds; i++) { - server_instance_t *si = ckp->servers[i]; - - /* Have we reached the current server? */ - if (server_alive(ckp, si, true) && !best) - best = si; - } - if (best && (!cursi || cursi->id > best->id)) - send_proc(ckp->generator, "reconnect"); -} - static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) { gdata_t *gdata = ckp->data; @@ -1865,10 +1867,8 @@ int generator(proc_instance_t *pi) if (ckp->proxy) { gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); - } else { - gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); + } else ret = server_mode(ckp, pi); - } dealloc(ckp->data); return process_exit(ckp, pi, ret);