From 654baf6bf512f475276e9ca9109bca220bc3cef3 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 24 Aug 2016 17:28:11 +1000 Subject: [PATCH] ckdb - use a separate thread for message logging --- src/ckdb.c | 409 +++++++++++++++++++++++++++++++++++++++++------------ src/ckdb.h | 20 ++- 2 files changed, 340 insertions(+), 89 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 8243f507..52994d67 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -327,6 +327,15 @@ static bool reload_queue_complete = false; bool everyone_die = false; // Set to true every time a store is created static bool seqdata_reload_lost = false; +// Tell the ioqueue thread to exit +static bool ioqueue_die = false; +/* Tell the console ioqueue to clear it's list as fast as possible + * In the case where a console log unexpectedly ends up having a very large + * number of messages, since it limits the number of messages per second + * this could take a long time to clear + * To resolve this, you can disconnect from the console if your IO speed to + * the console is slow, then send it all by sending flush.2 */ +static bool ioqueue_flush = false; /* These are included in cmd_homepage * to help identify when ckpool locks up (or dies) */ @@ -402,6 +411,14 @@ int64_t mismatch_all_workmarkers; int64_t mismatch_all_marks; int64_t mismatch_all_total; +// IOQUEUE +static K_LIST *ioqueue_free; +static K_STORE *ioqueue_store; +static K_STORE *console_ioqueue_store; +// Trigger ioqueue_store processing +static mutex_t f_ioqueue_waitlock; +static pthread_cond_t f_ioqueue_waitcond; + // LOGQUEUE K_LIST *logqueue_free; K_STORE *logqueue_store; @@ -823,6 +840,245 @@ static bool no_data_log = false; * 'restorefrom/' */ static char *logpath; +static void ioprocess(IOQUEUE *io) +{ + char stamp[128], tzinfo[16], tzch; + long minoff, hroff; + struct tm tm; + int ms; + + if (io->when.tv_sec == 0) + stamp[0] = '\0'; + else { + ms = (int)(io->when.tv_usec / 1000); + localtime_r(&(io->when.tv_sec), &tm); + minoff = tm.tm_gmtoff / 60; + if (minoff < 0) { + tzch = '-'; + minoff *= -1; + } else + tzch = '+'; + hroff = minoff / 60; + if (minoff % 60) { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld:%02ld", + tzch, hroff, minoff % 60); + } else { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld", + tzch, hroff); + } + snprintf(stamp, sizeof(stamp), + "[%d-%02d-%02d %02d:%02d:%02d.%03d%s] ", + tm.tm_year + 1900, + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec, ms, + tzinfo); + } + + if (io->logfd && global_ckp) { + int logfd = global_ckp->logfd; + if (logfd) { + FILE *LOGFP = global_ckp->logfp; + + flock(logfd, LOCK_EX); + if (io->errn) { + fprintf(LOGFP, "%s%s with errno %d: %s\n", + stamp, io->msg, + io->errn, strerror(io->errn)); + } else + fprintf(LOGFP, "%s%s\n", stamp, io->msg); + flock(logfd, LOCK_UN); + } + } + + if (io->logout) { + if (io->errn) { + fprintf(stdout, "%s%s with errno %d: %s%s", + stamp, io->msg, + io->errn, strerror(io->errn), + io->eol ? "\n" : "\r"); + } else { + fprintf(stdout, "%s%s%s", + stamp, io->msg, + io->eol ? "\n" : "\r"); + } + if (io->flush) + fflush(stdout); + } + + if (io->logerr) { + if (io->errn) { + fprintf(stderr, "%s%s with errno %d: %s%s", + stamp, io->msg, + io->errn, strerror(io->errn), + io->eol ? "\n" : "\r"); + } else { + fprintf(stderr, "%s%s%s", + stamp, io->msg, + io->eol ? "\n" : "\r"); + } + if (io->flush) + fflush(stderr); + } +} + +static void *iomsgs(void *consol) +{ + ts_t when, when_add; + K_ITEM *io_item; + IOQUEUE *io; + char buf[64]; + + snprintf(buf, sizeof(buf), + "db%s_%s%s", + dbcode, consol ? "c" : "f", __func__); + LOCK_INIT(buf); + rename_proc(buf); + + when_add.tv_sec = 0; + when_add.tv_nsec = 100000000; // 100ms + + while (0x80085) { + // WARNING taking locks can produce messages ... + K_WLOCK(ioqueue_free); + if (consol) + io_item = k_unlink_head(console_ioqueue_store); + else + io_item = k_unlink_head(ioqueue_store); + K_WUNLOCK(ioqueue_free); + + if (io_item) { + DATA_IOQUEUE(io, io_item); + ioprocess(io); + free(io->msg); + // WARNING taking locks can produce messages ... + K_WLOCK(ioqueue_free); + k_add_head(ioqueue_free, io_item); + K_WUNLOCK(ioqueue_free); + } else { + if (ioqueue_die) + break; + if (consol) { + // The queue is clear + ioqueue_flush = false; + } + } + + if (consol) { + if (!ioqueue_flush) { + // max 50 per second + cksleep_ms(20); + } + } else { + if (!io_item) { + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&f_ioqueue_waitlock); + cond_timedwait(&f_ioqueue_waitcond, + &f_ioqueue_waitlock, + &when); + mutex_unlock(&f_ioqueue_waitlock); + } + } + } + + return NULL; +} + +#define io_msg(stamp, msg, errn, logfd, logerr) \ + _io_msg(stamp, msg, false, errn, logfd, false, logerr, true, true, \ + WHERE_FFL_HERE) +#define cr_msg(stamp, msg) \ + _io_msg(stamp, msg, true, 0, false, true, false, false, true, WHERE_FFL_HERE) +#define lf_msg(stamp, msg) \ + _io_msg(stamp, msg, true, 0, false, true, false, true, true, WHERE_FFL_HERE) +#define err_msg(stamp, msg, errn) \ + _io_msg(stamp, msg, true, errn, false, false, true, true, true, WHERE_FFL_HERE) + +static void _io_msg(bool stamp, char *msg, bool alloc, int errn, bool logfd, + bool logout, bool logerr, bool eol, bool flush, + WHERE_FFL_ARGS) +{ + K_ITEM *fio_item = NULL, *cio_item = NULL; + bool msgused = false; + IOQUEUE *io; + tv_t now; + + if (!logfd && !logout && !logerr) { + quitfrom(1, file, func, line, + "%s() called without output", __func__); + } + + if (stamp) + setnow(&now); + else + now.tv_sec = 0; + + // WARNING taking locks can produce messages ... + K_WLOCK(ioqueue_free); + if (logfd) + fio_item = k_unlink_head(ioqueue_free); + if (logout || logerr) + cio_item = k_unlink_head(ioqueue_free); + K_WUNLOCK(ioqueue_free); + + if (logfd) { + DATA_IOQUEUE(io, fio_item); + if (!alloc) { + io->msg = msg; + msgused = true; + } else { + io->msg = strdup(msg); + if (!(io->msg)) + quithere(1, "strdup (%d) OOM", (int)strlen(msg)); + } + copy_tv(&(io->when), &now); + io->errn = errn; + io->logfd = logfd; + io->logout = false; + io->logerr = false; + io->eol = eol; + io->flush = flush; + } + + if (logout || logerr) { + DATA_IOQUEUE(io, cio_item); + if (!alloc && !msgused) + io->msg = msg; + else { + io->msg = strdup(msg); + if (!(io->msg)) + quithere(1, "strdup (%d) OOM", (int)strlen(msg)); + } + copy_tv(&(io->when), &now); + io->errn = errn; + io->logfd = false; + io->logout = logout; + io->logerr = logerr; + io->eol = eol; + io->flush = flush; + } + + // WARNING taking locks can produce messages ... + K_WLOCK(ioqueue_free); + if (fio_item) + k_add_tail(ioqueue_store, fio_item); + if (cio_item) + k_add_tail(console_ioqueue_store, cio_item); + K_WUNLOCK(ioqueue_free); + + if (fio_item) { + mutex_lock(&f_ioqueue_waitlock); + pthread_cond_signal(&f_ioqueue_waitcond); + mutex_unlock(&f_ioqueue_waitlock); + } +} + static void replace_ymd(char *srch, char *match, int val) { char buf[32], *ptr, *found; @@ -914,85 +1170,30 @@ static void log_queue_message(char *msg, bool db) void logmsg(int loglevel, const char *fmt, ...) { - int logfd = 0; char *buf = NULL; - struct tm tm; - tv_t now_tv; - int ms; va_list ap; - char stamp[128]; - char *extra = EMPTY; - char tzinfo[16]; - char tzch; - long minoff, hroff; + int errn; - if (loglevel > global_ckp->loglevel) - return; - - tv_time(&now_tv); - ms = (int)(now_tv.tv_usec / 1000); - localtime_r(&(now_tv.tv_sec), &tm); - minoff = tm.tm_gmtoff / 60; - if (minoff < 0) { - tzch = '-'; - minoff *= -1; - } else - tzch = '+'; - hroff = minoff / 60; - if (minoff % 60) { - snprintf(tzinfo, sizeof(tzinfo), - "%c%02ld:%02ld", - tzch, hroff, minoff % 60); - } else { - snprintf(tzinfo, sizeof(tzinfo), - "%c%02ld", - tzch, hroff); - } - snprintf(stamp, sizeof(stamp), - "[%d-%02d-%02d %02d:%02d:%02d.%03d%s]", - tm.tm_year + 1900, - tm.tm_mon + 1, - tm.tm_mday, - tm.tm_hour, - tm.tm_min, - tm.tm_sec, ms, - tzinfo); + errn = errno; + errno = 0; if (!fmt) { - fprintf(stderr, "%s %s() called without fmt\n", stamp, __func__); + err_msg(true, "logmsg() called without fmt", errn); return; } - if (!global_ckp) - extra = " !!NULL global_ckp!!"; - else - logfd = global_ckp->logfd; + if (loglevel > global_ckp->loglevel) + return; va_start(ap, fmt); VASPRINTF(&buf, fmt, ap); va_end(ap); - if (logfd) { - FILE *LOGFP = global_ckp->logfp; - - flock(logfd, LOCK_EX); - fprintf(LOGFP, "%s %s", stamp, buf); - if (loglevel <= LOG_ERR && errno != 0) - fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno)); - errno = 0; - fprintf(LOGFP, "\n"); - flock(logfd, LOCK_UN); - } - if (loglevel <= LOG_WARNING) { - if (loglevel <= LOG_ERR && errno != 0) { - fprintf(stderr, "%s %s with errno %d: %s%s\n", - stamp, buf, errno, strerror(errno), extra); - errno = 0; - } else - fprintf(stderr, "%s %s%s\n", stamp, buf, extra); - fflush(stderr); - } - free(buf); + // iomsgs() will free buf + if (loglevel <= LOG_ERR) + io_msg(true, buf, errn, true, loglevel <= LOG_WARNING); + else + io_msg(true, buf, 0, true, loglevel <= LOG_WARNING); } void setnowts(ts_t *now) @@ -1230,15 +1431,14 @@ static time_t last_tick; void tick() { time_t now; - char ch; + char ch[2]; now = time(NULL); if (now != last_tick) { last_tick = now; - ch = status_chars[ticks++ & 0x3]; - putchar(ch); - putchar('\r'); - fflush(stdout); + ch[0] = status_chars[ticks++ & 0x3]; + ch[1] = '\0'; + cr_msg(false, ch); } } @@ -3907,6 +4107,11 @@ static void *breaker(void *arg) else typ = ISCMD; + snprintf(buf, sizeof(buf), "db%s_%c%02d%s", + dbcode, reload ? 'r' : 'c', mythread, __func__); + LOCK_INIT(buf); + rename_proc(buf); + if (mythread == 0) { pthread_detach(pthread_self()); @@ -3941,11 +4146,6 @@ static void *breaker(void *arg) when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; } - snprintf(buf, sizeof(buf), "db%s_%c%02d%s", - dbcode, reload ? 'r' : 'c', mythread, __func__); - LOCK_INIT(buf); - rename_proc(buf); - LOGNOTICE("%s() %s starting", __func__, buf); if (reload) { @@ -5856,6 +6056,11 @@ static void *process_socket(void *arg) fflush(stderr); if (global_ckp && global_ckp->logfd) fflush(global_ckp->logfp); + if (*(msgline->id)) { + // If you set the flush id to 2 + if(atoi(msgline->id) == 2) + ioqueue_flush = true; + } setnow(&(msgline->processed)); break; case CMD_USERSET: @@ -6409,7 +6614,14 @@ static void *process_reload(__maybe_unused void *arg) if (arg) mythread = *(int *)(arg); - else { + else + mythread = 0; + + snprintf(buf, sizeof(buf), "db%s_p%02drload", dbcode, mythread); + LOCK_INIT(buf); + rename_proc(buf); + + if (!arg) { pthread_detach(pthread_self()); for (i = 0; i < THREAD_LIMIT; i++) { @@ -6417,7 +6629,6 @@ static void *process_reload(__maybe_unused void *arg) running[i] = false; } - mythread = 0; running[0] = true; // Set to create the rest of the threads @@ -6426,10 +6637,6 @@ static void *process_reload(__maybe_unused void *arg) LOGNOTICE("%s() starting", __func__); } - snprintf(buf, sizeof(buf), "db%s_p%02drload", dbcode, mythread); - LOCK_INIT(buf); - rename_proc(buf); - when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; @@ -6734,7 +6941,7 @@ static bool reload_from(tv_t *start, const tv_t *finish) { // proc_pt could exit after this returns static pthread_t proc_pt; - char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; + char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1], tickmsg[256]; size_t rflen = strlen(restorefrom); char *missingfirst = NULL, *missinglast = NULL, *st = NULL; int missing_count, processing, counter; @@ -6816,11 +7023,12 @@ static bool reload_from(tv_t *start, const tv_t *finish) poolq = pool_workqueue_store->count; // pool_workqueue_store should be zero K_RUNLOCK(workqueue_free); - printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" - " ckp %d/%d/%d/%d (%d) \r", + snprintf(tickmsg, sizeof(tickmsg), + TICK_PREFIX"reload %"PRIu64"/%d/%d" + " ckp %d/%d/%d/%d (%d) ", total+count, relq, relqd, cmdq, cmdqd, pool0q, poolq, mx); - fflush(stdout); + cr_msg(false, tickmsg); tick_time = tmp_time; } } @@ -8450,6 +8658,7 @@ int main(int argc, char **argv) struct sigaction handler; char *btc_user = "user"; char *btc_pass = "p"; + pthread_t f_iomsgs_pt, c_iomsgs_pt; char buf[512]; ckpool_t ckp; int c, ret, i = 0, j; @@ -8850,6 +9059,23 @@ int main(int argc, char **argv) cond_init(&wq_cmd_waitcond); cond_init(&wq_btc_waitcond); + mutex_init(&f_ioqueue_waitlock); + cond_init(&f_ioqueue_waitcond); + + // Initialise IOQUEUE before anything needs it + ioqueue_free = k_new_list("IOQueue", sizeof(IOQUEUE), + ALLOC_IOQUEUE, LIMIT_IOQUEUE, true); + ioqueue_store = k_new_store(ioqueue_free); + console_ioqueue_store = k_new_store(ioqueue_free); + +#if LOCK_CHECK + DLPRIO(ioqueue, PRIO_TERMINAL); +#endif + + create_pthread(&f_iomsgs_pt, iomsgs, NULL); + bool consol = true; + create_pthread(&c_iomsgs_pt, iomsgs, &consol); + // Emulate a list for lock checking process_pplns_free = k_lock_only_list("ProcessPPLNS"); workers_db_free = k_lock_only_list("WorkersDB"); @@ -8923,7 +9149,8 @@ int main(int argc, char **argv) } if (msg) { trigger = curr; - printf("%s %ds due to%s%s%s%s%s%s%s%s%s\n", + snprintf(buf, sizeof(buf), + "%s %ds due to%s%s%s%s%s%s%s%s%s\n", msg, (int)(curr - start), socketer_using_data ? " socketer" : EMPTY, summariser_using_data ? " summariser" : EMPTY, @@ -8934,13 +9161,19 @@ int main(int argc, char **argv) marker_using_data ? " marker" : EMPTY, breakdown_using_data ? " breakdown" : EMPTY, replier_using_data ? " replier" : EMPTY); - fflush(stdout); + lf_msg(true, buf); } sleep(1); } dealloc_storage(); + ioqueue_die = true; + join_pthread(f_iomsgs_pt); + join_pthread(c_iomsgs_pt); + FREE_STORE(console_ioqueue); + FREE_LISTS(ioqueue); + clean_up(&ckp); return 0; diff --git a/src/ckdb.h b/src/ckdb.h index d76dd319..bf623a9c 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.419" +#define CKDB_VERSION DB_VERSION"-2.420" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1115,6 +1115,23 @@ enum cmd_values { (_row)->pointers = (_row)->pointers; \ } while (0) +// IOQUEUE +typedef struct ioqueue { + char *msg; + tv_t when; + int errn; + bool logfd; + bool logout; + bool logerr; + bool eol; + bool flush; +} IOQUEUE; + +#define ALLOC_IOQUEUE 1024 +#define LIMIT_IOQUEUE 0 +#define INIT_IOQUEUE(_item) INIT_GENERIC(_item, ioqueue) +#define DATA_IOQUEUE(_var, _item) DATA_GENERIC(_var, _item, ioqueue, true) + // LOGQUEUE typedef struct logqueue { char *msg; @@ -2945,6 +2962,7 @@ enum reply_type { }; extern void logmsg(int loglevel, const char *fmt, ...); +extern void setnowts(ts_t *now); extern void setnow(tv_t *now); extern void tick(); extern PGconn *dbconnect();