diff --git a/src/ckdb.c b/src/ckdb.c index 1b38ff00..76591c6c 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -14,6 +14,48 @@ * Consider adding row level locking (a per kitem usage count) if needed */ +/* Thread layout + * ------------- + * Any thread that manages a thread count will have 00 in it's name + * and name each subsequent thread it creates, with the same name + * but with 01, 02 etc, and then wait on them all before exiting + * The 2 digit 00 relates to THREAD_LIMIT which is 99 + * there's a limit of THREAD_LIMIT active threads per thread manager + * WARNING - however the total number of threads created is limited by the + * LOCK_CHECK code which allows only MAX_THREADS total ckdb thread to + * be created irrelevant of any being deleted + * + * The threads that can be managed have a command option to set them when + * running ckdb and can also be changed via the cmd_threads socket command + * + * The main() 'ckdb' thread starts: + * iomsgs() for filelog '_fiomsgs' and console '_ciomsgs' + * listener() '_p00qproc' + * which manages it's thread count in pqproc() + * + * listener() starts: + * breakdown() for reload '_r00breaker' and cmd '_c00breaker' + * each of which manage their thead counts + * logger() '_logger' + * socksetup() '_socksetup' + * summariser() '_summarise' + * marker() '_marker' + * then calls setup_data() which calls reload() + * then calls pqproc() + * which manages the thread count + * + * socksetup() starts: + * replier() for pool '_preplier' cmd '_creplier' and btc '_breplier' + * listener_all() for cmd '_c00listen' and btc '_b00listen' + * each of which manage their thead counts + * process_socket() '_procsock' + * sockrun() for ckpool '_psockrun' web '_wsockrun' and cmd '_csockrun' + * + * reload() starts: + * process_reload() '_p00rload' + * which manages it's thread count + */ + /* Startup * ------- * During startup we load the DB and track where it is up to with @@ -157,6 +199,11 @@ static int cmd_breakdown_threads = -1; int reload_breakdown_threads_delta = 0; int cmd_breakdown_threads_delta = 0; +int cmd_listener_threads = 2; +int btc_listener_threads = 2; +int cmd_listener_threads_delta = 0; +int btc_listener_threads_delta = 0; + // Lock used to determine when the last breakdown thread exits static cklock_t breakdown_lock; @@ -381,10 +428,10 @@ static uint64_t sock_acc[MAXSOCK], sock_recv[MAXSOCK]; // breaker() summarised static tv_t break_reload_stt, break_cmd_stt, break_reload_fin; static uint64_t break_reload_processed, break_cmd_processed; -// clistener() +// listener_all() +static cklock_t listener_all_lock; static double clis_us; static uint64_t clis_processed; -// blistener() static double blis_us; static uint64_t blis_processed; @@ -958,7 +1005,7 @@ static void ioprocess(IOQUEUE *io) flock(logfd, LOCK_EX); if (io->errn) { fprintf(LOGFP, "%s%s with errno %d: %s\n", - stamp, io->msg, + stamp, io->msg, io->errn, strerror(io->errn)); } else fprintf(LOGFP, "%s%s\n", stamp, io->msg); @@ -3202,7 +3249,7 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, goto gotseqset; } } - } + } // Need to setup a new seqset newseq = true; @@ -3464,7 +3511,7 @@ gotseqset: } } seqdata->seqbase++; - } + } seqdata->maxseq++; } // store n_seqcmd @@ -4525,7 +4572,7 @@ static void *breaker(void *arg) } K_WUNLOCK(breakqueue_free); while (!everyone_die) { - if (mythread && !breaker_running[typ][mythread]) + if (mythread && !breaker_running[typ][mythread]) break; K_WLOCK(breakqueue_free); @@ -6051,37 +6098,163 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t tick(); } -static void *clistener(__maybe_unused void *arg) +struct listener_setup { + int bc; + int thread; +}; + +#define BC_B 0 +#define BC_C 1 + +static pthread_t listener_pt[2][THREAD_LIMIT]; +static struct listener_setup listener_setup[2][THREAD_LIMIT]; + +static void *listener_all(void *arg) { + static bool running[2][THREAD_LIMIT]; + + struct listener_setup *setup; PGconn *conn = NULL; K_ITEM *wq_item; tv_t now1, now2; char buf[128]; time_t now; ts_t when, when_add; - int ret; + int i, typ, mythread, done, tot, ret; + int listener_delta = 0; - pthread_detach(pthread_self()); + setup = (struct listener_setup *)(arg); + typ = setup->bc; + mythread = setup->thread; - snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__); + snprintf(buf, sizeof(buf), "db%s_%c%02d%s", + dbcode, (typ == BC_B) ? 'b' : 'c', mythread, "listen"); LOCK_INIT(buf); rename_proc(buf); - LOGNOTICE("%s() processing", __func__); + if (mythread == 0) { + pthread_detach(pthread_self()); + + for (i = 1; i < THREAD_LIMIT; i++) { + listener_setup[typ][i].thread = i; + listener_setup[typ][i].bc = typ; + running[typ][i] = false; + } + running[typ][0] = true; + + if (typ == BC_B) + listener_delta = btc_listener_threads - 1; + else + listener_delta = cmd_listener_threads - 1; + + LOGNOTICE("%s() %s initialised - delta %d", + __func__, (typ == BC_B) ? "btc" : "cmd", listener_delta); + } + LOGNOTICE("%s() %s processing", __func__, buf); when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; - clistener_using_data = true; + if (typ == BC_B) + blistener_using_data = true; + else + clistener_using_data = true; conn = dbconnect(); now = time(NULL); while (!everyone_die) { + if (mythread && !running[typ][mythread]) + break; + + wq_item = NULL; K_WLOCK(workqueue_free); - wq_item = k_unlink_head(cmd_workqueue_store); + if (mythread == 0 && typ == BC_B && + btc_listener_threads_delta != 0) { + listener_delta = btc_listener_threads_delta; + btc_listener_threads_delta = 0; + } else if (mythread == 0 && typ == BC_C && + cmd_listener_threads_delta != 0) { + listener_delta = cmd_listener_threads_delta; + cmd_listener_threads_delta = 0; + } else { + if (typ == BC_B) + wq_item = k_unlink_head(btc_workqueue_store); + else + wq_item = k_unlink_head(cmd_workqueue_store); + } K_WUNLOCK(workqueue_free); + // TODO: deal with thread creation/shutdown failure + if (listener_delta != 0) { + if (listener_delta > 0) { + // Add threads + tot = 1; + done = 0; + for (i = 1; i < THREAD_LIMIT; i++) { + if (!running[typ][i]) { + if (listener_delta > 0) { + listener_delta--; + running[typ][i] = true; + create_pthread(&(listener_pt[typ][i]), + listener_all, + &(listener_setup[typ][i])); + done++; + tot++; + } + } else + tot++; + } + LOGWARNING("%s() created %d %s thread%s total=%d" +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, + done, + (typ == BC_B) ? "btc" : "cmd", + (done == 1) ? EMPTY : "s", + tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } else { + // Notify and wait for each to exit + tot = 1; + done = 0; + for (i = THREAD_LIMIT - 1; i > 0; i--) { + if (running[typ][i]) { + if (listener_delta < 0) { + listener_delta++; + LOGNOTICE("%s() %s stopping %d", + __func__, + (typ == BC_B) ? "btc" : "cmd", + i); + running[typ][i] = false; + join_pthread(listener_pt[typ][i]); + done++; + } else + tot++; + } + } + LOGWARNING("%s() stopped %d %s thread%s total=%d" +#if LOCK_CHECK + " next_thread_id=%d" +#endif + , __func__, + done, + (typ == BC_B) ? "btc" : "cmd", + (done == 1) ? EMPTY : "s", + tot +#if LOCK_CHECK + , next_thread_id +#endif + ); + } + listener_delta = 0; + continue; + } + // Don't keep a connection for more than ~10s if ((time(NULL) - now) > 10) { PQfinish(conn); @@ -6091,100 +6264,68 @@ static void *clistener(__maybe_unused void *arg) if (wq_item) { setnow(&now1); - process_sockd(conn, wq_item, REPLIER_CMD); + process_sockd(conn, wq_item, + (typ == BC_B) ? REPLIER_BTC : REPLIER_CMD); setnow(&now2); - clis_us += us_tvdiff(&now2, &now1); - clis_processed++; + ck_wlock(&listener_all_lock); + if (typ == BC_B) { + blis_us += us_tvdiff(&now2, &now1); + blis_processed++; + } else { + clis_us += us_tvdiff(&now2, &now1); + clis_processed++; + } + ck_wunlock(&listener_all_lock); } else { setnowts(&when); timeraddspec(&when, &when_add); - mutex_lock(&wq_cmd_waitlock); - ret = cond_timedwait(&wq_cmd_waitcond, - &wq_cmd_waitlock, &when); - if (ret == 0) - wq_cmd_wakes++; - else if (errno == ETIMEDOUT) - wq_cmd_timeouts++; - mutex_unlock(&wq_cmd_waitlock); + if (typ == BC_B) { + mutex_lock(&wq_btc_waitlock); + ret = cond_timedwait(&wq_btc_waitcond, + &wq_btc_waitlock, &when); + if (ret == 0) + wq_btc_wakes++; + else if (errno == ETIMEDOUT) + wq_btc_timeouts++; + mutex_unlock(&wq_btc_waitlock); + } else { + mutex_lock(&wq_cmd_waitlock); + ret = cond_timedwait(&wq_cmd_waitcond, + &wq_cmd_waitlock, &when); + if (ret == 0) + wq_cmd_wakes++; + else if (errno == ETIMEDOUT) + wq_cmd_timeouts++; + mutex_unlock(&wq_cmd_waitlock); + } } } - LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed); - - clistener_using_data = false; - if (conn) PQfinish(conn); - return NULL; -} - -static void *blistener(__maybe_unused void *arg) -{ - PGconn *conn = NULL; - K_ITEM *wq_item; - tv_t now1, now2; - char buf[128]; - time_t now; - ts_t when, when_add; - int ret; - - pthread_detach(pthread_self()); - - snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__); - LOCK_INIT(buf); - rename_proc(buf); - - LOGNOTICE("%s() processing", __func__); - - when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; - when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; - - blistener_using_data = true; - - now = time(NULL); - - while (!everyone_die) { - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(btc_workqueue_store); - K_WUNLOCK(workqueue_free); - - // Don't keep a connection for more than ~10s - if ((time(NULL) - now) > 10) { - PQfinish(conn); - conn = dbconnect(); - now = time(NULL); + if (mythread != 0) + LOGNOTICE("%s() %s exiting", __func__, buf); + else { + for (i = 1; i < THREAD_LIMIT; i++) { + if (running[typ][i]) { + running[typ][i] = false; + LOGNOTICE("%s() %s waiting for %d", + __func__, buf, i); + join_pthread(listener_pt[typ][i]); + } } - if (wq_item) { - setnow(&now1); - process_sockd(conn, wq_item, REPLIER_BTC); - setnow(&now2); - blis_us += us_tvdiff(&now2, &now1); - blis_processed++; + if (typ == BC_B) { + LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, blis_processed); + blistener_using_data = false; } else { - setnowts(&when); - timeraddspec(&when, &when_add); - - mutex_lock(&wq_btc_waitlock); - ret = cond_timedwait(&wq_btc_waitcond, - &wq_btc_waitlock, &when); - if (ret == 0) - wq_btc_wakes++; - else if (errno == ETIMEDOUT) - wq_btc_timeouts++; - mutex_unlock(&wq_btc_waitlock); + LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, clis_processed); + clistener_using_data = false; } } - LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed); - - blistener_using_data = false; - - if (conn) - PQfinish(conn); - return NULL; } @@ -6841,7 +6982,7 @@ static void *socksetup(__maybe_unused void *arg) { pthread_t prep_pt, crep_pt, brep_pt; enum reply_type p_typ, c_typ, b_typ; - pthread_t clis_pt, blis_pt, proc_pt; + pthread_t proc_pt; pthread_t psock_pt, wsock_pt, csock_pt; char nbuf[64]; @@ -6868,9 +7009,15 @@ static void *socksetup(__maybe_unused void *arg) LOGWARNING("%s() Start processing...", __func__); socksetup_using_data = true; - create_pthread(&clis_pt, clistener, NULL); + listener_setup[BC_B][0].bc = BC_B; + listener_setup[BC_B][0].thread = 0; + create_pthread(&listener_pt[BC_B][0], listener_all, + &(listener_setup[BC_B][0])); - create_pthread(&blis_pt, blistener, NULL); + listener_setup[BC_C][0].bc = BC_C; + listener_setup[BC_C][0].thread = 0; + create_pthread(&listener_pt[BC_C][0], listener_all, + &(listener_setup[BC_C][0])); create_pthread(&proc_pt, process_socket, NULL); @@ -7429,7 +7576,7 @@ static bool reload_from(tv_t *start, const tv_t *finish) * Also since ckpool messages are not in order, we could be * aborting early and not get the few slightly later out of * order messages in the log file */ - while (!everyone_die && + while (!everyone_die && logline(reload_buf, MAX_READ, fp, filename)) { reload_line(filename, reload_buf, ++count); @@ -8995,6 +9142,7 @@ static struct option long_options[] = { // override calculated value { "breakdown-threads", required_argument, 0, 'B' }, { "config", required_argument, 0, 'c' }, + { "cmd-listener-threads", required_argument, 0, 'C' }, { "dbname", required_argument, 0, 'd' }, { "minsdiff", required_argument, 0, 'D' }, { "free", required_argument, 0, 'f' }, @@ -9077,7 +9225,7 @@ int main(int argc, char **argv) memset(&ckpcmd, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:Ab:B:c:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:Ab:B:c:C:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) { switch(c) { case '?': case ':': @@ -9120,6 +9268,18 @@ int main(int argc, char **argv) case 'c': ckp.config = strdup(optarg); break; + case 'C': + { + int cl = atoi(optarg); + if (cl < 1 || cl > THREAD_LIMIT) { + quit(1, "Invalid listener " + "thread count %d " + "- must be >0 and <=%d", + cl, THREAD_LIMIT); + } + cmd_listener_threads = cl; + } + break; case 'd': db_name = strdup(optarg); kill = optarg; @@ -9467,6 +9627,7 @@ int main(int argc, char **argv) cklock_init(&breakdown_lock); cklock_init(&replier_lock); + cklock_init(&listener_all_lock); cklock_init(&last_lock); cklock_init(&btc_lock); cklock_init(&poolinstance_lock); diff --git a/src/ckdb.h b/src/ckdb.h index 135494ce..73258212 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.508" +#define CKDB_VERSION DB_VERSION"-2.509" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -129,6 +129,11 @@ extern int proc_queue_threads_delta; extern int reload_breakdown_threads_delta; extern int cmd_breakdown_threads_delta; +extern int cmd_listener_threads; +extern int btc_listener_threads; +extern int cmd_listener_threads_delta; +extern int btc_listener_threads_delta; + #define BLANK " " extern char *EMPTY; extern const char *nullstr; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 8754dddf..15727195 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -8463,6 +8463,22 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id, K_WUNLOCK(breakqueue_free); snprintf(reply, siz, "ok.delta %d request sent", delta_value); return strdup(reply); + } else if (strcasecmp(name, "cl") == 0 || + strcasecmp(name, "cmd_listener") == 0) { + K_WLOCK(workqueue_free); + // Just overwrite whatever's there + cmd_listener_threads_delta = delta_value; + K_WUNLOCK(workqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); + } else if (strcasecmp(name, "bl") == 0 || + strcasecmp(name, "btc_listener") == 0) { + K_WLOCK(workqueue_free); + // Just overwrite whatever's there + btc_listener_threads_delta = delta_value; + K_WUNLOCK(workqueue_free); + snprintf(reply, siz, "ok.delta %d request sent", delta_value); + return strdup(reply); } else { snprintf(reply, siz, "unknown name '%s'", name); LOGERR("%s() %s.%s", __func__, id, reply);