Browse Source

ckdb - non locking ckdb log - since it's not that critical

master
kanoi 10 years ago
parent
commit
e45ef004ef
  1. 118
      src/ckdb.c

118
src/ckdb.c

@ -693,6 +693,18 @@ enum cmd_values {
CMD_END CMD_END
}; };
// LOGQUEUE
typedef struct logqueue {
char *msg;
} LOGQUEUE;
#define ALLOC_LOGQUEUE 1024
#define LIMIT_LOGQUEUE 0
#define DATA_LOGQUEUE(_item) ((LOGQUEUE *)(_item->data))
static K_LIST *logqueue_free;
static K_STORE *logqueue_store;
// WORKQUEUE // WORKQUEUE
typedef struct workqueue { typedef struct workqueue {
char *buf; char *buf;
@ -1291,9 +1303,48 @@ static K_LIST *workerstatus_free;
static K_STORE *workerstatus_store; static K_STORE *workerstatus_store;
static char logname[512]; static char logname[512];
#define LOGFILE(_msg) rotating_log(logname, _msg) #define LOGQUE(_msg) log_queue_message(_msg)
#define LOGFILE(_msg) rotating_log_nolock(_msg)
#define LOGDUP "dup." #define LOGDUP "dup."
// low spec version of rotating_log() - no locking
static bool rotating_log_nolock(char *msg)
{
char *filename;
FILE *fp;
bool ok = false;
filename = rotating_filename(logname, time(NULL));
fp = fopen(filename, "a+");
if (unlikely(!fp)) {
LOGERR("Failed to fopen %s in rotating_log!", filename);
goto stageleft;
}
fprintf(fp, "%s\n", msg);
fclose(fp);
ok = true;
stageleft:
free(filename);
return ok;
}
static void log_queue_message(char *msg)
{
K_ITEM *lq_item;
K_WLOCK(logqueue_free);
lq_item = k_unlink_head(logqueue_free);
K_WUNLOCK(logqueue_free);
DATA_LOGQUEUE(lq_item)->msg = strdup(msg);
K_WLOCK(logqueue_free);
k_add_tail(logqueue_store, lq_item);
K_WUNLOCK(logqueue_free);
}
void logmsg(int loglevel, const char *fmt, ...) void logmsg(int loglevel, const char *fmt, ...)
{ {
int logfd = 0; int logfd = 0;
@ -7520,6 +7571,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(workerstatus, 1, 1); USEINFO(workerstatus, 1, 1);
USEINFO(workqueue, 1, 0); USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0); USEINFO(transfer, 0, 0);
USEINFO(logqueue, 1, 0);
snprintf(tmp, sizeof(tmp), "%ctotalram=%"PRIu64, FLDSEP, tot); snprintf(tmp, sizeof(tmp), "%ctotalram=%"PRIu64, FLDSEP, tot);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
@ -8011,6 +8063,53 @@ static void *summariser(__maybe_unused void *arg)
return NULL; return NULL;
} }
static void *logger(__maybe_unused void *arg)
{
K_ITEM *lq_item;
char buf[128];
tv_t now;
pthread_detach(pthread_self());
setnow(&now);
snprintf(buf, sizeof(buf), "logstart.%ld,%ld",
now.tv_sec, now.tv_usec);
LOGFILE(buf);
while (!everyone_die) {
K_WLOCK(logqueue_free);
lq_item = k_unlink_head(logqueue_store);
K_WUNLOCK(logqueue_free);
if (lq_item) {
LOGFILE(DATA_LOGQUEUE(lq_item)->msg);
free(DATA_LOGQUEUE(lq_item)->msg);
K_WLOCK(logqueue_free);
k_add_head(logqueue_free, lq_item);
K_WUNLOCK(logqueue_free);
} else
cksleep_ms(42);
}
K_WLOCK(logqueue_free);
setnow(&now);
snprintf(buf, sizeof(buf), "logstopping.%d.%ld,%ld",
logqueue_store->count,
now.tv_sec, now.tv_usec);
LOGFILE(buf);
while((lq_item = k_unlink_head(logqueue_store)))
LOGFILE(DATA_LOGQUEUE(lq_item)->msg);
K_WUNLOCK(logqueue_free);
setnow(&now);
snprintf(buf, sizeof(buf), "logstop.%ld,%ld",
now.tv_sec, now.tv_usec);
LOGFILE(buf);
return NULL;
}
static void *socketer(__maybe_unused void *arg) static void *socketer(__maybe_unused void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
@ -8119,10 +8218,10 @@ static void *socketer(__maybe_unused void *arg)
*dot = '\0'; *dot = '\0';
snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", snprintf(reply, sizeof(reply), "%s%ld,%ld.%s",
LOGDUP, now.tv_sec, now.tv_usec, duptype); LOGDUP, now.tv_sec, now.tv_usec, duptype);
LOGFILE(reply); LOGQUE(reply);
LOGWARNING("Duplicate '%s' message received", duptype); LOGWARNING("Duplicate '%s' message received", duptype);
} else { } else {
LOGFILE(buf); LOGQUE(buf);
cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd);
switch (cmdnum) { switch (cmdnum) {
case CMD_REPLY: case CMD_REPLY:
@ -8359,7 +8458,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
return true; return true;
} }
LOGFILE(buf); LOGQUE(buf);
cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd);
switch (cmdnum) { switch (cmdnum) {
// Ignore // Ignore
@ -8463,7 +8562,7 @@ static bool reload_from(tv_t *start)
setnow(&now); setnow(&now);
tvs_to_buf(&now, run, sizeof(run)); tvs_to_buf(&now, run, sizeof(run));
snprintf(reload_buf, MAX_READ, "reload.%s.s0", run); snprintf(reload_buf, MAX_READ, "reload.%s.s0", run);
LOGFILE(reload_buf); LOGQUE(reload_buf);
conn = dbconnect(); conn = dbconnect();
@ -8541,7 +8640,7 @@ static bool reload_from(tv_t *start)
PQfinish(conn); PQfinish(conn);
snprintf(reload_buf, MAX_READ, "reload.%s.%"PRIu64, run, total); snprintf(reload_buf, MAX_READ, "reload.%s.%"PRIu64, run, total);
LOGFILE(reload_buf); LOGQUE(reload_buf);
LOGWARNING("%s(): read %d file%s, total %"PRIu64" line%s", LOGWARNING("%s(): read %d file%s, total %"PRIu64" line%s",
__func__, __func__,
processing, processing == 1 ? "" : "s", processing, processing == 1 ? "" : "s",
@ -8612,11 +8711,18 @@ static void process_queued(K_ITEM *wq_item)
static void *listener(void *arg) static void *listener(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t log_pt;
pthread_t sock_pt; pthread_t sock_pt;
pthread_t summ_pt; pthread_t summ_pt;
K_ITEM *wq_item; K_ITEM *wq_item;
int qc; int qc;
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
create_pthread(&log_pt, logger, NULL);
create_pthread(&sock_pt, socketer, arg); create_pthread(&sock_pt, socketer, arg);
create_pthread(&summ_pt, summariser, NULL); create_pthread(&summ_pt, summariser, NULL);

Loading…
Cancel
Save