diff --git a/src/ckdb.c b/src/ckdb.c index ccb994a6..a24a51eb 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -120,11 +120,13 @@ static bool replier_using_data; /* To notify thread changes * Set/checked under the function's main loop's first lock * This is always a 'delta' value meaning add or subtract that many */ -int queue_threads_delta = 0; -/* Use -Q to set it higher - * Setting it higher can degrade performance if the CPUs can't +int reload_queue_threads_delta = 0; +int proc_queue_threads_delta = 0; +/* Use -Q to set them higher + * Setting them higher can degrade performance if the CPUs can't * handle the extra locking or the threads are swapping */ -static int queue_threads = 1; +static int reload_queue_threads = 1; +static int proc_queue_threads = 1; // -B to override calculated value static int reload_breakdown_threads = -1; // This is normally the same as above, but key_update only requires 1 @@ -6231,7 +6233,7 @@ static void *process_reload(__maybe_unused void *arg) running[0] = true; // Set to create the rest of the threads - queue_threads_delta = queue_threads - 1; + reload_queue_threads_delta = reload_queue_threads - 1; LOGNOTICE("%s() starting", __func__); } @@ -6251,9 +6253,9 @@ static void *process_reload(__maybe_unused void *arg) break; K_WLOCK(breakqueue_free); - if (mythread == 0 && queue_threads_delta != 0) { - threads_delta = queue_threads_delta; - queue_threads_delta = 0; + if (mythread == 0 && reload_queue_threads_delta != 0) { + threads_delta = reload_queue_threads_delta; + reload_queue_threads_delta = 0; } else { bq_item = k_unlink_head(reload_done_breakqueue_store); if (bq_item) { @@ -6362,7 +6364,8 @@ static void *process_reload(__maybe_unused void *arg) tick(); } - PQfinish(conn); + if (conn) + PQfinish(conn); if (mythread == 0) { for (i = 1; i < THREAD_LIMIT; i++) { @@ -6831,12 +6834,15 @@ static void free_lost(SEQDATA *seqdata) static void *pqproc(void *arg) { + static pthread_t pqproc_pt[THREAD_LIMIT]; + static int n[THREAD_LIMIT]; + static bool running[THREAD_LIMIT]; + /* Process queued work - ensure pool0 is emptied first, * even if there is pending pool0 data being processed by breaker() */ static bool pool0 = true; static tv_t wq_stt, wq_fin; - pthread_t *queue_pt; PGconn *conn = NULL; K_ITEM *wq_item; time_t now = 0; @@ -6847,28 +6853,26 @@ static void *pqproc(void *arg) SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; - int i, *n, zeros; ts_t when, when_add; - int ret; + int i, mythread, threads_delta = 0, done, tot, ret; if (!arg) { setnow(&wq_stt); - n = malloc(queue_threads * sizeof(int)); - queue_pt = malloc(queue_threads * sizeof(*queue_pt)); - for (i = 1; i < queue_threads; i++) { + for (i = 0; i < THREAD_LIMIT; i++) { n[i] = i; - create_pthread(&(queue_pt[i]), pqproc, &(n[i])); + running[i] = false; } - } else { - i = *(int *)(arg); - if (queue_threads < 10) - zeros = 1; - else - zeros = (int)log10(queue_threads) + 1; + mythread = 0; + running[0] = true; - snprintf(buf, sizeof(buf), "db_p%0*dqproc", zeros, i); + // Set to create the rest of the threads + proc_queue_threads_delta = proc_queue_threads - 1; + } else { + mythread = *(int *)(arg); + + snprintf(buf, sizeof(buf), "db_p%02dqproc", mythread); LOCK_INIT(buf); rename_proc(buf); } @@ -6883,34 +6887,104 @@ static void *pqproc(void *arg) // Override checking until pool0 is complete wqcount = -1; while (!everyone_die) { + if (mythread && !running[mythread]) + break; + wq_item = NULL; K_WLOCK(workqueue_free); - if (pool0) { - if (earlysock_left == 0) { - pool0 = false; - switch_msg = true; - } else { - wq_item = k_unlink_head(pool0_workqueue_store); - if (wq_item) - earlysock_left--; + if (mythread == 0 && proc_queue_threads_delta != 0) { + threads_delta = proc_queue_threads_delta; + proc_queue_threads_delta = 0; + } else { + if (pool0) { + if (earlysock_left == 0) { + pool0 = false; + switch_msg = true; + } else { + wq_item = k_unlink_head(pool0_workqueue_store); + if (wq_item) + earlysock_left--; + } + } + if (!pool0) { + wq_item = k_unlink_head(pool_workqueue_store); + wqcount = pool_workqueue_store->count; } - } - if (!pool0) { - wq_item = k_unlink_head(pool_workqueue_store); - wqcount = pool_workqueue_store->count; - } - if (wqcount == 0 && wq_stt.tv_sec != 0L) - setnow(&wq_fin); + if (wqcount == 0 && wq_stt.tv_sec != 0L) + setnow(&wq_fin); - if (wq_item) { - if (pool0) - workqueue_proc0++; - else - workqueue_proc1++; + if (wq_item) { + if (pool0) + workqueue_proc0++; + else + workqueue_proc1++; + } } K_WUNLOCK(workqueue_free); + // TODO: deal with thread creation/shutdown failure + if (threads_delta != 0) { + if (threads_delta > 0) { + // Add threads + tot = 1; + done = 0; + for (i = 1; i < THREAD_LIMIT; i++) { + if (!running[i]) { + if (threads_delta > 0) { + threads_delta--; + running[i] = true; + create_pthread(&(pqproc_pt[i]), + pqproc, + &(n[i])); + done++; + tot++; + } + } else + tot++; + } + LOGWARNING("%s() created %d thread%s total=%d" +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, done, + (done == 1) ? EMPTY : "s", tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } else { + // Notify and wait for each to exit + tot = 1; + done = 0; + for (i = THREAD_LIMIT - 1; i > 0; i--) { + if (running[i]) { + if (threads_delta < 0) { + threads_delta++; + LOGNOTICE("%s() stopping %d", + __func__, i); + running[i] = false; + join_pthread(pqproc_pt[i]); + done++; + } else + tot++; + } + } + LOGWARNING("%s() stopped %d thread%s total=%d " +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, done, + (done == 1) ? EMPTY : "s", tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } + threads_delta = 0; + continue; + } + if (switch_msg) { switch_msg = false; LOGNOTICE("%s() pool0 complete, processed %"PRIu64, @@ -6995,9 +7069,14 @@ static void *pqproc(void *arg) if (conn) PQfinish(conn); - if (!arg) { - for (i = 1; i < queue_threads; i++) - join_pthread(queue_pt[i]); + if (mythread == 0) { + for (i = 1; i < THREAD_LIMIT; i++) { + if (running[i]) { + running[i] = false; + LOGNOTICE("%s() waiting for %d", __func__, i); + join_pthread(pqproc_pt[i]); + } + } } return NULL; @@ -7019,7 +7098,7 @@ static void *listener(void *arg) pthread_detach(pthread_self()); - snprintf(buf, sizeof(buf), "db_p0qproc"); + snprintf(buf, sizeof(buf), "db_p00qproc"); LOCK_INIT(buf); rename_proc(buf); @@ -8354,14 +8433,33 @@ int main(int argc, char **argv) break; case 'Q': { - int qt = atoi(optarg); + // N for both or N:M + char *q = strdup(optarg); + char *colon = strchr(q, ':'); + int qt; + if (colon) + *(colon++) = '\0'; + qt = atoi(q); if (qt < 1 || qt > THREAD_LIMIT) { quit(1, "Invalid queue " - "thread count %d " + "thread count '%s' " "- must be >0 and <=%d", - qt, THREAD_LIMIT); + colon ? q : optarg, + THREAD_LIMIT); + } + reload_queue_threads = qt; + if (!colon) + proc_queue_threads = qt; + else { + qt = atoi(colon); + if (qt < 1 || qt > THREAD_LIMIT) { + quit(1, "Invalid 2nd queue " + "thread count '%s' " + "- must be >0 and <=%d", + colon, THREAD_LIMIT); + } + proc_queue_threads = qt; } - queue_threads = qt; } break; case 'r': diff --git a/src/ckdb.h b/src/ckdb.h index 8cec4bc3..2c8d8b02 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.404" +#define CKDB_VERSION DB_VERSION"-2.405" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -123,7 +123,8 @@ extern enum free_modes free_mode; /* To notify thread changes * Set/checked under the function's main loop's first lock * This is always a 'delta' value meaning add or subtract that many */ -extern int queue_threads_delta; +extern int reload_queue_threads_delta; +extern int proc_queue_threads_delta; // To notify thread changes extern int reload_breakdown_threads_delta; extern int cmd_breakdown_threads_delta; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 1082560e..61ef1fff 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -8398,10 +8398,18 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id, strcasecmp(name, "process_reload") == 0) { K_WLOCK(breakqueue_free); // Just overwrite whatever's there - queue_threads_delta = delta_value; + reload_queue_threads_delta = delta_value; K_WUNLOCK(breakqueue_free); snprintf(reply, siz, "ok.delta %d request sent", delta_value); return strdup(reply); + } else if (strcasecmp(name, "pq") == 0 || + strcasecmp(name, "pqproc") == 0) { + K_WLOCK(workqueue_free); + // Just overwrite whatever's there + proc_queue_threads_delta = delta_value; + K_WUNLOCK(workqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); } else if (strcasecmp(name, "rb") == 0 || strcasecmp(name, "reload_breaker") == 0) { K_WLOCK(breakqueue_free);