Browse Source

Make read_socket_line use a timeout pointer allowing the value to be used for reentrant calls

master
Con Kolivas 10 years ago
parent
commit
c9245673af
  1. 18
      src/ckpool.c
  2. 2
      src/ckpool.h
  3. 16
      src/generator.c

18
src/ckpool.c

@ -495,7 +495,7 @@ void empty_buffer(connsock_t *cs)
/* Read from a socket into cs->buf till we get an '\n', converting it to '\0' /* Read from a socket into cs->buf till we get an '\n', converting it to '\0'
* and storing how much extra data we've received, to be moved to the beginning * and storing how much extra data we've received, to be moved to the beginning
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
int read_socket_line(connsock_t *cs, float timeout) int read_socket_line(connsock_t *cs, float *timeout)
{ {
int fd = cs->fd, ret = -1; int fd = cs->fd, ret = -1;
char *eom = NULL; char *eom = NULL;
@ -519,7 +519,12 @@ int read_socket_line(connsock_t *cs, float timeout)
tv_time(&start); tv_time(&start);
rewait: rewait:
ret = wait_read_select(fd, eom ? 0 : timeout); if (*timeout <= 0) {
LOGDEBUG("Timed out in read_socket_line");
ret = 0;
goto out;
}
ret = wait_read_select(fd, eom ? 0 : *timeout);
if (ret < 1) { if (ret < 1) {
if (!ret) { if (!ret) {
if (eom) if (eom)
@ -531,7 +536,7 @@ rewait:
} }
tv_time(&now); tv_time(&now);
diff = tvdiff(&now, &start); diff = tvdiff(&now, &start);
timeout -= diff; *timeout -= diff;
while (42) { while (42) {
char readbuf[PAGESIZE] = {}; char readbuf[PAGESIZE] = {};
int backoff = 1; int backoff = 1;
@ -543,7 +548,7 @@ rewait:
if (eom) if (eom)
break; break;
/* Have we used up all the timeout yet? */ /* Have we used up all the timeout yet? */
if (timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (*timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret))
goto rewait; goto rewait;
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
goto out; goto out;
@ -724,6 +729,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
char *http_req = NULL; char *http_req = NULL;
json_error_t err_val; json_error_t err_val;
json_t *val = NULL; json_t *val = NULL;
float timeout = 60;
int len, ret; int len, ret;
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
@ -766,7 +772,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
LOGWARNING("Failed to write to socket in json_rpc_call"); LOGWARNING("Failed to write to socket in json_rpc_call");
goto out_empty; goto out_empty;
} }
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, &timeout);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read socket line in json_rpc_call"); LOGWARNING("Failed to read socket line in json_rpc_call");
goto out_empty; goto out_empty;
@ -776,7 +782,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
goto out_empty; goto out_empty;
} }
do { do {
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, &timeout);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read http socket lines in json_rpc_call"); LOGWARNING("Failed to read http socket lines in json_rpc_call");
goto out_empty; goto out_empty;

2
src/ckpool.h

@ -242,7 +242,7 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, float timeout); int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);

16
src/generator.c

@ -517,8 +517,9 @@ static char *cached_proxy_line(proxy_instance_t *proxi)
static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi) static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi)
{ {
char *buf = cached_proxy_line(proxi); char *buf = cached_proxy_line(proxi);
float timeout = 10;
if (!buf && read_socket_line(cs, 5) > 0) if (!buf && read_socket_line(cs, &timeout) > 0)
buf = strdup(cs->buf); buf = strdup(cs->buf);
return buf; return buf;
} }
@ -534,9 +535,10 @@ static void append_proxy_line(proxy_instance_t *proxi, const char *buf)
/* Get a new line from the connsock and return a copy of it */ /* Get a new line from the connsock and return a copy of it */
static char *new_proxy_line(connsock_t *cs) static char *new_proxy_line(connsock_t *cs)
{ {
float timeout = 10;
char *buf = NULL; char *buf = NULL;
if (read_socket_line(cs, 5) < 1) if (read_socket_line(cs, &timeout) < 1)
goto out; goto out;
buf = strdup(cs->buf); buf = strdup(cs->buf);
out: out:
@ -719,6 +721,7 @@ out:
static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
{ {
json_t *req, *val = NULL, *res_val, *err_val; json_t *req, *val = NULL, *res_val, *err_val;
float timeout = 10;
bool ret = false; bool ret = false;
JSON_CPACK(req, "{s:s,s:[s]}", JSON_CPACK(req, "{s:s,s:[s]}",
@ -730,7 +733,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
LOGWARNING("Failed to send message in passthrough_stratum"); LOGWARNING("Failed to send message in passthrough_stratum");
goto out; goto out;
} }
if (read_socket_line(cs, 5) < 1) { if (read_socket_line(cs, &timeout) < 1) {
LOGWARNING("Failed to receive line in passthrough_stratum"); LOGWARNING("Failed to receive line in passthrough_stratum");
goto out; goto out;
} }
@ -1259,6 +1262,7 @@ static void *proxy_recv(void *arg)
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
int retries = 0, ret; int retries = 0, ret;
float timeout;
time_t now; time_t now;
now = time(NULL); now = time(NULL);
@ -1286,12 +1290,13 @@ static void *proxy_recv(void *arg)
/* If we don't get an update within 10 minutes the upstream pool /* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */ * has likely stopped responding. */
timeout = 10;
do { do {
if (cs->fd == -1) { if (cs->fd == -1) {
ret = -1; ret = -1;
break; break;
} }
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, &timeout);
} while (ret == 0 && ++retries < 120); } while (ret == 0 && ++retries < 120);
if (ret < 1) { if (ret < 1) {
@ -1393,10 +1398,11 @@ static void *passthrough_recv(void *arg)
rename_proc("passrecv"); rename_proc("passrecv");
while (42) { while (42) {
float timeout = 60;
int ret; int ret;
do { do {
ret = read_socket_line(cs, 60); ret = read_socket_line(cs, &timeout);
} while (ret == 0); } while (ret == 0);
if (ret < 1) { if (ret < 1) {

Loading…
Cancel
Save