From 01ee4fbfd164bd15dcd0fd6caea209c8dacd4dd3 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 30 Sep 2015 19:16:16 +1000 Subject: [PATCH] ckdb - allow shift processing to start during the reload --- src/ckdb.c | 62 ++++++++++++++++++++++++++++++++++++++++++++++--- src/ckdb.h | 29 ++++++++++++----------- src/ckdb_cmd.c | 6 +++-- src/ckdb_data.c | 61 ++++++++++++++++++++++++------------------------ src/ckdb_dbio.c | 20 +++++++++------- 5 files changed, 120 insertions(+), 58 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 78045855..0143a0c3 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -253,6 +253,8 @@ bool db_users_complete = false; bool db_load_complete = false; // Different input data handling bool reloading = false; +// Start marks processing during a larger reload +static bool reloaded_N_files = false; // Data load is complete bool startup_complete = false; // Set to true the first time workqueue reaches 0 after startup @@ -1483,13 +1485,27 @@ static bool setup_data() sec -= min * 60.0; LOGWARNING("reload complete %.0fm %.3fs", min, sec); + // full lock access since mark processing can occur + ck_wlock(&process_pplns_lock); + K_WLOCK(workerstatus_free); + K_RLOCK(sharesummary_free); + K_RLOCK(workmarkers_free); + K_RLOCK(markersummary_free); + set_block_share_counters(); + if (!everyone_die) + workerstatus_ready(); + + K_RUNLOCK(markersummary_free); + K_RUNLOCK(workmarkers_free); + K_RUNLOCK(sharesummary_free); + K_WUNLOCK(workerstatus_free); + ck_wunlock(&process_pplns_lock); + if (everyone_die) return false; - workerstatus_ready(); - workinfo_current = last_in_ktree(workinfo_height_root, ctx); if (workinfo_current) { DATA_WORKINFO(wic, workinfo_current); @@ -3083,9 +3099,20 @@ static void *summariser(__maybe_unused void *arg) rename_proc("db_summariser"); + /* Don't do any summarisation until the reload queue completes coz: + * 1) It locks/accesses a lot of data - workinfo/markersummary that + * can slow down the reload + * 2) If you stop and restart ckdb this wont affect the restart point + * Thus it's OK to do it later + * 3) It does I/O to bitcoind which is slow ... + * 4) It triggers the payout generation which also accesses a lot of + * data - workinfo/markersummary - but it wont affect a later + * restart point if it hasn't been done. Thus it's OK to do it later + */ while (!everyone_die && !reload_queue_complete) cksleep_ms(42); + LOGWARNING("%s() Start processing...", __func__); summariser_using_data = true; while (!everyone_die) { @@ -3628,7 +3655,13 @@ static void *marker(__maybe_unused void *arg) rename_proc("db_marker"); - while (!everyone_die && !reload_queue_complete) + /* We want this to start during the CCL reload so that if we run a + * large reload and it fails at some point, the next reload will not + * always have to go back to the same reload point as before due to + * no new workmarkers being completed/processed + * However, don't start during the first N reload files so that a + * normal ckdb restart reload won't slow down */ + while (!everyone_die && !reloaded_N_files && !reload_queue_complete) cksleep_ms(42); if (sharesummary_marks_limit) { @@ -3637,6 +3670,7 @@ static void *marker(__maybe_unused void *arg) return NULL; } + LOGWARNING("%s() Start processing...", __func__); marker_using_data = true; while (!everyone_die) { @@ -3692,6 +3726,7 @@ static void *logger(__maybe_unused void *arg) snprintf(buf, sizeof(buf), "db%s_logger", dbcode); rename_proc(buf); + LOGWARNING("%s() Start processing...", __func__); logger_using_data = true; setnow(&now); @@ -3809,6 +3844,7 @@ static void *socketer(__maybe_unused void *arg) while (!everyone_die && !db_users_complete) cksem_mswait(&socketer_sem, 420); + LOGWARNING("%s() Start processing...", __func__); socketer_using_data = true; want_first = true; @@ -4484,6 +4520,14 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) return false; } +// How many files need to be processed before flagging reloaded_N_files +#define RELOAD_N_FILES 2 +// optioncontrol name to override the above value +#define RELOAD_N_FILES_STR "ReloadNFiles" + +// How many lines in a reload file required to count it +#define RELOAD_N_COUNT 1000 + /* If the reload start file is missing and -r was specified correctly: * touch the filename reported in "Failed to open 'filename'", * if ckdb aborts at the beginning of the reload, then start again */ @@ -4501,11 +4545,15 @@ static bool reload_from(tv_t *start) tv_t now, begin; double diff; FILE *fp = NULL; + int file_N_limit; reload_buf = malloc(MAX_READ); if (!reload_buf) quithere(1, "(%d) OOM", MAX_READ); + file_N_limit = (int)sys_setting(RELOAD_N_FILES_STR, RELOAD_N_FILES, + &date_eot); + reloading = true; copy_tv(&reload_timestamp, start); @@ -4569,6 +4617,14 @@ static bool reload_from(tv_t *start) LOGWARNING("%s(): confirm range complete", __func__); break; } + + /* Used by marker() to start mark generation during a longer + * than normal reload */ + if (count > RELOAD_N_COUNT) { + if (--file_N_limit < 1) + reloaded_N_files = true; + } + filename = rotating_filename(restorefrom, reload_timestamp.tv_sec); ok = logopen(&filename, &fp, &apipe); if (!ok) { diff --git a/src/ckdb.h b/src/ckdb.h index 403acf0c..a45dc0be 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.3" -#define CKDB_VERSION DB_VERSION"-1.350" +#define CKDB_VERSION DB_VERSION"-1.400" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2212,16 +2212,16 @@ extern K_ITEM *_optional_name(K_TREE *trf_root, char *name, int len, char *patt, extern K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt, char *reply, size_t siz, WHERE_FFL_ARGS); extern cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b); -extern K_ITEM *get_workerstatus(int64_t userid, char *workername); -#define find_create_workerstatus(_u, _w, _file, _func, _line) \ - _find_create_workerstatus(_u, _w, true, _file, _func, _line, WHERE_FFL_HERE) -#define find_workerstatus(_u, _w, _file, _func, _line) \ - _find_create_workerstatus(_u, _w, false, _file, _func, _line, WHERE_FFL_HERE) - -extern K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, - bool create, const char *file2, - const char *func2, const int line2, - WHERE_FFL_ARGS); +extern K_ITEM *get_workerstatus(bool lock, int64_t userid, char *workername); +#define find_create_workerstatus(_l, _u, _w, _file, _func, _line) \ + _find_create_workerstatus(_l, _u, _w, true, _file, _func, _line, WHERE_FFL_HERE) +#define find_workerstatus(_l, _u, _w, _file, _func, _line) \ + _find_create_workerstatus(_l, _u, _w, false, _file, _func, _line, WHERE_FFL_HERE) + +extern K_ITEM *_find_create_workerstatus(bool lock, int64_t userid, + char *workername, bool create, + const char *file2, const char *func2, + const int line2, WHERE_FFL_ARGS); extern void zero_all_active(tv_t *when); extern void workerstatus_ready(); #define workerstatus_update(_auths, _shares, _userstats) \ @@ -2336,7 +2336,7 @@ extern cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_blocks(int32_t height, char *blockhash, K_TREE_CTX *ctx); extern K_ITEM *find_prev_blocks(int32_t height, K_TREE_CTX *ctx); extern const char *blocks_confirmed(char *confirmed); -extern void zero_on_new_block(); +extern void zero_on_new_block(bool lock); extern void set_block_share_counters(); extern bool check_update_blocks_stats(tv_t *stats); #define set_blockcreatedate(_h) _set_blockcreatedate(_h, WHERE_FFL_HERE) @@ -2484,8 +2484,9 @@ extern K_ITEM *useratts_add(PGconn *conn, char *username, char *attname, bool begun); extern bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd); extern bool useratts_fill(PGconn *conn); -extern K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, - char *difficultydefault, char *idlenotificationenabled, +extern K_ITEM *workers_add(PGconn *conn, bool lock, int64_t userid, + char *workername, char *difficultydefault, + char *idlenotificationenabled, char *idlenotificationtime, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); extern bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 2f8b396f..d7bed90e 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -1734,7 +1734,8 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users) DATA_WORKERS_NULL(workers, w_item); while (w_item && workers->userid == users->userid) { if (CURRENT(&(workers->expirydate))) { - ws_item = get_workerstatus(users->userid, workers->workername); + ws_item = get_workerstatus(true, users->userid, + workers->workername); if (ws_item) { DATA_WORKERSTATUS(workerstatus, ws_item); t_diffacc += workerstatus->block_diffacc; @@ -1996,7 +1997,8 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, rows = 0; while (w_item && workers->userid == users->userid) { if (CURRENT(&(workers->expirydate))) { - ws_item = get_workerstatus(users->userid, workers->workername); + ws_item = get_workerstatus(true, users->userid, + workers->workername); if (ws_item) { DATA_WORKERSTATUS(workerstatus, ws_item); K_RLOCK(workerstatus_free); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index eb2572c5..95d38469 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -815,7 +815,7 @@ cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b) /* TODO: replace a lot of the code for all data types that codes finds, * each with specific functions for finding, to centralise the finds, * with passed ctx's */ -K_ITEM *get_workerstatus(int64_t userid, char *workername) +K_ITEM *get_workerstatus(bool lock, int64_t userid, char *workername) { WORKERSTATUS workerstatus; K_TREE_CTX ctx[1]; @@ -826,9 +826,11 @@ K_ITEM *get_workerstatus(int64_t userid, char *workername) INIT_WORKERSTATUS(&look); look.data = (void *)(&workerstatus); - K_RLOCK(workerstatus_free); + if (lock) + K_RLOCK(workerstatus_free); find = find_in_ktree(workerstatus_root, &look, ctx); - K_RUNLOCK(workerstatus_free); + if (lock) + K_RUNLOCK(workerstatus_free); return find; } @@ -839,7 +841,7 @@ K_ITEM *get_workerstatus(int64_t userid, char *workername) * This has 2 sets of file/func/line to allow 2 levels of traceback * to see why it happened */ -K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, +K_ITEM *_find_create_workerstatus(bool lock, int64_t userid, char *workername, bool create, const char *file2, const char *func2, const int line2, WHERE_FFL_ARGS) @@ -849,7 +851,7 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool ws_err = false, w_err = false; tv_t now; - ws_item = get_workerstatus(userid, workername); + ws_item = get_workerstatus(lock, userid, workername); if (!ws_item) { if (!create) { ws_err = true; @@ -858,7 +860,8 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, if (!w_item) { w_err = true; setnow(&now); - w_item = workers_add(NULL, userid, workername, + w_item = workers_add(NULL, lock, userid, + workername, NULL, NULL, NULL, by_default, (char *)__func__, @@ -867,7 +870,8 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, } } - K_WLOCK(workerstatus_free); + if (lock) + K_WLOCK(workerstatus_free); ws_item = k_unlink_head(workerstatus_free); DATA_WORKERSTATUS(row, ws_item); @@ -878,7 +882,8 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, add_to_ktree(workerstatus_root, ws_item); k_add_head(workerstatus_store, ws_item); - K_WUNLOCK(workerstatus_free); + if (lock) + K_WUNLOCK(workerstatus_free); if (ws_err) { LOGNOTICE("%s(): CREATED Missing workerstatus" @@ -946,12 +951,10 @@ void workerstatus_ready() while (ws_item) { DATA_WORKERSTATUS(workerstatus, ws_item); - K_RLOCK(markersummary_free); // This is the last share datestamp ms_item = find_markersummary_userid(workerstatus->userid, workerstatus->workername, NULL); - K_RUNLOCK(markersummary_free); if (ms_item) { DATA_MARKERSUMMARY(markersummary, ms_item); if (tv_newer(&(workerstatus->last_share), @@ -968,10 +971,8 @@ void workerstatus_ready() } } - K_RLOCK(sharesummary_free); ss_item = find_last_sharesummary(workerstatus->userid, workerstatus->workername); - K_RUNLOCK(sharesummary_free); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); if (tv_newer(&(workerstatus->last_share), @@ -1001,7 +1002,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares, K_ITEM *item; if (auths) { - item = find_workerstatus(auths->userid, auths->workername, + item = find_workerstatus(true, auths->userid, auths->workername, file, func, line); if (item) { DATA_WORKERSTATUS(row, item); @@ -1022,7 +1023,8 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares, pool.diffinv += shares->diff; pool.shareinv++; } - item = find_workerstatus(shares->userid, shares->workername, + item = find_workerstatus(true, shares->userid, + shares->workername, file, func, line); if (item) { DATA_WORKERSTATUS(row, item); @@ -1090,7 +1092,8 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares, } if (startup_complete && userstats) { - item = find_workerstatus(userstats->userid, userstats->workername, + item = find_workerstatus(true, userstats->userid, + userstats->workername, file, func, line); if (item) { DATA_WORKERSTATUS(row, item); @@ -1535,7 +1538,7 @@ K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *workername, } // TODO: limit how many? - item = workers_add(conn, userid, workername, diffdef, + item = workers_add(conn, true, userid, workername, diffdef, idlenotificationenabled, idlenotificationtime, by, code, inet, cd, trf_root); } @@ -2748,13 +2751,14 @@ const char *blocks_confirmed(char *confirmed) return blocks_unknown; } -void zero_on_new_block() +void zero_on_new_block(bool lock) { WORKERSTATUS *workerstatus; K_TREE_CTX ctx[1]; K_ITEM *ws_item; - K_WLOCK(workerstatus_free); + if (lock) + K_WLOCK(workerstatus_free); pool.diffacc = pool.diffinv = pool.shareacc = pool.shareinv = pool.best_sdiff = 0; ws_item = first_in_ktree(workerstatus_root, ctx); @@ -2768,12 +2772,11 @@ void zero_on_new_block() workerstatus->block_sharehi = workerstatus->block_sharerej = 0.0; ws_item = next_in_ktree(ctx); } - K_WUNLOCK(workerstatus_free); - + if (lock) + K_WUNLOCK(workerstatus_free); } -/* Currently only used at the end of the startup - * Will need to add locking if it's used, later, after startup completes */ +// Currently only used at the end of the startup void set_block_share_counters() { K_TREE_CTX ctx[1], ctx_ms[1]; @@ -2788,13 +2791,12 @@ void set_block_share_counters() INIT_SHARESUMMARY(&ss_look); INIT_MARKERSUMMARY(&ms_look); - zero_on_new_block(); + zero_on_new_block(false); ws_item = NULL; /* From the end backwards so we can skip the workinfoid's we don't * want by jumping back to just before the current worker when the * workinfoid goes below the limit */ - K_RLOCK(sharesummary_free); ss_item = last_in_ktree(sharesummary_root, ctx); while (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); @@ -2819,16 +2821,14 @@ void set_block_share_counters() * since it should always exist * However, it is simplest to simply create it * and keep going */ - K_RUNLOCK(sharesummary_free); - ws_item = find_workerstatus(sharesummary->userid, + ws_item = find_workerstatus(false, sharesummary->userid, sharesummary->workername, __FILE__, __func__, __LINE__); if (!ws_item) { - ws_item = find_create_workerstatus(sharesummary->userid, + ws_item = find_create_workerstatus(false, sharesummary->userid, sharesummary->workername, __FILE__, __func__, __LINE__); } - K_RLOCK(sharesummary_free); DATA_WORKERSTATUS(workerstatus, ws_item); } @@ -2857,7 +2857,6 @@ void set_block_share_counters() ss_item = prev_in_ktree(ctx); } - K_RUNLOCK(sharesummary_free); LOGWARNING("%s(): Updating block markersummary counters...", __func__); @@ -2905,11 +2904,11 @@ void set_block_share_counters() * since it should always exist * However, it is simplest to simply create it * and keep going */ - ws_item = find_workerstatus(markersummary->userid, + ws_item = find_workerstatus(false, markersummary->userid, markersummary->workername, __FILE__, __func__, __LINE__); if (!ws_item) { - ws_item = find_create_workerstatus(markersummary->userid, + ws_item = find_create_workerstatus(false, markersummary->userid, markersummary->workername, __FILE__, __func__, __LINE__); } diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index fd839ad7..60940a30 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -1287,7 +1287,7 @@ bool useratts_fill(PGconn *conn) return ok; } -K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, +K_ITEM *workers_add(PGconn *conn, bool lock, int64_t userid, char *workername, char *difficultydefault, char *idlenotificationenabled, char *idlenotificationtime, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) @@ -1305,9 +1305,11 @@ K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, LOGDEBUG("%s(): add", __func__); - K_WLOCK(workers_free); + if (lock) + K_WLOCK(workers_free); item = k_unlink_head(workers_free); - K_WUNLOCK(workers_free); + if (lock) + K_WUNLOCK(workers_free); DATA_WORKERS(row, item); @@ -1397,17 +1399,19 @@ unparam: unitem: if (conned) PQfinish(conn); - K_WLOCK(workers_free); + if (lock) + K_WLOCK(workers_free); if (!ret) k_add_head(workers_free, item); else { add_to_ktree(workers_root, item); k_add_head(workers_store, item); // Ensure there is a matching workerstatus - find_create_workerstatus(userid, workername, + find_create_workerstatus(lock, userid, workername, __FILE__, __func__, __LINE__); } - K_WUNLOCK(workers_free); + if (lock) + K_WUNLOCK(workers_free); return ret; } @@ -1647,7 +1651,7 @@ bool workers_fill(PGconn *conn) * This is to ensure that code can use the workerstatus tree * to reference other tables and not miss workers in the * other tables */ - find_create_workerstatus(row->userid, row->workername, + find_create_workerstatus(false, row->userid, row->workername, __FILE__, __func__, __LINE__); } if (!ok) @@ -4858,7 +4862,7 @@ flail: if (pool.workinfoid < row->workinfoid) { pool.workinfoid = row->workinfoid; pool.height = row->height; - zero_on_new_block(); + zero_on_new_block(true); } break; case BLOCKS_ORPHAN: