Browse Source

ckdb - confirm option to check CCL timestamp

master
kanoi 10 years ago
parent
commit
1737dc5a85
  1. 808
      src/ckdb.c

808
src/ckdb.c

@ -47,7 +47,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.7"
#define CKDB_VERSION DB_VERSION"-0.52"
#define CKDB_VERSION DB_VERSION"-0.53"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -690,16 +690,27 @@ static bool confirm_sharesummary;
* a quicker confirm if required due to not running confirms regularly
* TODO: ... once the code includes flagging confirmed sharesummaries
* Valid options are:
* bNNN - confirm all workinfoid's from the previous db block before NNN (or 0)
* up to the workinfoid of the 1st db block height equal or after NNN
* wNNN - confirm all workinfoid's from NNN up to the last aged sharesummary
* bNNN - confirm all workinfoid's from the previous db block before
* NNN (or 0) up to the workinfoid of the 1st db block height
* equal or after NNN
* wNNN - confirm all workinfoid's from NNN up to the last aged
* sharesummary
* rNNN-MMM - confirm all workinfoid's from NNN to MMM inclusive
* i - just show the dbload information then exit
* cNNN-MMM - check the CCL record timestamps then exit
* i - just show the DB load information then exit
*/
static char *confirm_range;
static int confirm_block;
static int64_t confirm_range_start;
static int64_t confirm_range_finish;
static bool confirm_check_createdate;
static int64_t ccl_mismatch_abs;
static int64_t ccl_mismatch;
static double ccl_mismatch_min;
static double ccl_mismatch_max;
static int64_t ccl_unordered_abs;
static int64_t ccl_unordered;
static double ccl_unordered_most;
// The workinfoid range we are processing
static int64_t confirm_first_workinfoid;
@ -709,6 +720,8 @@ static int64_t confirm_last_workinfoid;
#define WORKINFO_AGE 660
static tv_t confirm_finish;
static tv_t reload_timestamp;
// DB users,workers,auth load is complete
static bool db_auths_complete = false;
// DB load is complete
@ -1386,295 +1399,6 @@ static K_TREE *workerstatus_root;
static K_LIST *workerstatus_free;
static K_STORE *workerstatus_store;
static char logname[512];
static char *dbcode;
#define LOGQUE(_msg) log_queue_message(_msg)
#define LOGFILE(_msg) rotating_log_nolock(_msg)
#define LOGDUP "dup."
// low spec version of rotating_log() - no locking
static bool rotating_log_nolock(char *msg)
{
char *filename;
FILE *fp;
bool ok = false;
filename = rotating_filename(logname, time(NULL));
fp = fopen(filename, "a+e");
if (unlikely(!fp)) {
LOGERR("Failed to fopen %s in rotating_log!", filename);
goto stageleft;
}
fprintf(fp, "%s\n", msg);
fclose(fp);
ok = true;
stageleft:
free(filename);
return ok;
}
static void log_queue_message(char *msg)
{
K_ITEM *lq_item;
K_WLOCK(logqueue_free);
lq_item = k_unlink_head(logqueue_free);
DATA_LOGQUEUE(lq_item)->msg = strdup(msg);
k_add_tail(logqueue_store, lq_item);
K_WUNLOCK(logqueue_free);
}
void logmsg(int loglevel, const char *fmt, ...)
{
int logfd = 0;
char *buf = NULL;
struct tm tm;
time_t now_t;
va_list ap;
char stamp[128];
char *extra = EMPTY;
if (loglevel > global_ckp->loglevel)
return;
now_t = time(NULL);
localtime_r(&now_t, &tm);
snprintf(stamp, sizeof(stamp),
"[%d-%02d-%02d %02d:%02d:%02d]",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
if (!fmt) {
fprintf(stderr, "%s %s() called without fmt\n", stamp, __func__);
return;
}
if (!global_ckp)
extra = " !!NULL global_ckp!!";
else
logfd = global_ckp->logfd;
va_start(ap, fmt);
VASPRINTF(&buf, fmt, ap);
va_end(ap);
if (logfd) {
FILE *LOGFP = global_ckp->logfp;
flock(logfd, LOCK_EX);
fprintf(LOGFP, "%s %s", stamp, buf);
if (loglevel <= LOG_ERR && errno != 0)
fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno));
errno = 0;
fprintf(LOGFP, "\n");
flock(logfd, LOCK_UN);
}
if (loglevel <= LOG_WARNING) {
if (loglevel <= LOG_ERR && errno != 0) {
fprintf(stderr, "%s %s with errno %d: %s%s\n",
stamp, buf, errno, strerror(errno), extra);
errno = 0;
} else
fprintf(stderr, "%s %s%s\n", stamp, buf, extra);
fflush(stderr);
}
free(buf);
}
static void setnow(tv_t *now)
{
ts_t spec;
spec.tv_sec = 0;
spec.tv_nsec = 0;
clock_gettime(CLOCK_REALTIME, &spec);
now->tv_sec = spec.tv_sec;
now->tv_usec = spec.tv_nsec / 1000;
}
#define CKPQ_READ true
#define CKPQ_WRITE false
#define CKPQexec(_conn, _qry, _isread) _CKPQexec(_conn, _qry, _isread, WHERE_FFL_HERE)
// Bug check to ensure no unexpected write txns occur
static PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
{
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
return PQexec(conn, qry);
}
#define CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \
_CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \
_isread, WHERE_FFL_HERE)
static PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
int nParams,
const Oid *paramTypes,
const char *const * paramValues,
const int *paramLengths,
const int *paramFormats,
int resultFormat,
bool isread, WHERE_FFL_ARGS)
{
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths,
paramFormats, resultFormat);
}
// Force use CKPQ... for PQ functions in use
#define PQexec CKPQexec
#define PQexecParams CKPQexecParams
static uint64_t ticks;
static time_t last_tick;
static void tick()
{
time_t now;
char ch;
now = time(NULL);
if (now != last_tick) {
last_tick = now;
ch = status_chars[ticks++ & 0x3];
putchar(ch);
putchar('\r');
fflush(stdout);
}
}
static void dsp_transfer(K_ITEM *item, FILE *stream)
{
TRANSFER *t = NULL;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
t = DATA_TRANSFER(item);
fprintf(stream, " name='%s' data='%s' malloc=%c\n",
t->name, t->data,
(t->value == t->data) ? 'N' : 'Y');
}
}
// order by name asc
static cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b)
{
return CMP_STR(DATA_TRANSFER(a)->name,
DATA_TRANSFER(b)->name);
}
static K_ITEM *find_transfer(K_TREE *trf_root, char *name)
{
TRANSFER transfer;
K_TREE_CTX ctx[1];
K_ITEM look;
STRNCPY(transfer.name, name);
look.data = (void *)(&transfer);
return find_in_ktree(trf_root, &look, cmp_transfer, ctx);
}
static K_ITEM *optional_name(K_TREE *trf_root, char *name, int len, char *patt)
{
K_ITEM *item;
char *value;
regex_t re;
int ret;
item = find_transfer(trf_root, name);
if (!item)
return NULL;
value = DATA_TRANSFER(item)->data;
if (!value || (int)strlen(value) < len)
return NULL;
if (patt) {
if (regcomp(&re, patt, REG_NOSUB) != 0)
return NULL;
ret = regexec(&re, value, (size_t)0, NULL, 0);
regfree(&re);
if (ret != 0)
return NULL;
}
return item;
}
#define require_name(_root, _name, _len, _patt, _reply, _siz) \
_require_name(_root, _name, _len, _patt, _reply, \
_siz, WHERE_FFL_HERE)
static K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt,
char *reply, size_t siz, WHERE_FFL_ARGS)
{
K_ITEM *item;
char *value;
regex_t re;
size_t dlen;
int ret;
item = find_transfer(trf_root, name);
if (!item) {
LOGERR("%s(): failed, field '%s' missing from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.missing %s", name);
return NULL;
}
value = DATA_TRANSFER(item)->data;
if (value)
dlen = strlen(value);
else
dlen = 0;
if (!value || (int)dlen < len) {
LOGERR("%s(): failed, field '%s' short (%s%d<%d) from %s():%d",
__func__, name, value ? EMPTY : "null",
(int)dlen, len, func, line);
snprintf(reply, siz, "failed.short %s", name);
return NULL;
}
if (patt) {
if (regcomp(&re, patt, REG_NOSUB) != 0) {
LOGERR("%s(): failed, field '%s' failed to"
" compile patt from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.REC %s", name);
return NULL;
}
ret = regexec(&re, value, (size_t)0, NULL, 0);
regfree(&re);
if (ret != 0) {
LOGERR("%s(): failed, field '%s' invalid from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.invalid %s", name);
return NULL;
}
}
return item;
}
static void _txt_to_data(enum data_type typ, char *nam, char *fld, void *data, size_t siz, WHERE_FFL_ARGS)
{
char *tmp;
@ -1912,60 +1636,396 @@ static char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz,
break;
}
return buf;
return buf;
}
#define str_to_buf(_data, _buf, _siz) _str_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define bigint_to_buf(_data, _buf, _siz) _bigint_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define int_to_buf(_data, _buf, _siz) _int_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define tv_to_buf(_data, _buf, _siz) _tv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define ctv_to_buf(_data, _buf, _siz) _ctv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define tvs_to_buf(_data, _buf, _siz) _tvs_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
//#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
static char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_STR, (void *)data, buf, siz, WHERE_FFL_PASS);
}
static char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_BIGINT, (void *)(&data), buf, siz, WHERE_FFL_PASS);
}
static char *_int_to_buf(int32_t data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_INT, (void *)(&data), buf, siz, WHERE_FFL_PASS);
}
static char *_tv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_TV, (void *)data, buf, siz, WHERE_FFL_PASS);
}
// Convert tv to S,uS
static char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_CTV, (void *)data, buf, siz, WHERE_FFL_PASS);
}
// Convert tv to seconds (ignore uS)
static char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_TVS, (void *)data, buf, siz, WHERE_FFL_PASS);
}
/* unused yet
static char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_BLOB, (void *)data, buf, siz, WHERE_FFL_PASS);
}
*/
static char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz, WHERE_FFL_PASS);
}
static char logname[512];
static char *dbcode;
// CCLs are every ...
#define ROLL_S 3600
#define LOGQUE(_msg) log_queue_message(_msg)
#define LOGFILE(_msg) rotating_log_nolock(_msg)
#define LOGDUP "dup."
// low spec version of rotating_log() - no locking
static bool rotating_log_nolock(char *msg)
{
char *filename;
FILE *fp;
bool ok = false;
filename = rotating_filename(logname, time(NULL));
fp = fopen(filename, "a+e");
if (unlikely(!fp)) {
LOGERR("Failed to fopen %s in rotating_log!", filename);
goto stageleft;
}
fprintf(fp, "%s\n", msg);
fclose(fp);
ok = true;
stageleft:
free(filename);
return ok;
}
static void log_queue_message(char *msg)
{
K_ITEM *lq_item;
K_WLOCK(logqueue_free);
lq_item = k_unlink_head(logqueue_free);
DATA_LOGQUEUE(lq_item)->msg = strdup(msg);
k_add_tail(logqueue_store, lq_item);
K_WUNLOCK(logqueue_free);
}
void logmsg(int loglevel, const char *fmt, ...)
{
int logfd = 0;
char *buf = NULL;
struct tm tm;
time_t now_t;
va_list ap;
char stamp[128];
char *extra = EMPTY;
if (loglevel > global_ckp->loglevel)
return;
now_t = time(NULL);
localtime_r(&now_t, &tm);
snprintf(stamp, sizeof(stamp),
"[%d-%02d-%02d %02d:%02d:%02d]",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
if (!fmt) {
fprintf(stderr, "%s %s() called without fmt\n", stamp, __func__);
return;
}
if (!global_ckp)
extra = " !!NULL global_ckp!!";
else
logfd = global_ckp->logfd;
va_start(ap, fmt);
VASPRINTF(&buf, fmt, ap);
va_end(ap);
if (logfd) {
FILE *LOGFP = global_ckp->logfp;
flock(logfd, LOCK_EX);
fprintf(LOGFP, "%s %s", stamp, buf);
if (loglevel <= LOG_ERR && errno != 0)
fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno));
errno = 0;
fprintf(LOGFP, "\n");
flock(logfd, LOCK_UN);
}
if (loglevel <= LOG_WARNING) {
if (loglevel <= LOG_ERR && errno != 0) {
fprintf(stderr, "%s %s with errno %d: %s%s\n",
stamp, buf, errno, strerror(errno), extra);
errno = 0;
} else
fprintf(stderr, "%s %s%s\n", stamp, buf, extra);
fflush(stderr);
}
free(buf);
}
static void setnow(tv_t *now)
{
ts_t spec;
spec.tv_sec = 0;
spec.tv_nsec = 0;
clock_gettime(CLOCK_REALTIME, &spec);
now->tv_sec = spec.tv_sec;
now->tv_usec = spec.tv_nsec / 1000;
}
static void check_createdate_ccl(char *cmd, tv_t *cd)
{
static tv_t last_cd;
static char last_cmd[CMD_SIZ+1];
char cd_buf1[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ];
char *filename;
double td;
if (cd->tv_sec < reload_timestamp.tv_sec ||
cd->tv_sec >= (reload_timestamp.tv_sec + ROLL_S)) {
ccl_mismatch_abs++;
td = tvdiff(cd, &reload_timestamp);
if (td < -10 || td > ROLL_S + 10)
ccl_mismatch++;
filename = rotating_filename("", reload_timestamp.tv_sec);
tv_to_buf(cd, cd_buf1, sizeof(cd_buf1));
LOGERR("%s(): CCL contains mismatch data: cmd:%s CCL:%.10s cd:%s",
__func__, cmd, filename, cd_buf1);
free(filename);
if (ccl_mismatch_min > td)
ccl_mismatch_min = td;
if (ccl_mismatch_max < td)
ccl_mismatch_max = td;
}
td = tvdiff(cd, &last_cd);
if (td < 0.0) {
ccl_unordered_abs++;
if (ccl_unordered_most > td)
ccl_unordered_most = td;
}
if (td < -2.5) {
ccl_unordered++;
tv_to_buf(&last_cd, cd_buf1, sizeof(cd_buf1));
tv_to_buf(cd, cd_buf2, sizeof(cd_buf2));
LOGERR("%s(): CCL time unordered: %s<->%s %ld,%ld<->%ld,%ld %s<->%s",
__func__, last_cmd, cmd, last_cd.tv_sec,last_cd.tv_usec,
cd->tv_sec, cd->tv_usec, cd_buf1, cd_buf2);
}
copy_tv(&last_cd, cd);
STRNCPY(last_cmd, cmd);
}
#define str_to_buf(_data, _buf, _siz) _str_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define bigint_to_buf(_data, _buf, _siz) _bigint_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define int_to_buf(_data, _buf, _siz) _int_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define tv_to_buf(_data, _buf, _siz) _tv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define ctv_to_buf(_data, _buf, _siz) _ctv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define tvs_to_buf(_data, _buf, _siz) _tvs_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
//#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define CKPQ_READ true
#define CKPQ_WRITE false
static char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS)
#define CKPQexec(_conn, _qry, _isread) _CKPQexec(_conn, _qry, _isread, WHERE_FFL_HERE)
// Bug check to ensure no unexpected write txns occur
static PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_STR, (void *)data, buf, siz, WHERE_FFL_PASS);
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
return PQexec(conn, qry);
}
static char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS)
#define CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \
_CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \
_isread, WHERE_FFL_HERE)
static PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
int nParams,
const Oid *paramTypes,
const char *const * paramValues,
const int *paramLengths,
const int *paramFormats,
int resultFormat,
bool isread, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_BIGINT, (void *)(&data), buf, siz, WHERE_FFL_PASS);
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths,
paramFormats, resultFormat);
}
static char *_int_to_buf(int32_t data, char *buf, size_t siz, WHERE_FFL_ARGS)
// Force use CKPQ... for PQ functions in use
#define PQexec CKPQexec
#define PQexecParams CKPQexecParams
static uint64_t ticks;
static time_t last_tick;
static void tick()
{
return _data_to_buf(TYPE_INT, (void *)(&data), buf, siz, WHERE_FFL_PASS);
time_t now;
char ch;
now = time(NULL);
if (now != last_tick) {
last_tick = now;
ch = status_chars[ticks++ & 0x3];
putchar(ch);
putchar('\r');
fflush(stdout);
}
}
static char *_tv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
static void dsp_transfer(K_ITEM *item, FILE *stream)
{
return _data_to_buf(TYPE_TV, (void *)data, buf, siz, WHERE_FFL_PASS);
TRANSFER *t = NULL;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
t = DATA_TRANSFER(item);
fprintf(stream, " name='%s' data='%s' malloc=%c\n",
t->name, t->data,
(t->value == t->data) ? 'N' : 'Y');
}
}
// Convert tv to S,uS
static char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
// order by name asc
static cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b)
{
return _data_to_buf(TYPE_CTV, (void *)data, buf, siz, WHERE_FFL_PASS);
return CMP_STR(DATA_TRANSFER(a)->name,
DATA_TRANSFER(b)->name);
}
// Convert tv to seconds (ignore uS)
static char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
static K_ITEM *find_transfer(K_TREE *trf_root, char *name)
{
return _data_to_buf(TYPE_TVS, (void *)data, buf, siz, WHERE_FFL_PASS);
TRANSFER transfer;
K_TREE_CTX ctx[1];
K_ITEM look;
STRNCPY(transfer.name, name);
look.data = (void *)(&transfer);
return find_in_ktree(trf_root, &look, cmp_transfer, ctx);
}
/* unused yet
static char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS)
static K_ITEM *optional_name(K_TREE *trf_root, char *name, int len, char *patt)
{
return _data_to_buf(TYPE_BLOB, (void *)data, buf, siz, WHERE_FFL_PASS);
K_ITEM *item;
char *value;
regex_t re;
int ret;
item = find_transfer(trf_root, name);
if (!item)
return NULL;
value = DATA_TRANSFER(item)->data;
if (!value || (int)strlen(value) < len)
return NULL;
if (patt) {
if (regcomp(&re, patt, REG_NOSUB) != 0)
return NULL;
ret = regexec(&re, value, (size_t)0, NULL, 0);
regfree(&re);
if (ret != 0)
return NULL;
}
return item;
}
*/
static char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS)
#define require_name(_root, _name, _len, _patt, _reply, _siz) \
_require_name(_root, _name, _len, _patt, _reply, \
_siz, WHERE_FFL_HERE)
static K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt,
char *reply, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz, WHERE_FFL_PASS);
K_ITEM *item;
char *value;
regex_t re;
size_t dlen;
int ret;
item = find_transfer(trf_root, name);
if (!item) {
LOGERR("%s(): failed, field '%s' missing from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.missing %s", name);
return NULL;
}
value = DATA_TRANSFER(item)->data;
if (value)
dlen = strlen(value);
else
dlen = 0;
if (!value || (int)dlen < len) {
LOGERR("%s(): failed, field '%s' short (%s%d<%d) from %s():%d",
__func__, name, value ? EMPTY : "null",
(int)dlen, len, func, line);
snprintf(reply, siz, "failed.short %s", name);
return NULL;
}
if (patt) {
if (regcomp(&re, patt, REG_NOSUB) != 0) {
LOGERR("%s(): failed, field '%s' failed to"
" compile patt from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.REC %s", name);
return NULL;
}
ret = regexec(&re, value, (size_t)0, NULL, 0);
regfree(&re);
if (ret != 0) {
LOGERR("%s(): failed, field '%s' invalid from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.invalid %s", name);
return NULL;
}
}
return item;
}
static PGconn *dbconnect()
@ -8174,6 +8234,8 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
free(cmdptr);
return CMD_REPLY;
}
if (confirm_check_createdate)
check_createdate_ccl(cmd, cd);
}
free(cmdptr);
return cmds[*which_cmds].cmd_val;
@ -8868,8 +8930,6 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
return false;
}
// Log files are every ...
#define ROLL_S 3600
// 10Mb for now - transactiontree can be large
#define MAX_READ (10 * 1024 * 1024)
static char *reload_buf;
@ -8897,10 +8957,14 @@ static bool reload_from(tv_t *start)
reloading = true;
copy_tv(&reload_timestamp, start);
reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S;
tv_to_buf(start, buf, sizeof(buf));
LOGWARNING("%s(): from %s", __func__, buf);
tv_to_buf(&reload_timestamp, run, sizeof(run));
LOGWARNING("%s(): from %s (stamp %s)", __func__, buf, run);
filename = rotating_filename(restorefrom, start->tv_sec);
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re");
if (!fp)
quithere(1, "Failed to open '%s'", filename);
@ -8937,10 +9001,12 @@ static bool reload_from(tv_t *start)
free(filename);
if (matched)
break;
start->tv_sec += ROLL_S;
if (confirm_sharesummary && tv_newer(&confirm_finish, start))
reload_timestamp.tv_sec += ROLL_S;
if (confirm_sharesummary && tv_newer(&confirm_finish, &reload_timestamp)) {
LOGWARNING("%s(): confirm range complete", __func__);
break;
filename = rotating_filename(restorefrom, start->tv_sec);
}
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re");
if (!fp) {
missingfirst = strdup(filename);
@ -8951,17 +9017,17 @@ static bool reload_from(tv_t *start)
setnow(&now);
now.tv_sec += ROLL_S;
while (42) {
start->tv_sec += ROLL_S;
reload_timestamp.tv_sec += ROLL_S;
/* WARNING: if the system clock is wrong, any CCLs
* missing or not created due to a ckpool outage of
* an hour or more can stop the reload early and
* cause DB problems! Though, the clock being wrong
* can screw up ckpool and ckdb anyway ... */
if (!tv_newer(start, &now)) {
if (!tv_newer(&reload_timestamp, &now)) {
finished = true;
break;
}
filename = rotating_filename(restorefrom, start->tv_sec);
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
fp = fopen(filename, "re");
if (fp)
break;
@ -9390,6 +9456,16 @@ static void confirm_reload()
DATA_BLOCKS(b_end_item)->height);
last_reason = last_buf;
break;
case 'i':
LOGWARNING("%s(): info displayed - exiting", __func__);
exit(0);
case 'c':
case 'r':
confirm_first_workinfoid = confirm_range_start;
confirm_last_workinfoid = confirm_range_finish;
first_reason = "start range";
last_reason = "end range";
break;
case 'w':
confirm_first_workinfoid = confirm_range_start;
// last from default
@ -9400,15 +9476,6 @@ static void confirm_reload()
}
first_reason = "start range";
break;
case 'r':
confirm_first_workinfoid = confirm_range_start;
confirm_last_workinfoid = confirm_range_finish;
first_reason = "start range";
last_reason = "end range";
break;
case 'i':
LOGWARNING("%s(): info displayed - exiting", __func__);
exit(0);
default:
quithere(1, "Code fail");
}
@ -9506,6 +9573,15 @@ static void confirm_reload()
return;
}
if (confirm_check_createdate) {
LOGERR("%s(): CCL mismatches %"PRId64"/%"PRId64" %.6f/%.6f unordered "
"%"PRId64"/%"PRId64" %.6f",
__func__, ccl_mismatch, ccl_mismatch_abs,
ccl_mismatch_min, ccl_mismatch_max,
ccl_unordered, ccl_unordered_abs, ccl_unordered_most);
return;
}
compare_summaries(sharesummary_workinfoid_save, "DB",
sharesummary_workinfoid_root, "ReLoad",
true, true);
@ -9521,9 +9597,23 @@ static void confirm_summaries()
// Simple value check to abort early
if (confirm_range && *confirm_range) {
if (confirm_range[0] != 'i' && strlen(confirm_range) < 2) {
LOGEMERG("%s() invalid confirm range length '%s'", __func__, confirm_range);
return;
switch(tolower(confirm_range[0])) {
case 'b':
case 'c':
case 'r':
case 'w':
if (strlen(confirm_range) < 2) {
LOGEMERG("%s() invalid confirm range length '%s'",
__func__, confirm_range);
return;
}
break;
case 'i':
break;
default:
LOGEMERG("%s() invalid confirm range '%s'",
__func__, confirm_range);
return;
}
switch(tolower(confirm_range[0])) {
case 'b':
@ -9534,19 +9624,16 @@ static void confirm_summaries()
return;
}
break;
case 'w':
confirm_range_start = atoll(confirm_range+1);
if (confirm_range_start <= 0) {
LOGEMERG("%s() invalid confirm start '%s' - must be >0",
__func__, confirm_range);
return;
}
case 'i':
break;
case 'c':
confirm_check_createdate = true;
case 'r':
range = strdup(confirm_range);
minus = strchr(range+1, '-');
if (!minus || minus == range+1) {
LOGEMERG("%s() invalid confirm range '%s' - must be rNNN-MMM",
__func__, confirm_range);
LOGEMERG("%s() invalid confirm range '%s' - must be %cNNN-MMM",
__func__, confirm_range, tolower(confirm_range[0]));
return;
}
*(minus++) = '\0';
@ -9569,12 +9656,13 @@ static void confirm_summaries()
}
free(range);
break;
case 'i':
break;
default:
LOGEMERG("%s() invalid confirm range '%s'",
__func__, confirm_range);
return;
case 'w':
confirm_range_start = atoll(confirm_range+1);
if (confirm_range_start <= 0) {
LOGEMERG("%s() invalid confirm start '%s' - must be >0",
__func__, confirm_range);
return;
}
}
}

Loading…
Cancel
Save