Browse Source

ckdb - fix pool0->pool handover

master
kanoi 9 years ago
parent
commit
b8f368becd
  1. 49
      src/ckdb.c
  2. 6
      src/ckdb.h
  3. 12
      src/ckdb_cmd.c

49
src/ckdb.c

@ -268,13 +268,15 @@ bool sharesummary_marks_limit = false;
bool db_users_complete = false; bool db_users_complete = false;
// DB load is complete // DB load is complete
bool db_load_complete = false; bool db_load_complete = false;
// Before the reload starts (and during the reload)
bool prereload = true;
// Different input data handling // Different input data handling
bool reloading = false; bool reloading = false;
// Start marks processing during a larger reload // Start marks processing during a larger reload
static bool reloaded_N_files = false; static bool reloaded_N_files = false;
// Data load is complete // Data load is complete
bool startup_complete = false; bool startup_complete = false;
// Set to true the first time workqueue reaches 0 after startup // Set to true when pool0 completes, pool0 = socket data during reload
static bool reload_queue_complete = false; static bool reload_queue_complete = false;
// Tell everyone to die // Tell everyone to die
bool everyone_die = false; bool everyone_die = false;
@ -364,7 +366,7 @@ K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store; K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store; K_STORE *btc_workqueue_store;
// this counter ensures we don't switch early from pool0 to pool // this counter ensures we don't switch early from pool0 to pool
int pool0_left; int earlysock_left;
int pool0_tot; int pool0_tot;
int pool0_discarded; int pool0_discarded;
@ -4498,6 +4500,25 @@ static void *process_socket(void *arg)
DATA_BREAKQUEUE(bq, bq_item); DATA_BREAKQUEUE(bq, bq_item);
DATA_MSGLINE(msgline, bq->ml_item); DATA_MSGLINE(msgline, bq->ml_item);
replied = btc = false; replied = btc = false;
switch (bq->cmdnum) {
case CMD_AUTH:
case CMD_ADDRAUTH:
case CMD_HEARTBEAT:
case CMD_SHARELOG:
case CMD_POOLSTAT:
case CMD_USERSTAT:
case CMD_WORKERSTAT:
case CMD_BLOCK:
break;
default:
// Non-pool commands can't affect pool0
if (bq->seqentryflags == SE_EARLYSOCK) {
K_WLOCK(workqueue_free);
earlysock_left--;
K_WUNLOCK(workqueue_free);
}
break;
}
switch (bq->cmdnum) { switch (bq->cmdnum) {
case CMD_REPLY: case CMD_REPLY:
snprintf(reply, sizeof(reply), snprintf(reply, sizeof(reply),
@ -4745,12 +4766,13 @@ static void *process_socket(void *arg)
k_add_tail(pool_workqueue_store, wq_item); k_add_tail(pool_workqueue_store, wq_item);
else { else {
k_add_tail(pool0_workqueue_store, wq_item); k_add_tail(pool0_workqueue_store, wq_item);
pool0_tot++;
/* Stop the reload queue from growing too big /* Stop the reload queue from growing too big
* Use a size that 'should be big enough' */ * Use a size that 'should be big enough' */
if (reloading && pool0_workqueue_store->count > 250000) { if (reloading && pool0_workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store); K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store);
earlysock_left--;
pool0_discarded++; pool0_discarded++;
pool0_left--;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
WORKQUEUE *wq; WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item); DATA_WORKQUEUE(wq, wq2_item);
@ -4860,11 +4882,11 @@ static void *socketer(void *arg)
} }
} else { } else {
int seqentryflags = SE_SOCKET; int seqentryflags = SE_SOCKET;
if (!reload_queue_complete) { // Flag all work for pool0 until the reload completes
if (prereload || reloading) {
seqentryflags = SE_EARLYSOCK; seqentryflags = SE_EARLYSOCK;
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
pool0_tot++; earlysock_left++;
pool0_left++;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
} }
@ -5229,6 +5251,7 @@ static bool reload_from(tv_t *start)
LOGQUE(reload_buf, true); LOGQUE(reload_buf, true);
LOGQUE(reload_buf, false); LOGQUE(reload_buf, false);
// Start after reloading = true
create_pthread(&proc_pt, process_reload, NULL); create_pthread(&proc_pt, process_reload, NULL);
total = 0; total = 0;
@ -5251,7 +5274,7 @@ static bool reload_from(tv_t *start)
tmp_time = time(NULL); tmp_time = time(NULL);
// Report stats every 15s // Report stats every 15s
if ((tmp_time - tick_time) > 14) { if ((tmp_time - tick_time) > 14) {
int relq, relqd, cmdq, cmdqd, mx, pool0q; int relq, relqd, cmdq, cmdqd, mx, pool0q, poolq;
K_RLOCK(breakqueue_free); K_RLOCK(breakqueue_free);
relq = reload_breakqueue_store->count + relq = reload_breakqueue_store->count +
reload_processing; reload_processing;
@ -5263,12 +5286,13 @@ static bool reload_from(tv_t *start)
K_RUNLOCK(breakqueue_free); K_RUNLOCK(breakqueue_free);
K_RLOCK(workqueue_free); K_RLOCK(workqueue_free);
pool0q = pool0_workqueue_store->count; pool0q = pool0_workqueue_store->count;
poolq = pool_workqueue_store->count;
// pool_workqueue_store should be zero // pool_workqueue_store should be zero
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" printf(TICK_PREFIX"reload %"PRIu64"/%d/%d"
" ckp %d/%d/%d (%d) \r", " ckp %d/%d/%d/%d (%d) \r",
total+count, relq, relqd, total+count, relq, relqd,
cmdq, cmdqd, pool0q, mx); cmdq, cmdqd, pool0q, poolq, mx);
fflush(stdout); fflush(stdout);
tick_time = tmp_time; tick_time = tmp_time;
} }
@ -5290,7 +5314,7 @@ static bool reload_from(tv_t *start)
} else } else
fclose(fp); fclose(fp);
/* Don't free the old filename since /* Don't free the old filename since
* process_reload() could access use it */ * process_reload() could access it */
if (everyone_die) if (everyone_die)
break; break;
reload_timestamp.tv_sec += ROLL_S; reload_timestamp.tv_sec += ROLL_S;
@ -5384,6 +5408,7 @@ static bool reload_from(tv_t *start)
seq_reloadmax(); seq_reloadmax();
prereload = false;
reloading = false; reloading = false;
FREENULL(reload_buf); FREENULL(reload_buf);
return ret; return ret;
@ -5569,12 +5594,12 @@ static void *listener(void *arg)
wq_item = NULL; wq_item = NULL;
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
if (pool0) { if (pool0) {
if (pool0_left == 0) if (earlysock_left == 0)
pool0 = false; pool0 = false;
else { else {
wq_item = k_unlink_head(pool0_workqueue_store); wq_item = k_unlink_head(pool0_workqueue_store);
if (wq_item) if (wq_item)
pool0_left--; earlysock_left--;
} }
} }
if (!pool0) if (!pool0)

6
src/ckdb.h

@ -51,7 +51,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.5" #define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.006" #define CKDB_VERSION DB_VERSION"-2.007"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -324,6 +324,8 @@ extern bool sharesummary_marks_limit;
extern bool db_users_complete; extern bool db_users_complete;
// DB load is complete // DB load is complete
extern bool db_load_complete; extern bool db_load_complete;
// Before the reload starts (and during the reload)
extern bool prereload;
// Different input data handling // Different input data handling
extern bool reloading; extern bool reloading;
// Data load is complete // Data load is complete
@ -1122,7 +1124,7 @@ extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store; extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store; extern K_STORE *btc_workqueue_store;
// this counter ensures we don't switch early from pool0 to pool // this counter ensures we don't switch early from pool0 to pool
extern int pool0_left; extern int earlysock_left;
extern int pool0_tot; extern int pool0_tot;
extern int pool0_discarded; extern int pool0_discarded;

12
src/ckdb_cmd.c

@ -6788,7 +6788,8 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
char buf[256]; char buf[256];
int relq_count, _reload_processing, relqd_count; int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int _pool0_left, _pool0_discarded, _pool0_tot, poolq_count; int _earlysock_left, pool0_count, _pool0_discarded, _pool0_tot;
int poolq_count;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true); sequence_report(true);
@ -6804,16 +6805,19 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
K_RUNLOCK(breakqueue_free); K_RUNLOCK(breakqueue_free);
K_RLOCK(workqueue_free); K_RLOCK(workqueue_free);
_pool0_left = pool0_left; _earlysock_left = earlysock_left;
pool0_count = pool0_workqueue_store->count;
_pool0_discarded = pool0_discarded; _pool0_discarded = pool0_discarded;
_pool0_tot = pool0_tot; _pool0_tot = pool0_tot;
poolq_count = pool_workqueue_store->count; poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d pool0=%d/%d/%d poolq=%d max_sockd=%d", LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%d pool0=%d/%d/%d "
"poolq=%d max_sockd=%d",
relq_count, _reload_processing, relqd_count, relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count, cmdq_count, _cmd_processing, cmdqd_count,
_pool0_left, _pool0_discarded, _pool0_tot, _earlysock_left,
pool0_count, _pool0_discarded, _pool0_tot,
poolq_count, _max_sockd_count); poolq_count, _max_sockd_count);
snprintf(buf, sizeof(buf), "ok.%s", cmd); snprintf(buf, sizeof(buf), "ok.%s", cmd);

Loading…
Cancel
Save