From cef058b4f49b16920366107d1934927a13177063 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 21 Apr 2016 14:17:07 +1000 Subject: [PATCH] ckdb - more seqall logging and lognotice processing counts --- src/ckdb.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++------- src/ckdb.h | 2 +- 2 files changed, 72 insertions(+), 11 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 33558c61..28b3060e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2818,13 +2818,22 @@ static enum cmd_values process_seq(MSGLINE *msgline) /* If non-seqall data was in a CCL reload file, * it can't be processed by update_seq(), so don't */ if (msgline->n_seqall == 0 && msgline->n_seqstt == 0 && - msgline->n_seqpid == 0) + msgline->n_seqpid == 0) { + if (SEQALL_LOG) { + LOGNOTICE("%s() SEQALL 0 skipping %.42s...", + __func__, + st = safe_text(msgline->msg)); + FREENULL(st); + } return ckdb_cmds[msgline->which_cmds].cmd_val; + } if (SEQALL_LOG) { - LOGNOTICE("%s() SEQALL %"PRIu64, - __func__, msgline->n_seqall); + LOGNOTICE("%s() SEQALL %"PRIu64" %"PRIu64" %"PRIu64, + __func__, msgline->n_seqall, msgline->n_seqstt, + msgline->n_seqpid); } + dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, msgline->n_seqpid, SEQALL, &(msgline->now), &(msgline->cd), msgline->code, @@ -3302,6 +3311,7 @@ static void *breaker(void *arg) int thr, zeros; bool reload, was_null, msg = false; int queue_sleep, queue_limit, count; + uint64_t processed = 0; pthread_detach(pthread_self()); @@ -3333,11 +3343,17 @@ static void *breaker(void *arg) LOCK_INIT(buf); rename_proc(buf); + LOGNOTICE("%s() %s %s starting", + __func__, buf, reload ? "reload" : "cmd"); + if (reload) { /* reload has to wait for the reload to start, however, also * check for startup_complete in case we miss the reload */ while (!everyone_die && !reloading && !startup_complete) cksleep_ms(queue_sleep); + + LOGNOTICE("%s() %s reload processing", __func__, buf); + } while (!everyone_die) { @@ -3369,6 +3385,8 @@ static void *breaker(void *arg) continue; } + processed++; + DATA_BREAKQUEUE(bq, bq_item); if (reload) { @@ -3415,6 +3433,9 @@ static void *breaker(void *arg) K_WUNLOCK(breakqueue_free); } + LOGNOTICE("%s() %s %s exiting, processed %"PRIu64, + __func__, buf, reload ? "reload" : "cmd", processed); + // Get it now while the lock still exists, in case we need it K_RLOCK(breakqueue_free); // Not 100% exact since it could still increase, but close enough @@ -4472,6 +4493,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) static void *clistener(__maybe_unused void *arg) { + uint64_t processed = 0; PGconn *conn = NULL; K_ITEM *wq_item; time_t now; @@ -4481,6 +4503,8 @@ static void *clistener(__maybe_unused void *arg) LOCK_INIT("db_clistener"); rename_proc("db_clistener"); + LOGNOTICE("%s() processing", __func__); + clistener_using_data = true; conn = dbconnect(); @@ -4498,12 +4522,15 @@ static void *clistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) + if (wq_item) { + processed++; process_sockd(conn, wq_item); - else + } else cksleep_ms(42); } + LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + clistener_using_data = false; if (conn) @@ -4514,6 +4541,7 @@ static void *clistener(__maybe_unused void *arg) static void *blistener(__maybe_unused void *arg) { + uint64_t processed = 0; PGconn *conn = NULL; K_ITEM *wq_item; time_t now; @@ -4523,6 +4551,8 @@ static void *blistener(__maybe_unused void *arg) LOCK_INIT("db_blistener"); rename_proc("db_blistener"); + LOGNOTICE("%s() processing", __func__); + blistener_using_data = true; now = time(NULL); @@ -4539,12 +4569,15 @@ static void *blistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) + if (wq_item) { + processed++; process_sockd(conn, wq_item); - else + } else cksleep_ms(142); } + LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + blistener_using_data = false; if (conn) @@ -4932,6 +4965,7 @@ static void *socketer(void *arg) char *end, *buf = NULL; K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; + uint64_t proc_early = 0, processed = 0; int sockd; tv_t now; @@ -4986,6 +5020,7 @@ static void *socketer(void *arg) K_WLOCK(workqueue_free); earlysock_left++; K_WUNLOCK(workqueue_free); + proc_early++; } if (SEQALL_LOG) { @@ -5009,6 +5044,7 @@ static void *socketer(void *arg) } } + processed++; // Don't limit the speed filling up cmd_breakqueue_store K_WLOCK(breakqueue_free); bq_item = k_unlink_head(breakqueue_free); @@ -5025,6 +5061,9 @@ static void *socketer(void *arg) } } + LOGNOTICE("%s() exiting, early %"PRIu64" after %"PRIu64" (%"PRIu64")", + __func__, proc_early, processed - proc_early, processed); + socketer_using_data = false; close_unix_socket(us->sockd, us->path); @@ -5044,12 +5083,15 @@ static void *process_reload(__maybe_unused void *arg) enum cmd_values cmdnum; char *ans, *st = NULL; time_t now; + uint64_t processed = 0; pthread_detach(pthread_self()); LOCK_INIT("db_procreload"); rename_proc("db_procreload"); + LOGNOTICE("%s() starting", __func__); + conn = dbconnect(); now = time(NULL); @@ -5069,6 +5111,8 @@ static void *process_reload(__maybe_unused void *arg) continue; } + processed++; + // Don't keep a connection for more than ~10s ... of processing if ((time(NULL) - now) > 10) { PQfinish(conn); @@ -5198,6 +5242,8 @@ static void *process_reload(__maybe_unused void *arg) PQfinish(conn); + LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + return NULL; } @@ -5655,7 +5701,8 @@ static void *listener(void *arg) SEQDATA *seqdata; K_ITEM *ss_item; int cpus, i; - bool reloader, cmder, pool0; + bool reloader, cmder, pool0, switch_msg = false; + uint64_t proc0 = 0, proc1 = 0; pthread_detach(pthread_self()); @@ -5747,6 +5794,7 @@ static void *listener(void *arg) wqgot = 0; } + LOGNOTICE("%s() processing pool0", __func__); /* Process queued work - ensure pool0 is emptied first, * even if there is pending pool0 data being processed by breaker() */ pool0 = true; @@ -5756,9 +5804,10 @@ static void *listener(void *arg) wq_item = NULL; K_WLOCK(workqueue_free); if (pool0) { - if (earlysock_left == 0) + if (earlysock_left == 0) { pool0 = false; - else { + switch_msg = true; + } else { wq_item = k_unlink_head(pool0_workqueue_store); if (wq_item) earlysock_left--; @@ -5770,6 +5819,12 @@ static void *listener(void *arg) } K_WUNLOCK(workqueue_free); + if (switch_msg) { + switch_msg = false; + LOGNOTICE("%s() pool0 complete, processed %"PRIu64, + __func__, proc0); + } + if (wqcount == 0 && wq_stt.tv_sec != 0L) setnow(&wq_fin); @@ -5783,6 +5838,10 @@ static void *listener(void *arg) } if (wq_item) { + if (pool0) + proc0++; + else + proc1++; wqgot++; process_queued(conn, wq_item); tick(); @@ -5832,6 +5891,8 @@ static void *listener(void *arg) } sayonara: + LOGNOTICE("%s() exiting, pool0 %"PRIu64" pool %"PRIu64, + __func__, proc0, proc1); plistener_using_data = false; diff --git a/src/ckdb.h b/src/ckdb.h index 0a5d41fd..caf6a07b 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.013" +#define CKDB_VERSION DB_VERSION"-2.014" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__