Browse Source

Process all hangups after all messages in proxy_recv.

master
Con Kolivas 8 years ago
parent
commit
6587ac2817
  1. 47
      src/generator.c

47
src/generator.c

@ -2223,8 +2223,9 @@ static void *proxy_recv(void *arg)
alive = proxi->alive; alive = proxi->alive;
while (42) { while (42) {
notify_instance_t *ni, *tmp; bool message = false, hup = false;
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
notify_instance_t *ni, *tmp;
float timeout; float timeout;
time_t now; time_t now;
int ret; int ret;
@ -2279,8 +2280,10 @@ static void *proxy_recv(void *arg)
if (likely(ret > 0)) { if (likely(ret > 0)) {
subproxy = event.data.ptr; subproxy = event.data.ptr;
cs = &subproxy->cs; cs = &subproxy->cs;
if (!subproxy->alive) if (!subproxy->alive) {
cs = NULL;
continue; continue;
}
/* Serialise messages from here once we have a cs by /* Serialise messages from here once we have a cs by
* holding the semaphore. */ * holding the semaphore. */
@ -2292,14 +2295,33 @@ static void *proxy_recv(void *arg)
if (event.events & EPOLLIN) { if (event.events & EPOLLIN) {
timeout = 30; timeout = 30;
ret = read_socket_line(cs, &timeout); ret = read_socket_line(cs, &timeout);
} else if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) /* If we are unable to read anything within 30
ret = -1; * seconds at this point after EPOLLIN is set
} * then the socket is dead. */
if (ret < 1) { if (ret < 1) {
LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", LOGNOTICE("Proxy %d:%d %s failed to read_socket_line in proxy_recv",
proxi->id, subproxy->subid, subproxy->url);
hup = true;
} else {
message = true;
timeout = 0;
}
}
if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) {
LOGNOTICE("Proxy %d:%d %s epoll hangup in proxy_recv",
proxi->id, subproxy->subid, subproxy->url);
hup = true;
}
} else {
LOGNOTICE("Proxy %d:%d %s failed to epoll in proxy_recv",
proxi->id, subproxy->subid, subproxy->url); proxi->id, subproxy->subid, subproxy->url);
disable_subproxy(gdata, proxi, subproxy); hup = true;
} else do { }
/* Parse any other messages already fully buffered with a zero
* timeout. */
while (message || read_socket_line(cs, &timeout) > 0) {
timeout = 0;
/* subproxy may have been recycled here if it is not a /* subproxy may have been recycled here if it is not a
* parent and reconnect was issued */ * parent and reconnect was issued */
if (parse_method(ckp, subproxy, cs->buf)) if (parse_method(ckp, subproxy, cs->buf))
@ -2309,8 +2331,11 @@ static void *proxy_recv(void *arg)
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
subproxy->id, subproxy->subid, cs->buf); subproxy->id, subproxy->subid, cs->buf);
} }
timeout = 0; }
} while ((ret = read_socket_line(cs, &timeout)) > 0);
/* Process hangup only after parsing messages */
if (hup)
disable_subproxy(gdata, proxi, subproxy);
if (cs) if (cs)
cksem_post(&cs->sem); cksem_post(&cs->sem);
} }

Loading…
Cancel
Save