Browse Source

ckdb - fix workmarker handling, ageing, stopping early and allow limiting workmarker loading with -w

master
kanoi 8 years ago
parent
commit
252233a318
  1. 74
      src/ckdb.c
  2. 2
      src/ckdb.h
  3. 150
      src/ckdb_data.c
  4. 40
      src/ckdb_dbio.c

74
src/ckdb.c

@ -746,6 +746,8 @@ K_TREE *workmarkers_root;
K_TREE *workmarkers_workinfoid_root;
K_LIST *workmarkers_free;
K_STORE *workmarkers_store;
// static for key_update
static K_STORE *workmarkers_key_store;
// MARKS
K_TREE *marks_root;
@ -1963,6 +1965,10 @@ static void dealloc_storage()
FREE_TREE(workmarkers_workinfoid);
FREE_TREE(workmarkers);
if (workmarkers_key_store) {
k_list_transfer_to_tail_nolock(workmarkers_key_store,
workmarkers_store);
}
FREE_STORE_DATA(workmarkers);
FREE_LIST_DATA(workmarkers);
@ -6776,29 +6782,72 @@ sayonara:
static bool make_keysummaries()
{
K_TREE_CTX ctx[1];
KEYSHARESUMMARY *keysharesummary;
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL;
K_ITEM *kss_item, *wm_item, *wm_last = NULL;
tv_t proc_lock_stt, proc_lock_got, proc_lock_fin, now;
bool ok = false;
int64_t kss_ready_wid;
bool ok = false, pending;
// Find the highest complete keysharesummary workinfoid
kss_ready_wid = 0;
K_RLOCK(keysharesummary_free);
kss_item = first_in_ktree(keysharesummary_root, ctx);
while (kss_item) {
DATA_KEYSHARESUMMARY(keysharesummary, kss_item);
if (keysharesummary->complete[0] == SUMMARY_COMPLETE &&
kss_ready_wid < keysharesummary->workinfoid) {
kss_ready_wid = keysharesummary->workinfoid;
}
kss_item = next_in_ktree(ctx);
}
K_RUNLOCK(keysharesummary_free);
if (kss_ready_wid > 0 && workmarkers_key_store->count > 0) {
pending = true;
wm_item = STORE_HEAD_NOLOCK(workmarkers_key_store);
while (wm_item) {
DATA_WORKMARKERS(workmarkers, wm_item);
if (workmarkers->workinfoidend > kss_ready_wid)
break;
// move the item into the processing trees/store
k_unlink_item_nolock(workmarkers_key_store, wm_item);
K_WLOCK(workmarkers_free);
add_to_ktree(workmarkers_root, wm_item);
add_to_ktree(workmarkers_workinfoid_root, wm_item);
k_add_head(workmarkers_store, wm_item);
K_WUNLOCK(workmarkers_free);
wm_item = STORE_HEAD_NOLOCK(workmarkers_key_store);
}
}
pending = false;
K_RLOCK(workmarkers_free);
// Any workmarkers still pending in the key_store?
if (workmarkers_key_store->count > 0)
pending = true;
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
while (wm_item) {
DATA_WORKMARKERS(workmarkers, wm_item);
if (!CURRENT(&(workmarkers->expirydate)))
break;
// find the oldest READY workinfoid
// find the oldest READY workmarker
if (WMREADY(workmarkers->status))
wm_last = wm_item;
wm_item = prev_in_ktree(ctx);
}
K_RUNLOCK(workmarkers_free);
// all false means we've finished
if (!wm_last)
return false;
return (pending || reloading);
DATA_WORKMARKERS(workmarkers, wm_last);
// Not ready to be processed yet
if (kss_ready_wid < workmarkers->workinfoidend)
return true;
LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/"
"Stt %"PRId64"/%s/%s",
__func__, workmarkers->markerid, workmarkers->poolinstance,
@ -6861,8 +6910,9 @@ static void update_reload(WORKINFO *wi_stt, WORKINFO *wi_fin)
bool status_failure = false;
/* Now that we know the workmarkers of interest,
* switch them from MARKER_PROCESSED to MARKER_READY
* and remove all after
* switch them from MARKER_PROCESSED to MARKER_READY,
* remove them and add them to key_store,
* then remove all after them
* The markersummaries already exist
* We are generating the missing keysummaries */
K_WLOCK(workmarkers_free);
@ -6889,8 +6939,13 @@ static void update_reload(WORKINFO *wi_stt, WORKINFO *wi_fin)
workmarkers->status,
MARKER_PROCESSED);
} else {
// Not part of either tree key
remove_from_ktree(workmarkers_workinfoid_root,
wm_item);
remove_from_ktree(workmarkers_root, wm_item);
k_unlink_item(workmarkers_store, wm_item);
STRNCPY(workmarkers->status, MARKER_READY_STR);
// key_store will be in workinfoid ascending order
k_add_head(workmarkers_key_store, wm_item);
}
} else
break;
@ -7063,6 +7118,9 @@ static void update_keysummary()
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
// Must exist (but will be empty)
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(breakqueue, PRIO_TERMINAL);
@ -7079,6 +7137,8 @@ static void update_keysummary()
alloc_storage();
workmarkers_key_store = k_new_store(workmarkers_free);
setnow(&db_stt);
if (!getdata1() || everyone_die)

2
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.301"
#define CKDB_VERSION DB_VERSION"-2.302"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__

150
src/ckdb_data.c

@ -2631,6 +2631,151 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
return item;
}
// key_update must age keysharesummary directly
static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd)
{
static int64_t last_attempted_id = -1;
static int64_t prev_found = 0;
static int repeat;
char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ];
int64_t kss_count_tot, s_count_tot, s_diff_tot;
int64_t kss_count, s_count, s_diff;
tv_t kss_first_min, kss_last_max;
tv_t kss_first, kss_last;
int32_t wid_count;
KEYSHARESUMMARY lookkeysharesummary, *keysharesummary;
K_TREE_CTX ctx[1];
K_ITEM look, *kss_item;
int64_t age_id, do_id, to_id;
bool ok, found;
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found;
/* Find the oldest 'unaged'
* keysharesummary < workinfoid and >= prev_found */
lookkeysharesummary.workinfoid = prev_found;
lookkeysharesummary.keytype[0] = '\0';
lookkeysharesummary.key = EMPTY;
INIT_KEYSHARESUMMARY(&look);
look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free);
kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
DATE_ZERO(&kss_first_min);
DATE_ZERO(&kss_last_max);
kss_count_tot = s_count_tot = s_diff_tot = 0;
found = false;
while (kss_item && keysharesummary->workinfoid < workinfoid) {
if (keysharesummary->complete[0] == SUMMARY_NEW) {
age_id = keysharesummary->workinfoid;
prev_found = age_id;
found = true;
break;
}
kss_item = next_in_ktree(ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
}
K_RUNLOCK(keysharesummary_free);
LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found);
// Don't repeat searching old items to avoid accessing their ram
if (!found)
prev_found = workinfoid;
else {
/* Process all the consecutive keysharesummaries that's aren't aged
* This way we find each oldest 'batch' of keysharesummaries that have
* been missed and can report the range of data that was aged,
* which would normally just be an approx 10min set of workinfoids
* from the last time ckpool stopped
* Each next group of unaged keysharesummaries following this, will be
* picked up by each next aging */
wid_count = 0;
do_id = age_id;
to_id = 0;
do {
ok = workinfo_age(do_id, poolinstance, by, code, inet,
cd, &kss_first, &kss_last, &kss_count,
&s_count, &s_diff);
kss_count_tot += kss_count;
s_count_tot += s_count;
s_diff_tot += s_diff;
if (kss_first_min.tv_sec == 0 || !tv_newer(&kss_first_min, &kss_first))
copy_tv(&kss_first_min, &kss_first);
if (tv_newer(&kss_last_max, &kss_last))
copy_tv(&kss_last_max, &kss_last);
if (!ok)
break;
to_id = do_id;
wid_count++;
K_RLOCK(keysharesummary_free);
while (kss_item && keysharesummary->workinfoid == to_id) {
kss_item = next_in_ktree(ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
}
K_RUNLOCK(keysharesummary_free);
if (kss_item) {
do_id = keysharesummary->workinfoid;
if (do_id >= workinfoid)
break;
if (keysharesummary->complete[0] != SUMMARY_NEW)
break;
}
} while (kss_item);
if (to_id == 0) {
if (last_attempted_id != age_id || ++repeat >= 10) {
// Approx once every 5min since workinfo defaults to ~30s
LOGWARNING("%s() Auto-age failed to age %"PRId64,
__func__, age_id);
last_attempted_id = age_id;
repeat = 0;
}
} else {
char idrange[64];
char keysharerange[256];
if (to_id != age_id) {
snprintf(idrange, sizeof(idrange),
"from %"PRId64" to %"PRId64,
age_id, to_id);
} else {
snprintf(idrange, sizeof(idrange),
"%"PRId64, age_id);
}
tv_to_buf(&kss_first_min, min_buf, sizeof(min_buf));
if (tv_equal(&kss_first_min, &kss_last_max)) {
snprintf(keysharerange, sizeof(keysharerange),
"share date %s", min_buf);
} else {
tv_to_buf(&kss_last_max, max_buf, sizeof(max_buf));
snprintf(keysharerange, sizeof(keysharerange),
"share dates %s to %s",
min_buf, max_buf);
}
LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") "
"share%s %"PRId64" keysharesummar%s %"PRId32
" workinfoid%s %s %s",
__func__,
s_count_tot, s_diff_tot,
(s_count_tot == 1) ? "" : "s",
kss_count_tot,
(kss_count_tot == 1) ? "y" : "ies",
wid_count,
(wid_count == 1) ? "" : "s",
idrange, keysharerange);
}
}
}
/* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */
void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
@ -2652,6 +2797,11 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
int64_t age_id, do_id, to_id;
bool ok, found;
if (key_update) {
key_auto_age_older(workinfoid, poolinstance, by, code, inet, cd);
return;
}
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found;

40
src/ckdb_dbio.c

@ -3188,9 +3188,10 @@ bool workinfo_fill(PGconn *conn)
" from workinfo where "EDDB"=$1 and"
" ((workinfoid>=$2 and workinfoid<=$3)");
// If we aren't loading the full range, ensure the necessary ones are loaded
if ((!dbload_only_sharesummary && dbload_workinfoid_start != -1) ||
dbload_workinfoid_finish != MAXID) {
/* If we aren't loading the full range, ensure the necessary ones are loaded
* However, don't for key_update to allow a possible lower memory profile */
if (((!dbload_only_sharesummary && dbload_workinfoid_start != -1) ||
dbload_workinfoid_finish != MAXID) && !key_update) {
APPEND_REALLOC(sel, off, len,
// we need all blocks workinfoids
" or workinfoid in (select workinfoid from blocks)"
@ -4825,6 +4826,7 @@ dokey:
} else {
// Not part of either tree key
STRNCPY(workmarkers->status, MARKER_PROCESSED_STR);
ok = true;
}
setnow(&kdb_fin);
rollback:
@ -8831,7 +8833,8 @@ bool workmarkers_fill(PGconn *conn)
PGresult *res;
K_ITEM *item, *wi_item;
WORKINFO *workinfo;
int n, i;
char *params[1];
int n, i, par = 0;
WORKMARKERS *row;
char *field;
char *sel;
@ -8840,13 +8843,25 @@ bool workmarkers_fill(PGconn *conn)
LOGDEBUG("%s(): select", __func__);
// TODO: limit how far back
sel = "select "
"markerid,poolinstance,workinfoidend,workinfoidstart,"
"description,status"
HISTORYDATECONTROL
" from workmarkers";
res = PQexec(conn, sel, CKPQ_READ);
// Allow limiting the load for key_update
if (key_update && dbload_workinfoid_start != -1) {
sel = "select "
"markerid,poolinstance,workinfoidend,workinfoidstart,"
"description,status"
HISTORYDATECONTROL
" from workmarkers where workinfoidstart>=$1";
par = 0;
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
} else {
sel = "select "
"markerid,poolinstance,workinfoidend,workinfoidstart,"
"description,status"
HISTORYDATECONTROL
" from workmarkers";
res = PQexec(conn, sel, CKPQ_READ);
}
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
@ -8974,6 +8989,9 @@ bool workmarkers_fill(PGconn *conn)
K_WUNLOCK(workmarkers_free);
PQclear(res);
for (i = 0; i < par; i++)
free(params[i]);
par = 0;
if (ok) {
LOGDEBUG("%s(): built", __func__);

Loading…
Cancel
Save