From 2bb5679c368ead156afb61f3cd0ee66f9618a4d4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 29 Jan 2016 14:04:41 +1100 Subject: [PATCH] Receive and parse upstreamed block submits on client interface --- src/connector.c | 63 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/src/connector.c b/src/connector.c index be124b2a..6f818c1e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -163,6 +163,13 @@ struct connector_data { typedef struct connector_data cdata_t; +struct binmsg { + char *buf; + int len; +}; + +typedef struct binmsg binmsg_t; + /* Increase the reference count of instance */ static void __inc_instance_ref(client_instance_t *client) { @@ -480,9 +487,9 @@ static void send_client_msg(cdata_t *cdata, const int64_t id, char *buf) /* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { + int buflen, ret, slen = 0, blen = 0; ckpool_t *ckp = cdata->ckp; - int buflen, ret; - char *msg, *eol; + char *msg, *eol, *bkey; json_t *val; retry: @@ -510,9 +517,22 @@ reparse: eol = memchr(client->buf, '\n', client->bufofs); if (!eol) goto retry; + if (unlikely(client->bufofs > 5 && (bkey = strstr(eol - 5, "bkey\n" )))) { + int len; + + /* We have bkey data. Do we have enough to parse it? */ + slen = bkey - client->buf - 1; + len = client->bufofs - slen; + if (len < BKEY_LENOFS + BKEY_LENLEN) + goto retry; + blen = bkey_len(bkey); + if (len < blen) + goto retry; + buflen = slen + blen; + } else + buflen = eol - client->buf + 1; /* Do something useful with this message now */ - buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); @@ -535,6 +555,10 @@ reparse: } else { char *s; + if (unlikely(blen)) { + bkey = msg + slen + 1; + json_append_bkeys(val, bkey, blen); + } if (client->passthrough) { int64_t passthrough_id; @@ -1101,21 +1125,20 @@ out: return ret; } -static void usend_process(ckpool_t *ckp, char *buf) +static void usend_process(ckpool_t *ckp, binmsg_t *binmsg) { cdata_t *cdata = ckp->data; connsock_t *cs = &cdata->upstream_cs; - int len, sent; + int sent; - if (unlikely(!buf || !strlen(buf))) { + if (unlikely(!binmsg->buf || !strlen(binmsg->buf))) { LOGERR("Send empty message to usend_process"); goto out; } - LOGDEBUG("Sending upstream msg: %s", buf); - len = strlen(buf); + LOGDEBUG("Sending upstream msg: %s", binmsg->buf); while (42) { - sent = write_socket(cs->fd, buf, len); - if (sent == len) + sent = write_socket(cs->fd, binmsg->buf, binmsg->len); + if (sent == binmsg->len) break; if (cs->fd > 0) { LOGWARNING("Upstream pool failed, attempting reconnect while caching messages"); @@ -1126,7 +1149,8 @@ static void usend_process(ckpool_t *ckp, char *buf) while (!connect_upstream(ckp, cs)); } out: - free(buf); + free(binmsg->buf); + free(binmsg); } static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf) @@ -1144,10 +1168,11 @@ static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const cha static void ping_upstream(cdata_t *cdata) { - char *buf; + binmsg_t *binmsg = ckalloc(sizeof(binmsg_t)); - ASPRINTF(&buf, "{\"method\":\"ping\"}\n"); - ckmsgq_add(cdata->upstream_sends, buf); + ASPRINTF(&binmsg->buf, "{\"method\":\"ping\"}\n"); + binmsg->len = strlen(binmsg->buf); + ckmsgq_add(cdata->upstream_sends, binmsg); } static json_t *urecv_loads(const char *buf, const int len) @@ -1423,10 +1448,14 @@ retry: if (likely(buf[0] == '{')) { process_client_msg(cdata, buf, umsg->msglen); } else if (cmdmatch(buf, "upstream=")) { - char *msg = strdup(buf + 9); + binmsg_t *binmsg = ckalloc(sizeof(binmsg_t)); + + binmsg->buf = ckalloc(umsg->msglen); + binmsg->len = umsg->msglen - 9; + memcpy(binmsg->buf, buf + 9, binmsg->len); - LOGDEBUG("Upstreaming %s", msg); - ckmsgq_add(cdata->upstream_sends, msg); + LOGDEBUG("Upstreaming %s", umsg->buf); + ckmsgq_add(cdata->upstream_sends, binmsg); goto retry; } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client;