diff --git a/src/ckdb.c b/src/ckdb.c index 3f4f9350..afd1d524 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -117,14 +117,17 @@ static bool blistener_using_data; static bool breakdown_using_data; static bool replier_using_data; -// Define the array size for thread data -#define THREAD_LIMIT 99 +/* Flag to notify thread changes + * Set/checked under the function's main loop's first lock + * This is always a 'delta' value meaning add or subtract that many */ +int queue_threads_delta = 0; /* Use -Q to set it higher - * Setting it higher can degrade performance if the server can't - * handle the extra locking or is swapping */ + * Setting it higher can degrade performance if the CPUs can't + * handle the extra locking or the threads are swapping */ static int queue_threads = 1; // -B to override calculated value static int breakdown_threads = -1; +// cpu count to breakdown thread ratio #define BREAKDOWN_RATIO 3 static int reload_breakdown_count = 0; static int cmd_breakdown_count = 0; @@ -5395,6 +5398,7 @@ static void *process_socket(void *arg) case CMD_SHSTA: case CMD_CHKPASS: case CMD_GETATTS: + case CMD_THREADS: case CMD_HOMEPAGE: break; default: @@ -5586,6 +5590,7 @@ static void *process_socket(void *arg) case CMD_LOCKS: case CMD_EVENTS: case CMD_HIGH: + case CMD_THREADS: msgline->sockd = bq->sockd; bq->sockd = -1; K_WLOCK(workqueue_free); @@ -6020,6 +6025,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item) case CMD_LOCKS: case CMD_EVENTS: case CMD_HIGH: + case CMD_THREADS: LOGERR("%s() INVALID message line %"PRIu64 " ignored '%.42s...", __func__, bq->count, @@ -6077,37 +6083,37 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item) static void *process_reload(__maybe_unused void *arg) { - pthread_t *procrel_pt; + static pthread_t procrel_pt[THREAD_LIMIT]; + static int n[THREAD_LIMIT]; + static bool running[THREAD_LIMIT]; + PGconn *conn = NULL; K_ITEM *bq_item = NULL; char buf[128]; time_t now; - int i, *n, zeros; ts_t when, when_add; - int ret; + int i, mythread, threads_delta = 0, done, tot, ret; if (arg) - i = *(int *)(arg); + mythread = *(int *)(arg); else { pthread_detach(pthread_self()); - n = malloc(queue_threads * sizeof(int)); - procrel_pt = malloc(queue_threads * sizeof(*procrel_pt)); - for (i = 1; i < queue_threads; i++) { + for (i = 0; i < THREAD_LIMIT; i++) { n[i] = i; - create_pthread(&(procrel_pt[i]), process_reload, &(n[i])); + running[i] = false; } - i = 0; + + mythread = 0; + running[0] = true; + + // Set to create the rest of the threads + queue_threads_delta = queue_threads - 1; LOGNOTICE("%s() starting", __func__); } - if (queue_threads < 10) - zeros = 1; - else - zeros = (int)log10(queue_threads) + 1; - - snprintf(buf, sizeof(buf), "db_p%0*drload", zeros, i); + snprintf(buf, sizeof(buf), "db_p%02drload", mythread); LOCK_INIT(buf); rename_proc(buf); @@ -6119,13 +6125,70 @@ static void *process_reload(__maybe_unused void *arg) while (!everyone_die) { K_WLOCK(breakqueue_free); - bq_item = k_unlink_head(reload_done_breakqueue_store); - if (bq_item) { - reload_processing++; - reload_processed++; + if (mythread == 0 && queue_threads_delta != 0) { + threads_delta = queue_threads_delta; + queue_threads_delta = 0; + } else { + bq_item = k_unlink_head(reload_done_breakqueue_store); + if (bq_item) { + reload_processing++; + reload_processed++; + } } K_WUNLOCK(breakqueue_free); + if (!running[mythread]) + break; + + // TODO: deal with thread creation/shutdown failure + if (threads_delta != 0) { + if (threads_delta > 0) { + // Add threads + tot = 1; + done = 0; + for (i = 1; i < THREAD_LIMIT; i++) { + if (!running[i]) { + if (threads_delta > 0) { + threads_delta--; + running[i] = true; + create_pthread(&(procrel_pt[i]), + process_reload, + &(n[i])); + done++; + tot++; + } + } else + tot++; + } + LOGWARNING("%s() created %d thread%s total now" + " %d", + __func__, done, + (done == 1) ? EMPTY : "s", tot); + } else { + // Notify and wait for each to exit + tot = 1; + done = 0; + i = THREAD_LIMIT - 1; + for (i = THREAD_LIMIT - 1; i > 0; i--) { + if (running[i]) { + if (threads_delta < 0) { + threads_delta++; + running[i] = false; + join_pthread(procrel_pt[i]); + done++; + } else + tot++; + } + } + LOGWARNING("%s() stopped %d thread%s total now" + " %d", + __func__, done, + (done == 1) ? EMPTY : "s", tot); + } + threads_delta = 0; + continue; + } + if (!bq_item) { // Finished reloading? if (!reloading) @@ -6165,9 +6228,12 @@ static void *process_reload(__maybe_unused void *arg) PQfinish(conn); - if (!arg) { - for (i = 1; i < queue_threads; i++) - join_pthread(procrel_pt[i]); + // Only when everyone_die is true + if (mythread == 0) { + for (i = 1; i < THREAD_LIMIT; i++) { + if (running[i]) + join_pthread(procrel_pt[i]); + } LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, reload_processed); diff --git a/src/ckdb.h b/src/ckdb.h index 20796597..ddf8213a 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -11,8 +11,11 @@ #ifndef CKDB_H #define CKDB_H -// Remove this line if you have an old GCC version +#ifdef __GNUC__ +#if __GNUC__ >= 6 #pragma GCC diagnostic ignored "-Wtautological-compare" +#endif +#endif #include "config.h" @@ -55,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.400" +#define CKDB_VERSION DB_VERSION"-2.401" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -115,6 +118,13 @@ enum free_modes { extern enum free_modes free_mode; +// Define the array size for thread data +#define THREAD_LIMIT 99 +/* Flag to notify thread changes + * Set/checked under the function's main loop's first lock + * This is always a 'delta' value meaning add or subtract that many */ +extern int queue_threads_delta; + #define BLANK " " extern char *EMPTY; extern const char *nullstr; @@ -722,6 +732,7 @@ enum cmd_values { CMD_LOCKS, CMD_EVENTS, CMD_HIGH, + CMD_THREADS, CMD_END }; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index dfe8b576..a0f432be 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -8356,6 +8356,61 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id, return buf; } +// Running thread adjustments +static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) +{ + K_ITEM *i_name, *i_delta; + char *name, *delta; + char reply[1024] = ""; + size_t siz = sizeof(reply); + char *buf = NULL; + int delta_value = 0; + + LOGDEBUG("%s(): cmd '%s'", __func__, cmd); + + i_name = require_name(trf_root, "name", 1, NULL, reply, siz); + if (!i_name) + return strdup(reply); + name = transfer_data(i_name); + i_delta = require_name(trf_root, "delta", 2, NULL, reply, siz); + if (!i_delta) + return strdup(reply); + delta = transfer_data(i_delta); + if (*delta != '+' && *delta != '-') { + snprintf(reply, siz, "invalid delta '%s'", delta); + LOGERR("%s() %s.%s", __func__, id, reply); + return strdup(reply); + } + delta_value = atoi(delta+1); + if (delta_value < 1 || delta_value >= THREAD_LIMIT) { + snprintf(reply, siz, "invalid delta range '%s'", delta); + LOGERR("%s() %s.%s", __func__, id, reply); + return strdup(reply); + } + if (*delta == '-') + delta_value = -delta_value; + + if (strcasecmp(name, "pr") == 0 || + strcasecmp(name, "process_reload") == 0) { + K_WLOCK(breakqueue_free); + // Just overwrite whatever's there + queue_threads_delta = delta_value; + K_WUNLOCK(breakqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); + } else { + snprintf(reply, siz, "unknown name '%s'", name); + LOGERR("%s() %s.%s", __func__, id, reply); + return strdup(reply); + } + + return buf; +} + /* The socket command format is as follows: * Basic structure: * cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=... @@ -8468,5 +8523,6 @@ struct CMDS ckdb_cmds[] = { { CMD_LOCKS, "locks", false, false, cmd_locks, SEQ_NONE, ACCESS_SYSTEM }, { CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM }, { CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM }, { CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 } };