Browse Source

Simply encode/decode binary keys correctly in messages for now

master
Con Kolivas 9 years ago
parent
commit
debaa03d9d
  1. 11
      src/ckpool.c
  2. 5
      src/ckpool.h
  3. 18
      src/connector.c
  4. 18
      src/libckpool.c
  5. 5
      src/stratifier.c

11
src/ckpool.c

@ -718,7 +718,8 @@ out:
/* Send a single message to a process instance when there will be no response, /* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */ * closing the socket immediately. */
void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file,
const char *func, const int line)
{ {
char *path = pi->us.path; char *path = pi->us.path;
bool ret = false; bool ret = false;
@ -748,7 +749,7 @@ void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, con
LOGWARNING("Failed to open socket %s", path); LOGWARNING("Failed to open socket %s", path);
goto out; goto out;
} }
if (unlikely(!send_unix_msg(sockd, msg))) if (unlikely(!send_unix(sockd, msg, len)))
LOGWARNING("Failed to send %s to socket %s", msg, path); LOGWARNING("Failed to send %s to socket %s", msg, path);
else else
ret = true; ret = true;
@ -761,11 +762,13 @@ out:
/* As per send_proc_data but must be a string */ /* As per send_proc_data but must be a string */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{ {
if (unlikely(!msg || !strlen(msg))) { uint32_t len;
if (unlikely(!msg || !(len = strlen(msg)))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname); LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return; return;
} }
return _send_proc_data(pi, msg, file, func, line); return _send_proc_data(pi, msg, len, file, func, line);
} }
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then

5
src/ckpool.h

@ -326,8 +326,9 @@ void empty_buffer(connsock_t *cs);
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len);
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc_data(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc_data(proc_instance_t *pi, const char *msg, uint32_t len, const char *file,
#define send_proc_data(pi, msg) _send_proc_data(pi, msg, __FILE__, __func__, __LINE__) const char *func, const int line);
#define send_proc_data(pi, msg, len) _send_proc_data(pi, msg, len, __FILE__, __func__, __LINE__)
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,

18
src/connector.c

@ -1199,7 +1199,7 @@ out:
return ret; return ret;
} }
static void process_client_msg(cdata_t *cdata, char *buf) static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
{ {
char *msg, *bkey = NULL; char *msg, *bkey = NULL;
int64_t client_id; int64_t client_id;
@ -1207,18 +1207,20 @@ static void process_client_msg(cdata_t *cdata, char *buf)
int len; int len;
len = strlen(buf); len = strlen(buf);
if (len > 4 && !strncmp((buf + len - 4 - 1), "bkey", 4)) { if (likely(len > 4)) {
LOGWARNING("Bkey found in process_client_msg"); bkey = strstr(buf + len - 4 - 1, "bkey");
buf[len - 4 - 1] = '\0'; if (bkey)
bkey = buf + len + 1; LOGDEBUG("Bkey found in process_client_msg");
} }
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }
if (unlikely(bkey)) if (unlikely(bkey)) {
LOGWARNING("Bkey should be at %d, msglen %u", len + 1, msglen);
json_append_bkeys(json_msg, bkey); json_append_bkeys(json_msg, bkey);
}
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
@ -1342,7 +1344,7 @@ retry:
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf); process_client_msg(cdata, buf, umsg->msglen);
} else if (cmdmatch(buf, "upstream=")) { } else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); char *msg = strdup(buf + 9);

18
src/libckpool.c

@ -1331,19 +1331,22 @@ void _bkey_add_hex(char **bkey, const char *key, const char *hex, const char *fi
msglen = le32toh(*lenptr); msglen = le32toh(*lenptr);
/* Add $key+length+bin */ /* Add $key+length+bin */
newlen = len + msglen + hlen; newlen = msglen + len + BKEY_LENLEN + hlen;
*bkey = realloc(*bkey, round_up_page(newlen)); *bkey = realloc(*bkey, round_up_page(newlen));
/* Append keyname */ /* Append keyname */
LOGDEBUG("Adding key %s @ ofs %u", key, msglen);
sprintf(*bkey + msglen, "%s", key); sprintf(*bkey + msglen, "%s", key);
msglen += len; msglen += len;
/* Append bin length */ /* Append bin length */
LOGDEBUG("Adding len %u @ ofs %u", hlen, msglen);
lenptr = (uint32_t *)(*bkey + msglen); lenptr = (uint32_t *)(*bkey + msglen);
*lenptr = htole32(hlen); *lenptr = htole32(hlen);
msglen += BKEY_LENLEN; msglen += BKEY_LENLEN;
/* Append binary data */ /* Append binary data */
LOGDEBUG("Adding hex of len %u @ ofs %u %s", hlen, msglen, hex);
hex2bin(*bkey + msglen, hex, hlen); hex2bin(*bkey + msglen, hex, hlen);
/* Adjust message length header */ /* Adjust message length header */
@ -1380,19 +1383,22 @@ void _bkey_add_bin(char **bkey, const char *key, const char *bin, const int blen
msglen = le32toh(*lenptr); msglen = le32toh(*lenptr);
/* Add $key+length+bin */ /* Add $key+length+bin */
newlen = len + 4 + blen; newlen = msglen + len + BKEY_LENLEN + blen;
*bkey = realloc(*bkey, round_up_page(newlen)); *bkey = realloc(*bkey, round_up_page(newlen));
/* Append keyname */ /* Append keyname */
LOGDEBUG("Adding key %s @ ofs %u", key, msglen);
sprintf(*bkey + msglen, "%s", key); sprintf(*bkey + msglen, "%s", key);
msglen += len; msglen += len;
/* Append bin length */ /* Append bin length */
LOGDEBUG("Adding len %u @ ofs %u", blen, msglen);
lenptr = (uint32_t *)(*bkey + msglen); lenptr = (uint32_t *)(*bkey + msglen);
*lenptr = htole32(blen); *lenptr = htole32(blen);
msglen += 4; msglen += BKEY_LENLEN;
/* Append binary data */ /* Append binary data */
LOGDEBUG("Adding bin of len %u @ ofs %u", blen, msglen);
memcpy(*bkey + msglen, bin, blen); memcpy(*bkey + msglen, bin, blen);
/* Adjust message length header */ /* Adjust message length header */
@ -1417,14 +1423,14 @@ void _json_append_bkeys(json_t *val, const char *bkey, const char *file, const c
char *hex; char *hex;
key = bkey + ofs; key = bkey + ofs;
LOGWARNING("Found key %s", key); LOGDEBUG("Found key %s @ ofs %u", key, ofs);
ofs += strlen(key) + 1; ofs += strlen(key) + 1;
lenptr = (uint32_t *)(bkey + ofs); lenptr = (uint32_t *)(bkey + ofs);
binlen = le32toh(*lenptr); binlen = le32toh(*lenptr);
LOGDEBUG("Found binlen %u @ ofs %u", binlen, ofs);
ofs += BKEY_LENLEN; ofs += BKEY_LENLEN;
LOGWARNING("Found binlen %u", binlen);
hex = bin2hex(bkey + ofs, binlen); hex = bin2hex(bkey + ofs, binlen);
LOGWARNING("Found hex %s", hex); LOGDEBUG("Found hex %s @ ofs %u", hex, ofs);
json_set_string(val, key, hex); json_set_string(val, key, hex);
free(hex); free(hex);
ofs += binlen; ofs += binlen;

5
src/stratifier.c

@ -6261,13 +6261,14 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */ * connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, JSON_COMPACT); s = json_dumps(msg->json_msg, JSON_COMPACT);
if (msg->bkeylen) { if (unlikely(msg->bkeylen)) {
int len = strlen(s); int len = strlen(s);
s = realloc(s, len + msg->bkeylen); s = realloc(s, len + msg->bkeylen);
memcpy(s + len, msg->bkey, msg->bkeylen); memcpy(s + len, msg->bkey, msg->bkeylen);
free(msg->bkey); free(msg->bkey);
} send_proc_data(ckp->connector, s, len + msg->bkeylen);
} else
send_proc(ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
free_smsg(msg); free_smsg(msg);

Loading…
Cancel
Save