Browse Source

Append bkeys in passthrough_recv messages

master
ckolivas 9 years ago
parent
commit
4805bb410c
  1. 10
      src/connector.c
  2. 34
      src/generator.c

10
src/connector.c

@ -1253,8 +1253,8 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
uint32_t slen; uint32_t slen;
slen = strlen(buf); slen = strlen(buf);
if (likely(slen > 4)) { if (likely(slen > 5)) {
bkey = strstr(buf + slen - 4 - 1, "bkey"); bkey = strstr(buf + slen - 5, "bkey");
if (bkey) if (bkey)
LOGDEBUG("Bkey found in process_client_msg"); LOGDEBUG("Bkey found in process_client_msg");
} }
@ -1263,12 +1263,6 @@ static void process_client_msg(cdata_t *cdata, char *buf, uint32_t msglen)
LOGWARNING("Invalid json message in process_client_msg: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }
#if 0
if (unlikely(bkey))
msglen = msglen - (bkey - buf);
else
msglen = len;
#endif
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));

34
src/generator.c

@ -1922,6 +1922,33 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi); create_pthread(&pth, proxy_reconnect, proxi);
} }
static void forward_passthrough_msg(ckpool_t *ckp, char *buf, int len)
{
int slen = strlen(buf), blen = len - slen;
char *bkey = NULL;
if (unlikely(blen > 0))
bkey = strstr(buf + slen - 5, "bkey");
if (bkey) {
json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
if (unlikely(!val)) {
LOGWARNING("No json in bkey appended message %s", buf);
goto out;
}
json_append_bkeys(val, bkey, blen);
buf = json_dumps(val, JSON_COMPACT);
json_decref(val);
LOGDEBUG("Passthrough recv received upstream bkey msg: %s", buf);
send_proc(ckp->connector, buf);
free(buf);
return;
}
out:
LOGDEBUG("Passthrough recv received upstream msg: %s", buf);
send_proc(ckp->connector, buf);
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -1956,10 +1983,9 @@ static void *passthrough_recv(void *arg)
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
if (likely(ret > 0)) { if (likely(ret > 0))
LOGDEBUG("Passthrough recv received upstream msg: %s", cs->buf); forward_passthrough_msg(ckp, cs->buf, ret);
send_proc(ckp->connector, cs->buf); else if (ret < 0) {
} else if (ret < 0) {
/* Read failure */ /* Read failure */
LOGWARNING("Passthrough %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", LOGWARNING("Passthrough %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect",
proxi->id, proxi->url); proxi->id, proxi->url);

Loading…
Cancel
Save