diff --git a/src/generator.c b/src/generator.c index 5f193306..d1b90ca0 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2223,8 +2223,9 @@ static void *proxy_recv(void *arg) alive = proxi->alive; while (42) { - notify_instance_t *ni, *tmp; + bool message = false, hup = false; share_msg_t *share, *tmpshare; + notify_instance_t *ni, *tmp; float timeout; time_t now; int ret; @@ -2279,8 +2280,10 @@ static void *proxy_recv(void *arg) if (likely(ret > 0)) { subproxy = event.data.ptr; cs = &subproxy->cs; - if (!subproxy->alive) + if (!subproxy->alive) { + cs = NULL; continue; + } /* Serialise messages from here once we have a cs by * holding the semaphore. */ @@ -2292,14 +2295,33 @@ static void *proxy_recv(void *arg) if (event.events & EPOLLIN) { timeout = 30; ret = read_socket_line(cs, &timeout); - } else if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) - ret = -1; - } - if (ret < 1) { - LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", + /* If we are unable to read anything within 30 + * seconds at this point after EPOLLIN is set + * then the socket is dead. */ + if (ret < 1) { + LOGNOTICE("Proxy %d:%d %s failed to read_socket_line in proxy_recv", + proxi->id, subproxy->subid, subproxy->url); + hup = true; + } else { + message = true; + timeout = 0; + } + } + if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { + LOGNOTICE("Proxy %d:%d %s epoll hangup in proxy_recv", + proxi->id, subproxy->subid, subproxy->url); + hup = true; + } + } else { + LOGNOTICE("Proxy %d:%d %s failed to epoll in proxy_recv", proxi->id, subproxy->subid, subproxy->url); - disable_subproxy(gdata, proxi, subproxy); - } else do { + hup = true; + } + + /* Parse any other messages already fully buffered with a zero + * timeout. */ + while (message || read_socket_line(cs, &timeout) > 0) { + timeout = 0; /* subproxy may have been recycled here if it is not a * parent and reconnect was issued */ if (parse_method(ckp, subproxy, cs->buf)) @@ -2309,8 +2331,11 @@ static void *proxy_recv(void *arg) LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", subproxy->id, subproxy->subid, cs->buf); } - timeout = 0; - } while ((ret = read_socket_line(cs, &timeout)) > 0); + } + + /* Process hangup only after parsing messages */ + if (hup) + disable_subproxy(gdata, proxi, subproxy); if (cs) cksem_post(&cs->sem); }