Browse Source

ckdb - use a separate thread for message logging

master
kanoi 8 years ago
parent
commit
654baf6bf5
  1. 409
      src/ckdb.c
  2. 20
      src/ckdb.h

409
src/ckdb.c

@ -327,6 +327,15 @@ static bool reload_queue_complete = false;
bool everyone_die = false; bool everyone_die = false;
// Set to true every time a store is created // Set to true every time a store is created
static bool seqdata_reload_lost = false; static bool seqdata_reload_lost = false;
// Tell the ioqueue thread to exit
static bool ioqueue_die = false;
/* Tell the console ioqueue to clear it's list as fast as possible
* In the case where a console log unexpectedly ends up having a very large
* number of messages, since it limits the number of messages per second
* this could take a long time to clear
* To resolve this, you can disconnect from the console if your IO speed to
* the console is slow, then send it all by sending flush.2 */
static bool ioqueue_flush = false;
/* These are included in cmd_homepage /* These are included in cmd_homepage
* to help identify when ckpool locks up (or dies) */ * to help identify when ckpool locks up (or dies) */
@ -402,6 +411,14 @@ int64_t mismatch_all_workmarkers;
int64_t mismatch_all_marks; int64_t mismatch_all_marks;
int64_t mismatch_all_total; int64_t mismatch_all_total;
// IOQUEUE
static K_LIST *ioqueue_free;
static K_STORE *ioqueue_store;
static K_STORE *console_ioqueue_store;
// Trigger ioqueue_store processing
static mutex_t f_ioqueue_waitlock;
static pthread_cond_t f_ioqueue_waitcond;
// LOGQUEUE // LOGQUEUE
K_LIST *logqueue_free; K_LIST *logqueue_free;
K_STORE *logqueue_store; K_STORE *logqueue_store;
@ -823,6 +840,245 @@ static bool no_data_log = false;
* 'restorefrom/' */ * 'restorefrom/' */
static char *logpath; static char *logpath;
static void ioprocess(IOQUEUE *io)
{
char stamp[128], tzinfo[16], tzch;
long minoff, hroff;
struct tm tm;
int ms;
if (io->when.tv_sec == 0)
stamp[0] = '\0';
else {
ms = (int)(io->when.tv_usec / 1000);
localtime_r(&(io->when.tv_sec), &tm);
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '-';
minoff *= -1;
} else
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld:%02ld",
tzch, hroff, minoff % 60);
} else {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld",
tzch, hroff);
}
snprintf(stamp, sizeof(stamp),
"[%d-%02d-%02d %02d:%02d:%02d.%03d%s] ",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec, ms,
tzinfo);
}
if (io->logfd && global_ckp) {
int logfd = global_ckp->logfd;
if (logfd) {
FILE *LOGFP = global_ckp->logfp;
flock(logfd, LOCK_EX);
if (io->errn) {
fprintf(LOGFP, "%s%s with errno %d: %s\n",
stamp, io->msg,
io->errn, strerror(io->errn));
} else
fprintf(LOGFP, "%s%s\n", stamp, io->msg);
flock(logfd, LOCK_UN);
}
}
if (io->logout) {
if (io->errn) {
fprintf(stdout, "%s%s with errno %d: %s%s",
stamp, io->msg,
io->errn, strerror(io->errn),
io->eol ? "\n" : "\r");
} else {
fprintf(stdout, "%s%s%s",
stamp, io->msg,
io->eol ? "\n" : "\r");
}
if (io->flush)
fflush(stdout);
}
if (io->logerr) {
if (io->errn) {
fprintf(stderr, "%s%s with errno %d: %s%s",
stamp, io->msg,
io->errn, strerror(io->errn),
io->eol ? "\n" : "\r");
} else {
fprintf(stderr, "%s%s%s",
stamp, io->msg,
io->eol ? "\n" : "\r");
}
if (io->flush)
fflush(stderr);
}
}
static void *iomsgs(void *consol)
{
ts_t when, when_add;
K_ITEM *io_item;
IOQUEUE *io;
char buf[64];
snprintf(buf, sizeof(buf),
"db%s_%s%s",
dbcode, consol ? "c" : "f", __func__);
LOCK_INIT(buf);
rename_proc(buf);
when_add.tv_sec = 0;
when_add.tv_nsec = 100000000; // 100ms
while (0x80085) {
// WARNING taking locks can produce messages ...
K_WLOCK(ioqueue_free);
if (consol)
io_item = k_unlink_head(console_ioqueue_store);
else
io_item = k_unlink_head(ioqueue_store);
K_WUNLOCK(ioqueue_free);
if (io_item) {
DATA_IOQUEUE(io, io_item);
ioprocess(io);
free(io->msg);
// WARNING taking locks can produce messages ...
K_WLOCK(ioqueue_free);
k_add_head(ioqueue_free, io_item);
K_WUNLOCK(ioqueue_free);
} else {
if (ioqueue_die)
break;
if (consol) {
// The queue is clear
ioqueue_flush = false;
}
}
if (consol) {
if (!ioqueue_flush) {
// max 50 per second
cksleep_ms(20);
}
} else {
if (!io_item) {
setnowts(&when);
timeraddspec(&when, &when_add);
mutex_lock(&f_ioqueue_waitlock);
cond_timedwait(&f_ioqueue_waitcond,
&f_ioqueue_waitlock,
&when);
mutex_unlock(&f_ioqueue_waitlock);
}
}
}
return NULL;
}
#define io_msg(stamp, msg, errn, logfd, logerr) \
_io_msg(stamp, msg, false, errn, logfd, false, logerr, true, true, \
WHERE_FFL_HERE)
#define cr_msg(stamp, msg) \
_io_msg(stamp, msg, true, 0, false, true, false, false, true, WHERE_FFL_HERE)
#define lf_msg(stamp, msg) \
_io_msg(stamp, msg, true, 0, false, true, false, true, true, WHERE_FFL_HERE)
#define err_msg(stamp, msg, errn) \
_io_msg(stamp, msg, true, errn, false, false, true, true, true, WHERE_FFL_HERE)
static void _io_msg(bool stamp, char *msg, bool alloc, int errn, bool logfd,
bool logout, bool logerr, bool eol, bool flush,
WHERE_FFL_ARGS)
{
K_ITEM *fio_item = NULL, *cio_item = NULL;
bool msgused = false;
IOQUEUE *io;
tv_t now;
if (!logfd && !logout && !logerr) {
quitfrom(1, file, func, line,
"%s() called without output", __func__);
}
if (stamp)
setnow(&now);
else
now.tv_sec = 0;
// WARNING taking locks can produce messages ...
K_WLOCK(ioqueue_free);
if (logfd)
fio_item = k_unlink_head(ioqueue_free);
if (logout || logerr)
cio_item = k_unlink_head(ioqueue_free);
K_WUNLOCK(ioqueue_free);
if (logfd) {
DATA_IOQUEUE(io, fio_item);
if (!alloc) {
io->msg = msg;
msgused = true;
} else {
io->msg = strdup(msg);
if (!(io->msg))
quithere(1, "strdup (%d) OOM", (int)strlen(msg));
}
copy_tv(&(io->when), &now);
io->errn = errn;
io->logfd = logfd;
io->logout = false;
io->logerr = false;
io->eol = eol;
io->flush = flush;
}
if (logout || logerr) {
DATA_IOQUEUE(io, cio_item);
if (!alloc && !msgused)
io->msg = msg;
else {
io->msg = strdup(msg);
if (!(io->msg))
quithere(1, "strdup (%d) OOM", (int)strlen(msg));
}
copy_tv(&(io->when), &now);
io->errn = errn;
io->logfd = false;
io->logout = logout;
io->logerr = logerr;
io->eol = eol;
io->flush = flush;
}
// WARNING taking locks can produce messages ...
K_WLOCK(ioqueue_free);
if (fio_item)
k_add_tail(ioqueue_store, fio_item);
if (cio_item)
k_add_tail(console_ioqueue_store, cio_item);
K_WUNLOCK(ioqueue_free);
if (fio_item) {
mutex_lock(&f_ioqueue_waitlock);
pthread_cond_signal(&f_ioqueue_waitcond);
mutex_unlock(&f_ioqueue_waitlock);
}
}
static void replace_ymd(char *srch, char *match, int val) static void replace_ymd(char *srch, char *match, int val)
{ {
char buf[32], *ptr, *found; char buf[32], *ptr, *found;
@ -914,85 +1170,30 @@ static void log_queue_message(char *msg, bool db)
void logmsg(int loglevel, const char *fmt, ...) void logmsg(int loglevel, const char *fmt, ...)
{ {
int logfd = 0;
char *buf = NULL; char *buf = NULL;
struct tm tm;
tv_t now_tv;
int ms;
va_list ap; va_list ap;
char stamp[128]; int errn;
char *extra = EMPTY;
char tzinfo[16];
char tzch;
long minoff, hroff;
if (loglevel > global_ckp->loglevel)
return;
tv_time(&now_tv); errn = errno;
ms = (int)(now_tv.tv_usec / 1000); errno = 0;
localtime_r(&(now_tv.tv_sec), &tm);
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '-';
minoff *= -1;
} else
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld:%02ld",
tzch, hroff, minoff % 60);
} else {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld",
tzch, hroff);
}
snprintf(stamp, sizeof(stamp),
"[%d-%02d-%02d %02d:%02d:%02d.%03d%s]",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec, ms,
tzinfo);
if (!fmt) { if (!fmt) {
fprintf(stderr, "%s %s() called without fmt\n", stamp, __func__); err_msg(true, "logmsg() called without fmt", errn);
return; return;
} }
if (!global_ckp) if (loglevel > global_ckp->loglevel)
extra = " !!NULL global_ckp!!"; return;
else
logfd = global_ckp->logfd;
va_start(ap, fmt); va_start(ap, fmt);
VASPRINTF(&buf, fmt, ap); VASPRINTF(&buf, fmt, ap);
va_end(ap); va_end(ap);
if (logfd) { // iomsgs() will free buf
FILE *LOGFP = global_ckp->logfp; if (loglevel <= LOG_ERR)
io_msg(true, buf, errn, true, loglevel <= LOG_WARNING);
flock(logfd, LOCK_EX); else
fprintf(LOGFP, "%s %s", stamp, buf); io_msg(true, buf, 0, true, loglevel <= LOG_WARNING);
if (loglevel <= LOG_ERR && errno != 0)
fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno));
errno = 0;
fprintf(LOGFP, "\n");
flock(logfd, LOCK_UN);
}
if (loglevel <= LOG_WARNING) {
if (loglevel <= LOG_ERR && errno != 0) {
fprintf(stderr, "%s %s with errno %d: %s%s\n",
stamp, buf, errno, strerror(errno), extra);
errno = 0;
} else
fprintf(stderr, "%s %s%s\n", stamp, buf, extra);
fflush(stderr);
}
free(buf);
} }
void setnowts(ts_t *now) void setnowts(ts_t *now)
@ -1230,15 +1431,14 @@ static time_t last_tick;
void tick() void tick()
{ {
time_t now; time_t now;
char ch; char ch[2];
now = time(NULL); now = time(NULL);
if (now != last_tick) { if (now != last_tick) {
last_tick = now; last_tick = now;
ch = status_chars[ticks++ & 0x3]; ch[0] = status_chars[ticks++ & 0x3];
putchar(ch); ch[1] = '\0';
putchar('\r'); cr_msg(false, ch);
fflush(stdout);
} }
} }
@ -3907,6 +4107,11 @@ static void *breaker(void *arg)
else else
typ = ISCMD; typ = ISCMD;
snprintf(buf, sizeof(buf), "db%s_%c%02d%s",
dbcode, reload ? 'r' : 'c', mythread, __func__);
LOCK_INIT(buf);
rename_proc(buf);
if (mythread == 0) { if (mythread == 0) {
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -3941,11 +4146,6 @@ static void *breaker(void *arg)
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
} }
snprintf(buf, sizeof(buf), "db%s_%c%02d%s",
dbcode, reload ? 'r' : 'c', mythread, __func__);
LOCK_INIT(buf);
rename_proc(buf);
LOGNOTICE("%s() %s starting", __func__, buf); LOGNOTICE("%s() %s starting", __func__, buf);
if (reload) { if (reload) {
@ -5856,6 +6056,11 @@ static void *process_socket(void *arg)
fflush(stderr); fflush(stderr);
if (global_ckp && global_ckp->logfd) if (global_ckp && global_ckp->logfd)
fflush(global_ckp->logfp); fflush(global_ckp->logfp);
if (*(msgline->id)) {
// If you set the flush id to 2
if(atoi(msgline->id) == 2)
ioqueue_flush = true;
}
setnow(&(msgline->processed)); setnow(&(msgline->processed));
break; break;
case CMD_USERSET: case CMD_USERSET:
@ -6409,7 +6614,14 @@ static void *process_reload(__maybe_unused void *arg)
if (arg) if (arg)
mythread = *(int *)(arg); mythread = *(int *)(arg);
else { else
mythread = 0;
snprintf(buf, sizeof(buf), "db%s_p%02drload", dbcode, mythread);
LOCK_INIT(buf);
rename_proc(buf);
if (!arg) {
pthread_detach(pthread_self()); pthread_detach(pthread_self());
for (i = 0; i < THREAD_LIMIT; i++) { for (i = 0; i < THREAD_LIMIT; i++) {
@ -6417,7 +6629,6 @@ static void *process_reload(__maybe_unused void *arg)
running[i] = false; running[i] = false;
} }
mythread = 0;
running[0] = true; running[0] = true;
// Set to create the rest of the threads // Set to create the rest of the threads
@ -6426,10 +6637,6 @@ static void *process_reload(__maybe_unused void *arg)
LOGNOTICE("%s() starting", __func__); LOGNOTICE("%s() starting", __func__);
} }
snprintf(buf, sizeof(buf), "db%s_p%02drload", dbcode, mythread);
LOCK_INIT(buf);
rename_proc(buf);
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000;
@ -6734,7 +6941,7 @@ static bool reload_from(tv_t *start, const tv_t *finish)
{ {
// proc_pt could exit after this returns // proc_pt could exit after this returns
static pthread_t proc_pt; static pthread_t proc_pt;
char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1], tickmsg[256];
size_t rflen = strlen(restorefrom); size_t rflen = strlen(restorefrom);
char *missingfirst = NULL, *missinglast = NULL, *st = NULL; char *missingfirst = NULL, *missinglast = NULL, *st = NULL;
int missing_count, processing, counter; int missing_count, processing, counter;
@ -6816,11 +7023,12 @@ static bool reload_from(tv_t *start, const tv_t *finish)
poolq = pool_workqueue_store->count; poolq = pool_workqueue_store->count;
// pool_workqueue_store should be zero // pool_workqueue_store should be zero
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" snprintf(tickmsg, sizeof(tickmsg),
" ckp %d/%d/%d/%d (%d) \r", TICK_PREFIX"reload %"PRIu64"/%d/%d"
" ckp %d/%d/%d/%d (%d) ",
total+count, relq, relqd, total+count, relq, relqd,
cmdq, cmdqd, pool0q, poolq, mx); cmdq, cmdqd, pool0q, poolq, mx);
fflush(stdout); cr_msg(false, tickmsg);
tick_time = tmp_time; tick_time = tmp_time;
} }
} }
@ -8450,6 +8658,7 @@ int main(int argc, char **argv)
struct sigaction handler; struct sigaction handler;
char *btc_user = "user"; char *btc_user = "user";
char *btc_pass = "p"; char *btc_pass = "p";
pthread_t f_iomsgs_pt, c_iomsgs_pt;
char buf[512]; char buf[512];
ckpool_t ckp; ckpool_t ckp;
int c, ret, i = 0, j; int c, ret, i = 0, j;
@ -8850,6 +9059,23 @@ int main(int argc, char **argv)
cond_init(&wq_cmd_waitcond); cond_init(&wq_cmd_waitcond);
cond_init(&wq_btc_waitcond); cond_init(&wq_btc_waitcond);
mutex_init(&f_ioqueue_waitlock);
cond_init(&f_ioqueue_waitcond);
// Initialise IOQUEUE before anything needs it
ioqueue_free = k_new_list("IOQueue", sizeof(IOQUEUE),
ALLOC_IOQUEUE, LIMIT_IOQUEUE, true);
ioqueue_store = k_new_store(ioqueue_free);
console_ioqueue_store = k_new_store(ioqueue_free);
#if LOCK_CHECK
DLPRIO(ioqueue, PRIO_TERMINAL);
#endif
create_pthread(&f_iomsgs_pt, iomsgs, NULL);
bool consol = true;
create_pthread(&c_iomsgs_pt, iomsgs, &consol);
// Emulate a list for lock checking // Emulate a list for lock checking
process_pplns_free = k_lock_only_list("ProcessPPLNS"); process_pplns_free = k_lock_only_list("ProcessPPLNS");
workers_db_free = k_lock_only_list("WorkersDB"); workers_db_free = k_lock_only_list("WorkersDB");
@ -8923,7 +9149,8 @@ int main(int argc, char **argv)
} }
if (msg) { if (msg) {
trigger = curr; trigger = curr;
printf("%s %ds due to%s%s%s%s%s%s%s%s%s\n", snprintf(buf, sizeof(buf),
"%s %ds due to%s%s%s%s%s%s%s%s%s\n",
msg, (int)(curr - start), msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY, socketer_using_data ? " socketer" : EMPTY,
summariser_using_data ? " summariser" : EMPTY, summariser_using_data ? " summariser" : EMPTY,
@ -8934,13 +9161,19 @@ int main(int argc, char **argv)
marker_using_data ? " marker" : EMPTY, marker_using_data ? " marker" : EMPTY,
breakdown_using_data ? " breakdown" : EMPTY, breakdown_using_data ? " breakdown" : EMPTY,
replier_using_data ? " replier" : EMPTY); replier_using_data ? " replier" : EMPTY);
fflush(stdout); lf_msg(true, buf);
} }
sleep(1); sleep(1);
} }
dealloc_storage(); dealloc_storage();
ioqueue_die = true;
join_pthread(f_iomsgs_pt);
join_pthread(c_iomsgs_pt);
FREE_STORE(console_ioqueue);
FREE_LISTS(ioqueue);
clean_up(&ckp); clean_up(&ckp);
return 0; return 0;

20
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.419" #define CKDB_VERSION DB_VERSION"-2.420"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1115,6 +1115,23 @@ enum cmd_values {
(_row)->pointers = (_row)->pointers; \ (_row)->pointers = (_row)->pointers; \
} while (0) } while (0)
// IOQUEUE
typedef struct ioqueue {
char *msg;
tv_t when;
int errn;
bool logfd;
bool logout;
bool logerr;
bool eol;
bool flush;
} IOQUEUE;
#define ALLOC_IOQUEUE 1024
#define LIMIT_IOQUEUE 0
#define INIT_IOQUEUE(_item) INIT_GENERIC(_item, ioqueue)
#define DATA_IOQUEUE(_var, _item) DATA_GENERIC(_var, _item, ioqueue, true)
// LOGQUEUE // LOGQUEUE
typedef struct logqueue { typedef struct logqueue {
char *msg; char *msg;
@ -2945,6 +2962,7 @@ enum reply_type {
}; };
extern void logmsg(int loglevel, const char *fmt, ...); extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnowts(ts_t *now);
extern void setnow(tv_t *now); extern void setnow(tv_t *now);
extern void tick(); extern void tick();
extern PGconn *dbconnect(); extern PGconn *dbconnect();

Loading…
Cancel
Save