Browse Source

Create rudimentary subproxy structures, moving proxy receives to epoll in order to receive from multiple connections upstream

master
Con Kolivas 10 years ago
parent
commit
fe4a25f861
  1. 94
      src/generator.c

94
src/generator.c

@ -9,6 +9,7 @@
#include "config.h"
#include <sys/epoll.h>
#include <sys/socket.h>
#include <jansson.h>
#include <string.h>
@ -81,7 +82,7 @@ struct proxy_instance {
connsock_t *cs;
server_instance_t *si;
bool passthrough;
int id; /* Proxy server id */
int id; /* Proxy server id, or subproxy id if this is a subproxy */
const char *auth;
const char *pass;
@ -123,6 +124,13 @@ struct proxy_instance {
char_entry_t *recvd_lines; /* Linked list of unprocessed messages */
time_t reconnect_time;
pthread_mutex_t proxy_lock; /* Lock protecting hashlist of proxies */
int64_t clients_per_proxy; /* How many clients can connect to each subproxy */
int64_t client_headroom; /* How many more clients can we connect */
proxy_instance_t *proxy; /* Parent proxy of subproxies */
proxy_instance_t *subproxies; /* Hashlist of subproxies of this proxy */
int subproxy_count; /* Number of subproxies */
};
/* Private data for the generator */
@ -630,15 +638,18 @@ retry:
LOGWARNING("Invalid nonce2len %d in parse_subscribe", size);
goto out;
}
if (size == 3 || (size == 4 && proxi->ckp->clientsvspeed))
LOGWARNING("Proxy %d:%s Nonce2 length %d means proxied clients can't be >5TH each",
proxi->id, proxi->si->url, size);
else if (size < 3) {
if (size < 3) {
LOGWARNING("Proxy %d:%s Nonce2 length %d too small to be able to proxy",
proxi->id, proxi->si->url, size);
goto out;
}
proxi->nonce2len = size;
if (!proxi->proxy) {
/* Set the number of clients per proxy on the parent proxy */
proxi->clients_per_proxy = 1ll << ((size - 3) * 8);
LOGNOTICE("Proxy %d:%s clients per proxy: %"PRId64, proxi->id, proxi->si->url,
proxi->clients_per_proxy);
}
LOGINFO("Found notify with enonce %s nonce2len %d", proxi->enonce1,
proxi->nonce2len);
@ -1360,8 +1371,9 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
}
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging)
connsock_t *cs, bool pinging, int epfd)
{
struct epoll_event event;
bool ret = false;
/* Has this proxy already been reconnected? */
@ -1411,6 +1423,11 @@ out:
keep_sockalive(cs->fd);
if (!ckp->passthrough)
send_subscribe(ckp, proxi);
event.events = EPOLLIN;
event.data.ptr = proxi;
/* Add this connsock_t to the epoll list */
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1))
quit(1, "FATAL: Failed to add epfd to epoll_ctl in proxy_alive");
}
return ret;
}
@ -1423,10 +1440,18 @@ static void *passthrough_recv(void *arg)
server_instance_t *si = proxi->si;
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
struct epoll_event event;
int epfd;
rename_proc("passrecv");
if (proxy_alive(ckp, si, proxi, cs, false)) {
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0){
LOGEMERG("FATAL: Failed to create epoll in passrecv");
return NULL;
}
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
proxi->alive = true;
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
@ -1436,7 +1461,7 @@ static void *passthrough_recv(void *arg)
while (42) {
int ret;
while (!proxy_alive(ckp, si, proxi, cs, true)) {
while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) {
if (proxi->alive) {
proxi->alive = false;
send_proc(ckp->generator, "reconnect");
@ -1448,10 +1473,10 @@ static void *passthrough_recv(void *arg)
send_proc(ckp->generator, "reconnect");
}
do {
/* Make sure we receive a line within 90 seconds */
ret = epoll_wait(epfd, &event, 1, 90000);
if (likely(ret > 0))
ret = read_socket_line(cs, 60);
} while (ret == 0);
if (ret < 1) {
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url);
@ -1491,10 +1516,18 @@ static void *proxy_recv(void *arg)
connsock_t *cs = proxi->cs;
ckpool_t *ckp = proxi->ckp;
gdata_t *gdata = ckp->data;
struct epoll_event event;
int epfd;
rename_proc("proxyrecv");
if (proxy_alive(ckp, si, proxi, cs, false)) {
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0){
LOGEMERG("FATAL: Failed to create epoll in proxyrecv");
return NULL;
}
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
proxi->alive = true;
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
@ -1504,10 +1537,10 @@ static void *proxy_recv(void *arg)
while (42) {
notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare;
int retries = 0, ret;
time_t now;
int ret;
while (!proxy_alive(ckp, si, proxi, cs, true)) {
while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) {
if (proxi->alive) {
proxi->alive = false;
send_proc(ckp->generator, "reconnect");
@ -1550,17 +1583,12 @@ static void *proxy_recv(void *arg)
/* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */
do {
if (cs->fd == -1) {
ret = -1;
break;
}
ret = epoll_wait(epfd, &event, 1, 600000);
if (likely(ret > 0))
ret = read_socket_line(cs, 5);
} while (ret == 0 && ++retries < 120);
if (ret < 1) {
if (proxi->alive) {
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
LOGWARNING("Proxy %d:%s failed to epoll/read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url);
}
continue;
@ -1589,8 +1617,30 @@ static void *proxy_recv(void *arg)
return NULL;
}
/* Creates a duplicate instance or proxi to be used as a subproxy, ignoring
* fields we don't use in the subproxy. */
static proxy_instance_t *create_subproxy(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy = ckzalloc(sizeof(proxy_instance_t));
subproxy->ckp = proxi->ckp;
subproxy->cs = ckzalloc(sizeof(connsock_t));
subproxy->si = proxi->si;
subproxy->id = proxi->subproxy_count++;
subproxy->auth = proxi->auth;
subproxy->pass = proxi->pass;
subproxy->proxy = proxi;
return subproxy;
}
/* Create a single subproxy instance immediately to be the first used
* by the stratifier. To be used in future code */
static void prepare_proxy(proxy_instance_t *proxi)
{
proxy_instance_t *subproxy = create_subproxy(proxi);
mutex_init(&proxi->proxy_lock);
HASH_ADD_INT(proxi->subproxies, id, subproxy);
mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi);

Loading…
Cancel
Save