Browse Source

Move connector to using unix receive queues

master
Con Kolivas 10 years ago
parent
commit
cd754e98e4
  1. 49
      src/connector.c
  2. 2
      src/stratifier.c

49
src/connector.c

@ -768,25 +768,22 @@ static char *connector_stats(cdata_t *cdata)
static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{
int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id;
unixsock_t *us = &pi->us;
unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp;
uint8_t test_cycle = 0;
char *buf = NULL;
do {
selret = wait_read_select(us->sockd, 5);
if (!selret && !ping_main(ckp)) {
LOGEMERG("Connector failed to ping main process, exiting");
ret = 1;
goto out;
}
} while (selret < 1);
char *buf;
int ret = 0;
LOGWARNING("%s connector ready", ckp->name);
retry:
if (umsg) {
Close(umsg->sockd);
free(umsg->buf);
dealloc(umsg);
}
if (!++test_cycle) {
/* Test for pthread join every 256 messages */
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
@ -801,21 +798,11 @@ retry:
}
}
Close(sockd);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGEMERG("Failed to accept on connector socket, exiting");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in connector_loop");
goto retry;
}
do {
umsg = get_unix_msg(pi);
} while (!umsg);
buf = umsg->buf;
LOGDEBUG("Connector received message: %s", buf);
/* The bulk of the messages will be json messages to send to clients
* so look for them first. */
@ -841,7 +828,7 @@ retry:
LOGINFO("Connector dropped client id: %"PRId64, client_id);
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Connector received ping request");
send_unix_msg(sockd, "pong");
send_unix_msg(umsg->sockd, "pong");
} else if (cmdmatch(buf, "accept")) {
LOGDEBUG("Connector received accept signal");
cdata->accept = true;
@ -853,7 +840,7 @@ retry:
LOGDEBUG("Connector received stats request");
msg = connector_stats(cdata);
send_unix_msg(sockd, msg);
send_unix_msg(umsg->sockd, msg);
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) {
@ -878,13 +865,11 @@ retry:
sscanf(buf, "getxfd%d", &fdno);
if (fdno > -1 && fdno < ckp->serverurls)
send_fd(cdata->serverfd[fdno], sockd);
send_fd(cdata->serverfd[fdno], umsg->sockd);
} else
LOGWARNING("Unhandled connector message: %s", buf);
goto retry;
out:
Close(sockd);
dealloc(buf);
return ret;
}
@ -996,6 +981,8 @@ int connector(proc_instance_t *pi)
create_pthread(&cdata->pth_sender, sender, cdata);
create_pthread(&cdata->pth_receiver, receiver, cdata);
create_unix_receiver(pi);
ret = connector_loop(pi, cdata);
out:
dealloc(ckp->data);

2
src/stratifier.c

@ -1770,7 +1770,7 @@ retry:
umsg = get_unix_msg(pi);
if (unlikely(!umsg &&!ping_main(ckp))) {
LOGEMERG("Generator failed to ping main process, exiting");
LOGEMERG("Stratifier failed to ping main process, exiting");
ret = 1;
goto out;
}

Loading…
Cancel
Save