diff --git a/src/ckdb.c b/src/ckdb.c index 999a0f6f..2b53baae 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -693,6 +693,18 @@ enum cmd_values { CMD_END }; +// LOGQUEUE +typedef struct logqueue { + char *msg; +} LOGQUEUE; + +#define ALLOC_LOGQUEUE 1024 +#define LIMIT_LOGQUEUE 0 +#define DATA_LOGQUEUE(_item) ((LOGQUEUE *)(_item->data)) + +static K_LIST *logqueue_free; +static K_STORE *logqueue_store; + // WORKQUEUE typedef struct workqueue { char *buf; @@ -1291,9 +1303,48 @@ static K_LIST *workerstatus_free; static K_STORE *workerstatus_store; static char logname[512]; -#define LOGFILE(_msg) rotating_log(logname, _msg) +#define LOGQUE(_msg) log_queue_message(_msg) +#define LOGFILE(_msg) rotating_log_nolock(_msg) #define LOGDUP "dup." +// low spec version of rotating_log() - no locking +static bool rotating_log_nolock(char *msg) +{ + char *filename; + FILE *fp; + bool ok = false; + + filename = rotating_filename(logname, time(NULL)); + fp = fopen(filename, "a+"); + if (unlikely(!fp)) { + LOGERR("Failed to fopen %s in rotating_log!", filename); + goto stageleft; + } + fprintf(fp, "%s\n", msg); + fclose(fp); + ok = true; + +stageleft: + free(filename); + + return ok; +} + +static void log_queue_message(char *msg) +{ + K_ITEM *lq_item; + + K_WLOCK(logqueue_free); + lq_item = k_unlink_head(logqueue_free); + K_WUNLOCK(logqueue_free); + + DATA_LOGQUEUE(lq_item)->msg = strdup(msg); + + K_WLOCK(logqueue_free); + k_add_tail(logqueue_store, lq_item); + K_WUNLOCK(logqueue_free); +} + void logmsg(int loglevel, const char *fmt, ...) { int logfd = 0; @@ -7520,6 +7571,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(workerstatus, 1, 1); USEINFO(workqueue, 1, 0); USEINFO(transfer, 0, 0); + USEINFO(logqueue, 1, 0); snprintf(tmp, sizeof(tmp), "%ctotalram=%"PRIu64, FLDSEP, tot); APPEND_REALLOC(buf, off, len, tmp); @@ -8011,6 +8063,53 @@ static void *summariser(__maybe_unused void *arg) return NULL; } +static void *logger(__maybe_unused void *arg) +{ + K_ITEM *lq_item; + char buf[128]; + tv_t now; + + pthread_detach(pthread_self()); + + setnow(&now); + snprintf(buf, sizeof(buf), "logstart.%ld,%ld", + now.tv_sec, now.tv_usec); + LOGFILE(buf); + + while (!everyone_die) { + K_WLOCK(logqueue_free); + lq_item = k_unlink_head(logqueue_store); + K_WUNLOCK(logqueue_free); + if (lq_item) { + LOGFILE(DATA_LOGQUEUE(lq_item)->msg); + free(DATA_LOGQUEUE(lq_item)->msg); + + K_WLOCK(logqueue_free); + k_add_head(logqueue_free, lq_item); + K_WUNLOCK(logqueue_free); + } else + cksleep_ms(42); + + } + + K_WLOCK(logqueue_free); + setnow(&now); + snprintf(buf, sizeof(buf), "logstopping.%d.%ld,%ld", + logqueue_store->count, + now.tv_sec, now.tv_usec); + LOGFILE(buf); + while((lq_item = k_unlink_head(logqueue_store))) + LOGFILE(DATA_LOGQUEUE(lq_item)->msg); + K_WUNLOCK(logqueue_free); + + setnow(&now); + snprintf(buf, sizeof(buf), "logstop.%ld,%ld", + now.tv_sec, now.tv_usec); + LOGFILE(buf); + + return NULL; +} + static void *socketer(__maybe_unused void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; @@ -8119,10 +8218,10 @@ static void *socketer(__maybe_unused void *arg) *dot = '\0'; snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", LOGDUP, now.tv_sec, now.tv_usec, duptype); - LOGFILE(reply); + LOGQUE(reply); LOGWARNING("Duplicate '%s' message received", duptype); } else { - LOGFILE(buf); + LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); switch (cmdnum) { case CMD_REPLY: @@ -8359,7 +8458,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) return true; } - LOGFILE(buf); + LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); switch (cmdnum) { // Ignore @@ -8463,7 +8562,7 @@ static bool reload_from(tv_t *start) setnow(&now); tvs_to_buf(&now, run, sizeof(run)); snprintf(reload_buf, MAX_READ, "reload.%s.s0", run); - LOGFILE(reload_buf); + LOGQUE(reload_buf); conn = dbconnect(); @@ -8541,7 +8640,7 @@ static bool reload_from(tv_t *start) PQfinish(conn); snprintf(reload_buf, MAX_READ, "reload.%s.%"PRIu64, run, total); - LOGFILE(reload_buf); + LOGQUE(reload_buf); LOGWARNING("%s(): read %d file%s, total %"PRIu64" line%s", __func__, processing, processing == 1 ? "" : "s", @@ -8612,11 +8711,18 @@ static void process_queued(K_ITEM *wq_item) static void *listener(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; + pthread_t log_pt; pthread_t sock_pt; pthread_t summ_pt; K_ITEM *wq_item; int qc; + logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), + ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); + logqueue_store = k_new_store(logqueue_free); + + create_pthread(&log_pt, logger, NULL); + create_pthread(&sock_pt, socketer, arg); create_pthread(&summ_pt, summariser, NULL);