diff --git a/src/ckdb.c b/src/ckdb.c index 0a034cab..73a40ff6 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -268,13 +268,15 @@ bool sharesummary_marks_limit = false; bool db_users_complete = false; // DB load is complete bool db_load_complete = false; +// Before the reload starts (and during the reload) +bool prereload = true; // Different input data handling bool reloading = false; // Start marks processing during a larger reload static bool reloaded_N_files = false; // Data load is complete bool startup_complete = false; -// Set to true the first time workqueue reaches 0 after startup +// Set to true when pool0 completes, pool0 = socket data during reload static bool reload_queue_complete = false; // Tell everyone to die bool everyone_die = false; @@ -364,7 +366,7 @@ K_STORE *pool_workqueue_store; K_STORE *cmd_workqueue_store; K_STORE *btc_workqueue_store; // this counter ensures we don't switch early from pool0 to pool -int pool0_left; +int earlysock_left; int pool0_tot; int pool0_discarded; @@ -4498,6 +4500,25 @@ static void *process_socket(void *arg) DATA_BREAKQUEUE(bq, bq_item); DATA_MSGLINE(msgline, bq->ml_item); replied = btc = false; + switch (bq->cmdnum) { + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + break; + default: + // Non-pool commands can't affect pool0 + if (bq->seqentryflags == SE_EARLYSOCK) { + K_WLOCK(workqueue_free); + earlysock_left--; + K_WUNLOCK(workqueue_free); + } + break; + } switch (bq->cmdnum) { case CMD_REPLY: snprintf(reply, sizeof(reply), @@ -4745,12 +4766,13 @@ static void *process_socket(void *arg) k_add_tail(pool_workqueue_store, wq_item); else { k_add_tail(pool0_workqueue_store, wq_item); + pool0_tot++; /* Stop the reload queue from growing too big * Use a size that 'should be big enough' */ if (reloading && pool0_workqueue_store->count > 250000) { K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store); + earlysock_left--; pool0_discarded++; - pool0_left--; K_WUNLOCK(workqueue_free); WORKQUEUE *wq; DATA_WORKQUEUE(wq, wq2_item); @@ -4860,11 +4882,11 @@ static void *socketer(void *arg) } } else { int seqentryflags = SE_SOCKET; - if (!reload_queue_complete) { + // Flag all work for pool0 until the reload completes + if (prereload || reloading) { seqentryflags = SE_EARLYSOCK; K_WLOCK(workqueue_free); - pool0_tot++; - pool0_left++; + earlysock_left++; K_WUNLOCK(workqueue_free); } @@ -5229,6 +5251,7 @@ static bool reload_from(tv_t *start) LOGQUE(reload_buf, true); LOGQUE(reload_buf, false); + // Start after reloading = true create_pthread(&proc_pt, process_reload, NULL); total = 0; @@ -5251,7 +5274,7 @@ static bool reload_from(tv_t *start) tmp_time = time(NULL); // Report stats every 15s if ((tmp_time - tick_time) > 14) { - int relq, relqd, cmdq, cmdqd, mx, pool0q; + int relq, relqd, cmdq, cmdqd, mx, pool0q, poolq; K_RLOCK(breakqueue_free); relq = reload_breakqueue_store->count + reload_processing; @@ -5263,12 +5286,13 @@ static bool reload_from(tv_t *start) K_RUNLOCK(breakqueue_free); K_RLOCK(workqueue_free); pool0q = pool0_workqueue_store->count; + poolq = pool_workqueue_store->count; // pool_workqueue_store should be zero K_RUNLOCK(workqueue_free); printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" - " ckp %d/%d/%d (%d) \r", + " ckp %d/%d/%d/%d (%d) \r", total+count, relq, relqd, - cmdq, cmdqd, pool0q, mx); + cmdq, cmdqd, pool0q, poolq, mx); fflush(stdout); tick_time = tmp_time; } @@ -5290,7 +5314,7 @@ static bool reload_from(tv_t *start) } else fclose(fp); /* Don't free the old filename since - * process_reload() could access use it */ + * process_reload() could access it */ if (everyone_die) break; reload_timestamp.tv_sec += ROLL_S; @@ -5384,6 +5408,7 @@ static bool reload_from(tv_t *start) seq_reloadmax(); + prereload = false; reloading = false; FREENULL(reload_buf); return ret; @@ -5569,12 +5594,12 @@ static void *listener(void *arg) wq_item = NULL; K_WLOCK(workqueue_free); if (pool0) { - if (pool0_left == 0) + if (earlysock_left == 0) pool0 = false; else { wq_item = k_unlink_head(pool0_workqueue_store); if (wq_item) - pool0_left--; + earlysock_left--; } } if (!pool0) diff --git a/src/ckdb.h b/src/ckdb.h index 3d9cd2ca..9d1cc1b2 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.006" +#define CKDB_VERSION DB_VERSION"-2.007" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -324,6 +324,8 @@ extern bool sharesummary_marks_limit; extern bool db_users_complete; // DB load is complete extern bool db_load_complete; +// Before the reload starts (and during the reload) +extern bool prereload; // Different input data handling extern bool reloading; // Data load is complete @@ -1122,7 +1124,7 @@ extern K_STORE *pool_workqueue_store; extern K_STORE *cmd_workqueue_store; extern K_STORE *btc_workqueue_store; // this counter ensures we don't switch early from pool0 to pool -extern int pool0_left; +extern int earlysock_left; extern int pool0_tot; extern int pool0_discarded; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 1553bc8d..2aee0972 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -6788,7 +6788,8 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, char buf[256]; int relq_count, _reload_processing, relqd_count; int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; - int _pool0_left, _pool0_discarded, _pool0_tot, poolq_count; + int _earlysock_left, pool0_count, _pool0_discarded, _pool0_tot; + int poolq_count; LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); @@ -6804,16 +6805,19 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, K_RUNLOCK(breakqueue_free); K_RLOCK(workqueue_free); - _pool0_left = pool0_left; + _earlysock_left = earlysock_left; + pool0_count = pool0_workqueue_store->count; _pool0_discarded = pool0_discarded; _pool0_tot = pool0_tot; poolq_count = pool_workqueue_store->count; K_RUNLOCK(workqueue_free); - LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d pool0=%d/%d/%d poolq=%d max_sockd=%d", + LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%d pool0=%d/%d/%d " + "poolq=%d max_sockd=%d", relq_count, _reload_processing, relqd_count, cmdq_count, _cmd_processing, cmdqd_count, - _pool0_left, _pool0_discarded, _pool0_tot, + _earlysock_left, + pool0_count, _pool0_discarded, _pool0_tot, poolq_count, _max_sockd_count); snprintf(buf, sizeof(buf), "ok.%s", cmd);