diff --git a/README b/README index 17f428a7..01f32aa1 100644 --- a/README +++ b/README @@ -138,6 +138,7 @@ ckpool supports the following options: -A Standalone mode tells ckpool not to try to communicate with ckdb or log any ckdb requests in the rotating ckdb logs it would otherwise store. All users are automatically accepted without any attempt to authorise users in any way. +This option is explicitly enabled when built without ckdb support. -c tells ckpool to override its default configuration filename and load the specified one. If -c is not specified, ckpool looks for ckpool.conf @@ -145,6 +146,7 @@ whereas in proxy or passthrough modes it will look for ckproxy.conf -d tells ckpool what the name of the ckdb process is that it should speak to, otherwise it will look for ckdb. +This option does not exist when built without ckdb support. -g will start ckpool as the group ID specified. @@ -179,6 +181,7 @@ it to scale to large hashrates. Standalone mode is Optional. -S tells ckpool which directory to look for the ckdb socket to talk to. +This option does not exist when built without ckdb support. -s tells ckpool which directory to place its own communication sockets (/tmp by default) @@ -249,4 +252,7 @@ and 3334 in proxy mode. "startdiff" : Starting diff that new clients are given. Default 42 +"maxdiff" : Optional maximum diff that vardiff will clamp to where zero is no +maximum. + "logdir" : Which directory to store pool and client logs. Default "logs" diff --git a/ckpool.conf b/ckpool.conf index 62424d30..340556bd 100644 --- a/ckpool.conf +++ b/ckpool.conf @@ -20,6 +20,7 @@ "serverurl" : "ckpool.org:3333", "mindiff" : 1, "startdiff" : 42, +"maxdiff" : 0, "logdir" : "logs" } Comments from here on are ignored. diff --git a/ckproxy.conf b/ckproxy.conf index b4e3f2bf..6754198b 100644 --- a/ckproxy.conf +++ b/ckproxy.conf @@ -15,6 +15,7 @@ "serverurl" : "192.168.1.100:3334", "mindiff" : 1, "startdiff" : 42, +"maxdiff" : 0, "logdir" : "logs" } Comments from here on are ignored. diff --git a/configure.ac b/configure.ac index 3f646444..218a2392 100644 --- a/configure.ac +++ b/configure.ac @@ -59,6 +59,7 @@ AC_ARG_WITH([ckdb], if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) + AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required]) PQ_LIBS="-lpq" else PQ_LIBS="" diff --git a/src/ckpool.c b/src/ckpool.c index 6c2b02ec..b350291d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -280,10 +280,10 @@ retry: if (newfd > 0) { LOGDEBUG("Sending new fd %d", newfd); send_fd(newfd, sockd); - close(newfd); + Close(newfd); } else LOGWARNING("Failed to get_fd"); - close(connfd); + Close(connfd); } else LOGWARNING("Failed to send_procmsg to connector"); } else if (cmdmatch(buf, "restart")) { @@ -300,7 +300,7 @@ retry: LOGINFO("Listener received unhandled message: %s", buf); send_unix_msg(sockd, "unknown"); } - close(sockd); + Close(sockd); goto retry; out: dealloc(buf); @@ -386,10 +386,7 @@ int read_socket_line(connsock_t *cs, int timeout) out: if (ret < 0) { dealloc(cs->buf); - if (cs->fd > 0) { - close(cs->fd); - cs->fd = -1; - } + Close(cs->fd); } return ret; } @@ -447,7 +444,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - close(sockd); + Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); @@ -484,7 +481,7 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGWARNING("Failed to send %s to socket %s", msg, path); else buf = recv_unix_msg(sockd); - close(sockd); + Close(sockd); out: if (unlikely(!buf)) LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line); @@ -515,7 +512,7 @@ char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, co LOGWARNING("Failed to send %s to ckdb", msg); else buf = recv_unix_msg(sockd); - close(sockd); + Close(sockd); out: if (unlikely(!buf)) LOGERR("Failure in send_recv_ckdb from %s %s:%d", file, func, line); @@ -608,7 +605,7 @@ out_empty: /* Assume that a failed request means the socket will be closed * and reopen it */ LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); - close(cs->fd); + Close(cs->fd); cs->fd = connect_socket(cs->url, cs->port); } out: @@ -983,6 +980,7 @@ static void parse_config(ckpool_t *ckp) json_get_string(&ckp->serverurl, json_conf, "serverurl"); json_get_int64(&ckp->mindiff, json_conf, "mindiff"); json_get_int64(&ckp->startdiff, json_conf, "startdiff"); + json_get_int64(&ckp->maxdiff, json_conf, "maxdiff"); json_get_string(&ckp->logdir, json_conf, "logdir"); arr_val = json_object_get(json_conf, "proxy"); if (arr_val && json_is_array(arr_val)) { @@ -1058,6 +1056,7 @@ static void *watchdog(void *arg) return NULL; } +#ifdef USE_CKDB static struct option long_options[] = { {"standalone", no_argument, 0, 'A'}, {"config", required_argument, 0, 'c'}, @@ -1075,6 +1074,22 @@ static struct option long_options[] = { {"sockdir", required_argument, 0, 's'}, {0, 0, 0, 0} }; +#else +static struct option long_options[] = { + {"config", required_argument, 0, 'c'}, + {"group", required_argument, 0, 'g'}, + {"handover", no_argument, 0, 'H'}, + {"help", no_argument, 0, 'h'}, + {"killold", no_argument, 0, 'k'}, + {"log-shares", no_argument, 0, 'L'}, + {"loglevel", required_argument, 0, 'l'}, + {"name", required_argument, 0, 'n'}, + {"passthrough", no_argument, 0, 'P'}, + {"proxy", no_argument, 0, 'p'}, + {"sockdir", required_argument, 0, 's'}, + {0, 0, 0, 0} +}; +#endif int main(int argc, char **argv) { @@ -1193,7 +1208,7 @@ int main(int argc, char **argv) } trail_slash(&ckp.socket_dir); - if (!ckp.standalone) { + if (!CKP_STANDALONE(&ckp)) { if (!ckp.ckdb_name) ckp.ckdb_name = "ckdb"; if (!ckp.ckdb_sockdir) { @@ -1295,12 +1310,12 @@ int main(int argc, char **argv) if (sockd > 0 && send_unix_msg(sockd, "getfd")) { ckp.oldconnfd = get_fd(sockd); - close(sockd); + Close(sockd); sockd = open_unix_client(ckp.main.us.path); send_unix_msg(sockd, "shutdown"); if (ckp.oldconnfd > 0) LOGWARNING("Inherited old socket with new file descriptor %d!", ckp.oldconnfd); - close(sockd); + Close(sockd); } } diff --git a/src/ckpool.h b/src/ckpool.h index e7715d47..9e3311cb 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -150,6 +150,7 @@ struct ckpool_instance { /* Difficulty settings */ int64_t mindiff; // Default 1 int64_t startdiff; // Default 42 + int64_t maxdiff; // No default /* Coinbase data */ char *btcaddress; // Address to mine to @@ -171,6 +172,12 @@ struct ckpool_instance { server_instance_t *btcdbackup; }; +#ifdef USE_CKDB +#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) +#else +#define CKP_STANDALONE(CKP) (true) +#endif + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); diff --git a/src/connector.c b/src/connector.c index 9548f599..97f51418 100644 --- a/src/connector.c +++ b/src/connector.c @@ -92,7 +92,7 @@ void *acceptor(void *arg) conn_instance_t *ci = (conn_instance_t *)arg; client_instance_t *client, *old_client; socklen_t address_len; - int fd; + int fd, port; rename_proc("acceptor"); @@ -115,22 +115,25 @@ retry: case AF_INET: inet4_in = (struct sockaddr_in *)&client->address; inet_ntop(AF_INET, &inet4_in->sin_addr, client->address_name, INET6_ADDRSTRLEN); + port = htons(inet4_in->sin_port); break; case AF_INET6: inet6_in = (struct sockaddr_in6 *)&client->address; inet_ntop(AF_INET6, &inet6_in->sin6_addr, client->address_name, INET6_ADDRSTRLEN); + port = htons(inet6_in->sin6_port); break; default: LOGWARNING("Unknown INET type for client %d on socket %d", ci->nfds, fd); - close(fd); + Close(fd); free(client); goto retry; } keep_sockalive(fd); + nolinger_socket(fd); - LOGINFO("Connected new client %d on socket %d from %s", ci->nfds, fd, client->address_name); + LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); client->fd = fd; @@ -153,11 +156,10 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) ck_wlock(&ci->lock); fd = client->fd; if (fd != -1) { - close(fd); + Close(client->fd); HASH_DEL(clients, client); HASH_DELETE(fdhh, fdclients, client); LL_PREPEND(dead_clients, client); - client->fd = -1; } ck_wunlock(&ci->lock); @@ -170,11 +172,8 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client) { char buf[256]; - int fd; - fd = drop_client(ci, client); - if (fd == -1) - return; + drop_client(ci, client); if (ckp->passthrough) return; sprintf(buf, "dropclient=%ld", client->id); @@ -185,34 +184,37 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { + int buflen, ret, selfail = 0; ckpool_t *ckp = ci->pi->ckp; - int buflen, ret, flags = 0; char msg[PAGESIZE], *eol; - bool moredata = false; json_t *val; retry: - buflen = PAGESIZE - client->bufofs; - if (moredata) - flags = MSG_DONTWAIT; - ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); + /* Select should always return positive after poll unless we have + * been disconnected. On retries, decide whether we should do further + * reads based on select readiness and only fail if we get an error. */ + ret = wait_read_select(client->fd, 0); if (ret < 1) { - /* Nothing else ready to be read */ - if (!ret && flags) + if (ret > selfail) return; - + LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", + client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + invalidate_client(ckp, ci, client); + return; + } + selfail = -1; + buflen = PAGESIZE - client->bufofs; + ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); + if (ret < 1) { /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the * client has disconnected. */ - LOGINFO("Client fd %d disconnected", client->fd); + LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, ci, client); return; } client->bufofs += ret; - if (client->bufofs == PAGESIZE) - moredata = true; - else - moredata = false; reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) { @@ -221,9 +223,7 @@ reparse: invalidate_client(ckp, ci, client); return; } - if (moredata) - goto retry; - return; + goto retry; } /* Do something useful with this message now */ @@ -234,9 +234,10 @@ reparse: return; } memcpy(msg, client->buf, buflen); - msg[buflen] = 0; + msg[buflen] = '\0'; client->bufofs -= buflen; memmove(client->buf, client->buf + buflen, client->bufofs); + client->buf[client->bufofs] = '\0'; if (!(val = json_loads(msg, 0, NULL))) { char *buf = strdup("Invalid JSON, disconnecting\n"); @@ -267,6 +268,7 @@ reparse: if (client->bufofs) goto reparse; + goto retry; } /* Waits on fds ready to read on from the list stored in conn_instance and @@ -287,6 +289,11 @@ retry: ck_rlock(&ci->lock); HASH_ITER(fdhh, fdclients, client, tmp) { + if (unlikely(client->fd == -1)) { + LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!", + client->id); + continue; + } fds[nfds].fd = client->fd; fds[nfds].events = POLLIN; fds[nfds].revents = 0; @@ -298,7 +305,7 @@ retry: cksleep_ms(100); goto retry; } - ret = poll(fds, nfds, 1000); + ret = poll(fds, nfds, 100); if (unlikely(ret < 0)) { LOGERR("Failed to poll in receiver"); goto out; @@ -398,7 +405,8 @@ void *sender(void *arg) ret = wait_write_select(fd, 0); if (ret < 1) { if (ret < 0) { - LOGDEBUG("Discarding message sent to interrupted client"); + LOGINFO("Client id %d fd %d interrupted", client->id, fd); + invalidate_client(ckp, ci, client); free(sender_send->buf); free(sender_send); continue; @@ -415,7 +423,7 @@ void *sender(void *arg) while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { - LOGINFO("Client id %d disconnected", client->id); + LOGINFO("Client id %d fd %d disconnected", client->id, fd); invalidate_client(ckp, ci, client); break; } @@ -456,7 +464,8 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf) if (unlikely(fd == -1)) { if (client) { - LOGINFO("Client id %ld disconnected", id); + /* This shouldn't happen */ + LOGWARNING("Client id %ld disconnected but fd already invalidated!", id); invalidate_client(ci->pi->ckp, ci, client); } else LOGINFO("Connector failed to find client id %ld to send to", id); @@ -517,7 +526,7 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) LOGWARNING("%s connector ready", ckp->name); retry: - close(sockd); + Close(sockd); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on connector socket, exiting"); @@ -621,7 +630,7 @@ retry: goto retry; out: - close(sockd); + Close(sockd); dealloc(buf); return ret; } @@ -684,7 +693,7 @@ int connector(proc_instance_t *pi) } while (++tries < 25); if (ret < 0) { LOGERR("Connector failed to bind to socket for 2 minutes"); - close(sockd); + Close(sockd); goto out; } } @@ -694,7 +703,7 @@ int connector(proc_instance_t *pi) ret = listen(sockd, 10); if (ret < 0) { LOGERR("Connector failed to listen on socket"); - close(sockd); + Close(sockd); goto out; } diff --git a/src/generator.c b/src/generator.c index 748d6f05..79a30681 100644 --- a/src/generator.c +++ b/src/generator.c @@ -177,8 +177,7 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) out: if (!ret) { /* Close and invalidate the file handle */ - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } else keep_sockalive(cs->fd); return ret; @@ -225,8 +224,7 @@ static void kill_server(server_instance_t *si) LOGNOTICE("Killing server"); cs = &si->cs; - close(cs->fd); - cs->fd = -1; + Close(cs->fd); dealloc(cs->url); dealloc(cs->port); dealloc(cs->auth); @@ -290,7 +288,7 @@ retry: buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in gen_loop"); - close(sockd); + Close(sockd); goto retry; } LOGDEBUG("Generator received request: %s", buf); @@ -365,7 +363,7 @@ retry: LOGDEBUG("Generator received ping request"); send_unix_msg(sockd, "pong"); } - close(sockd); + Close(sockd); goto retry; out: @@ -627,10 +625,8 @@ retry: goto retry; out: - if (!ret) { - close(cs->fd); - cs->fd = -1; - } + if (!ret) + Close(cs->fd); return ret; } @@ -670,7 +666,7 @@ out: if (val) json_decref(val); if (!ret) - close(cs->fd); + Close(cs->fd); return ret; } @@ -934,7 +930,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) json_decref(req); if (!ret) { LOGWARNING("Failed to send message in auth_stratum"); - close(cs->fd); + Close(cs->fd); goto out; } @@ -979,7 +975,7 @@ out: return ret; } -static void send_subscribe(proxy_instance_t *proxi, int sockd) +static void send_subscribe(proxy_instance_t *proxi, int *sockd) { json_t *json_msg; char *msg; @@ -988,12 +984,12 @@ static void send_subscribe(proxy_instance_t *proxi, int sockd) "nonce2len", proxi->nonce2len); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } -static void send_notify(proxy_instance_t *proxi, int sockd) +static void send_notify(proxy_instance_t *proxi, int *sockd) { json_t *json_msg, *merkle_arr; notify_instance_t *ni; @@ -1017,12 +1013,12 @@ static void send_notify(proxy_instance_t *proxi, int sockd) msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } -static void send_diff(proxy_instance_t *proxi, int sockd) +static void send_diff(proxy_instance_t *proxi, int *sockd) { json_t *json_msg; char *msg; @@ -1030,9 +1026,9 @@ static void send_diff(proxy_instance_t *proxi, int sockd) JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); - send_unix_msg(sockd, msg); + send_unix_msg(*sockd, msg); free(msg); - close(sockd); + _Close(sockd); } static void submit_share(proxy_instance_t *proxi, json_t *val) @@ -1242,8 +1238,7 @@ static void *proxy_send(void *arg) free(msg); if (!ret && cs->fd > 0) { LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect"); - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } } return NULL; @@ -1350,8 +1345,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t * out: if (!ret) { /* Close and invalidate the file handle */ - close(cs->fd); - cs->fd = -1; + Close(cs->fd); } else keep_sockalive(cs->fd); return ret; @@ -1421,8 +1415,7 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) LOGNOTICE("Killing proxy"); cs = proxi->cs; - close(cs->fd); - cs->fd = -1; + Close(cs->fd); /* All our notify data is invalid if we reconnect so discard them */ mutex_lock(&proxi->notify_lock); @@ -1495,7 +1488,7 @@ retry: buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in proxy_loop"); - close(sockd); + Close(sockd); goto retry; } LOGDEBUG("Proxy received request: %s", buf); @@ -1503,11 +1496,11 @@ retry: ret = 0; goto out; } else if (cmdmatch(buf, "getsubscribe")) { - send_subscribe(proxi, sockd); + send_subscribe(proxi, &sockd); } else if (cmdmatch(buf, "getnotify")) { - send_notify(proxi, sockd); + send_notify(proxi, &sockd); } else if (cmdmatch(buf, "getdiff")) { - send_diff(proxi, sockd); + send_diff(proxi, &sockd); } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { @@ -1536,11 +1529,11 @@ retry: else submit_share(proxi, val); } - close(sockd); + Close(sockd); goto retry; out: kill_proxy(ckp, proxi); - close(sockd); + Close(sockd); return ret; } @@ -1653,7 +1646,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) for (i = 0; i < ckp->proxies; i++) { si = ckp->servers[i]; - close(si->cs.fd); + Close(si->cs.fd); proxi = si->data; free(proxi->enonce1); free(proxi->enonce1bin); diff --git a/src/libckpool.c b/src/libckpool.c index 3c749880..32d08a98 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -418,6 +418,13 @@ void keep_sockalive(int fd) setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &tcp_keepintvl, sizeof(tcp_keepintvl)); } +void nolinger_socket(int fd) +{ + const struct linger so_linger = { 1, 0 }; + + setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); +} + void noblock_socket(int fd) { int flags = fcntl(fd, F_GETFL, 0); @@ -432,6 +439,16 @@ void block_socket(int fd) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); } +void _Close(int *fd) +{ + if (*fd < 0) + return; + LOGDEBUG("Closing file handle %d", *fd); + if (unlikely(close(*fd))) + LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno)); + *fd = -1; +} + int bind_socket(char *url, char *port) { struct addrinfo servinfobase, *servinfo, hints, *p; @@ -460,8 +477,7 @@ int bind_socket(char *url, char *port) ret = bind(sockd, p->ai_addr, p->ai_addrlen); if (ret < 0) { LOGWARNING("Failed to bind socket for %s:%s", url, port); - close(sockd); - sockd = -1; + Close(sockd); goto out; } @@ -500,7 +516,7 @@ int connect_socket(char *url, char *port) int selret; if (!sock_connecting()) { - close(sockd); + Close(sockd); LOGDEBUG("Failed sock connect"); continue; } @@ -517,8 +533,7 @@ int connect_socket(char *url, char *port) break; } } - close(sockd); - sockd = -1; + Close(sockd); LOGDEBUG("Select timeout/failed connect"); continue; } @@ -575,13 +590,10 @@ void empty_socket(int fd) } while (ret > 0); } -void close_unix_socket(const int sockd, const char *server_path) +void _close_unix_socket(int *sockd, const char *server_path) { - int ret; - - ret = close(sockd); - if (unlikely(ret < 0)) - LOGERR("Failed to close sock %d %s", sockd, server_path); + LOGDEBUG("Closing unix socket %d %s", *sockd, server_path); + _Close(sockd); } int _open_unix_server(const char *server_path, const char *file, const char *func, const int line) @@ -681,8 +693,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun ret = connect(sockd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); if (unlikely(ret < 0)) { LOGERR("Failed to bind to socket in open_unix_client"); - close(sockd); - sockd = -1; + Close(sockd); goto out; } @@ -951,7 +962,7 @@ int _get_fd(int sockd, const char *file, const char *func, const int line) goto out; } out: - close(sockd); + Close(sockd); cm = (int *)CMSG_DATA(cmptr); newfd = *cm; free(cmptr); @@ -1032,7 +1043,7 @@ bool rotating_log(const char *path, const char *msg) } fp = fdopen(fd, "ae"); if (unlikely(!fp)) { - close(fd); + Close(fd); LOGERR("Failed to fdopen %s in rotating_log!", filename); goto stageleft; } diff --git a/src/libckpool.h b/src/libckpool.h index c56f4c24..a769a0ed 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -413,13 +413,17 @@ static inline bool sock_timeout(void) bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); void keep_sockalive(int fd); +void nolinger_socket(int fd); void noblock_socket(int fd); void block_socket(int fd); +void _Close(int *fd); +#define Close(FD) _Close(&FD) int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); void empty_socket(int fd); -void close_unix_socket(const int sockd, const char *server_path); +void _close_unix_socket(int *sockd, const char *server_path); +#define close_unix_socket(sockd, server_path) _close_unix_socket(&sockd, server_path) int _open_unix_server(const char *server_path, const char *file, const char *func, const int line); #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); diff --git a/src/stratifier.c b/src/stratifier.c index f473c2a6..14a30911 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -272,6 +272,7 @@ struct stratum_instance { int ssdc; /* Shares since diff change */ tv_t first_share; tv_t last_share; + time_t first_invalid; /* Time of first invalid in run of non stale rejects */ time_t start_time; char address[INET6_ADDRSTRLEN]; @@ -279,6 +280,9 @@ struct stratum_instance { bool authorised; bool idle; bool notified_idle; + int reject; /* Indicator that this client is having a run of rejects + * or other problem and should be dropped lazily if + * this is set to 2 */ user_instance_t *user_instance; worker_instance_t *worker_instance; @@ -539,7 +543,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char fflush(stdout); } - if (ckp->standalone) + if (CKP_STANDALONE(ckp)) return json_decref(val); json_msg = ckdb_msg(ckp, val, idtype); @@ -1103,7 +1107,7 @@ static void drop_client(int64_t id) LOGINFO("Stratifier dropping client %ld", id); - ck_ilock(&instance_lock); + ck_wlock(&instance_lock); client = __instance_by_id(id); if (client) { stratum_instance_t *old_client = NULL; @@ -1113,15 +1117,13 @@ static void drop_client(int64_t id) client->authorised = false; } - ck_ulock(&instance_lock); HASH_DEL(stratum_instances, client); HASH_FIND(hh, disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64) HASH_ADD(hh, disconnected_instances, enonce1_64, sizeof(uint64_t), client); - ck_dwilock(&instance_lock); } - ck_uilock(&instance_lock); + ck_wunlock(&instance_lock); if (dec) dec_worker(client->user_instance); @@ -1240,18 +1242,18 @@ retry: dealloc(buf); buf = recv_unix_msg(sockd); if (!buf) { - close(sockd); + Close(sockd); LOGWARNING("Failed to get message in stratum_loop"); goto retry; } if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); send_unix_msg(sockd, "pong"); - close(sockd); + Close(sockd); goto retry; } - close(sockd); + Close(sockd); LOGDEBUG("Stratifier received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; @@ -1441,6 +1443,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js /* Create a new extranonce1 based on a uint64_t pointer */ if (!new_enonce1(client)) { stratum_send_message(client, "Pool full of clients"); + client->reject = 2; return json_string("proxy full"); } LOGINFO("Set new subscription %ld to new enonce1 %s", client->id, @@ -1683,7 +1686,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, user_instance->username); client->workername = strdup(buf); - if (client->ckp->standalone) + if (CKP_STANDALONE(client->ckp)) ret = true; else { *errnum = send_recv_auth(client); @@ -1742,7 +1745,8 @@ static double sane_tdiff(tv_t *end, tv_t *start) return tdiff; } -static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid) +static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid, + bool submit) { worker_instance_t *worker = client->worker_instance; double tdiff, bdiff, dsps, drr, network_diff, bias; @@ -1750,9 +1754,16 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool int64_t next_blockid, optimal; tv_t now_t; - /* Ignore successive rejects in count if they haven't submitted a valid - * share yet. */ - if (unlikely(!client->ssdc && !valid)) + mutex_lock(&stats_lock); + if (valid) { + stats.unaccounted_shares++; + stats.unaccounted_diff_shares += diff; + } else + stats.unaccounted_rejects += diff; + mutex_unlock(&stats_lock); + + /* Count only accepted and stale rejects in diff calculation. */ + if (!valid && !submit) return; tv_time(&now_t); @@ -1797,14 +1808,6 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool bias = time_bias(bdiff, 300); tdiff = sane_tdiff(&now_t, &client->ldc); - mutex_lock(&stats_lock); - if (valid) { - stats.unaccounted_shares++; - stats.unaccounted_diff_shares += diff; - } else - stats.unaccounted_rejects += diff; - mutex_unlock(&stats_lock); - /* Check the difficulty every 240 seconds or as many shares as we * should have had in that time, whichever comes first. */ if (client->ssdc < 72 && tdiff < 240) @@ -1840,6 +1843,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool optimal = client->suggest_diff; } else if (optimal < worker->mindiff) optimal = worker->mindiff; + if (ckp->maxdiff && optimal > ckp->maxdiff) + optimal = ckp->maxdiff; if (optimal > network_diff) optimal = network_diff; if (client->diff == optimal) @@ -2004,19 +2009,17 @@ static bool new_share(const uchar *hash, int64_t wb_id) share_t *share, *match = NULL; bool ret = false; - ck_ilock(&share_lock); + ck_wlock(&share_lock); HASH_FIND(hh, shares, hash, 32, match); if (match) goto out_unlock; share = ckzalloc(sizeof(share_t)); memcpy(share->hash, hash, 32); share->workbase_id = wb_id; - ck_ulock(&share_lock); HASH_ADD(hh, shares, hash, 32, share); - ck_dwilock(&share_lock); ret = true; out_unlock: - ck_uilock(&share_lock); + ck_wunlock(&share_lock); return ret; } @@ -2053,17 +2056,19 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, char *fname = NULL, *s, *nonce2; enum share_err err = SE_NONE; ckpool_t *ckp = client->ckp; + workbase_t *wb = NULL; char idstring[20]; uint32_t ntime32; - workbase_t *wb; uchar hash[32]; - int64_t id; + time_t now_t; json_t *val; + int64_t id; ts_t now; FILE *fp; int len; ts_realtime(&now); + now_t = now.tv_sec; sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); if (unlikely(!json_is_array(params_val))) { @@ -2155,7 +2160,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, } invalid = false; out_submit: - if (wb->proxy && sdiff >= wdiff) + if (sdiff >= wdiff) submit = true; out_unlock: ck_runlock(&workbase_lock); @@ -2191,19 +2196,19 @@ out_unlock: /* Submit share to upstream pool in proxy mode. We submit valid and * stale shares and filter out the rest. */ - if (submit) { + if (wb && wb->proxy && submit) { LOGINFO("Submitting share upstream: %s", hexhash); submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); } - add_submit(ckp, client, diff, result); + add_submit(ckp, client, diff, result, submit); /* Now write to the pool's sharelog. */ val = json_object(); json_set_int(val, "workinfoid", id); json_set_int(val, "clientid", client->id); json_set_string(val, "enonce1", client->enonce1); - if (!ckp->standalone) + if (!CKP_STANDALONE(ckp)) json_set_string(val, "secondaryuserid", user_instance->secondaryuserid); json_set_string(val, "nonce2", nonce2); json_set_string(val, "nonce", nonce); @@ -2237,6 +2242,26 @@ out_unlock: } ckdbq_add(ckp, ID_SHARES, val); out: + if ((!result && !submit) || !share) { + /* Is this the first in a run of invalids? */ + if (client->first_invalid < client->last_share.tv_sec || !client->first_invalid) + client->first_invalid = now_t; + else if (client->first_invalid && client->first_invalid < now_t - 120) { + LOGNOTICE("Client %d rejecting for 120s, disconnecting", client->id); + stratum_send_message(client, "Disconnecting for continuous invalid shares"); + client->reject = 2; + } else if (client->first_invalid && client->first_invalid < now_t - 60) { + if (!client->reject) { + LOGINFO("Client %d rejecting for 60s, sending diff", client->id); + stratum_send_diff(client); + client->reject = 1; + } + } + } else { + client->first_invalid = 0; + client->reject = 0; + } + if (!share) { JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", "clientid", client->id, @@ -2403,6 +2428,13 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method return; } + if (unlikely(client->reject == 2)) { + LOGINFO("Dropping client %d tagged for lazy invalidation", client_id); + snprintf(buf, 255, "dropclient=%ld", client->id); + send_proc(client->ckp->connector, buf); + return; + } + /* Random broken clients send something not an integer as the id so we copy * the json item for id_val as is for the response. */ method = json_string_value(method_val); @@ -2543,15 +2575,13 @@ static void srecv_process(ckpool_t *ckp, smsg_t *msg) json_object_clear(val); /* Parse the message here */ - ck_ilock(&instance_lock); + ck_wlock(&instance_lock); instance = __instance_by_id(msg->client_id); if (!instance) { /* client_id instance doesn't exist yet, create one */ - ck_ulock(&instance_lock); instance = __stratum_add_instance(ckp, msg->client_id); - ck_dwilock(&instance_lock); } - ck_uilock(&instance_lock); + ck_wunlock(&instance_lock); parse_instance_msg(msg); @@ -2878,8 +2908,8 @@ static void *statsupdate(void *arg) sleep(1); while (42) { + double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias, bias5, bias60, bias1440; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; - double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias; char suffix360[16], suffix1440[16]; user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; @@ -2896,32 +2926,31 @@ static void *statsupdate(void *arg) timersub(&now, &stats.start_time, &diff); tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000; - bias = time_bias(tdiff, 60); - ghs1 = stats.dsps1 * nonces / bias; + ghs1 = stats.dsps1 * nonces; suffix_string(ghs1, suffix1, 16, 0); - sps1 = stats.sps1 / bias; + sps1 = stats.sps1; - bias = time_bias(tdiff, 300); - ghs5 = stats.dsps5 * nonces / bias; + bias5 = time_bias(tdiff, 300); + ghs5 = stats.dsps5 * nonces / bias5; suffix_string(ghs5, suffix5, 16, 0); - sps5 = stats.sps5 / bias; + sps5 = stats.sps5 / bias5; bias = time_bias(tdiff, 900); ghs15 = stats.dsps15 * nonces / bias; suffix_string(ghs15, suffix15, 16, 0); sps15 = stats.sps15 / bias; - bias = time_bias(tdiff, 3600); - ghs60 = stats.dsps60 * nonces / bias; + bias60 = time_bias(tdiff, 3600); + ghs60 = stats.dsps60 * nonces / bias60; suffix_string(ghs60, suffix60, 16, 0); - sps60 = stats.sps60 / bias; + sps60 = stats.sps60 / bias60; bias = time_bias(tdiff, 21600); ghs360 = stats.dsps360 * nonces / bias; suffix_string(ghs360, suffix360, 16, 0); - bias = time_bias(tdiff, 86400); - ghs1440 = stats.dsps1440 * nonces / bias; + bias1440 = time_bias(tdiff, 86400); + ghs1440 = stats.dsps1440 * nonces / bias1440; suffix_string(ghs1440, suffix1440, 16, 0); snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); @@ -2996,11 +3025,14 @@ static void *statsupdate(void *arg) } ghs = worker->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = worker->dsps5 * nonces; + + ghs = worker->dsps5 * nonces / bias5; suffix_string(ghs, suffix5, 16, 0); - ghs = worker->dsps60 * nonces; + + ghs = worker->dsps60 * nonces / bias60; suffix_string(ghs, suffix60, 16, 0); - ghs = worker->dsps1440 * nonces; + + ghs = worker->dsps1440 * nonces / bias1440; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss}", @@ -3032,11 +3064,14 @@ static void *statsupdate(void *arg) } ghs = instance->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = instance->dsps5 * nonces; + + ghs = instance->dsps5 * nonces / bias5; suffix_string(ghs, suffix5, 16, 0); - ghs = instance->dsps60 * nonces; + + ghs = instance->dsps60 * nonces / bias60; suffix_string(ghs, suffix60, 16, 0); - ghs = instance->dsps1440 * nonces; + + ghs = instance->dsps1440 * nonces / bias1440; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss,si}", @@ -3197,7 +3232,7 @@ int stratifier(proc_instance_t *pi) sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); - if (!ckp->standalone) + if (!CKP_STANDALONE(ckp)) create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); cklock_init(&workbase_lock);