From 4805bb410c0c27dd22fcdd88747b21058f5096ca Mon Sep 17 00:00:00 2001 From: ckolivas Date: Thu, 28 Jan 2016 13:09:21 +1100 Subject: [PATCH] Append bkeys in passthrough_recv messages --- src/connector.c | 10 ++-------- src/generator.c | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/connector.c b/src/connector.c index 422e2b68..d444a188 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1253,8 +1253,8 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) uint32_t slen; slen = strlen(buf); - if (likely(slen > 4)) { - bkey = strstr(buf + slen - 4 - 1, "bkey"); + if (likely(slen > 5)) { + bkey = strstr(buf + slen - 5, "bkey"); if (bkey) LOGDEBUG("Bkey found in process_client_msg"); } @@ -1263,12 +1263,6 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen) LOGWARNING("Invalid json message in process_client_msg: %s", buf); return; } -#if 0 - if (unlikely(bkey)) - msglen = msglen - (bkey - buf); - else - msglen = len; -#endif /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); diff --git a/src/generator.c b/src/generator.c index 35d3e11e..bfe15abb 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1922,6 +1922,33 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } +static void forward_passthrough_msg(ckpool_t *ckp, char *buf, int len) +{ + int slen = strlen(buf), blen = len - slen; + char *bkey = NULL; + + if (unlikely(blen > 0)) + bkey = strstr(buf + slen - 5, "bkey"); + if (bkey) { + json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL); + + if (unlikely(!val)) { + LOGWARNING("No json in bkey appended message %s", buf); + goto out; + } + json_append_bkeys(val, bkey, blen); + buf = json_dumps(val, JSON_COMPACT); + json_decref(val); + LOGDEBUG("Passthrough recv received upstream bkey msg: %s", buf); + send_proc(ckp->connector, buf); + free(buf); + return; + } +out: + LOGDEBUG("Passthrough recv received upstream msg: %s", buf); + send_proc(ckp->connector, buf); +} + /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -1956,10 +1983,9 @@ static void *passthrough_recv(void *arg) /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - if (likely(ret > 0)) { - LOGDEBUG("Passthrough recv received upstream msg: %s", cs->buf); - send_proc(ckp->connector, cs->buf); - } else if (ret < 0) { + if (likely(ret > 0)) + forward_passthrough_msg(ckp, cs->buf, ret); + else if (ret < 0) { /* Read failure */ LOGWARNING("Passthrough %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", proxi->id, proxi->url);