Browse Source

ckdb - add libbz dependency to the ckdb build and allow reload to read both uncompressed and bzip2 compressed log files

master
kanoi 10 years ago
parent
commit
fb6a9874ec
  1. 8
      configure.ac
  2. 2
      src/Makefile.am
  3. 138
      src/ckdb.c
  4. 3
      src/ckdb.h

8
configure.ac

@ -38,6 +38,7 @@ AC_CHECK_HEADERS(alloca.h pthread.h stdio.h math.h signal.h sys/prctl.h)
AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h) AC_CHECK_HEADERS(sys/types.h sys/socket.h sys/stat.h linux/un.h netdb.h)
AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h) AC_CHECK_HEADERS(stdint.h netinet/in.h netinet/tcp.h sys/ioctl.h getopt.h)
AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h) AC_CHECK_HEADERS(sys/epoll.h libpq-fe.h postgresql/libpq-fe.h grp.h)
AC_CHECK_HEADERS(bzlib.h)
PTHREAD_LIBS="-lpthread" PTHREAD_LIBS="-lpthread"
MATH_LIBS="-lm" MATH_LIBS="-lm"
@ -59,13 +60,18 @@ AC_ARG_WITH([ckdb],
if test "x$ckdb" != "xno"; then if test "x$ckdb" != "xno"; then
AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev AC_CHECK_LIB([pq], [main],[PQ=-lpq],echo "Error: Required library libpq-dev
not found. Install it or disable postgresql support with --without-ckdb" && exit 1) not found. Install it or disable postgresql support with --without-ckdb" && exit 1)
AC_CHECK_LIB([bz2], [main],[BZ2=-lbz],echo "Error: Required library libbz
not found. Install it or disable libbz2 support with --without-ckdb" && exit 1)
AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required]) AC_DEFINE([USE_CKDB], [1], [Defined to 1 if ckdb support required])
PQ_LIBS="-lpq" PQ_LIBS="-lpq"
BZ2_LIBS="-lbz2"
else else
PQ_LIBS="" PQ_LIBS=""
BZ2_LIBS=""
fi fi
AM_CONDITIONAL([WANT_CKDB], [test "x$ckdb" != "xno"]) AM_CONDITIONAL([WANT_CKDB], [test "x$ckdb" != "xno"])
AC_SUBST(PQ_LIBS) AC_SUBST(PQ_LIBS)
AC_SUBST(BZ2_LIBS)
AC_OUTPUT([Makefile] [src/Makefile]) AC_OUTPUT([Makefile] [src/Makefile])
@ -75,7 +81,7 @@ echo " CPPFLAGS.............: $CPPFLAGS"
echo " CFLAGS...............: $CFLAGS" echo " CFLAGS...............: $CFLAGS"
echo " LDFLAGS..............: $LDFLAGS" echo " LDFLAGS..............: $LDFLAGS"
echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS" echo " LDADD................: $PTHREAD_LIBS $MATH_LIBS $RT_LIBS $JANSSON_LIBS"
echo " db LDADD.............: $PQ_LIBS" echo " db LDADD.............: $PQ_LIBS $BZ2_LIBS"
echo echo
echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')" echo "Installation...........: make install (as root if needed, with 'su' or 'sudo')"
echo " prefix...............: $prefix" echo " prefix...............: $prefix"

2
src/Makefile.am

@ -23,5 +23,5 @@ if WANT_CKDB
bin_PROGRAMS += ckdb bin_PROGRAMS += ckdb
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \ ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \
ckdb.h klist.c ktree.c klist.h ktree.h ckdb.h klist.c ktree.c klist.h ktree.h
ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @PQ_LIBS@ ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @PQ_LIBS@ @BZ2_LIBS@
endif endif

138
src/ckdb.c

@ -2646,6 +2646,117 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
#define MAX_READ (10 * 1024 * 1024) #define MAX_READ (10 * 1024 * 1024)
static char *reload_buf; static char *reload_buf;
#define BZNAME ".bz2"
static bool logline(char *buf, int siz, FILE *fp, BZFILE *bzf, char *filename)
{
static char *bzbuf = NULL;
static int bzpos, thisred;
static size_t bzlen;
char *ret, *ptr;
int bzerror, use;
bool repeat;
if (!bzf) {
ret = fgets_unlocked(buf, siz, fp);
if (!ret && ferror(fp)) {
int err = errno;
quithere(1, "Read failed on %s (%d) '%s'",
filename, err, strerror(err));
}
if (ret)
return true;
else
return false;
}
if (!bzbuf) {
bzpos = 0;
bzlen = MAX_READ*2 + 1;
bzbuf = malloc(bzlen);
bzbuf[0] = '\0';
}
repeat = true;
// Read at least MAX_READ or to EOF
while (repeat && bzpos < MAX_READ) {
thisred = BZ2_bzRead(&bzerror, bzf, &(bzbuf[bzpos]), MAX_READ*2 - bzpos);
if (bzerror != BZ_OK && bzerror != BZ_STREAM_END) {
quithere(1, "Read failed on %s"BZNAME" (%d)",
filename, bzerror);
}
bzpos += thisred;
bzbuf[bzpos] = '\0';
if (bzerror == BZ_STREAM_END || thisred == 0)
repeat = false;
}
/* Either EOF or a read returned 0
* with no buffered data left in both cases */
if (bzpos == 0)
return false;
ptr = bzbuf;
while (*ptr && *ptr != '\n' && *ptr != '\r')
ptr++;
/* This will lose blank lines except for the first line
* - which is OK anyway since they are an error */
while (*ptr == '\n' || *ptr == '\r')
ptr++;
use = ptr - bzbuf;
// Return the 'line' (up to siz-1)
if (use > (siz - 1))
use = siz - 1;
memcpy(buf, bzbuf, use);
buf[use] = '\0';
// Buffer remainder
if (bzpos > use) {
bzpos -= use;
memmove(bzbuf, bzbuf+use, bzpos);
bzbuf[bzpos] = '\0';
} else {
bzpos = 0;
bzbuf[0] = '\0';
}
return true;
}
static bool logopen(char **filename, FILE **fp, BZFILE **bzf)
{
char *bzname;
size_t len;
int bzerror;
*fp = *bzf = NULL;
*fp = fopen(*filename, "re");
if (*fp)
return true;
len = strlen(*filename) + sizeof(BZNAME);
bzname = malloc(len + 1);
if (!bzname)
quithere(1, "(%d) OOM", (int)len);
strcpy(bzname, *filename);
strcat(bzname, BZNAME);
*fp = fopen(bzname, "re");
if (!(*fp)) {
free(bzname);
return false;
}
/* An error is fatal, since the file exists, it must be valid
* Skipping files can cause major problems */
*bzf = BZ2_bzReadOpen(&bzerror, *fp, 0, 0, NULL, 0);
if (bzerror != BZ_OK)
quithere(1, "BZ2_bzReadOpen() failed for '%s' (%d)", bzname, bzerror);
free(*filename);
*filename = bzname;
return true;
}
/* If the reload start file is missing and -r was specified correctly: /* If the reload start file is missing and -r was specified correctly:
* touch the filename reported in "Failed to open 'filename'", * touch the filename reported in "Failed to open 'filename'",
* if ckdb aborts at the beginning of the reload, then start again */ * if ckdb aborts at the beginning of the reload, then start again */
@ -2657,11 +2768,12 @@ static bool reload_from(tv_t *start)
char *missingfirst = NULL, *missinglast = NULL; char *missingfirst = NULL, *missinglast = NULL;
int missing_count; int missing_count;
int processing; int processing;
bool finished = false, matched = false, ret = true; bool finished = false, matched = false, ret = true, ok;
char *filename = NULL; char *filename = NULL;
uint64_t count, total; uint64_t count, total;
tv_t now; tv_t now;
FILE *fp = NULL; FILE *fp = NULL;
BZFILE *bzf = NULL;
reload_buf = malloc(MAX_READ); reload_buf = malloc(MAX_READ);
if (!reload_buf) if (!reload_buf)
@ -2677,9 +2789,8 @@ static bool reload_from(tv_t *start)
LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run); LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run);
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re"); if (!logopen(&filename, &fp, &bzf))
if (!fp) quithere(1, "Failed to open '%s' (or "BZNAME")", filename);
quithere(1, "Failed to open '%s'", filename);
setnow(&now); setnow(&now);
tvs_to_buf(&now, run, sizeof(run)); tvs_to_buf(&now, run, sizeof(run));
@ -2695,14 +2806,9 @@ static bool reload_from(tv_t *start)
processing++; processing++;
count = 0; count = 0;
while (!everyone_die && !matched && fgets_unlocked(reload_buf, MAX_READ, fp)) while (!everyone_die && !matched &&
matched = reload_line(conn, filename, ++count, reload_buf); logline(reload_buf, MAX_READ, fp, bzf, filename))
matched = reload_line(conn, filename, ++count, reload_buf);
if (ferror(fp)) {
int err = errno;
quithere(1, "Read failed on %s (%d) '%s'",
filename, err, strerror(err));
}
LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", LOGWARNING("%s(): %sread %"PRIu64" line%s from %s",
__func__, __func__,
@ -2720,8 +2826,8 @@ static bool reload_from(tv_t *start)
break; break;
} }
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re"); ok = logopen(&filename, &fp, &bzf);
if (!fp) { if (!ok) {
missingfirst = strdup(filename); missingfirst = strdup(filename);
FREENULL(filename); FREENULL(filename);
errno = 0; errno = 0;
@ -2740,8 +2846,8 @@ static bool reload_from(tv_t *start)
break; break;
} }
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re"); ok = logopen(&filename, &fp, &bzf);
if (fp) if (ok)
break; break;
errno = 0; errno = 0;
if (missing_count++ > 1) if (missing_count++ > 1)

3
src/ckdb.h

@ -36,6 +36,7 @@
#elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H) #elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H)
#include <postgresql/libpq-fe.h> #include <postgresql/libpq-fe.h>
#endif #endif
#include <bzlib.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -52,7 +53,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.6" #define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.710" #define CKDB_VERSION DB_VERSION"-0.720"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

Loading…
Cancel
Save