diff --git a/src/generator.c b/src/generator.c index 4dc48d36..e40b920d 100644 --- a/src/generator.c +++ b/src/generator.c @@ -771,6 +771,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) JSON_CPACK(req, "{s:s,s:[s]}", "method", "mining.passthrough", "params", PACKAGE"/"VERSION); + + /* Serialise all send/recvs */ + cksem_wait(&cs->sem); ret = send_json_msg(cs, req); json_decref(req); if (!ret) { @@ -796,6 +799,8 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) } proxi->passthrough = true; out: + cksem_post(&cs->sem); + if (val) json_decref(val); if (!ret) @@ -1771,6 +1776,9 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, return true; if (proxi->disabled) return false; + + /* Serialise all send/recvs here with the cs semaphore */ + cksem_wait(&cs->sem); if (!extract_sockaddr(proxi->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", proxi->url); goto out; @@ -1810,6 +1818,8 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, } proxi->authorised = ret = true; out: + cksem_post(&cs->sem); + if (!ret) { send_stratifier_deadproxy(ckp, proxi->id, proxi->subid); /* Close and invalidate the file handle */ @@ -1937,7 +1947,10 @@ static void *passthrough_recv(void *arg) } /* Make sure we receive a line within 90 seconds */ + cksem_wait(&cs->sem); ret = read_socket_line(cs, &timeout); + cksem_post(&cs->sem); + if (ret < 1) { reconnect_generator(ckp); LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", @@ -2048,12 +2061,17 @@ static void *proxy_recv(void *arg) } mutex_unlock(&gdata->share_lock); + cs = NULL; /* If we don't get an update within 10 minutes the upstream pool * has likely stopped responding. */ ret = epoll_wait(epfd, &event, 1, 600000); if (likely(ret > 0)) { subproxy = event.data.ptr; cs = &subproxy->cs; + + /* Serialise messages from here once we have a cs by + * holding the semaphore. */ + cksem_wait(&cs->sem); if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) ret = -1; else { @@ -2065,9 +2083,7 @@ static void *proxy_recv(void *arg) LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", proxi->id, subproxy->subid, subproxy->url); disable_subproxy(gdata, proxi, subproxy); - continue; - } - do { + } else do { /* subproxy may have been recycled here if it is not a * parent and reconnect was issued */ if (parse_method(ckp, subproxy, cs->buf)) @@ -2079,6 +2095,8 @@ static void *proxy_recv(void *arg) } timeout = 0; } while ((ret = read_socket_line(cs, &timeout)) > 0); + if (cs) + cksem_post(&cs->sem); } return NULL; @@ -2132,7 +2150,6 @@ static void *userproxy_recv(void *arg) if (unlikely(!proxy->authorised)) continue; - cs = &proxy->cs; if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id, proxy->subid, proxy->url); @@ -2162,6 +2179,9 @@ static void *userproxy_recv(void *arg) mutex_unlock(&gdata->share_lock); timeout = 0; + cs = &proxy->cs; + + cksem_wait(&cs->sem); while ((ret = read_socket_line(cs, &timeout)) > 0) { /* proxy may have been recycled here if it is not a * parent and reconnect was issued */ @@ -2174,6 +2194,7 @@ static void *userproxy_recv(void *arg) } timeout = 0; } + cksem_post(&cs->sem); } return NULL; } @@ -2811,6 +2832,8 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id proxy->ckp = proxy->cs.ckp = ckp; HASH_ADD_INT(gdata->proxies, id, proxy); proxy->global = true; + cksem_init(&proxy->cs.sem); + cksem_post(&proxy->cs.sem); return proxy; }