Browse Source

Use the proxy connsock semaphore to serialise uses of the cs->buf to prevent races

master
Con Kolivas 9 years ago
parent
commit
8ef853b8fc
  1. 31
      src/generator.c

31
src/generator.c

@ -771,6 +771,9 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{s:s,s:[s]}",
"method", "mining.passthrough", "method", "mining.passthrough",
"params", PACKAGE"/"VERSION); "params", PACKAGE"/"VERSION);
/* Serialise all send/recvs */
cksem_wait(&cs->sem);
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
@ -796,6 +799,8 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
} }
proxi->passthrough = true; proxi->passthrough = true;
out: out:
cksem_post(&cs->sem);
if (val) if (val)
json_decref(val); json_decref(val);
if (!ret) if (!ret)
@ -1771,6 +1776,9 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
return true; return true;
if (proxi->disabled) if (proxi->disabled)
return false; return false;
/* Serialise all send/recvs here with the cs semaphore */
cksem_wait(&cs->sem);
if (!extract_sockaddr(proxi->url, &cs->url, &cs->port)) { if (!extract_sockaddr(proxi->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", proxi->url); LOGWARNING("Failed to extract address from %s", proxi->url);
goto out; goto out;
@ -1810,6 +1818,8 @@ static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
} }
proxi->authorised = ret = true; proxi->authorised = ret = true;
out: out:
cksem_post(&cs->sem);
if (!ret) { if (!ret) {
send_stratifier_deadproxy(ckp, proxi->id, proxi->subid); send_stratifier_deadproxy(ckp, proxi->id, proxi->subid);
/* Close and invalidate the file handle */ /* Close and invalidate the file handle */
@ -1937,7 +1947,10 @@ static void *passthrough_recv(void *arg)
} }
/* Make sure we receive a line within 90 seconds */ /* Make sure we receive a line within 90 seconds */
cksem_wait(&cs->sem);
ret = read_socket_line(cs, &timeout); ret = read_socket_line(cs, &timeout);
cksem_post(&cs->sem);
if (ret < 1) { if (ret < 1) {
reconnect_generator(ckp); reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect",
@ -2048,12 +2061,17 @@ static void *proxy_recv(void *arg)
} }
mutex_unlock(&gdata->share_lock); mutex_unlock(&gdata->share_lock);
cs = NULL;
/* If we don't get an update within 10 minutes the upstream pool /* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */ * has likely stopped responding. */
ret = epoll_wait(epfd, &event, 1, 600000); ret = epoll_wait(epfd, &event, 1, 600000);
if (likely(ret > 0)) { if (likely(ret > 0)) {
subproxy = event.data.ptr; subproxy = event.data.ptr;
cs = &subproxy->cs; cs = &subproxy->cs;
/* Serialise messages from here once we have a cs by
* holding the semaphore. */
cksem_wait(&cs->sem);
if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP))
ret = -1; ret = -1;
else { else {
@ -2065,9 +2083,7 @@ static void *proxy_recv(void *arg)
LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv",
proxi->id, subproxy->subid, subproxy->url); proxi->id, subproxy->subid, subproxy->url);
disable_subproxy(gdata, proxi, subproxy); disable_subproxy(gdata, proxi, subproxy);
continue; } else do {
}
do {
/* subproxy may have been recycled here if it is not a /* subproxy may have been recycled here if it is not a
* parent and reconnect was issued */ * parent and reconnect was issued */
if (parse_method(ckp, subproxy, cs->buf)) if (parse_method(ckp, subproxy, cs->buf))
@ -2079,6 +2095,8 @@ static void *proxy_recv(void *arg)
} }
timeout = 0; timeout = 0;
} while ((ret = read_socket_line(cs, &timeout)) > 0); } while ((ret = read_socket_line(cs, &timeout)) > 0);
if (cs)
cksem_post(&cs->sem);
} }
return NULL; return NULL;
@ -2132,7 +2150,6 @@ static void *userproxy_recv(void *arg)
if (unlikely(!proxy->authorised)) if (unlikely(!proxy->authorised))
continue; continue;
cs = &proxy->cs;
if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { if (event.events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) {
LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id, LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id,
proxy->subid, proxy->url); proxy->subid, proxy->url);
@ -2162,6 +2179,9 @@ static void *userproxy_recv(void *arg)
mutex_unlock(&gdata->share_lock); mutex_unlock(&gdata->share_lock);
timeout = 0; timeout = 0;
cs = &proxy->cs;
cksem_wait(&cs->sem);
while ((ret = read_socket_line(cs, &timeout)) > 0) { while ((ret = read_socket_line(cs, &timeout)) > 0) {
/* proxy may have been recycled here if it is not a /* proxy may have been recycled here if it is not a
* parent and reconnect was issued */ * parent and reconnect was issued */
@ -2174,6 +2194,7 @@ static void *userproxy_recv(void *arg)
} }
timeout = 0; timeout = 0;
} }
cksem_post(&cs->sem);
} }
return NULL; return NULL;
} }
@ -2811,6 +2832,8 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy->ckp = proxy->cs.ckp = ckp; proxy->ckp = proxy->cs.ckp = ckp;
HASH_ADD_INT(gdata->proxies, id, proxy); HASH_ADD_INT(gdata->proxies, id, proxy);
proxy->global = true; proxy->global = true;
cksem_init(&proxy->cs.sem);
cksem_post(&proxy->cs.sem);
return proxy; return proxy;
} }

Loading…
Cancel
Save