Browse Source

More fixes

master
Con Kolivas 9 years ago
parent
commit
d69ed7ffcf
  1. 24
      src/ckpool.c
  2. 8
      src/connector.c
  3. 12
      src/libckpool.c
  4. 2
      src/libckpool.h

24
src/ckpool.c

@ -555,7 +555,7 @@ static int read_cs_length(connsock_t *cs, float *timeout, int len)
int ret = len; int ret = len;
while (cs->buflen < len) { while (cs->buflen < len) {
char readbuf[PAGESIZE] = {}; char readbuf[PAGESIZE];
ret = wait_read_select(cs->fd, *timeout); ret = wait_read_select(cs->fd, *timeout);
if (ret < 1) if (ret < 1)
@ -629,7 +629,7 @@ static int read_gz_line(connsock_t *cs, float *timeout)
buflen = cs->buflen; buflen = cs->buflen;
cs->buf = dest; cs->buf = dest;
dest = NULL; dest = NULL;
ret = cs->buflen = decompsize - 1; ret = cs->buflen = decompsize;
if (buflen) if (buflen)
add_bufline(cs, buf, buflen); add_bufline(cs, buf, buflen);
out: out:
@ -732,23 +732,20 @@ int write_cs(connsock_t *cs, const char *buf, int len)
unsigned long compsize, decompsize = len; unsigned long compsize, decompsize = len;
char *dest = NULL; char *dest = NULL;
uint32_t msglen; uint32_t msglen;
int ret = -1; int ret;
/* Connsock doesn't expect gz compressed messages */ /* Connsock doesn't expect gz compressed messages */
if (!cs->gz || len <= 1492) { if (!cs->gz || len <= 1492)
ret = write_socket(cs->fd, buf, len); return write_socket(cs->fd, buf, len);
goto out; compsize = round_up_page(len + 12);
} dest = alloca(compsize);
dest = ckalloc(len + 12);
/* Do compression here */ /* Do compression here */
compsize = len; compsize -= 12;
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (ret != Z_OK) { if (ret != Z_OK) {
LOGINFO("Failed to gz compress in write_cs, writing uncompressed"); LOGINFO("Failed to gz compress in write_cs, writing uncompressed");
ret = write_socket(cs->fd, buf, len); return write_socket(cs->fd, buf, len);
goto out;
} }
LOGDEBUG("Writing connsock message compressed %lu from %lu", compsize, decompsize);
/* Copy gz magic header */ /* Copy gz magic header */
sprintf(dest, gzip_magic); sprintf(dest, gzip_magic);
/* Copy compressed message length */ /* Copy compressed message length */
@ -758,13 +755,12 @@ int write_cs(connsock_t *cs, const char *buf, int len)
msglen = htole32(decompsize); msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4); memcpy(dest + 8, &msglen, 4);
len = compsize + 12; len = compsize + 12;
LOGDEBUG("Writing connsock message compressed %d from %lu", len, decompsize);
ret = write_socket(cs->fd, dest, len); ret = write_socket(cs->fd, dest, len);
if (ret == len) if (ret == len)
ret = decompsize; ret = decompsize;
else else
ret = -1; ret = -1;
out:
free(dest);
return ret; return ret;
} }

8
src/connector.c

@ -1005,18 +1005,19 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
* larger than one MTU. */ * larger than one MTU. */
if (client->gz) { if (client->gz) {
unsigned long compsize, decompsize = len; unsigned long compsize, decompsize = len;
char *dest = ckalloc(len + 12);
uint32_t msglen; uint32_t msglen;
char *dest;
int ret; int ret;
compsize = len; compsize = round_up_page(len + 12);
dest = ckalloc(compsize);
compsize -= 12;
ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len); ret = compress((Bytef *)dest + 12, &compsize, (Bytef *)buf, len);
if (unlikely(ret != Z_OK)) { if (unlikely(ret != Z_OK)) {
LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret); LOGINFO("Failed to gz compress in send_client, got %d sending uncompressed", ret);
free(dest); free(dest);
goto out; goto out;
} }
LOGDEBUG("Sending client message compressed %lu from %lu", compsize, decompsize);
/* Copy gz magic header */ /* Copy gz magic header */
sprintf(dest, gzip_magic); sprintf(dest, gzip_magic);
/* Copy compressed message length */ /* Copy compressed message length */
@ -1026,6 +1027,7 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
msglen = htole32(decompsize); msglen = htole32(decompsize);
memcpy(dest + 8, &msglen, 4); memcpy(dest + 8, &msglen, 4);
len = compsize + 12; len = compsize + 12;
LOGDEBUG("Sending client message compressed %d from %lu", len, decompsize);
free(buf); free(buf);
buf = dest; buf = dest;
} }

12
src/libckpool.c

@ -1389,6 +1389,18 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line)
return ptr; return ptr;
} }
/* Round up to the nearest page size for efficient malloc */
size_t round_up_page(size_t len)
{
int rem = len % PAGESIZE;
if (rem)
len += PAGESIZE - rem;
return len;
}
/* Adequate size s==len*2 + 1 must be alloced to use this variant */ /* Adequate size s==len*2 + 1 must be alloced to use this variant */
void __bin2hex(void *vs, const void *vp, size_t len) void __bin2hex(void *vs, const void *vp, size_t len)
{ {

2
src/libckpool.h

@ -531,6 +531,8 @@ void trail_slash(char **buf);
void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *_ckalloc(size_t len, const char *file, const char *func, const int line);
void *json_ckalloc(size_t size); void *json_ckalloc(size_t size);
void *_ckzalloc(size_t len, const char *file, const char *func, const int line); void *_ckzalloc(size_t len, const char *file, const char *func, const int line);
size_t round_up_page(size_t len);
extern const int hex2bin_tbl[]; extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len); void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len); void *bin2hex(const void *vp, size_t len);

Loading…
Cancel
Save