Browse Source

Remove concept of current proxy from generator, allowing stratifier to do all the management

master
Con Kolivas 10 years ago
parent
commit
172e60b567
  1. 55
      src/generator.c

55
src/generator.c

@ -124,8 +124,6 @@ struct proxy_instance {
char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ char_entry_t *recvd_lines; /* Linked list of unprocessed messages */
time_t reconnect_time;
int epfd; /* Epoll fd used by the parent proxy */ int epfd; /* Epoll fd used by the parent proxy */
mutex_t proxy_lock; /* Lock protecting hashlist of proxies */ mutex_t proxy_lock; /* Lock protecting hashlist of proxies */
@ -140,7 +138,6 @@ struct generator_data {
ckpool_t *ckp; ckpool_t *ckp;
mutex_t lock; /* Lock protecting linked lists */ mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *proxies; /* Hash list of all proxies */
proxy_instance_t *proxy; /* Current proxy */
proxy_instance_t *dead_proxies; /* Disabled proxies */ proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
@ -851,12 +848,6 @@ static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val)
mutex_unlock(&proxy->notify_lock); mutex_unlock(&proxy->notify_lock);
send_notify(ckp, proxi, ni); send_notify(ckp, proxi, ni);
/* We have all the ingredients necessary to switch to this proxy if
* it's the best one so reassess now. */
if (unlikely(parent_proxy(proxi) && !proxi->notified)) {
proxi->notified = true;
reconnect_generator(ckp);
}
out: out:
return ret; return ret;
} }
@ -1002,8 +993,10 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
HASH_DELETE(sh, proxi->subproxies, subproxy); HASH_DELETE(sh, proxi->subproxies, subproxy);
mutex_unlock(&proxi->proxy_lock); mutex_unlock(&proxi->proxy_lock);
if (subproxy) if (subproxy) {
send_stratifier_deadproxy(gdata->ckp, subproxy->id, subproxy->subid);
store_proxy(gdata, subproxy); store_proxy(gdata, subproxy);
}
} }
static bool parse_reconnect(proxy_instance_t *proxy, json_t *val) static bool parse_reconnect(proxy_instance_t *proxy, json_t *val)
@ -1717,8 +1710,6 @@ out:
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL);
Close(cs->fd); Close(cs->fd);
} }
if (parent_proxy(proxi))
reconnect_generator(ckp);
} else { } else {
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
event.events = EPOLLIN; event.events = EPOLLIN;
@ -1871,17 +1862,6 @@ static void *passthrough_recv(void *arg)
return NULL; return NULL;
} }
static proxy_instance_t *current_proxy(gdata_t *gdata)
{
proxy_instance_t *ret;
mutex_lock(&gdata->lock);
ret = gdata->proxy;
mutex_unlock(&gdata->lock);
return ret;
}
static bool subproxies_alive(proxy_instance_t *proxy) static bool subproxies_alive(proxy_instance_t *proxy)
{ {
proxy_instance_t *subproxy, *tmp; proxy_instance_t *subproxy, *tmp;
@ -1943,15 +1923,10 @@ static void *proxy_recv(void *arg)
alive = false; alive = false;
} }
sleep(5); sleep(5);
proxi->reconnect_time = time(NULL);
} }
} }
/* Wait 30 seconds before declaring this upstream pool alive if (!alive) {
* to prevent switching to unstable pools. */
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->url); LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->url);
proxi->reconnect_time = 0;
alive = true; alive = true;
} }
@ -2113,7 +2088,6 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
ret = proxi; ret = proxi;
} }
} }
gdata->proxy = ret;
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
if (ret) if (ret)
@ -2244,7 +2218,7 @@ out:
send_api_response(val, sockd); send_api_response(val, sockd);
} }
static void delete_proxy(gdata_t *gdata, proxy_instance_t *proxy) static void delete_proxy(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxy)
{ {
proxy_instance_t *subproxy; proxy_instance_t *subproxy;
@ -2264,6 +2238,7 @@ static void delete_proxy(gdata_t *gdata, proxy_instance_t *proxy)
HASH_DELETE(sh, proxy->subproxies, subproxy); HASH_DELETE(sh, proxy->subproxies, subproxy);
mutex_unlock(&proxy->proxy_lock); mutex_unlock(&proxy->proxy_lock);
send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid);
if (subproxy && proxy != subproxy) if (subproxy && proxy != subproxy)
store_proxy(gdata, subproxy); store_proxy(gdata, subproxy);
} while (subproxy); } while (subproxy);
@ -2295,14 +2270,12 @@ static void parse_delproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const
"auth", proxy->auth, "pass", proxy->pass); "auth", proxy->auth, "pass", proxy->pass);
LOGNOTICE("Deleting proxy %d:%s", proxy->id, proxy->url); LOGNOTICE("Deleting proxy %d:%s", proxy->id, proxy->url);
delete_proxy(gdata, proxy); delete_proxy(ckp, gdata, proxy);
reconnect_generator(ckp);
out: out:
send_api_response(val, sockd); send_api_response(val, sockd);
} }
static void parse_ableproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, static void parse_ableproxy(gdata_t *gdata, const int sockd, const char *buf, bool disable)
const char *buf, bool disable)
{ {
proxy_instance_t *proxy; proxy_instance_t *proxy;
@ -2327,10 +2300,9 @@ static void parse_ableproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd,
proxy->disabled = disable; proxy->disabled = disable;
LOGNOTICE("%sabling proxy %d:%s", disable ? "Dis" : "En", id, proxy->url); LOGNOTICE("%sabling proxy %d:%s", disable ? "Dis" : "En", id, proxy->url);
} }
if (disable) { if (disable)
disable_subproxy(gdata, proxy, proxy); disable_subproxy(gdata, proxy, proxy);
reconnect_generator(ckp); else
} else
reconnect_proxy(proxy); reconnect_proxy(proxy);
out: out:
send_api_response(val, sockd); send_api_response(val, sockd);
@ -2357,9 +2329,6 @@ reconnect:
if (!ckp->passthrough) { if (!ckp->passthrough) {
LOGWARNING("Successfully connected to proxy %d %s as proxy", LOGWARNING("Successfully connected to proxy %d %s as proxy",
proxi->id, proxi->url); proxi->id, proxi->url);
dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf);
} }
} }
retry: retry:
@ -2407,9 +2376,9 @@ retry:
} else if (cmdmatch(buf, "delproxy")) { } else if (cmdmatch(buf, "delproxy")) {
parse_delproxy(ckp, gdata, sockd, buf + 9); parse_delproxy(ckp, gdata, sockd, buf + 9);
} else if (cmdmatch(buf, "enableproxy")) { } else if (cmdmatch(buf, "enableproxy")) {
parse_ableproxy(ckp, gdata, sockd, buf + 12, false); parse_ableproxy(gdata, sockd, buf + 12, false);
} else if (cmdmatch(buf, "disableproxy")) { } else if (cmdmatch(buf, "disableproxy")) {
parse_ableproxy(ckp, gdata, sockd, buf + 13, true); parse_ableproxy(gdata, sockd, buf + 13, true);
} else if (cmdmatch(buf, "shutdown")) { } else if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
goto out; goto out;

Loading…
Cancel
Save