|
|
|
@ -29,12 +29,17 @@ struct client_instance {
|
|
|
|
|
/* For clients hashtable */ |
|
|
|
|
UT_hash_handle hh; |
|
|
|
|
int64_t id; |
|
|
|
|
|
|
|
|
|
/* fd cannot be changed while a ref is held */ |
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
/* Reference count for when this instance is used outside of the
|
|
|
|
|
* connector_data lock */ |
|
|
|
|
int ref; |
|
|
|
|
|
|
|
|
|
/* Have we disabled this client to be removed when there are no refs? */ |
|
|
|
|
bool invalid; |
|
|
|
|
|
|
|
|
|
/* For dead_clients list */ |
|
|
|
|
client_instance_t *next; |
|
|
|
|
client_instance_t *prev; |
|
|
|
@ -58,6 +63,7 @@ struct sender_send {
|
|
|
|
|
client_instance_t *client; |
|
|
|
|
char *buf; |
|
|
|
|
int len; |
|
|
|
|
int ofs; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
typedef struct sender_send sender_send_t; |
|
|
|
@ -68,6 +74,8 @@ struct connector_data {
|
|
|
|
|
cklock_t lock; |
|
|
|
|
proc_instance_t *pi; |
|
|
|
|
|
|
|
|
|
time_t start_time; |
|
|
|
|
|
|
|
|
|
/* Array of server fds */ |
|
|
|
|
int *serverfd; |
|
|
|
|
/* All time count of clients connected */ |
|
|
|
@ -93,10 +101,11 @@ struct connector_data {
|
|
|
|
|
|
|
|
|
|
/* For the linked list of pending sends */ |
|
|
|
|
sender_send_t *sender_sends; |
|
|
|
|
sender_send_t *delayed_sends; |
|
|
|
|
|
|
|
|
|
int64_t sends_generated; |
|
|
|
|
int64_t sends_delayed; |
|
|
|
|
int64_t sends_queued; |
|
|
|
|
int64_t sends_size; |
|
|
|
|
|
|
|
|
|
/* For protecting the pending sends list */ |
|
|
|
|
mutex_t sender_lock; |
|
|
|
@ -219,17 +228,21 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
keep_sockalive(fd); |
|
|
|
|
nolinger_socket(fd); |
|
|
|
|
noblock_socket(fd); |
|
|
|
|
|
|
|
|
|
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", |
|
|
|
|
cdata->nfds, fd, no_clients, client->address_name, port); |
|
|
|
|
|
|
|
|
|
client->fd = fd; |
|
|
|
|
event.data.ptr = client; |
|
|
|
|
event.events = EPOLLIN; |
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
client->id = cdata->client_id++; |
|
|
|
|
HASH_ADD_I64(cdata->clients, id, client); |
|
|
|
|
cdata->nfds++; |
|
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
event.data.u64 = client->id; |
|
|
|
|
event.events = EPOLLIN | EPOLLRDHUP; |
|
|
|
|
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { |
|
|
|
|
LOGERR("Failed to epoll_ctl add in accept_client"); |
|
|
|
|
recycle_client(cdata, client); |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -237,12 +250,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
|
|
|
|
|
* to it. We drop that reference when the socket is closed which |
|
|
|
|
* removes it automatically from the epoll list. */ |
|
|
|
|
__inc_instance_ref(client); |
|
|
|
|
|
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
client->id = cdata->client_id++; |
|
|
|
|
HASH_ADD_I64(cdata->clients, id, client); |
|
|
|
|
cdata->nfds++; |
|
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
client->fd = fd; |
|
|
|
|
|
|
|
|
|
return 1; |
|
|
|
|
} |
|
|
|
@ -251,15 +259,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
|
|
|
|
|
static int drop_client(cdata_t *cdata, client_instance_t *client) |
|
|
|
|
{ |
|
|
|
|
int64_t client_id = 0; |
|
|
|
|
int fd; |
|
|
|
|
int fd = -1; |
|
|
|
|
|
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
fd = client->fd; |
|
|
|
|
if (fd != -1) { |
|
|
|
|
if (!client->invalid) { |
|
|
|
|
client->invalid = true; |
|
|
|
|
client_id = client->id; |
|
|
|
|
|
|
|
|
|
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL); |
|
|
|
|
Close(client->fd); |
|
|
|
|
fd = client->fd; |
|
|
|
|
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); |
|
|
|
|
HASH_DEL(cdata->clients, client); |
|
|
|
|
DL_APPEND(cdata->dead_clients, client); |
|
|
|
|
/* This is the reference to this client's presence in the
|
|
|
|
@ -275,7 +282,22 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
|
|
|
|
|
return fd; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void stratifier_drop_client(ckpool_t *ckp, const int64_t id) |
|
|
|
|
/* For sending the drop command to the upstream pool in passthrough mode */ |
|
|
|
|
static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) |
|
|
|
|
{ |
|
|
|
|
json_t *val; |
|
|
|
|
char *s; |
|
|
|
|
|
|
|
|
|
JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", |
|
|
|
|
client->address_name, "server", client->server, "method", "mining.term", |
|
|
|
|
"params"); |
|
|
|
|
s = json_dumps(val, 0); |
|
|
|
|
json_decref(val); |
|
|
|
|
send_proc(ckp->generator, s); |
|
|
|
|
free(s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) |
|
|
|
|
{ |
|
|
|
|
char buf[256]; |
|
|
|
|
|
|
|
|
@ -283,6 +305,11 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id)
|
|
|
|
|
send_proc(ckp->stratifier, buf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) |
|
|
|
|
{ |
|
|
|
|
stratifier_drop_id(ckp, client->id); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Invalidate this instance. Remove them from the hashtables we look up
|
|
|
|
|
* regularly but keep the instances in a linked list until their ref count |
|
|
|
|
* drops to zero when we can remove them lazily. Client must hold a reference |
|
|
|
@ -293,9 +320,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
|
|
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
ret = drop_client(cdata, client); |
|
|
|
|
if (ckp->passthrough) |
|
|
|
|
goto out; |
|
|
|
|
stratifier_drop_client(ckp, client->id); |
|
|
|
|
if (!ckp->passthrough && !client->passthrough) |
|
|
|
|
stratifier_drop_client(ckp, client); |
|
|
|
|
else if (ckp->passthrough) |
|
|
|
|
generator_drop_client(ckp, client); |
|
|
|
|
|
|
|
|
|
/* Cull old unused clients lazily when there are no more reference
|
|
|
|
|
* counts for them. */ |
|
|
|
@ -304,12 +332,16 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
|
|
|
|
|
if (!client->ref) { |
|
|
|
|
DL_DELETE(cdata->dead_clients, client); |
|
|
|
|
LOGINFO("Connector recycling client %"PRId64, client->id); |
|
|
|
|
/* We only close the client fd once we're sure there
|
|
|
|
|
* are no references to it left to prevent fds being |
|
|
|
|
* reused on new and old clients. */ |
|
|
|
|
nolinger_socket(client->fd); |
|
|
|
|
Close(client->fd); |
|
|
|
|
__recycle_client(cdata, client); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
out: |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -318,52 +350,39 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf);
|
|
|
|
|
/* Client is holding a reference count from being on the epoll list */ |
|
|
|
|
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) |
|
|
|
|
{ |
|
|
|
|
int buflen, ret, selfail = 0; |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
char msg[PAGESIZE], *eol; |
|
|
|
|
int buflen, ret; |
|
|
|
|
json_t *val; |
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
|
/* Select should always return positive after poll unless we have
|
|
|
|
|
* been disconnected. On retries, decdatade whether we should do further |
|
|
|
|
* reads based on select readiness and only fail if we get an error. */ |
|
|
|
|
ret = wait_read_select(client->fd, 0); |
|
|
|
|
if (ret < 1) { |
|
|
|
|
if (ret > selfail) |
|
|
|
|
return; |
|
|
|
|
LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", |
|
|
|
|
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); |
|
|
|
|
if (unlikely(client->bufofs > MAX_MSGSIZE)) { |
|
|
|
|
LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", |
|
|
|
|
client->id, client->fd); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
selfail = -1; |
|
|
|
|
buflen = PAGESIZE - client->bufofs; |
|
|
|
|
ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); |
|
|
|
|
/* This read call is non-blocking since the socket is set to O_NOBLOCK */ |
|
|
|
|
ret = read(client->fd, client->buf + client->bufofs, buflen); |
|
|
|
|
if (ret < 1) { |
|
|
|
|
/* We should have something to read if called since poll set
|
|
|
|
|
* this fd's revents status so if there's nothing it means the |
|
|
|
|
* client has disconnected. */ |
|
|
|
|
LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", |
|
|
|
|
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); |
|
|
|
|
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) |
|
|
|
|
return; |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", |
|
|
|
|
client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
client->bufofs += ret; |
|
|
|
|
reparse: |
|
|
|
|
eol = memchr(client->buf, '\n', client->bufofs); |
|
|
|
|
if (!eol) { |
|
|
|
|
if (unlikely(client->bufofs > MAX_MSGSIZE)) { |
|
|
|
|
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (!eol) |
|
|
|
|
goto retry; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Do something useful with this message now */ |
|
|
|
|
buflen = eol - client->buf + 1; |
|
|
|
|
if (unlikely(buflen > MAX_MSGSIZE)) { |
|
|
|
|
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); |
|
|
|
|
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -387,16 +406,17 @@ reparse:
|
|
|
|
|
json_getdel_int64(&passthrough_id, val, "client_id"); |
|
|
|
|
passthrough_id = (client->id << 32) | passthrough_id; |
|
|
|
|
json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); |
|
|
|
|
} else |
|
|
|
|
} else { |
|
|
|
|
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); |
|
|
|
|
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); |
|
|
|
|
} |
|
|
|
|
json_object_set_new_nocheck(val, "server", json_integer(client->server)); |
|
|
|
|
s = json_dumps(val, 0); |
|
|
|
|
|
|
|
|
|
/* Do not send messages of clients we've already dropped. We
|
|
|
|
|
* do this unlocked as the occasional false negative can be |
|
|
|
|
* filtered by the stratifier. */ |
|
|
|
|
if (likely(client->fd != -1)) { |
|
|
|
|
if (likely(!client->invalid)) { |
|
|
|
|
if (ckp->passthrough) |
|
|
|
|
send_proc(ckp->generator, s); |
|
|
|
|
else |
|
|
|
@ -418,8 +438,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
|
|
|
|
|
|
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
HASH_FIND_I64(cdata->clients, &id, client); |
|
|
|
|
if (client) |
|
|
|
|
if (client) { |
|
|
|
|
if (!client->invalid) |
|
|
|
|
__inc_instance_ref(client); |
|
|
|
|
else |
|
|
|
|
client = NULL; |
|
|
|
|
} |
|
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
return client; |
|
|
|
@ -439,19 +463,18 @@ void *receiver(void *arg)
|
|
|
|
|
epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); |
|
|
|
|
if (epfd < 0) { |
|
|
|
|
LOGEMERG("FATAL: Failed to create epoll in receiver"); |
|
|
|
|
return NULL; |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
serverfds = cdata->ckp->serverurls; |
|
|
|
|
/* Add all the serverfds to the epoll */ |
|
|
|
|
for (i = 0; i < serverfds; i++) { |
|
|
|
|
/* The small values will be easily identifiable compared to
|
|
|
|
|
* pointers */ |
|
|
|
|
/* The small values will be less than the first client ids */ |
|
|
|
|
event.data.u64 = i; |
|
|
|
|
event.events = EPOLLIN; |
|
|
|
|
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); |
|
|
|
|
return NULL; |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -480,44 +503,117 @@ void *receiver(void *arg)
|
|
|
|
|
} |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
client = event.data.ptr; |
|
|
|
|
/* Recheck this client still exists in the same form when it
|
|
|
|
|
* was queued. */ |
|
|
|
|
client = ref_client_by_id(cdata, client->id); |
|
|
|
|
if (unlikely(!client)) |
|
|
|
|
client = ref_client_by_id(cdata, event.data.u64); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); |
|
|
|
|
continue; |
|
|
|
|
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { |
|
|
|
|
/* Client disconnected */ |
|
|
|
|
LOGDEBUG("Client fd %d HUP in epoll", client->fd); |
|
|
|
|
invalidate_client(cdata->pi->ckp, cdata, client); |
|
|
|
|
} else |
|
|
|
|
} |
|
|
|
|
if (unlikely(client->invalid)) |
|
|
|
|
goto noparse; |
|
|
|
|
/* We can have both messages and read hang ups so process the
|
|
|
|
|
* message first. */ |
|
|
|
|
if (likely(event.events & EPOLLIN)) |
|
|
|
|
parse_client_msg(cdata, client); |
|
|
|
|
if (unlikely(client->invalid)) |
|
|
|
|
goto noparse; |
|
|
|
|
if (unlikely(event.events & EPOLLERR)) { |
|
|
|
|
socklen_t errlen = sizeof(int); |
|
|
|
|
int error = 0; |
|
|
|
|
|
|
|
|
|
/* See what type of error this is and raise the log
|
|
|
|
|
* level of the message if it's unexpected. */ |
|
|
|
|
getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); |
|
|
|
|
if (error != 104) { |
|
|
|
|
LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", |
|
|
|
|
client->id, client->fd, error, strerror(error)); |
|
|
|
|
} else { |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", |
|
|
|
|
client->id, client->fd, error, strerror(error)); |
|
|
|
|
} |
|
|
|
|
invalidate_client(cdata->pi->ckp, cdata, client); |
|
|
|
|
} else if (unlikely(event.events & EPOLLHUP)) { |
|
|
|
|
/* Client connection reset by peer */ |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); |
|
|
|
|
invalidate_client(cdata->pi->ckp, cdata, client); |
|
|
|
|
} else if (unlikely(event.events & EPOLLRDHUP)) { |
|
|
|
|
/* Client disconnected by peer */ |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); |
|
|
|
|
invalidate_client(cdata->pi->ckp, cdata, client); |
|
|
|
|
} |
|
|
|
|
noparse: |
|
|
|
|
dec_instance_ref(cdata, client); |
|
|
|
|
} |
|
|
|
|
out: |
|
|
|
|
/* We shouldn't get here unless there's an error */ |
|
|
|
|
childsighandler(15); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Use a thread to send queued messages, using select() to only send to sockets
|
|
|
|
|
* ready for writing immediately to not delay other messages. */ |
|
|
|
|
void *sender(void *arg) |
|
|
|
|
/* Send a sender_send message and return true if we've finished sending it or
|
|
|
|
|
* are unable to send any more. */ |
|
|
|
|
static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) |
|
|
|
|
{ |
|
|
|
|
client_instance_t *client = sender_send->client; |
|
|
|
|
|
|
|
|
|
if (unlikely(client->invalid)) |
|
|
|
|
return true; |
|
|
|
|
|
|
|
|
|
while (sender_send->len) { |
|
|
|
|
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); |
|
|
|
|
|
|
|
|
|
if (unlikely(ret < 1)) { |
|
|
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) |
|
|
|
|
return false; |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", |
|
|
|
|
client->id, client->fd, errno, strerror(errno)); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
sender_send->ofs += ret; |
|
|
|
|
sender_send->len -= ret; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) |
|
|
|
|
{ |
|
|
|
|
dec_instance_ref(cdata, sender_send->client); |
|
|
|
|
free(sender_send->buf); |
|
|
|
|
free(sender_send); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Use a thread to send queued messages, appending them to the sends list and
|
|
|
|
|
* iterating over all of them, attempting to send them all non-blocking to |
|
|
|
|
* only send to those clients ready to receive data. */ |
|
|
|
|
static void *sender(void *arg) |
|
|
|
|
{ |
|
|
|
|
cdata_t *cdata = (cdata_t *)arg; |
|
|
|
|
sender_send_t *sends = NULL; |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
bool sent = false; |
|
|
|
|
|
|
|
|
|
rename_proc("csender"); |
|
|
|
|
|
|
|
|
|
while (42) { |
|
|
|
|
sender_send_t *sender_send, *delayed; |
|
|
|
|
client_instance_t *client; |
|
|
|
|
int ret = 0, fd, ofs = 0; |
|
|
|
|
int64_t sends_queued = 0, sends_size = 0; |
|
|
|
|
sender_send_t *sending, *tmp; |
|
|
|
|
|
|
|
|
|
/* Check all sends to see if they can be written out */ |
|
|
|
|
DL_FOREACH_SAFE(sends, sending, tmp) { |
|
|
|
|
if (send_sender_send(ckp, cdata, sending)) { |
|
|
|
|
DL_DELETE(sends, sending); |
|
|
|
|
clear_sender_send(sending, cdata); |
|
|
|
|
} else { |
|
|
|
|
sends_queued++; |
|
|
|
|
sends_size += sizeof(sender_send_t) + sending->len + 1; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
/* Poll every 10ms if there are no new sends. Re-examine
|
|
|
|
|
* delayed sends immediately after a successful send in case |
|
|
|
|
* endless new sends more frequently end up starving the |
|
|
|
|
* delayed sends. */ |
|
|
|
|
if (!cdata->sender_sends && !sent) { |
|
|
|
|
cdata->sends_delayed += sends_queued; |
|
|
|
|
cdata->sends_queued = sends_queued; |
|
|
|
|
cdata->sends_size = sends_size; |
|
|
|
|
/* Poll every 10ms if there are no new sends. */ |
|
|
|
|
if (!cdata->sender_sends) { |
|
|
|
|
const ts_t polltime = {0, 10000000}; |
|
|
|
|
ts_t timeout_ts; |
|
|
|
|
|
|
|
|
@ -525,86 +621,25 @@ void *sender(void *arg)
|
|
|
|
|
timeraddspec(&timeout_ts, &polltime); |
|
|
|
|
cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); |
|
|
|
|
} |
|
|
|
|
sender_send = cdata->sender_sends; |
|
|
|
|
if (sender_send) |
|
|
|
|
DL_DELETE(cdata->sender_sends, sender_send); |
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
sent = false; |
|
|
|
|
|
|
|
|
|
/* Service delayed sends only if we have timed out on the
|
|
|
|
|
* conditional with no new sends appearing or have just |
|
|
|
|
* serviced another message successfully. */ |
|
|
|
|
if (!sender_send) { |
|
|
|
|
/* Find a delayed client that needs servicing and set
|
|
|
|
|
* ret accordingly. We do not need to use FOREACH_SAFE |
|
|
|
|
* as we break out of the loop as soon as we manipuate |
|
|
|
|
* the list. */ |
|
|
|
|
DL_FOREACH(cdata->delayed_sends, delayed) { |
|
|
|
|
if ((ret = wait_write_select(delayed->client->fd, 0))) { |
|
|
|
|
sender_send = cdata->delayed_sends; |
|
|
|
|
DL_DELETE(cdata->delayed_sends, sender_send); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* None found ? */ |
|
|
|
|
if (!sender_send) |
|
|
|
|
continue; |
|
|
|
|
if (cdata->sender_sends) { |
|
|
|
|
DL_CONCAT(sends, cdata->sender_sends); |
|
|
|
|
cdata->sender_sends = NULL; |
|
|
|
|
} |
|
|
|
|
client = sender_send->client; |
|
|
|
|
|
|
|
|
|
/* If this socket is not ready to receive data from us, put the
|
|
|
|
|
* send back on the tail of the list and decrease the timeout |
|
|
|
|
* to poll to either look for a client that is ready or poll |
|
|
|
|
* select on this one */ |
|
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
|
fd = client->fd; |
|
|
|
|
if (!ret) |
|
|
|
|
ret = wait_write_select(fd, 0); |
|
|
|
|
ck_runlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
if (ret < 1) { |
|
|
|
|
if (ret < 0) { |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
goto contfree; |
|
|
|
|
} |
|
|
|
|
LOGDEBUG("Client %"PRId64" not ready for writes", client->id); |
|
|
|
|
|
|
|
|
|
/* Append it to the tail of the delayed sends list.
|
|
|
|
|
* This is the only function that alters it so no |
|
|
|
|
* locking is required. Keep the client ref. */ |
|
|
|
|
DL_APPEND(cdata->delayed_sends, sender_send); |
|
|
|
|
cdata->sends_delayed++; |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
while (sender_send->len) { |
|
|
|
|
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); |
|
|
|
|
if (unlikely(ret < 0)) { |
|
|
|
|
LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
ofs += ret; |
|
|
|
|
sender_send->len -= ret; |
|
|
|
|
} |
|
|
|
|
contfree: |
|
|
|
|
sent = true; |
|
|
|
|
free(sender_send->buf); |
|
|
|
|
free(sender_send); |
|
|
|
|
dec_instance_ref(cdata, client); |
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* We shouldn't get here unless there's an error */ |
|
|
|
|
childsighandler(15); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Send a client by id a heap allocated buffer, allowing this function to
|
|
|
|
|
* free the ram. */ |
|
|
|
|
static void send_client(cdata_t *cdata, int64_t id, char *buf) |
|
|
|
|
static void send_client(cdata_t *cdata, const int64_t id, char *buf) |
|
|
|
|
{ |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
sender_send_t *sender_send; |
|
|
|
|
client_instance_t *client; |
|
|
|
|
int fd = -1, len; |
|
|
|
|
int len; |
|
|
|
|
|
|
|
|
|
if (unlikely(!buf)) { |
|
|
|
|
LOGWARNING("Connector send_client sent a null buffer"); |
|
|
|
@ -617,29 +652,37 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
HASH_FIND_I64(cdata->clients, &id, client); |
|
|
|
|
if (likely(client)) { |
|
|
|
|
fd = client->fd; |
|
|
|
|
/* Grab a reference to this client until the sender_send has
|
|
|
|
|
* completed processing. */ |
|
|
|
|
__inc_instance_ref(client); |
|
|
|
|
} |
|
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
if (unlikely(fd == -1)) { |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
|
|
|
|
|
* completed processing. Is this a passthrough subclient ? */ |
|
|
|
|
if (id > 0xffffffffll) { |
|
|
|
|
int64_t client_id, pass_id; |
|
|
|
|
|
|
|
|
|
client_id = id & 0xffffffffll; |
|
|
|
|
pass_id = id >> 32; |
|
|
|
|
/* Make sure the passthrough exists for passthrough subclients */ |
|
|
|
|
client = ref_client_by_id(cdata, pass_id); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", |
|
|
|
|
pass_id, client_id); |
|
|
|
|
/* Now see if the subclient exists */ |
|
|
|
|
client = ref_client_by_id(cdata, client_id); |
|
|
|
|
if (client) { |
|
|
|
|
/* This shouldn't happen */ |
|
|
|
|
LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
dec_instance_ref(cdata, client); |
|
|
|
|
} else |
|
|
|
|
stratifier_drop_id(ckp, id); |
|
|
|
|
free(buf); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
client = ref_client_by_id(cdata, id); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
LOGINFO("Connector failed to find client id %"PRId64" to send to", id); |
|
|
|
|
} |
|
|
|
|
stratifier_drop_id(ckp, id); |
|
|
|
|
free(buf); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sender_send = ckzalloc(sizeof(sender_send_t)); |
|
|
|
|
sender_send->client = client; |
|
|
|
@ -676,7 +719,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
|
|
|
|
|
|
|
|
|
|
static void process_client_msg(cdata_t *cdata, const char *buf) |
|
|
|
|
{ |
|
|
|
|
int64_t client_id64, client_id; |
|
|
|
|
int64_t client_id; |
|
|
|
|
json_t *json_msg; |
|
|
|
|
char *msg; |
|
|
|
|
|
|
|
|
@ -687,21 +730,18 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Extract the client id from the json message and remove its entry */ |
|
|
|
|
json_getdel_int64(&client_id64, json_msg, "client_id"); |
|
|
|
|
if (client_id64 > 0xffffffffll) { |
|
|
|
|
int64_t passthrough_id; |
|
|
|
|
|
|
|
|
|
passthrough_id = client_id64 & 0xffffffffll; |
|
|
|
|
client_id = client_id64 >> 32; |
|
|
|
|
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); |
|
|
|
|
} else |
|
|
|
|
client_id = client_id64; |
|
|
|
|
client_id = json_integer_value(json_object_get(json_msg, "client_id")); |
|
|
|
|
json_object_del(json_msg, "client_id"); |
|
|
|
|
/* Put client_id back in for a passthrough subclient, passing its
|
|
|
|
|
* upstream client_id instead of the passthrough's. */ |
|
|
|
|
if (client_id > 0xffffffffll) |
|
|
|
|
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); |
|
|
|
|
msg = json_dumps(json_msg, JSON_EOL); |
|
|
|
|
send_client(cdata, client_id, msg); |
|
|
|
|
json_decref(json_msg); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static char *connector_stats(cdata_t *cdata) |
|
|
|
|
static char *connector_stats(cdata_t *cdata, const int runtime) |
|
|
|
|
{ |
|
|
|
|
json_t *val = json_object(), *subval; |
|
|
|
|
client_instance_t *client; |
|
|
|
@ -710,6 +750,10 @@ static char *connector_stats(cdata_t *cdata)
|
|
|
|
|
int64_t memsize; |
|
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
/* If called in passthrough mode we log stats instead of the stratifier */ |
|
|
|
|
if (runtime) |
|
|
|
|
json_set_int(val, "runtime", runtime); |
|
|
|
|
|
|
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
|
objects = HASH_COUNT(cdata->clients); |
|
|
|
|
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; |
|
|
|
@ -732,68 +776,57 @@ static char *connector_stats(cdata_t *cdata)
|
|
|
|
|
memsize = 0; |
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
generated = cdata->sends_generated; |
|
|
|
|
DL_FOREACH(cdata->sender_sends, send) { |
|
|
|
|
objects++; |
|
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
|
} |
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); |
|
|
|
|
json_set_object(val, "sends", subval); |
|
|
|
|
|
|
|
|
|
objects = 0; |
|
|
|
|
memsize = 0; |
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
generated = cdata->sends_delayed; |
|
|
|
|
DL_FOREACH(cdata->delayed_sends, send) { |
|
|
|
|
objects++; |
|
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
|
} |
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); |
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
json_set_object(val, "delays", subval); |
|
|
|
|
|
|
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|
|
json_decref(val); |
|
|
|
|
if (runtime) |
|
|
|
|
LOGNOTICE("Passthrough:%s", buf); |
|
|
|
|
else |
|
|
|
|
LOGNOTICE("Connector stats: %s", buf); |
|
|
|
|
return buf; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) |
|
|
|
|
{ |
|
|
|
|
int64_t client_id64, client_id; |
|
|
|
|
unix_msg_t *umsg = NULL; |
|
|
|
|
ckpool_t *ckp = pi->ckp; |
|
|
|
|
uint8_t test_cycle = 0; |
|
|
|
|
char *buf; |
|
|
|
|
time_t last_stats; |
|
|
|
|
int64_t client_id; |
|
|
|
|
int ret = 0; |
|
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
LOGWARNING("%s connector ready", ckp->name); |
|
|
|
|
last_stats = cdata->start_time; |
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
|
if (ckp->passthrough) { |
|
|
|
|
time_t diff = time(NULL); |
|
|
|
|
|
|
|
|
|
if (diff - last_stats >= 60) { |
|
|
|
|
last_stats = diff; |
|
|
|
|
diff -= cdata->start_time; |
|
|
|
|
buf = connector_stats(cdata, diff); |
|
|
|
|
dealloc(buf); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (umsg) { |
|
|
|
|
Close(umsg->sockd); |
|
|
|
|
free(umsg->buf); |
|
|
|
|
dealloc(umsg); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!++test_cycle) { |
|
|
|
|
/* Test for pthread join every 256 messages */ |
|
|
|
|
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { |
|
|
|
|
LOGEMERG("Connector sender thread shutdown, exiting"); |
|
|
|
|
ret = 1; |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { |
|
|
|
|
LOGEMERG("Connector receiver thread shutdown, exiting"); |
|
|
|
|
ret = 1; |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
do { |
|
|
|
|
umsg = get_unix_msg(pi); |
|
|
|
|
} while (!umsg); |
|
|
|
@ -807,12 +840,14 @@ retry:
|
|
|
|
|
} else if (cmdmatch(buf, "dropclient")) { |
|
|
|
|
client_instance_t *client; |
|
|
|
|
|
|
|
|
|
ret = sscanf(buf, "dropclient=%"PRId64, &client_id64); |
|
|
|
|
if (unlikely(ret < 0)) { |
|
|
|
|
ret = sscanf(buf, "dropclient=%"PRId64, &client_id); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
LOGDEBUG("Connector failed to parse dropclient command: %s", buf); |
|
|
|
|
goto retry; |
|
|
|
|
} |
|
|
|
|
client_id = client_id64 & 0xffffffffll; |
|
|
|
|
/* A passthrough client, we can't drop this yet */ |
|
|
|
|
if (client_id > 0xffffffffll) |
|
|
|
|
goto retry; |
|
|
|
|
client = ref_client_by_id(cdata, client_id); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id); |
|
|
|
@ -823,16 +858,16 @@ retry:
|
|
|
|
|
if (ret >= 0) |
|
|
|
|
LOGINFO("Connector dropped client id: %"PRId64, client_id); |
|
|
|
|
} else if (cmdmatch(buf, "testclient")) { |
|
|
|
|
ret = sscanf(buf, "testclient=%"PRId64, &client_id64); |
|
|
|
|
ret = sscanf(buf, "testclient=%"PRId64, &client_id); |
|
|
|
|
if (unlikely(ret < 0)) { |
|
|
|
|
LOGDEBUG("Connector failed to parse testclient command: %s", buf); |
|
|
|
|
goto retry; |
|
|
|
|
} |
|
|
|
|
client_id = client_id64 & 0xffffffffll; |
|
|
|
|
client_id &= 0xffffffffll; |
|
|
|
|
if (client_exists(cdata, client_id)) |
|
|
|
|
goto retry; |
|
|
|
|
LOGINFO("Connector detected non-existent client id: %"PRId64, client_id); |
|
|
|
|
stratifier_drop_client(ckp, client_id); |
|
|
|
|
stratifier_drop_id(ckp, client_id); |
|
|
|
|
} else if (cmdmatch(buf, "ping")) { |
|
|
|
|
LOGDEBUG("Connector received ping request"); |
|
|
|
|
send_unix_msg(umsg->sockd, "pong"); |
|
|
|
@ -846,7 +881,7 @@ retry:
|
|
|
|
|
char *msg; |
|
|
|
|
|
|
|
|
|
LOGDEBUG("Connector received stats request"); |
|
|
|
|
msg = connector_stats(cdata); |
|
|
|
|
msg = connector_stats(cdata, 0); |
|
|
|
|
send_unix_msg(umsg->sockd, msg); |
|
|
|
|
} else if (cmdmatch(buf, "loglevel")) { |
|
|
|
|
sscanf(buf, "loglevel=%d", &ckp->loglevel); |
|
|
|
@ -925,7 +960,9 @@ int connector(proc_instance_t *pi)
|
|
|
|
|
Close(sockd); |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
if (listen(sockd, SOMAXCONN) < 0) { |
|
|
|
|
/* Set listen backlog to larger than SOMAXCONN in case the
|
|
|
|
|
* system configuration supports it */ |
|
|
|
|
if (listen(sockd, 8192) < 0) { |
|
|
|
|
LOGERR("Connector failed to listen on socket"); |
|
|
|
|
Close(sockd); |
|
|
|
|
goto out; |
|
|
|
@ -967,7 +1004,7 @@ int connector(proc_instance_t *pi)
|
|
|
|
|
ret = 1; |
|
|
|
|
goto out; |
|
|
|
|
} |
|
|
|
|
if (listen(sockd, SOMAXCONN) < 0) { |
|
|
|
|
if (listen(sockd, 8192) < 0) { |
|
|
|
|
LOGERR("Connector failed to listen on socket"); |
|
|
|
|
Close(sockd); |
|
|
|
|
goto out; |
|
|
|
@ -982,11 +1019,14 @@ int connector(proc_instance_t *pi)
|
|
|
|
|
cklock_init(&cdata->lock); |
|
|
|
|
cdata->pi = pi; |
|
|
|
|
cdata->nfds = 0; |
|
|
|
|
cdata->client_id = 1; |
|
|
|
|
/* Set the client id to the highest serverurl count to distinguish
|
|
|
|
|
* them from the server fds in epoll. */ |
|
|
|
|
cdata->client_id = ckp->serverurls; |
|
|
|
|
mutex_init(&cdata->sender_lock); |
|
|
|
|
cond_init(&cdata->sender_cond); |
|
|
|
|
create_pthread(&cdata->pth_sender, sender, cdata); |
|
|
|
|
create_pthread(&cdata->pth_receiver, receiver, cdata); |
|
|
|
|
cdata->start_time = time(NULL); |
|
|
|
|
|
|
|
|
|
create_unix_receiver(pi); |
|
|
|
|
|
|
|
|
|