Browse Source

Repair functioning of passthrough and add basic functions for share management in redirector

master
Con Kolivas 10 years ago
parent
commit
98a2197568
  1. 4
      src/ckpool.c
  2. 70
      src/generator.c

4
src/ckpool.c

@ -1616,10 +1616,10 @@ int main(int argc, char **argv)
if (!ckp.name) {
if (ckp.redirector)
ckp.name = "ckredirector";
else if (ckp.proxy)
ckp.name = "ckproxy";
else if (ckp.passthrough)
ckp.name = "ckpassthrough";
else if (ckp.proxy)
ckp.name = "ckproxy";
else
ckp.name = "ckpool";
}

70
src/generator.c

@ -433,8 +433,10 @@ static bool send_json_msg(connsock_t *cs, const json_t *json_msg)
return true;
}
static bool connect_proxy(connsock_t *cs)
static bool connect_proxy(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxy)
{
if (cs->fd > 0)
Close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) {
LOGINFO("Failed to connect socket to %s:%s in connect_proxy",
@ -442,6 +444,18 @@ static bool connect_proxy(connsock_t *cs)
return false;
}
keep_sockalive(cs->fd);
if (!ckp->passthrough) {
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = proxy;
/* Add this connsock_t to the epoll list */
if (unlikely(epoll_ctl(proxy->epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) {
LOGERR("Failed to add fd %d to epfd %d to epoll_ctl in proxy_alive",
cs->fd, proxy->epfd);
return false;
}
}
return true;
}
@ -685,13 +699,13 @@ out:
return ret;
}
static bool subscribe_stratum(connsock_t *cs, proxy_instance_t *proxi)
static bool subscribe_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
{
bool ret = false;
json_t *req;
retry:
/* Attempt to connect with the client description */
/* Attempt to connect with the client description g*/
if (!proxi->no_params) {
JSON_CPACK(req, "{s:i,s:s,s:[s]}",
"id", 0,
@ -723,7 +737,7 @@ retry:
LOGINFO("Proxy %d:%d %s failed connecting with parameters in subscribe_stratum, retrying without",
proxi->id, proxi->subid, proxi->url);
proxi->no_params = true;
ret = connect_proxy(cs);
ret = connect_proxy(ckp, cs, proxi);
if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to reconnect in subscribe_stratum",
proxi->id, proxi->subid, proxi->url);
@ -1714,7 +1728,7 @@ static void parse_redirector_share(ckpool_t *ckp, const char *msg)
client_id = json_integer_value(json_object_get(val, "client_id"));
/* Make sure this is a passthrough client value! */
if (unlikely(client_id < 0xffffffffll)) {
LOGERR("redirect_client got invalid client id %"PRId64, client_id);
LOGERR("parse_redirector_share got invalid client id %"PRId64, client_id);
goto out;
}
/* Diff is irrelevant here as we don't keep track of it so use 0 */
@ -1750,9 +1764,8 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
}
static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
bool pinging, int epfd)
bool pinging)
{
struct epoll_event event;
bool ret = false;
/* Has this proxy already been reconnected? */
@ -1764,7 +1777,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
LOGWARNING("Failed to extract address from %s", proxi->url);
goto out;
}
if (!connect_proxy(cs)) {
if (!connect_proxy(ckp, cs, proxi)) {
if (!pinging) {
LOGINFO("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port);
@ -1781,7 +1794,7 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
goto out;
}
/* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) {
if (!subscribe_stratum(ckp, cs, proxi)) {
if (!pinging) {
LOGWARNING("Failed initial subscribe to %s:%s !",
cs->url, cs->port);
@ -1802,21 +1815,9 @@ out:
if (!ret) {
send_stratifier_deadproxy(ckp, proxi->id, proxi->subid);
/* Close and invalidate the file handle */
if (cs->fd > 0) {
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL);
if (cs->fd > 0)
Close(cs->fd);
}
} else {
keep_sockalive(cs->fd);
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = proxi;
/* Add this connsock_t to the epoll list */
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) {
LOGERR("Failed to add fd %d to epfd %d to epoll_ctl in proxy_alive",
cs->fd, epfd);
return false;
}
}
proxi->alive = ret;
return ret;
}
@ -1833,7 +1834,7 @@ static void *proxy_recruit(void *arg)
retry:
recruit = false;
proxy = create_subproxy(ckp, gdata, parent, parent->url);
alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd);
alive = proxy_alive(ckp, proxy, &proxy->cs, false);
if (!alive) {
LOGNOTICE("Subproxy failed proxy_alive testing");
store_proxy(gdata, proxy);
@ -1892,7 +1893,7 @@ static void *proxy_reconnect(void *arg)
ckpool_t *ckp = proxy->ckp;
pthread_detach(pthread_self());
proxy_alive(ckp, proxy, cs, true, proxy->epfd);
proxy_alive(ckp, proxy, cs, true);
proxy->reconnecting = false;
return NULL;
}
@ -1943,19 +1944,11 @@ static void *passthrough_recv(void *arg)
proxy_instance_t *proxi = (proxy_instance_t *)arg;
connsock_t *cs = &proxi->cs;
ckpool_t *ckp = proxi->ckp;
struct epoll_event event;
bool alive;
int epfd;
rename_proc("passrecv");
proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0){
LOGEMERG("FATAL: Failed to create epoll in passrecv");
return NULL;
}
if (proxy_alive(ckp, proxi, cs, false, epfd)) {
if (proxy_alive(ckp, proxi, cs, false)) {
reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url);
}
@ -1964,7 +1957,7 @@ static void *passthrough_recv(void *arg)
while (42) {
int ret;
while (!proxy_alive(ckp, proxi, cs, true, epfd)) {
while (!proxy_alive(ckp, proxi, cs, true)) {
if (alive) {
alive = false;
reconnect_generator(ckp);
@ -1975,13 +1968,12 @@ static void *passthrough_recv(void *arg)
reconnect_generator(ckp);
/* Make sure we receive a line within 90 seconds */
ret = epoll_wait(epfd, &event, 1, 90000);
if (likely(ret > 0))
ret = read_socket_line(cs, 60);
ret = read_socket_line(cs, 90);
if (ret < 1) {
LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect",
proxi->id, proxi->url);
alive = proxi->alive = false;
Close(cs->fd);
reconnect_generator(ckp);
continue;
}
@ -2038,9 +2030,9 @@ static void *proxy_recv(void *arg)
return NULL;
}
if (proxy_alive(ckp, proxi, cs, false, epfd)) {
if (proxy_alive(ckp, proxi, cs, false))
LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url);
}
alive = proxi->alive;
while (42) {

Loading…
Cancel
Save