From 98a2197568f31fc2fe95f110a584eec665cf6385 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 11 Jun 2015 16:12:20 +1000 Subject: [PATCH] Repair functioning of passthrough and add basic functions for share management in redirector --- src/ckpool.c | 4 +-- src/generator.c | 70 ++++++++++++++++++++++--------------------------- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 63cedf66..7587ee7c 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1616,10 +1616,10 @@ int main(int argc, char **argv) if (!ckp.name) { if (ckp.redirector) ckp.name = "ckredirector"; - else if (ckp.proxy) - ckp.name = "ckproxy"; else if (ckp.passthrough) ckp.name = "ckpassthrough"; + else if (ckp.proxy) + ckp.name = "ckproxy"; else ckp.name = "ckpool"; } diff --git a/src/generator.c b/src/generator.c index 9e9452c4..461dc01c 100644 --- a/src/generator.c +++ b/src/generator.c @@ -433,8 +433,10 @@ static bool send_json_msg(connsock_t *cs, const json_t *json_msg) return true; } -static bool connect_proxy(connsock_t *cs) +static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy) { + if (cs->fd > 0) + Close(cs->fd); cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGINFO("Failed to connect socket to %s:%s in connect_proxy", @@ -442,6 +444,18 @@ static bool connect_proxy(connsock_t *cs) return false; } keep_sockalive(cs->fd); + if (!ckp->passthrough) { + struct epoll_event event; + + event.events = EPOLLIN | EPOLLRDHUP; + event.data.ptr = proxy; + /* Add this connsock_t to the epoll list */ + if (unlikely(epoll_ctl(proxy->epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) { + LOGERR("Failed to add fd %d to epfd %d to epoll_ctl in proxy_alive", + cs->fd, proxy->epfd); + return false; + } + } return true; } @@ -685,13 +699,13 @@ out: return ret; } -static bool subscribe_stratum(connsock_t *cs, proxy_instance_t *proxi) +static bool subscribe_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi) { bool ret = false; json_t *req; retry: - /* Attempt to connect with the client description */ + /* Attempt to connect with the client description g*/ if (!proxi->no_params) { JSON_CPACK(req, "{s:i,s:s,s:[s]}", "id", 0, @@ -723,7 +737,7 @@ retry: LOGINFO("Proxy %d:%d %s failed connecting with parameters in subscribe_stratum, retrying without", proxi->id, proxi->subid, proxi->url); proxi->no_params = true; - ret = connect_proxy(cs); + ret = connect_proxy(ckp, cs, proxi); if (!ret) { LOGNOTICE("Proxy %d:%d %s failed to reconnect in subscribe_stratum", proxi->id, proxi->subid, proxi->url); @@ -1714,7 +1728,7 @@ static void parse_redirector_share(ckpool_t *ckp, const char *msg) client_id = json_integer_value(json_object_get(val, "client_id")); /* Make sure this is a passthrough client value! */ if (unlikely(client_id < 0xffffffffll)) { - LOGERR("redirect_client got invalid client id %"PRId64, client_id); + LOGERR("parse_redirector_share got invalid client id %"PRId64, client_id); goto out; } /* Diff is irrelevant here as we don't keep track of it so use 0 */ @@ -1750,9 +1764,8 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) } static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, - bool pinging, int epfd) + bool pinging) { - struct epoll_event event; bool ret = false; /* Has this proxy already been reconnected? */ @@ -1764,7 +1777,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, LOGWARNING("Failed to extract address from %s", proxi->url); goto out; } - if (!connect_proxy(cs)) { + if (!connect_proxy(ckp, cs, proxi)) { if (!pinging) { LOGINFO("Failed to connect to %s:%s in proxy_mode!", cs->url, cs->port); @@ -1781,7 +1794,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, goto out; } /* Test we can connect, authorise and get stratum information */ - if (!subscribe_stratum(cs, proxi)) { + if (!subscribe_stratum(ckp, cs, proxi)) { if (!pinging) { LOGWARNING("Failed initial subscribe to %s:%s !", cs->url, cs->port); @@ -1802,20 +1815,8 @@ out: if (!ret) { send_stratifier_deadproxy(ckp, proxi->id, proxi->subid); /* Close and invalidate the file handle */ - if (cs->fd > 0) { - epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); + if (cs->fd > 0) Close(cs->fd); - } - } else { - keep_sockalive(cs->fd); - event.events = EPOLLIN | EPOLLRDHUP; - event.data.ptr = proxi; - /* Add this connsock_t to the epoll list */ - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) { - LOGERR("Failed to add fd %d to epfd %d to epoll_ctl in proxy_alive", - cs->fd, epfd); - return false; - } } proxi->alive = ret; return ret; @@ -1833,7 +1834,7 @@ static void *proxy_recruit(void *arg) retry: recruit = false; proxy = create_subproxy(ckp, gdata, parent, parent->url); - alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd); + alive = proxy_alive(ckp, proxy, &proxy->cs, false); if (!alive) { LOGNOTICE("Subproxy failed proxy_alive testing"); store_proxy(gdata, proxy); @@ -1892,7 +1893,7 @@ static void *proxy_reconnect(void *arg) ckpool_t *ckp = proxy->ckp; pthread_detach(pthread_self()); - proxy_alive(ckp, proxy, cs, true, proxy->epfd); + proxy_alive(ckp, proxy, cs, true); proxy->reconnecting = false; return NULL; } @@ -1943,19 +1944,11 @@ static void *passthrough_recv(void *arg) proxy_instance_t *proxi = (proxy_instance_t *)arg; connsock_t *cs = &proxi->cs; ckpool_t *ckp = proxi->ckp; - struct epoll_event event; bool alive; - int epfd; rename_proc("passrecv"); - proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); - if (epfd < 0){ - LOGEMERG("FATAL: Failed to create epoll in passrecv"); - return NULL; - } - - if (proxy_alive(ckp, proxi, cs, false, epfd)) { + if (proxy_alive(ckp, proxi, cs, false)) { reconnect_generator(ckp); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url); } @@ -1964,7 +1957,7 @@ static void *passthrough_recv(void *arg) while (42) { int ret; - while (!proxy_alive(ckp, proxi, cs, true, epfd)) { + while (!proxy_alive(ckp, proxi, cs, true)) { if (alive) { alive = false; reconnect_generator(ckp); @@ -1975,13 +1968,12 @@ static void *passthrough_recv(void *arg) reconnect_generator(ckp); /* Make sure we receive a line within 90 seconds */ - ret = epoll_wait(epfd, &event, 1, 90000); - if (likely(ret > 0)) - ret = read_socket_line(cs, 60); + ret = read_socket_line(cs, 90); if (ret < 1) { LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", proxi->id, proxi->url); alive = proxi->alive = false; + Close(cs->fd); reconnect_generator(ckp); continue; } @@ -2038,9 +2030,9 @@ static void *proxy_recv(void *arg) return NULL; } - if (proxy_alive(ckp, proxi, cs, false, epfd)) { + if (proxy_alive(ckp, proxi, cs, false)) LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url); - } + alive = proxi->alive; while (42) {