diff --git a/src/ckpool.c b/src/ckpool.c index 9ed16c86..75ba27dc 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -528,7 +528,7 @@ rewait: ret = 0; goto out; } - ret = wait_recv_select(cs->fd, eom ? 0 : *timeout); + ret = wait_read_select(cs->fd, eom ? 0 : *timeout); polled = true; if (ret < 1) { if (!ret) { @@ -594,7 +594,6 @@ out: if (ret < 0) { empty_buffer(cs); dealloc(cs->buf); - Close(cs->fd); } return ret; } @@ -741,6 +740,8 @@ static const char *rpc_method(const char *rpc_req) return rpc_req; } +/* All of these calls are made to bitcoind which prefers open/close instead + * of persistent connections so cs->fd is always invalid. */ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) { float timeout = RPC_TIMEOUT; @@ -753,8 +754,9 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) /* Serialise all calls in case we use cs from multiple threads */ cksem_wait(&cs->sem); + cs->fd = connect_socket(cs->url, cs->port); if (unlikely(cs->fd < 0)) { - LOGWARNING("FD %d invalid in %s", cs->fd, __func__); + LOGWARNING("Unable to connect socket to %s:%s in %s", cs->url, cs->port, __func__); goto out; } if (unlikely(!cs->url)) { @@ -837,20 +839,8 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) out_empty: empty_socket(cs->fd); empty_buffer(cs); - if (!val) { - /* Assume that a failed request means the socket will be closed - * and reopen it */ - Close(cs->fd); - } out: - if (cs->fd < 0) { - /* Attempt to reopen a socket that has been closed due to a - * failed request or if the socket was closed while trying to - * read/write to it. */ - cs->fd = connect_socket(cs->url, cs->port); - LOGWARNING("Attempt to reopen socket to %s:%s %ssuccessful", - cs->url, cs->port, cs->fd > 0 ? "" : "un"); - } + Close(cs->fd); free(http_req); dealloc(cs->buf); cksem_post(&cs->sem); @@ -1583,7 +1573,9 @@ int main(int argc, char **argv) } if (!ckp.name) { - if (ckp.proxy) + if (ckp.passthrough) + ckp.name = "ckpassthrough"; + else if (ckp.proxy) ckp.name = "ckproxy"; else ckp.name = "ckpool"; diff --git a/src/generator.c b/src/generator.c index 7d1dc1a0..28e61d79 100644 --- a/src/generator.c +++ b/src/generator.c @@ -137,18 +137,18 @@ struct generator_data { typedef struct generator_data gdata_t; +/* Use a temporary fd when testing server_alive to avoid races on cs->fd */ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) { char *userpass = NULL; bool ret = false; connsock_t *cs; gbtbase_t *gbt; + int fd; - cs = &si->cs; - /* Has this server already been reconnected? */ - if (cs->fd > 0) + if (si->alive) return true; - si->alive = false; + cs = &si->cs; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); return ret; @@ -163,8 +163,8 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) return ret; } - cs->fd = connect_socket(cs->url, cs->port); - if (cs->fd < 0) { + fd = connect_socket(cs->url, cs->port); + if (fd < 0) { if (!pinging) LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port); return ret; @@ -185,16 +185,11 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); goto out; } - ret = true; + si->alive = ret = true; + LOGNOTICE("Server alive: %s:%s", cs->url, cs->port); out: - if (!ret) { - /* Close and invalidate the file handle */ - Close(cs->fd); - } else { - si->alive = true; - LOGNOTICE("Server alive: %s:%s", cs->url, cs->port); - keep_sockalive(cs->fd); - } + /* Close the file handle */ + close(fd); return ret; } @@ -216,7 +211,7 @@ retry: server_instance_t *si = ckp->servers[i]; cs = &si->cs; - if (si->alive && cs->fd > 0) { + if (si->alive) { alive = si; goto living; } @@ -293,7 +288,7 @@ retry: Close(sockd); do { - selret = wait_recv_select(us->sockd, 5); + selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; @@ -301,11 +296,10 @@ retry: } } while (selret < 1); - if (unlikely(cs->fd < 0)) { + if (unlikely(!si->alive)) { LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port); goto reconnect; } - sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on generator socket"); @@ -329,6 +323,7 @@ retry: LOGWARNING("Failed to get block template from %s:%s", cs->url, cs->port); send_unix_msg(sockd, "Failed"); + si->alive = false; goto reconnect; } else { char *s = json_dumps(gbt->json, JSON_NO_UTF8); @@ -343,6 +338,7 @@ retry: else if (!get_bestblockhash(cs, hash)) { LOGINFO("No best block hash support from %s:%s", cs->url, cs->port); + si->alive = false; send_unix_msg(sockd, "failed"); } else { send_unix_msg(sockd, hash); @@ -353,11 +349,13 @@ retry: if (si->notify) send_unix_msg(sockd, "notify"); else if ((height = get_blockcount(cs)) == -1) { + si->alive = false; send_unix_msg(sockd, "failed"); goto reconnect; } else { LOGDEBUG("Height: %d", height); if (!get_blockhash(cs, height, hash)) { + si->alive = false; send_unix_msg(sockd, "failed"); goto reconnect; } else { @@ -1626,7 +1624,7 @@ retry: ckmsgq_add(gdata->srvchk, proxi->si); do { - selret = wait_recv_select(us->sockd, 5); + selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; @@ -1728,6 +1726,8 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds); for (i = 0; i < ckp->btcds; i++) { + connsock_t *cs; + ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); si = ckp->servers[i]; si->url = ckp->btcdurl[i]; @@ -1735,8 +1735,9 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) si->pass = ckp->btcdpass[i]; si->notify = ckp->btcdnotify[i]; si->id = i; - cksem_init(&si->cs.sem); - cksem_post(&si->cs.sem); + cs = &si->cs; + cksem_init(&cs->sem); + cksem_post(&cs->sem); } create_pthread(&pth_watchdog, server_watchdog, ckp); diff --git a/src/libckpool.c b/src/libckpool.c index 50a12089..092b93bd 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -16,6 +16,7 @@ #else #include #endif +#include #include #include #include @@ -905,46 +906,18 @@ int wait_close(int sockd, int timeout) return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } -/* Emulate a select read wait for high fds that select doesn't support. - * wait_read_select is for unix sockets and _wait_recv_select for regular - * sockets. */ -int _wait_read_select(int *sockd, float timeout) +/* Emulate a select read wait for high fds that select doesn't support. */ +int wait_read_select(int sockd, float timeout) { - struct pollfd sfd; - int ret = -1; + struct epoll_event event; + int epfd, ret; - if (unlikely(*sockd < 0)) - goto out; - sfd.fd = *sockd; - sfd.events = POLLIN | POLLRDHUP; + epfd = epoll_create1(EPOLL_CLOEXEC); + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; - ret = poll(&sfd, 1, timeout); - if (ret > 0 && sfd.revents & (POLLERR)) { - ret = -1; - _Close(sockd); - } -out: - return ret; -} - -int _wait_recv_select(int *sockd, float timeout) -{ - struct pollfd sfd; - int ret = -1; - - if (unlikely(*sockd < 0)) - goto out; - sfd.fd = *sockd; - sfd.events = POLLIN | POLLRDHUP; - timeout *= 1000; - ret = poll(&sfd, 1, timeout); - /* If POLLRDHUP occurs, we may still have data to read so let recv() - * after this determine if the socket can still be used. */ - if (ret > 0 && sfd.revents & (POLLHUP | POLLERR)) { - ret = -1; - _Close(sockd); - } -out: + ret = epoll_wait(epfd, &event, 1, timeout); + close(epfd); return ret; } @@ -1018,19 +991,15 @@ out: /* Emulate a select write wait for high fds that select doesn't support */ int wait_write_select(int sockd, float timeout) { - struct pollfd sfd; - int ret = -1; + struct epoll_event event; + int epfd, ret; - if (unlikely(sockd < 0)) - goto out; - sfd.fd = sockd; - sfd.events = POLLOUT | POLLRDHUP; - sfd.revents = 0; + epfd = epoll_create1(EPOLL_CLOEXEC); + event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event); timeout *= 1000; - ret = poll(&sfd, 1, timeout); - if (ret && !(sfd.revents & POLLOUT)) - ret = -1; -out: + ret = epoll_wait(epfd, &event, 1, timeout); + close(epfd); return ret; } diff --git a/src/libckpool.h b/src/libckpool.h index 222ce26f..9d8ba341 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -496,10 +496,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) int wait_close(int sockd, int timeout); -int _wait_read_select(int *sockd, float timeout); -#define wait_read_select(SOCKD, TIMEOUT) _wait_read_select(&(SOCKD), TIMEOUT) -int _wait_recv_select(int *sockd, float timeout); -#define wait_recv_select(SOCKD, TIMEOUT) _wait_recv_select(&(SOCKD), TIMEOUT) +int wait_read_select(int sockd, float timeout); int read_length(int sockd, void *buf, int len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); #define RECV_UNIX_TIMEOUT1 30 diff --git a/src/stratifier.c b/src/stratifier.c index fef6eb13..26e931ab 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -921,6 +921,8 @@ static void *do_update(void *arg) pthread_detach(pthread_self()); rename_proc("updater"); + /* Serialise access to getbase to avoid out of order new block notifies */ + cksem_wait(&sdata->update_sem); retry: buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { @@ -976,8 +978,6 @@ retry: json_decref(val); generate_coinbase(ckp, wb); - /* Serialise access to add_base to avoid out of order new block notifies */ - cksem_wait(&sdata->update_sem); add_base(ckp, wb, &new_block); /* Reset the update time to avoid stacked low priority notifies. Bring * forward the next notify in case of a new block. */ @@ -985,12 +985,13 @@ retry: if (new_block) now_t -= ckp->update_interval / 2; sdata->update_time = now_t; - cksem_post(&sdata->update_sem); stratum_broadcast_update(sdata, new_block); ret = true; LOGINFO("Broadcast updated stratum base"); out: + cksem_post(&sdata->update_sem); + /* Send a ping to miners if we fail to get a base to keep them * connected while bitcoind recovers(?) */ if (unlikely(!ret)) {