Browse Source

Cache responses in proxy mode in case they come out of order to be able to successfully managed different subscribe variations

master
Con Kolivas 10 years ago
parent
commit
7e5a0620af
  1. 115
      src/generator.c

115
src/generator.c

@ -117,6 +117,8 @@ struct proxy_instance {
int64_t share_id;
ckmsgq_t *passsends; // passthrough sends
char_entry_t *recvd_lines; /* Linked list of unprocessed messages */
};
typedef struct proxy_instance proxy_instance_t;
@ -490,40 +492,99 @@ static json_t *find_notify(json_t *val)
return ret;
}
/* Get stored line in the proxy linked list of messages if any exist or NULL */
static char *cached_proxy_line(proxy_instance_t *proxi)
{
char *buf = NULL;
if (proxi->recvd_lines) {
char_entry_t *char_t = proxi->recvd_lines;
DL_DELETE(proxi->recvd_lines, char_t);
buf = char_t->buf;
free(char_t);
}
return buf;
}
/* Get next line in the proxy linked list of messages or a new line from the
* connsock if there are none. */
static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi)
{
char *buf = cached_proxy_line(proxi);
if (!buf && read_socket_line(cs, 5) > 0)
buf = strdup(cs->buf);
return buf;
}
/* For appending a line to the proxy recv list */
static void append_proxy_line(proxy_instance_t *proxi, const char *buf)
{
char_entry_t *char_t = ckalloc(sizeof(char_entry_t));
char_t->buf = strdup(buf);
DL_APPEND(proxi->recvd_lines, char_t);
}
/* Get a new line from the connsock and return a copy of it */
static char *new_proxy_line(connsock_t *cs)
{
char *buf = NULL;
if (read_socket_line(cs, 5) < 1)
goto out;
buf = strdup(cs->buf);
out:
return buf;
}
static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
{
json_t *val = NULL, *res_val, *notify_val, *tmp;
bool parsed, ret = false;
int retries = 0, size;
const char *string;
bool ret = false;
char *old;
int size;
char *buf, *old;
size = read_socket_line(cs, 5);
if (size < 1) {
retry:
parsed = true;
if (!(buf = new_proxy_line(cs))) {
LOGWARNING("Failed to receive line in parse_subscribe");
goto out;
}
LOGDEBUG("parse_subscribe received %s", cs->buf);
LOGDEBUG("parse_subscribe received %s", buf);
/* Ignore err_val here stored in &tmp */
val = json_msg_result(cs->buf, &res_val, &tmp);
val = json_msg_result(buf, &res_val, &tmp);
if (!val || !res_val) {
LOGWARNING("Failed to get a json result in parse_subscribe, got: %s", cs->buf);
goto out;
LOGINFO("Failed to get a json result in parse_subscribe, got: %s", buf);
parsed = false;
}
if (!json_is_array(res_val)) {
LOGWARNING("Result in parse_subscribe not an array");
goto out;
LOGINFO("Result in parse_subscribe not an array");
parsed = false;
}
size = json_array_size(res_val);
if (size < 3) {
LOGWARNING("Result in parse_subscribe array too small");
goto out;
LOGINFO("Result in parse_subscribe array too small");
parsed = false;
}
notify_val = find_notify(res_val);
if (!notify_val) {
LOGWARNING("Failed to find notify in parse_subscribe");
LOGINFO("Failed to find notify in parse_subscribe");
parsed = false;
}
if (!parsed) {
if (++retries < 3) {
/* We don't want this response so put it on the proxy
* recvd list to be parsed later */
append_proxy_line(proxi, buf);
buf = NULL;
goto retry;
}
LOGWARNING("Failed to parse subscribe response in parse_subscribe");
goto out;
}
/* Free up old data in place if we are re-subscribing */
old = proxi->sessionid;
proxi->sessionid = NULL;
@ -577,6 +638,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
out:
if (val)
json_decref(val);
free(buf);
return ret;
}
@ -962,6 +1024,7 @@ out:
static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
json_t *val = NULL, *res_val, *req, *err_val;
char *buf = NULL;
bool ret;
JSON_CPACK(req, "{s:i,s:s,s:[s,s]}",
@ -979,25 +1042,24 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
/* Read and parse any extra methods sent. Anything left in the buffer
* should be the response to our auth request. */
do {
int size;
size = read_socket_line(cs, 5);
if (size < 1) {
free(buf);
buf = next_proxy_line(cs, proxi);
if (!buf) {
LOGWARNING("Failed to receive line in auth_stratum");
ret = false;
goto out;
}
ret = parse_method(proxi, cs->buf);
ret = parse_method(proxi, buf);
} while (ret);
val = json_msg_result(cs->buf, &res_val, &err_val);
val = json_msg_result(buf, &res_val, &err_val);
if (!val) {
LOGWARNING("Failed to get a json result in auth_stratum, got: %s", cs->buf);
LOGWARNING("Failed to get a json result in auth_stratum, got: %s", buf);
goto out;
}
if (err_val && !json_is_null(err_val)) {
LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", cs->buf);
LOGWARNING("Failed to authorise in auth_stratum due to err_val, got: %s", buf);
goto out;
}
if (res_val) {
@ -1014,6 +1076,15 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
out:
if (val)
json_decref(val);
if (ret) {
/* Now parse any cached responses so there are none in the
* queue and they can be managed one at a time from now on. */
do {
dealloc(buf);
buf = cached_proxy_line(proxi);
parse_method(proxi, buf);
} while (buf);
}
return ret;
}

Loading…
Cancel
Save