diff --git a/src/ckpool.c b/src/ckpool.c index a54c68b4..6b9d1853 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -718,7 +718,8 @@ out: /* Send a single message to a process instance when there will be no response, * closing the socket immediately. */ -void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file, + const char *func, const int line) { char *path = pi->us.path; bool ret = false; @@ -748,7 +749,7 @@ void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, con LOGWARNING("Failed to open socket %s", path); goto out; } - if (unlikely(!send_unix_msg(sockd, msg))) + if (unlikely(!send_unix(sockd, msg, len))) LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; @@ -761,11 +762,13 @@ out: /* As per send_proc_data but must be a string */ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - if (unlikely(!msg || !strlen(msg))) { + uint32_t len; + + if (unlikely(!msg || !(len = strlen(msg)))) { LOGERR("Attempted to send null message to %s in send_proc", pi->processname); return; } - return _send_proc_data(pi, msg, file, func, line); + return _send_proc_data(pi, msg, len, file, func, line); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 100c5cbe..8d0c33dc 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -326,8 +326,9 @@ void empty_buffer(connsock_t *cs); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int read_socket_line(connsock_t *cs, float *timeout); -void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define send_proc_data(pi, msg) _send_proc_data(pi, msg, __FILE__, __func__, __LINE__) +void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file, + const char *func, const int line); +#define send_proc_data(pi, msg, len) _send_proc_data(pi, msg, len, __FILE__, __func__, __LINE__) void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, diff --git a/src/connector.c b/src/connector.c index 1bb09824..46bcf6ff 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1199,7 +1199,7 @@ out: return ret; } -static void process_client_msg(cdata_t *cdata, char *buf) +static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) { char *msg, *bkey = NULL; int64_t client_id; @@ -1207,18 +1207,20 @@ static void process_client_msg(cdata_t *cdata, char *buf) int len; len = strlen(buf); - if (len > 4 && !strncmp((buf + len - 4 - 1), "bkey", 4)) { - LOGWARNING("Bkey found in process_client_msg"); - buf[len - 4 - 1] = '\0'; - bkey = buf + len + 1; + if (likely(len > 4)) { + bkey = strstr(buf + len - 4 - 1, "bkey"); + if (bkey) + LOGDEBUG("Bkey found in process_client_msg"); } - json_msg = json_loads(buf, 0, NULL); + json_msg = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); if (unlikely(!json_msg)) { LOGWARNING("Invalid json message in process_client_msg: %s", buf); return; } - if (unlikely(bkey)) + if (unlikely(bkey)) { + LOGWARNING("Bkey should be at %d, msglen %u", len + 1, msglen); json_append_bkeys(json_msg, bkey); + } /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); @@ -1342,7 +1344,7 @@ retry: /* The bulk of the messages will be json messages to send to clients * so look for them first. */ if (likely(buf[0] == '{')) { - process_client_msg(cdata, buf); + process_client_msg(cdata, buf, umsg->msglen); } else if (cmdmatch(buf, "upstream=")) { char *msg = strdup(buf + 9); diff --git a/src/libckpool.c b/src/libckpool.c index 93261c1a..09561daf 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1331,19 +1331,22 @@ void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *fi msglen = le32toh(*lenptr); /* Add $key+length+bin */ - newlen = len + msglen + hlen; + newlen = msglen + len + BKEY_LENLEN + hlen; *bkey = realloc(*bkey, round_up_page(newlen)); /* Append keyname */ + LOGDEBUG("Adding key %s @ ofs %u", key, msglen); sprintf(*bkey + msglen, "%s", key); msglen += len; /* Append bin length */ + LOGDEBUG("Adding len %u @ ofs %u", hlen, msglen); lenptr = (uint32_t *)(*bkey + msglen); *lenptr = htole32(hlen); msglen += BKEY_LENLEN; /* Append binary data */ + LOGDEBUG("Adding hex of len %u @ ofs %u %s", hlen, msglen, hex); hex2bin(*bkey + msglen, hex, hlen); /* Adjust message length header */ @@ -1380,19 +1383,22 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen msglen = le32toh(*lenptr); /* Add $key+length+bin */ - newlen = len + 4 + blen; + newlen = msglen + len + BKEY_LENLEN + blen; *bkey = realloc(*bkey, round_up_page(newlen)); /* Append keyname */ + LOGDEBUG("Adding key %s @ ofs %u", key, msglen); sprintf(*bkey + msglen, "%s", key); msglen += len; /* Append bin length */ + LOGDEBUG("Adding len %u @ ofs %u", blen, msglen); lenptr = (uint32_t *)(*bkey + msglen); *lenptr = htole32(blen); - msglen += 4; + msglen += BKEY_LENLEN; /* Append binary data */ + LOGDEBUG("Adding bin of len %u @ ofs %u", blen, msglen); memcpy(*bkey + msglen, bin, blen); /* Adjust message length header */ @@ -1417,14 +1423,14 @@ void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const c char *hex; key = bkey + ofs; - LOGWARNING("Found key %s", key); + LOGDEBUG("Found key %s @ ofs %u", key, ofs); ofs += strlen(key) + 1; lenptr = (uint32_t *)(bkey + ofs); binlen = le32toh(*lenptr); + LOGDEBUG("Found binlen %u @ ofs %u", binlen, ofs); ofs += BKEY_LENLEN; - LOGWARNING("Found binlen %u", binlen); hex = bin2hex(bkey + ofs, binlen); - LOGWARNING("Found hex %s", hex); + LOGDEBUG("Found hex %s @ ofs %u", hex, ofs); json_set_string(val, key, hex); free(hex); ofs += binlen; diff --git a/src/stratifier.c b/src/stratifier.c index d39b5342..6581f7c3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -6261,14 +6261,15 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, JSON_COMPACT); - if (msg->bkeylen) { + if (unlikely(msg->bkeylen)) { int len = strlen(s); s = realloc(s, len + msg->bkeylen); memcpy(s + len, msg->bkey, msg->bkeylen); free(msg->bkey); - } - send_proc(ckp->connector, s); + send_proc_data(ckp->connector, s, len + msg->bkeylen); + } else + send_proc(ckp->connector, s); free(s); free_smsg(msg); }