From a41e994548de182645a8fb71e844ef853afff6f7 Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 5 Dec 2014 22:14:43 +1100 Subject: [PATCH] ckdb - use a pipe for reloading compressed files - allow .bz2 .gz .lrz --- src/ckdb.c | 164 ++++++++++++++++++++--------------------------------- src/ckdb.h | 3 +- 2 files changed, 62 insertions(+), 105 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 663cf23d..abc7e4b0 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2646,115 +2646,71 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) #define MAX_READ (10 * 1024 * 1024) static char *reload_buf; -#define BZNAME ".bz2" - -static bool logline(char *buf, int siz, FILE *fp, BZFILE *bzf, char *filename) +static bool logline(char *buf, int siz, FILE *fp, char *filename) { - static char *bzbuf = NULL; - static int bzpos, thisred; - static size_t bzlen; - char *ret, *ptr; - int bzerror, use; - bool repeat; - - if (!bzf) { - ret = fgets_unlocked(buf, siz, fp); - if (!ret && ferror(fp)) { - int err = errno; - quithere(1, "Read failed on %s (%d) '%s'", - filename, err, strerror(err)); - } - if (ret) - return true; - else - return false; - } + char *ret; - if (!bzbuf) { - bzpos = 0; - bzlen = MAX_READ*2 + 1; - bzbuf = malloc(bzlen); - bzbuf[0] = '\0'; + ret = fgets_unlocked(buf, siz, fp); + if (!ret && ferror(fp)) { + int err = errno; + quithere(1, "Read failed on %s (%d) '%s'", + filename, err, strerror(err)); } - - repeat = true; - // Read at least MAX_READ or to EOF - while (repeat && bzpos < MAX_READ) { - thisred = BZ2_bzRead(&bzerror, bzf, &(bzbuf[bzpos]), MAX_READ*2 - bzpos); - if (bzerror != BZ_OK && bzerror != BZ_STREAM_END) { - quithere(1, "Read failed on %s"BZNAME" (%d)", - filename, bzerror); - } - bzpos += thisred; - bzbuf[bzpos] = '\0'; - if (bzerror == BZ_STREAM_END || thisred == 0) - repeat = false; - } - /* Either EOF or a read returned 0 - * with no buffered data left in both cases */ - if (bzpos == 0) + if (ret) + return true; + else 
return false; - - ptr = bzbuf; - while (*ptr && *ptr != '\n' && *ptr != '\r') - ptr++; - /* This will lose blank lines except for the first line - * - which is OK anyway since they are an error */ - while (*ptr == '\n' || *ptr == '\r') - ptr++; - use = ptr - bzbuf; - - // Return the 'line' (up to siz-1) - if (use > (siz - 1)) - use = siz - 1; - memcpy(buf, bzbuf, use); - buf[use] = '\0'; - - // Buffer remainder - if (bzpos > use) { - bzpos -= use; - memmove(bzbuf, bzbuf+use, bzpos); - bzbuf[bzpos] = '\0'; - } else { - bzpos = 0; - bzbuf[0] = '\0'; - } - return true; } -static bool logopen(char **filename, FILE **fp, BZFILE **bzf) +static struct decomp { + char *ext; + char *fmt; +} dec_list[] = { + { ".bz2", "bzcat -q '%s'" }, + { ".gz", "zcat -q '%s'" }, + { ".lrz", "lrzip -q -d -o - '%s'" }, + { NULL, NULL } +}; + +static bool logopen(char **filename, FILE **fp, bool *apipe) { - char *bzname; + char buf[1024]; + char *name; size_t len; - int bzerror; + int i; + + *apipe = false; - *fp = *bzf = NULL; + *fp = NULL; *fp = fopen(*filename, "re"); if (*fp) return true; - len = strlen(*filename) + sizeof(BZNAME); - bzname = malloc(len + 1); - if (!bzname) - quithere(1, "(%d) OOM", (int)len); - strcpy(bzname, *filename); - strcat(bzname, BZNAME); - - *fp = fopen(bzname, "re"); - if (!(*fp)) { - free(bzname); - return false; + for (i = 0; dec_list[i].ext; i++) { + len = strlen(*filename) + strlen(dec_list[i].ext); + name = malloc(len + 1); + if (!name) + quithere(1, "(%d) OOM", (int)len); + strcpy(name, *filename); + strcat(name, dec_list[i].ext); + if (access(name, R_OK)) + free(name); + else { + snprintf(buf, sizeof(buf), dec_list[i].fmt, name); + *fp = popen(buf, "re"); + if (!(*fp)) { + int errn = errno; + quithere(1, "Failed to pipe (%d) \"%s\"", + errn, buf); + } else { + *apipe = true; + free(*filename); + *filename = name; + return true; + } + } } - - /* An error is fatal, since the file exists, it must be valid - * Skipping files can cause major problems */ - 
*bzf = BZ2_bzReadOpen(&bzerror, *fp, 0, 0, NULL, 0); - if (bzerror != BZ_OK) - quithere(1, "BZ2_bzReadOpen() failed for '%s' (%d)", bzname, bzerror); - - free(*filename); - *filename = bzname; - return true; + return false; } /* If the reload start file is missing and -r was specified correctly: @@ -2768,12 +2724,11 @@ static bool reload_from(tv_t *start) char *missingfirst = NULL, *missinglast = NULL; int missing_count; int processing; - bool finished = false, matched = false, ret = true, ok; + bool finished = false, matched = false, ret = true, ok, apipe = false; char *filename = NULL; uint64_t count, total; tv_t now; FILE *fp = NULL; - BZFILE *bzf = NULL; reload_buf = malloc(MAX_READ); if (!reload_buf) @@ -2789,8 +2744,8 @@ static bool reload_from(tv_t *start) LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - if (!logopen(&filename, &fp, &bzf)) - quithere(1, "Failed to open '%s' (or "BZNAME")", filename); + if (!logopen(&filename, &fp, &apipe)) + quithere(1, "Failed to open '%s'", filename); setnow(&now); tvs_to_buf(&now, run, sizeof(run)); @@ -2807,7 +2762,7 @@ static bool reload_from(tv_t *start) count = 0; while (!everyone_die && !matched && - logline(reload_buf, MAX_READ, fp, bzf, filename)) + logline(reload_buf, MAX_READ, fp, filename)) matched = reload_line(conn, filename, ++count, reload_buf); LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", @@ -2816,7 +2771,10 @@ static bool reload_from(tv_t *start) count, count == 1 ?
"" : "s", filename); total += count; - fclose(fp); + if (apipe) + pclose(fp); + else + fclose(fp); free(filename); if (everyone_die || matched) break; @@ -2826,7 +2784,7 @@ static bool reload_from(tv_t *start) break; } filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - ok = logopen(&filename, &fp, &bzf); + ok = logopen(&filename, &fp, &apipe); if (!ok) { missingfirst = strdup(filename); FREENULL(filename); @@ -2846,7 +2804,7 @@ static bool reload_from(tv_t *start) break; } filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); - ok = logopen(&filename, &fp, &bzf); + ok = logopen(&filename, &fp, &apipe); if (ok) break; errno = 0; diff --git a/src/ckdb.h b/src/ckdb.h index 1826e0e6..0386852d 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -36,7 +36,6 @@ #elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H) #include <postgresql/libpq-fe.h> #endif -#include <bzlib.h> #include "ckpool.h" #include "libckpool.h" @@ -53,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.720" +#define CKDB_VERSION DB_VERSION"-0.730" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__