diff --git a/src/ckdb.c b/src/ckdb.c index e3e8eb25..578bb942 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -503,6 +503,8 @@ K_LIST *seqset_free; static K_STORE *seqset_store; // Initialised when seqset_free is allocated static char *seqnam[SEQ_MAX]; +// Lock access to the static found data in update_seq() +static cklock_t seq_found_lock; // Full lock for access to sequence processing data #define SEQLOCK() K_WLOCK(seqset_free); @@ -2604,6 +2606,62 @@ static void seq_reloadmax() SEQUNLOCK(); } +/* Local structure for update_seq() to remember trans found as ranges + * It's in use if last.tv_sec != 0 */ +typedef struct seqfound { + tv_t last; + int set; + uint64_t seqstt; + uint64_t seqpid; + // seq range + uint64_t seq1, seq2; + // cd range + tv_t cd1, cd2; + bool forced_msg; +} SEQFOUND; + +static void msgs_seq(SEQFOUND *found_msgs) +{ + char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ]; + char r_buf[64], t_buf3[DATE_BUFSIZ], c_buf[64]; + int i; + + for (i = 0; i < SEQ_MAX; i++) { + if (found_msgs[i].last.tv_sec != 0) { + btu64_to_buf(&(found_msgs[i].seqstt), t_buf, + sizeof(t_buf)); + bt_to_buf(&(found_msgs[i].cd1.tv_sec), t_buf2, + sizeof(t_buf2)); + if (found_msgs[i].seq2 == found_msgs[i].seq1) { + r_buf[0] = '\0'; + c_buf[0] = '\0'; + } else { + snprintf(r_buf, sizeof(r_buf), + "-%"PRIu64, + found_msgs[i].seq2); + snprintf(c_buf, sizeof(c_buf), + " (%"PRIu64")", + found_msgs[i].seq2 + 1 - + found_msgs[i].seq1); + } + if (found_msgs[i].cd1.tv_sec == found_msgs[i].cd2.tv_sec) + t_buf3[0] = '\0'; + else { + ms_to_buf(&(found_msgs[i].cd2.tv_sec), + t_buf3, sizeof(t_buf3)); + } + LOGWARNING("Seq found trans %s %"PRIu64"%s%s" + " set:%d/%"PRIu64"=%s/%"PRIu64 + " %s%s%s", + seqnam[i], found_msgs[i].seq1, r_buf, c_buf, + found_msgs[i].set, + found_msgs[i].seqstt, t_buf, + found_msgs[i].seqpid, t_buf2, + t_buf3[0] ? ".." : EMPTY, t_buf3); + } + } +} + /* Most of the extra message logic in here is to avoid putting too many * messages or incorrect messages on the console when errors occur * It wont lose msglines from the reload or the queue, since if there is any @@ -2616,11 +2674,15 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, char *nam, tv_t *now, tv_t *cd, char *code, int seqentryflags, char *msg) { + static SEQFOUND found[SEQ_MAX]; + // flag to avoid always accessing the lock since trans is extremely rare + static bool has_found; + char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; bool firstseq, newseq, expseq, gothigh, okhi, gotstale, gotstalestart; SEQSET *seqset = NULL, *seqset0 = NULL, seqset_pre = { 0 }; SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; - bool dup, wastrans, doitem, dotime, gotrecover; + bool dup, wastrans, doitem, dotime, gotrecover, used; SEQDATA *seqdata; SEQENTRY *seqentry, seqentry_copy, *u_entry; K_ITEM *seqset_item = NULL, *st_item = NULL, *stl_item = NULL; @@ -2630,6 +2692,8 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, uint64_t u; int set = -1, expset = -1, highlimit, i; K_STORE *lost = NULL; + SEQFOUND found_msgs[SEQ_MAX]; + tv_t found_now; LOGDEBUG("%s() SQ %c:%d/%s/%"PRIu64"/%"PRIu64"/%"PRIu64"/%s '%.80s...", __func__, SECHR(seqentryflags), seq, nam, n_seqcmd, n_seqstt, @@ -3111,12 +3175,90 @@ setitemdata: FREENULL(st); } - if (wastrans || gotrecover) { + if (wastrans) { + for (i = 0; i < SEQ_MAX; i++) + found_msgs[i].last.tv_sec = 0; + used = false; + setnow(&found_now); + ck_wlock(&seq_found_lock); + if (found[seq].last.tv_sec != 0) { + // Can we append it? + if (found[seq].seq2 == (n_seqcmd - 1) && + found[seq].set == set && + found[seq].seqstt == n_seqstt && + found[seq].seqpid == n_seqpid) { + found[seq].seq2++; + if (tv_newer(cd, &(found[seq].cd1))) + copy_tv(&(found[seq].cd1), cd); + if (tv_newer(&(found[seq].cd2), cd)) + copy_tv(&(found[seq].cd2), cd); + used = true; + } else { + // No, so force display it + found[seq].forced_msg = true; + } + } + // Check if there are any ranges >= 2s old (or forced) + for (i = 0; i < SEQ_MAX; i++) { + if (found[i].forced_msg || (found[i].last.tv_sec != 0 && + tvdiff(&found_now, &(found[i].last)) >= 2.0)) { + memcpy(&(found_msgs[i]), &(found[i]), + sizeof(SEQFOUND)); + // will be displayed, so erase it + found[i].last.tv_sec = 0; + found[i].forced_msg = false; + } + } + // Store it - found[seq] will (now) be unused + if (!used) { + copy_tv(&(found[seq].last), &found_now); + found[seq].set = set; + found[seq].seqstt = n_seqstt; + found[seq].seqpid = n_seqpid; + found[seq].seq1 = found[seq].seq2 = n_seqcmd; + copy_tv(&(found[seq].cd1), cd); + copy_tv(&(found[seq].cd2), cd); + } + has_found = false; + for (i = 0; i < SEQ_MAX; i++) { + if (found[i].last.tv_sec != 0) + has_found = true; + } + ck_wunlock(&seq_found_lock); + msgs_seq(found_msgs); + } else { + if (has_found) { + for (i = 0; i < SEQ_MAX; i++) + found_msgs[i].last.tv_sec = 0; + ck_wlock(&seq_found_lock); + if (has_found) { + for (i = 0; i < SEQ_MAX; i++) { + if (found[i].last.tv_sec != 0 && + tvdiff(&found_now, &(found[i].last)) >= 2.0) { + memcpy(&(found_msgs[i]), + &(found[i]), + sizeof(SEQFOUND)); + // will be displayed, so erase it + found[i].last.tv_sec = 0; + found[i].forced_msg = false; + } + } + has_found = false; + for (i = 0; i < SEQ_MAX; i++) { + if (found[i].last.tv_sec != 0) + has_found = true; + } + } + ck_wunlock(&seq_found_lock); + msgs_seq(found_msgs); + } + } + + if (gotrecover) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGWARNING("%s %s %"PRIu64" set:%d/%"PRIu64"=%s/%"PRIu64 - " %s/%s", - gotrecover ? "SEQ recovered" : "Seq found trans", + LOGWARNING("SEQ recovered %s %"PRIu64" set:%d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code); } @@ -8647,6 +8789,7 @@ int main(int argc, char **argv) cklock_init(&last_lock); cklock_init(&btc_lock); cklock_init(&poolinstance_lock); + cklock_init(&seq_found_lock); mutex_init(&bq_reload_waitlock); mutex_init(&bq_cmd_waitlock); diff --git a/src/ckdb.h b/src/ckdb.h index b5028120..83117ffe 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.409" +#define CKDB_VERSION DB_VERSION"-2.410" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -279,7 +279,9 @@ enum data_type { TYPE_BLOB, TYPE_DOUBLE, TYPE_T, - TYPE_BT + TYPE_BT, + TYPE_HMS, + TYPE_MS }; // BLOB does what PTR needs @@ -3037,6 +3039,8 @@ extern char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, #define t_to_buf(_data, _buf, _siz) _t_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define bt_to_buf(_data, _buf, _siz) _bt_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define btu64_to_buf(_data, _buf, _siz) _btu64_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define hms_to_buf(_data, _buf, _siz) _hms_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define ms_to_buf(_data, _buf, _siz) _ms_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) extern char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS); @@ -3059,6 +3063,10 @@ extern char *_t_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); // Convert seconds (only) time to (brief) M-DD/HH:MM:SS extern char *_bt_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); +// Convert tv to HH:MM:SS +extern char *_hms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); +// Convert tv to MM:SS +extern char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS); extern void dsp_transfer(K_ITEM *item, FILE *stream); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index c5d79ce4..52c2fad8 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -511,6 +511,8 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_ case TYPE_BTV: case TYPE_T: case TYPE_BT: + case TYPE_HMS: + case TYPE_MS: siz = DATE_BUFSIZ; break; case TYPE_CTV: @@ -597,6 +599,19 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_ tm.tm_min, tm.tm_sec); break; + case TYPE_HMS: + gmtime_r((time_t *)data, &tm); + snprintf(buf, siz, "%02d:%02d:%02d", + tm.tm_hour, + tm.tm_min, + tm.tm_sec); + break; + case TYPE_MS: + gmtime_r((time_t *)data, &tm); + snprintf(buf, siz, "%02d:%02d", + tm.tm_min, + tm.tm_sec); + break; } return buf; @@ -674,6 +689,18 @@ char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) return _data_to_buf(TYPE_BT, (void *)&t, buf, siz, WHERE_FFL_PASS); } +// Convert to HH:MM:SS +char *_hms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + return _data_to_buf(TYPE_HMS, (void *)data, buf, siz, WHERE_FFL_PASS); +} + +// Convert to MM:SS +char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + return _data_to_buf(TYPE_MS, (void *)data, buf, siz, WHERE_FFL_PASS); +} + // For mutiple variable function calls that need the data char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS) {