Browse Source

Don't use persistent connections to bitcoind

master
Con Kolivas 9 years ago
parent
commit
d2d5a5daaa
  1. 20
      src/ckpool.c
  2. 41
      src/generator.c

20
src/ckpool.c

@ -594,7 +594,6 @@ out:
if (ret < 0) { if (ret < 0) {
empty_buffer(cs); empty_buffer(cs);
dealloc(cs->buf); dealloc(cs->buf);
Close(cs->fd);
} }
return ret; return ret;
} }
@ -741,6 +740,8 @@ static const char *rpc_method(const char *rpc_req)
return rpc_req; return rpc_req;
} }
/* All of these calls are made to bitcoind which prefers open/close instead
* of persistent connections so cs->fd is always invalid. */
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{ {
float timeout = RPC_TIMEOUT; float timeout = RPC_TIMEOUT;
@ -753,8 +754,9 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
/* Serialise all calls in case we use cs from multiple threads */ /* Serialise all calls in case we use cs from multiple threads */
cksem_wait(&cs->sem); cksem_wait(&cs->sem);
cs->fd = connect_socket(cs->url, cs->port);
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
LOGWARNING("FD %d invalid in %s", cs->fd, __func__); LOGWARNING("Unable to connect socket to %s:%s in %s", cs->url, cs->port, __func__);
goto out; goto out;
} }
if (unlikely(!cs->url)) { if (unlikely(!cs->url)) {
@ -837,20 +839,8 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
out_empty: out_empty:
empty_socket(cs->fd); empty_socket(cs->fd);
empty_buffer(cs); empty_buffer(cs);
if (!val) {
/* Assume that a failed request means the socket will be closed
* and reopen it */
Close(cs->fd);
}
out: out:
if (cs->fd < 0) { Close(cs->fd);
/* Attempt to reopen a socket that has been closed due to a
* failed request or if the socket was closed while trying to
* read/write to it. */
cs->fd = connect_socket(cs->url, cs->port);
LOGWARNING("Attempt to reopen socket to %s:%s %ssuccessful",
cs->url, cs->port, cs->fd > 0 ? "" : "un");
}
free(http_req); free(http_req);
dealloc(cs->buf); dealloc(cs->buf);
cksem_post(&cs->sem); cksem_post(&cs->sem);

41
src/generator.c

@ -137,18 +137,18 @@ struct generator_data {
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
/* Use a temporary fd when testing server_alive to avoid races on cs->fd */
static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
{ {
char *userpass = NULL; char *userpass = NULL;
bool ret = false; bool ret = false;
connsock_t *cs; connsock_t *cs;
gbtbase_t *gbt; gbtbase_t *gbt;
int fd;
cs = &si->cs; if (si->alive)
/* Has this server already been reconnected? */
if (cs->fd > 0)
return true; return true;
si->alive = false; cs = &si->cs;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return ret; return ret;
@ -163,8 +163,8 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
return ret; return ret;
} }
cs->fd = connect_socket(cs->url, cs->port); fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (fd < 0) {
if (!pinging) if (!pinging)
LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port); LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port);
return ret; return ret;
@ -185,16 +185,11 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress);
goto out; goto out;
} }
ret = true; si->alive = ret = true;
out:
if (!ret) {
/* Close and invalidate the file handle */
Close(cs->fd);
} else {
si->alive = true;
LOGNOTICE("Server alive: %s:%s", cs->url, cs->port); LOGNOTICE("Server alive: %s:%s", cs->url, cs->port);
keep_sockalive(cs->fd); out:
} /* Close the file handle */
close(fd);
return ret; return ret;
} }
@ -216,7 +211,7 @@ retry:
server_instance_t *si = ckp->servers[i]; server_instance_t *si = ckp->servers[i];
cs = &si->cs; cs = &si->cs;
if (si->alive && cs->fd > 0) { if (si->alive) {
alive = si; alive = si;
goto living; goto living;
} }
@ -301,11 +296,10 @@ retry:
} }
} while (selret < 1); } while (selret < 1);
if (unlikely(cs->fd < 0)) { if (unlikely(!si->alive)) {
LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port); LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port);
goto reconnect; goto reconnect;
} }
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGEMERG("Failed to accept on generator socket"); LOGEMERG("Failed to accept on generator socket");
@ -329,6 +323,7 @@ retry:
LOGWARNING("Failed to get block template from %s:%s", LOGWARNING("Failed to get block template from %s:%s",
cs->url, cs->port); cs->url, cs->port);
send_unix_msg(sockd, "Failed"); send_unix_msg(sockd, "Failed");
si->alive = false;
goto reconnect; goto reconnect;
} else { } else {
char *s = json_dumps(gbt->json, JSON_NO_UTF8); char *s = json_dumps(gbt->json, JSON_NO_UTF8);
@ -343,6 +338,7 @@ retry:
else if (!get_bestblockhash(cs, hash)) { else if (!get_bestblockhash(cs, hash)) {
LOGINFO("No best block hash support from %s:%s", LOGINFO("No best block hash support from %s:%s",
cs->url, cs->port); cs->url, cs->port);
si->alive = false;
send_unix_msg(sockd, "failed"); send_unix_msg(sockd, "failed");
} else { } else {
send_unix_msg(sockd, hash); send_unix_msg(sockd, hash);
@ -353,11 +349,13 @@ retry:
if (si->notify) if (si->notify)
send_unix_msg(sockd, "notify"); send_unix_msg(sockd, "notify");
else if ((height = get_blockcount(cs)) == -1) { else if ((height = get_blockcount(cs)) == -1) {
si->alive = false;
send_unix_msg(sockd, "failed"); send_unix_msg(sockd, "failed");
goto reconnect; goto reconnect;
} else { } else {
LOGDEBUG("Height: %d", height); LOGDEBUG("Height: %d", height);
if (!get_blockhash(cs, height, hash)) { if (!get_blockhash(cs, height, hash)) {
si->alive = false;
send_unix_msg(sockd, "failed"); send_unix_msg(sockd, "failed");
goto reconnect; goto reconnect;
} else { } else {
@ -1728,6 +1726,8 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds); ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds);
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
connsock_t *cs;
ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[i]; si = ckp->servers[i];
si->url = ckp->btcdurl[i]; si->url = ckp->btcdurl[i];
@ -1735,8 +1735,9 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
si->pass = ckp->btcdpass[i]; si->pass = ckp->btcdpass[i];
si->notify = ckp->btcdnotify[i]; si->notify = ckp->btcdnotify[i];
si->id = i; si->id = i;
cksem_init(&si->cs.sem); cs = &si->cs;
cksem_post(&si->cs.sem); cksem_init(&cs->sem);
cksem_post(&cs->sem);
} }
create_pthread(&pth_watchdog, server_watchdog, ckp); create_pthread(&pth_watchdog, server_watchdog, ckp);

Loading…
Cancel
Save