diff --git a/src/ckdb.c b/src/ckdb.c index 08b3855f..3f7c0f52 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -349,10 +349,11 @@ K_STORE *reload_breakqueue_store; K_STORE *reload_done_breakqueue_store; K_STORE *cmd_breakqueue_store; K_STORE *cmd_done_breakqueue_store; + // Locked access with breakqueue_free -static int reload_processing; -static int cmd_processing; -static int sockd_count; +int reload_processing; +int cmd_processing; +int sockd_count; int max_sockd_count; // WORKQUEUE @@ -362,12 +363,10 @@ K_STORE *pool0_workqueue_store; K_STORE *pool_workqueue_store; K_STORE *cmd_workqueue_store; K_STORE *btc_workqueue_store; -mutex_t wq_waitlock; -pthread_cond_t wq_waitcond; // this counter ensures we don't switch early from pool0 to pool -static int pool0_left; -static int pool0_tot; -static int pool0_discarded; +int pool0_left; +int pool0_tot; +int pool0_discarded; // HEARTBEATQUEUE K_LIST *heartbeatqueue_free; @@ -1784,8 +1783,6 @@ static bool setup_data() cklock_init(&fpm_lock); cksem_init(&socketer_sem); - mutex_init(&wq_waitlock); - cond_init(&wq_waitcond); LOGWARNING("%sSequence processing is %s", ignore_seq ? "ALERT: " : "", @@ -3223,7 +3220,7 @@ static void *breaker(void *arg) BREAKQUEUE *bq = NULL; char buf[128]; int thr, zeros; - bool reload, was_zero, msg = false; + bool reload, was_null, msg = false; int queue_sleep, queue_limit, count; pthread_detach(pthread_self()); @@ -3258,7 +3255,7 @@ static void *breaker(void *arg) if (reload) { /* reload has to wait for the reload to start, however, also - * check for startup_complete in case we missed the reload */ + * check for startup_complete in case we miss the reload */ while (!everyone_die && !reloading && !startup_complete) cksleep_ms(queue_sleep); } @@ -3266,7 +3263,7 @@ static void *breaker(void *arg) while (!everyone_die) { K_WLOCK(breakqueue_free); bq_item = NULL; - was_zero = false; + was_null = false; if (reload) count = reload_done_breakqueue_store->count; else @@ -3279,13 +3276,13 @@ static void *breaker(void *arg) else bq_item = k_unlink_head(cmd_breakqueue_store); if (!bq_item) - was_zero = true; + was_null = true; } K_WUNLOCK(breakqueue_free); if (!bq_item) { // Is the queue empty and the reload completed? - if (was_zero && reload && !reloading) + if (was_null && reload && !reloading) break; cksleep_ms(queue_sleep); @@ -4487,7 +4484,7 @@ static void *process_socket(void *arg) K_WUNLOCK(breakqueue_free); if (!bq_item) { - cksleep_ms(24); + cksleep_ms(CMD_QUEUE_SLEEP); continue; } @@ -4761,9 +4758,6 @@ static void *process_socket(void *arg) } K_WUNLOCK(workqueue_free); wq_item = bq->ml_item = NULL; - mutex_lock(&wq_waitlock); - pthread_cond_signal(&wq_waitcond); - mutex_unlock(&wq_waitlock); break; // Code error default: @@ -5636,19 +5630,8 @@ static void *listener(void *arg) } if (!wq_item) { - const ts_t tsdiff = {0, 420000000}; - tv_t now; - ts_t abs; - POOLINSTANCE_DATA_MSG(); - - tv_time(&now); - tv_to_ts(&abs, &now); - timeraddspec(&abs, &tsdiff); - - mutex_lock(&wq_waitlock); - cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); - mutex_unlock(&wq_waitlock); + cksleep_ms(CMD_QUEUE_SLEEP); } } diff --git a/src/ckdb.h b/src/ckdb.h index f099ed04..47d37b0c 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.003" +#define CKDB_VERSION DB_VERSION"-2.004" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1083,17 +1083,22 @@ typedef struct breakqueue { * The reload also uses this limit when filling the reload break queue * thus limiting the line processing of reload files */ -// 16300,42 equated to single thread limitation of ~388k per second #define RELOAD_QUEUE_LIMIT 16300 #define RELOAD_QUEUE_SLEEP 42 -#define CMD_QUEUE_LIMIT 16300 -#define CMD_QUEUE_SLEEP 42 +// Don't really limit the cmd queue +#define CMD_QUEUE_LIMIT 1048500 +#define CMD_QUEUE_SLEEP 1 extern K_LIST *breakqueue_free; extern K_STORE *reload_breakqueue_store; extern K_STORE *reload_done_breakqueue_store; extern K_STORE *cmd_breakqueue_store; extern K_STORE *cmd_done_breakqueue_store; + +// Locked access with breakqueue_free +extern int reload_processing; +extern int cmd_processing; +extern int sockd_count; extern int max_sockd_count; // WORKQUEUE @@ -1116,8 +1121,10 @@ extern K_STORE *pool0_workqueue_store; extern K_STORE *pool_workqueue_store; extern K_STORE *cmd_workqueue_store; extern K_STORE *btc_workqueue_store; -extern mutex_t wq_waitlock; -extern pthread_cond_t wq_waitcond; +// this counter ensures we don't switch early from pool0 to pool +extern int pool0_left; +extern int pool0_tot; +extern int pool0_discarded; // HEARTBEATQUEUE typedef struct heartbeatqueue { diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 62b7ac0b..1553bc8d 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -6786,15 +6786,35 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, { char ooo_buf[256]; char buf[256]; - int count; + int relq_count, _reload_processing, relqd_count; + int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; + int _pool0_left, _pool0_discarded, _pool0_tot, poolq_count; LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); K_RLOCK(breakqueue_free); - count = max_sockd_count; + relq_count = reload_breakqueue_store->count; + _reload_processing = reload_processing; + relqd_count = reload_done_breakqueue_store->count; + cmdq_count = cmd_breakqueue_store->count; + _cmd_processing = cmd_processing; + cmdqd_count = cmd_done_breakqueue_store->count; + _max_sockd_count = max_sockd_count; K_RUNLOCK(breakqueue_free); - LOGWARNING(" max_sockd_count=%d", count); + + K_RLOCK(workqueue_free); + _pool0_left = pool0_left; + _pool0_discarded = pool0_discarded; + _pool0_tot = pool0_tot; + poolq_count = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); + + LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d pool0=%d/%d/%d poolq=%d max_sockd=%d", + relq_count, _reload_processing, relqd_count, + cmdq_count, _cmd_processing, cmdqd_count, + _pool0_left, _pool0_discarded, _pool0_tot, + poolq_count, _max_sockd_count); snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf);