Browse Source

Make the server watchdog a standalone thread that doesn't need messaging

master
Con Kolivas 9 years ago
parent
commit
c645a6fc69
  1. 68
      src/generator.c

68
src/generator.c

@ -130,7 +130,8 @@ struct proxy_instance {
struct generator_data { struct generator_data {
mutex_t lock; /* Lock protecting linked lists */ mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxy_list; /* Linked list of all active proxies */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; /* Globally increasing notify id */
server_instance_t *si; /* Current server instance */
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
}; };
@ -264,7 +265,6 @@ static int gen_loop(proc_instance_t *pi)
int sockd = -1, ret = 0, selret; int sockd = -1, ret = 0, selret;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data;
bool started = false; bool started = false;
char *buf = NULL; char *buf = NULL;
connsock_t *cs; connsock_t *cs;
@ -291,7 +291,6 @@ reconnect:
retry: retry:
Close(sockd); Close(sockd);
ckmsgq_add(gdata->srvchk, si);
do { do {
selret = wait_recv_select(us->sockd, 5); selret = wait_recv_select(us->sockd, 5);
@ -1692,8 +1691,38 @@ out:
return ret; return ret;
} }
/* Check which servers are alive, maintaining a connection with them and
* reconnect if a higher priority one is available. */
static void *server_watchdog(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
gdata_t *gdata = ckp->data;
while (42) {
server_instance_t *best = NULL;
ts_t timer_t;
int i;
cksleep_prepare_r(&timer_t);
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
/* Have we reached the current server? */
if (server_alive(ckp, si, true) && !best)
best = si;
}
if (best && best != gdata->si) {
gdata->si = best;
send_proc(ckp->generator, "reconnect");
}
cksleep_ms_r(&timer_t, 5000);
}
return NULL;
}
static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
pthread_t pth_watchdog;
server_instance_t *si; server_instance_t *si;
int i, ret; int i, ret;
@ -1710,6 +1739,7 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
cksem_post(&si->cs.sem); cksem_post(&si->cs.sem);
} }
create_pthread(&pth_watchdog, server_watchdog, ckp);
ret = gen_loop(pi); ret = gen_loop(pi);
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
@ -1778,34 +1808,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
return ret; return ret;
} }
/* Tell the watchdog what the current server instance is, check which servers
* are alive, maintaining a connection with them and reconnect if a higher
* priority one is available. */
static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{
server_instance_t *best = NULL;
static time_t last_t = 0;
time_t now_t;
int i;
/* Rate limit to checking only once every 5 seconds */
now_t = time(NULL);
if (now_t <= last_t + 5)
return;
last_t = now_t;
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
/* Have we reached the current server? */
if (server_alive(ckp, si, true) && !best)
best = si;
}
if (best && (!cursi || cursi->id > best->id))
send_proc(ckp->generator, "reconnect");
}
static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{ {
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
@ -1865,10 +1867,8 @@ int generator(proc_instance_t *pi)
if (ckp->proxy) { if (ckp->proxy) {
gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog);
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);
} else { } else
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);
ret = server_mode(ckp, pi); ret = server_mode(ckp, pi);
}
dealloc(ckp->data); dealloc(ckp->data);
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);

Loading…
Cancel
Save