Browse Source

ckdb - use a pipe for reloading compressed files - allow .bz2 .gz .lrz

master
kanoi 10 years ago
parent
commit
a41e994548
  1. 140
      src/ckdb.c
  2. 3
      src/ckdb.h

140
src/ckdb.c

@ -2646,18 +2646,10 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
#define MAX_READ (10 * 1024 * 1024) #define MAX_READ (10 * 1024 * 1024)
static char *reload_buf; static char *reload_buf;
#define BZNAME ".bz2" static bool logline(char *buf, int siz, FILE *fp, char *filename)
static bool logline(char *buf, int siz, FILE *fp, BZFILE *bzf, char *filename)
{ {
static char *bzbuf = NULL; char *ret;
static int bzpos, thisred;
static size_t bzlen;
char *ret, *ptr;
int bzerror, use;
bool repeat;
if (!bzf) {
ret = fgets_unlocked(buf, siz, fp); ret = fgets_unlocked(buf, siz, fp);
if (!ret && ferror(fp)) { if (!ret && ferror(fp)) {
int err = errno; int err = errno;
@ -2670,92 +2662,56 @@ static bool logline(char *buf, int siz, FILE *fp, BZFILE *bzf, char *filename)
return false; return false;
} }
if (!bzbuf) { static struct decomp {
bzpos = 0; char *ext;
bzlen = MAX_READ*2 + 1; char *fmt;
bzbuf = malloc(bzlen); } dec_list[] = {
bzbuf[0] = '\0'; { ".bz2", "bzcat -q '%s'" },
} { ".gz", "zcat -q '%s'" },
{ ".lrz", "lrzip -q -d -o - '%s'" },
repeat = true; { NULL, NULL }
// Read at least MAX_READ or to EOF };
while (repeat && bzpos < MAX_READ) {
thisred = BZ2_bzRead(&bzerror, bzf, &(bzbuf[bzpos]), MAX_READ*2 - bzpos);
if (bzerror != BZ_OK && bzerror != BZ_STREAM_END) {
quithere(1, "Read failed on %s"BZNAME" (%d)",
filename, bzerror);
}
bzpos += thisred;
bzbuf[bzpos] = '\0';
if (bzerror == BZ_STREAM_END || thisred == 0)
repeat = false;
}
/* Either EOF or a read returned 0
* with no buffered data left in both cases */
if (bzpos == 0)
return false;
ptr = bzbuf;
while (*ptr && *ptr != '\n' && *ptr != '\r')
ptr++;
/* This will lose blank lines except for the first line
* - which is OK anyway since they are an error */
while (*ptr == '\n' || *ptr == '\r')
ptr++;
use = ptr - bzbuf;
// Return the 'line' (up to siz-1)
if (use > (siz - 1))
use = siz - 1;
memcpy(buf, bzbuf, use);
buf[use] = '\0';
// Buffer remainder
if (bzpos > use) {
bzpos -= use;
memmove(bzbuf, bzbuf+use, bzpos);
bzbuf[bzpos] = '\0';
} else {
bzpos = 0;
bzbuf[0] = '\0';
}
return true;
}
static bool logopen(char **filename, FILE **fp, BZFILE **bzf) static bool logopen(char **filename, FILE **fp, bool *apipe)
{ {
char *bzname; char buf[1024];
char *name;
size_t len; size_t len;
int bzerror; int i;
*apipe = false;
*fp = *bzf = NULL; *fp = NULL;
*fp = fopen(*filename, "re"); *fp = fopen(*filename, "re");
if (*fp) if (*fp)
return true; return true;
len = strlen(*filename) + sizeof(BZNAME); for (i = 0; dec_list[i].ext; i++) {
bzname = malloc(len + 1); len = strlen(*filename) + strlen(dec_list[i].ext);
if (!bzname) name = malloc(len + 1);
if (!name)
quithere(1, "(%d) OOM", (int)len); quithere(1, "(%d) OOM", (int)len);
strcpy(bzname, *filename); strcpy(name, *filename);
strcat(bzname, BZNAME); strcat(name, dec_list[i].ext);
if (access(name, R_OK))
*fp = fopen(bzname, "re"); free(name);
else {
snprintf(buf, sizeof(buf), dec_list[i].fmt, name);
*fp = popen(buf, "re");
if (!(*fp)) { if (!(*fp)) {
free(bzname); int errn = errno;
return false; quithere(1, "Failed to pipe (%d) \"%s\"",
} errn, buf);
} else {
/* An error is fatal, since the file exists, it must be valid *apipe = true;
* Skipping files can cause major problems */
*bzf = BZ2_bzReadOpen(&bzerror, *fp, 0, 0, NULL, 0);
if (bzerror != BZ_OK)
quithere(1, "BZ2_bzReadOpen() failed for '%s' (%d)", bzname, bzerror);
free(*filename); free(*filename);
*filename = bzname; *filename = name;
return true; return true;
} }
}
}
return false;
}
/* If the reload start file is missing and -r was specified correctly: /* If the reload start file is missing and -r was specified correctly:
* touch the filename reported in "Failed to open 'filename'", * touch the filename reported in "Failed to open 'filename'",
@ -2768,12 +2724,11 @@ static bool reload_from(tv_t *start)
char *missingfirst = NULL, *missinglast = NULL; char *missingfirst = NULL, *missinglast = NULL;
int missing_count; int missing_count;
int processing; int processing;
bool finished = false, matched = false, ret = true, ok; bool finished = false, matched = false, ret = true, ok, apipe = false;
char *filename = NULL; char *filename = NULL;
uint64_t count, total; uint64_t count, total;
tv_t now; tv_t now;
FILE *fp = NULL; FILE *fp = NULL;
BZFILE *bzf = NULL;
reload_buf = malloc(MAX_READ); reload_buf = malloc(MAX_READ);
if (!reload_buf) if (!reload_buf)
@ -2789,8 +2744,8 @@ static bool reload_from(tv_t *start)
LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run); LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run);
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
if (!logopen(&filename, &fp, &bzf)) if (!logopen(&filename, &fp, &apipe))
quithere(1, "Failed to open '%s' (or "BZNAME")", filename); quithere(1, "Failed to open '%s'", filename);
setnow(&now); setnow(&now);
tvs_to_buf(&now, run, sizeof(run)); tvs_to_buf(&now, run, sizeof(run));
@ -2807,7 +2762,7 @@ static bool reload_from(tv_t *start)
count = 0; count = 0;
while (!everyone_die && !matched && while (!everyone_die && !matched &&
logline(reload_buf, MAX_READ, fp, bzf, filename)) logline(reload_buf, MAX_READ, fp, filename))
matched = reload_line(conn, filename, ++count, reload_buf); matched = reload_line(conn, filename, ++count, reload_buf);
LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", LOGWARNING("%s(): %sread %"PRIu64" line%s from %s",
@ -2816,6 +2771,9 @@ static bool reload_from(tv_t *start)
count, count == 1 ? "" : "s", count, count == 1 ? "" : "s",
filename); filename);
total += count; total += count;
if (apipe)
pclose(fp);
else
fclose(fp); fclose(fp);
free(filename); free(filename);
if (everyone_die || matched) if (everyone_die || matched)
@ -2826,7 +2784,7 @@ static bool reload_from(tv_t *start)
break; break;
} }
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
ok = logopen(&filename, &fp, &bzf); ok = logopen(&filename, &fp, &apipe);
if (!ok) { if (!ok) {
missingfirst = strdup(filename); missingfirst = strdup(filename);
FREENULL(filename); FREENULL(filename);
@ -2846,7 +2804,7 @@ static bool reload_from(tv_t *start)
break; break;
} }
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
ok = logopen(&filename, &fp, &bzf); ok = logopen(&filename, &fp, &apipe);
if (ok) if (ok)
break; break;
errno = 0; errno = 0;

3
src/ckdb.h

@ -36,7 +36,6 @@
#elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H) #elif defined (HAVE_POSTGRESQL_LIBPQ_FE_H)
#include <postgresql/libpq-fe.h> #include <postgresql/libpq-fe.h>
#endif #endif
#include <bzlib.h>
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
@ -53,7 +52,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.6" #define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.720" #define CKDB_VERSION DB_VERSION"-0.730"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

Loading…
Cancel
Save