Browse Source

ckdb - allow adjusting the reload process thread count

master
kanoi 8 years ago
parent
commit
366a36a9ae
  1. 118
      src/ckdb.c
  2. 15
      src/ckdb.h
  3. 56
      src/ckdb_cmd.c

118
src/ckdb.c

@ -117,14 +117,17 @@ static bool blistener_using_data;
static bool breakdown_using_data;
static bool replier_using_data;
// Define the array size for thread data
#define THREAD_LIMIT 99
/* Flag to notify thread changes
* Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */
int queue_threads_delta = 0;
/* Use -Q to set it higher
* Setting it higher can degrade performance if the server can't
* handle the extra locking or is swapping */
* Setting it higher can degrade performance if the CPUs can't
* handle the extra locking or the threads are swapping */
static int queue_threads = 1;
// -B to override calculated value
static int breakdown_threads = -1;
// cpu count to breakdown thread ratio
#define BREAKDOWN_RATIO 3
static int reload_breakdown_count = 0;
static int cmd_breakdown_count = 0;
@ -5395,6 +5398,7 @@ static void *process_socket(void *arg)
case CMD_SHSTA:
case CMD_CHKPASS:
case CMD_GETATTS:
case CMD_THREADS:
case CMD_HOMEPAGE:
break;
default:
@ -5586,6 +5590,7 @@ static void *process_socket(void *arg)
case CMD_LOCKS:
case CMD_EVENTS:
case CMD_HIGH:
case CMD_THREADS:
msgline->sockd = bq->sockd;
bq->sockd = -1;
K_WLOCK(workqueue_free);
@ -6020,6 +6025,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
case CMD_LOCKS:
case CMD_EVENTS:
case CMD_HIGH:
case CMD_THREADS:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
__func__, bq->count,
@ -6077,37 +6083,37 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
static void *process_reload(__maybe_unused void *arg)
{
pthread_t *procrel_pt;
static pthread_t procrel_pt[THREAD_LIMIT];
static int n[THREAD_LIMIT];
static bool running[THREAD_LIMIT];
PGconn *conn = NULL;
K_ITEM *bq_item = NULL;
char buf[128];
time_t now;
int i, *n, zeros;
ts_t when, when_add;
int ret;
int i, mythread, threads_delta = 0, done, tot, ret;
if (arg)
i = *(int *)(arg);
mythread = *(int *)(arg);
else {
pthread_detach(pthread_self());
n = malloc(queue_threads * sizeof(int));
procrel_pt = malloc(queue_threads * sizeof(*procrel_pt));
for (i = 1; i < queue_threads; i++) {
for (i = 0; i < THREAD_LIMIT; i++) {
n[i] = i;
create_pthread(&(procrel_pt[i]), process_reload, &(n[i]));
running[i] = false;
}
i = 0;
mythread = 0;
running[0] = true;
// Set to create the rest of the threads
queue_threads_delta = queue_threads - 1;
LOGNOTICE("%s() starting", __func__);
}
if (queue_threads < 10)
zeros = 1;
else
zeros = (int)log10(queue_threads) + 1;
snprintf(buf, sizeof(buf), "db_p%0*drload", zeros, i);
snprintf(buf, sizeof(buf), "db_p%02drload", mythread);
LOCK_INIT(buf);
rename_proc(buf);
@ -6119,13 +6125,70 @@ static void *process_reload(__maybe_unused void *arg)
while (!everyone_die) {
K_WLOCK(breakqueue_free);
bq_item = k_unlink_head(reload_done_breakqueue_store);
if (bq_item) {
reload_processing++;
reload_processed++;
if (mythread == 0 && queue_threads_delta != 0) {
threads_delta = queue_threads_delta;
queue_threads_delta = 0;
} else {
bq_item = k_unlink_head(reload_done_breakqueue_store);
if (bq_item) {
reload_processing++;
reload_processed++;
}
}
K_WUNLOCK(breakqueue_free);
if (!running[mythread])
break;
// TODO: deal with thread creation/shutdown failure
if (threads_delta != 0) {
if (threads_delta > 0) {
// Add threads
tot = 1;
done = 0;
for (i = 1; i < THREAD_LIMIT; i++) {
if (!running[i]) {
if (threads_delta > 0) {
threads_delta--;
running[i] = true;
create_pthread(&(procrel_pt[i]),
process_reload,
&(n[i]));
done++;
tot++;
}
} else
tot++;
}
LOGWARNING("%s() created %d thread%s total now"
" %d",
__func__, done,
(done == 1) ? EMPTY : "s", tot);
} else {
// Notify and wait for each to exit
tot = 1;
done = 0;
i = THREAD_LIMIT - 1;
for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (running[i]) {
if (threads_delta < 0) {
threads_delta++;
running[i] = false;
join_pthread(procrel_pt[i]);
done++;
} else
tot++;
}
}
LOGWARNING("%s() stopped %d thread%s total now"
" %d",
__func__, done,
(done == 1) ? EMPTY : "s", tot);
}
threads_delta = 0;
continue;
}
if (!bq_item) {
// Finished reloading?
if (!reloading)
@ -6165,9 +6228,12 @@ static void *process_reload(__maybe_unused void *arg)
PQfinish(conn);
if (!arg) {
for (i = 1; i < queue_threads; i++)
join_pthread(procrel_pt[i]);
// Only when everyone_die is true
if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) {
if (running[i])
join_pthread(procrel_pt[i]);
}
LOGNOTICE("%s() exiting, processed %"PRIu64,
__func__, reload_processed);

15
src/ckdb.h

@ -11,8 +11,11 @@
#ifndef CKDB_H
#define CKDB_H
// Remove this line if you have an old GCC version
#ifdef __GNUC__
#if __GNUC__ >= 6
#pragma GCC diagnostic ignored "-Wtautological-compare"
#endif
#endif
#include "config.h"
@ -55,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.400"
#define CKDB_VERSION DB_VERSION"-2.401"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -115,6 +118,13 @@ enum free_modes {
extern enum free_modes free_mode;
// Define the array size for thread data
#define THREAD_LIMIT 99
/* Flag to notify thread changes
* Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */
extern int queue_threads_delta;
#define BLANK " "
extern char *EMPTY;
extern const char *nullstr;
@ -722,6 +732,7 @@ enum cmd_values {
CMD_LOCKS,
CMD_EVENTS,
CMD_HIGH,
CMD_THREADS,
CMD_END
};

56
src/ckdb_cmd.c

@ -8356,6 +8356,61 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
return buf;
}
// Running thread adjustments
static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd, K_TREE *trf_root,
__maybe_unused bool reload_data)
{
K_ITEM *i_name, *i_delta;
char *name, *delta;
char reply[1024] = "";
size_t siz = sizeof(reply);
char *buf = NULL;
int delta_value = 0;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_name = require_name(trf_root, "name", 1, NULL, reply, siz);
if (!i_name)
return strdup(reply);
name = transfer_data(i_name);
i_delta = require_name(trf_root, "delta", 2, NULL, reply, siz);
if (!i_delta)
return strdup(reply);
delta = transfer_data(i_delta);
if (*delta != '+' && *delta != '-') {
snprintf(reply, siz, "invalid delta '%s'", delta);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
delta_value = atoi(delta+1);
if (delta_value < 1 || delta_value >= THREAD_LIMIT) {
snprintf(reply, siz, "invalid delta range '%s'", delta);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
if (*delta == '-')
delta_value = -delta_value;
if (strcasecmp(name, "pr") == 0 ||
strcasecmp(name, "process_reload") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
queue_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else {
snprintf(reply, siz, "unknown name '%s'", name);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
return buf;
}
/* The socket command format is as follows:
* Basic structure:
* cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=...
@ -8468,5 +8523,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_LOCKS, "locks", false, false, cmd_locks, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 }
};

Loading…
Cancel
Save