kanoi 9 years ago
parent
commit
3d2be3373e
  1. 2
      configure.ac
  2. 4
      src/ckpool.c
  3. 169
      src/connector.c
  4. 4
      src/libckpool.c
  5. 23
      src/stratifier.c

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.9.1, kernel@kolivas.org) AC_INIT(ckpool, 0.9.2, kernel@kolivas.org)
AC_CANONICAL_SYSTEM AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_MACRO_DIR([m4])

4
src/ckpool.c

@ -479,6 +479,10 @@ retry:
msg = send_recv_proc(ckp->connector, "stats"); msg = send_recv_proc(ckp->connector, "stats");
send_unix_msg(sockd, msg); send_unix_msg(sockd, msg);
dealloc(msg); dealloc(msg);
} else if (cmdmatch(buf, "ckdbflush")) {
LOGWARNING("Received ckdb flush message");
send_procmsg(ckp->stratifier, buf);
send_unix_msg(sockd, "flushing");
} else { } else {
LOGINFO("Listener received unhandled message: %s", buf); LOGINFO("Listener received unhandled message: %s", buf);
send_unix_msg(sockd, "unknown"); send_unix_msg(sockd, "unknown");

169
src/connector.c

@ -135,6 +135,12 @@ struct connector_data {
int64_t client_id; int64_t client_id;
/* client message process queue */
ckmsgq_t *cmpq;
/* client message event process queue */
ckmsgq_t *cevents;
/* For the linked list of pending sends */ /* For the linked list of pending sends */
sender_send_t *sender_sends; sender_send_t *sender_sends;
@ -205,14 +211,14 @@ static client_instance_t *recruit_client(cdata_t *cdata)
} else } else
LOGDEBUG("Connector recycled client instance"); LOGDEBUG("Connector recycled client instance");
client->buf = realloc(client->buf, PAGESIZE); client->buf = ckzalloc(PAGESIZE);
client->buf[0] = '\0';
return client; return client;
} }
static void __recycle_client(cdata_t *cdata, client_instance_t *client) static void __recycle_client(cdata_t *cdata, client_instance_t *client)
{ {
dealloc(client->buf);
memset(client, 0, sizeof(client_instance_t)); memset(client, 0, sizeof(client_instance_t));
client->id = -1; client->id = -1;
DL_APPEND(cdata->recycled_clients, client); DL_APPEND(cdata->recycled_clients, client);
@ -298,7 +304,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
event.data.u64 = client->id; event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
return 0; return 0;
@ -456,10 +462,10 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val)
} }
} }
/* Client is holding a reference count from being on the epoll list */ /* Client is holding a reference count from being on the epoll list. Returns
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) * true if we will still be receiving messages from this client. */
static bool parse_client_msg(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{ {
ckpool_t *ckp = cdata->ckp;
int buflen, ret; int buflen, ret;
json_t *val; json_t *val;
char *eol; char *eol;
@ -469,8 +475,7 @@ retry:
if (!client->remote) { if (!client->remote) {
LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting",
client->id, client->fd); client->id, client->fd);
invalidate_client(ckp, cdata, client); return false;
return;
} }
client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1)); client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1));
} }
@ -478,11 +483,10 @@ retry:
ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE); ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE);
if (ret < 1) { if (ret < 1) {
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return; return true;
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s", LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %lu ret %d errno %d %s",
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, cdata, client); return false;
return;
} }
client->bufofs += ret; client->bufofs += ret;
reparse: reparse:
@ -494,8 +498,7 @@ reparse:
buflen = eol - client->buf + 1; buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) {
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd);
invalidate_client(ckp, cdata, client); return false;
return;
} }
if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) { if (!(val = json_loads(client->buf, JSON_DISABLE_EOF_CHECK, NULL))) {
@ -503,8 +506,7 @@ reparse:
LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf); LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, client->buf);
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
invalidate_client(ckp, cdata, client); return false;
return;
} else { } else {
char *s; char *s;
@ -563,9 +565,65 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
return client; return client;
} }
static void client_event_processor(ckpool_t *ckp, struct epoll_event *event)
{
const uint32_t events = event->events;
const uint64_t id = event->data.u64;
cdata_t *cdata = ckp->data;
client_instance_t *client;
client = ref_client_by_id(cdata, id);
if (unlikely(!client)) {
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id);
return;
}
if (unlikely(client->invalid))
goto out;
/* We can have both messages and read hang ups so process the
* message first. */
if (likely(events & EPOLLIN)) {
/* Rearm the client for epoll events if we have successfully
* parsed a message from it */
if (parse_client_msg(ckp, cdata, client)) {
event->data.u64 = id;
event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event);
} else
invalidate_client(ckp, cdata, client);
}
if (unlikely(client->invalid))
goto out;
if (unlikely(events & EPOLLERR)) {
socklen_t errlen = sizeof(int);
int error = 0;
/* See what type of error this is and raise the log
* level of the message if it's unexpected. */
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error != 104) {
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
} else {
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
}
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLHUP)) {
/* Client connection reset by peer */
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(events & EPOLLRDHUP)) {
/* Client disconnected by peer */
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
}
out:
dec_instance_ref(cdata, client);
}
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */ * handles the incoming messages */
void *receiver(void *arg) static void *receiver(void *arg)
{ {
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
struct epoll_event event; struct epoll_event event;
@ -596,7 +654,7 @@ void *receiver(void *arg)
cksleep_ms(1); cksleep_ms(1);
while (42) { while (42) {
client_instance_t *client; uint64_t edu64;
while (unlikely(!cdata->accept)) while (unlikely(!cdata->accept))
cksleep_ms(10); cksleep_ms(10);
@ -609,53 +667,16 @@ void *receiver(void *arg)
/* Nothing to service, still very unlikely */ /* Nothing to service, still very unlikely */
continue; continue;
} }
if (event.data.u64 < serverfds) { edu64 = event.data.u64;
ret = accept_client(cdata, epfd, event.data.u64); if (edu64 < serverfds) {
ret = accept_client(cdata, epfd, edu64);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGEMERG("FATAL: Failed to accept_client in receiver"); LOGEMERG("FATAL: Failed to accept_client in receiver");
break; break;
} }
continue; continue;
} }
client = ref_client_by_id(cdata, event.data.u64); ckmsgq_add(cdata->cevents, &event);
if (unlikely(!client)) {
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64);
continue;
}
if (unlikely(client->invalid))
goto noparse;
/* We can have both messages and read hang ups so process the
* message first. */
if (likely(event.events & EPOLLIN))
parse_client_msg(cdata, client);
if (unlikely(client->invalid))
goto noparse;
if (unlikely(event.events & EPOLLERR)) {
socklen_t errlen = sizeof(int);
int error = 0;
/* See what type of error this is and raise the log
* level of the message if it's unexpected. */
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error != 104) {
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
} else {
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s",
client->id, client->fd, error, strerror(error));
}
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(event.events & EPOLLHUP)) {
/* Client connection reset by peer */
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
} else if (unlikely(event.events & EPOLLRDHUP)) {
/* Client disconnected by peer */
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd);
invalidate_client(cdata->pi->ckp, cdata, client);
}
noparse:
dec_instance_ref(cdata, client);
} }
out: out:
/* We shouldn't get here unless there's an error */ /* We shouldn't get here unless there's an error */
@ -688,7 +709,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);
if (unlikely(ret < 1)) { if (ret < 1) {
/* Invalidate clients that block for more than 60 seconds */ /* Invalidate clients that block for more than 60 seconds */
if (unlikely(client->blocked_time && now_t - client->blocked_time >= 60)) { if (unlikely(client->blocked_time && now_t - client->blocked_time >= 60)) {
LOGNOTICE("Client id %"PRId64" fd %d blocked for >60 seconds, disconnecting", LOGNOTICE("Client id %"PRId64" fd %d blocked for >60 seconds, disconnecting",
@ -1200,18 +1221,11 @@ out:
return ret; return ret;
} }
static void process_client_msg(cdata_t *cdata, const char *buf) static void client_message_processor(ckpool_t *ckp, json_t *json_msg)
{ {
int64_t client_id; int64_t client_id;
json_t *json_msg;
char *msg; char *msg;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
@ -1221,10 +1235,22 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(cdata, client_id, msg); send_client(ckp->data, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
} }
static void process_client_msg(cdata_t *cdata, const char *buf)
{
json_t *json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
ckmsgq_add(cdata->cmpq, json_msg);
}
/* Send the passthrough the terminate node.method */ /* Send the passthrough the terminate node.method */
static void drop_passthrough_client(cdata_t *cdata, const int64_t id) static void drop_passthrough_client(cdata_t *cdata, const int64_t id)
{ {
@ -1441,10 +1467,9 @@ out:
int connector(proc_instance_t *pi) int connector(proc_instance_t *pi)
{ {
cdata_t *cdata = ckzalloc(sizeof(cdata_t)); cdata_t *cdata = ckzalloc(sizeof(cdata_t));
int threads, sockd, ret = 0, i, tries = 0;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int sockd, ret = 0, i;
const int on = 1; const int on = 1;
int tries = 0;
LOGWARNING("%s connector starting", ckp->name); LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata; ckp->data = cdata;
@ -1539,6 +1564,8 @@ int connector(proc_instance_t *pi)
if (tries) if (tries)
LOGWARNING("Connector successfully bound to socket"); LOGWARNING("Connector successfully bound to socket");
cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor);
if (ckp->remote && !setup_upstream(ckp, cdata)) { if (ckp->remote && !setup_upstream(ckp, cdata)) {
ret = 1; ret = 1;
goto out; goto out;
@ -1552,6 +1579,8 @@ int connector(proc_instance_t *pi)
mutex_init(&cdata->sender_lock); mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond); cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_sender, sender, cdata);
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
cdata->cevents = create_ckmsgqs(ckp, "cevent", &client_event_processor, threads);
create_pthread(&cdata->pth_receiver, receiver, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata);
cdata->start_time = time(NULL); cdata->start_time = time(NULL);

4
src/libckpool.c

@ -971,7 +971,7 @@ int wait_close(int sockd, int timeout)
/* Emulate a select read wait for high fds that select doesn't support. */ /* Emulate a select read wait for high fds that select doesn't support. */
int wait_read_select(int sockd, float timeout) int wait_read_select(int sockd, float timeout)
{ {
struct epoll_event event; struct epoll_event event = {0, {NULL}};
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);
@ -1053,7 +1053,7 @@ out:
/* Emulate a select write wait for high fds that select doesn't support */ /* Emulate a select write wait for high fds that select doesn't support */
int wait_write_select(int sockd, float timeout) int wait_write_select(int sockd, float timeout)
{ {
struct epoll_event event; struct epoll_event event = {0, {NULL}};
int epfd, ret; int epfd, ret;
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = epoll_create1(EPOLL_CLOEXEC);

23
src/stratifier.c

@ -3916,6 +3916,27 @@ static void get_poolstats(sdata_t *sdata, int *sockd)
static void srecv_process(ckpool_t *ckp, char *buf); static void srecv_process(ckpool_t *ckp, char *buf);
/* For emergency use only, flushes all pending ckdbq messages */
static void ckdbq_flush(sdata_t *sdata)
{
ckmsgq_t *ckdbq = sdata->ckdbq;
int flushed = 0;
mutex_lock(ckdbq->lock);
while (ckdbq->msgs) {
ckmsg_t *msg = ckdbq->msgs;
DL_DELETE(ckdbq->msgs, msg);
free(msg->data);
free(msg);
ckdbq->messages--;
flushed++;
}
mutex_unlock(ckdbq->lock);
LOGWARNING("Flushed %d messages from ckdb queue", flushed);
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
@ -4080,6 +4101,8 @@ retry:
dead_proxy(sdata, buf); dead_proxy(sdata, buf);
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "ckdbflush")) {
ckdbq_flush(sdata);
} else } else
LOGWARNING("Unhandled stratifier message: %s", buf); LOGWARNING("Unhandled stratifier message: %s", buf);
goto retry; goto retry;

Loading…
Cancel
Save