diff --git a/Makefile.am b/Makefile.am index 0a7b9299..329e98e1 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,2 +1,3 @@ ACLOCAL_AMFLAGS = -I m4 SUBDIRS = src +EXTRA_DIST = ckpool.conf ckproxy.conf diff --git a/README b/README index 01f32aa1..3d8c90ea 100644 --- a/README +++ b/README @@ -256,3 +256,6 @@ and 3334 in proxy mode. maximum. "logdir" : Which directory to store pool and client logs. Default "logs" + +"maxclients" : Optional upper limit on the number of clients ckpool will +accept before rejecting further clients. diff --git a/configure.ac b/configure.ac index 218a2392..65d311c0 100644 --- a/configure.ac +++ b/configure.ac @@ -37,7 +37,7 @@ AC_CHECK_HEADERS(endian.h sys/endian.h arpa/inet.h sys/poll.h syslog.h) AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) -AC_CHECK_HEADERS(libpq-fe.h postgresql/libpq-fe.h grp.h) +AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) PTHREAD_LIBS="-lpthread" MATH_LIBS="-lm" diff --git a/src/ckpool.c b/src/ckpool.c index 8c522d46..e1d1186b 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -115,16 +115,16 @@ static void *ckmsg_queue(void *arg) tv_t now; ts_t abs; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs); + pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); msg = ckmsgq->msgs; if (msg) DL_DELETE(ckmsgq->msgs, msg); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); if (!msg) continue; @@ -141,13 +141,39 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) strncpy(ckmsgq->name, name, 15); ckmsgq->func = func; ckmsgq->ckp = ckp; - mutex_init(&ckmsgq->lock); - cond_init(&ckmsgq->cond); + ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); + ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(ckmsgq->lock); + cond_init(ckmsgq->cond); create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq); return ckmsgq; } +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count) +{ + ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); + pthread_mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(pthread_mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckmsgq[i].name, 15, "%.8s%x", name, i); + ckmsgq[i].func = func; + ckmsgq[i].ckp = ckp; + ckmsgq[i].lock = lock; + ckmsgq[i].cond = cond; + create_pthread(&ckmsgq[i].pth, ckmsg_queue, &ckmsgq[i]); + } + + return ckmsgq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -156,10 +182,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) msg->data = data; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(&ckmsgq->cond); - mutex_unlock(&ckmsgq->lock); + pthread_cond_signal(ckmsgq->cond); + mutex_unlock(ckmsgq->lock); } /* Return whether there are any messages queued in the ckmsgq linked list. */ @@ -167,10 +193,10 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { bool ret = true; - mutex_lock(&ckmsgq->lock); + mutex_lock(ckmsgq->lock); if (ckmsgq->msgs) ret = (ckmsgq->msgs->next == ckmsgq->msgs->prev); - mutex_unlock(&ckmsgq->lock); + mutex_unlock(ckmsgq->lock); return ret; } diff --git a/src/ckpool.h b/src/ckpool.h index 9284165d..d246e97f 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -34,8 +34,8 @@ struct ckmsgq { ckpool_t *ckp; char name[16]; pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_mutex_t *lock; + pthread_cond_t *cond; ckmsg_t *msgs; void (*func)(ckpool_t *, void *); }; @@ -181,6 +181,7 @@ struct ckpool_instance { #endif ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); +ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); diff --git a/src/connector.c b/src/connector.c index 2acaf19c..401f70a4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -30,6 +30,8 @@ struct connector_instance { int serverfd; int nfds; bool accept; + pthread_t pth_sender; + pthread_t pth_receiver; }; typedef struct connector_instance conn_instance_t; @@ -38,9 +40,6 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; - - /* For fdclients hashtable */ - UT_hash_handle fdhh; int fd; /* For dead_clients list */ @@ -59,8 +58,6 @@ typedef struct client_instance client_instance_t; /* For the hashtable of all clients */ static client_instance_t *clients; -/* A hashtable of the clients sorted by fd */ -static client_instance_t *fdclients; /* Linked list of dead clients no longer in use but may still have references */ static client_instance_t *dead_clients; @@ -87,10 +84,11 @@ static pthread_cond_t sender_cond; /* Accepts incoming connections on the server socket and generates client * instances */ -static int accept_client(conn_instance_t *ci) +static int accept_client(conn_instance_t *ci, int epfd) { - client_instance_t *client, *old_client; ckpool_t *ckp = ci->pi->ckp; + client_instance_t *client; + struct epoll_event event; int fd, port, no_clients; socklen_t address_len; @@ -98,7 +96,7 @@ static int accept_client(conn_instance_t *ci) no_clients = HASH_COUNT(clients); ck_runlock(&ci->lock); - if (ckp->maxclients && no_clients >= ckp->maxclients) { + if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) { LOGWARNING("Server full with %d clients", no_clients); return 0; } @@ -143,14 +141,21 @@ static int accept_client(conn_instance_t *ci) keep_sockalive(fd); nolinger_socket(fd); - LOGINFO("Connected new client %d on socket %d from %s:%d", ci->nfds, fd, client->address_name, port); + LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", + ci->nfds, fd, no_clients, client->address_name, port); client->fd = fd; + event.data.ptr = client; + event.events = EPOLLIN; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + LOGERR("Failed to epoll_ctl add in accept_client"); + free(client); + return 0; + } ck_wlock(&ci->lock); client->id = client_id++; HASH_ADD_I64(clients, id, client); - HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); ci->nfds++; ck_wunlock(&ci->lock); @@ -166,7 +171,6 @@ static int drop_client(conn_instance_t *ci, client_instance_t *client) if (fd != -1) { Close(client->fd); HASH_DEL(clients, client); - HASH_DELETE(fdhh, fdclients, client); LL_PREPEND(dead_clients, client); } ck_wunlock(&ci->lock); @@ -292,86 +296,64 @@ reparse: void *receiver(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; - client_instance_t *client, *tmp; - struct pollfd fds[65536]; - int ret, nfds, i; - bool update; + struct epoll_event event; + bool maxconn = true; + int ret, epfd; rename_proc("creceiver"); - /* First fd is reserved for the accepting socket */ - fds[0].fd = ci->serverfd; - fds[0].events = POLLIN; - fds[0].revents = 0; -rebuild_fds: - update = false; - nfds = 1; - - ck_rlock(&ci->lock); - HASH_ITER(fdhh, fdclients, client, tmp) { - if (unlikely(client->fd == -1)) { - LOGWARNING("Client id %d is still in fdclients hashtable with invalidated fd!", - client->id); - continue; - } - fds[nfds].fd = client->fd; - fds[nfds].events = POLLIN; - fds[nfds].revents = 0; - nfds++; + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0) { + LOGEMERG("FATAL: Failed to create epoll in receiver"); + return NULL; } - ck_runlock(&ci->lock); - -repoll: - while (!ci->accept) - cksleep_ms(100); - - ret = poll(fds, nfds, 1000); - if (unlikely(ret < 0)) { - LOGERR("Failed to poll in receiver"); - goto out; + event.data.fd = ci->serverfd; + event.events = EPOLLIN; + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event); + if (ret < 0) { + LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); + return NULL; } - for (i = 0; i < nfds && ret > 0; i++) { - int fd, accepted; + while (42) { + client_instance_t *client; - if (!fds[i].revents) + while (unlikely(!ci->accept)) + cksleep_ms(100); + ret = epoll_wait(epfd, &event, 1, 1000); + if (unlikely(ret == -1)) { + LOGEMERG("FATAL: Failed to epoll_wait in receiver"); + break; + } + if (unlikely(!ret)) { + if (unlikely(maxconn)) { + /* When we first start we listen to as many connections as + * possible. Once we stop receiving connections we drop the + * listen to the minimum to effectively ratelimit how fast we + * can receive connections. */ + LOGDEBUG("Dropping listen backlog to 0"); + maxconn = false; + listen(ci->serverfd, 0); + } continue; - - /* Reset for the next poll pass */ - fds[i].events = POLLIN; - fds[i].revents = 0; - --ret; - - /* Is this the listening server socket? */ - if (i == 0) { - accepted = accept_client(ci); - if (unlikely(accepted < 0)) - goto out; - if (accepted) - update = true; + } + if (event.data.fd == ci->serverfd) { + ret = accept_client(ci, epfd); + if (unlikely(ret < 0)) { + LOGEMERG("FATAL: Failed to accept_client in receiver"); + break; + } continue; } - - client = NULL; - fd = fds[i].fd; - - ck_rlock(&ci->lock); - HASH_FIND(fdhh, fdclients, &fd, SOI, client); - ck_runlock(&ci->lock); - - if (!client) { - /* Probably already removed, remove lazily */ - LOGDEBUG("Failed to find nfd client %d with polled fd %d in hashtable", - i, fd); - update = true; - } else - parse_client_msg(ci, client); + client = event.data.ptr; + if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { + /* Client disconnected */ + LOGDEBUG("Client fd %d HUP in epoll", client->fd); + invalidate_client(ci->pi->ckp, ci, client); + continue; + } + parse_client_msg(ci, client); } - - if (update) - goto rebuild_fds; - goto repoll; -out: return NULL; } @@ -564,6 +546,17 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) LOGWARNING("%s connector ready", ckp->name); retry: + if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) { + LOGEMERG("Connector sender thread shutdown, exiting"); + ret = 1; + goto out; + } + if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) { + LOGEMERG("Connector receiver thread shutdown, exiting"); + ret = 1; + goto out; + } + Close(sockd); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { @@ -675,7 +668,6 @@ out: int connector(proc_instance_t *pi) { - pthread_t pth_sender, pth_receiver; char *url = NULL, *port = NULL; ckpool_t *ckp = pi->ckp; int sockd, ret = 0; @@ -738,7 +730,7 @@ int connector(proc_instance_t *pi) if (tries) LOGWARNING("Connector successfully bound to socket"); - ret = listen(sockd, 10); + ret = listen(sockd, SOMAXCONN); if (ret < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); @@ -752,8 +744,8 @@ int connector(proc_instance_t *pi) ci.nfds = 0; mutex_init(&sender_lock); cond_init(&sender_cond); - create_pthread(&pth_sender, sender, &ci); - create_pthread(&pth_receiver, receiver, &ci); + create_pthread(&ci.pth_sender, sender, &ci); + create_pthread(&ci.pth_receiver, receiver, &ci); ret = connector_loop(pi, &ci); out: diff --git a/src/libckpool.c b/src/libckpool.c index bb34d79f..dab6799e 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -440,13 +440,14 @@ void block_socket(int fd) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); } -void _Close(int *fd) +void _close(int *fd, const char *file, const char *func, const int line) { if (*fd < 0) return; LOGDEBUG("Closing file handle %d", *fd); if (unlikely(close(*fd))) - LOGWARNING("Close of fd %d failed with errno %d:%s", *fd, errno, strerror(errno)); + LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d", + *fd, errno, strerror(errno), file, func, line); *fd = -1; } @@ -969,7 +970,6 @@ int _get_fd(int sockd, const char *file, const char *func, const int line) goto out; } out: - Close(sockd); cm = (int *)CMSG_DATA(cmptr); newfd = *cm; free(cmptr); diff --git a/src/libckpool.h b/src/libckpool.h index 26f6e670..b01967e5 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -426,8 +426,9 @@ void keep_sockalive(int fd); void nolinger_socket(int fd); void noblock_socket(int fd); void block_socket(int fd); -void _Close(int *fd); -#define Close(FD) _Close(&FD) +void _close(int *fd, const char *file, const char *func, const int line); +#define _Close(FD) _close(FD, __FILE__, __func__, __LINE__) +#define Close(FD) _close(&FD, __FILE__, __func__, __LINE__) int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); diff --git a/src/stratifier.c b/src/stratifier.c index 52073773..d9e6c4f1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -71,6 +71,7 @@ struct pool_stats { double dsps60; double dsps360; double dsps1440; + double dsps10080; }; typedef struct pool_stats pool_stats_t; @@ -173,6 +174,7 @@ static int64_t blockchange_id; static char lasthash[68], lastswaphash[68]; struct json_params { + json_t *method; json_t *params; json_t *id_val; int64_t client_id; @@ -226,6 +228,7 @@ struct user_instance { double dsps5; /* ... 5 minute ... */ double dsps60;/* etc */ double dsps1440; + double dsps10080; tv_t last_share; }; @@ -268,6 +271,7 @@ struct stratum_instance { double dsps5; /* ... 5 minute ... */ double dsps60;/* etc */ double dsps1440; + double dsps10080; tv_t ldc; /* Last diff change */ int ssdc; /* Shares since diff change */ tv_t first_share; @@ -720,24 +724,39 @@ static void send_generator(ckpool_t *ckp, const char *msg, int prio) gen_priority = 0; } +struct update_req { + pthread_t *pth; + ckpool_t *ckp; + int prio; +}; + +static void broadcast_ping(void); + /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void update_base(ckpool_t *ckp, int prio) +static void *do_update(void *arg) { + struct update_req *ur = (struct update_req *)arg; + ckpool_t *ckp = ur->ckp; bool new_block = false; + int prio = ur->prio; + bool ret = false; workbase_t *wb; json_t *val; char *buf; + pthread_detach(pthread_self()); + rename_proc("updater"); + buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGWARNING("Failed to get base from generator in update_base"); - return; + goto out; } if (unlikely(cmdmatch(buf, "failed"))) { LOGWARNING("Generator returned failure in update_base"); - return; + goto out; } wb = ckzalloc(sizeof(workbase_t)); @@ -782,6 +801,29 @@ static void update_base(ckpool_t *ckp, int prio) add_base(ckp, wb, &new_block); stratum_broadcast_update(new_block); + ret = true; + LOGINFO("Broadcast updated stratum base"); +out: + /* Send a ping to miners if we fail to get a base to keep them + * connected while bitcoind recovers(?) */ + if (!ret) { + LOGWARNING("Broadcast ping due to failed stratum base update"); + broadcast_ping(); + } + free(ur->pth); + free(ur); + return NULL; +} + +static void update_base(ckpool_t *ckp, int prio) +{ + struct update_req *ur = ckalloc(sizeof(struct update_req)); + pthread_t *pth = ckalloc(sizeof(pthread_t)); + + ur->pth = pth; + ur->ckp = ckp; + ur->prio = prio; + create_pthread(pth, do_update, ur); } static void drop_allclients(ckpool_t *ckp) @@ -1068,13 +1110,13 @@ static void stratum_broadcast(json_t *val) if (!bulk_send) return; - mutex_lock(&ssends->lock); + mutex_lock(ssends->lock); if (ssends->msgs) DL_CONCAT(ssends->msgs, bulk_send); else ssends->msgs = bulk_send; - pthread_cond_signal(&ssends->cond); - mutex_unlock(&ssends->lock); + pthread_cond_signal(ssends->cond); + mutex_unlock(ssends->lock); } static void stratum_add_send(json_t *val, int64_t client_id) @@ -1561,6 +1603,114 @@ static bool test_address(ckpool_t *ckp, const char *address) return ret; } +static const double nonces = 4294967296; + +static double dsps_from_key(json_t *val, const char *key) +{ + char *string, *endptr; + double ret = 0; + + json_get_string(&string, val, key); + if (!string) + return ret; + ret = strtod(string, &endptr) / nonces; + if (endptr) { + switch (endptr[0]) { + case 'E': + ret *= (double)1000; + case 'P': + ret *= (double)1000; + case 'T': + ret *= (double)1000; + case 'G': + ret *= (double)1000; + case 'M': + ret *= (double)1000; + case 'K': + ret *= (double)1000; + default: + break; + } + } + free(string); + return ret; +} + +static void read_userstats(ckpool_t *ckp, user_instance_t *instance) +{ + char s[512]; + json_t *val; + FILE *fp; + int ret; + + snprintf(s, 511, "%s/users/%s", ckp->logdir, instance->username); + fp = fopen(s, "re"); + if (!fp) { + LOGINFO("User %s does not have a logfile to read", instance->username); + return; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGINFO("Failed to read user %s logfile", instance->username); + return; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGINFO("Failed to json decode user %s logfile: %s", instance->username, s); + return; + } + + tv_time(&instance->last_share); + instance->dsps1 = dsps_from_key(val, "hashrate1m"); + instance->dsps5 = dsps_from_key(val, "hashrate5m"); + instance->dsps60 = dsps_from_key(val, "hashrate1hr"); + instance->dsps1440 = dsps_from_key(val, "hashrate1d"); + instance->dsps10080 = dsps_from_key(val, "hashrate7d"); + LOGINFO("Successfully read user %s stats %f %f %f %f %f", instance->username, + instance->dsps1, instance->dsps5, instance->dsps60, instance->dsps1440, + instance->dsps10080); + json_decref(val); +} + +static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) +{ + char s[512]; + json_t *val; + FILE *fp; + int ret; + + snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername); + fp = fopen(s, "re"); + if (!fp) { + LOGINFO("Worker %s does not have a logfile to read", worker->workername); + return; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGINFO("Failed to read worker %s logfile", worker->workername); + return; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s); + return; + } + + tv_time(&worker->last_share); + worker->dsps1 = dsps_from_key(val, "hashrate1m"); + worker->dsps5 = dsps_from_key(val, "hashrate5m"); + worker->dsps60 = dsps_from_key(val, "hashrate1d"); + worker->dsps1440 = dsps_from_key(val, "hashrate1d"); + LOGINFO("Successfully read worker %s stats %f %f %f %f", worker->workername, + worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440); + json_decref(val); +} + + /* This simply strips off the first part of the workername and matches it to a * user or creates a new one. */ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, @@ -1589,6 +1739,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, instance->id = user_instance_id++; HASH_ADD_STR(user_instances, username, instance); + read_userstats(ckp, instance); } DL_FOREACH(instance->instances, tmp) { if (!safecmp(workername, tmp->workername)) { @@ -1603,6 +1754,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, client->worker_instance->workername = strdup(workername); client->worker_instance->instance = instance; DL_APPEND(instance->worker_instances, client->worker_instance); + read_workerstats(ckp, client->worker_instance); } DL_APPEND(instance->instances, client); ck_wunlock(&instance_lock); @@ -1881,6 +2033,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool decay_time(&client->dsps5, diff, tdiff, 300); decay_time(&client->dsps60, diff, tdiff, 3600); decay_time(&client->dsps1440, diff, tdiff, 86400); + decay_time(&client->dsps10080, diff, tdiff, 604800); copy_tv(&client->last_share, &now_t); tdiff = sane_tdiff(&now_t, &worker->last_share); @@ -1895,6 +2048,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool decay_time(&instance->dsps5, diff, tdiff, 300); decay_time(&instance->dsps60, diff, tdiff, 3600); decay_time(&instance->dsps1440, diff, tdiff, 86400); + decay_time(&instance->dsps10080, diff, tdiff, 604800); copy_tv(&instance->last_share, &now_t); client->idle = false; @@ -2452,10 +2606,13 @@ static void update_client(stratum_instance_t *client, const int64_t client_id) stratum_send_diff(client); } -static json_params_t *create_json_params(const int64_t client_id, const json_t *params, const json_t *id_val, const char *address) +static json_params_t +*create_json_params(const int64_t client_id, json_t *method, const json_t *params, + const json_t *id_val, const char *address) { json_params_t *jp = ckalloc(sizeof(json_params_t)); + jp->method = json_copy(method); jp->params = json_deep_copy(params); jp->id_val = json_deep_copy(id_val); jp->client_id = client_id; @@ -2541,8 +2698,8 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t client->suggest_diff = sdiff; if (client->diff == sdiff) return; - if (sdiff < client->diff * 2 / 3) - client->diff = client->diff * 2 / 3; + if (sdiff < client->ckp->mindiff) + client->diff = client->ckp->mindiff; else client->diff = sdiff; stratum_send_diff(client); @@ -2609,7 +2766,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method } if (cmdmatch(method, "mining.auth") && client->subscribed) { - json_params_t *jp = create_json_params(client_id, params_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sauthq, jp); return; @@ -2627,7 +2784,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method } if (cmdmatch(method, "mining.submit")) { - json_params_t *jp = create_json_params(client_id, params_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sshareq, jp); return; @@ -2640,7 +2797,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method /* Covers both get_transactions and get_txnhashes */ if (cmdmatch(method, "mining.get")) { - json_params_t *jp = create_json_params(client_id, method_val, id_val, address); + json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(stxnq, jp); return; @@ -2751,6 +2908,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) static void discard_json_params(json_params_t **jp) { + json_decref((*jp)->method); json_decref((*jp)->params); json_decref((*jp)->id_val); free(*jp); @@ -2939,7 +3097,8 @@ static json_t *txnhashes_by_jobid(int64_t id) static void send_transactions(ckpool_t *ckp, json_params_t *jp) { - const char *msg = json_string_value(jp->params); + const char *msg = json_string_value(jp->method), + *params = json_string_value(json_array_get(jp->params, 0)); stratum_instance_t *client; json_t *val, *hashes; int64_t job_id = 0; @@ -2956,8 +3115,12 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) /* We don't actually send the transactions as that would use * up huge bandwidth, so we just return the number of - * transactions :) */ - sscanf(msg, "mining.get_transactions(%lx", &job_id); + * transactions :) . Support both forms of encoding the + * request in method name and as a parameter. */ + if (params && strlen(params) > 0) + sscanf(params, "%lx", &job_id); + else + sscanf(msg, "mining.get_transactions(%lx", &job_id); txns = transactions_by_jobid(job_id); if (txns != -1) { json_set_int(val, "result", txns); @@ -2989,7 +3152,11 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out_send; } client->last_txns = now_t; - sscanf(msg, "mining.get_txnhashes(%lx", &job_id); + if (!params || !strlen(params)) { + json_set_string(val, "error", "Invalid params"); + goto out_send; + } + sscanf(params, "%lx", &job_id); hashes = txnhashes_by_jobid(job_id); if (hashes) { json_object_set_new_nocheck(val, "result", hashes); @@ -3002,8 +3169,6 @@ out: discard_json_params(&jp); } -static const double nonces = 4294967296; - /* Called every 20 seconds, we send the updated stats to ckdb of those users * who have gone 10 minutes between updates. This ends up staggering stats to * avoid floods of stat data coming at once. */ @@ -3089,11 +3254,11 @@ static void *statsupdate(void *arg) sleep(1); while (42) { - double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440; + double ghs, ghs1, ghs5, ghs15, ghs60, ghs360, ghs1440, ghs10080; double bias, bias5, bias60, bias1440; double tdiff, per_tdiff; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; - char suffix360[16], suffix1440[16]; + char suffix360[16], suffix1440[16], suffix10080[16]; user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; double sps1, sps5, sps15, sps60; @@ -3136,6 +3301,10 @@ static void *statsupdate(void *arg) ghs1440 = stats.dsps1440 * nonces / bias1440; suffix_string(ghs1440, suffix1440, 16, 0); + bias = time_bias(tdiff, 604800); + ghs10080 = stats.dsps10080 * nonces / bias; + suffix_string(ghs10080, suffix10080, 16, 0); + snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); fp = fopen(fname, "we"); if (unlikely(!fp)) @@ -3151,13 +3320,14 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss,ss}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate15m", suffix15, "hashrate1hr", suffix60, "hashrate6hr", suffix360, - "hashrate1d", suffix1440); + "hashrate1d", suffix1440, + "hashrate7d", suffix10080); s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); LOGNOTICE("Pool:%s", s); @@ -3189,6 +3359,7 @@ static void *statsupdate(void *arg) decay_time(&client->dsps5, 0, per_tdiff, 300); decay_time(&client->dsps60, 0, per_tdiff, 3600); decay_time(&client->dsps1440, 0, per_tdiff, 86400); + decay_time(&client->dsps10080, 0, per_tdiff, 604800); if (per_tdiff > 600) client->idle = true; continue; @@ -3211,13 +3382,13 @@ static void *statsupdate(void *arg) ghs = worker->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = worker->dsps5 * nonces / bias5; + ghs = worker->dsps5 * nonces; suffix_string(ghs, suffix5, 16, 0); - ghs = worker->dsps60 * nonces / bias60; + ghs = worker->dsps60 * nonces; suffix_string(ghs, suffix60, 16, 0); - ghs = worker->dsps1440 * nonces / bias1440; + ghs = worker->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); JSON_CPACK(val, "{ss,ss,ss,ss}", @@ -3246,25 +3417,30 @@ static void *statsupdate(void *arg) decay_time(&instance->dsps5, 0, per_tdiff, 300); decay_time(&instance->dsps60, 0, per_tdiff, 3600); decay_time(&instance->dsps1440, 0, per_tdiff, 86400); + decay_time(&instance->dsps10080, 0, per_tdiff, 604800); idle = true; } ghs = instance->dsps1 * nonces; suffix_string(ghs, suffix1, 16, 0); - ghs = instance->dsps5 * nonces / bias5; + ghs = instance->dsps5 * nonces; suffix_string(ghs, suffix5, 16, 0); - ghs = instance->dsps60 * nonces / bias60; + ghs = instance->dsps60 * nonces; suffix_string(ghs, suffix60, 16, 0); - ghs = instance->dsps1440 * nonces / bias1440; + ghs = instance->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); - JSON_CPACK(val, "{ss,ss,ss,ss,si}", + ghs = instance->dsps10080 * nonces; + suffix_string(ghs, suffix10080, 16, 0); + + JSON_CPACK(val, "{ss,ss,ss,ss,ss,si}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate1hr", suffix60, "hashrate1d", suffix1440, + "hashrate7d", suffix10080, "workers", instance->workers); snprintf(fname, 511, "%s/users/%s", ckp->logdir, instance->username); @@ -3323,6 +3499,7 @@ static void *statsupdate(void *arg) decay_time(&stats.dsps60, stats.unaccounted_diff_shares, 20, 3600); decay_time(&stats.dsps360, stats.unaccounted_diff_shares, 20, 21600); decay_time(&stats.dsps1440, stats.unaccounted_diff_shares, 20, 86400); + decay_time(&stats.dsps10080, stats.unaccounted_diff_shares, 20, 604800); stats.unaccounted_shares = stats.unaccounted_diff_shares = @@ -3370,7 +3547,7 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; ckpool_t *ckp = pi->ckp; - int ret = 1; + int ret = 1, threads; char *buf; LOGWARNING("%s stratifier starting", ckp->name); @@ -3414,7 +3591,9 @@ int stratifier(proc_instance_t *pi) mutex_init(&ckdb_lock); ssends = create_ckmsgq(ckp, "ssender", &ssend_process); srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process); - sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); + /* Create half as many share processing threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);