Browse Source

ckdb - report queues in cmd_shsta and change the processing delays

master
kanoi 9 years ago
parent
commit
c9a23df224
  1. 45
      src/ckdb.c
  2. 19
      src/ckdb.h
  3. 26
      src/ckdb_cmd.c

45
src/ckdb.c

@ -349,10 +349,11 @@ K_STORE *reload_breakqueue_store;
K_STORE *reload_done_breakqueue_store;
K_STORE *cmd_breakqueue_store;
K_STORE *cmd_done_breakqueue_store;
// Locked access with breakqueue_free
static int reload_processing;
static int cmd_processing;
static int sockd_count;
int reload_processing;
int cmd_processing;
int sockd_count;
int max_sockd_count;
// WORKQUEUE
@ -362,12 +363,10 @@ K_STORE *pool0_workqueue_store;
K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store;
mutex_t wq_waitlock;
pthread_cond_t wq_waitcond;
// this counter ensures we don't switch early from pool0 to pool
static int pool0_left;
static int pool0_tot;
static int pool0_discarded;
int pool0_left;
int pool0_tot;
int pool0_discarded;
// HEARTBEATQUEUE
K_LIST *heartbeatqueue_free;
@ -1784,8 +1783,6 @@ static bool setup_data()
cklock_init(&fpm_lock);
cksem_init(&socketer_sem);
mutex_init(&wq_waitlock);
cond_init(&wq_waitcond);
LOGWARNING("%sSequence processing is %s",
ignore_seq ? "ALERT: " : "",
@ -3223,7 +3220,7 @@ static void *breaker(void *arg)
BREAKQUEUE *bq = NULL;
char buf[128];
int thr, zeros;
bool reload, was_zero, msg = false;
bool reload, was_null, msg = false;
int queue_sleep, queue_limit, count;
pthread_detach(pthread_self());
@ -3258,7 +3255,7 @@ static void *breaker(void *arg)
if (reload) {
/* reload has to wait for the reload to start, however, also
* check for startup_complete in case we missed the reload */
* check for startup_complete in case we miss the reload */
while (!everyone_die && !reloading && !startup_complete)
cksleep_ms(queue_sleep);
}
@ -3266,7 +3263,7 @@ static void *breaker(void *arg)
while (!everyone_die) {
K_WLOCK(breakqueue_free);
bq_item = NULL;
was_zero = false;
was_null = false;
if (reload)
count = reload_done_breakqueue_store->count;
else
@ -3279,13 +3276,13 @@ static void *breaker(void *arg)
else
bq_item = k_unlink_head(cmd_breakqueue_store);
if (!bq_item)
was_zero = true;
was_null = true;
}
K_WUNLOCK(breakqueue_free);
if (!bq_item) {
// Is the queue empty and the reload completed?
if (was_zero && reload && !reloading)
if (was_null && reload && !reloading)
break;
cksleep_ms(queue_sleep);
@ -4487,7 +4484,7 @@ static void *process_socket(void *arg)
K_WUNLOCK(breakqueue_free);
if (!bq_item) {
cksleep_ms(24);
cksleep_ms(CMD_QUEUE_SLEEP);
continue;
}
@ -4761,9 +4758,6 @@ static void *process_socket(void *arg)
}
K_WUNLOCK(workqueue_free);
wq_item = bq->ml_item = NULL;
mutex_lock(&wq_waitlock);
pthread_cond_signal(&wq_waitcond);
mutex_unlock(&wq_waitlock);
break;
// Code error
default:
@ -5636,19 +5630,8 @@ static void *listener(void *arg)
}
if (!wq_item) {
const ts_t tsdiff = {0, 420000000};
tv_t now;
ts_t abs;
POOLINSTANCE_DATA_MSG();
tv_time(&now);
tv_to_ts(&abs, &now);
timeraddspec(&abs, &tsdiff);
mutex_lock(&wq_waitlock);
cond_timedwait(&wq_waitcond, &wq_waitlock, &abs);
mutex_unlock(&wq_waitlock);
cksleep_ms(CMD_QUEUE_SLEEP);
}
}

19
src/ckdb.h

@ -51,7 +51,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.003"
#define CKDB_VERSION DB_VERSION"-2.004"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1083,17 +1083,22 @@ typedef struct breakqueue {
* The reload also uses this limit when filling the reload break queue
* thus limiting the line processing of reload files
*/
// 16300,42 equated to single thread limitation of ~388k per second
#define RELOAD_QUEUE_LIMIT 16300
#define RELOAD_QUEUE_SLEEP 42
#define CMD_QUEUE_LIMIT 16300
#define CMD_QUEUE_SLEEP 42
// Don't really limit the cmd queue
#define CMD_QUEUE_LIMIT 1048500
#define CMD_QUEUE_SLEEP 1
extern K_LIST *breakqueue_free;
extern K_STORE *reload_breakqueue_store;
extern K_STORE *reload_done_breakqueue_store;
extern K_STORE *cmd_breakqueue_store;
extern K_STORE *cmd_done_breakqueue_store;
// Locked access with breakqueue_free
extern int reload_processing;
extern int cmd_processing;
extern int sockd_count;
extern int max_sockd_count;
// WORKQUEUE
@ -1116,8 +1121,10 @@ extern K_STORE *pool0_workqueue_store;
extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store;
extern mutex_t wq_waitlock;
extern pthread_cond_t wq_waitcond;
// this counter ensures we don't switch early from pool0 to pool
extern int pool0_left;
extern int pool0_tot;
extern int pool0_discarded;
// HEARTBEATQUEUE
typedef struct heartbeatqueue {

26
src/ckdb_cmd.c

@ -6786,15 +6786,35 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
{
char ooo_buf[256];
char buf[256];
int count;
int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int _pool0_left, _pool0_discarded, _pool0_tot, poolq_count;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
K_RLOCK(breakqueue_free);
count = max_sockd_count;
relq_count = reload_breakqueue_store->count;
_reload_processing = reload_processing;
relqd_count = reload_done_breakqueue_store->count;
cmdq_count = cmd_breakqueue_store->count;
_cmd_processing = cmd_processing;
cmdqd_count = cmd_done_breakqueue_store->count;
_max_sockd_count = max_sockd_count;
K_RUNLOCK(breakqueue_free);
LOGWARNING(" max_sockd_count=%d", count);
K_RLOCK(workqueue_free);
_pool0_left = pool0_left;
_pool0_discarded = pool0_discarded;
_pool0_tot = pool0_tot;
poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d pool0=%d/%d/%d poolq=%d max_sockd=%d",
relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count,
_pool0_left, _pool0_discarded, _pool0_tot,
poolq_count, _max_sockd_count);
snprintf(buf, sizeof(buf), "ok.%s", cmd);
LOGDEBUG("%s.%s", id, buf);

Loading…
Cancel
Save