Browse Source

Fix logic fails

master
Con Kolivas 9 years ago
parent
commit
effeba4999
  1. 19
      src/ckpool.c
  2. 25
      src/connector.c

19
src/ckpool.c

@ -556,8 +556,8 @@ static void add_bufline(connsock_t *cs, const char *readbuf, const int len)
static int read_cs_length(connsock_t *cs, float *timeout, int len) static int read_cs_length(connsock_t *cs, float *timeout, int len)
{ {
tv_t start, now; tv_t start, now;
int ret = len;
float diff; float diff;
int ret;
tv_time(&start); tv_time(&start);
@ -568,23 +568,25 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len)
if (*timeout < 0) { if (*timeout < 0) {
LOGDEBUG("Timed out in read_cs_length"); LOGDEBUG("Timed out in read_cs_length");
ret = 0; ret = 0;
break; goto out;
} }
ret = wait_read_select(cs->fd, *timeout); ret = wait_read_select(cs->fd, *timeout);
if (ret < 1) if (ret < 1)
break; goto out;
readlen = len - cs->bufofs; readlen = len - cs->bufofs;
if (readlen >= PAGESIZE) if (readlen >= PAGESIZE)
readlen = PAGESIZE - 4; readlen = PAGESIZE - 4;
ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, readlen, MSG_DONTWAIT);
if (ret < 1) if (ret < 1)
break; goto out;
add_bufline(cs, readbuf, ret); add_bufline(cs, readbuf, ret);
tv_time(&now); tv_time(&now);
diff = tvdiff(&now, &start); diff = tvdiff(&now, &start);
copy_tv(&start, &now); copy_tv(&start, &now);
*timeout -= diff; *timeout -= diff;
} }
ret = len;
out:
return ret; return ret;
} }
@ -741,7 +743,7 @@ out:
if (ret < 0) { if (ret < 0) {
empty_buffer(cs); empty_buffer(cs);
dealloc(cs->buf); dealloc(cs->buf);
} else if (ret == 3 && !strncmp(cs->buf, gzip_magic, 3)) } else if (ret == 3 && !memcmp(cs->buf, gzip_magic, 3))
ret = read_gz_line(cs, timeout); ret = read_gz_line(cs, timeout);
return ret; return ret;
} }
@ -758,7 +760,8 @@ int write_cs(connsock_t *cs, const char *buf, int len)
uint32_t msglen; uint32_t msglen;
int ret; int ret;
/* Connsock doesn't expect gz compressed messages */ /* Connsock doesn't expect gz compressed messages. Only compress if it's
* larger than one MTU. */
if (!cs->gz || len <= 1492) if (!cs->gz || len <= 1492)
return write_socket(cs->fd, buf, len); return write_socket(cs->fd, buf, len);
compsize = round_up_page(len + 12); compsize = round_up_page(len + 12);
@ -770,8 +773,10 @@ int write_cs(connsock_t *cs, const char *buf, int len)
LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); LOGINFO("Failed to gz compress in write_cs, writing uncompressed");
return write_socket(cs->fd, buf, len); return write_socket(cs->fd, buf, len);
} }
if (unlikely(compsize + 12 >= decompsize))
return write_socket(cs->fd, buf, len);
/* Copy gz magic header */ /* Copy gz magic header */
sprintf(dest, gzip_magic); memcpy(dest, gzip_magic, 4);
/* Copy compressed message length */ /* Copy compressed message length */
msglen = htole32(compsize); msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4); memcpy(dest + 4, &msglen, 4);

25
src/connector.c

@ -1008,33 +1008,32 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
/* Does this client accept compressed data? Only compress if it's /* Does this client accept compressed data? Only compress if it's
* larger than one MTU. */ * larger than one MTU. */
if (client->gz) { if (client->gz && len > 1492) {
unsigned long compsize, decompsize = len; unsigned long compsize, decompsize = len;
uint32_t msglen; uint32_t msglen;
char *dest; Bytef *dest;
int ret; int ret;
compsize = round_up_page(len + 12); compsize = round_up_page(len);
dest = ckalloc(compsize); dest = alloca(compsize);
compsize -= 12; ret = compress(dest, &compsize, (Bytef *)buf, len);
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (unlikely(ret != Z_OK)) { if (unlikely(ret != Z_OK)) {
LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret); LOGWARNING("Failed to gz compress in send_client, got %d sending uncompressed", ret);
free(dest);
goto out; goto out;
} }
if (unlikely(compsize + 12 >= decompsize))
goto out;
/* Copy gz magic header */ /* Copy gz magic header */
sprintf(dest, gzip_magic); memcpy(buf, gzip_magic, 4);
/* Copy compressed message length */ /* Copy compressed message length */
msglen = htole32(compsize); msglen = htole32(compsize);
memcpy(dest + 4, &msglen, 4); memcpy(buf + 4, &msglen, 4);
/* Copy decompressed message length */ /* Copy decompressed message length */
msglen = htole32(decompsize); msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4); memcpy(buf + 8, &msglen, 4);
memcpy(buf + 12, dest, compsize);
len = compsize + 12; len = compsize + 12;
LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize); LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize);
free(buf);
buf = dest;
} }
out: out:
sender_send = ckzalloc(sizeof(sender_send_t)); sender_send = ckzalloc(sizeof(sender_send_t));

Loading…
Cancel
Save