From 0c308a926a6110efe999f2856891073e2cebad06 Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 5 Aug 2014 22:47:56 +1000 Subject: [PATCH] ckdb - calculate most of workerstatus after data load --- src/ckdb.c | 138 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 118 insertions(+), 20 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index aa2909ef..d4ef0839 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -85,8 +85,8 @@ static char *restorefrom; * sharesummary complete='n', but for any such sharesummary * we reset it back to the first share found and it will * correct itself during the CCL reload - * Verify that all DB sharesummaries with complete='n' have - * done this + * TODO: Verify that all DB sharesummaries with complete='n' + * have done this * DB+RAM workinfo: start from newest DB createdate workinfo * DB+RAM auths: start from newest DB createdate auths * DB+RAM poolstats: newest createdate poolstats @@ -107,7 +107,7 @@ static char *restorefrom; * DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any) * will be after the last DB workinfo * DB+RAM accountbalance (TODO): resolved by shares/workinfo/blocks - * RAM workerstats: last_auth, last_share, last_stats all handled by + * RAM workerstatus: last_auth, last_share, last_stats all handled by * DB load up to whatever the CCL restart point is, and then * corrected with the CCL reload * last_idle will be the last idle userstats in the CCL load or 0 @@ -144,7 +144,8 @@ static char *restorefrom; * 2) shares/shareerrors: any record that matches an incomplete DB * sharesummary that hasn't been reset, will reset the sharesummary * so that the sharesummary will be recalculated - * The record is processed normally in both situations + * The record is processed normally with or without the reset + * If the sharesummary is complete, the record is discarded * 3) ageworkinfo records are also handled by the shares date * while processing, any records already aged are not updated * and a warning is displayed if there were any matching shares @@ -567,8 +568,8 @@ static const tv_t date_first = { DATE_DEBUG, 0L }; // Different input data handling static bool reloading = false; -// Tell the summarizer data load is complete -static bool summarizer_go = false; +// Data load is complete +static bool startup_complete = false; // Tell the summarizer to die static bool summarizer_die = false; @@ -1057,6 +1058,7 @@ typedef struct userstats { static K_TREE *userstats_root; static K_TREE *userstats_statsdate_root; // ordered by statsdate first +static K_TREE *userstats_workerstatus_root; // during data load static K_LIST *userstats_free; static K_STORE *userstats_store; // Awaiting EOS @@ -1770,11 +1772,63 @@ static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool #define find_create_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, true) #define find_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, false) -/* TODO: Change this to calculate after the DB load rather than during - for: shares, userstats and sharesummary - auths will fully populate the workerstats tree to use for the - reverse search back into the shares, userstats and sharesummary - for the latest records */ +static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b); +static double cmp_sharesummary(K_ITEM *a, K_ITEM *b); + +/* All data is loaded, now update workerstatus last_share, last_idle, last_stats + * shares are all part of a sharesummary so no need to search shares + */ +static void workerstatus_ready() +{ + K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1]; + K_ITEM *ws_item, look, *us_item, *ss_item; + USERSTATS userstats; + SHARESUMMARY sharesummary; + + ws_item = first_in_ktree(workerstatus_root, ws_ctx); + while (ws_item) { + userstats.userid = DATA_WORKERSTATUS(ws_item)->userid; + STRNCPY(userstats.workername, DATA_WORKERSTATUS(ws_item)->workername); + userstats.statsdate.tv_sec = date_eot.tv_sec; + userstats.statsdate.tv_usec = date_eot.tv_usec; + look.data = (void *)(&userstats); + us_item = find_before_in_ktree(userstats_workerstatus_root, &look, + cmp_userstats_workerstatus, us_ctx); + if (us_item) { + if (DATA_USERSTATS(us_item)->idle) { + if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_idle), + &(DATA_USERSTATS(us_item)->statsdate))) { + copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_idle), + &(DATA_USERSTATS(us_item)->statsdate)); + } + } else { + if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_stats), + &(DATA_USERSTATS(us_item)->statsdate))) { + copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_stats), + &(DATA_USERSTATS(us_item)->statsdate)); + } + } + } + + sharesummary.userid = DATA_WORKERSTATUS(ws_item)->userid; + STRNCPY(sharesummary.workername, DATA_WORKERSTATUS(ws_item)->workername); + sharesummary.workinfoid = 0x7fffffffffffffffLL; + look.data = (void *)(&sharesummary); + ss_item = find_before_in_ktree(sharesummary_root, &look, cmp_sharesummary, ss_ctx); + if (ss_item) { + if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_share), + &(DATA_SHARESUMMARY(ss_item)->lastshare))) { + copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_share), + &(DATA_SHARESUMMARY(ss_item)->lastshare)); + } + } + + ws_item = next_in_ktree(ws_ctx); + } + + free_ktree(userstats_workerstatus_root, NULL); +} + static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats, SHARESUMMARY *sharesummary) { @@ -1790,14 +1844,14 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta copy_tv(&(row->last_auth), &(auths->createdate)); } - if (shares) { + if (startup_complete && shares) { item = find_create_workerstatus(shares->userid, shares->workername); row = DATA_WORKERSTATUS(item); if (tv_newer(&(row->last_share), &(shares->createdate))) copy_tv(&(row->last_share), &(shares->createdate)); } - if (userstats) { + if (startup_complete && userstats) { item = find_create_workerstatus(userstats->userid, userstats->workername); row = DATA_WORKERSTATUS(item); if (userstats->idle) { @@ -1809,7 +1863,7 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta } } - if (sharesummary) { + if (startup_complete && sharesummary) { item = find_create_workerstatus(sharesummary->userid, sharesummary->workername); row = DATA_WORKERSTATUS(item); if (tv_newer(&(row->last_share), &(sharesummary->lastshare))) @@ -2822,6 +2876,17 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row static double cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b); static double cmp_shares(K_ITEM *a, K_ITEM *b); +/* N.B. a DB check can be done to find sharesummaries that were missed being + * aged (and a possible problem with the aging process): + * e.g. for a date D in the past of at least a few hours + * select count(*) from sharesummary where createdate<'D' and complete='n'; + * and can be easily corrected: + * update sharesummary set complete='a' where createdate<'D' and complete='n'; + * It's important to make sure the D value is far enough in the past such that + * all the matching sharesummary records in ckdb have certainly completed + * ckdb would need a restart to get the updated DB information though it would + * not affect current ckdb code + */ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, char *by, char *code, char *inet, tv_t *cd) { @@ -4708,6 +4773,27 @@ static double cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b) return c; } +/* order by userid asc,workername asc,statsdate asc,poolinstance asc + built during data load to update workerstatus at the end of the load */ +static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b) +{ + double c = (double)(DATA_USERSTATS(a)->userid - + DATA_USERSTATS(b)->userid); + if (c == 0) { + c = (double)strcmp(DATA_USERSTATS(a)->workername, + DATA_USERSTATS(b)->workername); + if (c == 0) { + c = tvdiff(&(DATA_USERSTATS(a)->statsdate), + &(DATA_USERSTATS(b)->statsdate)); + if (c == 0) { + c = (double)strcmp(DATA_USERSTATS(a)->poolinstance, + DATA_USERSTATS(b)->poolinstance); + } + } + } + return c; +} + static bool userstats_add_db(PGconn *conn, USERSTATS *row) { ExecStatusType rescode; @@ -4865,7 +4951,11 @@ advancetogo: userstats_root = add_to_ktree(userstats_root, us_match, cmp_userstats); userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_match, - cmp_userstats_statsdate); + cmp_userstats_statsdate); + if (!startup_complete) { + userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, us_match, + cmp_userstats_workerstatus); + } k_add_head(userstats_store, us_match); } } @@ -5037,6 +5127,8 @@ static bool userstats_fill(PGconn *conn) userstats_root = add_to_ktree(userstats_root, item, cmp_userstats); userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, item, cmp_userstats_statsdate); + userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, item, + cmp_userstats_workerstatus); k_add_head(userstats_store, item); workerstatus_update(NULL, NULL, row, NULL); @@ -5402,6 +5494,7 @@ static bool setup_data() userstats_db = k_new_store(userstats_free); userstats_root = new_ktree(); userstats_statsdate_root = new_ktree(); + userstats_workerstatus_root = new_ktree(); userstats_free->dsp_func = dsp_userstats; userstats_db_root = new_ktree(); @@ -5416,6 +5509,8 @@ static bool setup_data() if (!reload()) return false; + workerstatus_ready(); + workinfo_current = last_in_ktree(workinfo_height_root, ctx); if (workinfo_current) { STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1); @@ -7052,7 +7147,8 @@ static void summarise_userstats() memcpy(userstats, DATA_USERSTATS(first), sizeof(USERSTATS)); userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats, ctx2); - userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first, cmp_userstats_statsdate, ctx2); + userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first, + cmp_userstats_statsdate, ctx2); k_unlink_item(userstats_store, first); k_add_head(userstats_summ, first); @@ -7077,7 +7173,8 @@ static void summarise_userstats() userstats->summarycount += DATA_USERSTATS(next)->summarycount; userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats, ctx2); - userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next, cmp_userstats_statsdate, ctx2); + userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next, + cmp_userstats_statsdate, ctx2); k_unlink_item(userstats_store, next); k_add_head(userstats_summ, next); } @@ -7128,7 +7225,8 @@ static void summarise_userstats() k_list_transfer_to_tail(userstats_summ, userstats_free); k_add_head(userstats_store, new); userstats_root = add_to_ktree(userstats_root, new, cmp_userstats); - userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new, cmp_userstats_statsdate); + userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new, + cmp_userstats_statsdate); if (upgrade) K_WUNLOCK(userstats_free); @@ -7155,7 +7253,7 @@ static void *summariser(__maybe_unused void *arg) { pthread_detach(pthread_self()); - while (!summarizer_die && !summarizer_go) + while (!summarizer_die && !startup_complete) cksleep_ms(42); while (!summarizer_die) { @@ -7395,7 +7493,7 @@ static void *listener(void *arg) LOGWARNING("%s(): ckdb ready", __func__); - summarizer_go = true; + startup_complete = true; while (true) { dealloc(buf);