Browse Source

ckdb - pause command to help with culling

master
kanoi 8 years ago
parent
commit
c6e1a62a02
  1. 36
      src/ckdb.c
  2. 14
      src/ckdb.h
  3. 66
      src/ckdb_cmd.c

36
src/ckdb.c

@ -26,7 +26,7 @@
* be created irrelevant of any being deleted * be created irrelevant of any being deleted
* *
* The threads that can be managed have a command option to set them when * The threads that can be managed have a command option to set them when
* running ckdb and can also be changed via the cmd_threads socket command * starting ckdb and can also be changed via the cmd_threads socket command
* *
* The main() 'ckdb' thread starts: * The main() 'ckdb' thread starts:
* iomsgs() for filelog '_fiomsgs' and console '_ciomsgs' * iomsgs() for filelog '_fiomsgs' and console '_ciomsgs'
@ -71,13 +71,13 @@
* completes and just process authorise messages immediately while the * completes and just process authorise messages immediately while the
* reload runs * reload runs
* However, we start the ckpool message queue after loading * However, we start the ckpool message queue after loading
* the optioncontrol, users, workers and useratts DB tables, before loading * the optioncontrol, idcontrol, users, workers and useratts DB tables,
* the much larger DB tables, so that ckdb is effectively ready for messages * before loading the much larger DB tables, so that ckdb is effectively
* almost immediately * ready for messages almost immediately
* The first ckpool message allows us to know where ckpool is up to * The first ckpool message allows us to know where ckpool is up to
* in the CCLs - see reload_from() for how this is handled * in the CCLs - see reload_from() for how this is handled
* The users table, required for the authorise messages, is always updated * The users table, required for the authorise messages, is always updated
* immediately * in the disk DB immediately
*/ */
/* Reload data needed /* Reload data needed
@ -116,7 +116,7 @@
* RAM accountbalance: TODO: created as data is loaded * RAM accountbalance: TODO: created as data is loaded
* *
* idcontrol: only userid reuse is critical and the user is added * idcontrol: only userid reuse is critical and the user is added
* immediately to the DB before replying to the add message * immediately to the disk DB before replying to the add message
* *
* Tables that are/will be written straight to the DB, so are OK: * Tables that are/will be written straight to the DB, so are OK:
* users, useraccounts, paymentaddresses, payments, * users, useraccounts, paymentaddresses, payments,
@ -375,7 +375,7 @@ bool dbload_only_sharesummary = false;
* markersummaries and pplns payouts may not be correct */ * markersummaries and pplns payouts may not be correct */
bool sharesummary_marks_limit = false; bool sharesummary_marks_limit = false;
// DB optioncontrol,users,workers,useratts load is complete // DB optioncontrol,idcontrol,users,workers,useratts load is complete
bool db_users_complete = false; bool db_users_complete = false;
// DB load is complete // DB load is complete
bool db_load_complete = false; bool db_load_complete = false;
@ -388,7 +388,7 @@ bool reloaded_N_files = false;
// Data load is complete // Data load is complete
bool startup_complete = false; bool startup_complete = false;
// Set to true when pool0 completes, pool0 = socket data during reload // Set to true when pool0 completes, pool0 = socket data during reload
static bool reload_queue_complete = false; bool reload_queue_complete = false;
// Tell everyone to die // Tell everyone to die
bool everyone_die = false; bool everyone_die = false;
// Set to true every time a store is created // Set to true every time a store is created
@ -551,6 +551,8 @@ int reload_processing;
int cmd_processing; int cmd_processing;
int sockd_count; int sockd_count;
int max_sockd_count; int max_sockd_count;
ts_t breaker_sleep_stt;
int breaker_sleep_ms;
// Trigger breaker() processing // Trigger breaker() processing
mutex_t bq_reload_waitlock; mutex_t bq_reload_waitlock;
@ -4516,6 +4518,9 @@ static void *breaker(void *arg)
ts_t when, when_add; ts_t when, when_add;
int i, typ, mythread, done, tot, ret; int i, typ, mythread, done, tot, ret;
int breaker_delta = 0; int breaker_delta = 0;
ts_t last_sleep = { 0L, 0L };
int last_sleep_ms = 0;
bool do_sleep = false;
setup = (struct breaker_setup *)(arg); setup = (struct breaker_setup *)(arg);
mythread = setup->thread; mythread = setup->thread;
@ -4591,7 +4596,11 @@ static void *breaker(void *arg)
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
bq_item = NULL; bq_item = NULL;
was_null = false; was_null = false;
if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) { if (breaker_sleep_stt.tv_sec > last_sleep.tv_sec) {
copy_ts(&last_sleep, &breaker_sleep_stt);
last_sleep_ms = breaker_sleep_ms;
do_sleep = true;
} else if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) {
breaker_delta = reload_breakdown_threads_delta; breaker_delta = reload_breakdown_threads_delta;
reload_breakdown_threads_delta = 0; reload_breakdown_threads_delta = 0;
} else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) { } else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) {
@ -4621,6 +4630,12 @@ static void *breaker(void *arg)
} }
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
if (do_sleep) {
do_sleep = false;
cksleep_ms_r(&last_sleep, last_sleep_ms);
continue;
}
// TODO: deal with thread creation/shutdown failure // TODO: deal with thread creation/shutdown failure
if (breaker_delta != 0) { if (breaker_delta != 0) {
if (breaker_delta > 0) { if (breaker_delta > 0) {
@ -6396,6 +6411,7 @@ static void *process_socket(__maybe_unused void *arg)
case CMD_CHKPASS: case CMD_CHKPASS:
case CMD_GETATTS: case CMD_GETATTS:
case CMD_THREADS: case CMD_THREADS:
case CMD_PAUSE:
case CMD_HOMEPAGE: case CMD_HOMEPAGE:
case CMD_QUERY: case CMD_QUERY:
break; break;
@ -6597,6 +6613,7 @@ static void *process_socket(__maybe_unused void *arg)
case CMD_EVENTS: case CMD_EVENTS:
case CMD_HIGH: case CMD_HIGH:
case CMD_THREADS: case CMD_THREADS:
case CMD_PAUSE:
case CMD_QUERY: case CMD_QUERY:
msgline->sockd = bq->sockd; msgline->sockd = bq->sockd;
bq->sockd = -1; bq->sockd = -1;
@ -7111,6 +7128,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
case CMD_EVENTS: case CMD_EVENTS:
case CMD_HIGH: case CMD_HIGH:
case CMD_THREADS: case CMD_THREADS:
case CMD_PAUSE:
LOGERR("%s() INVALID message line %"PRIu64 LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...", " ignored '%.42s...",
__func__, bq->count, __func__, bq->count,

14
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.703" #define CKDB_VERSION DB_VERSION"-2.704"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -355,7 +355,7 @@ extern bool dbload_only_sharesummary;
* markersummaries and pplns payouts may not be correct */ * markersummaries and pplns payouts may not be correct */
extern bool sharesummary_marks_limit; extern bool sharesummary_marks_limit;
// DB users,workers load is complete // DB optioncontrol,idcontrol,users,workers,useratts load is complete
extern bool db_users_complete; extern bool db_users_complete;
// DB load is complete // DB load is complete
extern bool db_load_complete; extern bool db_load_complete;
@ -367,6 +367,8 @@ extern bool reloading;
extern bool reloaded_N_files; extern bool reloaded_N_files;
// Data load is complete // Data load is complete
extern bool startup_complete; extern bool startup_complete;
// Set to true when pool0 completes, pool0 = socket data during reload
extern bool reload_queue_complete;
// Tell everyone to die // Tell everyone to die
extern bool everyone_die; extern bool everyone_die;
@ -747,6 +749,7 @@ enum cmd_values {
CMD_EVENTS, CMD_EVENTS,
CMD_HIGH, CMD_HIGH,
CMD_THREADS, CMD_THREADS,
CMD_PAUSE,
CMD_END CMD_END
}; };
@ -1450,6 +1453,8 @@ extern int reload_processing;
extern int cmd_processing; extern int cmd_processing;
extern int sockd_count; extern int sockd_count;
extern int max_sockd_count; extern int max_sockd_count;
extern ts_t breaker_sleep_stt;
extern int breaker_sleep_ms;
// Trigger breaker() processing // Trigger breaker() processing
extern mutex_t bq_reload_waitlock; extern mutex_t bq_reload_waitlock;
@ -2877,6 +2882,11 @@ extern K_STORE *userstats_eos_store;
// newer OR equal // newer OR equal
#define tv_newer_eq(_old, _new) (!(tv_newer(_new, _old))) #define tv_newer_eq(_old, _new) (!(tv_newer(_new, _old)))
/* Copy the tv_sec and tv_nsec fields of *_src into *_dest
 * Both arguments are pointers and each one is evaluated twice,
 *  so don't pass expressions with side effects */
#define copy_ts(_dest, _src) \
	((void)((_dest)->tv_sec = (_src)->tv_sec, \
		(_dest)->tv_nsec = (_src)->tv_nsec))
// WORKERSTATUS from various incoming data // WORKERSTATUS from various incoming data
typedef struct workerstatus { typedef struct workerstatus {
int64_t userid; int64_t userid;

66
src/ckdb_cmd.c

@ -8510,6 +8510,71 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
return buf; return buf;
} }
/* Handle the 'pause' socket command
 * Fields read from trf_root:
 *  name - required, what to pause; only "breaker" (any case) is known
 *  ms   - optional, pause length in milliseconds, 10-4999, default 100
 * Returns a strdup()ed reply: "ok...." when the pause request was stored,
 *  otherwise text saying why it was not
 *  (presumably the caller frees the reply - confirm against the callers)
 * Every reply produced after the name has been read is also logged at
 *  warning level, except a failure reported by optional_name() */
static char *cmd_pause(__maybe_unused PGconn *conn, char *cmd, char *id,
			__maybe_unused tv_t *now, __maybe_unused char *by,
			__maybe_unused char *code, __maybe_unused char *inet,
			__maybe_unused tv_t *cd, K_TREE *trf_root,
			__maybe_unused bool reload_data)
{
	K_ITEM *i_name;
	char reply[1024] = "";
	size_t siz = sizeof(reply);
	char *name;

	LOGDEBUG("%s(): cmd '%s'", __func__, cmd);

	i_name = require_name(trf_root, "name", 1, NULL, reply, siz);
	if (!i_name)
		return strdup(reply);
	name = transfer_data(i_name);

	/* Pause the breaker threads to help culling to take place for some
	 * tables that can be culled but 'never' empty due to threads always
	 * creating new data before the old data has finished being processed
	 * N.B. this should only be needed on a sizeable pool, once after
	 * the reload completes ... and even 499ms would be a long time to
	 * pause in the case of a sizeable pool ... DANGER, WILL ROBINSON! */
	if (strcasecmp(name, "breaker") == 0) {
		K_ITEM *i_ms;
		int ms = 100;

		i_ms = optional_name(trf_root, "ms", 1, NULL, reply, siz);
		// optional_name() reports a problem by filling in reply
		if (*reply)
			return strdup(reply);
		if (i_ms) {
			char *ms_str = transfer_data(i_ms);
			char *end = NULL;
			long val;

			/* strtol() not atoi(): atoi() is undefined on overflow
			 * and would silently accept trailing garbage
			 * On overflow strtol() returns LONG_MIN/LONG_MAX,
			 * which the range check below rejects */
			val = strtol(ms_str, &end, 10);
			if (end == ms_str || *end != '\0') {
				snprintf(reply, siz,
					 "%s ms '%s' is not a number",
					 name, ms_str);
				goto out;
			}
			// 4999 is too long, don't do it!
			if (val < 10 || val > 4999) {
				snprintf(reply, siz,
					 "%s ms %ld outside range 10-4999",
					 name, val);
				goto out;
			}
			ms = (int)val;
		}

		if (!reload_queue_complete && !key_update) {
			snprintf(reply, siz,
				 "no point pausing %s before reload completes",
				 name);
			goto out;
		}

		/* Use an absolute start time to try to get all threads asleep
		 * at the same time
		 * NOTE(review): breaker() only compares tv_sec of this start
		 * time against the last one it saw, so a second pause sent
		 * within the same second looks like it would be ignored
		 * - confirm */
		K_WLOCK(breakqueue_free);
		cksleep_prepare_r(&breaker_sleep_stt);
		breaker_sleep_ms = ms;
		K_WUNLOCK(breakqueue_free);

		snprintf(reply, siz, "ok.%s %s%dms pause sent", name,
			 ms > 499 ? "ALERT!!! " : EMPTY, ms);
	} else
		snprintf(reply, siz, "unknown name '%s'", name);

out:
	LOGWARNING("%s() %s.%s", __func__, id, reply);
	return strdup(reply);
}
/* The socket command format is as follows: /* The socket command format is as follows:
* Basic structure: * Basic structure:
* cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=... * cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=...
@ -8623,5 +8688,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM | ACCESS_WEB }, { CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM | ACCESS_WEB },
{ CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM }, { CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM }, { CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_PAUSE, "pause", false, false, cmd_pause, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 } { CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 }
}; };

Loading…
Cancel
Save