Browse Source

Use signal handlers from sender and receiver threads in the connector avoid needing to pthread tryjoin on every message

master
Con Kolivas 10 years ago
parent
commit
62d1ec3f8a
  1. 4
      src/ckpool.c
  2. 1
      src/ckpool.h
  3. 25
      src/connector.c

4
src/ckpool.c

@ -202,8 +202,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq)
return ret; return ret;
} }
static void childsighandler(const int sig);
/* Create a standalone thread that queues received unix messages for a proc /* Create a standalone thread that queues received unix messages for a proc
* instance and adds them to linked list of received messages with their * instance and adds them to linked list of received messages with their
* associated receive socket, then signal the associated rmsg_cond for the * associated receive socket, then signal the associated rmsg_cond for the
@ -879,7 +877,7 @@ static void rm_namepid(const proc_instance_t *pi)
/* Disable signal handlers for child processes, but simply pass them onto the /* Disable signal handlers for child processes, but simply pass them onto the
* parent process to shut down cleanly. */ * parent process to shut down cleanly. */
static void childsighandler(const int sig) void childsighandler(const int sig)
{ {
signal(sig, SIG_IGN); signal(sig, SIG_IGN);
signal(SIGTERM, SIG_IGN); signal(SIGTERM, SIG_IGN);

1
src/ckpool.h

@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
void childsighandler(const int sig);
int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret);
bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_string(char **store, const json_t *val, const char *res);
bool json_get_int64(int64_t *store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res);

25
src/connector.c

@ -450,7 +450,7 @@ void *receiver(void *arg)
epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0) { if (epfd < 0) {
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL; goto out;
} }
serverfds = cdata->ckp->serverurls; serverfds = cdata->ckp->serverurls;
/* Add all the serverfds to the epoll */ /* Add all the serverfds to the epoll */
@ -461,7 +461,7 @@ void *receiver(void *arg)
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
if (ret < 0) { if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL; goto out;
} }
} }
@ -541,6 +541,9 @@ void *receiver(void *arg)
noparse: noparse:
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} }
out:
/* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL; return NULL;
} }
@ -641,7 +644,8 @@ contfree:
free(sender_send); free(sender_send);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
} }
/* We shouldn't get here unless there's an error */
childsighandler(15);
return NULL; return NULL;
} }
@ -804,7 +808,6 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{ {
unix_msg_t *umsg = NULL; unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
uint8_t test_cycle = 0;
int64_t client_id; int64_t client_id;
char *buf; char *buf;
int ret = 0; int ret = 0;
@ -818,20 +821,6 @@ retry:
dealloc(umsg); dealloc(umsg);
} }
if (!++test_cycle) {
/* Test for pthread join every 256 messages */
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
}
}
do { do {
umsg = get_unix_msg(pi); umsg = get_unix_msg(pi);
} while (!umsg); } while (!umsg);

Loading…
Cancel
Save