Browse Source

Decode bkeys for clients that don't support them, otherwise forward on entire message

master
ckolivas 9 years ago
parent
commit
d91295cf82
  1. 50
      src/connector.c
  2. 2
      src/stratifier.c

50
src/connector.c

@ -911,27 +911,25 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, u
{ {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
sender_send_t *sender_send; sender_send_t *sender_send;
uint32_t blen = len - slen;
client_instance_t *client; client_instance_t *client;
char *bkey = NULL; char *bkey = NULL;
json_t *val; json_t *val;
if (unlikely(blen > 0))
bkey = strstr(buf + slen - 4 - 1, "bkey");
if (unlikely(ckp->node && !id)) { if (unlikely(ckp->node && !id)) {
uint32_t blen = slen - len; if (bkey) {
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (blen > 0) { if (unlikely(!val)) {
bkey = strstr(buf + slen - 4 - 1, "bkey"); LOGWARNING("No json in bkey appended message %s", buf);
if (likely(bkey)) {
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGWARNING("No json in bkey appended message %s", buf);
free(buf);
return;
}
json_append_bkeys(val, bkey, blen);
free(buf); free(buf);
buf = json_dumps(val, JSON_EOL | JSON_COMPACT); return;
json_decref(val);
} }
json_append_bkeys(val, bkey, blen);
free(buf);
buf = json_dumps(val, JSON_COMPACT);
json_decref(val);
} }
LOGDEBUG("Message for node: %s", buf); LOGDEBUG("Message for node: %s", buf);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
@ -972,12 +970,17 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, u
if (ckp->node) { if (ckp->node) {
char *msg; char *msg;
val = json_loads(buf, 0, NULL); val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (!val) // Can happen if client sent invalid json message if (!val) {
// Can happen if client sent invalid json message
len = slen;
goto out; goto out;
}
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server)); json_object_set_new_nocheck(val, "server", json_integer(client->server));
if (bkey)
json_append_bkeys(val, bkey, blen);
msg = json_dumps(val, JSON_COMPACT); msg = json_dumps(val, JSON_COMPACT);
json_decref(val); json_decref(val);
send_proc(ckp->stratifier, msg); send_proc(ckp->stratifier, msg);
@ -986,6 +989,21 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf, int slen, u
if (ckp->redirector && !client->redirected) if (ckp->redirector && !client->redirected)
test_redirector_shares(ckp, client, buf); test_redirector_shares(ckp, client, buf);
} }
/* Append bkeys as regular json for clients that can't decode them */
if (unlikely(bkey && !client->bkey)) {
val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGERR("Failed to decode json in bkey encoded message %s", buf);
bkey = '\0';
len = strlen(buf);
goto out;
}
free(buf);
json_append_bkeys(val, bkey, blen);
buf = json_dumps(val, JSON_COMPACT);
json_decref(val);
}
out: out:
sender_send = ckzalloc(sizeof(sender_send_t)); sender_send = ckzalloc(sizeof(sender_send_t));
sender_send->client = client; sender_send->client = client;

2
src/stratifier.c

@ -6190,7 +6190,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
json_t *val; json_t *val;
int server; int server;
val = json_loads(buf, 0, NULL); val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) { if (unlikely(!val)) {
LOGWARNING("Received unrecognised non-json message: %s", buf); LOGWARNING("Received unrecognised non-json message: %s", buf);
goto out; goto out;

Loading…
Cancel
Save