From 5004f3081d1f69049f137401eba7595a4331d6ca Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 5 Aug 2014 20:34:37 +1000 Subject: [PATCH] ckdb - handle reload files and old format data --- src/ckdb.c | 100 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 18 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 1737965e..aa2909ef 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -340,6 +340,10 @@ static const tv_t default_expiry = { DEFAULT_EXPIRY, 0L }; #define DATE_uS_EOT 0L static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; +// All data will be after: 2-Jan-2014 00:00:00+00 +#define DATE_DEBUG 1388620800L +static const tv_t date_first = { DATE_DEBUG, 0L }; + #define HISTORYDATEINIT(_row, _cd, _by, _code, _inet) do { \ _row->createdate.tv_sec = (_cd)->tv_sec; \ _row->createdate.tv_usec = (_cd)->tv_usec; \ @@ -600,6 +604,20 @@ static K_TREE *transfer_root; static K_LIST *transfer_free; static K_STORE *transfer_store; +// older version missing field defaults +static TRANSFER auth_1 = { "poolinstance", "", auth_1.value }; +static K_ITEM auth_poolinstance = { "tmp", NULL, NULL, (void *)(&auth_1) }; +static TRANSFER poolstats_1 = { "elapsed", "0", poolstats_1.value }; +static K_ITEM poolstats_elapsed = { "tmp", NULL, NULL, (void *)(&poolstats_1) }; +static TRANSFER userstats_1 = { "elapsed", "0", userstats_1.value }; +static K_ITEM userstats_elapsed = { "tmp", NULL, NULL, (void *)(&userstats_1) }; +static TRANSFER userstats_2 = { "workername", "all", userstats_2.value }; +static K_ITEM userstats_workername = { "tmp", NULL, NULL, (void *)(&userstats_2) }; +static TRANSFER userstats_3 = { "idle", FALSE_STR, userstats_3.value }; +static K_ITEM userstats_idle = { "tmp", NULL, NULL, (void *)(&userstats_3) }; +static TRANSFER userstats_4 = { "eos", TRUE_STR, userstats_4.value }; +static K_ITEM userstats_eos = { "tmp", NULL, NULL, (void *)(&userstats_4) }; + // USERS typedef struct users { int64_t userid; @@ -5204,6 +5222,10 @@ static bool reload() if (!tv_newer(&start, &(dbstatus.userstats))) copy_tv(&start, &(dbstatus.userstats)); + if (start.tv_sec < DATE_DEBUG) { + start.tv_sec = DATE_DEBUG; + start.tv_usec = 0L; + } ok = reload_from(&start); free_ktree(userstats_db_root, NULL); @@ -5512,9 +5534,9 @@ static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, if (!i_poolinstance) return strdup(reply); - i_elapsed = require_name("elapsed", 1, NULL, reply, siz); + i_elapsed = optional_name("elapsed", 1, NULL); if (!i_elapsed) - return strdup(reply); + i_elapsed = &poolstats_elapsed; i_users = require_name("users", 1, NULL, reply, siz); if (!i_users) @@ -5609,17 +5631,17 @@ static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, if (!i_poolinstance) return strdup(reply); - i_elapsed = require_name("elapsed", 1, NULL, reply, siz); + i_elapsed = optional_name("elapsed", 1, NULL); if (!i_elapsed) - return strdup(reply); + i_elapsed = &userstats_elapsed; i_username = require_name("username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = optional_name("workername", 1, NULL); if (!i_workername) - return strdup(reply); + i_workername = &userstats_workername; i_hashrate = require_name("hashrate", 1, NULL, reply, siz); if (!i_hashrate) @@ -5637,15 +5659,15 @@ static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, if (!i_hashrate24hr) return strdup(reply); - i_idle = require_name("idle", 1, NULL, reply, siz); + i_idle = optional_name("idle", 1, NULL); if (!i_idle) - return strdup(reply); + i_idle = &userstats_idle; idle = (strcasecmp(DATA_TRANSFER(i_idle)->data, TRUE_STR) == 0); - i_eos = require_name("eos", 1, NULL, reply, siz); + i_eos = optional_name("eos", 1, NULL); if (!i_eos) - return strdup(reply); + i_eos = &userstats_eos; eos = (strcasecmp(DATA_TRANSFER(i_eos)->data, TRUE_STR) == 0); @@ -6424,9 +6446,9 @@ static char *cmd_auth_do(char *cmd, char *id, __maybe_unused tv_t *now, char *by LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = optional_name("poolinstance", 1, NULL); if (!i_poolinstance) - return strdup(reply); + i_poolinstance = &auth_poolinstance; i_username = require_name("username", 1, NULL, reply, siz); if (!i_username) @@ -7238,11 +7260,16 @@ jilted: #define MAX_READ (10 * 1024 * 1024) // TODO: be able to specify a start time on the command line -// TODO: handle missing files? -// TODO: handle a new database with no data or some missing data +/* To handle a new database with no data: + * touch the filename reported in "Failed to open 'filename'" + * when ckdb aborts */ static bool reload_from(tv_t *start) { char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; + size_t rflen = strlen(restorefrom); + char *missing, *missing2; + int missing_count; + int processing; bool ok = true; char *filename; char data[MAX_READ]; @@ -7266,8 +7293,10 @@ static bool reload_from(tv_t *start) LOGFILE(data); total = 0; + processing = 0; while (ok) { LOGWARNING("%s(): processing %s", __func__, filename); + processing++; count = 0; while (ok && fgets_unlocked(data, MAX_READ, fp)) @@ -7281,7 +7310,7 @@ static bool reload_from(tv_t *start) } LOGWARNING("%s(): read %"PRIu64" lines from %s", - count, filename); + __func__, count, filename); total += count; } @@ -7294,15 +7323,42 @@ static bool reload_from(tv_t *start) filename = rotating_filename(restorefrom, start->tv_sec); fp = fopen(filename, "r"); if (!fp) { - LOGWARNING("%s(): completed total %"PRIu64" lines", __func__, total); - break; + missing = filename; + filename = NULL; + missing_count = 1; + setnow(&now); + now.tv_sec += ROLL_S; + while (42) { + start->tv_sec += ROLL_S; + if (!tv_newer(start, &now)) { + ok = false; + break; + } + filename = rotating_filename(restorefrom, start->tv_sec); + fp = fopen(filename, "r"); + if (fp) + break; + if (missing_count++ > 1) + free(missing2); + missing2 = filename; + } + if (missing_count == 1) + LOGWARNING("%s(): skipped %s", __func__, missing+rflen); + else { + LOGWARNING("%s(): skipped %d files from %s to %s", + __func__, missing_count, missing+rflen, missing2+rflen); + free(missing2); + } + free(missing); } } } - free(filename); + if (filename) + free(filename); snprintf(data, sizeof(data), "reload.%s.%"PRIu64, run, total); LOGFILE(data); + LOGWARNING("%s(): %d files, total %"PRIu64" lines", __func__, processing, total); reloading = false; @@ -7471,6 +7527,8 @@ static void *listener(void *arg) return NULL; } +#define RELOADFILES "ckdb" + static void check_restore_dir() { struct stat statbuf; @@ -7485,6 +7543,12 @@ static void check_restore_dir() if (stat(restorefrom, &statbuf)) quit(1, "ERR: -r '%s' directory doesn't exist", restorefrom); + + restorefrom = realloc(restorefrom, strlen(restorefrom)+sizeof(RELOADFILES)); + if (!restorefrom) + quithere(1, "OOM"); + + strcat(restorefrom, RELOADFILES); } int main(int argc, char **argv)