Browse Source

ckdb - more seqall logging and lognotice processing counts

master
kanoi 9 years ago
parent
commit
cef058b4f4
  1. 81
      src/ckdb.c
  2. 2
      src/ckdb.h

81
src/ckdb.c

@ -2818,13 +2818,22 @@ static enum cmd_values process_seq(MSGLINE *msgline)
/* If non-seqall data was in a CCL reload file, /* If non-seqall data was in a CCL reload file,
* it can't be processed by update_seq(), so don't */ * it can't be processed by update_seq(), so don't */
if (msgline->n_seqall == 0 && msgline->n_seqstt == 0 && if (msgline->n_seqall == 0 && msgline->n_seqstt == 0 &&
msgline->n_seqpid == 0) msgline->n_seqpid == 0) {
if (SEQALL_LOG) {
LOGNOTICE("%s() SEQALL 0 skipping %.42s...",
__func__,
st = safe_text(msgline->msg));
FREENULL(st);
}
return ckdb_cmds[msgline->which_cmds].cmd_val; return ckdb_cmds[msgline->which_cmds].cmd_val;
}
if (SEQALL_LOG) { if (SEQALL_LOG) {
LOGNOTICE("%s() SEQALL %"PRIu64, LOGNOTICE("%s() SEQALL %"PRIu64" %"PRIu64" %"PRIu64,
__func__, msgline->n_seqall); __func__, msgline->n_seqall, msgline->n_seqstt,
msgline->n_seqpid);
} }
dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt,
msgline->n_seqpid, SEQALL, &(msgline->now), msgline->n_seqpid, SEQALL, &(msgline->now),
&(msgline->cd), msgline->code, &(msgline->cd), msgline->code,
@ -3302,6 +3311,7 @@ static void *breaker(void *arg)
int thr, zeros; int thr, zeros;
bool reload, was_null, msg = false; bool reload, was_null, msg = false;
int queue_sleep, queue_limit, count; int queue_sleep, queue_limit, count;
uint64_t processed = 0;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -3333,11 +3343,17 @@ static void *breaker(void *arg)
LOCK_INIT(buf); LOCK_INIT(buf);
rename_proc(buf); rename_proc(buf);
LOGNOTICE("%s() %s %s starting",
__func__, buf, reload ? "reload" : "cmd");
if (reload) { if (reload) {
/* reload has to wait for the reload to start, however, also /* reload has to wait for the reload to start, however, also
* check for startup_complete in case we miss the reload */ * check for startup_complete in case we miss the reload */
while (!everyone_die && !reloading && !startup_complete) while (!everyone_die && !reloading && !startup_complete)
cksleep_ms(queue_sleep); cksleep_ms(queue_sleep);
LOGNOTICE("%s() %s reload processing", __func__, buf);
} }
while (!everyone_die) { while (!everyone_die) {
@ -3369,6 +3385,8 @@ static void *breaker(void *arg)
continue; continue;
} }
processed++;
DATA_BREAKQUEUE(bq, bq_item); DATA_BREAKQUEUE(bq, bq_item);
if (reload) { if (reload) {
@ -3415,6 +3433,9 @@ static void *breaker(void *arg)
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
} }
LOGNOTICE("%s() %s %s exiting, processed %"PRIu64,
__func__, buf, reload ? "reload" : "cmd", processed);
// Get it now while the lock still exists, in case we need it // Get it now while the lock still exists, in case we need it
K_RLOCK(breakqueue_free); K_RLOCK(breakqueue_free);
// Not 100% exact since it could still increase, but close enough // Not 100% exact since it could still increase, but close enough
@ -4472,6 +4493,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item)
static void *clistener(__maybe_unused void *arg) static void *clistener(__maybe_unused void *arg)
{ {
uint64_t processed = 0;
PGconn *conn = NULL; PGconn *conn = NULL;
K_ITEM *wq_item; K_ITEM *wq_item;
time_t now; time_t now;
@ -4481,6 +4503,8 @@ static void *clistener(__maybe_unused void *arg)
LOCK_INIT("db_clistener"); LOCK_INIT("db_clistener");
rename_proc("db_clistener"); rename_proc("db_clistener");
LOGNOTICE("%s() processing", __func__);
clistener_using_data = true; clistener_using_data = true;
conn = dbconnect(); conn = dbconnect();
@ -4498,12 +4522,15 @@ static void *clistener(__maybe_unused void *arg)
now = time(NULL); now = time(NULL);
} }
if (wq_item) if (wq_item) {
processed++;
process_sockd(conn, wq_item); process_sockd(conn, wq_item);
else } else
cksleep_ms(42); cksleep_ms(42);
} }
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed);
clistener_using_data = false; clistener_using_data = false;
if (conn) if (conn)
@ -4514,6 +4541,7 @@ static void *clistener(__maybe_unused void *arg)
static void *blistener(__maybe_unused void *arg) static void *blistener(__maybe_unused void *arg)
{ {
uint64_t processed = 0;
PGconn *conn = NULL; PGconn *conn = NULL;
K_ITEM *wq_item; K_ITEM *wq_item;
time_t now; time_t now;
@ -4523,6 +4551,8 @@ static void *blistener(__maybe_unused void *arg)
LOCK_INIT("db_blistener"); LOCK_INIT("db_blistener");
rename_proc("db_blistener"); rename_proc("db_blistener");
LOGNOTICE("%s() processing", __func__);
blistener_using_data = true; blistener_using_data = true;
now = time(NULL); now = time(NULL);
@ -4539,12 +4569,15 @@ static void *blistener(__maybe_unused void *arg)
now = time(NULL); now = time(NULL);
} }
if (wq_item) if (wq_item) {
processed++;
process_sockd(conn, wq_item); process_sockd(conn, wq_item);
else } else
cksleep_ms(142); cksleep_ms(142);
} }
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed);
blistener_using_data = false; blistener_using_data = false;
if (conn) if (conn)
@ -4932,6 +4965,7 @@ static void *socketer(void *arg)
char *end, *buf = NULL; char *end, *buf = NULL;
K_ITEM *bq_item = NULL; K_ITEM *bq_item = NULL;
BREAKQUEUE *bq = NULL; BREAKQUEUE *bq = NULL;
uint64_t proc_early = 0, processed = 0;
int sockd; int sockd;
tv_t now; tv_t now;
@ -4986,6 +5020,7 @@ static void *socketer(void *arg)
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
earlysock_left++; earlysock_left++;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
proc_early++;
} }
if (SEQALL_LOG) { if (SEQALL_LOG) {
@ -5009,6 +5044,7 @@ static void *socketer(void *arg)
} }
} }
processed++;
// Don't limit the speed filling up cmd_breakqueue_store // Don't limit the speed filling up cmd_breakqueue_store
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
bq_item = k_unlink_head(breakqueue_free); bq_item = k_unlink_head(breakqueue_free);
@ -5025,6 +5061,9 @@ static void *socketer(void *arg)
} }
} }
LOGNOTICE("%s() exiting, early %"PRIu64" after %"PRIu64" (%"PRIu64")",
__func__, proc_early, processed - proc_early, processed);
socketer_using_data = false; socketer_using_data = false;
close_unix_socket(us->sockd, us->path); close_unix_socket(us->sockd, us->path);
@ -5044,12 +5083,15 @@ static void *process_reload(__maybe_unused void *arg)
enum cmd_values cmdnum; enum cmd_values cmdnum;
char *ans, *st = NULL; char *ans, *st = NULL;
time_t now; time_t now;
uint64_t processed = 0;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
LOCK_INIT("db_procreload"); LOCK_INIT("db_procreload");
rename_proc("db_procreload"); rename_proc("db_procreload");
LOGNOTICE("%s() starting", __func__);
conn = dbconnect(); conn = dbconnect();
now = time(NULL); now = time(NULL);
@ -5069,6 +5111,8 @@ static void *process_reload(__maybe_unused void *arg)
continue; continue;
} }
processed++;
// Don't keep a connection for more than ~10s ... of processing // Don't keep a connection for more than ~10s ... of processing
if ((time(NULL) - now) > 10) { if ((time(NULL) - now) > 10) {
PQfinish(conn); PQfinish(conn);
@ -5198,6 +5242,8 @@ static void *process_reload(__maybe_unused void *arg)
PQfinish(conn); PQfinish(conn);
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed);
return NULL; return NULL;
} }
@ -5655,7 +5701,8 @@ static void *listener(void *arg)
SEQDATA *seqdata; SEQDATA *seqdata;
K_ITEM *ss_item; K_ITEM *ss_item;
int cpus, i; int cpus, i;
bool reloader, cmder, pool0; bool reloader, cmder, pool0, switch_msg = false;
uint64_t proc0 = 0, proc1 = 0;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -5747,6 +5794,7 @@ static void *listener(void *arg)
wqgot = 0; wqgot = 0;
} }
LOGNOTICE("%s() processing pool0", __func__);
/* Process queued work - ensure pool0 is emptied first, /* Process queued work - ensure pool0 is emptied first,
* even if there is pending pool0 data being processed by breaker() */ * even if there is pending pool0 data being processed by breaker() */
pool0 = true; pool0 = true;
@ -5756,9 +5804,10 @@ static void *listener(void *arg)
wq_item = NULL; wq_item = NULL;
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
if (pool0) { if (pool0) {
if (earlysock_left == 0) if (earlysock_left == 0) {
pool0 = false; pool0 = false;
else { switch_msg = true;
} else {
wq_item = k_unlink_head(pool0_workqueue_store); wq_item = k_unlink_head(pool0_workqueue_store);
if (wq_item) if (wq_item)
earlysock_left--; earlysock_left--;
@ -5770,6 +5819,12 @@ static void *listener(void *arg)
} }
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
if (switch_msg) {
switch_msg = false;
LOGNOTICE("%s() pool0 complete, processed %"PRIu64,
__func__, proc0);
}
if (wqcount == 0 && wq_stt.tv_sec != 0L) if (wqcount == 0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin); setnow(&wq_fin);
@ -5783,6 +5838,10 @@ static void *listener(void *arg)
} }
if (wq_item) { if (wq_item) {
if (pool0)
proc0++;
else
proc1++;
wqgot++; wqgot++;
process_queued(conn, wq_item); process_queued(conn, wq_item);
tick(); tick();
@ -5832,6 +5891,8 @@ static void *listener(void *arg)
} }
sayonara: sayonara:
LOGNOTICE("%s() exiting, pool0 %"PRIu64" pool %"PRIu64,
__func__, proc0, proc1);
plistener_using_data = false; plistener_using_data = false;

2
src/ckdb.h

@ -51,7 +51,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.5" #define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.013" #define CKDB_VERSION DB_VERSION"-2.014"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

Loading…
Cancel
Save