Browse Source

ckdb - allow shift processing to start during the reload

master
kanoi 9 years ago
parent
commit
01ee4fbfd1
  1. 62
      src/ckdb.c
  2. 29
      src/ckdb.h
  3. 6
      src/ckdb_cmd.c
  4. 49
      src/ckdb_data.c
  5. 12
      src/ckdb_dbio.c

62
src/ckdb.c

@ -253,6 +253,8 @@ bool db_users_complete = false;
bool db_load_complete = false;
// Different input data handling
bool reloading = false;
// Start marks processing during a larger reload
static bool reloaded_N_files = false;
// Data load is complete
bool startup_complete = false;
// Set to true the first time workqueue reaches 0 after startup
@ -1483,13 +1485,27 @@ static bool setup_data()
sec -= min * 60.0;
LOGWARNING("reload complete %.0fm %.3fs", min, sec);
// full lock access since mark processing can occur
ck_wlock(&process_pplns_lock);
K_WLOCK(workerstatus_free);
K_RLOCK(sharesummary_free);
K_RLOCK(workmarkers_free);
K_RLOCK(markersummary_free);
set_block_share_counters();
if (!everyone_die)
workerstatus_ready();
K_RUNLOCK(markersummary_free);
K_RUNLOCK(workmarkers_free);
K_RUNLOCK(sharesummary_free);
K_WUNLOCK(workerstatus_free);
ck_wunlock(&process_pplns_lock);
if (everyone_die)
return false;
workerstatus_ready();
workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) {
DATA_WORKINFO(wic, workinfo_current);
@ -3083,9 +3099,20 @@ static void *summariser(__maybe_unused void *arg)
rename_proc("db_summariser");
/* Don't do any summarisation until the reload queue completes coz:
* 1) It locks/accesses a lot of data - workinfo/markersummary that
* can slow down the reload
* 2) If you stop and restart ckdb this wont affect the restart point
* Thus it's OK to do it later
* 3) It does I/O to bitcoind which is slow ...
* 4) It triggers the payout generation which also accesses a lot of
* data - workinfo/markersummary - but it wont affect a later
* restart point if it hasn't been done. Thus it's OK to do it later
*/
while (!everyone_die && !reload_queue_complete)
cksleep_ms(42);
LOGWARNING("%s() Start processing...", __func__);
summariser_using_data = true;
while (!everyone_die) {
@ -3628,7 +3655,13 @@ static void *marker(__maybe_unused void *arg)
rename_proc("db_marker");
while (!everyone_die && !reload_queue_complete)
/* We want this to start during the CCL reload so that if we run a
* large reload and it fails at some point, the next reload will not
* always have to go back to the same reload point as before due to
* no new workmarkers being completed/processed
* However, don't start during the first N reload files so that a
* normal ckdb restart reload won't slow down */
while (!everyone_die && !reloaded_N_files && !reload_queue_complete)
cksleep_ms(42);
if (sharesummary_marks_limit) {
@ -3637,6 +3670,7 @@ static void *marker(__maybe_unused void *arg)
return NULL;
}
LOGWARNING("%s() Start processing...", __func__);
marker_using_data = true;
while (!everyone_die) {
@ -3692,6 +3726,7 @@ static void *logger(__maybe_unused void *arg)
snprintf(buf, sizeof(buf), "db%s_logger", dbcode);
rename_proc(buf);
LOGWARNING("%s() Start processing...", __func__);
logger_using_data = true;
setnow(&now);
@ -3809,6 +3844,7 @@ static void *socketer(__maybe_unused void *arg)
while (!everyone_die && !db_users_complete)
cksem_mswait(&socketer_sem, 420);
LOGWARNING("%s() Start processing...", __func__);
socketer_using_data = true;
want_first = true;
@ -4484,6 +4520,14 @@ static bool logopen(char **filename, FILE **fp, bool *apipe)
return false;
}
// How many files need to be processed before flagging reloaded_N_files
#define RELOAD_N_FILES 2
// optioncontrol name to override the above value
#define RELOAD_N_FILES_STR "ReloadNFiles"
// How many lines in a reload file required to count it
#define RELOAD_N_COUNT 1000
/* If the reload start file is missing and -r was specified correctly:
* touch the filename reported in "Failed to open 'filename'",
* if ckdb aborts at the beginning of the reload, then start again */
@ -4501,11 +4545,15 @@ static bool reload_from(tv_t *start)
tv_t now, begin;
double diff;
FILE *fp = NULL;
int file_N_limit;
reload_buf = malloc(MAX_READ);
if (!reload_buf)
quithere(1, "(%d) OOM", MAX_READ);
file_N_limit = (int)sys_setting(RELOAD_N_FILES_STR, RELOAD_N_FILES,
&date_eot);
reloading = true;
copy_tv(&reload_timestamp, start);
@ -4569,6 +4617,14 @@ static bool reload_from(tv_t *start)
LOGWARNING("%s(): confirm range complete", __func__);
break;
}
/* Used by marker() to start mark generation during a longer
* than normal reload */
if (count > RELOAD_N_COUNT) {
if (--file_N_limit < 1)
reloaded_N_files = true;
}
filename = rotating_filename(restorefrom, reload_timestamp.tv_sec);
ok = logopen(&filename, &fp, &apipe);
if (!ok) {

29
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.3"
#define CKDB_VERSION DB_VERSION"-1.350"
#define CKDB_VERSION DB_VERSION"-1.400"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -2212,16 +2212,16 @@ extern K_ITEM *_optional_name(K_TREE *trf_root, char *name, int len, char *patt,
extern K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt,
char *reply, size_t siz, WHERE_FFL_ARGS);
extern cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b);
extern K_ITEM *get_workerstatus(int64_t userid, char *workername);
#define find_create_workerstatus(_u, _w, _file, _func, _line) \
_find_create_workerstatus(_u, _w, true, _file, _func, _line, WHERE_FFL_HERE)
#define find_workerstatus(_u, _w, _file, _func, _line) \
_find_create_workerstatus(_u, _w, false, _file, _func, _line, WHERE_FFL_HERE)
extern K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
bool create, const char *file2,
const char *func2, const int line2,
WHERE_FFL_ARGS);
extern K_ITEM *get_workerstatus(bool lock, int64_t userid, char *workername);
#define find_create_workerstatus(_l, _u, _w, _file, _func, _line) \
_find_create_workerstatus(_l, _u, _w, true, _file, _func, _line, WHERE_FFL_HERE)
#define find_workerstatus(_l, _u, _w, _file, _func, _line) \
_find_create_workerstatus(_l, _u, _w, false, _file, _func, _line, WHERE_FFL_HERE)
extern K_ITEM *_find_create_workerstatus(bool lock, int64_t userid,
char *workername, bool create,
const char *file2, const char *func2,
const int line2, WHERE_FFL_ARGS);
extern void zero_all_active(tv_t *when);
extern void workerstatus_ready();
#define workerstatus_update(_auths, _shares, _userstats) \
@ -2336,7 +2336,7 @@ extern cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_blocks(int32_t height, char *blockhash, K_TREE_CTX *ctx);
extern K_ITEM *find_prev_blocks(int32_t height, K_TREE_CTX *ctx);
extern const char *blocks_confirmed(char *confirmed);
extern void zero_on_new_block();
extern void zero_on_new_block(bool lock);
extern void set_block_share_counters();
extern bool check_update_blocks_stats(tv_t *stats);
#define set_blockcreatedate(_h) _set_blockcreatedate(_h, WHERE_FFL_HERE)
@ -2484,8 +2484,9 @@ extern K_ITEM *useratts_add(PGconn *conn, char *username, char *attname,
bool begun);
extern bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd);
extern bool useratts_fill(PGconn *conn);
extern K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername,
char *difficultydefault, char *idlenotificationenabled,
extern K_ITEM *workers_add(PGconn *conn, bool lock, int64_t userid,
char *workername, char *difficultydefault,
char *idlenotificationenabled,
char *idlenotificationtime, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root);
extern bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,

6
src/ckdb_cmd.c

@ -1734,7 +1734,8 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
DATA_WORKERS_NULL(workers, w_item);
while (w_item && workers->userid == users->userid) {
if (CURRENT(&(workers->expirydate))) {
ws_item = get_workerstatus(users->userid, workers->workername);
ws_item = get_workerstatus(true, users->userid,
workers->workername);
if (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item);
t_diffacc += workerstatus->block_diffacc;
@ -1996,7 +1997,8 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
rows = 0;
while (w_item && workers->userid == users->userid) {
if (CURRENT(&(workers->expirydate))) {
ws_item = get_workerstatus(users->userid, workers->workername);
ws_item = get_workerstatus(true, users->userid,
workers->workername);
if (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item);
K_RLOCK(workerstatus_free);

49
src/ckdb_data.c

@ -815,7 +815,7 @@ cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b)
/* TODO: replace a lot of the code for all data types that codes finds,
* each with specific functions for finding, to centralise the finds,
* with passed ctx's */
K_ITEM *get_workerstatus(int64_t userid, char *workername)
K_ITEM *get_workerstatus(bool lock, int64_t userid, char *workername)
{
WORKERSTATUS workerstatus;
K_TREE_CTX ctx[1];
@ -826,8 +826,10 @@ K_ITEM *get_workerstatus(int64_t userid, char *workername)
INIT_WORKERSTATUS(&look);
look.data = (void *)(&workerstatus);
if (lock)
K_RLOCK(workerstatus_free);
find = find_in_ktree(workerstatus_root, &look, ctx);
if (lock)
K_RUNLOCK(workerstatus_free);
return find;
}
@ -839,7 +841,7 @@ K_ITEM *get_workerstatus(int64_t userid, char *workername)
* This has 2 sets of file/func/line to allow 2 levels of traceback
* to see why it happened
*/
K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
K_ITEM *_find_create_workerstatus(bool lock, int64_t userid, char *workername,
bool create, const char *file2,
const char *func2, const int line2,
WHERE_FFL_ARGS)
@ -849,7 +851,7 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
bool ws_err = false, w_err = false;
tv_t now;
ws_item = get_workerstatus(userid, workername);
ws_item = get_workerstatus(lock, userid, workername);
if (!ws_item) {
if (!create) {
ws_err = true;
@ -858,7 +860,8 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
if (!w_item) {
w_err = true;
setnow(&now);
w_item = workers_add(NULL, userid, workername,
w_item = workers_add(NULL, lock, userid,
workername,
NULL, NULL, NULL,
by_default,
(char *)__func__,
@ -867,6 +870,7 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
}
}
if (lock)
K_WLOCK(workerstatus_free);
ws_item = k_unlink_head(workerstatus_free);
@ -878,6 +882,7 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
add_to_ktree(workerstatus_root, ws_item);
k_add_head(workerstatus_store, ws_item);
if (lock)
K_WUNLOCK(workerstatus_free);
if (ws_err) {
@ -946,12 +951,10 @@ void workerstatus_ready()
while (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item);
K_RLOCK(markersummary_free);
// This is the last share datestamp
ms_item = find_markersummary_userid(workerstatus->userid,
workerstatus->workername,
NULL);
K_RUNLOCK(markersummary_free);
if (ms_item) {
DATA_MARKERSUMMARY(markersummary, ms_item);
if (tv_newer(&(workerstatus->last_share),
@ -968,10 +971,8 @@ void workerstatus_ready()
}
}
K_RLOCK(sharesummary_free);
ss_item = find_last_sharesummary(workerstatus->userid,
workerstatus->workername);
K_RUNLOCK(sharesummary_free);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (tv_newer(&(workerstatus->last_share),
@ -1001,7 +1002,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
K_ITEM *item;
if (auths) {
item = find_workerstatus(auths->userid, auths->workername,
item = find_workerstatus(true, auths->userid, auths->workername,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
@ -1022,7 +1023,8 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
pool.diffinv += shares->diff;
pool.shareinv++;
}
item = find_workerstatus(shares->userid, shares->workername,
item = find_workerstatus(true, shares->userid,
shares->workername,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
@ -1090,7 +1092,8 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
}
if (startup_complete && userstats) {
item = find_workerstatus(userstats->userid, userstats->workername,
item = find_workerstatus(true, userstats->userid,
userstats->workername,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
@ -1535,7 +1538,7 @@ K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *workername,
}
// TODO: limit how many?
item = workers_add(conn, userid, workername, diffdef,
item = workers_add(conn, true, userid, workername, diffdef,
idlenotificationenabled, idlenotificationtime,
by, code, inet, cd, trf_root);
}
@ -2748,12 +2751,13 @@ const char *blocks_confirmed(char *confirmed)
return blocks_unknown;
}
void zero_on_new_block()
void zero_on_new_block(bool lock)
{
WORKERSTATUS *workerstatus;
K_TREE_CTX ctx[1];
K_ITEM *ws_item;
if (lock)
K_WLOCK(workerstatus_free);
pool.diffacc = pool.diffinv = pool.shareacc =
pool.shareinv = pool.best_sdiff = 0;
@ -2768,12 +2772,11 @@ void zero_on_new_block()
workerstatus->block_sharehi = workerstatus->block_sharerej = 0.0;
ws_item = next_in_ktree(ctx);
}
if (lock)
K_WUNLOCK(workerstatus_free);
}
/* Currently only used at the end of the startup
* Will need to add locking if it's used, later, after startup completes */
// Currently only used at the end of the startup
void set_block_share_counters()
{
K_TREE_CTX ctx[1], ctx_ms[1];
@ -2788,13 +2791,12 @@ void set_block_share_counters()
INIT_SHARESUMMARY(&ss_look);
INIT_MARKERSUMMARY(&ms_look);
zero_on_new_block();
zero_on_new_block(false);
ws_item = NULL;
/* From the end backwards so we can skip the workinfoid's we don't
* want by jumping back to just before the current worker when the
* workinfoid goes below the limit */
K_RLOCK(sharesummary_free);
ss_item = last_in_ktree(sharesummary_root, ctx);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
@ -2819,16 +2821,14 @@ void set_block_share_counters()
* since it should always exist
* However, it is simplest to simply create it
* and keep going */
K_RUNLOCK(sharesummary_free);
ws_item = find_workerstatus(sharesummary->userid,
ws_item = find_workerstatus(false, sharesummary->userid,
sharesummary->workername,
__FILE__, __func__, __LINE__);
if (!ws_item) {
ws_item = find_create_workerstatus(sharesummary->userid,
ws_item = find_create_workerstatus(false, sharesummary->userid,
sharesummary->workername,
__FILE__, __func__, __LINE__);
}
K_RLOCK(sharesummary_free);
DATA_WORKERSTATUS(workerstatus, ws_item);
}
@ -2857,7 +2857,6 @@ void set_block_share_counters()
ss_item = prev_in_ktree(ctx);
}
K_RUNLOCK(sharesummary_free);
LOGWARNING("%s(): Updating block markersummary counters...", __func__);
@ -2905,11 +2904,11 @@ void set_block_share_counters()
* since it should always exist
* However, it is simplest to simply create it
* and keep going */
ws_item = find_workerstatus(markersummary->userid,
ws_item = find_workerstatus(false, markersummary->userid,
markersummary->workername,
__FILE__, __func__, __LINE__);
if (!ws_item) {
ws_item = find_create_workerstatus(markersummary->userid,
ws_item = find_create_workerstatus(false, markersummary->userid,
markersummary->workername,
__FILE__, __func__, __LINE__);
}

12
src/ckdb_dbio.c

@ -1287,7 +1287,7 @@ bool useratts_fill(PGconn *conn)
return ok;
}
K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername,
K_ITEM *workers_add(PGconn *conn, bool lock, int64_t userid, char *workername,
char *difficultydefault, char *idlenotificationenabled,
char *idlenotificationtime, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root)
@ -1305,8 +1305,10 @@ K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername,
LOGDEBUG("%s(): add", __func__);
if (lock)
K_WLOCK(workers_free);
item = k_unlink_head(workers_free);
if (lock)
K_WUNLOCK(workers_free);
DATA_WORKERS(row, item);
@ -1397,6 +1399,7 @@ unparam:
unitem:
if (conned)
PQfinish(conn);
if (lock)
K_WLOCK(workers_free);
if (!ret)
k_add_head(workers_free, item);
@ -1404,9 +1407,10 @@ unitem:
add_to_ktree(workers_root, item);
k_add_head(workers_store, item);
// Ensure there is a matching workerstatus
find_create_workerstatus(userid, workername,
find_create_workerstatus(lock, userid, workername,
__FILE__, __func__, __LINE__);
}
if (lock)
K_WUNLOCK(workers_free);
return ret;
@ -1647,7 +1651,7 @@ bool workers_fill(PGconn *conn)
* This is to ensure that code can use the workerstatus tree
* to reference other tables and not miss workers in the
* other tables */
find_create_workerstatus(row->userid, row->workername,
find_create_workerstatus(false, row->userid, row->workername,
__FILE__, __func__, __LINE__);
}
if (!ok)
@ -4858,7 +4862,7 @@ flail:
if (pool.workinfoid < row->workinfoid) {
pool.workinfoid = row->workinfoid;
pool.height = row->height;
zero_on_new_block();
zero_on_new_block(true);
}
break;
case BLOCKS_ORPHAN:

Loading…
Cancel
Save