diff --git a/src/ckpool.c b/src/ckpool.c index 6a91289d..6da833d1 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -82,6 +82,225 @@ bool ping_main(ckpool_t *ckp) return true; } +/* Peek in a socket, and then receive only one line at a time, allocing enough + * memory in *buf */ +int read_socket_line(connsock_t *cs, int timeout) +{ + size_t buflen = 0, bufofs = 0; + int ret = -1, bufsiz; + char *eom = NULL; + tv_t tv_timeout; + fd_set rd; + + dealloc(cs->buf); + if (unlikely(cs->fd < 0)) + return ret; + + bufsiz = PAGESIZE; + while (!eom) { + char readbuf[PAGESIZE] = {}; + int extralen; + + FD_ZERO(&rd); + FD_SET(cs->fd, &rd); + tv_timeout.tv_sec = timeout; + tv_timeout.tv_usec = 0; + ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout); + if (ret < 1) { + if (!ret) + LOGDEBUG("Select timed out in read_socket_line"); + else + LOGERR("Select failed in read_socket_line"); + goto out; + } + ret = recv(cs->fd, readbuf, bufsiz - 2, MSG_PEEK); + if (ret < 0) { + LOGERR("Failed to recv in read_socket_line"); + goto out; + } + if (!ret) + continue; + eom = strchr(readbuf, '\n'); + if (eom) + extralen = eom - readbuf + 1; + else + extralen = ret; + buflen += extralen + 1; + align_len(&buflen); + cs->buf = realloc(cs->buf, buflen); + if (unlikely(!cs->buf)) + quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); + ret = recv(cs->fd, cs->buf + bufofs, extralen, 0); + if (ret < 0) { + LOGERR("Failed to recv %d bytes in read_socket_line", (int)buflen); + goto out; + } + bufofs += ret; + } + eom = cs->buf + bufofs; + eom[0] = '\0'; + ret = bufofs + 1; +out: + if (ret < 1) + dealloc(cs->buf); + return ret; +} + +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + char *path = pi->us.path; + bool ret = false; + int sockd; + + if (unlikely(!path || !strlen(path))) { + LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); + goto out; + } + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to socket %s in send_proc", path); + goto out; + } + if (unlikely(kill(pi->pid, 0))) { + LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); + goto out; + } + sockd = open_unix_client(path); + if (unlikely(sockd < 0)) { + LOGWARNING("Failed to open socket %s", path); + goto out; + } + if (unlikely(!send_unix_msg(sockd, msg))) + LOGWARNING("Failed to send %s to socket %s", msg, path); + else + ret = true; + close(sockd); +out: + if (unlikely(!ret)) + LOGERR("Failure in send_proc from %s %s:%d", file, func, line); + return ret; +} + +/* Send a single message to a process instance and retrieve the response, then + * close the socket. */ +char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + char *path = pi->us.path, *buf = NULL; + int sockd; + + if (unlikely(!path || !strlen(path))) { + LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); + goto out; + } + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to socket %s in send_proc", path); + goto out; + } + if (unlikely(kill(pi->pid, 0))) { + LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); + goto out; + } + sockd = open_unix_client(path); + if (unlikely(sockd < 0)) { + LOGWARNING("Failed to open socket %s", path); + goto out; + } + if (unlikely(!send_unix_msg(sockd, msg))) + LOGWARNING("Failed to send %s to socket %s", msg, path); + else + buf = recv_unix_msg(sockd); + close(sockd); +out: + if (unlikely(!buf)) + LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line); + return buf; +} + + +json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) +{ + char *http_req = NULL; + json_error_t err_val; + json_t *val = NULL; + int len, ret; + + if (unlikely(cs->fd < 0)) { + LOGWARNING("FD %d invalid in json_rpc_call", cs->fd); + goto out; + } + if (unlikely(!cs->url)) { + LOGWARNING("No URL in json_rpc_call"); + goto out; + } + if (unlikely(!cs->port)) { + LOGWARNING("No port in json_rpc_call"); + goto out; + } + if (unlikely(!cs->auth)) { + LOGWARNING("No auth in json_rpc_call"); + goto out; + } + if (unlikely(!rpc_req)) { + LOGWARNING("Null rpc_req passed to json_rpc_call"); + goto out; + } + len = strlen(rpc_req); + if (unlikely(!len)) { + LOGWARNING("Zero length rpc_req passed to json_rpc_call"); + goto out; + } + http_req = ckalloc(len + 256); // Leave room for headers + sprintf(http_req, + "POST / HTTP/1.1\n" + "Authorization: Basic %s\n" + "Host: %s:%s\n" + "Content-type: application/json\n" + "Content-Length: %d\n\n%s", + cs->auth, cs->url, cs->port, len, rpc_req); + + len = strlen(http_req); + ret = write_socket(cs->fd, http_req, len); + if (ret != len) { + LOGWARNING("Failed to write to socket in json_rpc_call"); + goto out_empty; + } + ret = read_socket_line(cs, 5); + if (ret < 1) { + LOGWARNING("Failed to read socket line in json_rpc_call"); + goto out_empty; + } + if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) { + LOGWARNING("HTTP response not ok: %s", cs->buf); + goto out_empty; + } + do { + ret = read_socket_line(cs, 5); + if (ret < 1) { + LOGWARNING("Failed to read http socket lines in json_rpc_call"); + goto out_empty; + } + } while (strncmp(cs->buf, "{", 1)); + + val = json_loads(cs->buf, 0, &err_val); + if (!val) + LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text); +out_empty: + empty_socket(cs->fd); + if (!val) { + /* Assume that a failed request means the socket will be closed + * and reopen it */ + LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); + close(cs->fd); + cs->fd = connect_socket(cs->url, cs->port); + } +out: + free(http_req); + dealloc(cs->buf); + return val; +} + + /* Open the file in path, check if there is a pid in there that still exists * and if not, write the pid into that file. */ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid) @@ -449,8 +668,8 @@ static void *watchdog(void *arg) int main(int argc, char **argv) { struct sigaction handler; - int len, c, ret, i; char buf[512] = {}; + int c, ret, i; ckpool_t ckp; global_ckp = &ckp; @@ -502,7 +721,6 @@ int main(int argc, char **argv) ckp.socket_dir = strdup("/tmp/"); realloc_strcat(&ckp.socket_dir, ckp.name); } - len = strlen(ckp.socket_dir); trail_slash(&ckp.socket_dir); /* Ignore sigpipe */ @@ -547,7 +765,6 @@ int main(int argc, char **argv) quit(0, "No proxy entries found in config file %s", ckp.config); /* Create the log directory */ - len = strlen(ckp.logdir); trail_slash(&ckp.logdir); ret = mkdir(ckp.logdir, 0700); if (ret && errno != EEXIST) diff --git a/src/ckpool.h b/src/ckpool.h index dff9c54a..fc0916be 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -109,6 +109,14 @@ struct ckpool_instance { ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); +int read_socket_line(connsock_t *cs, int timeout); +bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) +char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) + +json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); + int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret); #define ASPRINTF(strp, fmt, ...) do { \ diff --git a/src/libckpool.c b/src/libckpool.c index d74db825..b764b7fe 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -462,70 +462,6 @@ out: return ret; } -/* Peek in a socket, and then receive only one line at a time, allocing enough - * memory in *buf */ -int read_socket_line(connsock_t *cs, int timeout) -{ - size_t buflen = 0, bufofs = 0; - int ret = -1, bufsiz; - char *eom = NULL; - tv_t tv_timeout; - fd_set rd; - - dealloc(cs->buf); - if (unlikely(cs->fd < 0)) - return ret; - - bufsiz = PAGESIZE; - while (!eom) { - char readbuf[PAGESIZE] = {}; - int extralen; - - FD_ZERO(&rd); - FD_SET(cs->fd, &rd); - tv_timeout.tv_sec = timeout; - tv_timeout.tv_usec = 0; - ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout); - if (ret < 1) { - if (!ret) - LOGDEBUG("Select timed out in read_socket_line"); - else - LOGERR("Select failed in read_socket_line"); - goto out; - } - ret = recv(cs->fd, readbuf, bufsiz - 2, MSG_PEEK); - if (ret < 0) { - LOGERR("Failed to recv in read_socket_line"); - goto out; - } - if (!ret) - continue; - eom = strchr(readbuf, '\n'); - if (eom) - extralen = eom - readbuf + 1; - else - extralen = ret; - buflen += extralen + 1; - align_len(&buflen); - cs->buf = realloc(cs->buf, buflen); - if (unlikely(!cs->buf)) - quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); - ret = recv(cs->fd, cs->buf + bufofs, extralen, 0); - if (ret < 0) { - LOGERR("Failed to recv %d bytes in read_socket_line", (int)buflen); - goto out; - } - bufofs += ret; - } - eom = cs->buf + bufofs; - eom[0] = '\0'; - ret = bufofs + 1; -out: - if (ret < 1) - dealloc(cs->buf); - return ret; -} - void empty_socket(int fd) { int ret; @@ -814,78 +750,6 @@ out: return retval; } -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - char *path = pi->us.path; - bool ret = false; - int sockd; - - if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); - goto out; - } - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to socket %s in send_proc", path); - goto out; - } - if (unlikely(kill(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); - goto out; - } - sockd = open_unix_client(path); - if (unlikely(sockd < 0)) { - LOGWARNING("Failed to open socket %s", path); - goto out; - } - if (unlikely(!send_unix_msg(sockd, msg))) - LOGWARNING("Failed to send %s to socket %s", msg, path); - else - ret = true; - close(sockd); -out: - if (unlikely(!ret)) - LOGERR("Failure in send_proc from %s %s:%d", file, func, line); - return ret; -} - -/* Send a single message to a process instance and retrieve the response, then - * close the socket. */ -char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - char *path = pi->us.path, *buf = NULL; - int sockd; - - if (unlikely(!path || !strlen(path))) { - LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); - goto out; - } - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to socket %s in send_proc", path); - goto out; - } - if (unlikely(kill(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); - goto out; - } - sockd = open_unix_client(path); - if (unlikely(sockd < 0)) { - LOGWARNING("Failed to open socket %s", path); - goto out; - } - if (unlikely(!send_unix_msg(sockd, msg))) - LOGWARNING("Failed to send %s to socket %s", msg, path); - else - buf = recv_unix_msg(sockd); - close(sockd); -out: - if (unlikely(!buf)) - LOGERR("Failure in send_recv_proc from %s %s:%d", file, func, line); - return buf; -} - - /* Extracts a string value from a json array with error checking. To be used * when the value of the string returned is only examined and not to be stored. * See json_array_string below */ @@ -917,89 +781,6 @@ char *json_array_string(json_t *val, unsigned int entry) } -json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) -{ - char *http_req = NULL; - json_error_t err_val; - json_t *val = NULL; - int len, ret; - - if (unlikely(cs->fd < 0)) { - LOGWARNING("FD %d invalid in json_rpc_call", cs->fd); - goto out; - } - if (unlikely(!cs->url)) { - LOGWARNING("No URL in json_rpc_call"); - goto out; - } - if (unlikely(!cs->port)) { - LOGWARNING("No port in json_rpc_call"); - goto out; - } - if (unlikely(!cs->auth)) { - LOGWARNING("No auth in json_rpc_call"); - goto out; - } - if (unlikely(!rpc_req)) { - LOGWARNING("Null rpc_req passed to json_rpc_call"); - goto out; - } - len = strlen(rpc_req); - if (unlikely(!len)) { - LOGWARNING("Zero length rpc_req passed to json_rpc_call"); - goto out; - } - http_req = ckalloc(len + 256); // Leave room for headers - sprintf(http_req, - "POST / HTTP/1.1\n" - "Authorization: Basic %s\n" - "Host: %s:%s\n" - "Content-type: application/json\n" - "Content-Length: %d\n\n%s", - cs->auth, cs->url, cs->port, len, rpc_req); - - len = strlen(http_req); - ret = write_socket(cs->fd, http_req, len); - if (ret != len) { - LOGWARNING("Failed to write to socket in json_rpc_call"); - goto out_empty; - } - ret = read_socket_line(cs, 5); - if (ret < 1) { - LOGWARNING("Failed to read socket line in json_rpc_call"); - goto out_empty; - } - if (strncasecmp(cs->buf, "HTTP/1.1 200 OK", 15)) { - LOGWARNING("HTTP response not ok: %s", cs->buf); - goto out_empty; - } - do { - ret = read_socket_line(cs, 5); - if (ret < 1) { - LOGWARNING("Failed to read http socket lines in json_rpc_call"); - goto out_empty; - } - } while (strncmp(cs->buf, "{", 1)); - - val = json_loads(cs->buf, 0, &err_val); - if (!val) - LOGWARNING("JSON decode failed(%d): %s", err_val.line, err_val.text); -out_empty: - empty_socket(cs->fd); - if (!val) { - /* Assume that a failed request means the socket will be closed - * and reopen it */ - LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); - close(cs->fd); - cs->fd = connect_socket(cs->url, cs->port); - } -out: - free(http_req); - dealloc(cs->buf); - return val; -} - - /* Align a size_t to 4 byte boundaries for fussy arches */ void align_len(size_t *len) { diff --git a/src/libckpool.h b/src/libckpool.h index b5c06a95..0f77ec65 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -313,7 +313,6 @@ void block_socket(int fd); int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); -int read_socket_line(connsock_t *cs, int timeout); void empty_socket(int fd); void close_unix_socket(const int sockd, const char *server_path); int _open_unix_server(const char *server_path, const char *file, const char *func, const int line); @@ -328,16 +327,10 @@ int wait_write_select(int sockd, int timeout); int write_length(int sockd, const void *buf, int len); bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line); #define send_unix_msg(sockd, buf) _send_unix_msg(sockd, buf, __FILE__, __func__, __LINE__) -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) -char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) const char *__json_array_string(json_t *val, unsigned int entry); char *json_array_string(json_t *val, unsigned int entry); -json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); - void align_len(size_t *len); void realloc_strcat(char **ptr, const char *s); void trail_slash(char **buf);