kanoi 9 years ago
parent
commit
297e1c1754
  1. 26
      src/ckpool.c
  2. 45
      src/generator.c
  3. 65
      src/libckpool.c
  4. 5
      src/libckpool.h
  5. 7
      src/stratifier.c

26
src/ckpool.c

@ -528,7 +528,7 @@ rewait:
ret = 0;
goto out;
}
ret = wait_recv_select(cs->fd, eom ? 0 : *timeout);
ret = wait_read_select(cs->fd, eom ? 0 : *timeout);
polled = true;
if (ret < 1) {
if (!ret) {
@ -594,7 +594,6 @@ out:
if (ret < 0) {
empty_buffer(cs);
dealloc(cs->buf);
Close(cs->fd);
}
return ret;
}
@ -741,6 +740,8 @@ static const char *rpc_method(const char *rpc_req)
return rpc_req;
}
/* All of these calls are made to bitcoind which prefers open/close instead
* of persistent connections so cs->fd is always invalid. */
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{
float timeout = RPC_TIMEOUT;
@ -753,8 +754,9 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
/* Serialise all calls in case we use cs from multiple threads */
cksem_wait(&cs->sem);
cs->fd = connect_socket(cs->url, cs->port);
if (unlikely(cs->fd < 0)) {
LOGWARNING("FD %d invalid in %s", cs->fd, __func__);
LOGWARNING("Unable to connect socket to %s:%s in %s", cs->url, cs->port, __func__);
goto out;
}
if (unlikely(!cs->url)) {
@ -837,20 +839,8 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
out_empty:
empty_socket(cs->fd);
empty_buffer(cs);
if (!val) {
/* Assume that a failed request means the socket will be closed
* and reopen it */
Close(cs->fd);
}
out:
if (cs->fd < 0) {
/* Attempt to reopen a socket that has been closed due to a
* failed request or if the socket was closed while trying to
* read/write to it. */
cs->fd = connect_socket(cs->url, cs->port);
LOGWARNING("Attempt to reopen socket to %s:%s %ssuccessful",
cs->url, cs->port, cs->fd > 0 ? "" : "un");
}
Close(cs->fd);
free(http_req);
dealloc(cs->buf);
cksem_post(&cs->sem);
@ -1583,7 +1573,9 @@ int main(int argc, char **argv)
}
if (!ckp.name) {
if (ckp.proxy)
if (ckp.passthrough)
ckp.name = "ckpassthrough";
else if (ckp.proxy)
ckp.name = "ckproxy";
else
ckp.name = "ckpool";

45
src/generator.c

@ -137,18 +137,18 @@ struct generator_data {
typedef struct generator_data gdata_t;
/* Use a temporary fd when testing server_alive to avoid races on cs->fd */
static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
{
char *userpass = NULL;
bool ret = false;
connsock_t *cs;
gbtbase_t *gbt;
int fd;
cs = &si->cs;
/* Has this server already been reconnected? */
if (cs->fd > 0)
if (si->alive)
return true;
si->alive = false;
cs = &si->cs;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
return ret;
@ -163,8 +163,8 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
return ret;
}
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) {
fd = connect_socket(cs->url, cs->port);
if (fd < 0) {
if (!pinging)
LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port);
return ret;
@ -185,16 +185,11 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress);
goto out;
}
ret = true;
si->alive = ret = true;
LOGNOTICE("Server alive: %s:%s", cs->url, cs->port);
out:
if (!ret) {
/* Close and invalidate the file handle */
Close(cs->fd);
} else {
si->alive = true;
LOGNOTICE("Server alive: %s:%s", cs->url, cs->port);
keep_sockalive(cs->fd);
}
/* Close the file handle */
close(fd);
return ret;
}
@ -216,7 +211,7 @@ retry:
server_instance_t *si = ckp->servers[i];
cs = &si->cs;
if (si->alive && cs->fd > 0) {
if (si->alive) {
alive = si;
goto living;
}
@ -293,7 +288,7 @@ retry:
Close(sockd);
do {
selret = wait_recv_select(us->sockd, 5);
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
@ -301,11 +296,10 @@ retry:
}
} while (selret < 1);
if (unlikely(cs->fd < 0)) {
if (unlikely(!si->alive)) {
LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port);
goto reconnect;
}
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGEMERG("Failed to accept on generator socket");
@ -329,6 +323,7 @@ retry:
LOGWARNING("Failed to get block template from %s:%s",
cs->url, cs->port);
send_unix_msg(sockd, "Failed");
si->alive = false;
goto reconnect;
} else {
char *s = json_dumps(gbt->json, JSON_NO_UTF8);
@ -343,6 +338,7 @@ retry:
else if (!get_bestblockhash(cs, hash)) {
LOGINFO("No best block hash support from %s:%s",
cs->url, cs->port);
si->alive = false;
send_unix_msg(sockd, "failed");
} else {
send_unix_msg(sockd, hash);
@ -353,11 +349,13 @@ retry:
if (si->notify)
send_unix_msg(sockd, "notify");
else if ((height = get_blockcount(cs)) == -1) {
si->alive = false;
send_unix_msg(sockd, "failed");
goto reconnect;
} else {
LOGDEBUG("Height: %d", height);
if (!get_blockhash(cs, height, hash)) {
si->alive = false;
send_unix_msg(sockd, "failed");
goto reconnect;
} else {
@ -1626,7 +1624,7 @@ retry:
ckmsgq_add(gdata->srvchk, proxi->si);
do {
selret = wait_recv_select(us->sockd, 5);
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Generator failed to ping main process, exiting");
ret = 1;
@ -1728,6 +1726,8 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds);
for (i = 0; i < ckp->btcds; i++) {
connsock_t *cs;
ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[i];
si->url = ckp->btcdurl[i];
@ -1735,8 +1735,9 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
si->pass = ckp->btcdpass[i];
si->notify = ckp->btcdnotify[i];
si->id = i;
cksem_init(&si->cs.sem);
cksem_post(&si->cs.sem);
cs = &si->cs;
cksem_init(&cs->sem);
cksem_post(&cs->sem);
}
create_pthread(&pth_watchdog, server_watchdog, ckp);

65
src/libckpool.c

@ -16,6 +16,7 @@
#else
#include <sys/un.h>
#endif
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/prctl.h>
#include <sys/stat.h>
@ -905,46 +906,18 @@ int wait_close(int sockd, int timeout)
return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR);
}
/* Emulate a select read wait for high fds that select doesn't support.
* wait_read_select is for unix sockets and _wait_recv_select for regular
* sockets. */
int _wait_read_select(int *sockd, float timeout)
/* Emulate a select read wait for high fds that select doesn't support. */
int wait_read_select(int sockd, float timeout)
{
struct pollfd sfd;
int ret = -1;
struct epoll_event event;
int epfd, ret;
if (unlikely(*sockd < 0))
goto out;
sfd.fd = *sockd;
sfd.events = POLLIN | POLLRDHUP;
epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
if (ret > 0 && sfd.revents & (POLLERR)) {
ret = -1;
_Close(sockd);
}
out:
return ret;
}
int _wait_recv_select(int *sockd, float timeout)
{
struct pollfd sfd;
int ret = -1;
if (unlikely(*sockd < 0))
goto out;
sfd.fd = *sockd;
sfd.events = POLLIN | POLLRDHUP;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
/* If POLLRDHUP occurs, we may still have data to read so let recv()
* after this determine if the socket can still be used. */
if (ret > 0 && sfd.revents & (POLLHUP | POLLERR)) {
ret = -1;
_Close(sockd);
}
out:
ret = epoll_wait(epfd, &event, 1, timeout);
close(epfd);
return ret;
}
@ -1018,19 +991,15 @@ out:
/* Emulate a select write wait for high fds that select doesn't support */
int wait_write_select(int sockd, float timeout)
{
struct pollfd sfd;
int ret = -1;
struct epoll_event event;
int epfd, ret;
if (unlikely(sockd < 0))
goto out;
sfd.fd = sockd;
sfd.events = POLLOUT | POLLRDHUP;
sfd.revents = 0;
epfd = epoll_create1(EPOLL_CLOEXEC);
event.events = EPOLLOUT | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockd, &event);
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
if (ret && !(sfd.revents & POLLOUT))
ret = -1;
out:
ret = epoll_wait(epfd, &event, 1, timeout);
close(epfd);
return ret;
}

5
src/libckpool.h

@ -496,10 +496,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_close(int sockd, int timeout);
int _wait_read_select(int *sockd, float timeout);
#define wait_read_select(SOCKD, TIMEOUT) _wait_read_select(&(SOCKD), TIMEOUT)
int _wait_recv_select(int *sockd, float timeout);
#define wait_recv_select(SOCKD, TIMEOUT) _wait_recv_select(&(SOCKD), TIMEOUT)
int wait_read_select(int sockd, float timeout);
int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);
#define RECV_UNIX_TIMEOUT1 30

7
src/stratifier.c

@ -921,6 +921,8 @@ static void *do_update(void *arg)
pthread_detach(pthread_self());
rename_proc("updater");
/* Serialise access to getbase to avoid out of order new block notifies */
cksem_wait(&sdata->update_sem);
retry:
buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) {
@ -976,8 +978,6 @@ retry:
json_decref(val);
generate_coinbase(ckp, wb);
/* Serialise access to add_base to avoid out of order new block notifies */
cksem_wait(&sdata->update_sem);
add_base(ckp, wb, &new_block);
/* Reset the update time to avoid stacked low priority notifies. Bring
* forward the next notify in case of a new block. */
@ -985,12 +985,13 @@ retry:
if (new_block)
now_t -= ckp->update_interval / 2;
sdata->update_time = now_t;
cksem_post(&sdata->update_sem);
stratum_broadcast_update(sdata, new_block);
ret = true;
LOGINFO("Broadcast updated stratum base");
out:
cksem_post(&sdata->update_sem);
/* Send a ping to miners if we fail to get a base to keep them
* connected while bitcoind recovers(?) */
if (unlikely(!ret)) {

Loading…
Cancel
Save