kanoi 10 years ago
parent
commit
8146f4d748
  1. 6
      README
  2. 1
      ckpool.conf
  3. 1
      ckproxy.conf
  4. 1
      configure.ac
  5. 43
      src/ckpool.c
  6. 7
      src/ckpool.h
  7. 77
      src/connector.c
  8. 61
      src/generator.c
  9. 41
      src/libckpool.c
  10. 6
      src/libckpool.h
  11. 143
      src/stratifier.c

6
README

@ -138,6 +138,7 @@ ckpool supports the following options:
-A Standalone mode tells ckpool not to try to communicate with ckdb or log any -A Standalone mode tells ckpool not to try to communicate with ckdb or log any
ckdb requests in the rotating ckdb logs it would otherwise store. All users ckdb requests in the rotating ckdb logs it would otherwise store. All users
are automatically accepted without any attempt to authorise users in any way. are automatically accepted without any attempt to authorise users in any way.
This option is explicitly enabled when built without ckdb support.
-c <CONFIG> tells ckpool to override its default configuration filename and -c <CONFIG> tells ckpool to override its default configuration filename and
load the specified one. If -c is not specified, ckpool looks for ckpool.conf load the specified one. If -c is not specified, ckpool looks for ckpool.conf
@ -145,6 +146,7 @@ whereas in proxy or passthrough modes it will look for ckproxy.conf
-d <CKDB-NAME> tells ckpool what the name of the ckdb process is that it should -d <CKDB-NAME> tells ckpool what the name of the ckdb process is that it should
speak to, otherwise it will look for ckdb. speak to, otherwise it will look for ckdb.
This option does not exist when built without ckdb support.
-g <GROUP> will start ckpool as the group ID specified. -g <GROUP> will start ckpool as the group ID specified.
@ -179,6 +181,7 @@ it to scale to large hashrates. Standalone mode is Optional.
-S <CKDB-SOCKDIR> tells ckpool which directory to look for the ckdb socket to -S <CKDB-SOCKDIR> tells ckpool which directory to look for the ckdb socket to
talk to. talk to.
This option does not exist when built without ckdb support.
-s <SOCKDIR> tells ckpool which directory to place its own communication -s <SOCKDIR> tells ckpool which directory to place its own communication
sockets (/tmp by default) sockets (/tmp by default)
@ -249,4 +252,7 @@ and 3334 in proxy mode.
"startdiff" : Starting diff that new clients are given. Default 42 "startdiff" : Starting diff that new clients are given. Default 42
"maxdiff" : Optional maximum diff that vardiff will clamp to where zero is no
maximum.
"logdir" : Which directory to store pool and client logs. Default "logs" "logdir" : Which directory to store pool and client logs. Default "logs"

1
ckpool.conf

@ -20,6 +20,7 @@
"serverurl" : "ckpool.org:3333", "serverurl" : "ckpool.org:3333",
"mindiff" : 1, "mindiff" : 1,
"startdiff" : 42, "startdiff" : 42,
"maxdiff" : 0,
"logdir" : "logs" "logdir" : "logs"
} }
Comments from here on are ignored. Comments from here on are ignored.

1
ckproxy.conf

@ -15,6 +15,7 @@
"serverurl" : "192.168.1.100:3334", "serverurl" : "192.168.1.100:3334",
"mindiff" : 1, "mindiff" : 1,
"startdiff" : 42, "startdiff" : 42,
"maxdiff" : 0,
"logdir" : "logs" "logdir" : "logs"
} }
Comments from here on are ignored. Comments from here on are ignored.

1
configure.ac

@ -59,6 +59,7 @@ AC_ARG_WITH([ckdb],
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev
not found. Install it or disable postgresql support with --without-ckdb" && exit 1) not found. Install it or disable postgresql support with --without-ckdb" && exit 1)
AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required])
PQ_LIBS="-lpq" PQ_LIBS="-lpq"
else else
PQ_LIBS="" PQ_LIBS=""

43
src/ckpool.c

@ -280,10 +280,10 @@ retry:
if (newfd > 0) { if (newfd > 0) {
LOGDEBUG("Sending new fd %d", newfd); LOGDEBUG("Sending new fd %d", newfd);
send_fd(newfd, sockd); send_fd(newfd, sockd);
close(newfd); Close(newfd);
} else } else
LOGWARNING("Failed to get_fd"); LOGWARNING("Failed to get_fd");
close(connfd); Close(connfd);
} else } else
LOGWARNING("Failed to send_procmsg to connector"); LOGWARNING("Failed to send_procmsg to connector");
} else if (cmdmatch(buf, "restart")) { } else if (cmdmatch(buf, "restart")) {
@ -300,7 +300,7 @@ retry:
LOGINFO("Listener received unhandled message: %s", buf); LOGINFO("Listener received unhandled message: %s", buf);
send_unix_msg(sockd, "unknown"); send_unix_msg(sockd, "unknown");
} }
close(sockd); Close(sockd);
goto retry; goto retry;
out: out:
dealloc(buf); dealloc(buf);
@ -386,10 +386,7 @@ int read_socket_line(connsock_t *cs, int timeout)
out: out:
if (ret < 0) { if (ret < 0) {
dealloc(cs->buf); dealloc(cs->buf);
if (cs->fd > 0) { Close(cs->fd);
close(cs->fd);
cs->fd = -1;
}
} }
return ret; return ret;
} }
@ -447,7 +444,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
LOGWARNING("Failed to send %s to socket %s", msg, path); LOGWARNING("Failed to send %s to socket %s", msg, path);
else else
ret = true; ret = true;
close(sockd); Close(sockd);
out: out:
if (unlikely(!ret)) { if (unlikely(!ret)) {
LOGERR("Failure in send_proc from %s %s:%d", file, func, line); LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
@ -484,7 +481,7 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
LOGWARNING("Failed to send %s to socket %s", msg, path); LOGWARNING("Failed to send %s to socket %s", msg, path);
else else
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
close(sockd); Close(sockd);
out: out:
if (unlikely(!buf)) if (unlikely(!buf))
LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line); LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line);
@ -515,7 +512,7 @@ char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, co
LOGWARNING("Failed to send %s to ckdb", msg); LOGWARNING("Failed to send %s to ckdb", msg);
else else
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
close(sockd); Close(sockd);
out: out:
if (unlikely(!buf)) if (unlikely(!buf))
LOGERR("Failure in send_recv_ckdb from %s %s:%d", file, func, line); LOGERR("Failure in send_recv_ckdb from %s %s:%d", file, func, line);
@ -608,7 +605,7 @@ out_empty:
/* Assume that a failed request means the socket will be closed /* Assume that a failed request means the socket will be closed
* and reopen it */ * and reopen it */
LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port);
close(cs->fd); Close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port); cs->fd = connect_socket(cs->url, cs->port);
} }
out: out:
@ -983,6 +980,7 @@ static void parse_config(ckpool_t *ckp)
json_get_string(&ckp->serverurl, json_conf, "serverurl"); json_get_string(&ckp->serverurl, json_conf, "serverurl");
json_get_int64(&ckp->mindiff, json_conf, "mindiff"); json_get_int64(&ckp->mindiff, json_conf, "mindiff");
json_get_int64(&ckp->startdiff, json_conf, "startdiff"); json_get_int64(&ckp->startdiff, json_conf, "startdiff");
json_get_int64(&ckp->maxdiff, json_conf, "maxdiff");
json_get_string(&ckp->logdir, json_conf, "logdir"); json_get_string(&ckp->logdir, json_conf, "logdir");
arr_val = json_object_get(json_conf, "proxy"); arr_val = json_object_get(json_conf, "proxy");
if (arr_val && json_is_array(arr_val)) { if (arr_val && json_is_array(arr_val)) {
@ -1058,6 +1056,7 @@ static void *watchdog(void *arg)
return NULL; return NULL;
} }
#ifdef USE_CKDB
static struct option long_options[] = { static struct option long_options[] = {
{"standalone", no_argument, 0, 'A'}, {"standalone", no_argument, 0, 'A'},
{"config", required_argument, 0, 'c'}, {"config", required_argument, 0, 'c'},
@ -1075,6 +1074,22 @@ static struct option long_options[] = {
{"sockdir", required_argument, 0, 's'}, {"sockdir", required_argument, 0, 's'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
#else
static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"group", required_argument, 0, 'g'},
{"handover", no_argument, 0, 'H'},
{"help", no_argument, 0, 'h'},
{"killold", no_argument, 0, 'k'},
{"log-shares", no_argument, 0, 'L'},
{"loglevel", required_argument, 0, 'l'},
{"name", required_argument, 0, 'n'},
{"passthrough", no_argument, 0, 'P'},
{"proxy", no_argument, 0, 'p'},
{"sockdir", required_argument, 0, 's'},
{0, 0, 0, 0}
};
#endif
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
@ -1193,7 +1208,7 @@ int main(int argc, char **argv)
} }
trail_slash(&ckp.socket_dir); trail_slash(&ckp.socket_dir);
if (!ckp.standalone) { if (!CKP_STANDALONE(&ckp)) {
if (!ckp.ckdb_name) if (!ckp.ckdb_name)
ckp.ckdb_name = "ckdb"; ckp.ckdb_name = "ckdb";
if (!ckp.ckdb_sockdir) { if (!ckp.ckdb_sockdir) {
@ -1295,12 +1310,12 @@ int main(int argc, char **argv)
if (sockd > 0 && send_unix_msg(sockd, "getfd")) { if (sockd > 0 && send_unix_msg(sockd, "getfd")) {
ckp.oldconnfd = get_fd(sockd); ckp.oldconnfd = get_fd(sockd);
close(sockd); Close(sockd);
sockd = open_unix_client(ckp.main.us.path); sockd = open_unix_client(ckp.main.us.path);
send_unix_msg(sockd, "shutdown"); send_unix_msg(sockd, "shutdown");
if (ckp.oldconnfd > 0) if (ckp.oldconnfd > 0)
LOGWARNING("Inherited old socket with new file descriptor %d!", ckp.oldconnfd); LOGWARNING("Inherited old socket with new file descriptor %d!", ckp.oldconnfd);
close(sockd); Close(sockd);
} }
} }

7
src/ckpool.h

@ -150,6 +150,7 @@ struct ckpool_instance {
/* Difficulty settings */ /* Difficulty settings */
int64_t mindiff; // Default 1 int64_t mindiff; // Default 1
int64_t startdiff; // Default 42 int64_t startdiff; // Default 42
int64_t maxdiff; // No default
/* Coinbase data */ /* Coinbase data */
char *btcaddress; // Address to mine to char *btcaddress; // Address to mine to
@ -171,6 +172,12 @@ struct ckpool_instance {
server_instance_t *btcdbackup; server_instance_t *btcdbackup;
}; };
#ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else
#define CKP_STANDALONE(CKP) (true)
#endif
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);

77
src/connector.c

@ -92,7 +92,7 @@ void *acceptor(void *arg)
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client, *old_client; client_instance_t *client, *old_client;
socklen_t address_len; socklen_t address_len;
int fd; int fd, port;
rename_proc("acceptor"); rename_proc("acceptor");
@ -115,22 +115,25 @@ retry:
case AF_INET: case AF_INET:
inet4_in = (struct sockaddr_in *)&client->address; inet4_in = (struct sockaddr_in *)&client->address;
inet_ntop(AF_INET, &inet4_in->sin_addr, client->address_name, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &inet4_in->sin_addr, client->address_name, INET6_ADDRSTRLEN);
port = htons(inet4_in->sin_port);
break; break;
case AF_INET6: case AF_INET6:
inet6_in = (struct sockaddr_in6 *)&client->address; inet6_in = (struct sockaddr_in6 *)&client->address;
inet_ntop(AF_INET6, &inet6_in->sin6_addr, client->address_name, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &inet6_in->sin6_addr, client->address_name, INET6_ADDRSTRLEN);
port = htons(inet6_in->sin6_port);
break; break;
default: default:
LOGWARNING("Unknown INET type for client %d on socket %d", LOGWARNING("Unknown INET type for client %d on socket %d",
ci->nfds, fd); ci->nfds, fd);
close(fd); Close(fd);
free(client); free(client);
goto retry; goto retry;
} }
keep_sockalive(fd); keep_sockalive(fd);
nolinger_socket(fd);
LOGINFO("Connected new client %d on socket %d from %s", ci->nfds, fd, client->address_name); LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port);
client->fd = fd; client->fd = fd;
@ -153,11 +156,10 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client)
ck_wlock(&ci->lock); ck_wlock(&ci->lock);
fd = client->fd; fd = client->fd;
if (fd != -1) { if (fd != -1) {
close(fd); Close(client->fd);
HASH_DEL(clients, client); HASH_DEL(clients, client);
HASH_DELETE(fdhh, fdclients, client); HASH_DELETE(fdhh, fdclients, client);
LL_PREPEND(dead_clients, client); LL_PREPEND(dead_clients, client);
client->fd = -1;
} }
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
@ -170,11 +172,8 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client)
static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client) static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client)
{ {
char buf[256]; char buf[256];
int fd;
fd = drop_client(ci, client); drop_client(ci, client);
if (fd == -1)
return;
if (ckp->passthrough) if (ckp->passthrough)
return; return;
sprintf(buf, "dropclient=%ld", client->id); sprintf(buf, "dropclient=%ld", client->id);
@ -185,34 +184,37 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
{ {
int buflen, ret, selfail = 0;
ckpool_t *ckp = ci->pi->ckp; ckpool_t *ckp = ci->pi->ckp;
int buflen, ret, flags = 0;
char msg[PAGESIZE], *eol; char msg[PAGESIZE], *eol;
bool moredata = false;
json_t *val; json_t *val;
retry: retry:
buflen = PAGESIZE - client->bufofs; /* Select should always return positive after poll unless we have
if (moredata) * been disconnected. On retries, decide whether we should do further
flags = MSG_DONTWAIT; * reads based on select readiness and only fail if we get an error. */
ret = recv(client->fd, client->buf + client->bufofs, buflen, flags); ret = wait_read_select(client->fd, 0);
if (ret < 1) { if (ret < 1) {
/* Nothing else ready to be read */ if (ret > selfail)
if (!ret && flags)
return; return;
LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client);
return;
}
selfail = -1;
buflen = PAGESIZE - client->bufofs;
ret = recv(client->fd, client->buf + client->bufofs, buflen, 0);
if (ret < 1) {
/* We should have something to read if called since poll set /* We should have something to read if called since poll set
* this fd's revents status so if there's nothing it means the * this fd's revents status so if there's nothing it means the
* client has disconnected. */ * client has disconnected. */
LOGINFO("Client fd %d disconnected", client->fd); LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client); invalidate_client(ckp, ci, client);
return; return;
} }
client->bufofs += ret; client->bufofs += ret;
if (client->bufofs == PAGESIZE)
moredata = true;
else
moredata = false;
reparse: reparse:
eol = memchr(client->buf, '\n', client->bufofs); eol = memchr(client->buf, '\n', client->bufofs);
if (!eol) { if (!eol) {
@ -221,9 +223,7 @@ reparse:
invalidate_client(ckp, ci, client); invalidate_client(ckp, ci, client);
return; return;
} }
if (moredata)
goto retry; goto retry;
return;
} }
/* Do something useful with this message now */ /* Do something useful with this message now */
@ -234,9 +234,10 @@ reparse:
return; return;
} }
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
msg[buflen] = 0; msg[buflen] = '\0';
client->bufofs -= buflen; client->bufofs -= buflen;
memmove(client->buf, client->buf + buflen, client->bufofs); memmove(client->buf, client->buf + buflen, client->bufofs);
client->buf[client->bufofs] = '\0';
if (!(val = json_loads(msg, 0, NULL))) { if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n"); char *buf = strdup("Invalid JSON, disconnecting\n");
@ -267,6 +268,7 @@ reparse:
if (client->bufofs) if (client->bufofs)
goto reparse; goto reparse;
goto retry;
} }
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
@ -287,6 +289,11 @@ retry:
ck_rlock(&ci->lock); ck_rlock(&ci->lock);
HASH_ITER(fdhh, fdclients, client, tmp) { HASH_ITER(fdhh, fdclients, client, tmp) {
if (unlikely(client->fd == -1)) {
LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!",
client->id);
continue;
}
fds[nfds].fd = client->fd; fds[nfds].fd = client->fd;
fds[nfds].events = POLLIN; fds[nfds].events = POLLIN;
fds[nfds].revents = 0; fds[nfds].revents = 0;
@ -298,7 +305,7 @@ retry:
cksleep_ms(100); cksleep_ms(100);
goto retry; goto retry;
} }
ret = poll(fds, nfds, 1000); ret = poll(fds, nfds, 100);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGERR("Failed to poll in receiver"); LOGERR("Failed to poll in receiver");
goto out; goto out;
@ -398,7 +405,8 @@ void *sender(void *arg)
ret = wait_write_select(fd, 0); ret = wait_write_select(fd, 0);
if (ret < 1) { if (ret < 1) {
if (ret < 0) { if (ret < 0) {
LOGDEBUG("Discarding message sent to interrupted client"); LOGINFO("Client id %d fd %d interrupted", client->id, fd);
invalidate_client(ckp, ci, client);
free(sender_send->buf); free(sender_send->buf);
free(sender_send); free(sender_send);
continue; continue;
@ -415,7 +423,7 @@ void *sender(void *arg)
while (sender_send->len) { while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGINFO("Client id %d disconnected", client->id); LOGINFO("Client id %d fd %d disconnected", client->id, fd);
invalidate_client(ckp, ci, client); invalidate_client(ckp, ci, client);
break; break;
} }
@ -456,7 +464,8 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf)
if (unlikely(fd == -1)) { if (unlikely(fd == -1)) {
if (client) { if (client) {
LOGINFO("Client id %ld disconnected", id); /* This shouldn't happen */
LOGWARNING("Client id %ld disconnected but fd already invalidated!", id);
invalidate_client(ci->pi->ckp, ci, client); invalidate_client(ci->pi->ckp, ci, client);
} else } else
LOGINFO("Connector failed to find client id %ld to send to", id); LOGINFO("Connector failed to find client id %ld to send to", id);
@ -517,7 +526,7 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
LOGWARNING("%s connector ready", ckp->name); LOGWARNING("%s connector ready", ckp->name);
retry: retry:
close(sockd); Close(sockd);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGEMERG("Failed to accept on connector socket, exiting"); LOGEMERG("Failed to accept on connector socket, exiting");
@ -621,7 +630,7 @@ retry:
goto retry; goto retry;
out: out:
close(sockd); Close(sockd);
dealloc(buf); dealloc(buf);
return ret; return ret;
} }
@ -684,7 +693,7 @@ int connector(proc_instance_t *pi)
} while (++tries < 25); } while (++tries < 25);
if (ret < 0) { if (ret < 0) {
LOGERR("Connector failed to bind to socket for 2 minutes"); LOGERR("Connector failed to bind to socket for 2 minutes");
close(sockd); Close(sockd);
goto out; goto out;
} }
} }
@ -694,7 +703,7 @@ int connector(proc_instance_t *pi)
ret = listen(sockd, 10); ret = listen(sockd, 10);
if (ret < 0) { if (ret < 0) {
LOGERR("Connector failed to listen on socket"); LOGERR("Connector failed to listen on socket");
close(sockd); Close(sockd);
goto out; goto out;
} }

61
src/generator.c

@ -177,8 +177,7 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
out: out:
if (!ret) { if (!ret) {
/* Close and invalidate the file handle */ /* Close and invalidate the file handle */
close(cs->fd); Close(cs->fd);
cs->fd = -1;
} else } else
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
return ret; return ret;
@ -225,8 +224,7 @@ static void kill_server(server_instance_t *si)
LOGNOTICE("Killing server"); LOGNOTICE("Killing server");
cs = &si->cs; cs = &si->cs;
close(cs->fd); Close(cs->fd);
cs->fd = -1;
dealloc(cs->url); dealloc(cs->url);
dealloc(cs->port); dealloc(cs->port);
dealloc(cs->auth); dealloc(cs->auth);
@ -290,7 +288,7 @@ retry:
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
if (!buf) { if (!buf) {
LOGWARNING("Failed to get message in gen_loop"); LOGWARNING("Failed to get message in gen_loop");
close(sockd); Close(sockd);
goto retry; goto retry;
} }
LOGDEBUG("Generator received request: %s", buf); LOGDEBUG("Generator received request: %s", buf);
@ -365,7 +363,7 @@ retry:
LOGDEBUG("Generator received ping request"); LOGDEBUG("Generator received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
} }
close(sockd); Close(sockd);
goto retry; goto retry;
out: out:
@ -627,10 +625,8 @@ retry:
goto retry; goto retry;
out: out:
if (!ret) { if (!ret)
close(cs->fd); Close(cs->fd);
cs->fd = -1;
}
return ret; return ret;
} }
@ -670,7 +666,7 @@ out:
if (val) if (val)
json_decref(val); json_decref(val);
if (!ret) if (!ret)
close(cs->fd); Close(cs->fd);
return ret; return ret;
} }
@ -934,7 +930,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
LOGWARNING("Failed to send message in auth_stratum"); LOGWARNING("Failed to send message in auth_stratum");
close(cs->fd); Close(cs->fd);
goto out; goto out;
} }
@ -979,7 +975,7 @@ out:
return ret; return ret;
} }
static void send_subscribe(proxy_instance_t *proxi, int sockd) static void send_subscribe(proxy_instance_t *proxi, int *sockd)
{ {
json_t *json_msg; json_t *json_msg;
char *msg; char *msg;
@ -988,12 +984,12 @@ static void send_subscribe(proxy_instance_t *proxi, int sockd)
"nonce2len", proxi->nonce2len); "nonce2len", proxi->nonce2len);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
send_unix_msg(sockd, msg); send_unix_msg(*sockd, msg);
free(msg); free(msg);
close(sockd); _Close(sockd);
} }
static void send_notify(proxy_instance_t *proxi, int sockd) static void send_notify(proxy_instance_t *proxi, int *sockd)
{ {
json_t *json_msg, *merkle_arr; json_t *json_msg, *merkle_arr;
notify_instance_t *ni; notify_instance_t *ni;
@ -1017,12 +1013,12 @@ static void send_notify(proxy_instance_t *proxi, int sockd)
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
send_unix_msg(sockd, msg); send_unix_msg(*sockd, msg);
free(msg); free(msg);
close(sockd); _Close(sockd);
} }
static void send_diff(proxy_instance_t *proxi, int sockd) static void send_diff(proxy_instance_t *proxi, int *sockd)
{ {
json_t *json_msg; json_t *json_msg;
char *msg; char *msg;
@ -1030,9 +1026,9 @@ static void send_diff(proxy_instance_t *proxi, int sockd)
JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff); JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
send_unix_msg(sockd, msg); send_unix_msg(*sockd, msg);
free(msg); free(msg);
close(sockd); _Close(sockd);
} }
static void submit_share(proxy_instance_t *proxi, json_t *val) static void submit_share(proxy_instance_t *proxi, json_t *val)
@ -1242,8 +1238,7 @@ static void *proxy_send(void *arg)
free(msg); free(msg);
if (!ret && cs->fd > 0) { if (!ret && cs->fd > 0) {
LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect"); LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect");
close(cs->fd); Close(cs->fd);
cs->fd = -1;
} }
} }
return NULL; return NULL;
@ -1350,8 +1345,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
out: out:
if (!ret) { if (!ret) {
/* Close and invalidate the file handle */ /* Close and invalidate the file handle */
close(cs->fd); Close(cs->fd);
cs->fd = -1;
} else } else
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
return ret; return ret;
@ -1421,8 +1415,7 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi)
LOGNOTICE("Killing proxy"); LOGNOTICE("Killing proxy");
cs = proxi->cs; cs = proxi->cs;
close(cs->fd); Close(cs->fd);
cs->fd = -1;
/* All our notify data is invalid if we reconnect so discard them */ /* All our notify data is invalid if we reconnect so discard them */
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
@ -1495,7 +1488,7 @@ retry:
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
if (!buf) { if (!buf) {
LOGWARNING("Failed to get message in proxy_loop"); LOGWARNING("Failed to get message in proxy_loop");
close(sockd); Close(sockd);
goto retry; goto retry;
} }
LOGDEBUG("Proxy received request: %s", buf); LOGDEBUG("Proxy received request: %s", buf);
@ -1503,11 +1496,11 @@ retry:
ret = 0; ret = 0;
goto out; goto out;
} else if (cmdmatch(buf, "getsubscribe")) { } else if (cmdmatch(buf, "getsubscribe")) {
send_subscribe(proxi, sockd); send_subscribe(proxi, &sockd);
} else if (cmdmatch(buf, "getnotify")) { } else if (cmdmatch(buf, "getnotify")) {
send_notify(proxi, sockd); send_notify(proxi, &sockd);
} else if (cmdmatch(buf, "getdiff")) { } else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, sockd); send_diff(proxi, &sockd);
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
goto reconnect; goto reconnect;
} else if (cmdmatch(buf, "submitblock:")) { } else if (cmdmatch(buf, "submitblock:")) {
@ -1536,11 +1529,11 @@ retry:
else else
submit_share(proxi, val); submit_share(proxi, val);
} }
close(sockd); Close(sockd);
goto retry; goto retry;
out: out:
kill_proxy(ckp, proxi); kill_proxy(ckp, proxi);
close(sockd); Close(sockd);
return ret; return ret;
} }
@ -1653,7 +1646,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
for (i = 0; i < ckp->proxies; i++) { for (i = 0; i < ckp->proxies; i++) {
si = ckp->servers[i]; si = ckp->servers[i];
close(si->cs.fd); Close(si->cs.fd);
proxi = si->data; proxi = si->data;
free(proxi->enonce1); free(proxi->enonce1);
free(proxi->enonce1bin); free(proxi->enonce1bin);

41
src/libckpool.c

@ -418,6 +418,13 @@ void keep_sockalive(int fd)
setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &tcp_keepintvl, sizeof(tcp_keepintvl)); setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &tcp_keepintvl, sizeof(tcp_keepintvl));
} }
void nolinger_socket(int fd)
{
const struct linger so_linger = { 1, 0 };
setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
}
void noblock_socket(int fd) void noblock_socket(int fd)
{ {
int flags = fcntl(fd, F_GETFL, 0); int flags = fcntl(fd, F_GETFL, 0);
@ -432,6 +439,16 @@ void block_socket(int fd)
fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
} }
void _Close(int *fd)
{
if (*fd < 0)
return;
LOGDEBUG("Closing file handle %d", *fd);
if (unlikely(close(*fd)))
LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno));
*fd = -1;
}
int bind_socket(char *url, char *port) int bind_socket(char *url, char *port)
{ {
struct addrinfo servinfobase, *servinfo, hints, *p; struct addrinfo servinfobase, *servinfo, hints, *p;
@ -460,8 +477,7 @@ int bind_socket(char *url, char *port)
ret = bind(sockd, p->ai_addr, p->ai_addrlen); ret = bind(sockd, p->ai_addr, p->ai_addrlen);
if (ret < 0) { if (ret < 0) {
LOGWARNING("Failed to bind socket for %s:%s", url, port); LOGWARNING("Failed to bind socket for %s:%s", url, port);
close(sockd); Close(sockd);
sockd = -1;
goto out; goto out;
} }
@ -500,7 +516,7 @@ int connect_socket(char *url, char *port)
int selret; int selret;
if (!sock_connecting()) { if (!sock_connecting()) {
close(sockd); Close(sockd);
LOGDEBUG("Failed sock connect"); LOGDEBUG("Failed sock connect");
continue; continue;
} }
@ -517,8 +533,7 @@ int connect_socket(char *url, char *port)
break; break;
} }
} }
close(sockd); Close(sockd);
sockd = -1;
LOGDEBUG("Select timeout/failed connect"); LOGDEBUG("Select timeout/failed connect");
continue; continue;
} }
@ -575,13 +590,10 @@ void empty_socket(int fd)
} while (ret > 0); } while (ret > 0);
} }
void close_unix_socket(const int sockd, const char *server_path) void _close_unix_socket(int *sockd, const char *server_path)
{ {
int ret; LOGDEBUG("Closing unix socket %d %s", *sockd, server_path);
_Close(sockd);
ret = close(sockd);
if (unlikely(ret < 0))
LOGERR("Failed to close sock %d %s", sockd, server_path);
} }
int _open_unix_server(const char *server_path, const char *file, const char *func, const int line) int _open_unix_server(const char *server_path, const char *file, const char *func, const int line)
@ -681,8 +693,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun
ret = connect(sockd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); ret = connect(sockd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGERR("Failed to bind to socket in open_unix_client"); LOGERR("Failed to bind to socket in open_unix_client");
close(sockd); Close(sockd);
sockd = -1;
goto out; goto out;
} }
@ -951,7 +962,7 @@ int _get_fd(int sockd, const char *file, const char *func, const int line)
goto out; goto out;
} }
out: out:
close(sockd); Close(sockd);
cm = (int *)CMSG_DATA(cmptr); cm = (int *)CMSG_DATA(cmptr);
newfd = *cm; newfd = *cm;
free(cmptr); free(cmptr);
@ -1032,7 +1043,7 @@ bool rotating_log(const char *path, const char *msg)
} }
fp = fdopen(fd, "ae"); fp = fdopen(fd, "ae");
if (unlikely(!fp)) { if (unlikely(!fp)) {
close(fd); Close(fd);
LOGERR("Failed to fdopen %s in rotating_log!", filename); LOGERR("Failed to fdopen %s in rotating_log!", filename);
goto stageleft; goto stageleft;
} }

6
src/libckpool.h

@ -413,13 +413,17 @@ static inline bool sock_timeout(void)
bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port);
void keep_sockalive(int fd); void keep_sockalive(int fd);
void nolinger_socket(int fd);
void noblock_socket(int fd); void noblock_socket(int fd);
void block_socket(int fd); void block_socket(int fd);
void _Close(int *fd);
#define Close(FD) _Close(&FD)
int bind_socket(char *url, char *port); int bind_socket(char *url, char *port);
int connect_socket(char *url, char *port); int connect_socket(char *url, char *port);
int write_socket(int fd, const void *buf, size_t nbyte); int write_socket(int fd, const void *buf, size_t nbyte);
void empty_socket(int fd); void empty_socket(int fd);
void close_unix_socket(const int sockd, const char *server_path); void _close_unix_socket(int *sockd, const char *server_path);
#define close_unix_socket(sockd, server_path) _close_unix_socket(&sockd, server_path)
int _open_unix_server(const char *server_path, const char *file, const char *func, const int line); int _open_unix_server(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__)
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);

143
src/stratifier.c

@ -272,6 +272,7 @@ struct stratum_instance {
int ssdc; /* Shares since diff change */ int ssdc; /* Shares since diff change */
tv_t first_share; tv_t first_share;
tv_t last_share; tv_t last_share;
time_t first_invalid; /* Time of first invalid in run of non stale rejects */
time_t start_time; time_t start_time;
char address[INET6_ADDRSTRLEN]; char address[INET6_ADDRSTRLEN];
@ -279,6 +280,9 @@ struct stratum_instance {
bool authorised; bool authorised;
bool idle; bool idle;
bool notified_idle; bool notified_idle;
int reject; /* Indicator that this client is having a run of rejects
* or other problem and should be dropped lazily if
* this is set to 2 */
user_instance_t *user_instance; user_instance_t *user_instance;
worker_instance_t *worker_instance; worker_instance_t *worker_instance;
@ -539,7 +543,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
fflush(stdout); fflush(stdout);
} }
if (ckp->standalone) if (CKP_STANDALONE(ckp))
return json_decref(val); return json_decref(val);
json_msg = ckdb_msg(ckp, val, idtype); json_msg = ckdb_msg(ckp, val, idtype);
@ -1103,7 +1107,7 @@ static void drop_client(int64_t id)
LOGINFO("Stratifier dropping client %ld", id); LOGINFO("Stratifier dropping client %ld", id);
ck_ilock(&instance_lock); ck_wlock(&instance_lock);
client = __instance_by_id(id); client = __instance_by_id(id);
if (client) { if (client) {
stratum_instance_t *old_client = NULL; stratum_instance_t *old_client = NULL;
@ -1113,15 +1117,13 @@ static void drop_client(int64_t id)
client->authorised = false; client->authorised = false;
} }
ck_ulock(&instance_lock);
HASH_DEL(stratum_instances, client); HASH_DEL(stratum_instances, client);
HASH_FIND(hh, disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); HASH_FIND(hh, disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */ /* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64) if (!client->ckp->proxy && !old_client && client->enonce1_64)
HASH_ADD(hh, disconnected_instances, enonce1_64, sizeof(uint64_t), client); HASH_ADD(hh, disconnected_instances, enonce1_64, sizeof(uint64_t), client);
ck_dwilock(&instance_lock);
} }
ck_uilock(&instance_lock); ck_wunlock(&instance_lock);
if (dec) if (dec)
dec_worker(client->user_instance); dec_worker(client->user_instance);
@ -1240,18 +1242,18 @@ retry:
dealloc(buf); dealloc(buf);
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
if (!buf) { if (!buf) {
close(sockd); Close(sockd);
LOGWARNING("Failed to get message in stratum_loop"); LOGWARNING("Failed to get message in stratum_loop");
goto retry; goto retry;
} }
if (cmdmatch(buf, "ping")) { if (cmdmatch(buf, "ping")) {
LOGDEBUG("Stratifier received ping request"); LOGDEBUG("Stratifier received ping request");
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
close(sockd); Close(sockd);
goto retry; goto retry;
} }
close(sockd); Close(sockd);
LOGDEBUG("Stratifier received request: %s", buf); LOGDEBUG("Stratifier received request: %s", buf);
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
@ -1441,6 +1443,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js
/* Create a new extranonce1 based on a uint64_t pointer */ /* Create a new extranonce1 based on a uint64_t pointer */
if (!new_enonce1(client)) { if (!new_enonce1(client)) {
stratum_send_message(client, "Pool full of clients"); stratum_send_message(client, "Pool full of clients");
client->reject = 2;
return json_string("proxy full"); return json_string("proxy full");
} }
LOGINFO("Set new subscription %ld to new enonce1 %s", client->id, LOGINFO("Set new subscription %ld to new enonce1 %s", client->id,
@ -1683,7 +1686,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf,
user_instance->username); user_instance->username);
client->workername = strdup(buf); client->workername = strdup(buf);
if (client->ckp->standalone) if (CKP_STANDALONE(client->ckp))
ret = true; ret = true;
else { else {
*errnum = send_recv_auth(client); *errnum = send_recv_auth(client);
@ -1742,7 +1745,8 @@ static double sane_tdiff(tv_t *end, tv_t *start)
return tdiff; return tdiff;
} }
static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid) static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid,
bool submit)
{ {
worker_instance_t *worker = client->worker_instance; worker_instance_t *worker = client->worker_instance;
double tdiff, bdiff, dsps, drr, network_diff, bias; double tdiff, bdiff, dsps, drr, network_diff, bias;
@ -1750,9 +1754,16 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
int64_t next_blockid, optimal; int64_t next_blockid, optimal;
tv_t now_t; tv_t now_t;
/* Ignore successive rejects in count if they haven't submitted a valid mutex_lock(&stats_lock);
* share yet. */ if (valid) {
if (unlikely(!client->ssdc && !valid)) stats.unaccounted_shares++;
stats.unaccounted_diff_shares += diff;
} else
stats.unaccounted_rejects += diff;
mutex_unlock(&stats_lock);
/* Count only accepted and stale rejects in diff calculation. */
if (!valid && !submit)
return; return;
tv_time(&now_t); tv_time(&now_t);
@ -1797,14 +1808,6 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
bias = time_bias(bdiff, 300); bias = time_bias(bdiff, 300);
tdiff = sane_tdiff(&now_t, &client->ldc); tdiff = sane_tdiff(&now_t, &client->ldc);
mutex_lock(&stats_lock);
if (valid) {
stats.unaccounted_shares++;
stats.unaccounted_diff_shares += diff;
} else
stats.unaccounted_rejects += diff;
mutex_unlock(&stats_lock);
/* Check the difficulty every 240 seconds or as many shares as we /* Check the difficulty every 240 seconds or as many shares as we
* should have had in that time, whichever comes first. */ * should have had in that time, whichever comes first. */
if (client->ssdc < 72 && tdiff < 240) if (client->ssdc < 72 && tdiff < 240)
@ -1840,6 +1843,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
optimal = client->suggest_diff; optimal = client->suggest_diff;
} else if (optimal < worker->mindiff) } else if (optimal < worker->mindiff)
optimal = worker->mindiff; optimal = worker->mindiff;
if (ckp->maxdiff && optimal > ckp->maxdiff)
optimal = ckp->maxdiff;
if (optimal > network_diff) if (optimal > network_diff)
optimal = network_diff; optimal = network_diff;
if (client->diff == optimal) if (client->diff == optimal)
@ -2004,19 +2009,17 @@ static bool new_share(const uchar *hash, int64_t wb_id)
share_t *share, *match = NULL; share_t *share, *match = NULL;
bool ret = false; bool ret = false;
ck_ilock(&share_lock); ck_wlock(&share_lock);
HASH_FIND(hh, shares, hash, 32, match); HASH_FIND(hh, shares, hash, 32, match);
if (match) if (match)
goto out_unlock; goto out_unlock;
share = ckzalloc(sizeof(share_t)); share = ckzalloc(sizeof(share_t));
memcpy(share->hash, hash, 32); memcpy(share->hash, hash, 32);
share->workbase_id = wb_id; share->workbase_id = wb_id;
ck_ulock(&share_lock);
HASH_ADD(hh, shares, hash, 32, share); HASH_ADD(hh, shares, hash, 32, share);
ck_dwilock(&share_lock);
ret = true; ret = true;
out_unlock: out_unlock:
ck_uilock(&share_lock); ck_wunlock(&share_lock);
return ret; return ret;
} }
@ -2053,17 +2056,19 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
char *fname = NULL, *s, *nonce2; char *fname = NULL, *s, *nonce2;
enum share_err err = SE_NONE; enum share_err err = SE_NONE;
ckpool_t *ckp = client->ckp; ckpool_t *ckp = client->ckp;
workbase_t *wb = NULL;
char idstring[20]; char idstring[20];
uint32_t ntime32; uint32_t ntime32;
workbase_t *wb;
uchar hash[32]; uchar hash[32];
int64_t id; time_t now_t;
json_t *val; json_t *val;
int64_t id;
ts_t now; ts_t now;
FILE *fp; FILE *fp;
int len; int len;
ts_realtime(&now); ts_realtime(&now);
now_t = now.tv_sec;
sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec);
if (unlikely(!json_is_array(params_val))) { if (unlikely(!json_is_array(params_val))) {
@ -2155,7 +2160,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
} }
invalid = false; invalid = false;
out_submit: out_submit:
if (wb->proxy && sdiff >= wdiff) if (sdiff >= wdiff)
submit = true; submit = true;
out_unlock: out_unlock:
ck_runlock(&workbase_lock); ck_runlock(&workbase_lock);
@ -2191,19 +2196,19 @@ out_unlock:
/* Submit share to upstream pool in proxy mode. We submit valid and /* Submit share to upstream pool in proxy mode. We submit valid and
* stale shares and filter out the rest. */ * stale shares and filter out the rest. */
if (submit) { if (wb && wb->proxy && submit) {
LOGINFO("Submitting share upstream: %s", hexhash); LOGINFO("Submitting share upstream: %s", hexhash);
submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id")));
} }
add_submit(ckp, client, diff, result); add_submit(ckp, client, diff, result, submit);
/* Now write to the pool's sharelog. */ /* Now write to the pool's sharelog. */
val = json_object(); val = json_object();
json_set_int(val, "workinfoid", id); json_set_int(val, "workinfoid", id);
json_set_int(val, "clientid", client->id); json_set_int(val, "clientid", client->id);
json_set_string(val, "enonce1", client->enonce1); json_set_string(val, "enonce1", client->enonce1);
if (!ckp->standalone) if (!CKP_STANDALONE(ckp))
json_set_string(val, "secondaryuserid", user_instance->secondaryuserid); json_set_string(val, "secondaryuserid", user_instance->secondaryuserid);
json_set_string(val, "nonce2", nonce2); json_set_string(val, "nonce2", nonce2);
json_set_string(val, "nonce", nonce); json_set_string(val, "nonce", nonce);
@ -2237,6 +2242,26 @@ out_unlock:
} }
ckdbq_add(ckp, ID_SHARES, val); ckdbq_add(ckp, ID_SHARES, val);
out: out:
if ((!result && !submit) || !share) {
/* Is this the first in a run of invalids? */
if (client->first_invalid < client->last_share.tv_sec || !client->first_invalid)
client->first_invalid = now_t;
else if (client->first_invalid && client->first_invalid < now_t - 120) {
LOGNOTICE("Client %d rejecting for 120s, disconnecting", client->id);
stratum_send_message(client, "Disconnecting for continuous invalid shares");
client->reject = 2;
} else if (client->first_invalid && client->first_invalid < now_t - 60) {
if (!client->reject) {
LOGINFO("Client %d rejecting for 60s, sending diff", client->id);
stratum_send_diff(client);
client->reject = 1;
}
}
} else {
client->first_invalid = 0;
client->reject = 0;
}
if (!share) { if (!share) {
JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
"clientid", client->id, "clientid", client->id,
@ -2403,6 +2428,13 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
return; return;
} }
if (unlikely(client->reject == 2)) {
LOGINFO("Dropping client %d tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%ld", client->id);
send_proc(client->ckp->connector, buf);
return;
}
/* Random broken clients send something not an integer as the id so we copy /* Random broken clients send something not an integer as the id so we copy
* the json item for id_val as is for the response. */ * the json item for id_val as is for the response. */
method = json_string_value(method_val); method = json_string_value(method_val);
@ -2543,15 +2575,13 @@ static void srecv_process(ckpool_t *ckp, smsg_t *msg)
json_object_clear(val); json_object_clear(val);
/* Parse the message here */ /* Parse the message here */
ck_ilock(&instance_lock); ck_wlock(&instance_lock);
instance = __instance_by_id(msg->client_id); instance = __instance_by_id(msg->client_id);
if (!instance) { if (!instance) {
/* client_id instance doesn't exist yet, create one */ /* client_id instance doesn't exist yet, create one */
ck_ulock(&instance_lock);
instance = __stratum_add_instance(ckp, msg->client_id); instance = __stratum_add_instance(ckp, msg->client_id);
ck_dwilock(&instance_lock);
} }
ck_uilock(&instance_lock); ck_wunlock(&instance_lock);
parse_instance_msg(msg); parse_instance_msg(msg);
@ -2878,8 +2908,8 @@ static void *statsupdate(void *arg)
sleep(1); sleep(1);
while (42) { while (42) {
double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias, bias5, bias60, bias1440;
char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64];
double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, tdiff, bias;
char suffix360[16], suffix1440[16]; char suffix360[16], suffix1440[16];
user_instance_t *instance, *tmpuser; user_instance_t *instance, *tmpuser;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
@ -2896,32 +2926,31 @@ static void *statsupdate(void *arg)
timersub(&now, &stats.start_time, &diff); timersub(&now, &stats.start_time, &diff);
tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000; tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000;
bias = time_bias(tdiff, 60); ghs1 = stats.dsps1 * nonces;
ghs1 = stats.dsps1 * nonces / bias;
suffix_string(ghs1, suffix1, 16, 0); suffix_string(ghs1, suffix1, 16, 0);
sps1 = stats.sps1 / bias; sps1 = stats.sps1;
bias = time_bias(tdiff, 300); bias5 = time_bias(tdiff, 300);
ghs5 = stats.dsps5 * nonces / bias; ghs5 = stats.dsps5 * nonces / bias5;
suffix_string(ghs5, suffix5, 16, 0); suffix_string(ghs5, suffix5, 16, 0);
sps5 = stats.sps5 / bias; sps5 = stats.sps5 / bias5;
bias = time_bias(tdiff, 900); bias = time_bias(tdiff, 900);
ghs15 = stats.dsps15 * nonces / bias; ghs15 = stats.dsps15 * nonces / bias;
suffix_string(ghs15, suffix15, 16, 0); suffix_string(ghs15, suffix15, 16, 0);
sps15 = stats.sps15 / bias; sps15 = stats.sps15 / bias;
bias = time_bias(tdiff, 3600); bias60 = time_bias(tdiff, 3600);
ghs60 = stats.dsps60 * nonces / bias; ghs60 = stats.dsps60 * nonces / bias60;
suffix_string(ghs60, suffix60, 16, 0); suffix_string(ghs60, suffix60, 16, 0);
sps60 = stats.sps60 / bias; sps60 = stats.sps60 / bias60;
bias = time_bias(tdiff, 21600); bias = time_bias(tdiff, 21600);
ghs360 = stats.dsps360 * nonces / bias; ghs360 = stats.dsps360 * nonces / bias;
suffix_string(ghs360, suffix360, 16, 0); suffix_string(ghs360, suffix360, 16, 0);
bias = time_bias(tdiff, 86400); bias1440 = time_bias(tdiff, 86400);
ghs1440 = stats.dsps1440 * nonces / bias; ghs1440 = stats.dsps1440 * nonces / bias1440;
suffix_string(ghs1440, suffix1440, 16, 0); suffix_string(ghs1440, suffix1440, 16, 0);
snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir);
@ -2996,11 +3025,14 @@ static void *statsupdate(void *arg)
} }
ghs = worker->dsps1 * nonces; ghs = worker->dsps1 * nonces;
suffix_string(ghs, suffix1, 16, 0); suffix_string(ghs, suffix1, 16, 0);
ghs = worker->dsps5 * nonces;
ghs = worker->dsps5 * nonces / bias5;
suffix_string(ghs, suffix5, 16, 0); suffix_string(ghs, suffix5, 16, 0);
ghs = worker->dsps60 * nonces;
ghs = worker->dsps60 * nonces / bias60;
suffix_string(ghs, suffix60, 16, 0); suffix_string(ghs, suffix60, 16, 0);
ghs = worker->dsps1440 * nonces;
ghs = worker->dsps1440 * nonces / bias1440;
suffix_string(ghs, suffix1440, 16, 0); suffix_string(ghs, suffix1440, 16, 0);
JSON_CPACK(val, "{ss,ss,ss,ss}", JSON_CPACK(val, "{ss,ss,ss,ss}",
@ -3032,11 +3064,14 @@ static void *statsupdate(void *arg)
} }
ghs = instance->dsps1 * nonces; ghs = instance->dsps1 * nonces;
suffix_string(ghs, suffix1, 16, 0); suffix_string(ghs, suffix1, 16, 0);
ghs = instance->dsps5 * nonces;
ghs = instance->dsps5 * nonces / bias5;
suffix_string(ghs, suffix5, 16, 0); suffix_string(ghs, suffix5, 16, 0);
ghs = instance->dsps60 * nonces;
ghs = instance->dsps60 * nonces / bias60;
suffix_string(ghs, suffix60, 16, 0); suffix_string(ghs, suffix60, 16, 0);
ghs = instance->dsps1440 * nonces;
ghs = instance->dsps1440 * nonces / bias1440;
suffix_string(ghs, suffix1440, 16, 0); suffix_string(ghs, suffix1440, 16, 0);
JSON_CPACK(val, "{ss,ss,ss,ss,si}", JSON_CPACK(val, "{ss,ss,ss,ss,si}",
@ -3197,7 +3232,7 @@ int stratifier(proc_instance_t *pi)
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!ckp->standalone) if (!CKP_STANDALONE(ckp))
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);

Loading…
Cancel
Save