Browse Source

Create a separate thread for sending messages to clients, not blocking on any clients not ready for reads

master
Con Kolivas 11 years ago
parent
commit
457f1c17ee
  1. 145
      src/connector.c
  2. 6
      src/libckpool.c
  3. 2
      src/libckpool.h
  4. 4
      src/stratifier.c

145
src/connector.c

@ -19,6 +19,7 @@
#include "ckpool.h"
#include "libckpool.h"
#include "uthash.h"
#include "utlist.h"
#define MAX_MSGSIZE 1024
@ -47,6 +48,24 @@ typedef struct client_instance client_instance_t;
static client_instance_t *clients;
static int client_id;
struct sender_send {
struct sender_send *next;
struct sender_send *prev;
client_instance_t *client;
char *buf;
int len;
};
typedef struct sender_send sender_send_t;
/* For the linked list of pending sends */
static sender_send_t *sender_sends;
/* For protecting the pending sends list */
static pthread_mutex_t sender_lock;
static pthread_cond_t sender_cond;
/* Accepts incoming connections to the server socket and generates client
* instances */
void *acceptor(void *arg)
@ -97,7 +116,7 @@ static void invalidate_client(ckpool_t *ckp, client_instance_t *client)
send_proc(&ckp->stratifier, buf);
}
static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf);
static void send_client(conn_instance_t *ci, int id, char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
{
@ -150,8 +169,10 @@ reparse:
memmove(client->buf, client->buf + buflen, client->bufofs);
val = json_loads(msg, 0, NULL);
if (!val) {
char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %d sent invalid json message %s", client->id, msg);
send_client(ckp, ci, client->id, "Invalid JSON, disconnecting\n");
send_client(ci, client->id, buf);
invalidate_client(ckp, client);
return;
} else {
@ -177,7 +198,7 @@ void *receiver(void *arg)
struct pollfd fds[65536];
int ret, nfds, i;
rename_proc("receiver");
rename_proc("creceiver");
retry:
nfds = 0;
@ -237,10 +258,89 @@ out:
return NULL;
}
static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf)
/* Use a thread to send queued messages, using select() to only send to sockets
* ready for writing immediately to not delay other messages. */
void *sender(void *arg)
{
int fd = -1, ret, len, ofs = 0;
conn_instance_t *ci = (conn_instance_t *)arg;
ckpool_t *ckp = ci->pi->ckp;
rename_proc("creceiver");
while (42) {
sender_send_t *sender_send;
client_instance_t *client;
tv_t timeout_tv = {0, 0};
int ret, fd, ofs = 0;
fd_set writefds;
mutex_lock(&sender_lock);
if (!sender_sends) {
ts_t timeout_ts;
ts_realtime(&timeout_ts);
timeout_ts.tv_sec += 1;
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
}
sender_send = sender_sends;
if (likely(sender_send))
DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock);
if (!sender_send)
continue;
client = sender_send->client;
ck_rlock(&ci->lock);
fd = client->fd;
ck_runlock(&ci->lock);
if (fd == -1) {
LOGDEBUG("Discarding message sent to invalidated client");
free(sender_send->buf);
free(sender_send);
continue;
}
FD_ZERO(&writefds);
FD_SET(fd, &writefds);
ret = select(fd + 1, NULL, &writefds, NULL, &timeout_tv);
if (ret < 1) {
LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the list */
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
mutex_unlock(&sender_lock);
continue;
}
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
if (interrupted())
continue;
LOGWARNING("Client id %d disconnected", client->id);
invalidate_client(ckp, client);
break;
}
ofs += ret;
sender_send->len -= ret;
}
free(sender_send->buf);
free(sender_send);
}
return NULL;
}
/* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */
static void send_client(conn_instance_t *ci, int id, char *buf)
{
sender_send_t *sender_send;
client_instance_t *client;
int fd = -1, len;
if (unlikely(!buf)) {
LOGWARNING("Connector send_client sent a null buffer");
@ -249,6 +349,7 @@ static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *
len = strlen(buf);
if (unlikely(!len)) {
LOGWARNING("Connector send_client sent a zero length buffer");
free(buf);
return;
}
@ -266,21 +367,18 @@ static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *
return;
}
while (len) {
ret = send(fd, buf + ofs, len , 0);
if (unlikely(ret < 0)) {
if (interrupted())
continue;
LOGWARNING("Client id %d disconnected", id);
invalidate_client(ckp, client);
return;
}
ofs += ret;
len -= ret;
}
sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client;
sender_send->buf = buf;
sender_send->len = len;
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
pthread_cond_signal(&sender_cond);
mutex_unlock(&sender_lock);
}
static int connector_loop(ckpool_t *ckp, proc_instance_t *pi, conn_instance_t *ci)
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
{
int sockd, client_id, ret = 0;
unixsock_t *us = &pi->us;
@ -288,7 +386,6 @@ static int connector_loop(ckpool_t *ckp, proc_instance_t *pi, conn_instance_t *c
json_t *json_msg;
retry:
dealloc(buf);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
if (interrupted())
@ -319,18 +416,17 @@ retry:
dealloc(buf);
buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n");
send_client(ckp, ci, client_id, buf);
send_client(ci, client_id, buf);
json_decref(json_msg);
goto retry;
out:
dealloc(buf);
return ret;
}
int connector(proc_instance_t *pi)
{
pthread_t pth_acceptor, pth_receiver;
pthread_t pth_sender, pth_acceptor, pth_receiver;
char *url = NULL, *port = NULL;
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
@ -401,10 +497,13 @@ int connector(proc_instance_t *pi)
ci.pi = pi;
ci.serverfd = sockd;
ci.nfds = 0;
mutex_init(&sender_lock);
cond_init(&sender_cond);
create_pthread(&pth_sender, sender, &ci);
create_pthread(&pth_acceptor, acceptor, &ci);
create_pthread(&pth_receiver, receiver, &ci);
ret = connector_loop(ckp, pi, &ci);
ret = connector_loop(pi, &ci);
//join_pthread(pth_acceptor);
out:

6
src/libckpool.c

@ -1277,14 +1277,14 @@ void tv_time(tv_t *tv)
gettimeofday(tv, NULL);
}
void ts_time(ts_t *ts)
void ts_realtime(ts_t *ts)
{
clock_gettime(CLOCK_MONOTONIC, ts);
clock_gettime(CLOCK_REALTIME, ts);
}
void cksleep_prepare_r(ts_t *ts)
{
ts_time(ts);
clock_gettime(CLOCK_MONOTONIC, ts);
}
void nanosleep_abstime(ts_t *ts_end)

2
src/libckpool.h

@ -297,7 +297,7 @@ void us_to_ts(ts_t *spec, int64_t us);
void ms_to_ts(ts_t *spec, int64_t ms);
void ms_to_tv(tv_t *val, int64_t ms);
void tv_time(tv_t *tv);
void ts_time(ts_t *ts);
void ts_realtime(ts_t *ts);
void cksleep_prepare_r(ts_t *ts);
void nanosleep_abstime(ts_t *ts_end);

4
src/stratifier.c

@ -1268,7 +1268,7 @@ static void *stratum_receiver(void *arg)
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
stratum_msg_t *msg;
rename_proc("receiver");
rename_proc("sreceiver");
while (42) {
stratum_instance_t *instance;
@ -1313,7 +1313,7 @@ static void *stratum_sender(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
rename_proc("sender");
rename_proc("ssender");
while (42) {
stratum_msg_t *msg;

Loading…
Cancel
Save