Browse Source

Receive and parse upstreamed block submits on client interface

master
Con Kolivas 9 years ago
parent
commit
2bb5679c36
  1. 63
      src/connector.c

63
src/connector.c

@ -163,6 +163,13 @@ struct connector_data {
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
struct binmsg {
char *buf;
int len;
};
typedef struct binmsg binmsg_t;
/* Increase the reference count of instance */ /* Increase the reference count of instance */
static void __inc_instance_ref(client_instance_t *client) static void __inc_instance_ref(client_instance_t *client)
{ {
@ -480,9 +487,9 @@ static void send_client_msg(cdata_t *cdata, const int64_t id, char *buf)
/* Client is holding a reference count from being on the epoll list */ /* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{ {
int buflen, ret, slen = 0, blen = 0;
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
int buflen, ret; char *msg, *eol, *bkey;
char *msg, *eol;
json_t *val; json_t *val;
retry: retry:
@ -510,9 +517,22 @@ reparse:
eol = memchr(client->buf, '\n', client->bufofs); eol = memchr(client->buf, '\n', client->bufofs);
if (!eol) if (!eol)
goto retry; goto retry;
if (unlikely(client->bufofs > 5 && (bkey = strstr(eol - 5, "bkey\n" )))) {
int len;
/* Do something useful with this message now */ /* We have bkey data. Do we have enough to parse it? */
slen = bkey - client->buf - 1;
len = client->bufofs - slen;
if (len < BKEY_LENOFS + BKEY_LENLEN)
goto retry;
blen = bkey_len(bkey);
if (len < blen)
goto retry;
buflen = slen + blen;
} else
buflen = eol - client->buf + 1; buflen = eol - client->buf + 1;
/* Do something useful with this message now */
if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) { if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) {
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
@ -535,6 +555,10 @@ reparse:
} else { } else {
char *s; char *s;
if (unlikely(blen)) {
bkey = msg + slen + 1;
json_append_bkeys(val, bkey, blen);
}
if (client->passthrough) { if (client->passthrough) {
int64_t passthrough_id; int64_t passthrough_id;
@ -1101,21 +1125,20 @@ out:
return ret; return ret;
} }
static void usend_process(ckpool_t *ckp, char *buf) static void usend_process(ckpool_t *ckp, binmsg_t *binmsg)
{ {
cdata_t *cdata = ckp->data; cdata_t *cdata = ckp->data;
connsock_t *cs = &cdata->upstream_cs; connsock_t *cs = &cdata->upstream_cs;
int len, sent; int sent;
if (unlikely(!buf || !strlen(buf))) { if (unlikely(!binmsg->buf || !strlen(binmsg->buf))) {
LOGERR("Send empty message to usend_process"); LOGERR("Send empty message to usend_process");
goto out; goto out;
} }
LOGDEBUG("Sending upstream msg: %s", buf); LOGDEBUG("Sending upstream msg: %s", binmsg->buf);
len = strlen(buf);
while (42) { while (42) {
sent = write_socket(cs->fd, buf, len); sent = write_socket(cs->fd, binmsg->buf, binmsg->len);
if (sent == len) if (sent == binmsg->len)
break; break;
if (cs->fd > 0) { if (cs->fd > 0) {
LOGWARNING("Upstream pool failed, attempting reconnect while caching messages"); LOGWARNING("Upstream pool failed, attempting reconnect while caching messages");
@ -1126,7 +1149,8 @@ static void usend_process(ckpool_t *ckp, char *buf)
while (!connect_upstream(ckp, cs)); while (!connect_upstream(ckp, cs));
} }
out: out:
free(buf); free(binmsg->buf);
free(binmsg);
} }
static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf) static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf)
@ -1144,10 +1168,11 @@ static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const cha
static void ping_upstream(cdata_t *cdata) static void ping_upstream(cdata_t *cdata)
{ {
char *buf; binmsg_t *binmsg = ckalloc(sizeof(binmsg_t));
ASPRINTF(&buf, "{\"method\":\"ping\"}\n"); ASPRINTF(&binmsg->buf, "{\"method\":\"ping\"}\n");
ckmsgq_add(cdata->upstream_sends, buf); binmsg->len = strlen(binmsg->buf);
ckmsgq_add(cdata->upstream_sends, binmsg);
} }
static json_t *urecv_loads(const char *buf, const int len) static json_t *urecv_loads(const char *buf, const int len)
@ -1423,10 +1448,14 @@ retry:
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf, umsg->msglen); process_client_msg(cdata, buf, umsg->msglen);
} else if (cmdmatch(buf, "upstream=")) { } else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); binmsg_t *binmsg = ckalloc(sizeof(binmsg_t));
binmsg->buf = ckalloc(umsg->msglen);
binmsg->len = umsg->msglen - 9;
memcpy(binmsg->buf, buf + 9, binmsg->len);
LOGDEBUG("Upstreaming %s", msg); LOGDEBUG("Upstreaming %s", umsg->buf);
ckmsgq_add(cdata->upstream_sends, msg); ckmsgq_add(cdata->upstream_sends, binmsg);
goto retry; goto retry;
} else if (cmdmatch(buf, "dropclient")) { } else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client; client_instance_t *client;

Loading…
Cancel
Save