Browse Source

ckdb - calculate file time lengths for the reload ratio

master
kanoi 8 years ago
parent
commit
72374c328c
  1. 41
      src/ckdb.c
  2. 2
      src/ckdb.h

41
src/ckdb.c

@ -290,6 +290,8 @@ int64_t confirm_last_workinfoid;
#define WORKINFO_AGE 660 #define WORKINFO_AGE 660
static tv_t reload_timestamp; static tv_t reload_timestamp;
// CDTRF in the last reloaded record - locked under breakqueue_free
static char last_reload_cd[CDATE_BUFSIZ];
// Shared by threads - accessed under breakqueue_free lock // Shared by threads - accessed under breakqueue_free lock
static uint64_t reload_processed = 0; static uint64_t reload_processed = 0;
// Shared by threads - accessed under workqueue_free lock // Shared by threads - accessed under workqueue_free lock
@ -3884,9 +3886,10 @@ static void *breaker(void *arg)
static bool cmd0 = false; static bool cmd0 = false;
struct breaker_setup *setup; struct breaker_setup *setup;
K_ITEM *bq_item = NULL; K_ITEM *bq_item = NULL, *cd_item = NULL;
BREAKQUEUE *bq = NULL; BREAKQUEUE *bq = NULL;
MSGLINE *msgline = NULL; MSGLINE *msgline = NULL;
TRANSFER *cd_trf;
char buf[128]; char buf[128];
bool reload, was_null, msg; bool reload, was_null, msg;
int queue_sleep, queue_limit, count; int queue_sleep, queue_limit, count;
@ -4137,8 +4140,14 @@ static void *breaker(void *arg)
} }
} }
} }
if (reload)
cd_item = find_transfer(msgline->trf_root, CDTRF);
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
if (reload) { if (reload) {
if (cd_item) {
DATA_TRANSFER(cd_trf, cd_item);
STRNCPY(last_reload_cd, cd_trf->mvalue);
}
k_add_tail(reload_done_breakqueue_store, bq_item); k_add_tail(reload_done_breakqueue_store, bq_item);
mutex_lock(&process_reload_waitlock); mutex_lock(&process_reload_waitlock);
process_reload_signals++; process_reload_signals++;
@ -6729,11 +6738,12 @@ static bool reload_from(tv_t *start, const tv_t *finish)
bool finished = false, ret = true, ok, apipe = false; bool finished = false, ret = true, ok, apipe = false;
char *filename = NULL; char *filename = NULL;
uint64_t count, total; uint64_t count, total;
tv_t now, begin, file_begin, file_end; tv_t now, begin, file_begin, file_end, last_cd;
time_t last_file = 0;
double diff, ratio; double diff, ratio;
FILE *fp = NULL; FILE *fp = NULL;
int file_N_limit; int file_N_limit;
time_t tick_time, tmp_time; time_t tick_time, tmp_time, last_sec, tot_sec = 0;
reload_buf = malloc(MAX_READ); reload_buf = malloc(MAX_READ);
if (!reload_buf) if (!reload_buf)
@ -6755,6 +6765,7 @@ static bool reload_from(tv_t *start, const tv_t *finish)
filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec); filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec);
if (!logopen(&filename, &fp, &apipe)) if (!logopen(&filename, &fp, &apipe))
quithere(1, "Failed to open '%s'", filename); quithere(1, "Failed to open '%s'", filename);
last_file = reload_timestamp.tv_sec;
setnow(&now); setnow(&now);
copy_tv(&begin, &now); copy_tv(&begin, &now);
@ -6815,7 +6826,17 @@ static bool reload_from(tv_t *start, const tv_t *finish)
diff = tvdiff(&file_end, &file_begin); diff = tvdiff(&file_end, &file_begin);
if (diff == 0) if (diff == 0)
diff = 1; diff = 1;
ratio = (double)ROLL_S / diff; // Work out how long the file was
K_RLOCK(breakqueue_free);
txt_to_ctv(CDTRF, last_reload_cd, &last_cd, sizeof(last_cd));
K_RUNLOCK(breakqueue_free);
last_sec = last_cd.tv_sec - last_file;
if (last_cd.tv_usec)
last_sec++;
if (last_sec < 2 || last_sec > ROLL_S)
last_sec = ROLL_S;
ratio = (double)last_sec / diff;
tot_sec += last_sec;
LOGWARNING("%s(): %sread %"PRIu64" line%s %.2f/s (%.1fx)" LOGWARNING("%s(): %sread %"PRIu64" line%s %.2f/s (%.1fx)"
" from %s", " from %s",
@ -6854,7 +6875,9 @@ static bool reload_from(tv_t *start, const tv_t *finish)
filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec); filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec);
ok = logopen(&filename, &fp, &apipe); ok = logopen(&filename, &fp, &apipe);
if (!ok) { if (ok)
last_file = reload_timestamp.tv_sec;
else {
missingfirst = strdup(filename); missingfirst = strdup(filename);
FREENULL(filename); FREENULL(filename);
errno = 0; errno = 0;
@ -6874,8 +6897,10 @@ static bool reload_from(tv_t *start, const tv_t *finish)
} }
filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec); filename = hour_filename(restorefrom, restorename, reload_timestamp.tv_sec);
ok = logopen(&filename, &fp, &apipe); ok = logopen(&filename, &fp, &apipe);
if (ok) if (ok) {
last_file = reload_timestamp.tv_sec;
break; break;
}
errno = 0; errno = 0;
if (missing_count++ > 1) if (missing_count++ > 1)
free(missinglast); free(missinglast);
@ -6907,7 +6932,7 @@ static bool reload_from(tv_t *start, const tv_t *finish)
diff = tvdiff(&now, &begin); diff = tvdiff(&now, &begin);
if (diff == 0) if (diff == 0)
diff = 1; diff = 1;
ratio = (double)(processing * ROLL_S) / diff; ratio = (double)tot_sec / diff;
snprintf(reload_buf, MAX_READ, "reload.%s.%"PRIu64, run, total); snprintf(reload_buf, MAX_READ, "reload.%s.%"PRIu64, run, total);
LOGQUE(reload_buf, true); LOGQUE(reload_buf, true);
@ -7466,7 +7491,7 @@ static bool make_keysummaries()
&now, NULL); &now, NULL);
K_WUNLOCK(process_pplns_free); K_WUNLOCK(process_pplns_free);
setnow(&proc_lock_fin); setnow(&proc_lock_fin);
LOGWARNING("%s() pplns lock time %.3fs+%.3fs", LOGWARNING("%s() pplns lock time %.3f+%.3fs",
__func__, tvdiff(&proc_lock_got, &proc_lock_stt), __func__, tvdiff(&proc_lock_got, &proc_lock_stt),
tvdiff(&proc_lock_fin, &proc_lock_got)); tvdiff(&proc_lock_fin, &proc_lock_got));

2
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.412" #define CKDB_VERSION DB_VERSION"-2.413"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

Loading…
Cancel
Save