diff --git a/src/ckdb.c b/src/ckdb.c index 28b3060e..8e783804 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -293,6 +293,21 @@ tv_t last_share_inv; tv_t last_auth; cklock_t last_lock; +// Running stats +// socketer() +tv_t sock_stt; +double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; +uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv; +// breaker() summarised +tv_t break_reload_stt, break_cmd_stt, break_reload_fin; +uint64_t break_reload_processed, break_cmd_processed; +// clistener() +double clis_us; +uint64_t clis_processed; +// blistener() +double blis_us; +uint64_t blis_processed; + static cklock_t fpm_lock; static char *first_pool_message; static sem_t socketer_sem; @@ -366,9 +381,9 @@ K_STORE *pool_workqueue_store; K_STORE *cmd_workqueue_store; K_STORE *btc_workqueue_store; // this counter ensures we don't switch early from pool0 to pool -int earlysock_left; -int pool0_tot; -int pool0_discarded; +int64_t earlysock_left; +int64_t pool0_tot; +int64_t pool0_discarded; // HEARTBEATQUEUE K_LIST *heartbeatqueue_free; @@ -3356,6 +3371,16 @@ static void *breaker(void *arg) } + // The first one to start + K_WLOCK(breakqueue_free); + if (reload) { + if (!break_reload_stt.tv_sec) + setnow(&break_reload_stt); + } else { + if (!break_cmd_stt.tv_sec) + setnow(&break_cmd_stt); + } + K_WUNLOCK(breakqueue_free); while (!everyone_die) { K_WLOCK(breakqueue_free); bq_item = NULL; @@ -3374,6 +3399,12 @@ static void *breaker(void *arg) if (!bq_item) was_null = true; } + if (bq_item) { + if (reload) + break_reload_processed++; + else + break_cmd_processed++; + } K_WUNLOCK(breakqueue_free); if (!bq_item) { @@ -3443,9 +3474,11 @@ static void *breaker(void *arg) K_RUNLOCK(breakqueue_free); ck_wlock(&breakdown_lock); - if (reload) + if (reload) { reload_breakdown_count--; - else + // The last one to finish - updated each exit + setnow(&break_reload_fin); + } else cmd_breakdown_count--; if ((reload_breakdown_count + cmd_breakdown_count) < 1) { @@ -4493,9 +4526,9 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) static void *clistener(__maybe_unused void *arg) { - uint64_t processed = 0; PGconn *conn = NULL; K_ITEM *wq_item; + tv_t now1, now2; time_t now; pthread_detach(pthread_self()); @@ -4523,13 +4556,16 @@ static void *clistener(__maybe_unused void *arg) } if (wq_item) { - processed++; + setnow(&now1); process_sockd(conn, wq_item); + setnow(&now2); + clis_us += us_tvdiff(&now2, &now1); + clis_processed++; } else - cksleep_ms(42); + cksleep_ms(4); } - LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed); clistener_using_data = false; @@ -4541,9 +4577,9 @@ static void *clistener(__maybe_unused void *arg) static void *blistener(__maybe_unused void *arg) { - uint64_t processed = 0; PGconn *conn = NULL; K_ITEM *wq_item; + tv_t now1, now2; time_t now; pthread_detach(pthread_self()); @@ -4570,13 +4606,16 @@ static void *blistener(__maybe_unused void *arg) } if (wq_item) { - processed++; + setnow(&now1); process_sockd(conn, wq_item); + setnow(&now2); + blis_us += us_tvdiff(&now2, &now1); + blis_processed++; } else - cksleep_ms(142); + cksleep_ms(42); } - LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed); blistener_using_data = false; @@ -4965,9 +5004,8 @@ static void *socketer(void *arg) char *end, *buf = NULL; K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; - uint64_t proc_early = 0, processed = 0; int sockd; - tv_t now; + tv_t now, now1, now2; pthread_detach(pthread_self()); @@ -4988,16 +5026,26 @@ static void *socketer(void *arg) create_pthread(&proc_pt, process_socket, arg); } + setnow(&sock_stt); while (!everyone_die) { + setnow(&now1); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { - LOGERR("%s() Failed to accept on socket", __func__); + int e = errno; + LOGERR("%s() Failed to accept on socket (%d:%s)", + __func__, e, strerror(e)); break; } + setnow(&now2); + sock_us += us_tvdiff(&now2, &now1); + sock_acc++; + setnow(&now1); buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2); // Once we've read the message setnow(&now); + sock_recv_us += us_tvdiff(&now, &now1); + sock_recv++; if (buf) { end = buf + strlen(buf) - 1; // strip trailing \n and \r @@ -5017,10 +5065,13 @@ static void *socketer(void *arg) // Flag all work for pool0 until the reload completes if (prereload || reloading) { seqentryflags = SE_EARLYSOCK; + setnow(&now1); K_WLOCK(workqueue_free); earlysock_left++; K_WUNLOCK(workqueue_free); - proc_early++; + sock_proc_early++; + setnow(&now2); + sock_lock_wq_us += us_tvdiff(&now2, &now1); } if (SEQALL_LOG) { @@ -5044,11 +5095,12 @@ static void *socketer(void *arg) } } - processed++; + sock_processed++; // Don't limit the speed filling up cmd_breakqueue_store + setnow(&now1); K_WLOCK(breakqueue_free); bq_item = k_unlink_head(breakqueue_free); - // keep the lock since none of these should be slow + K_WUNLOCK(breakqueue_free); DATA_BREAKQUEUE(bq, bq_item); bq->buf = buf; copy_tv(&(bq->now), &now); @@ -5056,13 +5108,16 @@ static void *socketer(void *arg) bq->sockd = sockd; if (max_sockd_count < ++sockd_count) max_sockd_count = sockd_count; + K_WLOCK(breakqueue_free); k_add_tail(cmd_breakqueue_store, bq_item); K_WUNLOCK(breakqueue_free); + setnow(&now2); + sock_lock_br_us += us_tvdiff(&now2, &now1); } } LOGNOTICE("%s() exiting, early %"PRIu64" after %"PRIu64" (%"PRIu64")", - __func__, proc_early, processed - proc_early, processed); + __func__, sock_proc_early, sock_processed - sock_proc_early, sock_processed); socketer_using_data = false; @@ -5759,12 +5814,12 @@ static void *listener(void *arg) K_RLOCK(workqueue_free); wq0count = pool0_workqueue_store->count; wqcount = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); K_RLOCK(breakqueue_free); bq = cmd_breakqueue_store->count; bqp = cmd_processing; bqd = cmd_done_breakqueue_store->count; K_RUNLOCK(breakqueue_free); - K_RUNLOCK(workqueue_free); LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); diff --git a/src/ckdb.h b/src/ckdb.h index caf6a07b..f7385c79 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.014" +#define CKDB_VERSION DB_VERSION"-2.015" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -343,6 +343,21 @@ extern tv_t last_share_inv; extern tv_t last_auth; extern cklock_t last_lock; +// Running stats +// socketer() +extern tv_t sock_stt; +extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; +extern uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv; +// breaker() summarised +extern tv_t break_reload_stt, break_cmd_stt, break_reload_fin; +extern uint64_t break_reload_processed, break_cmd_processed; +// clistener() +extern double clis_us; +extern uint64_t clis_processed; +// blistener() +extern double blis_us; +extern uint64_t blis_processed; + #define JSON_TRANSFER "json=" #define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1) #define JSON_BEGIN '{' @@ -1124,9 +1139,9 @@ extern K_STORE *pool_workqueue_store; extern K_STORE *cmd_workqueue_store; extern K_STORE *btc_workqueue_store; // this counter ensures we don't switch early from pool0 to pool -extern int earlysock_left; -extern int pool0_tot; -extern int pool0_discarded; +extern int64_t earlysock_left; +extern int64_t pool0_tot; +extern int64_t pool0_discarded; // HEARTBEATQUEUE typedef struct heartbeatqueue { diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 2aee0972..ec840ebf 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -6778,7 +6778,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, /* Show a share status report on the console * Currently: sequence status, OoO info and max_sockd_count */ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, - __maybe_unused tv_t *now, __maybe_unused char *by, + tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root, @@ -6788,8 +6788,10 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, char buf[256]; int relq_count, _reload_processing, relqd_count; int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; - int _earlysock_left, pool0_count, _pool0_discarded, _pool0_tot; - int poolq_count; + int pool0_count, poolq_count; + int64_t _earlysock_left, _pool0_discarded, _pool0_tot; + uint64_t count1, count2, count3, count4; + double tot1, tot2; LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); @@ -6812,14 +6814,53 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, poolq_count = pool_workqueue_store->count; K_RUNLOCK(workqueue_free); - LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%d pool0=%d/%d/%d " - "poolq=%d max_sockd=%d", + LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%"PRId64 + " pool0=%d/%"PRId64"/%"PRId64" poolq=%d max_sockd=%d", relq_count, _reload_processing, relqd_count, cmdq_count, _cmd_processing, cmdqd_count, _earlysock_left, pool0_count, _pool0_discarded, _pool0_tot, poolq_count, _max_sockd_count); + count1 = sock_acc ? : 1; + count2 = sock_recv ? : 1; + count3 = sock_proc_early ? : 1; + count4 = sock_processed ? : 1; + LOGWARNING(" sock: tot %f sock %f/%"PRIu64"/%f recv %f/%"PRIu64"/%f " + "lckw %f/%"PRIu64"/%f lckb %f/%"PRIu64"/%f", + us_tvdiff(now, &sock_stt)/1000000, + sock_us/1000000, sock_acc, (sock_us/count1)/1000000, + sock_recv_us/1000000, sock_recv, + (sock_recv_us/count2)/1000000, + sock_lock_wq_us/1000000, sock_proc_early, + (sock_lock_wq_us/count3)/1000000, + sock_lock_br_us/1000000, sock_processed, + (sock_lock_br_us/count4)/1000000); + + if (!break_reload_stt.tv_sec) + tot1 = 0; + else { + if (!break_reload_fin.tv_sec) + tot1 = us_tvdiff(now, &break_reload_stt); + else + tot1 = us_tvdiff(&break_reload_fin, &break_reload_stt); + } + if (!break_cmd_stt.tv_sec) + tot2 = 0; + else + tot2 = us_tvdiff(now, &break_cmd_stt); + count1 = break_reload_processed ? : 1; + count2 = break_cmd_processed ? : 1; + LOGWARNING(" break reload: %f/%"PRIu64"/%f cmd: %f/%"PRIu64"/%f", + tot1/1000000, break_reload_processed, (tot1/count1)/1000000, + tot2/1000000, break_cmd_processed, (tot2/count2)/1000000); + + count1 = clis_processed ? : 1; + count2 = blis_processed ? : 1; + LOGWARNING(" clistener %f/%"PRIu64"/%f blistener: %f/%"PRIu64"/%f", + clis_us/1000000, clis_processed, (clis_us/count1)/1000000, + blis_us/1000000, blis_processed, (blis_us/count2)/1000000); + snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf); return strdup(buf);