diff --git a/configure.ac b/configure.ac index 65d311c0..32de3b12 100644 --- a/configure.ac +++ b/configure.ac @@ -38,6 +38,7 @@ AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) +AC_CHECK_HEADERS(bzlib.h) PTHREAD_LIBS="-lpthread" MATH_LIBS="-lm" @@ -59,13 +60,18 @@ AC_ARG_WITH([ckdb], if test "x$ckdb" != "xno"; then AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev not found. Install it or disable postgresql support with --without-ckdb" && exit 1) + AC_CHECK_LIB([bz2], [main],[BZ2=-lbz],echo "Error: Required library libbz + not found. Install it or disable libbz2 support with --without-ckdb" && exit 1) AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required]) PQ_LIBS="-lpq" + BZ2_LIBS="-lbz2" else PQ_LIBS="" + BZ2_LIBS="" fi AM_CONDITIONAL([WANT_CKDB], [test "x$ckdb" != "xno"]) AC_SUBST(PQ_LIBS) +AC_SUBST(BZ2_LIBS) AC_OUTPUT([Makefile] [src/Makefile]) @@ -75,7 +81,7 @@ echo " CPPFLAGS.............: $CPPFLAGS" echo " CFLAGS...............: $CFLAGS" echo " LDFLAGS..............: $LDFLAGS" echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" -echo " db LDADD.............: $PQ_LIBS" +echo " db LDADD.............: $PQ_LIBS $BZ2_LIBS" echo echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo " prefix...............: $prefix" diff --git a/src/Makefile.am b/src/Makefile.am index 5e032e62..0dd0d50c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,5 +23,5 @@ if WANT_CKDB bin_PROGRAMS += ckdb ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb.h klist.c ktree.c klist.h ktree.h -ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @PQ_LIBS@ +ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @PQ_LIBS@ @BZ2_LIBS@ endif diff --git a/src/ckdb.c b/src/ckdb.c index 9a10cb35..663cf23d 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2646,6 +2646,117 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) #define MAX_READ (10 * 1024 * 1024) static char *reload_buf; +#define BZNAME ".bz2" + +static bool logline(char *buf, int siz, FILE *fp, BZFILE *bzf, char *filename) +{ + static char *bzbuf = NULL; + static int bzpos, thisred; + static size_t bzlen; + char *ret, *ptr; + int bzerror, use; + bool repeat; + + if (!bzf) { + ret = fgets_unlocked(buf, siz, fp); + if (!ret && ferror(fp)) { + int err = errno; + quithere(1, "Read failed on %s (%d) '%s'", + filename, err, strerror(err)); + } + if (ret) + return true; + else + return false; + } + + if (!bzbuf) { + bzpos = 0; + bzlen = MAX_READ*2 + 1; + bzbuf = malloc(bzlen); + bzbuf[0] = '\0'; + } + + repeat = true; + // Read at least MAX_READ or to EOF + while (repeat && bzpos < MAX_READ) { + thisred = BZ2_bzRead(&bzerror, bzf, &(bzbuf[bzpos]), MAX_READ*2 - bzpos); + if (bzerror != BZ_OK && bzerror != BZ_STREAM_END) { + quithere(1, "Read failed on %s"BZNAME" (%d)", + filename, bzerror); + } + bzpos += thisred; + bzbuf[bzpos] = '\0'; + if (bzerror == BZ_STREAM_END || thisred == 0) + repeat = false; + } + /* Either EOF or a read returned 0 + * with no buffered data left in both cases */ + if (bzpos == 0) + return false; + + ptr = bzbuf; + while (*ptr && *ptr != '\n' && *ptr != '\r') + ptr++; + /* This will lose blank lines except for the first line + * - which is OK anyway since they are an error */ + while (*ptr == '\n' || *ptr == '\r') + ptr++; + use = ptr - bzbuf; + + // Return the 'line' (up to siz-1) + if (use > (siz - 1)) + use = siz - 1; + memcpy(buf, bzbuf, use); + buf[use] = '\0'; + + // Buffer remainder + if (bzpos > use) { + bzpos -= use; + memmove(bzbuf, bzbuf+use, bzpos); + bzbuf[bzpos] = '\0'; + } else { + bzpos = 0; + bzbuf[0] = '\0'; + } + return true; +} + +static bool logopen(char **filename, FILE **fp, BZFILE **bzf) +{ + char *bzname; + size_t len; + int bzerror; + + *fp = *bzf = NULL; + *fp = fopen(*filename, "re"); + if (*fp) + return true; + + len = strlen(*filename) + sizeof(BZNAME); + bzname = malloc(len + 1); + if (!bzname) + quithere(1, "(%d) OOM", (int)len); + strcpy(bzname, *filename); + strcat(bzname, BZNAME); + + *fp = fopen(bzname, "re"); + if (!(*fp)) { + free(bzname); + return false; + } + + /* An error is fatal, since the file exists, it must be valid + * Skipping files can cause major problems */ + *bzf = BZ2_bzReadOpen(&bzerror, *fp, 0, 0, NULL, 0); + if (bzerror != BZ_OK) + quithere(1, "BZ2_bzReadOpen() failed for '%s' (%d)", bzname, bzerror); + + free(*filename); + *filename = bzname; + return true; +} + /* If the reload start file is missing and -r was specified correctly: * touch the filename reported in "Failed to open 'filename'", * if ckdb aborts at the beginning of the reload, then start again */ @@ -2657,11 +2768,12 @@ static bool reload_from(tv_t *start) char *missingfirst = NULL, *missinglast = NULL; int missing_count; int processing; - bool finished = false, matched = false, ret = true; + bool finished = false, matched = false, ret = true, ok; char *filename = NULL; uint64_t count, total; tv_t now; FILE *fp = NULL; + BZFILE *bzf = NULL; reload_buf = malloc(MAX_READ); if (!reload_buf) @@ -2677,9 +2789,8 @@ static bool reload_from(tv_t *start) LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - fp = fopen(filename, "re"); - if (!fp) - quithere(1, "Failed to open '%s'", filename); + if (!logopen(&filename, &fp, &bzf)) + quithere(1, "Failed to open '%s' (or "BZNAME")", filename); setnow(&now); tvs_to_buf(&now, run, sizeof(run)); @@ -2695,14 +2806,9 @@ static bool reload_from(tv_t *start) processing++; count = 0; - while (!everyone_die && !matched && fgets_unlocked(reload_buf, MAX_READ, fp)) - matched = reload_line(conn, filename, ++count, reload_buf); - - if (ferror(fp)) { - int err = errno; - quithere(1, "Read failed on %s (%d) '%s'", - filename, err, strerror(err)); - } + while (!everyone_die && !matched && + logline(reload_buf, MAX_READ, fp, bzf, filename)) + matched = reload_line(conn, filename, ++count, reload_buf); LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", __func__, @@ -2720,8 +2826,8 @@ static bool reload_from(tv_t *start) break; } filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - fp = fopen(filename, "re"); - if (!fp) { + ok = logopen(&filename, &fp, &bzf); + if (!ok) { missingfirst = strdup(filename); FREENULL(filename); errno = 0; @@ -2740,8 +2846,8 @@ static bool reload_from(tv_t *start) break; } filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - fp = fopen(filename, "re"); - if (fp) + ok = logopen(&filename, &fp, &bzf); + if (ok) break; errno = 0; if (missing_count++ > 1) diff --git a/src/ckdb.h b/src/ckdb.h index 20bd1c2c..1826e0e6 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -36,6 +36,7 @@ #elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H) #include #endif +#include #include "ckpool.h" #include "libckpool.h" @@ -52,7 +53,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.710" +#define CKDB_VERSION DB_VERSION"-0.720" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__