Browse Source

ckdb - add some more running stats to cmd_shsta

master
kanoi 9 years ago
parent
commit
ebb0a4873e
  1. 97
      src/ckdb.c
  2. 23
      src/ckdb.h
  3. 51
      src/ckdb_cmd.c

97
src/ckdb.c

@ -293,6 +293,21 @@ tv_t last_share_inv;
tv_t last_auth;
cklock_t last_lock;
// Running stats
// socketer()
tv_t sock_stt;
double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us;
uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv;
// breaker() summarised
tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
uint64_t break_reload_processed, break_cmd_processed;
// clistener()
double clis_us;
uint64_t clis_processed;
// blistener()
double blis_us;
uint64_t blis_processed;
static cklock_t fpm_lock;
static char *first_pool_message;
static sem_t socketer_sem;
@ -366,9 +381,9 @@ K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store;
// this counter ensures we don't switch early from pool0 to pool
int earlysock_left;
int pool0_tot;
int pool0_discarded;
int64_t earlysock_left;
int64_t pool0_tot;
int64_t pool0_discarded;
// HEARTBEATQUEUE
K_LIST *heartbeatqueue_free;
@ -3356,6 +3371,16 @@ static void *breaker(void *arg)
}
// The first one to start
K_WLOCK(breakqueue_free);
if (reload) {
if (!break_reload_stt.tv_sec)
setnow(&break_reload_stt);
} else {
if (!break_cmd_stt.tv_sec)
setnow(&break_cmd_stt);
}
K_WUNLOCK(breakqueue_free);
while (!everyone_die) {
K_WLOCK(breakqueue_free);
bq_item = NULL;
@ -3374,6 +3399,12 @@ static void *breaker(void *arg)
if (!bq_item)
was_null = true;
}
if (bq_item) {
if (reload)
break_reload_processed++;
else
break_cmd_processed++;
}
K_WUNLOCK(breakqueue_free);
if (!bq_item) {
@ -3443,9 +3474,11 @@ static void *breaker(void *arg)
K_RUNLOCK(breakqueue_free);
ck_wlock(&breakdown_lock);
if (reload)
if (reload) {
reload_breakdown_count--;
else
// The last one to finish - updated each exit
setnow(&break_reload_fin);
} else
cmd_breakdown_count--;
if ((reload_breakdown_count + cmd_breakdown_count) < 1) {
@ -4493,9 +4526,9 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item)
static void *clistener(__maybe_unused void *arg)
{
uint64_t processed = 0;
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
time_t now;
pthread_detach(pthread_self());
@ -4523,13 +4556,16 @@ static void *clistener(__maybe_unused void *arg)
}
if (wq_item) {
processed++;
setnow(&now1);
process_sockd(conn, wq_item);
setnow(&now2);
clis_us += us_tvdiff(&now2, &now1);
clis_processed++;
} else
cksleep_ms(42);
cksleep_ms(4);
}
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed);
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed);
clistener_using_data = false;
@ -4541,9 +4577,9 @@ static void *clistener(__maybe_unused void *arg)
static void *blistener(__maybe_unused void *arg)
{
uint64_t processed = 0;
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
time_t now;
pthread_detach(pthread_self());
@ -4570,13 +4606,16 @@ static void *blistener(__maybe_unused void *arg)
}
if (wq_item) {
processed++;
setnow(&now1);
process_sockd(conn, wq_item);
setnow(&now2);
blis_us += us_tvdiff(&now2, &now1);
blis_processed++;
} else
cksleep_ms(142);
cksleep_ms(42);
}
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed);
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed);
blistener_using_data = false;
@ -4965,9 +5004,8 @@ static void *socketer(void *arg)
char *end, *buf = NULL;
K_ITEM *bq_item = NULL;
BREAKQUEUE *bq = NULL;
uint64_t proc_early = 0, processed = 0;
int sockd;
tv_t now;
tv_t now, now1, now2;
pthread_detach(pthread_self());
@ -4988,16 +5026,26 @@ static void *socketer(void *arg)
create_pthread(&proc_pt, process_socket, arg);
}
setnow(&sock_stt);
while (!everyone_die) {
setnow(&now1);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGERR("%s() Failed to accept on socket", __func__);
int e = errno;
LOGERR("%s() Failed to accept on socket (%d:%s)",
__func__, e, strerror(e));
break;
}
setnow(&now2);
sock_us += us_tvdiff(&now2, &now1);
sock_acc++;
setnow(&now1);
buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2);
// Once we've read the message
setnow(&now);
sock_recv_us += us_tvdiff(&now, &now1);
sock_recv++;
if (buf) {
end = buf + strlen(buf) - 1;
// strip trailing \n and \r
@ -5017,10 +5065,13 @@ static void *socketer(void *arg)
// Flag all work for pool0 until the reload completes
if (prereload || reloading) {
seqentryflags = SE_EARLYSOCK;
setnow(&now1);
K_WLOCK(workqueue_free);
earlysock_left++;
K_WUNLOCK(workqueue_free);
proc_early++;
sock_proc_early++;
setnow(&now2);
sock_lock_wq_us += us_tvdiff(&now2, &now1);
}
if (SEQALL_LOG) {
@ -5044,11 +5095,12 @@ static void *socketer(void *arg)
}
}
processed++;
sock_processed++;
// Don't limit the speed filling up cmd_breakqueue_store
setnow(&now1);
K_WLOCK(breakqueue_free);
bq_item = k_unlink_head(breakqueue_free);
// keep the lock since none of these should be slow
K_WUNLOCK(breakqueue_free);
DATA_BREAKQUEUE(bq, bq_item);
bq->buf = buf;
copy_tv(&(bq->now), &now);
@ -5056,13 +5108,16 @@ static void *socketer(void *arg)
bq->sockd = sockd;
if (max_sockd_count < ++sockd_count)
max_sockd_count = sockd_count;
K_WLOCK(breakqueue_free);
k_add_tail(cmd_breakqueue_store, bq_item);
K_WUNLOCK(breakqueue_free);
setnow(&now2);
sock_lock_br_us += us_tvdiff(&now2, &now1);
}
}
LOGNOTICE("%s() exiting, early %"PRIu64" after %"PRIu64" (%"PRIu64")",
__func__, proc_early, processed - proc_early, processed);
__func__, sock_proc_early, sock_processed - sock_proc_early, sock_processed);
socketer_using_data = false;
@ -5759,12 +5814,12 @@ static void *listener(void *arg)
K_RLOCK(workqueue_free);
wq0count = pool0_workqueue_store->count;
wqcount = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
K_RLOCK(breakqueue_free);
bq = cmd_breakqueue_store->count;
bqp = cmd_processing;
bqd = cmd_done_breakqueue_store->count;
K_RUNLOCK(breakqueue_free);
K_RUNLOCK(workqueue_free);
LOGWARNING("reload shares OoO %s",
ooo_status(ooo_buf, sizeof(ooo_buf)));

23
src/ckdb.h

@ -51,7 +51,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.014"
#define CKDB_VERSION DB_VERSION"-2.015"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -343,6 +343,21 @@ extern tv_t last_share_inv;
extern tv_t last_auth;
extern cklock_t last_lock;
// Running stats
// socketer()
extern tv_t sock_stt;
extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us;
extern uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv;
// breaker() summarised
extern tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
extern uint64_t break_reload_processed, break_cmd_processed;
// clistener()
extern double clis_us;
extern uint64_t clis_processed;
// blistener()
extern double blis_us;
extern uint64_t blis_processed;
#define JSON_TRANSFER "json="
#define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1)
#define JSON_BEGIN '{'
@ -1124,9 +1139,9 @@ extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store;
// this counter ensures we don't switch early from pool0 to pool
extern int earlysock_left;
extern int pool0_tot;
extern int pool0_discarded;
extern int64_t earlysock_left;
extern int64_t pool0_tot;
extern int64_t pool0_discarded;
// HEARTBEATQUEUE
typedef struct heartbeatqueue {

51
src/ckdb_cmd.c

@ -6778,7 +6778,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
/* Show a share status report on the console
* Currently: sequence status, OoO info and max_sockd_count */
static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
__maybe_unused K_TREE *trf_root,
@ -6788,8 +6788,10 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
char buf[256];
int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int _earlysock_left, pool0_count, _pool0_discarded, _pool0_tot;
int poolq_count;
int pool0_count, poolq_count;
int64_t _earlysock_left, _pool0_discarded, _pool0_tot;
uint64_t count1, count2, count3, count4;
double tot1, tot2;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
@ -6812,14 +6814,53 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%d pool0=%d/%d/%d "
"poolq=%d max_sockd=%d",
LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%"PRId64
" pool0=%d/%"PRId64"/%"PRId64" poolq=%d max_sockd=%d",
relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count,
_earlysock_left,
pool0_count, _pool0_discarded, _pool0_tot,
poolq_count, _max_sockd_count);
count1 = sock_acc ? : 1;
count2 = sock_recv ? : 1;
count3 = sock_proc_early ? : 1;
count4 = sock_processed ? : 1;
LOGWARNING(" sock: tot %f sock %f/%"PRIu64"/%f recv %f/%"PRIu64"/%f "
"lckw %f/%"PRIu64"/%f lckb %f/%"PRIu64"/%f",
us_tvdiff(now, &sock_stt)/1000000,
sock_us/1000000, sock_acc, (sock_us/count1)/1000000,
sock_recv_us/1000000, sock_recv,
(sock_recv_us/count2)/1000000,
sock_lock_wq_us/1000000, sock_proc_early,
(sock_lock_wq_us/count3)/1000000,
sock_lock_br_us/1000000, sock_processed,
(sock_lock_br_us/count4)/1000000);
if (!break_reload_stt.tv_sec)
tot1 = 0;
else {
if (!break_reload_fin.tv_sec)
tot1 = us_tvdiff(now, &break_reload_stt);
else
tot1 = us_tvdiff(&break_reload_fin, &break_reload_stt);
}
if (!break_cmd_stt.tv_sec)
tot2 = 0;
else
tot2 = us_tvdiff(now, &break_cmd_stt);
count1 = break_reload_processed ? : 1;
count2 = break_cmd_processed ? : 1;
LOGWARNING(" break reload: %f/%"PRIu64"/%f cmd: %f/%"PRIu64"/%f",
tot1/1000000, break_reload_processed, (tot1/count1)/1000000,
tot2/1000000, break_cmd_processed, (tot2/count2)/1000000);
count1 = clis_processed ? : 1;
count2 = blis_processed ? : 1;
LOGWARNING(" clistener %f/%"PRIu64"/%f blistener: %f/%"PRIu64"/%f",
clis_us/1000000, clis_processed, (clis_us/count1)/1000000,
blis_us/1000000, blis_processed, (blis_us/count2)/1000000);
snprintf(buf, sizeof(buf), "ok.%s", cmd);
LOGDEBUG("%s.%s", id, buf);
return strdup(buf);

Loading…
Cancel
Save