Browse Source

ckdb - ensure all earlysock data is processed first

master
kanoi 9 years ago
parent
commit
fc9f809e01
  1. 79
      src/ckdb.c
  2. 4
      src/ckdb.h

79
src/ckdb.c

@ -351,16 +351,23 @@ K_STORE *cmd_breakqueue_store;
K_STORE *cmd_done_breakqueue_store; K_STORE *cmd_done_breakqueue_store;
// Locked access with breakqueue_free // Locked access with breakqueue_free
static int reload_processing; static int reload_processing;
static int cmd_processing;
static int sockd_count; static int sockd_count;
int max_sockd_count; int max_sockd_count;
// WORKQUEUE // WORKQUEUE
K_LIST *workqueue_free; K_LIST *workqueue_free;
// pool0 is all pool data during the reload
K_STORE *pool0_workqueue_store;
K_STORE *pool_workqueue_store; K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store; K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store; K_STORE *btc_workqueue_store;
mutex_t wq_waitlock; mutex_t wq_waitlock;
pthread_cond_t wq_waitcond; pthread_cond_t wq_waitcond;
// this counter ensures we don't switch early from pool0 to pool
static int pool0_left;
static int pool0_tot;
static int pool0_discarded;
// HEARTBEATQUEUE // HEARTBEATQUEUE
K_LIST *heartbeatqueue_free; K_LIST *heartbeatqueue_free;
@ -1166,6 +1173,7 @@ static void alloc_storage()
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
pool0_workqueue_store = k_new_store(workqueue_free);
pool_workqueue_store = k_new_store(workqueue_free); pool_workqueue_store = k_new_store(workqueue_free);
cmd_workqueue_store = k_new_store(workqueue_free); cmd_workqueue_store = k_new_store(workqueue_free);
btc_workqueue_store = k_new_store(workqueue_free); btc_workqueue_store = k_new_store(workqueue_free);
@ -1733,6 +1741,7 @@ static void dealloc_storage()
FREE_LIST(transfer); FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue); FREE_LISTS(heartbeatqueue);
// TODO: msgline // TODO: msgline
FREE_STORE(pool0_workqueue);
FREE_STORE(pool_workqueue); FREE_STORE(pool_workqueue);
FREE_STORE(cmd_workqueue); FREE_STORE(cmd_workqueue);
FREE_STORE(btc_workqueue); FREE_STORE(btc_workqueue);
@ -4473,6 +4482,8 @@ static void *process_socket(void *arg)
while (!everyone_die) { while (!everyone_die) {
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
bq_item = k_unlink_head(cmd_done_breakqueue_store); bq_item = k_unlink_head(cmd_done_breakqueue_store);
if (bq_item)
cmd_processing++;
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
if (!bq_item) { if (!bq_item) {
@ -4726,11 +4737,16 @@ static void *process_socket(void *arg)
workqueue->by = by_default; workqueue->by = by_default;
workqueue->code = (char *)__func__; workqueue->code = (char *)__func__;
workqueue->inet = inet_default; workqueue->inet = inet_default;
if (bq->seqentryflags == SE_SOCKET)
k_add_tail(pool_workqueue_store, wq_item); k_add_tail(pool_workqueue_store, wq_item);
else {
k_add_tail(pool0_workqueue_store, wq_item);
/* Stop the reload queue from growing too big /* Stop the reload queue from growing too big
* Use a size that 'should be big enough' */ * Use a size that 'should be big enough' */
while (reloading && pool_workqueue_store->count > 250000) { if (reloading && pool0_workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store);
pool0_discarded++;
pool0_left--;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
WORKQUEUE *wq; WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item); DATA_WORKQUEUE(wq, wq2_item);
@ -4742,6 +4758,7 @@ static void *process_socket(void *arg)
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq2_item); k_add_head(workqueue_free, wq2_item);
} }
}
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
wq_item = bq->ml_item = NULL; wq_item = bq->ml_item = NULL;
mutex_lock(&wq_waitlock); mutex_lock(&wq_waitlock);
@ -4778,6 +4795,7 @@ static void *process_socket(void *arg)
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
if (dec_sockd) if (dec_sockd)
sockd_count--; sockd_count--;
cmd_processing--;
k_add_head(breakqueue_free, bq_item); k_add_head(breakqueue_free, bq_item);
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
} }
@ -4841,8 +4859,13 @@ static void *socketer(void *arg)
} }
} else { } else {
int seqentryflags = SE_SOCKET; int seqentryflags = SE_SOCKET;
if (!reload_queue_complete) if (!reload_queue_complete) {
seqentryflags = SE_EARLYSOCK; seqentryflags = SE_EARLYSOCK;
K_WLOCK(workqueue_free);
pool0_tot++;
pool0_left++;
K_WUNLOCK(workqueue_free);
}
// Don't limit the speed filling up cmd_breakqueue_store // Don't limit the speed filling up cmd_breakqueue_store
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
@ -5227,22 +5250,24 @@ static bool reload_from(tv_t *start)
tmp_time = time(NULL); tmp_time = time(NULL);
// Report stats every 15s // Report stats every 15s
if ((tmp_time - tick_time) > 14) { if ((tmp_time - tick_time) > 14) {
int relq, relqd, cmdq, cmdqd, mx, poolq; int relq, relqd, cmdq, cmdqd, mx, pool0q;
K_RLOCK(breakqueue_free); K_RLOCK(breakqueue_free);
relq = reload_breakqueue_store->count + relq = reload_breakqueue_store->count +
reload_processing; reload_processing;
relqd = reload_done_breakqueue_store->count; relqd = reload_done_breakqueue_store->count;
cmdq = cmd_breakqueue_store->count; cmdq = cmd_breakqueue_store->count +
cmd_processing;
cmdqd = cmd_done_breakqueue_store->count; cmdqd = cmd_done_breakqueue_store->count;
mx = max_sockd_count; mx = max_sockd_count;
K_RUNLOCK(breakqueue_free); K_RUNLOCK(breakqueue_free);
K_RLOCK(workqueue_free); K_RLOCK(workqueue_free);
poolq = pool_workqueue_store->count; pool0q = pool0_workqueue_store->count;
// pool_workqueue_store should be zero
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" printf(TICK_PREFIX"reload %"PRIu64"/%d/%d"
" ckp %d/%d/%d (%d) \r", " ckp %d/%d/%d (%d) \r",
total+count, relq, relqd, total+count, relq, relqd,
cmdq, cmdqd, poolq, mx); cmdq, cmdqd, pool0q, mx);
fflush(stdout); fflush(stdout);
tick_time = tmp_time; tick_time = tmp_time;
} }
@ -5436,16 +5461,15 @@ static void *listener(void *arg)
pthread_t break_pt; pthread_t break_pt;
K_ITEM *wq_item; K_ITEM *wq_item;
time_t now; time_t now;
int wqcount, wqgot; int bq, bqp, bqd, wq0count, wqcount, wqgot;
char ooo_buf[256]; char ooo_buf[256];
tv_t wq_stt, wq_fin; tv_t wq_stt, wq_fin;
double min, sec; double min, sec;
int left;
SEQSET *seqset = NULL; SEQSET *seqset = NULL;
SEQDATA *seqdata; SEQDATA *seqdata;
K_ITEM *ss_item; K_ITEM *ss_item;
int cpus, i; int cpus, i;
bool reloader, cmder; bool reloader, cmder, pool0;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -5500,13 +5524,22 @@ static void *listener(void *arg)
if (!everyone_die) { if (!everyone_die) {
K_RLOCK(workqueue_free); K_RLOCK(workqueue_free);
wq0count = pool0_workqueue_store->count;
wqcount = pool_workqueue_store->count; wqcount = pool_workqueue_store->count;
K_RLOCK(breakqueue_free);
bq = cmd_breakqueue_store->count;
bqp = cmd_processing;
bqd = cmd_done_breakqueue_store->count;
K_RUNLOCK(breakqueue_free);
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); LOGWARNING("reload shares OoO %s",
ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true); sequence_report(true);
LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)",
__func__, bq+bqp+bqd+wq0count+wqcount,
bq, bqp, bqd, wq0count, wqcount);
/* Until startup_complete, the values should be ignored /* Until startup_complete, the values should be ignored
* Setting them to 'now' means that they won't time out * Setting them to 'now' means that they won't time out
@ -5528,14 +5561,26 @@ static void *listener(void *arg)
wqgot = 0; wqgot = 0;
} }
// Process queued work /* Process queued work - ensure pool0 is emptied first,
* even if there is pending pool0 data being processed by breaker() */
pool0 = true;
while (!everyone_die) { while (!everyone_die) {
wq_item = NULL;
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
if (pool0) {
if (pool0_left == 0)
pool0 = false;
else {
wq_item = k_unlink_head(pool0_workqueue_store);
if (wq_item)
pool0_left--;
}
}
if (!pool0)
wq_item = k_unlink_head(pool_workqueue_store); wq_item = k_unlink_head(pool_workqueue_store);
left = pool_workqueue_store->count;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
if (left == 0 && wq_stt.tv_sec != 0L) if (!pool0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin); setnow(&wq_fin);
/* Don't keep a connection for more than ~10s or ~10000 items /* Don't keep a connection for more than ~10s or ~10000 items
@ -5553,11 +5598,11 @@ static void *listener(void *arg)
tick(); tick();
} }
if (left == 0 && wq_stt.tv_sec != 0L) { if (!pool0 && wq_stt.tv_sec != 0L) {
sec = tvdiff(&wq_fin, &wq_stt); sec = tvdiff(&wq_fin, &wq_stt);
min = floor(sec / 60.0); min = floor(sec / 60.0);
sec -= min * 60.0; sec -= min * 60.0;
LOGWARNING("reload queue completed %.0fm %.3fs", min, sec); LOGWARNING("pool queue completed %.0fm %.3fs", min, sec);
// Used as the flag to display the message once // Used as the flag to display the message once
wq_stt.tv_sec = 0L; wq_stt.tv_sec = 0L;
reload_queue_complete = true; reload_queue_complete = true;

4
src/ckdb.h

@ -51,7 +51,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.5" #define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.002" #define CKDB_VERSION DB_VERSION"-2.003"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1111,6 +1111,8 @@ typedef struct workqueue {
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
extern K_LIST *workqueue_free; extern K_LIST *workqueue_free;
// pool0 is all pool data during the reload
extern K_STORE *pool0_workqueue_store;
extern K_STORE *pool_workqueue_store; extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store; extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store; extern K_STORE *btc_workqueue_store;

Loading…
Cancel
Save