kanoi 10 years ago
parent
commit
a6496d4211
  1. 1
      Makefile.am
  2. 3
      README
  3. 2
      configure.ac
  4. 46
      src/ckpool.c
  5. 5
      src/ckpool.h
  6. 162
      src/connector.c
  7. 6
      src/libckpool.c
  8. 5
      src/libckpool.h
  9. 241
      src/stratifier.c

1
Makefile.am

@ -1,2 +1,3 @@
ACLOCAL_AMFLAGS = -I m4 ACLOCAL_AMFLAGS = -I m4
SUBDIRS = src SUBDIRS = src
EXTRA_DIST = ckpool.conf ckproxy.conf

3
README

@ -256,3 +256,6 @@ and 3334 in proxy mode.
maximum. maximum.
"logdir" : Which directory to store pool and client logs. Default "logs" "logdir" : Which directory to store pool and client logs. Default "logs"
"maxclients" : Optional upper limit on the number of clients ckpool will
accept before rejecting further clients.

2
configure.ac

@ -37,7 +37,7 @@ AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h)
AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h)
AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h)
AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
PTHREAD_LIBS="-lpthread" PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm" MATH_LIBS="-lm"

46
src/ckpool.c

@ -115,16 +115,16 @@ static void *ckmsg_queue(void *arg)
tv_t now; tv_t now;
ts_t abs; ts_t abs;
mutex_lock(&ckmsgq->lock); mutex_lock(ckmsgq->lock);
tv_time(&now); tv_time(&now);
tv_to_ts(&abs, &now); tv_to_ts(&abs, &now);
abs.tv_sec++; abs.tv_sec++;
if (!ckmsgq->msgs) if (!ckmsgq->msgs)
pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs); pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
msg = ckmsgq->msgs; msg = ckmsgq->msgs;
if (msg) if (msg)
DL_DELETE(ckmsgq->msgs, msg); DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock); mutex_unlock(ckmsgq->lock);
if (!msg) if (!msg)
continue; continue;
@ -141,13 +141,39 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
strncpy(ckmsgq->name, name, 15); strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func; ckmsgq->func = func;
ckmsgq->ckp = ckp; ckmsgq->ckp = ckp;
mutex_init(&ckmsgq->lock); ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t));
cond_init(&ckmsgq->cond); ckmsgq->cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(ckmsgq->lock);
cond_init(ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
return ckmsgq; return ckmsgq;
} }
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count);
pthread_mutex_t *lock;
pthread_cond_t *cond;
int i;
lock = ckalloc(sizeof(pthread_mutex_t));
cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock);
cond_init(cond);
for (i = 0; i < count; i++) {
snprintf(ckmsgq[i].name, 15, "%.8s%x", name, i);
ckmsgq[i].func = func;
ckmsgq[i].ckp = ckp;
ckmsgq[i].lock = lock;
ckmsgq[i].cond = cond;
create_pthread(&ckmsgq[i].pth, ckmsg_queue, &ckmsgq[i]);
}
return ckmsgq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the /* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread to wake up and process it. */ * ckmsgq parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
@ -156,10 +182,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
msg->data = data; msg->data = data;
mutex_lock(&ckmsgq->lock); mutex_lock(ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg); DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond); pthread_cond_signal(ckmsgq->cond);
mutex_unlock(&ckmsgq->lock); mutex_unlock(ckmsgq->lock);
} }
/* Return whether there are any messages queued in the ckmsgq linked list. */ /* Return whether there are any messages queued in the ckmsgq linked list. */
@ -167,10 +193,10 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
{ {
bool ret = true; bool ret = true;
mutex_lock(&ckmsgq->lock); mutex_lock(ckmsgq->lock);
if (ckmsgq->msgs) if (ckmsgq->msgs)
ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev); ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev);
mutex_unlock(&ckmsgq->lock); mutex_unlock(ckmsgq->lock);
return ret; return ret;
} }

5
src/ckpool.h

@ -34,8 +34,8 @@ struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
char name[16]; char name[16];
pthread_t pth; pthread_t pth;
pthread_mutex_t lock; pthread_mutex_t *lock;
pthread_cond_t cond; pthread_cond_t *cond;
ckmsg_t *msgs; ckmsg_t *msgs;
void (*func)(ckpool_t *, void *); void (*func)(ckpool_t *, void *);
}; };
@ -181,6 +181,7 @@ struct ckpool_instance {
#endif #endif
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);

162
src/connector.c

@ -11,7 +11,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/poll.h> #include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -30,6 +30,8 @@ struct connector_instance {
int serverfd; int serverfd;
int nfds; int nfds;
bool accept; bool accept;
pthread_t pth_sender;
pthread_t pth_receiver;
}; };
typedef struct connector_instance conn_instance_t; typedef struct connector_instance conn_instance_t;
@ -38,9 +40,6 @@ struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
int64_t id; int64_t id;
/* For fdclients hashtable */
UT_hash_handle fdhh;
int fd; int fd;
/* For dead_clients list */ /* For dead_clients list */
@ -59,8 +58,6 @@ typedef struct client_instance client_instance_t;
/* For the hashtable of all clients */ /* For the hashtable of all clients */
static client_instance_t *clients; static client_instance_t *clients;
/* A hashtable of the clients sorted by fd */
static client_instance_t *fdclients;
/* Linked list of dead clients no longer in use but may still have references */ /* Linked list of dead clients no longer in use but may still have references */
static client_instance_t *dead_clients; static client_instance_t *dead_clients;
@ -87,10 +84,11 @@ static pthread_cond_t sender_cond;
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(conn_instance_t *ci) static int accept_client(conn_instance_t *ci, int epfd)
{ {
client_instance_t *client, *old_client;
ckpool_t *ckp = ci->pi->ckp; ckpool_t *ckp = ci->pi->ckp;
client_instance_t *client;
struct epoll_event event;
int fd, port, no_clients; int fd, port, no_clients;
socklen_t address_len; socklen_t address_len;
@ -98,7 +96,7 @@ static int accept_client(conn_instance_t *ci)
no_clients = HASH_COUNT(clients); no_clients = HASH_COUNT(clients);
ck_runlock(&ci->lock); ck_runlock(&ci->lock);
if (ckp->maxclients && no_clients >= ckp->maxclients) { if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) {
LOGWARNING("Server full with %d clients", no_clients); LOGWARNING("Server full with %d clients", no_clients);
return 0; return 0;
} }
@ -143,14 +141,21 @@ static int accept_client(conn_instance_t *ci)
keep_sockalive(fd); keep_sockalive(fd);
nolinger_socket(fd); nolinger_socket(fd);
LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
ci->nfds, fd, no_clients, client->address_name, port);
client->fd = fd; client->fd = fd;
event.data.ptr = client;
event.events = EPOLLIN;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client");
free(client);
return 0;
}
ck_wlock(&ci->lock); ck_wlock(&ci->lock);
client->id = client_id++; client->id = client_id++;
HASH_ADD_I64(clients, id, client); HASH_ADD_I64(clients, id, client);
HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client);
ci->nfds++; ci->nfds++;
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
@ -166,7 +171,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client)
if (fd != -1) { if (fd != -1) {
Close(client->fd); Close(client->fd);
HASH_DEL(clients, client); HASH_DEL(clients, client);
HASH_DELETE(fdhh, fdclients, client);
LL_PREPEND(dead_clients, client); LL_PREPEND(dead_clients, client);
} }
ck_wunlock(&ci->lock); ck_wunlock(&ci->lock);
@ -292,86 +296,64 @@ reparse:
void *receiver(void *arg) void *receiver(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client, *tmp; struct epoll_event event;
struct pollfd fds[65536]; bool maxconn = true;
int ret, nfds, i; int ret, epfd;
bool update;
rename_proc("creceiver"); rename_proc("creceiver");
/* First fd is reserved for the accepting socket */ epfd = epoll_create1(EPOLL_CLOEXEC);
fds[0].fd = ci->serverfd; if (epfd < 0) {
fds[0].events = POLLIN; LOGEMERG("FATAL: Failed to create epoll in receiver");
fds[0].revents = 0; return NULL;
rebuild_fds:
update = false;
nfds = 1;
ck_rlock(&ci->lock);
HASH_ITER(fdhh, fdclients, client, tmp) {
if (unlikely(client->fd == -1)) {
LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!",
client->id);
continue;
}
fds[nfds].fd = client->fd;
fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
nfds++;
} }
ck_runlock(&ci->lock); event.data.fd = ci->serverfd;
event.events = EPOLLIN;
repoll: ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event);
while (!ci->accept) if (ret < 0) {
cksleep_ms(100); LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL;
ret = poll(fds, nfds, 1000);
if (unlikely(ret < 0)) {
LOGERR("Failed to poll in receiver");
goto out;
} }
for (i = 0; i < nfds && ret > 0; i++) { while (42) {
int fd, accepted; client_instance_t *client;
if (!fds[i].revents) while (unlikely(!ci->accept))
cksleep_ms(100);
ret = epoll_wait(epfd, &event, 1, 1000);
if (unlikely(ret == -1)) {
LOGEMERG("FATAL: Failed to epoll_wait in receiver");
break;
}
if (unlikely(!ret)) {
if (unlikely(maxconn)) {
/* When we first start we listen to as many connections as
* possible. Once we stop receiving connections we drop the
* listen to the minimum to effectively ratelimit how fast we
* can receive connections. */
LOGDEBUG("Dropping listen backlog to 0");
maxconn = false;
listen(ci->serverfd, 0);
}
continue; continue;
}
/* Reset for the next poll pass */ if (event.data.fd == ci->serverfd) {
fds[i].events = POLLIN; ret = accept_client(ci, epfd);
fds[i].revents = 0; if (unlikely(ret < 0)) {
--ret; LOGEMERG("FATAL: Failed to accept_client in receiver");
break;
/* Is this the listening server socket? */ }
if (i == 0) {
accepted = accept_client(ci);
if (unlikely(accepted < 0))
goto out;
if (accepted)
update = true;
continue; continue;
} }
client = event.data.ptr;
client = NULL; if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
fd = fds[i].fd; /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd);
ck_rlock(&ci->lock); invalidate_client(ci->pi->ckp, ci, client);
HASH_FIND(fdhh, fdclients, &fd, SOI, client); continue;
ck_runlock(&ci->lock); }
parse_client_msg(ci, client);
if (!client) {
/* Probably already removed, remove lazily */
LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable",
i, fd);
update = true;
} else
parse_client_msg(ci, client);
} }
if (update)
goto rebuild_fds;
goto repoll;
out:
return NULL; return NULL;
} }
@ -564,6 +546,17 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
LOGWARNING("%s connector ready", ckp->name); LOGWARNING("%s connector ready", ckp->name);
retry: retry:
if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
}
Close(sockd); Close(sockd);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
@ -675,7 +668,6 @@ out:
int connector(proc_instance_t *pi) int connector(proc_instance_t *pi)
{ {
pthread_t pth_sender, pth_receiver;
char *url = NULL, *port = NULL; char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int sockd, ret = 0; int sockd, ret = 0;
@ -738,7 +730,7 @@ int connector(proc_instance_t *pi)
if (tries) if (tries)
LOGWARNING("Connector successfully bound to socket"); LOGWARNING("Connector successfully bound to socket");
ret = listen(sockd, 10); ret = listen(sockd, SOMAXCONN);
if (ret < 0) { if (ret < 0) {
LOGERR("Connector failed to listen on socket"); LOGERR("Connector failed to listen on socket");
Close(sockd); Close(sockd);
@ -752,8 +744,8 @@ int connector(proc_instance_t *pi)
ci.nfds = 0; ci.nfds = 0;
mutex_init(&sender_lock); mutex_init(&sender_lock);
cond_init(&sender_cond); cond_init(&sender_cond);
create_pthread(&pth_sender, sender, &ci); create_pthread(&ci.pth_sender, sender, &ci);
create_pthread(&pth_receiver, receiver, &ci); create_pthread(&ci.pth_receiver, receiver, &ci);
ret = connector_loop(pi, &ci); ret = connector_loop(pi, &ci);
out: out:

6
src/libckpool.c

@ -440,13 +440,14 @@ void block_socket(int fd)
fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
} }
void _Close(int *fd) void _close(int *fd, const char *file, const char *func, const int line)
{ {
if (*fd < 0) if (*fd < 0)
return; return;
LOGDEBUG("Closing file handle %d", *fd); LOGDEBUG("Closing file handle %d", *fd);
if (unlikely(close(*fd))) if (unlikely(close(*fd)))
LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno)); LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d",
*fd, errno, strerror(errno), file, func, line);
*fd = -1; *fd = -1;
} }
@ -969,7 +970,6 @@ int _get_fd(int sockd, const char *file, const char *func, const int line)
goto out; goto out;
} }
out: out:
Close(sockd);
cm = (int *)CMSG_DATA(cmptr); cm = (int *)CMSG_DATA(cmptr);
newfd = *cm; newfd = *cm;
free(cmptr); free(cmptr);

5
src/libckpool.h

@ -426,8 +426,9 @@ void keep_sockalive(int fd);
void nolinger_socket(int fd); void nolinger_socket(int fd);
void noblock_socket(int fd); void noblock_socket(int fd);
void block_socket(int fd); void block_socket(int fd);
void _Close(int *fd); void _close(int *fd, const char *file, const char *func, const int line);
#define Close(FD) _Close(&FD) #define _Close(FD) _close(FD, __FILE__, __func__, __LINE__)
#define Close(FD) _close(&FD, __FILE__, __func__, __LINE__)
int bind_socket(char *url, char *port); int bind_socket(char *url, char *port);
int connect_socket(char *url, char *port); int connect_socket(char *url, char *port);
int write_socket(int fd, const void *buf, size_t nbyte); int write_socket(int fd, const void *buf, size_t nbyte);

241
src/stratifier.c

@ -71,6 +71,7 @@ struct pool_stats {
double dsps60; double dsps60;
double dsps360; double dsps360;
double dsps1440; double dsps1440;
double dsps10080;
}; };
typedef struct pool_stats pool_stats_t; typedef struct pool_stats pool_stats_t;
@ -173,6 +174,7 @@ static int64_t blockchange_id;
static char lasthash[68], lastswaphash[68]; static char lasthash[68], lastswaphash[68];
struct json_params { struct json_params {
json_t *method;
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
int64_t client_id; int64_t client_id;
@ -226,6 +228,7 @@ struct user_instance {
double dsps5; /* ... 5 minute ... */ double dsps5; /* ... 5 minute ... */
double dsps60;/* etc */ double dsps60;/* etc */
double dsps1440; double dsps1440;
double dsps10080;
tv_t last_share; tv_t last_share;
}; };
@ -268,6 +271,7 @@ struct stratum_instance {
double dsps5; /* ... 5 minute ... */ double dsps5; /* ... 5 minute ... */
double dsps60;/* etc */ double dsps60;/* etc */
double dsps1440; double dsps1440;
double dsps10080;
tv_t ldc; /* Last diff change */ tv_t ldc; /* Last diff change */
int ssdc; /* Shares since diff change */ int ssdc; /* Shares since diff change */
tv_t first_share; tv_t first_share;
@ -720,24 +724,39 @@ static void send_generator(ckpool_t *ckp, const char *msg, int prio)
gen_priority = 0; gen_priority = 0;
} }
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(void);
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template * since checking should have been done earlier, and creates the base template
* for generating work templates. */ * for generating work templates. */
static void update_base(ckpool_t *ckp, int prio) static void *do_update(void *arg)
{ {
struct update_req *ur = (struct update_req *)arg;
ckpool_t *ckp = ur->ckp;
bool new_block = false; bool new_block = false;
int prio = ur->prio;
bool ret = false;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
char *buf; char *buf;
pthread_detach(pthread_self());
rename_proc("updater");
buf = send_recv_generator(ckp, "getbase", prio); buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Failed to get base from generator in update_base"); LOGWARNING("Failed to get base from generator in update_base");
return; goto out;
} }
if (unlikely(cmdmatch(buf, "failed"))) { if (unlikely(cmdmatch(buf, "failed"))) {
LOGWARNING("Generator returned failure in update_base"); LOGWARNING("Generator returned failure in update_base");
return; goto out;
} }
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
@ -782,6 +801,29 @@ static void update_base(ckpool_t *ckp, int prio)
add_base(ckp, wb, &new_block); add_base(ckp, wb, &new_block);
stratum_broadcast_update(new_block); stratum_broadcast_update(new_block);
ret = true;
LOGINFO("Broadcast updated stratum base");
out:
/* Send a ping to miners if we fail to get a base to keep them
* connected while bitcoind recovers(?) */
if (!ret) {
LOGWARNING("Broadcast ping due to failed stratum base update");
broadcast_ping();
}
free(ur->pth);
free(ur);
return NULL;
}
static void update_base(ckpool_t *ckp, int prio)
{
struct update_req *ur = ckalloc(sizeof(struct update_req));
pthread_t *pth = ckalloc(sizeof(pthread_t));
ur->pth = pth;
ur->ckp = ckp;
ur->prio = prio;
create_pthread(pth, do_update, ur);
} }
static void drop_allclients(ckpool_t *ckp) static void drop_allclients(ckpool_t *ckp)
@ -1068,13 +1110,13 @@ static void stratum_broadcast(json_t *val)
if (!bulk_send) if (!bulk_send)
return; return;
mutex_lock(&ssends->lock); mutex_lock(ssends->lock);
if (ssends->msgs) if (ssends->msgs)
DL_CONCAT(ssends->msgs, bulk_send); DL_CONCAT(ssends->msgs, bulk_send);
else else
ssends->msgs = bulk_send; ssends->msgs = bulk_send;
pthread_cond_signal(&ssends->cond); pthread_cond_signal(ssends->cond);
mutex_unlock(&ssends->lock); mutex_unlock(ssends->lock);
} }
static void stratum_add_send(json_t *val, int64_t client_id) static void stratum_add_send(json_t *val, int64_t client_id)
@ -1561,6 +1603,114 @@ static bool test_address(ckpool_t *ckp, const char *address)
return ret; return ret;
} }
static const double nonces = 4294967296;
static double dsps_from_key(json_t *val, const char *key)
{
char *string, *endptr;
double ret = 0;
json_get_string(&string, val, key);
if (!string)
return ret;
ret = strtod(string, &endptr) / nonces;
if (endptr) {
switch (endptr[0]) {
case 'E':
ret *= (double)1000;
case 'P':
ret *= (double)1000;
case 'T':
ret *= (double)1000;
case 'G':
ret *= (double)1000;
case 'M':
ret *= (double)1000;
case 'K':
ret *= (double)1000;
default:
break;
}
}
free(string);
return ret;
}
static void read_userstats(ckpool_t *ckp, user_instance_t *instance)
{
char s[512];
json_t *val;
FILE *fp;
int ret;
snprintf(s, 511, "%s/users/%s", ckp->logdir, instance->username);
fp = fopen(s, "re");
if (!fp) {
LOGINFO("User %s does not have a logfile to read", instance->username);
return;
}
memset(s, 0, 512);
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGINFO("Failed to read user %s logfile", instance->username);
return;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGINFO("Failed to json decode user %s logfile: %s", instance->username, s);
return;
}
tv_time(&instance->last_share);
instance->dsps1 = dsps_from_key(val, "hashrate1m");
instance->dsps5 = dsps_from_key(val, "hashrate5m");
instance->dsps60 = dsps_from_key(val, "hashrate1hr");
instance->dsps1440 = dsps_from_key(val, "hashrate1d");
instance->dsps10080 = dsps_from_key(val, "hashrate7d");
LOGINFO("Successfully read user %s stats %f %f %f %f %f", instance->username,
instance->dsps1, instance->dsps5, instance->dsps60, instance->dsps1440,
instance->dsps10080);
json_decref(val);
}
static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
{
char s[512];
json_t *val;
FILE *fp;
int ret;
snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername);
fp = fopen(s, "re");
if (!fp) {
LOGINFO("Worker %s does not have a logfile to read", worker->workername);
return;
}
memset(s, 0, 512);
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGINFO("Failed to read worker %s logfile", worker->workername);
return;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s);
return;
}
tv_time(&worker->last_share);
worker->dsps1 = dsps_from_key(val, "hashrate1m");
worker->dsps5 = dsps_from_key(val, "hashrate5m");
worker->dsps60 = dsps_from_key(val, "hashrate1d");
worker->dsps1440 = dsps_from_key(val, "hashrate1d");
LOGINFO("Successfully read worker %s stats %f %f %f %f", worker->workername,
worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440);
json_decref(val);
}
/* This simply strips off the first part of the workername and matches it to a /* This simply strips off the first part of the workername and matches it to a
* user or creates a new one. */ * user or creates a new one. */
static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
@ -1589,6 +1739,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
instance->id = user_instance_id++; instance->id = user_instance_id++;
HASH_ADD_STR(user_instances, username, instance); HASH_ADD_STR(user_instances, username, instance);
read_userstats(ckp, instance);
} }
DL_FOREACH(instance->instances, tmp) { DL_FOREACH(instance->instances, tmp) {
if (!safecmp(workername, tmp->workername)) { if (!safecmp(workername, tmp->workername)) {
@ -1603,6 +1754,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
client->worker_instance->workername = strdup(workername); client->worker_instance->workername = strdup(workername);
client->worker_instance->instance = instance; client->worker_instance->instance = instance;
DL_APPEND(instance->worker_instances, client->worker_instance); DL_APPEND(instance->worker_instances, client->worker_instance);
read_workerstats(ckp, client->worker_instance);
} }
DL_APPEND(instance->instances, client); DL_APPEND(instance->instances, client);
ck_wunlock(&instance_lock); ck_wunlock(&instance_lock);
@ -1881,6 +2033,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
decay_time(&client->dsps5, diff, tdiff, 300); decay_time(&client->dsps5, diff, tdiff, 300);
decay_time(&client->dsps60, diff, tdiff, 3600); decay_time(&client->dsps60, diff, tdiff, 3600);
decay_time(&client->dsps1440, diff, tdiff, 86400); decay_time(&client->dsps1440, diff, tdiff, 86400);
decay_time(&client->dsps10080, diff, tdiff, 604800);
copy_tv(&client->last_share, &now_t); copy_tv(&client->last_share, &now_t);
tdiff = sane_tdiff(&now_t, &worker->last_share); tdiff = sane_tdiff(&now_t, &worker->last_share);
@ -1895,6 +2048,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
decay_time(&instance->dsps5, diff, tdiff, 300); decay_time(&instance->dsps5, diff, tdiff, 300);
decay_time(&instance->dsps60, diff, tdiff, 3600); decay_time(&instance->dsps60, diff, tdiff, 3600);
decay_time(&instance->dsps1440, diff, tdiff, 86400); decay_time(&instance->dsps1440, diff, tdiff, 86400);
decay_time(&instance->dsps10080, diff, tdiff, 604800);
copy_tv(&instance->last_share, &now_t); copy_tv(&instance->last_share, &now_t);
client->idle = false; client->idle = false;
@ -2452,10 +2606,13 @@ static void update_client(stratum_instance_t *client, const int64_t client_id)
stratum_send_diff(client); stratum_send_diff(client);
} }
static json_params_t *create_json_params(const int64_t client_id, const json_t *params, const json_t *id_val, const char *address) static json_params_t
*create_json_params(const int64_t client_id, json_t *method, const json_t *params,
const json_t *id_val, const char *address)
{ {
json_params_t *jp = ckalloc(sizeof(json_params_t)); json_params_t *jp = ckalloc(sizeof(json_params_t));
jp->method = json_copy(method);
jp->params = json_deep_copy(params); jp->params = json_deep_copy(params);
jp->id_val = json_deep_copy(id_val); jp->id_val = json_deep_copy(id_val);
jp->client_id = client_id; jp->client_id = client_id;
@ -2541,8 +2698,8 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t
client->suggest_diff = sdiff; client->suggest_diff = sdiff;
if (client->diff == sdiff) if (client->diff == sdiff)
return; return;
if (sdiff < client->diff * 2 / 3) if (sdiff < client->ckp->mindiff)
client->diff = client->diff * 2 / 3; client->diff = client->ckp->mindiff;
else else
client->diff = sdiff; client->diff = sdiff;
stratum_send_diff(client); stratum_send_diff(client);
@ -2609,7 +2766,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
} }
if (cmdmatch(method, "mining.auth") && client->subscribed) { if (cmdmatch(method, "mining.auth") && client->subscribed) {
json_params_t *jp = create_json_params(client_id, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sauthq, jp); ckmsgq_add(sauthq, jp);
return; return;
@ -2627,7 +2784,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
} }
if (cmdmatch(method, "mining.submit")) { if (cmdmatch(method, "mining.submit")) {
json_params_t *jp = create_json_params(client_id, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sshareq, jp); ckmsgq_add(sshareq, jp);
return; return;
@ -2640,7 +2797,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
/* Covers both get_transactions and get_txnhashes */ /* Covers both get_transactions and get_txnhashes */
if (cmdmatch(method, "mining.get")) { if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(stxnq, jp); ckmsgq_add(stxnq, jp);
return; return;
@ -2751,6 +2908,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
static void discard_json_params(json_params_t **jp) static void discard_json_params(json_params_t **jp)
{ {
json_decref((*jp)->method);
json_decref((*jp)->params); json_decref((*jp)->params);
json_decref((*jp)->id_val); json_decref((*jp)->id_val);
free(*jp); free(*jp);
@ -2939,7 +3097,8 @@ static json_t *txnhashes_by_jobid(int64_t id)
static void send_transactions(ckpool_t *ckp, json_params_t *jp) static void send_transactions(ckpool_t *ckp, json_params_t *jp)
{ {
const char *msg = json_string_value(jp->params); const char *msg = json_string_value(jp->method),
*params = json_string_value(json_array_get(jp->params, 0));
stratum_instance_t *client; stratum_instance_t *client;
json_t *val, *hashes; json_t *val, *hashes;
int64_t job_id = 0; int64_t job_id = 0;
@ -2956,8 +3115,12 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
/* We don't actually send the transactions as that would use /* We don't actually send the transactions as that would use
* up huge bandwidth, so we just return the number of * up huge bandwidth, so we just return the number of
* transactions :) */ * transactions :) . Support both forms of encoding the
sscanf(msg, "mining.get_transactions(%lx", &job_id); * request in method name and as a parameter. */
if (params && strlen(params) > 0)
sscanf(params, "%lx", &job_id);
else
sscanf(msg, "mining.get_transactions(%lx", &job_id);
txns = transactions_by_jobid(job_id); txns = transactions_by_jobid(job_id);
if (txns != -1) { if (txns != -1) {
json_set_int(val, "result", txns); json_set_int(val, "result", txns);
@ -2989,7 +3152,11 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
goto out_send; goto out_send;
} }
client->last_txns = now_t; client->last_txns = now_t;
sscanf(msg, "mining.get_txnhashes(%lx", &job_id); if (!params || !strlen(params)) {
json_set_string(val, "error", "Invalid params");
goto out_send;
}
sscanf(params, "%lx", &job_id);
hashes = txnhashes_by_jobid(job_id); hashes = txnhashes_by_jobid(job_id);
if (hashes) { if (hashes) {
json_object_set_new_nocheck(val, "result", hashes); json_object_set_new_nocheck(val, "result", hashes);
@ -3002,8 +3169,6 @@ out:
discard_json_params(&jp); discard_json_params(&jp);
} }
static const double nonces = 4294967296;
/* Called every 20 seconds, we send the updated stats to ckdb of those users /* Called every 20 seconds, we send the updated stats to ckdb of those users
* who have gone 10 minutes between updates. This ends up staggering stats to * who have gone 10 minutes between updates. This ends up staggering stats to
* avoid floods of stat data coming at once. */ * avoid floods of stat data coming at once. */
@ -3089,11 +3254,11 @@ static void *statsupdate(void *arg)
sleep(1); sleep(1);
while (42) { while (42) {
double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440; double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, ghs10080;
double bias, bias5, bias60, bias1440; double bias, bias5, bias60, bias1440;
double tdiff, per_tdiff; double tdiff, per_tdiff;
char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64];
char suffix360[16], suffix1440[16]; char suffix360[16], suffix1440[16], suffix10080[16];
user_instance_t *instance, *tmpuser; user_instance_t *instance, *tmpuser;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
double sps1, sps5, sps15, sps60; double sps1, sps5, sps15, sps60;
@ -3136,6 +3301,10 @@ static void *statsupdate(void *arg)
ghs1440 = stats.dsps1440 * nonces / bias1440; ghs1440 = stats.dsps1440 * nonces / bias1440;
suffix_string(ghs1440, suffix1440, 16, 0); suffix_string(ghs1440, suffix1440, 16, 0);
bias = time_bias(tdiff, 604800);
ghs10080 = stats.dsps10080 * nonces / bias;
suffix_string(ghs10080, suffix10080, 16, 0);
snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir);
fp = fopen(fname, "we"); fp = fopen(fname, "we");
if (unlikely(!fp)) if (unlikely(!fp))
@ -3151,13 +3320,14 @@ static void *statsupdate(void *arg)
fprintf(fp, "%s\n", s); fprintf(fp, "%s\n", s);
dealloc(s); dealloc(s);
JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss}", JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss,ss}",
"hashrate1m", suffix1, "hashrate1m", suffix1,
"hashrate5m", suffix5, "hashrate5m", suffix5,
"hashrate15m", suffix15, "hashrate15m", suffix15,
"hashrate1hr", suffix60, "hashrate1hr", suffix60,
"hashrate6hr", suffix360, "hashrate6hr", suffix360,
"hashrate1d", suffix1440); "hashrate1d", suffix1440,
"hashrate7d", suffix10080);
s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val); json_decref(val);
LOGNOTICE("Pool:%s", s); LOGNOTICE("Pool:%s", s);
@ -3189,6 +3359,7 @@ static void *statsupdate(void *arg)
decay_time(&client->dsps5, 0, per_tdiff, 300); decay_time(&client->dsps5, 0, per_tdiff, 300);
decay_time(&client->dsps60, 0, per_tdiff, 3600); decay_time(&client->dsps60, 0, per_tdiff, 3600);
decay_time(&client->dsps1440, 0, per_tdiff, 86400); decay_time(&client->dsps1440, 0, per_tdiff, 86400);
decay_time(&client->dsps10080, 0, per_tdiff, 604800);
if (per_tdiff > 600) if (per_tdiff > 600)
client->idle = true; client->idle = true;
continue; continue;
@ -3211,13 +3382,13 @@ static void *statsupdate(void *arg)
ghs = worker->dsps1 * nonces; ghs = worker->dsps1 * nonces;
suffix_string(ghs, suffix1, 16, 0); suffix_string(ghs, suffix1, 16, 0);
ghs = worker->dsps5 * nonces / bias5; ghs = worker->dsps5 * nonces;
suffix_string(ghs, suffix5, 16, 0); suffix_string(ghs, suffix5, 16, 0);
ghs = worker->dsps60 * nonces / bias60; ghs = worker->dsps60 * nonces;
suffix_string(ghs, suffix60, 16, 0); suffix_string(ghs, suffix60, 16, 0);
ghs = worker->dsps1440 * nonces / bias1440; ghs = worker->dsps1440 * nonces;
suffix_string(ghs, suffix1440, 16, 0); suffix_string(ghs, suffix1440, 16, 0);
JSON_CPACK(val, "{ss,ss,ss,ss}", JSON_CPACK(val, "{ss,ss,ss,ss}",
@ -3246,25 +3417,30 @@ static void *statsupdate(void *arg)
decay_time(&instance->dsps5, 0, per_tdiff, 300); decay_time(&instance->dsps5, 0, per_tdiff, 300);
decay_time(&instance->dsps60, 0, per_tdiff, 3600); decay_time(&instance->dsps60, 0, per_tdiff, 3600);
decay_time(&instance->dsps1440, 0, per_tdiff, 86400); decay_time(&instance->dsps1440, 0, per_tdiff, 86400);
decay_time(&instance->dsps10080, 0, per_tdiff, 604800);
idle = true; idle = true;
} }
ghs = instance->dsps1 * nonces; ghs = instance->dsps1 * nonces;
suffix_string(ghs, suffix1, 16, 0); suffix_string(ghs, suffix1, 16, 0);
ghs = instance->dsps5 * nonces / bias5; ghs = instance->dsps5 * nonces;
suffix_string(ghs, suffix5, 16, 0); suffix_string(ghs, suffix5, 16, 0);
ghs = instance->dsps60 * nonces / bias60; ghs = instance->dsps60 * nonces;
suffix_string(ghs, suffix60, 16, 0); suffix_string(ghs, suffix60, 16, 0);
ghs = instance->dsps1440 * nonces / bias1440; ghs = instance->dsps1440 * nonces;
suffix_string(ghs, suffix1440, 16, 0); suffix_string(ghs, suffix1440, 16, 0);
JSON_CPACK(val, "{ss,ss,ss,ss,si}", ghs = instance->dsps10080 * nonces;
suffix_string(ghs, suffix10080, 16, 0);
JSON_CPACK(val, "{ss,ss,ss,ss,ss,si}",
"hashrate1m", suffix1, "hashrate1m", suffix1,
"hashrate5m", suffix5, "hashrate5m", suffix5,
"hashrate1hr", suffix60, "hashrate1hr", suffix60,
"hashrate1d", suffix1440, "hashrate1d", suffix1440,
"hashrate7d", suffix10080,
"workers", instance->workers); "workers", instance->workers);
snprintf(fname, 511, "%s/users/%s", ckp->logdir, instance->username); snprintf(fname, 511, "%s/users/%s", ckp->logdir, instance->username);
@ -3323,6 +3499,7 @@ static void *statsupdate(void *arg)
decay_time(&stats.dsps60, stats.unaccounted_diff_shares, 20, 3600); decay_time(&stats.dsps60, stats.unaccounted_diff_shares, 20, 3600);
decay_time(&stats.dsps360, stats.unaccounted_diff_shares, 20, 21600); decay_time(&stats.dsps360, stats.unaccounted_diff_shares, 20, 21600);
decay_time(&stats.dsps1440, stats.unaccounted_diff_shares, 20, 86400); decay_time(&stats.dsps1440, stats.unaccounted_diff_shares, 20, 86400);
decay_time(&stats.dsps10080, stats.unaccounted_diff_shares, 20, 604800);
stats.unaccounted_shares = stats.unaccounted_shares =
stats.unaccounted_diff_shares = stats.unaccounted_diff_shares =
@ -3370,7 +3547,7 @@ int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 1; int ret = 1, threads;
char *buf; char *buf;
LOGWARNING("%s stratifier starting", ckp->name); LOGWARNING("%s stratifier starting", ckp->name);
@ -3414,7 +3591,9 @@ int stratifier(proc_instance_t *pi)
mutex_init(&ckdb_lock); mutex_init(&ckdb_lock);
ssends = create_ckmsgq(ckp, "ssender", &ssend_process); ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process);
sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); /* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);

Loading…
Cancel
Save