Browse Source

Move the functions that require ckpool specific variable types out of libckpool

master
Con Kolivas 11 years ago
parent
commit
bc78acceca
  1. 223
      src/ckpool.c
  2. 8
      src/ckpool.h
  3. 219
      src/libckpool.c
  4. 7
      src/libckpool.h

223
src/ckpool.c

@ -82,6 +82,225 @@ bool ping_main(ckpool_t *ckp)
return true; return true;
} }
/* Peek in a socket, and then receive only one line at a time, allocing enough
* memory in *buf */
int read_socket_line(connsock_t *cs, int timeout)
{
size_t buflen = 0, bufofs = 0;
int ret = -1, bufsiz;
char *eom = NULL;
tv_t tv_timeout;
fd_set rd;
dealloc(cs->buf);
if (unlikely(cs->fd < 0))
return ret;
bufsiz = PAGESIZE;
while (!eom) {
char readbuf[PAGESIZE] = {};
int extralen;
FD_ZERO(&rd);
FD_SET(cs->fd, &rd);
tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0;
ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout);
if (ret < 1) {
if (!ret)
LOGDEBUG("Select timed out in read_socket_line");
else
LOGERR("Select failed in read_socket_line");
goto out;
}
ret = recv(cs->fd, readbuf, bufsiz - 2, MSG_PEEK);
if (ret < 0) {
LOGERR("Failed to recv in read_socket_line");
goto out;
}
if (!ret)
continue;
eom = strchr(readbuf, '\n');
if (eom)
extralen = eom - readbuf + 1;
else
extralen = ret;
buflen += extralen + 1;
align_len(&buflen);
cs->buf = realloc(cs->buf, buflen);
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
ret = recv(cs->fd, cs->buf + bufofs, extralen, 0);
if (ret < 0) {
LOGERR("Failed to recv %d bytes in read_socket_line", (int)buflen);
goto out;
}
bufofs += ret;
}
eom = cs->buf + bufofs;
eom[0] = '\0';
ret = bufofs + 1;
out:
if (ret < 1)
dealloc(cs->buf);
return ret;
}
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path;
bool ret = false;
int sockd;
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
}
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path);
if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
ret = true;
close(sockd);
out:
if (unlikely(!ret))
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
return ret;
}
/* Send a single message to a process instance and retrieve the response, then
* close the socket. */
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path, *buf = NULL;
int sockd;
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
}
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path);
if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
buf = recv_unix_msg(sockd);
close(sockd);
out:
if (unlikely(!buf))
LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line);
return buf;
}
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{
char *http_req = NULL;
json_error_t err_val;
json_t *val = NULL;
int len, ret;
if (unlikely(cs->fd < 0)) {
LOGWARNING("FD %d invalid in json_rpc_call", cs->fd);
goto out;
}
if (unlikely(!cs->url)) {
LOGWARNING("No URL in json_rpc_call");
goto out;
}
if (unlikely(!cs->port)) {
LOGWARNING("No port in json_rpc_call");
goto out;
}
if (unlikely(!cs->auth)) {
LOGWARNING("No auth in json_rpc_call");
goto out;
}
if (unlikely(!rpc_req)) {
LOGWARNING("Null rpc_req passed to json_rpc_call");
goto out;
}
len = strlen(rpc_req);
if (unlikely(!len)) {
LOGWARNING("Zero length rpc_req passed to json_rpc_call");
goto out;
}
http_req = ckalloc(len + 256); // Leave room for headers
sprintf(http_req,
"POST / HTTP/1.1\n"
"Authorization: Basic %s\n"
"Host: %s:%s\n"
"Content-type: application/json\n"
"Content-Length: %d\n\n%s",
cs->auth, cs->url, cs->port, len, rpc_req);
len = strlen(http_req);
ret = write_socket(cs->fd, http_req, len);
if (ret != len) {
LOGWARNING("Failed to write to socket in json_rpc_call");
goto out_empty;
}
ret = read_socket_line(cs, 5);
if (ret < 1) {
LOGWARNING("Failed to read socket line in json_rpc_call");
goto out_empty;
}
if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) {
LOGWARNING("HTTP response not ok: %s", cs->buf);
goto out_empty;
}
do {
ret = read_socket_line(cs, 5);
if (ret < 1) {
LOGWARNING("Failed to read http socket lines in json_rpc_call");
goto out_empty;
}
} while (strncmp(cs->buf, "{", 1));
val = json_loads(cs->buf, 0, &err_val);
if (!val)
LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text);
out_empty:
empty_socket(cs->fd);
if (!val) {
/* Assume that a failed request means the socket will be closed
* and reopen it */
LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port);
close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port);
}
out:
free(http_req);
dealloc(cs->buf);
return val;
}
/* Open the file in path, check if there is a pid in there that still exists /* Open the file in path, check if there is a pid in there that still exists
* and if not, write the pid into that file. */ * and if not, write the pid into that file. */
static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid) static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid)
@ -449,8 +668,8 @@ static void *watchdog(void *arg)
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
struct sigaction handler; struct sigaction handler;
int len, c, ret, i;
char buf[512] = {}; char buf[512] = {};
int c, ret, i;
ckpool_t ckp; ckpool_t ckp;
global_ckp = &ckp; global_ckp = &ckp;
@ -502,7 +721,6 @@ int main(int argc, char **argv)
ckp.socket_dir = strdup("/tmp/"); ckp.socket_dir = strdup("/tmp/");
realloc_strcat(&ckp.socket_dir, ckp.name); realloc_strcat(&ckp.socket_dir, ckp.name);
} }
len = strlen(ckp.socket_dir);
trail_slash(&ckp.socket_dir); trail_slash(&ckp.socket_dir);
/* Ignore sigpipe */ /* Ignore sigpipe */
@ -547,7 +765,6 @@ int main(int argc, char **argv)
quit(0, "No proxy entries found in config file %s", ckp.config); quit(0, "No proxy entries found in config file %s", ckp.config);
/* Create the log directory */ /* Create the log directory */
len = strlen(ckp.logdir);
trail_slash(&ckp.logdir); trail_slash(&ckp.logdir);
ret = mkdir(ckp.logdir, 0700); ret = mkdir(ckp.logdir, 0700);
if (ret && errno != EEXIST) if (ret && errno != EEXIST)

8
src/ckpool.h

@ -109,6 +109,14 @@ struct ckpool_instance {
ckpool_t *global_ckp; ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
int read_socket_line(connsock_t *cs, int timeout);
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret); int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret);
#define ASPRINTF(strp, fmt, ...) do { \ #define ASPRINTF(strp, fmt, ...) do { \

219
src/libckpool.c

@ -462,70 +462,6 @@ out:
return ret; return ret;
} }
/* Peek in a socket, and then receive only one line at a time, allocing enough
* memory in *buf */
int read_socket_line(connsock_t *cs, int timeout)
{
size_t buflen = 0, bufofs = 0;
int ret = -1, bufsiz;
char *eom = NULL;
tv_t tv_timeout;
fd_set rd;
dealloc(cs->buf);
if (unlikely(cs->fd < 0))
return ret;
bufsiz = PAGESIZE;
while (!eom) {
char readbuf[PAGESIZE] = {};
int extralen;
FD_ZERO(&rd);
FD_SET(cs->fd, &rd);
tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0;
ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout);
if (ret < 1) {
if (!ret)
LOGDEBUG("Select timed out in read_socket_line");
else
LOGERR("Select failed in read_socket_line");
goto out;
}
ret = recv(cs->fd, readbuf, bufsiz - 2, MSG_PEEK);
if (ret < 0) {
LOGERR("Failed to recv in read_socket_line");
goto out;
}
if (!ret)
continue;
eom = strchr(readbuf, '\n');
if (eom)
extralen = eom - readbuf + 1;
else
extralen = ret;
buflen += extralen + 1;
align_len(&buflen);
cs->buf = realloc(cs->buf, buflen);
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
ret = recv(cs->fd, cs->buf + bufofs, extralen, 0);
if (ret < 0) {
LOGERR("Failed to recv %d bytes in read_socket_line", (int)buflen);
goto out;
}
bufofs += ret;
}
eom = cs->buf + bufofs;
eom[0] = '\0';
ret = bufofs + 1;
out:
if (ret < 1)
dealloc(cs->buf);
return ret;
}
void empty_socket(int fd) void empty_socket(int fd)
{ {
int ret; int ret;
@ -814,78 +750,6 @@ out:
return retval; return retval;
} }
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path;
bool ret = false;
int sockd;
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
}
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path);
if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
ret = true;
close(sockd);
out:
if (unlikely(!ret))
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
return ret;
}
/* Send a single message to a process instance and retrieve the response, then
* close the socket. */
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
char *path = pi->us.path, *buf = NULL;
int sockd;
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
}
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
if (unlikely(kill(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path);
if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
buf = recv_unix_msg(sockd);
close(sockd);
out:
if (unlikely(!buf))
LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line);
return buf;
}
/* Extracts a string value from a json array with error checking. To be used /* Extracts a string value from a json array with error checking. To be used
* when the value of the string returned is only examined and not to be stored. * when the value of the string returned is only examined and not to be stored.
* See json_array_string below */ * See json_array_string below */
@ -917,89 +781,6 @@ char *json_array_string(json_t *val, unsigned int entry)
} }
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{
char *http_req = NULL;
json_error_t err_val;
json_t *val = NULL;
int len, ret;
if (unlikely(cs->fd < 0)) {
LOGWARNING("FD %d invalid in json_rpc_call", cs->fd);
goto out;
}
if (unlikely(!cs->url)) {
LOGWARNING("No URL in json_rpc_call");
goto out;
}
if (unlikely(!cs->port)) {
LOGWARNING("No port in json_rpc_call");
goto out;
}
if (unlikely(!cs->auth)) {
LOGWARNING("No auth in json_rpc_call");
goto out;
}
if (unlikely(!rpc_req)) {
LOGWARNING("Null rpc_req passed to json_rpc_call");
goto out;
}
len = strlen(rpc_req);
if (unlikely(!len)) {
LOGWARNING("Zero length rpc_req passed to json_rpc_call");
goto out;
}
http_req = ckalloc(len + 256); // Leave room for headers
sprintf(http_req,
"POST / HTTP/1.1\n"
"Authorization: Basic %s\n"
"Host: %s:%s\n"
"Content-type: application/json\n"
"Content-Length: %d\n\n%s",
cs->auth, cs->url, cs->port, len, rpc_req);
len = strlen(http_req);
ret = write_socket(cs->fd, http_req, len);
if (ret != len) {
LOGWARNING("Failed to write to socket in json_rpc_call");
goto out_empty;
}
ret = read_socket_line(cs, 5);
if (ret < 1) {
LOGWARNING("Failed to read socket line in json_rpc_call");
goto out_empty;
}
if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) {
LOGWARNING("HTTP response not ok: %s", cs->buf);
goto out_empty;
}
do {
ret = read_socket_line(cs, 5);
if (ret < 1) {
LOGWARNING("Failed to read http socket lines in json_rpc_call");
goto out_empty;
}
} while (strncmp(cs->buf, "{", 1));
val = json_loads(cs->buf, 0, &err_val);
if (!val)
LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text);
out_empty:
empty_socket(cs->fd);
if (!val) {
/* Assume that a failed request means the socket will be closed
* and reopen it */
LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port);
close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port);
}
out:
free(http_req);
dealloc(cs->buf);
return val;
}
/* Align a size_t to 4 byte boundaries for fussy arches */ /* Align a size_t to 4 byte boundaries for fussy arches */
void align_len(size_t *len) void align_len(size_t *len)
{ {

7
src/libckpool.h

@ -313,7 +313,6 @@ void block_socket(int fd);
int bind_socket(char *url, char *port); int bind_socket(char *url, char *port);
int connect_socket(char *url, char *port); int connect_socket(char *url, char *port);
int write_socket(int fd, const void *buf, size_t nbyte); int write_socket(int fd, const void *buf, size_t nbyte);
int read_socket_line(connsock_t *cs, int timeout);
void empty_socket(int fd); void empty_socket(int fd);
void close_unix_socket(const int sockd, const char *server_path); void close_unix_socket(const int sockd, const char *server_path);
int _open_unix_server(const char *server_path, const char *file, const char *func, const int line); int _open_unix_server(const char *server_path, const char *file, const char *func, const int line);
@ -328,16 +327,10 @@ int wait_write_select(int sockd, int timeout);
int write_length(int sockd, const void *buf, int len); int write_length(int sockd, const void *buf, int len);
bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line); bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line);
#define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__) #define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__)
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
const char *__json_array_string(json_t *val, unsigned int entry); const char *__json_array_string(json_t *val, unsigned int entry);
char *json_array_string(json_t *val, unsigned int entry); char *json_array_string(json_t *val, unsigned int entry);
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
void align_len(size_t *len); void align_len(size_t *len);
void realloc_strcat(char **ptr, const char *s); void realloc_strcat(char **ptr, const char *s);
void trail_slash(char **buf); void trail_slash(char **buf);

Loading…
Cancel
Save