From 7e5a0620afad59d99f6b0a96af2db84a2fba98ad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 5 Feb 2015 13:31:45 +1100 Subject: [PATCH] Cache responses in proxy mode in case they come out of order to be able to successfully managed different subscribe variations --- src/generator.c | 115 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 22 deletions(-) diff --git a/src/generator.c b/src/generator.c index 6fa53194..a18a5cc4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -117,6 +117,8 @@ struct proxy_instance { int64_t share_id; ckmsgq_t *passsends; // passthrough sends + + char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ }; typedef struct proxy_instance proxy_instance_t; @@ -490,40 +492,99 @@ static json_t *find_notify(json_t *val) return ret; } +/* Get stored line in the proxy linked list of messages if any exist or NULL */ +static char *cached_proxy_line(proxy_instance_t *proxi) +{ + char *buf = NULL; + + if (proxi->recvd_lines) { + char_entry_t *char_t = proxi->recvd_lines; + + DL_DELETE(proxi->recvd_lines, char_t); + buf = char_t->buf; + free(char_t); + } + return buf; +} + +/* Get next line in the proxy linked list of messages or a new line from the + * connsock if there are none. */ +static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi) +{ + char *buf = cached_proxy_line(proxi); + + if (!buf && read_socket_line(cs, 5) > 0) + buf = strdup(cs->buf); + return buf; +} + +/* For appending a line to the proxy recv list */ +static void append_proxy_line(proxy_instance_t *proxi, const char *buf) +{ + char_entry_t *char_t = ckalloc(sizeof(char_entry_t)); + char_t->buf = strdup(buf); + DL_APPEND(proxi->recvd_lines, char_t); +} + +/* Get a new line from the connsock and return a copy of it */ +static char *new_proxy_line(connsock_t *cs) +{ + char *buf = NULL; + + if (read_socket_line(cs, 5) < 1) + goto out; + buf = strdup(cs->buf); +out: + return buf; +} + static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *notify_val, *tmp; + bool parsed, ret = false; + int retries = 0, size; const char *string; - bool ret = false; - char *old; - int size; + char *buf, *old; - size = read_socket_line(cs, 5); - if (size < 1) { +retry: + parsed = true; + if (!(buf = new_proxy_line(cs))) { LOGWARNING("Failed to receive line in parse_subscribe"); goto out; } - LOGDEBUG("parse_subscribe received %s", cs->buf); + LOGDEBUG("parse_subscribe received %s", buf); /* Ignore err_val here stored in &tmp */ - val = json_msg_result(cs->buf, &res_val, &tmp); + val = json_msg_result(buf, &res_val, &tmp); if (!val || !res_val) { - LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf); - goto out; + LOGINFO("Failed to get a json result in parse_subscribe, got: %s", buf); + parsed = false; } if (!json_is_array(res_val)) { - LOGWARNING("Result in parse_subscribe not an array"); - goto out; + LOGINFO("Result in parse_subscribe not an array"); + parsed = false; } size = json_array_size(res_val); if (size < 3) { - LOGWARNING("Result in parse_subscribe array too small"); - goto out; + LOGINFO("Result in parse_subscribe array too small"); + parsed = false; } notify_val = find_notify(res_val); if (!notify_val) { - LOGWARNING("Failed to find notify in parse_subscribe"); + LOGINFO("Failed to find notify in parse_subscribe"); + parsed = false; + } + if (!parsed) { + if (++retries < 3) { + /* We don't want this response so put it on the proxy + * recvd list to be parsed later */ + append_proxy_line(proxi, buf); + buf = NULL; + goto retry; + } + LOGWARNING("Failed to parse subscribe response in parse_subscribe"); goto out; } + /* Free up old data in place if we are re-subscribing */ old = proxi->sessionid; proxi->sessionid = NULL; @@ -577,6 +638,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) out: if (val) json_decref(val); + free(buf); return ret; } @@ -962,6 +1024,7 @@ out: static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *req, *err_val; + char *buf = NULL; bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", @@ -979,25 +1042,24 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) /* Read and parse any extra methods sent. Anything left in the buffer * should be the response to our auth request. */ do { - int size; - - size = read_socket_line(cs, 5); - if (size < 1) { + free(buf); + buf = next_proxy_line(cs, proxi); + if (!buf) { LOGWARNING("Failed to receive line in auth_stratum"); ret = false; goto out; } - ret = parse_method(proxi, cs->buf); + ret = parse_method(proxi, buf); } while (ret); - val = json_msg_result(cs->buf, &res_val, &err_val); + val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Failed to get a json result in auth_stratum, got: %s", cs->buf); + LOGWARNING("Failed to get a json result in auth_stratum, got: %s", buf); goto out; } if (err_val && !json_is_null(err_val)) { - LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", cs->buf); + LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", buf); goto out; } if (res_val) { @@ -1014,6 +1076,15 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) out: if (val) json_decref(val); + if (ret) { + /* Now parse any cached responses so there are none in the + * queue and they can be managed one at a time from now on. */ + do { + dealloc(buf); + buf = cached_proxy_line(proxi); + parse_method(proxi, buf); + } while (buf); + } return ret; }