diff --git a/src/ckdb.c b/src/ckdb.c index 652679a0..712fe132 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -126,13 +126,16 @@ int queue_threads_delta = 0; * handle the extra locking or the threads are swapping */ static int queue_threads = 1; // -B to override calculated value -static int breakdown_threads = -1; +static int reload_breakdown_threads = -1; +// This is normally the same as above, but key_update only requires 1 +static int cmd_breakdown_threads = -1; // cpu count to breakdown thread ratio #define BREAKDOWN_RATIO 3 -static int reload_breakdown_count = 0; -static int cmd_breakdown_count = 0; -/* Lock for access to *breakdown_count - * Any change to/from 0 will update breakdown_using_data */ +// Flags to notify thread changes +int reload_breakdown_threads_delta = 0; +int cmd_breakdown_threads_delta = 0; + +// Lock used to determine when the last breakdown thread exits static cklock_t breakdown_lock; static int replier_count = 0; @@ -3719,23 +3722,63 @@ nogood: return CMD_REPLY; } +struct breaker_setup { + bool reload; + int thread; +}; + +#define ISRELOAD 0 +#define ISCMD 1 + static void *breaker(void *arg) { + static pthread_t breaker_pt[2][THREAD_LIMIT]; + static struct breaker_setup breaker_setup[2][THREAD_LIMIT]; + static bool breaker_running[2][THREAD_LIMIT]; + static bool reload0 = false; + static bool cmd0 = false; + + struct breaker_setup *setup; K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; MSGLINE *msgline = NULL; char buf[128]; - int thr, zeros; - bool reload, was_null, msg = false; + bool reload, was_null, msg; int queue_sleep, queue_limit, count; uint64_t processed = 0; ts_t when, when_add; - int ret; + int i, typ, mythread, done, tot, ret; + int breaker_delta = 0; - pthread_detach(pthread_self()); + setup = (struct breaker_setup *)(arg); + mythread = setup->thread; + if ((reload = setup->reload)) + typ = ISRELOAD; + else + typ = ISCMD; + + if (mythread == 0) { + pthread_detach(pthread_self()); + + for (i = 0; i < THREAD_LIMIT; i++) { + breaker_setup[typ][i].thread = i; + breaker_setup[typ][i].reload = reload; + breaker_running[typ][i] = false; + } + breaker_running[typ][0] = true; + + if (reload) { + reload0 = true; + breaker_delta = reload_breakdown_threads - 1; + } else { + cmd0 = true; + breaker_delta = cmd_breakdown_threads - 1; + } + + LOGNOTICE("%s() %s initialised - delta %d", + __func__, reload ? "reload" : "cmd", breaker_delta); + } - // Is this a reload thread or a cmd thread? - reload = *(bool *)(arg); if (reload) { queue_limit = reload_queue_limit; queue_sleep = RELOAD_QUEUE_SLEEP_MS; @@ -3748,37 +3791,22 @@ static void *breaker(void *arg) when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; } - ck_wlock(&breakdown_lock); - if (reload) - thr = ++reload_breakdown_count; - else - thr = ++cmd_breakdown_count; - breakdown_using_data = true; - ck_wunlock(&breakdown_lock); - - if (breakdown_threads < 10) - zeros = 1; - else - zeros = (int)log10(breakdown_threads) + 1; - - snprintf(buf, sizeof(buf), "db_%c%0*d%s", - reload ? 'r' : 'c', zeros, thr, __func__); + snprintf(buf, sizeof(buf), "db_%c%02d%s", + reload ? 'r' : 'c', mythread, __func__); LOCK_INIT(buf); rename_proc(buf); - LOGNOTICE("%s() %s %s starting", - __func__, buf, reload ? "reload" : "cmd"); + LOGNOTICE("%s() %s starting", __func__, buf); if (reload) { /* reload has to wait for the reload to start, however, also * check for startup_complete in case we miss the reload */ while (!everyone_die && !reloading && !startup_complete) cksleep_ms(queue_sleep); - - LOGNOTICE("%s() %s reload processing", __func__, buf); - } + LOGNOTICE("%s() %s processing", __func__, buf); + // The first one to start K_WLOCK(breakqueue_free); if (reload) { @@ -3790,31 +3818,112 @@ static void *breaker(void *arg) } K_WUNLOCK(breakqueue_free); while (!everyone_die) { - K_WLOCK(breakqueue_free); - bq_item = NULL; - was_null = false; - if (reload) - count = reload_done_breakqueue_store->count; - else - count = cmd_done_breakqueue_store->count; + if (mythread && !breaker_running[typ][mythread]) + break; - // Don't unlink if we are above the limit - if (count <= queue_limit) { - if (reload) - bq_item = k_unlink_head(reload_breakqueue_store); - else - bq_item = k_unlink_head(cmd_breakqueue_store); - if (!bq_item) - was_null = true; - } - if (bq_item) { + K_WLOCK(breakqueue_free); + if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) { + breaker_delta = reload_breakdown_threads_delta; + reload_breakdown_threads_delta = 0; + } else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) { + breaker_delta = cmd_breakdown_threads_delta; + cmd_breakdown_threads_delta = 0; + } else { + bq_item = NULL; + was_null = false; if (reload) - break_reload_processed++; + count = reload_done_breakqueue_store->count; else - break_cmd_processed++; + count = cmd_done_breakqueue_store->count; + + // Don't unlink if we are above the limit + if (count <= queue_limit) { + if (reload) + bq_item = k_unlink_head(reload_breakqueue_store); + else + bq_item = k_unlink_head(cmd_breakqueue_store); + if (!bq_item) + was_null = true; + } + if (bq_item) { + if (reload) + break_reload_processed++; + else + break_cmd_processed++; + } } K_WUNLOCK(breakqueue_free); + // TODO: deal with thread creation/shutdown failure + if (breaker_delta != 0) { + if (breaker_delta > 0) { + // Add threads + tot = 1; + done = 0; + for (i = 1; i < THREAD_LIMIT; i++) { + if (!breaker_running[typ][i]) { + if (breaker_delta > 0) { + breaker_delta--; + breaker_running[typ][i] = true; + create_pthread(&(breaker_pt[typ][i]), + breaker, + &(breaker_setup[typ][i])); + done++; + tot++; + } + } else + tot++; + } + LOGWARNING("%s() created %d %s thread%s total=%d" +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, + done, + reload ? "reload" : "cmd", + (done == 1) ? EMPTY : "s", + tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } else { + // Notify and wait for each to exit + tot = 1; + done = 0; + for (i = THREAD_LIMIT - 1; i > 0; i--) { + if (breaker_running[typ][i]) { + if (breaker_delta < 0) { + breaker_delta++; + LOGNOTICE("%s() %s stopping %d", + __func__, + reload ? "reload" : "cmd", + i); + breaker_running[typ][i] = false; + join_pthread(breaker_pt[typ][i]); + done++; + } else + tot++; + } + } + LOGWARNING("%s() stopped %d %s thread%s total=%d" +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, + done, + reload ? "reload" : "cmd", + (done == 1) ? EMPTY : "s", + tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } + breaker_delta = 0; + continue; + } + if (!bq_item) { // Is the queue empty and the reload completed? if (was_null && reload && !reloading) @@ -3913,23 +4022,37 @@ static void *breaker(void *arg) count = max_sockd_count; K_RUNLOCK(breakqueue_free); - ck_wlock(&breakdown_lock); - if (reload) { - reload_breakdown_count--; - // The last one to finish - updated each exit - setnow(&break_reload_fin); - } else - cmd_breakdown_count--; + if (mythread == 0) { + for (i = 1; i < THREAD_LIMIT; i++) { + if (breaker_running[typ][i]) { + breaker_running[typ][i] = false; + LOGNOTICE("%s() %s waiting for %d", + __func__, buf, i); + join_pthread(breaker_pt[typ][i]); + } + } - if ((reload_breakdown_count + cmd_breakdown_count) < 1) { - breakdown_using_data = false; - msg = true; - } - ck_wunlock(&breakdown_lock); + if (reload) + setnow(&break_reload_fin); + + msg = false; + ck_wlock(&breakdown_lock); + if (reload) + reload0 = false; + else + cmd0 = false; - if (msg) { - LOGWARNING("%s() threads shut down - max_sockd_count=%d", - __func__, count); + if (reload0 == false && cmd0 == false) { + breakdown_using_data = false; + msg = true; + } + ck_wunlock(&breakdown_lock); + + if (msg) { + LOGWARNING("%s() threads shut down - " + "max_sockd_count=%d", + __func__, count); + } } return NULL; @@ -6124,6 +6247,9 @@ static void *process_reload(__maybe_unused void *arg) now = time(NULL); while (!everyone_die) { + if (mythread && !running[mythread]) + break; + K_WLOCK(breakqueue_free); if (mythread == 0 && queue_threads_delta != 0) { threads_delta = queue_threads_delta; @@ -6137,9 +6263,6 @@ static void *process_reload(__maybe_unused void *arg) } K_WUNLOCK(breakqueue_free); - if (!running[mythread]) - break; - // TODO: deal with thread creation/shutdown failure if (threads_delta != 0) { if (threads_delta > 0) { @@ -6174,11 +6297,12 @@ static void *process_reload(__maybe_unused void *arg) // Notify and wait for each to exit tot = 1; done = 0; - i = THREAD_LIMIT - 1; for (i = THREAD_LIMIT - 1; i > 0; i--) { if (running[i]) { if (threads_delta < 0) { threads_delta++; + LOGNOTICE("%s() stopping %d", + __func__, i); running[i] = false; join_pthread(procrel_pt[i]); done++; @@ -6240,11 +6364,13 @@ static void *process_reload(__maybe_unused void *arg) PQfinish(conn); - // Only when everyone_die is true if (mythread == 0) { for (i = 1; i < THREAD_LIMIT; i++) { - if (running[i]) + if (running[i]) { + running[i] = false; + LOGNOTICE("%s() waiting for %d", __func__, i); join_pthread(procrel_pt[i]); + } } LOGNOTICE("%s() exiting, processed %"PRIu64, @@ -6883,12 +7009,13 @@ static void *listener(void *arg) pthread_t sock_pt; pthread_t summ_pt; pthread_t mark_pt; - pthread_t break_pt; + pthread_t reload_break_pt; + pthread_t cmd_break_pt; int bq, bqp, bqd, wq0count, wqcount; char ooo_buf[256]; char buf[128]; - int cpus, i; - bool reloader, cmder; + int cpus; + struct breaker_setup reloader, cmder; pthread_detach(pthread_self()); @@ -6911,20 +7038,25 @@ static void *listener(void *arg) DLPRIO(logqueue, 94); DLPRIO(breakqueue, PRIO_TERMINAL); #endif - if (breakdown_threads <= 0) { + if (reload_breakdown_threads <= 0) { cpus = sysconf(_SC_NPROCESSORS_ONLN); if (cpus < 1) cpus = 1; - breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; + reload_breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; } + cmd_breakdown_threads = reload_breakdown_threads; LOGWARNING("%s(): creating %d*2 breaker threads ...", - __func__, breakdown_threads); - reloader = true; - for (i = 0; i < breakdown_threads; i++) - create_pthread(&break_pt, breaker, &reloader); - cmder = false; - for (i = 0; i < breakdown_threads; i++) - create_pthread(&break_pt, breaker, &cmder); + __func__, reload_breakdown_threads); + + breakdown_using_data = true; + + reloader.reload = true; + reloader.thread = 0; + create_pthread(&reload_break_pt, breaker, &reloader); + + cmder.reload = false; + cmder.thread = 0; + create_pthread(&cmd_break_pt, breaker, &cmder); if (no_data_log == false) create_pthread(&log_pt, logger, NULL); @@ -7274,10 +7406,12 @@ static void update_keysummary(ckpool_t *ckp) int64_t markerid_stt, markerid_fin; char *tmp, *minus; tv_t db_stt, db_fin; - pthread_t break_pt, sock_pt; + pthread_t reload_break_pt; + pthread_t cmd_break_pt; + pthread_t sock_pt; double min, sec; - bool reloader, cmder; - int cpus, i; + int cpus; + struct breaker_setup reloader, cmder; // Simple value check to abort early if (!key_range || !(*key_range)) { @@ -7338,21 +7472,26 @@ static void update_keysummary(ckpool_t *ckp) #if LOCK_CHECK DLPRIO(breakqueue, PRIO_TERMINAL); #endif - if (breakdown_threads <= 0) { + if (reload_breakdown_threads <= 0) { cpus = sysconf(_SC_NPROCESSORS_ONLN); if (cpus < 1) cpus = 1; - breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; + reload_breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; } - LOGWARNING("%s(): creating %d+1 breaker threads ...", - __func__, breakdown_threads); - reloader = true; - for (i = 0; i < breakdown_threads; i++) - create_pthread(&break_pt, breaker, &reloader); - cmder = false; // Only needs one - for (i = 0; i < 1; i++) - create_pthread(&break_pt, breaker, &cmder); + cmd_breakdown_threads = 1; + LOGWARNING("%s(): creating %d+1 breaker threads ...", + __func__, reload_breakdown_threads); + + breakdown_using_data = true; + + reloader.reload = true; + reloader.thread = 0; + create_pthread(&reload_break_pt, breaker, &reloader); + + cmder.reload = false; + cmder.thread = 0; + create_pthread(&cmd_break_pt, breaker, &cmder); alloc_storage(); @@ -8039,7 +8178,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:f:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case '?': case ':': @@ -8073,7 +8212,7 @@ int main(int argc, char **argv) "- must be >0 and <=%d", bt, THREAD_LIMIT); } - breakdown_threads = bt; + reload_breakdown_threads = bt; } break; case 'c': @@ -8237,12 +8376,12 @@ int main(int argc, char **argv) case 'S': btc_server = strdup(optarg); break; - case 'T': - txn_tree_store = false; - break; case 't': btc_timeout = atoi(optarg); break; + case 'T': + txn_tree_store = false; + break; case 'u': db_user = strdup(optarg); kill = optarg; diff --git a/src/ckdb.h b/src/ckdb.h index ca156cf9..a96f7d9f 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.402" +#define CKDB_VERSION DB_VERSION"-2.403" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -124,6 +124,9 @@ extern enum free_modes free_mode; * Set/checked under the function's main loop's first lock * This is always a 'delta' value meaning add or subtract that many */ extern int queue_threads_delta; +/* Flags to notify thread changes */ +extern int reload_breakdown_threads_delta; +extern int cmd_breakdown_threads_delta; #define BLANK " " extern char *EMPTY; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index a0f432be..1082560e 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -8402,6 +8402,22 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id, K_WUNLOCK(breakqueue_free); snprintf(reply, siz, "ok.delta %d request sent", delta_value); return strdup(reply); + } else if (strcasecmp(name, "rb") == 0 || + strcasecmp(name, "reload_breaker") == 0) { + K_WLOCK(breakqueue_free); + // Just overwrite whatever's there + reload_breakdown_threads_delta = delta_value; + K_WUNLOCK(breakqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); + } else if (strcasecmp(name, "cb") == 0 || + strcasecmp(name, "cmd_breaker") == 0) { + K_WLOCK(breakqueue_free); + // Just overwrite whatever's there + cmd_breakdown_threads_delta = delta_value; + K_WUNLOCK(breakqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); } else { snprintf(reply, siz, "unknown name '%s'", name); LOGERR("%s() %s.%s", __func__, id, reply);