diff --git a/src/ckdb.c b/src/ckdb.c index 169b2a38..08b3855f 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -351,16 +351,23 @@ K_STORE *cmd_breakqueue_store; K_STORE *cmd_done_breakqueue_store; // Locked access with breakqueue_free static int reload_processing; +static int cmd_processing; static int sockd_count; int max_sockd_count; // WORKQUEUE K_LIST *workqueue_free; +// pool0 is all pool data received during the reload, i.e. while reload_queue_complete is false +K_STORE *pool0_workqueue_store; K_STORE *pool_workqueue_store; K_STORE *cmd_workqueue_store; K_STORE *btc_workqueue_store; mutex_t wq_waitlock; pthread_cond_t wq_waitcond; +// pool0_left ensures we don't switch early from pool0 to pool; pool0_tot and pool0_discarded are running totals +static int pool0_left; +static int pool0_tot; +static int pool0_discarded; // HEARTBEATQUEUE K_LIST *heartbeatqueue_free; @@ -1166,6 +1173,7 @@ static void alloc_storage() workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); + pool0_workqueue_store = k_new_store(workqueue_free); pool_workqueue_store = k_new_store(workqueue_free); cmd_workqueue_store = k_new_store(workqueue_free); btc_workqueue_store = k_new_store(workqueue_free); @@ -1733,6 +1741,7 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); // TODO: msgline + FREE_STORE(pool0_workqueue); FREE_STORE(pool_workqueue); FREE_STORE(cmd_workqueue); FREE_STORE(btc_workqueue); @@ -4473,6 +4482,8 @@ static void *process_socket(void *arg) while (!everyone_die) { K_WLOCK(breakqueue_free); bq_item = k_unlink_head(cmd_done_breakqueue_store); + if (bq_item) + cmd_processing++; K_WUNLOCK(breakqueue_free); if (!bq_item) { @@ -4726,21 +4737,27 @@ static void *process_socket(void *arg) workqueue->by = by_default; workqueue->code = (char *)__func__; workqueue->inet = inet_default; - k_add_tail(pool_workqueue_store, wq_item); - /* Stop the reload queue from growing too big - * Use a size that 'should be big enough' */ - while (reloading && pool_workqueue_store->count > 250000) { - K_ITEM *wq2_item = 
k_unlink_head(pool_workqueue_store); - K_WUNLOCK(workqueue_free); - WORKQUEUE *wq; - DATA_WORKQUEUE(wq, wq2_item); - K_ITEM *ml_item = wq->msgline_item; - free_msgline_data(ml_item, true, false); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - K_WLOCK(workqueue_free); - k_add_head(workqueue_free, wq2_item); + if (bq->seqentryflags == SE_SOCKET) + k_add_tail(pool_workqueue_store, wq_item); + else { + k_add_tail(pool0_workqueue_store, wq_item); + /* Stop the reload queue from growing too big + * Use a size that 'should be big enough' */ + if (reloading && pool0_workqueue_store->count > 250000) { + K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store); + pool0_discarded++; + pool0_left--; + K_WUNLOCK(workqueue_free); + WORKQUEUE *wq; + DATA_WORKQUEUE(wq, wq2_item); + K_ITEM *ml_item = wq->msgline_item; + free_msgline_data(ml_item, true, false); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq2_item); + } } K_WUNLOCK(workqueue_free); wq_item = bq->ml_item = NULL; @@ -4778,6 +4795,7 @@ static void *process_socket(void *arg) K_WLOCK(breakqueue_free); if (dec_sockd) sockd_count--; + cmd_processing--; k_add_head(breakqueue_free, bq_item); K_WUNLOCK(breakqueue_free); } @@ -4841,8 +4859,13 @@ static void *socketer(void *arg) } } else { int seqentryflags = SE_SOCKET; - if (!reload_queue_complete) + if (!reload_queue_complete) { seqentryflags = SE_EARLYSOCK; + K_WLOCK(workqueue_free); + pool0_tot++; + pool0_left++; + K_WUNLOCK(workqueue_free); + } // Don't limit the speed filling up cmd_breakqueue_store K_WLOCK(breakqueue_free); @@ -5227,22 +5250,24 @@ static bool reload_from(tv_t *start) tmp_time = time(NULL); // Report stats every 15s if ((tmp_time - tick_time) > 14) { - int relq, relqd, cmdq, cmdqd, mx, poolq; + int relq, relqd, cmdq, cmdqd, mx, pool0q; K_RLOCK(breakqueue_free); relq = reload_breakqueue_store->count + 
reload_processing; relqd = reload_done_breakqueue_store->count; - cmdq = cmd_breakqueue_store->count; + cmdq = cmd_breakqueue_store->count + + cmd_processing; cmdqd = cmd_done_breakqueue_store->count; mx = max_sockd_count; K_RUNLOCK(breakqueue_free); K_RLOCK(workqueue_free); - poolq = pool_workqueue_store->count; + pool0q = pool0_workqueue_store->count; + // pool_workqueue_store should be empty during the reload so only pool0 is reported K_RUNLOCK(workqueue_free); printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" " ckp %d/%d/%d (%d) \r", total+count, relq, relqd, - cmdq, cmdqd, poolq, mx); + cmdq, cmdqd, pool0q, mx); fflush(stdout); tick_time = tmp_time; } @@ -5436,16 +5461,15 @@ static void *listener(void *arg) pthread_t break_pt; K_ITEM *wq_item; time_t now; - int wqcount, wqgot; + int bq, bqp, bqd, wq0count, wqcount, wqgot; char ooo_buf[256]; tv_t wq_stt, wq_fin; double min, sec; - int left; SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; int cpus, i; - bool reloader, cmder; + bool reloader, cmder, pool0; pthread_detach(pthread_self()); @@ -5500,13 +5524,22 @@ static void *listener(void *arg) if (!everyone_die) { K_RLOCK(workqueue_free); + wq0count = pool0_workqueue_store->count; wqcount = pool_workqueue_store->count; + K_RLOCK(breakqueue_free); + bq = cmd_breakqueue_store->count; + bqp = cmd_processing; + bqd = cmd_done_breakqueue_store->count; + K_RUNLOCK(breakqueue_free); K_RUNLOCK(workqueue_free); - LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + LOGWARNING("reload shares OoO %s", + ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); - LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); + LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)", + __func__, bq+bqp+bqd+wq0count+wqcount, + bq, bqp, bqd, wq0count, wqcount); /* Until startup_complete, the values should be ignored * Setting them to 'now' means that they won't time out @@ -5528,14 +5561,26 @@ static void *listener(void *arg) wqgot = 0; } - // Process queued work + /* Process queued work 
- ensure pool0 is emptied first, + * even if there is pending pool0 data being processed by breaker() */ + pool0 = true; while (!everyone_die) { + wq_item = NULL; K_WLOCK(workqueue_free); - wq_item = k_unlink_head(pool_workqueue_store); - left = pool_workqueue_store->count; + if (pool0) { + if (pool0_left == 0) + pool0 = false; + else { + wq_item = k_unlink_head(pool0_workqueue_store); + if (wq_item) + pool0_left--; + } + } + if (!pool0) + wq_item = k_unlink_head(pool_workqueue_store); K_WUNLOCK(workqueue_free); - if (left == 0 && wq_stt.tv_sec != 0L) + if (!pool0 && wq_stt.tv_sec != 0L) setnow(&wq_fin); /* Don't keep a connection for more than ~10s or ~10000 items @@ -5553,11 +5598,11 @@ static void *listener(void *arg) tick(); } - if (left == 0 && wq_stt.tv_sec != 0L) { + if (!pool0 && wq_stt.tv_sec != 0L) { sec = tvdiff(&wq_fin, &wq_stt); min = floor(sec / 60.0); sec -= min * 60.0; - LOGWARNING("reload queue completed %.0fm %.3fs", min, sec); + LOGWARNING("pool queue completed %.0fm %.3fs", min, sec); // Used as the flag to display the message once wq_stt.tv_sec = 0L; reload_queue_complete = true; diff --git a/src/ckdb.h b/src/ckdb.h index 2bbaeaca..f099ed04 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.002" +#define CKDB_VERSION DB_VERSION"-2.003" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1111,6 +1111,8 @@ typedef struct workqueue { #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) extern K_LIST *workqueue_free; +// pool0 is all pool data received during the reload, i.e. while reload_queue_complete is false +extern K_STORE *pool0_workqueue_store; extern K_STORE *pool_workqueue_store; extern K_STORE *cmd_workqueue_store; extern K_STORE *btc_workqueue_store;