diff --git a/src/ckdb.c b/src/ckdb.c index 046df82f..b191f347 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -26,7 +26,7 @@ * be created irrelevant of any being deleted * * The threads that can be managed have a command option to set them when - * running ckdb and can also be changed via the cmd_threads socket command + * starting ckdb and can also be changed via the cmd_threads socket command * * The main() 'ckdb' thread starts: * iomsgs() for filelog '_fiomsgs' and console '_ciomsgs' @@ -71,13 +71,13 @@ * completes and just process authorise messages immediately while the * reload runs * However, we start the ckpool message queue after loading - * the optioncontrol, users, workers and useratts DB tables, before loading - * the much larger DB tables, so that ckdb is effectively ready for messages - * almost immediately + * the optioncontrol, idcontrol, users, workers and useratts DB tables, + * before loading the much larger DB tables, so that ckdb is effectively + * ready for messages almost immediately * The first ckpool message allows us to know where ckpool is up to * in the CCLs - see reload_from() for how this is handled * The users table, required for the authorise messages, is always updated - * immediately + * in the disk DB immediately */ /* Reload data needed @@ -116,7 +116,7 @@ * RAM accountbalance: TODO: created as data is loaded * * idcontrol: only userid reuse is critical and the user is added - * immeditately to the DB before replying to the add message + * immeditately to the disk DB before replying to the add message * * Tables that are/will be written straight to the DB, so are OK: * users, useraccounts, paymentaddresses, payments, @@ -375,7 +375,7 @@ bool dbload_only_sharesummary = false; * markersummaries and pplns payouts may not be correct */ bool sharesummary_marks_limit = false; -// DB optioncontrol,users,workers,useratts load is complete +// DB optioncontrol,idcontrol,users,workers,useratts load is complete bool db_users_complete = false; // DB load is complete bool db_load_complete = false; @@ -388,7 +388,7 @@ bool reloaded_N_files = false; // Data load is complete bool startup_complete = false; // Set to true when pool0 completes, pool0 = socket data during reload -static bool reload_queue_complete = false; +bool reload_queue_complete = false; // Tell everyone to die bool everyone_die = false; // Set to true every time a store is created @@ -551,6 +551,8 @@ int reload_processing; int cmd_processing; int sockd_count; int max_sockd_count; +ts_t breaker_sleep_stt; +int breaker_sleep_ms; // Trigger breaker() processing mutex_t bq_reload_waitlock; @@ -4516,6 +4518,9 @@ static void *breaker(void *arg) ts_t when, when_add; int i, typ, mythread, done, tot, ret; int breaker_delta = 0; + ts_t last_sleep = { 0L, 0L }; + int last_sleep_ms = 0; + bool do_sleep = false; setup = (struct breaker_setup *)(arg); mythread = setup->thread; @@ -4591,7 +4596,11 @@ static void *breaker(void *arg) K_WLOCK(breakqueue_free); bq_item = NULL; was_null = false; - if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) { + if (breaker_sleep_stt.tv_sec > last_sleep.tv_sec) { + copy_ts(&last_sleep, &breaker_sleep_stt); + last_sleep_ms = breaker_sleep_ms; + do_sleep = true; + } else if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) { breaker_delta = reload_breakdown_threads_delta; reload_breakdown_threads_delta = 0; } else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) { @@ -4621,6 +4630,12 @@ static void *breaker(void *arg) } K_WUNLOCK(breakqueue_free); + if (do_sleep) { + do_sleep = false; + cksleep_ms_r(&last_sleep, last_sleep_ms); + continue; + } + // TODO: deal with thread creation/shutdown failure if (breaker_delta != 0) { if (breaker_delta > 0) { @@ -6396,6 +6411,7 @@ static void *process_socket(__maybe_unused void *arg) case CMD_CHKPASS: case CMD_GETATTS: case CMD_THREADS: + case CMD_PAUSE: case CMD_HOMEPAGE: case CMD_QUERY: break; @@ -6597,6 +6613,7 @@ static void *process_socket(__maybe_unused void *arg) case CMD_EVENTS: case CMD_HIGH: case CMD_THREADS: + case CMD_PAUSE: case CMD_QUERY: msgline->sockd = bq->sockd; bq->sockd = -1; @@ -7111,6 +7128,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item) case CMD_EVENTS: case CMD_HIGH: case CMD_THREADS: + case CMD_PAUSE: LOGERR("%s() INVALID message line %"PRIu64 " ignored '%.42s...", __func__, bq->count, diff --git a/src/ckdb.h b/src/ckdb.h index 4a9b9560..86f407d4 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.703" +#define CKDB_VERSION DB_VERSION"-2.704" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -355,7 +355,7 @@ extern bool dbload_only_sharesummary; * markersummaries and pplns payouts may not be correct */ extern bool sharesummary_marks_limit; -// DB users,workers load is complete +// DB optioncontrol,idcontrol,users,workers,useratts load is complete extern bool db_users_complete; // DB load is complete extern bool db_load_complete; @@ -367,6 +367,8 @@ extern bool reloading; extern bool reloaded_N_files; // Data load is complete extern bool startup_complete; +// Set to true when pool0 completes, pool0 = socket data during reload +extern bool reload_queue_complete; // Tell everyone to die extern bool everyone_die; @@ -747,6 +749,7 @@ enum cmd_values { CMD_EVENTS, CMD_HIGH, CMD_THREADS, + CMD_PAUSE, CMD_END }; @@ -1450,6 +1453,8 @@ extern int reload_processing; extern int cmd_processing; extern int sockd_count; extern int max_sockd_count; +extern ts_t breaker_sleep_stt; +extern int breaker_sleep_ms; // Trigger breaker() processing extern mutex_t bq_reload_waitlock; @@ -2877,6 +2882,11 @@ extern K_STORE *userstats_eos_store; // newer OR equal #define tv_newer_eq(_old, _new) (!(tv_newer(_new, _old))) +#define copy_ts(_dest, _src) do { \ + (_dest)->tv_sec = (_src)->tv_sec; \ + (_dest)->tv_nsec = (_src)->tv_nsec; \ + } while(0) + // WORKERSTATUS from various incoming data typedef struct workerstatus { int64_t userid; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 5eb52d35..775221b0 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -8510,6 +8510,71 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id, return buf; } +static char *cmd_pause(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) +{ + K_ITEM *i_name; + char reply[1024] = ""; + size_t siz = sizeof(reply); + char *name; + + LOGDEBUG("%s(): cmd '%s'", __func__, cmd); + + i_name = require_name(trf_root, "name", 1, NULL, reply, siz); + if (!i_name) + return strdup(reply); + name = transfer_data(i_name); + + /* Pause the breaker threads to help culling to take place for some + * tables that can be culled but 'never' empty due to threads always + * creating new data before the old data has finished being processed + * N.B. this should only be needed on a sizeable pool, once after + * the reload completes ... and even 499ms would be a long time to + * pause in the case of a sizeable pool ... DANGER, WILL ROBINSON! */ + if (strcasecmp(name, "breaker") == 0) { + K_ITEM *i_ms; + int ms = 100; + + i_ms = optional_name(trf_root, "ms", 1, NULL, reply, siz); + if (*reply) + return strdup(reply); + if (i_ms) { + ms = atoi(transfer_data(i_ms)); + // 4999 is too long, don't do it! + if (ms < 10 || ms > 4999) { + snprintf(reply, siz, + "%s ms %d outside range 10-4999", + name, ms); + goto out; + } + } + + if (!reload_queue_complete && !key_update) { + snprintf(reply, siz, + "no point pausing %s before reload completes", + name); + goto out; + } + + /* Use an absolute start time to try to get all threads asleep + * at the same time */ + K_WLOCK(breakqueue_free); + cksleep_prepare_r(&breaker_sleep_stt); + breaker_sleep_ms = ms; + K_WUNLOCK(breakqueue_free); + snprintf(reply, siz, "ok.%s %s%dms pause sent", name, + ms > 499 ? "ALERT!!! " : EMPTY, ms); + } else + snprintf(reply, siz, "unknown name '%s'", name); + +out: + LOGWARNING("%s() %s.%s", __func__, id, reply); + return strdup(reply); +} + /* The socket command format is as follows: * Basic structure: * cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=... @@ -8623,5 +8688,6 @@ struct CMDS ckdb_cmds[] = { { CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM | ACCESS_WEB }, { CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM }, { CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_PAUSE, "pause", false, false, cmd_pause, SEQ_NONE, ACCESS_SYSTEM }, { CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 } };