From 252233a3180e585c93994675efbb611af0aa2a89 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sun, 7 Aug 2016 23:26:38 +1000 Subject: [PATCH] ckdb - fix workmarker handling, ageing, stopping early and allow limiting workmarker loading with -w --- src/ckdb.c | 74 +++++++++++++++++++++--- src/ckdb.h | 2 +- src/ckdb_data.c | 150 ++++++++++++++++++++++++++++++++++++++++++++++++ src/ckdb_dbio.c | 40 +++++++++---- 4 files changed, 247 insertions(+), 19 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index e09c7ee0..3b26208f 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -746,6 +746,8 @@ K_TREE *workmarkers_root; K_TREE *workmarkers_workinfoid_root; K_LIST *workmarkers_free; K_STORE *workmarkers_store; +// static for key_update +static K_STORE *workmarkers_key_store; // MARKS K_TREE *marks_root; @@ -1963,6 +1965,10 @@ static void dealloc_storage() FREE_TREE(workmarkers_workinfoid); FREE_TREE(workmarkers); + if (workmarkers_key_store) { + k_list_transfer_to_tail_nolock(workmarkers_key_store, + workmarkers_store); + } FREE_STORE_DATA(workmarkers); FREE_LIST_DATA(workmarkers); @@ -6776,29 +6782,72 @@ sayonara: static bool make_keysummaries() { K_TREE_CTX ctx[1]; + KEYSHARESUMMARY *keysharesummary; WORKMARKERS *workmarkers; - K_ITEM *wm_item, *wm_last = NULL; + K_ITEM *kss_item, *wm_item, *wm_last = NULL; tv_t proc_lock_stt, proc_lock_got, proc_lock_fin, now; - bool ok = false; + int64_t kss_ready_wid; + bool ok = false, pending; + + // Find the highest complete keysharesummary workinfoid + kss_ready_wid = 0; + K_RLOCK(keysharesummary_free); + kss_item = first_in_ktree(keysharesummary_root, ctx); + while (kss_item) { + DATA_KEYSHARESUMMARY(keysharesummary, kss_item); + if (keysharesummary->complete[0] == SUMMARY_COMPLETE && + kss_ready_wid < keysharesummary->workinfoid) { + kss_ready_wid = keysharesummary->workinfoid; + } + kss_item = next_in_ktree(ctx); + } + K_RUNLOCK(keysharesummary_free); + + if (kss_ready_wid > 0 && workmarkers_key_store->count > 0) { + pending = true; + 
wm_item = STORE_HEAD_NOLOCK(workmarkers_key_store); + while (wm_item) { + DATA_WORKMARKERS(workmarkers, wm_item); + if (workmarkers->workinfoidend > kss_ready_wid) + break; + // move the item into the processing trees/store + k_unlink_item_nolock(workmarkers_key_store, wm_item); + K_WLOCK(workmarkers_free); + add_to_ktree(workmarkers_root, wm_item); + add_to_ktree(workmarkers_workinfoid_root, wm_item); + k_add_head(workmarkers_store, wm_item); + K_WUNLOCK(workmarkers_free); + wm_item = STORE_HEAD_NOLOCK(workmarkers_key_store); + } + } + pending = false; K_RLOCK(workmarkers_free); + // Any workmarkers still pending in the key_store? + if (workmarkers_key_store->count > 0) + pending = true; wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); while (wm_item) { DATA_WORKMARKERS(workmarkers, wm_item); if (!CURRENT(&(workmarkers->expirydate))) break; - // find the oldest READY workinfoid + // find the oldest READY workmarker if (WMREADY(workmarkers->status)) wm_last = wm_item; wm_item = prev_in_ktree(ctx); } K_RUNLOCK(workmarkers_free); + // all false means we've finished if (!wm_last) - return false; + return (pending || reloading); DATA_WORKMARKERS(workmarkers, wm_last); + // Not ready to be processed yet + if (kss_ready_wid < workmarkers->workinfoidend) + return true; + LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/" "Stt %"PRId64"/%s/%s", __func__, workmarkers->markerid, workmarkers->poolinstance, @@ -6861,8 +6910,9 @@ static void update_reload(WORKINFO *wi_stt, WORKINFO *wi_fin) bool status_failure = false; /* Now that we know the workmarkers of interest, - * switch them from MARKER_PROCESSED to MARKER_READY - * and remove all after + * switch them from MARKER_PROCESSED to MARKER_READY, + * remove them and add them to key_store, + * then remove all after them * The markersummaries already exist * We are generating the missing keysummaries */ K_WLOCK(workmarkers_free); @@ -6889,8 +6939,13 @@ static void update_reload(WORKINFO *wi_stt, 
WORKINFO *wi_fin) workmarkers->status, MARKER_PROCESSED); } else { - // Not part of either tree key + remove_from_ktree(workmarkers_workinfoid_root, + wm_item); + remove_from_ktree(workmarkers_root, wm_item); + k_unlink_item(workmarkers_store, wm_item); STRNCPY(workmarkers->status, MARKER_READY_STR); + // key_store will be in workinfoid ascending order + k_add_head(workmarkers_key_store, wm_item); } } else break; @@ -7063,6 +7118,9 @@ static void update_keysummary() ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); reload_breakqueue_store = k_new_store(breakqueue_free); reload_done_breakqueue_store = k_new_store(breakqueue_free); + // Must exist (but will be empty) + cmd_breakqueue_store = k_new_store(breakqueue_free); + cmd_done_breakqueue_store = k_new_store(breakqueue_free); #if LOCK_CHECK DLPRIO(breakqueue, PRIO_TERMINAL); @@ -7079,6 +7137,8 @@ static void update_keysummary() alloc_storage(); + workmarkers_key_store = k_new_store(workmarkers_free); + setnow(&db_stt); if (!getdata1() || everyone_die) diff --git a/src/ckdb.h b/src/ckdb.h index 99d631cd..0fe69735 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.301" +#define CKDB_VERSION DB_VERSION"-2.302" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_data.c b/src/ckdb_data.c index a2f9ad13..c8702d67 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -2631,6 +2631,151 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername) return item; } +// key_update must age keysharesummary directly +static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, + char *code, char *inet, tv_t *cd) +{ + static int64_t last_attempted_id = -1; + static int64_t prev_found = 0; + static int repeat; + + char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ]; + int64_t kss_count_tot, s_count_tot, s_diff_tot; + int64_t kss_count, s_count, s_diff; + tv_t 
kss_first_min, kss_last_max; + tv_t kss_first, kss_last; + int32_t wid_count; + KEYSHARESUMMARY lookkeysharesummary, *keysharesummary; + K_TREE_CTX ctx[1]; + K_ITEM look, *kss_item; + int64_t age_id, do_id, to_id; + bool ok, found; + + LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); + + age_id = prev_found; + + /* Find the oldest 'unaged' + * keysharesummary < workinfoid and >= prev_found */ + lookkeysharesummary.workinfoid = prev_found; + lookkeysharesummary.keytype[0] = '\0'; + lookkeysharesummary.key = EMPTY; + INIT_KEYSHARESUMMARY(&look); + look.data = (void *)(&lookkeysharesummary); + + K_RLOCK(keysharesummary_free); + kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item); + + DATE_ZERO(&kss_first_min); + DATE_ZERO(&kss_last_max); + kss_count_tot = s_count_tot = s_diff_tot = 0; + + found = false; + while (kss_item && keysharesummary->workinfoid < workinfoid) { + if (keysharesummary->complete[0] == SUMMARY_NEW) { + age_id = keysharesummary->workinfoid; + prev_found = age_id; + found = true; + break; + } + kss_item = next_in_ktree(ctx); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item); + } + K_RUNLOCK(keysharesummary_free); + + LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); + // Don't repeat searching old items to avoid accessing their ram + if (!found) + prev_found = workinfoid; + else { + /* Process all the consecutive keysharesummaries that aren't aged + * This way we find each oldest 'batch' of keysharesummaries that have + * been missed and can report the range of data that was aged, + * which would normally just be an approx 10min set of workinfoids + * from the last time ckpool stopped + * Each next group of unaged keysharesummaries following this, will be + * picked up by each next aging */ + wid_count = 0; + do_id = age_id; + to_id = 0; + do { + ok = workinfo_age(do_id, poolinstance, by, code, inet, + cd, &kss_first,
&kss_last, &kss_count, + &s_count, &s_diff); + + kss_count_tot += kss_count; + s_count_tot += s_count; + s_diff_tot += s_diff; + if (kss_first_min.tv_sec == 0 || !tv_newer(&kss_first_min, &kss_first)) + copy_tv(&kss_first_min, &kss_first); + if (tv_newer(&kss_last_max, &kss_last)) + copy_tv(&kss_last_max, &kss_last); + + if (!ok) + break; + + to_id = do_id; + wid_count++; + K_RLOCK(keysharesummary_free); + while (kss_item && keysharesummary->workinfoid == to_id) { + kss_item = next_in_ktree(ctx); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item); + } + K_RUNLOCK(keysharesummary_free); + + if (kss_item) { + do_id = keysharesummary->workinfoid; + if (do_id >= workinfoid) + break; + if (keysharesummary->complete[0] != SUMMARY_NEW) + break; + } + } while (kss_item); + if (to_id == 0) { + if (last_attempted_id != age_id || ++repeat >= 10) { + // Approx once every 5min since workinfo defaults to ~30s + LOGWARNING("%s() Auto-age failed to age %"PRId64, + __func__, age_id); + last_attempted_id = age_id; + repeat = 0; + } + } else { + char idrange[64]; + char keysharerange[256]; + if (to_id != age_id) { + snprintf(idrange, sizeof(idrange), + "from %"PRId64" to %"PRId64, + age_id, to_id); + } else { + snprintf(idrange, sizeof(idrange), + "%"PRId64, age_id); + } + tv_to_buf(&kss_first_min, min_buf, sizeof(min_buf)); + if (tv_equal(&kss_first_min, &kss_last_max)) { + snprintf(keysharerange, sizeof(keysharerange), + "share date %s", min_buf); + } else { + tv_to_buf(&kss_last_max, max_buf, sizeof(max_buf)); + snprintf(keysharerange, sizeof(keysharerange), + "share dates %s to %s", + min_buf, max_buf); + } + LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") " + "share%s %"PRId64" keysharesummar%s %"PRId32 + " workinfoid%s %s %s", + __func__, + s_count_tot, s_diff_tot, + (s_count_tot == 1) ? "" : "s", + kss_count_tot, + (kss_count_tot == 1) ? "y" : "ies", + wid_count, + (wid_count == 1) ? "" : "s", + idrange, keysharerange); + } + } +} + /* TODO: markersummary checking? 
* However, there should be no issues since the sharesummaries are removed */ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, @@ -2652,6 +2797,11 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, int64_t age_id, do_id, to_id; bool ok, found; + if (key_update) { + key_auto_age_older(workinfoid, poolinstance, by, code, inet, cd); + return; + } + LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); age_id = prev_found; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index e69c7859..96718995 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3188,9 +3188,10 @@ bool workinfo_fill(PGconn *conn) " from workinfo where "EDDB"=$1 and" " ((workinfoid>=$2 and workinfoid<=$3)"); - // If we aren't loading the full range, ensure the necessary ones are loaded - if ((!dbload_only_sharesummary && dbload_workinfoid_start != -1) || - dbload_workinfoid_finish != MAXID) { + /* If we aren't loading the full range, ensure the necessary ones are loaded + * However, don't for key_update to allow a possible lower memory profile */ + if (((!dbload_only_sharesummary && dbload_workinfoid_start != -1) || + dbload_workinfoid_finish != MAXID) && !key_update) { APPEND_REALLOC(sel, off, len, // we need all blocks workinfoids " or workinfoid in (select workinfoid from blocks)" @@ -4825,6 +4826,7 @@ dokey: } else { // Not part of either tree key STRNCPY(workmarkers->status, MARKER_PROCESSED_STR); + ok = true; } setnow(&kdb_fin); rollback: @@ -8831,7 +8833,8 @@ bool workmarkers_fill(PGconn *conn) PGresult *res; K_ITEM *item, *wi_item; WORKINFO *workinfo; - int n, i; + char *params[1]; + int n, i, par = 0; WORKMARKERS *row; char *field; char *sel; @@ -8840,13 +8843,25 @@ bool workmarkers_fill(PGconn *conn) LOGDEBUG("%s(): select", __func__); - // TODO: limit how far back - sel = "select " - "markerid,poolinstance,workinfoidend,workinfoidstart," - "description,status" - HISTORYDATECONTROL - " from workmarkers"; - res = 
PQexec(conn, sel, CKPQ_READ); + // Allow limiting the load for key_update + if (key_update && dbload_workinfoid_start != -1) { + sel = "select " + "markerid,poolinstance,workinfoidend,workinfoidstart," + "description,status" + HISTORYDATECONTROL + " from workmarkers where workinfoidstart>=$1"; + par = 0; + params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); + PARCHK(par, params); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); + } else { + sel = "select " + "markerid,poolinstance,workinfoidend,workinfoidstart," + "description,status" + HISTORYDATECONTROL + " from workmarkers"; + res = PQexec(conn, sel, CKPQ_READ); + } rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -8974,6 +8989,9 @@ bool workmarkers_fill(PGconn *conn) K_WUNLOCK(workmarkers_free); PQclear(res); + for (i = 0; i < par; i++) + free(params[i]); + par = 0; if (ok) { LOGDEBUG("%s(): built", __func__);