kanoi 10 years ago
parent
commit
ceac081a17
  1. 13
      src/ckpmsg.c
  2. 6
      src/ckpool.c
  3. 1
      src/ckpool.h
  4. 444
      src/connector.c
  5. 8
      src/libckpool.c
  6. 77
      src/stratifier.c

13
src/ckpmsg.c

@ -1,5 +1,5 @@
/*
* Copyright 2014 Con Kolivas
* Copyright 2014-2015 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
@ -54,15 +54,20 @@ void mkstamp(char *stamp, size_t siz)
int main(int argc, char **argv)
{
char *name = NULL, *socket_dir = NULL, *buf = NULL;
char *name = NULL, *socket_dir = NULL, *buf = NULL, *sockname = "listener";
int tmo1 = RECV_UNIX_TIMEOUT1;
int tmo2 = RECV_UNIX_TIMEOUT2;
bool proxy = false;
char stamp[128];
int c;
while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) {
while ((c = getopt(argc, argv, "N:n:s:pt:T:")) != -1) {
switch(c) {
/* Allows us to specify which process or socket to
* talk to. */
case 'N':
sockname = strdup(optarg);
break;
case 'n':
name = strdup(optarg);
break;
@ -92,7 +97,7 @@ int main(int argc, char **argv)
realloc_strcat(&socket_dir, name);
dealloc(name);
trail_slash(&socket_dir);
realloc_strcat(&socket_dir, "listener");
realloc_strcat(&socket_dir, sockname);
while (42) {
int sockd, len;

6
src/ckpool.c

@ -202,8 +202,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
return ret;
}
static void childsighandler(const int sig);
/* Create a standalone thread that queues received unix messages for a proc
* instance and adds them to linked list of received messages with their
* associated receive socket, then signal the associated rmsg_cond for the
@ -808,7 +806,7 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid
return;
LOGWARNING("Old process %s pid %d failed to respond to terminate request, killing",
pi->processname, oldpid);
if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 500))
if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 3000))
quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid);
}
@ -879,7 +877,7 @@ static void rm_namepid(const proc_instance_t *pi)
/* Disable signal handlers for child processes, but simply pass them onto the
* parent process to shut down cleanly. */
static void childsighandler(const int sig)
void childsighandler(const int sig)
{
signal(sig, SIG_IGN);
signal(SIGTERM, SIG_IGN);

1
src/ckpool.h

@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
void childsighandler(const int sig);
int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret);
bool json_get_string(char **store, const json_t *val, const char *res);
bool json_get_int64(int64_t *store, const json_t *val, const char *res);

444
src/connector.c

@ -29,12 +29,17 @@ struct client_instance {
/* For clients hashtable */
UT_hash_handle hh;
int64_t id;
/* fd cannot be changed while a ref is held */
int fd;
/* Reference count for when this instance is used outside of the
* connector_data lock */
int ref;
/* Have we disabled this client to be removed when there are no refs? */
bool invalid;
/* For dead_clients list */
client_instance_t *next;
client_instance_t *prev;
@ -58,6 +63,7 @@ struct sender_send {
client_instance_t *client;
char *buf;
int len;
int ofs;
};
typedef struct sender_send sender_send_t;
@ -68,6 +74,8 @@ struct connector_data {
cklock_t lock;
proc_instance_t *pi;
time_t start_time;
/* Array of server fds */
int *serverfd;
/* All time count of clients connected */
@ -93,10 +101,11 @@ struct connector_data {
/* For the linked list of pending sends */
sender_send_t *sender_sends;
sender_send_t *delayed_sends;
int64_t sends_generated;
int64_t sends_delayed;
int64_t sends_queued;
int64_t sends_size;
/* For protecting the pending sends list */
mutex_t sender_lock;
@ -219,6 +228,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
}
keep_sockalive(fd);
noblock_socket(fd);
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
cdata->nfds, fd, no_clients, client->address_name, port);
@ -229,7 +239,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
cdata->nfds++;
ck_wunlock(&cdata->lock);
client->fd = fd;
event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
@ -241,6 +250,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
* to it. We drop that reference when the socket is closed which
* removes it automatically from the epoll list. */
__inc_instance_ref(client);
client->fd = fd;
return 1;
}
@ -249,16 +259,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
static int drop_client(cdata_t *cdata, client_instance_t *client)
{
int64_t client_id = 0;
int fd;
int fd = -1;
ck_wlock(&cdata->lock);
fd = client->fd;
if (fd != -1) {
if (!client->invalid) {
client->invalid = true;
client_id = client->id;
fd = client->fd;
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL);
nolinger_socket(fd);
Close(client->fd);
HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the
@ -274,7 +282,22 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
return fd;
}
static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
/* For sending the drop command to the upstream pool in passthrough mode */
static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client)
{
json_t *val;
char *s;
JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address",
client->address_name, "server", client->server, "method", "mining.term",
"params");
s = json_dumps(val, 0);
json_decref(val);
send_proc(ckp->generator, s);
free(s);
}
static void stratifier_drop_id(ckpool_t *ckp, const int64_t id)
{
char buf[256];
@ -282,6 +305,11 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
send_proc(ckp->stratifier, buf);
}
static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client)
{
stratifier_drop_id(ckp, client->id);
}
/* Invalidate this instance. Remove them from the hashtables we look up
* regularly but keep the instances in a linked list until their ref count
* drops to zero when we can remove them lazily. Client must hold a reference
@ -292,9 +320,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
int ret;
ret = drop_client(cdata, client);
if (ckp->passthrough)
goto out;
stratifier_drop_client(ckp, client->id);
if (!ckp->passthrough && !client->passthrough)
stratifier_drop_client(ckp, client);
else if (ckp->passthrough)
generator_drop_client(ckp, client);
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
@ -303,12 +332,16 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
if (!client->ref) {
DL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector recycling client %"PRId64, client->id);
/* We only close the client fd once we're sure there
* are no references to it left to prevent fds being
* reused on new and old clients. */
nolinger_socket(client->fd);
Close(client->fd);
__recycle_client(cdata, client);
}
}
ck_wunlock(&cdata->lock);
out:
return ret;
}
@ -323,41 +356,33 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
json_t *val;
retry:
ret = wait_read_select(client->fd, 0);
if (ret < 1) {
if (!ret)
return;
LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting",
client->id, client->fd);
invalidate_client(ckp, cdata, client);
return;
}
buflen = PAGESIZE - client->bufofs;
ret = recv(client->fd, client->buf + client->bufofs, buflen, 0);
/* This read call is non-blocking since the socket is set to O_NOBLOCK */
ret = read(client->fd, client->buf + client->bufofs, buflen);
if (ret < 1) {
if (!ret)
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return;
LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s",
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs += ret;
reparse:
eol = memchr(client->buf, '\n', client->bufofs);
if (!eol) {
if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd);
invalidate_client(ckp, cdata, client);
return;
}
if (!eol)
goto retry;
}
/* Do something useful with this message now */
buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd);
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd);
invalidate_client(ckp, cdata, client);
return;
}
@ -382,16 +407,17 @@ reparse:
json_object_del(val, "client_id");
passthrough_id = (client->id << 32) | passthrough_id;
json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id));
} else
} else {
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
}
json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, 0);
/* Do not send messages of clients we've already dropped. We
* do this unlocked as the occasional false negative can be
* filtered by the stratifier. */
if (likely(client->fd != -1)) {
if (likely(!client->invalid)) {
if (ckp->passthrough)
send_proc(ckp->generator, s);
else
@ -413,8 +439,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
if (client) {
if (!client->invalid)
__inc_instance_ref(client);
else
client = NULL;
}
ck_wunlock(&cdata->lock);
return client;
@ -425,10 +455,8 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
void *receiver(void *arg)
{
cdata_t *cdata = (cdata_t *)arg;
bool dropped_backlog = false;
struct epoll_event event;
uint64_t serverfds, i;
time_t start_t;
int ret, epfd;
rename_proc("creceiver");
@ -436,7 +464,7 @@ void *receiver(void *arg)
epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0) {
LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL;
goto out;
}
serverfds = cdata->ckp->serverurls;
/* Add all the serverfds to the epoll */
@ -447,27 +475,16 @@ void *receiver(void *arg)
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL;
goto out;
}
}
while (!cdata->accept)
cksleep_ms(1);
start_t = time(NULL);
while (42) {
client_instance_t *client;
if (unlikely(!dropped_backlog && time(NULL) - start_t > 90)) {
/* When we first start we listen to as many connections
* as possible. After the first minute we drop the
* listen to the minimum to effectively ratelimit how
* fast we can receive new connections. */
dropped_backlog = true;
LOGNOTICE("Dropping server listen backlog to 0");
for (i = 0; i < serverfds; i++)
listen(cdata->serverfd[i], 0);
}
while (unlikely(!cdata->accept))
cksleep_ms(10);
ret = epoll_wait(epfd, &event, 1, 1000);
@ -489,41 +506,115 @@ void *receiver(void *arg)
}
client = ref_client_by_id(cdata, event.data.u64);
if (unlikely(!client)) {
LOGWARNING("Failed to find client by id in receiver!");
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64);
continue;
}
if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
/* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else
if (unlikely(client->invalid))
goto noparse;
/* We can have both messages and read hang ups so process the
* message first. */
if (likely(event.events & EPOLLIN))
parse_client_msg(cdata, client);
if (unlikely(client->invalid))
goto noparse;
if (unlikely(event.events & EPOLLERR)) {
socklen_t errlen = sizeof(int);
int error = 0;
/* See what type of error this is and raise the log
* level of the message if it's unexpected. */
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error != 104) {
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
} else {
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
}
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(event.events & EPOLLHUP)) {
/* Client connection reset by peer */
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(event.events & EPOLLRDHUP)) {
/* Client disconnected by peer */
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
}
noparse:
dec_instance_ref(cdata, client);
}
out:
/* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL;
}
/* Use a thread to send queued messages, using select() to only send to sockets
* ready for writing immediately to not delay other messages. */
void *sender(void *arg)
/* Send a sender_send message and return true if we've finished sending it or
* are unable to send any more. */
static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send)
{
client_instance_t *client = sender_send->client;
if (unlikely(client->invalid))
return true;
while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);
if (unlikely(ret < 1)) {
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret)
return false;
LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s",
client->id, client->fd, errno, strerror(errno));
invalidate_client(ckp, cdata, client);
return true;
}
sender_send->ofs += ret;
sender_send->len -= ret;
}
return true;
}
static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata)
{
dec_instance_ref(cdata, sender_send->client);
free(sender_send->buf);
free(sender_send);
}
/* Use a thread to send queued messages, appending them to the sends list and
* iterating over all of them, attempting to send them all non-blocking to
* only send to those clients ready to receive data. */
static void *sender(void *arg)
{
cdata_t *cdata = (cdata_t *)arg;
sender_send_t *sends = NULL;
ckpool_t *ckp = cdata->ckp;
bool sent = false;
rename_proc("csender");
while (42) {
sender_send_t *sender_send, *delayed;
client_instance_t *client;
int ret = 0, fd, ofs = 0;
int64_t sends_queued = 0, sends_size = 0;
sender_send_t *sending, *tmp;
/* Check all sends to see if they can be written out */
DL_FOREACH_SAFE(sends, sending, tmp) {
if (send_sender_send(ckp, cdata, sending)) {
DL_DELETE(sends, sending);
clear_sender_send(sending, cdata);
} else {
sends_queued++;
sends_size += sizeof(sender_send_t) + sending->len + 1;
}
}
mutex_lock(&cdata->sender_lock);
/* Poll every 10ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the
* delayed sends. */
if (!cdata->sender_sends && !sent) {
cdata->sends_delayed += sends_queued;
cdata->sends_queued = sends_queued;
cdata->sends_size = sends_size;
/* Poll every 10ms if there are no new sends. */
if (!cdata->sender_sends) {
const ts_t polltime = {0, 10000000};
ts_t timeout_ts;
@ -531,86 +622,25 @@ void *sender(void *arg)
timeraddspec(&timeout_ts, &polltime);
cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts);
}
sender_send = cdata->sender_sends;
if (sender_send)
DL_DELETE(cdata->sender_sends, sender_send);
mutex_unlock(&cdata->sender_lock);
sent = false;
/* Service delayed sends only if we have timed out on the
* conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) {
/* Find a delayed client that needs servicing and set
* ret accordingly. We do not need to use FOREACH_SAFE
* as we break out of the loop as soon as we manipuate
* the list. */
DL_FOREACH(cdata->delayed_sends, delayed) {
if ((ret = wait_write_select(delayed->client->fd, 0))) {
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
break;
}
}
/* None found ? */
if (!sender_send)
continue;
}
client = sender_send->client;
/* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll
* select on this one */
ck_rlock(&cdata->lock);
fd = client->fd;
if (!ret)
ret = wait_write_select(fd, 0);
ck_runlock(&cdata->lock);
if (ret < 1) {
if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
invalidate_client(ckp, cdata, client);
goto contfree;
}
LOGDEBUG("Client %"PRId64" not ready for writes", client->id);
/* Append it to the tail of the delayed sends list.
* This is the only function that alters it so no
* locking is required. Keep the client ref. */
DL_APPEND(cdata->delayed_sends, sender_send);
cdata->sends_delayed++;
continue;
}
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd);
invalidate_client(ckp, cdata, client);
break;
}
ofs += ret;
sender_send->len -= ret;
if (cdata->sender_sends) {
DL_CONCAT(sends, cdata->sender_sends);
cdata->sender_sends = NULL;
}
contfree:
sent = true;
free(sender_send->buf);
free(sender_send);
dec_instance_ref(cdata, client);
mutex_unlock(&cdata->sender_lock);
}
/* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL;
}
/* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */
static void send_client(cdata_t *cdata, int64_t id, char *buf)
static void send_client(cdata_t *cdata, const int64_t id, char *buf)
{
ckpool_t *ckp = cdata->ckp;
sender_send_t *sender_send;
client_instance_t *client;
int fd = -1, len;
int len;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
@ -623,28 +653,36 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
return;
}
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (likely(client)) {
fd = client->fd;
/* Grab a reference to this client until the sender_send has
* completed processing. */
__inc_instance_ref(client);
}
ck_wunlock(&cdata->lock);
if (unlikely(fd == -1)) {
ckpool_t *ckp = cdata->ckp;
/* Grab a reference to this client until the sender_send has
* completed processing. Is this a passthrough subclient ? */
if (id > 0xffffffffll) {
int64_t client_id, pass_id;
if (client) {
/* This shouldn't happen */
LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id);
invalidate_client(ckp, cdata, client);
} else {
client_id = id & 0xffffffffll;
pass_id = id >> 32;
/* Make sure the passthrough exists for passthrough subclients */
client = ref_client_by_id(cdata, pass_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to",
pass_id, client_id);
/* Now see if the subclient exists */
client = ref_client_by_id(cdata, client_id);
if (client) {
invalidate_client(ckp, cdata, client);
dec_instance_ref(cdata, client);
} else
stratifier_drop_id(ckp, id);
free(buf);
return;
}
} else {
client = ref_client_by_id(cdata, id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %"PRId64" to send to", id);
stratifier_drop_id(ckp, id);
free(buf);
return;
}
free(buf);
return;
}
sender_send = ckzalloc(sizeof(sender_send_t));
@ -671,7 +709,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
static void process_client_msg(cdata_t *cdata, const char *buf)
{
int64_t client_id64, client_id;
int64_t client_id;
json_t *json_msg;
char *msg;
@ -682,22 +720,18 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
}
/* Extract the client id from the json message and remove its entry */
client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
} else
client_id = client_id64;
/* Put client_id back in for a passthrough subclient, passing its
* upstream client_id instead of the passthrough's. */
if (client_id > 0xffffffffll)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL);
send_client(cdata, client_id, msg);
json_decref(json_msg);
}
static char *connector_stats(cdata_t *cdata)
static char *connector_stats(cdata_t *cdata, const int runtime)
{
json_t *val = json_object(), *subval;
client_instance_t *client;
@ -706,6 +740,10 @@ static char *connector_stats(cdata_t *cdata)
int64_t memsize;
char *buf;
/* If called in passthrough mode we log stats instead of the stratifier */
if (runtime)
json_set_int(val, "runtime", runtime);
ck_rlock(&cdata->lock);
objects = HASH_COUNT(cdata->clients);
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects;
@ -728,68 +766,57 @@ static char *connector_stats(cdata_t *cdata)
memsize = 0;
mutex_lock(&cdata->sender_lock);
generated = cdata->sends_generated;
DL_FOREACH(cdata->sender_sends, send) {
objects++;
memsize += sizeof(sender_send_t) + send->len + 1;
}
mutex_unlock(&cdata->sender_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated);
json_set_object(val, "sends", subval);
objects = 0;
memsize = 0;
mutex_lock(&cdata->sender_lock);
generated = cdata->sends_delayed;
DL_FOREACH(cdata->delayed_sends, send) {
objects++;
memsize += sizeof(sender_send_t) + send->len + 1;
}
JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed);
mutex_unlock(&cdata->sender_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "delays", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
LOGNOTICE("Connector stats: %s", buf);
if (runtime)
LOGNOTICE("Passthrough:%s", buf);
else
LOGNOTICE("Connector stats: %s", buf);
return buf;
}
static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{
int64_t client_id64, client_id;
unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp;
uint8_t test_cycle = 0;
char *buf;
time_t last_stats;
int64_t client_id;
int ret = 0;
char *buf;
LOGWARNING("%s connector ready", ckp->name);
last_stats = cdata->start_time;
retry:
if (ckp->passthrough) {
time_t diff = time(NULL);
if (diff - last_stats >= 60) {
last_stats = diff;
diff -= cdata->start_time;
buf = connector_stats(cdata, diff);
dealloc(buf);
}
}
if (umsg) {
Close(umsg->sockd);
free(umsg->buf);
dealloc(umsg);
}
if (!++test_cycle) {
/* Test for pthread join every 256 messages */
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
}
}
do {
umsg = get_unix_msg(pi);
} while (!umsg);
@ -803,12 +830,14 @@ retry:
} else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client;
ret = sscanf(buf, "dropclient=%"PRId64, &client_id64);
ret = sscanf(buf, "dropclient=%"PRId64, &client_id);
if (ret < 0) {
LOGDEBUG("Connector failed to parse dropclient command: %s", buf);
goto retry;
}
client_id = client_id64 & 0xffffffffll;
/* A passthrough client, we can't drop this yet */
if (client_id > 0xffffffffll)
goto retry;
client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id);
@ -831,7 +860,7 @@ retry:
char *msg;
LOGDEBUG("Connector received stats request");
msg = connector_stats(cdata);
msg = connector_stats(cdata, 0);
send_unix_msg(umsg->sockd, msg);
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
@ -910,7 +939,9 @@ int connector(proc_instance_t *pi)
Close(sockd);
goto out;
}
if (listen(sockd, SOMAXCONN) < 0) {
/* Set listen backlog to larger than SOMAXCONN in case the
* system configuration supports it */
if (listen(sockd, 8192) < 0) {
LOGERR("Connector failed to listen on socket");
Close(sockd);
goto out;
@ -952,7 +983,7 @@ int connector(proc_instance_t *pi)
ret = 1;
goto out;
}
if (listen(sockd, SOMAXCONN) < 0) {
if (listen(sockd, 8192) < 0) {
LOGERR("Connector failed to listen on socket");
Close(sockd);
goto out;
@ -974,6 +1005,7 @@ int connector(proc_instance_t *pi)
cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata);
create_pthread(&cdata->pth_receiver, receiver, cdata);
cdata->start_time = time(NULL);
create_unix_receiver(pi);

8
src/libckpool.c

@ -893,13 +893,13 @@ int wait_close(int sockd, int timeout)
if (unlikely(sockd < 0))
return -1;
sfd.fd = sockd;
sfd.events = POLLIN;
sfd.events = POLLRDHUP;
sfd.revents = 0;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
if (ret < 1)
return 0;
return sfd.revents & POLLHUP;
return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR);
}
/* Emulate a select read wait for high fds that select doesn't support */
@ -911,7 +911,7 @@ int wait_read_select(int sockd, int timeout)
if (unlikely(sockd < 0))
goto out;
sfd.fd = sockd;
sfd.events = POLLIN;
sfd.events = POLLIN | POLLRDHUP;
sfd.revents = 0;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
@ -993,7 +993,7 @@ int wait_write_select(int sockd, int timeout)
if (unlikely(sockd < 0))
goto out;
sfd.fd = sockd;
sfd.events = POLLOUT;
sfd.events = POLLOUT | POLLRDHUP;
sfd.revents = 0;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);

77
src/stratifier.c

@ -1289,17 +1289,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
if (client->workername) {
if (user) {
ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s",
client->id, client->address, user->throttled ? "throttled " : "",
user->username, client->workername, lazily ? "lazily" : "");
ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s",
client->id, client->address, user->throttled ? "throttled " : "",
user->username, client->workername, lazily ? "lazily" : "");
} else {
ASPRINTF(msg, "Client %"PRId64" %s no user worker %s dropped %s",
client->id, client->address, client->workername,
lazily ? "lazily" : "");
ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s",
client->id, client->address, client->workername,
lazily ? "lazily" : "");
}
} else {
ASPRINTF(msg, "Workerless client %"PRId64" %s dropped %s",
client->id, client->address, lazily ? "lazily" : "");
ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s",
client->id, client->address, lazily ? "lazily" : "");
}
__del_client(sdata, client);
__kill_instance(sdata, client);
@ -1348,7 +1348,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata)
/* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id,
const char *address, const int server)
const char *address, int server)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->data;
@ -1357,6 +1357,9 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i
client->id = id;
client->session_id = ++sdata->session_id;
strcpy(client->address, address);
/* Sanity check to not overflow lookup in ckp->serverurl[] */
if (server >= ckp->serverurls)
server = 0;
client->server = server;
client->diff = client->old_diff = ckp->startdiff;
client->ckp = ckp;
@ -1509,11 +1512,8 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd)
if (port)
url = strsep(&port, ",");
if (url && port) {
int port_no;
port_no = strtol(port, NULL, 10);
JSON_CPACK(json_msg, "{sosss[sii]}", "id", json_null(), "method", "client.reconnect",
"params", url, port_no, 0);
JSON_CPACK(json_msg, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect",
"params", url, port, 0);
} else
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
"params");
@ -2428,8 +2428,7 @@ static int send_recv_auth(stratum_instance_t *client)
json_get_string(&secondaryuserid, val, "secondaryuserid");
parse_worker_diffs(ckp, worker_array);
client->suggest_diff = worker->mindiff;
if (!user->auth_time)
user->auth_time = time(NULL);
user->auth_time = time(NULL);
}
if (secondaryuserid && (!safecmp(response, "ok.authorise") ||
!safecmp(response, "ok.addrauth"))) {
@ -3265,7 +3264,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va
{
json_t *val;
JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg);
JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg);
stratum_add_send(sdata, val, client_id);
}
@ -3409,14 +3408,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
}
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
/* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly.
* The client_id stays on the list but we won't send anything
* to it since it's unauthorised. Set the flag just in case. */
client->authorised = false;
* is a passthrough and to manage its messages accordingly. No
* data from this client id should ever come back to this
* stratifier after this so drop the client in the stratifier. */
LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address);
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(client->ckp->connector, buf);
drop_client(sdata, client_id);
return;
}
@ -3443,12 +3442,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
/* We should only accept authorised requests from here on */
if (!client->authorised) {
/* Dropping unauthorised clients here also allows the
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64" %s", client_id, client->address);
snprintf(buf, 255, "dropclient=%"PRId64, client_id);
send_proc(client->ckp->connector, buf);
LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method,
client_id, client->address);
return;
}
@ -3464,6 +3459,13 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
ckmsgq_add(sdata->stxnq, jp);
return;
}
if (cmdmatch(method, "mining.term")) {
LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address);
drop_client(sdata, client_id);
return;
}
/* Unhandled message here */
LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method);
return;
@ -3625,10 +3627,18 @@ static void discard_json_params(json_params_t *jp)
{
json_decref(jp->method);
json_decref(jp->params);
json_decref(jp->id_val);
if (jp->id_val)
json_decref(jp->id_val);
free(jp);
}
static void steal_json_id(json_t *val, json_params_t *jp)
{
/* Steal the id_val as is to avoid a copy */
json_object_set_new_nocheck(val, "id", jp->id_val);
jp->id_val = NULL;
}
static void sshare_process(ckpool_t *ckp, json_params_t *jp)
{
json_t *result_val, *json_msg, *err_val = NULL;
@ -3651,7 +3661,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp)
result_val = parse_submit(client, json_msg, jp->params, &err_val);
json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val);
steal_json_id(json_msg, jp);
stratum_add_send(sdata, json_msg, client_id);
out_decref:
dec_instance_ref(sdata, client);
@ -3713,7 +3723,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
json_msg = json_object();
json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val);
steal_json_id(json_msg, jp);
stratum_add_send(sdata, json_msg, client_id);
if (!json_is_true(result_val) || !client->suggest_diff)
@ -3877,7 +3887,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
goto out;
}
val = json_object();
json_object_set_nocheck(val, "id", jp->id_val);
steal_json_id(val, jp);
if (cmdmatch(msg, "mining.get_transactions")) {
int txns;
@ -4521,7 +4531,8 @@ int stratifier(proc_instance_t *pi)
create_pthread(&pth_blockupdate, blockupdate, ckp);
mutex_init(&sdata->stats_lock);
create_pthread(&pth_statsupdate, statsupdate, ckp);
if (!ckp->passthrough)
create_pthread(&pth_statsupdate, statsupdate, ckp);
mutex_init(&sdata->share_lock);
mutex_init(&sdata->block_lock);

Loading…
Cancel
Save