From 7c003276936c9107f5dc72a6b3cddbea25520879 Mon Sep 17 00:00:00 2001
From: kanoi
Date: Fri, 29 Aug 2014 14:01:50 +1000
Subject: [PATCH] ckdb - auto aging

workinfo_age() now takes the workinfoid as an int64_t instead of a
string, and returns through new output parameters the number of
sharesummaries it updated without failure, their summed sharecount and
diffacc, and the earliest firstshare / latest lastshare among them.

After a successful ageworkinfo command, when not reloading, the new
auto_age_older() looks for the oldest sharesummary below that
workinfoid that is still SUMMARY_NEW, ages each consecutive unaged
workinfoid from there, and logs the totals and the id/date range.
If nothing could be aged it logs a warning when the failing id changes
or on every 10th repeat for the same id.

Also: reloadstatus.sql reports both min and max firstshare for the
incomplete and complete sharesummary queries, and the DB version
message at startup includes CKDB_VERSION.
---
 sql/reloadstatus.sql |   4 +-
 src/ckdb.c           | 179 ++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 163 insertions(+), 20 deletions(-)

diff --git a/sql/reloadstatus.sql b/sql/reloadstatus.sql
index 2432090c..e7df33bd 100644
--- a/sql/reloadstatus.sql
+++ b/sql/reloadstatus.sql
@@ -1,5 +1,5 @@
-select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete firstshare" from sharesummary where complete = 'n';
-select 'sharesummary' as "sharesummary",max(firstshare) as "max complete firstshare" from sharesummary where complete != 'n';
+select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete firstshare",max(firstshare) as "(max incomplete firstshare)" from sharesummary where complete = 'n';
+select 'sharesummary' as "sharesummary",max(firstshare) as "max complete firstshare",min(firstshare) as "(min complete firstshare)" from sharesummary where complete != 'n';
 select 'workinfo' as "workinfo",max(createdate) as "max createdate" from workinfo;
 select 'auths' as "auths",max(createdate) as "max createdate" from auths;
 select 'poolstats' as "poolstats",max(createdate) as "max createdate" from poolstats;
diff --git a/src/ckdb.c b/src/ckdb.c
index ad86f65f..f06c85b5 100644
--- a/src/ckdb.c
+++ b/src/ckdb.c
@@ -47,7 +47,7 @@
 
 #define DB_VLOCK "1"
 #define DB_VERSION "0.7"
-#define CKDB_VERSION DB_VERSION"-0.101"
+#define CKDB_VERSION DB_VERSION"-0.102"
 
 #define WHERE_FFL " - from %s %s() line %d"
 #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@@ -3484,24 +3484,27 @@ static cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
  * not affect current ckdb code
  * TODO: This will happen until auto aging is added here - since ckpool can not reliably
  * age all workinfo that was active when it exits (e.g. a crash) so best to not
- * try, and get ckdb to auto age old unaged data
+ * try, but get ckdb to auto age old unaged data
  */
-static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
-			 char *by, char *code, char *inet, tv_t *cd)
+static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
+			 char *by, char *code, char *inet, tv_t *cd,
+			 tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
+			 int64_t *s_count, int64_t *s_diff)
 {
 	K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item;
 	K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1];
 	char cd_buf[DATE_BUFSIZ];
-	int64_t workinfoid;
 	int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
 	SHARESUMMARY sharesummary;
 	SHARES shares;
 	bool ok = false, conned = false, skipupdate;
 	char error[1024];
 
-	LOGDEBUG("%s(): complete", __func__);
+	LOGDEBUG("%s(): age", __func__);
 
-	TXT_TO_BIGINT("workinfoid", workinfoidstr, workinfoid);
+	ss_first->tv_sec = ss_first->tv_usec =
+	ss_last->tv_sec = ss_last->tv_usec = 0;
+	*ss_count = *s_count = *s_diff = 0;
 
 	wi_item = find_workinfo(workinfoid);
 	if (!wi_item) {
@@ -3563,6 +3566,15 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
 					DATA_SHARESUMMARY(ss_item)->workername,
 					DATA_SHARESUMMARY(ss_item)->workinfoid);
 				ok = false;
+			} else {
+				(*ss_count)++;
+				*s_count += DATA_SHARESUMMARY(ss_item)->sharecount;
+				*s_diff += DATA_SHARESUMMARY(ss_item)->diffacc;
+				if (ss_first->tv_sec == 0 ||
+				    !tv_newer(ss_first, &(DATA_SHARESUMMARY(ss_item)->firstshare)))
+					copy_tv(ss_first, &(DATA_SHARESUMMARY(ss_item)->firstshare));
+				if (tv_newer(ss_last, &(DATA_SHARESUMMARY(ss_item)->lastshare)))
+					copy_tv(ss_last, &(DATA_SHARESUMMARY(ss_item)->lastshare));
 			}
 		}
 
@@ -3612,10 +3624,10 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
 	/* If all were already aged, and no shares
 	 * then we don't want a message */
 	if (!(ss_already == ss_tot && shares_tot == 0)) {
-		LOGERR("%s(): Summary aging of %s/%s sstotal=%"PRId64
+		LOGERR("%s(): Summary aging of %"PRId64"/%s sstotal=%"PRId64
 		       " already=%"PRId64" failed=%"PRId64
 		       ", sharestotal=%"PRId64" dumped=%"PRId64,
-		       __func__, workinfoidstr, poolinstance, ss_tot,
+		       __func__, workinfoid, poolinstance, ss_tot,
 		       ss_already, ss_failed, shares_tot, shares_dumped);
 	}
 
@@ -3624,6 +3636,126 @@ bye:
 	return ok;
 }
 
+static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
+			   char *by, char *code, char *inet, tv_t *cd)
+{
+	static int64_t last_attempted_id = -1;
+	static int repeat;
+
+	char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ];
+	int64_t ss_count_tot, s_count_tot, s_diff_tot;
+	int64_t ss_count, s_count, s_diff;
+	tv_t ss_first_min, ss_last_max;
+	tv_t ss_first, ss_last;
+	int32_t wid_count;
+	K_TREE_CTX ctx[1];
+	K_ITEM *ss_item;
+	int64_t age_id, do_id, to_id;
+	bool ok;
+
+	LOGDEBUG("%s(): %"PRId64, __func__, workinfoid);
+
+	ss_first_min.tv_sec = ss_first_min.tv_usec =
+	ss_last_max.tv_sec = ss_last_max.tv_usec = 0;
+	ss_count_tot = s_count_tot = s_diff_tot = 0;
+
+	age_id = 0;
+	// Find the oldest 'unaged' sharesummary < workinfoid
+	ss_item = first_in_ktree(sharesummary_workinfoid_root, ctx);
+	while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid < workinfoid) {
+		if (DATA_SHARESUMMARY(ss_item)->complete[0] == SUMMARY_NEW) {
+			age_id = DATA_SHARESUMMARY(ss_item)->workinfoid;
+			break;
+		}
+		ss_item = next_in_ktree(ctx);
+	}
+
+	LOGDEBUG("%s(): age_id=%"PRId64, __func__, age_id);
+	/* Process all the consecutive sharesummaries that aren't aged
+	 * This way we find each oldest 'batch' of sharesummaries that have
+	 * been missed and can report the range of data that was aged,
+	 * which would normally just be an approx 10min set of workinfoids
+	 * from the last time ckpool stopped
+	 * Each next group of unaged sharesummaries following this, will be
+	 * picked up by each next aging */
+	if (age_id) {
+		wid_count = 0;
+		do_id = age_id;
+		to_id = 0;
+		do {
+			ok = workinfo_age(conn, do_id, poolinstance,
+					  by, code, inet, cd,
+					  &ss_first, &ss_last,
+					  &ss_count, &s_count, &s_diff);
+
+			ss_count_tot += ss_count;
+			s_count_tot += s_count;
+			s_diff_tot += s_diff;
+			if (ss_count > 0 && (ss_first_min.tv_sec == 0 || !tv_newer(&ss_first_min, &ss_first)))
+				copy_tv(&ss_first_min, &ss_first);
+			if (tv_newer(&ss_last_max, &ss_last))
+				copy_tv(&ss_last_max, &ss_last);
+
+			if (!ok)
+				break;
+
+			to_id = do_id;
+			wid_count++;
+			while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == to_id)
+				ss_item = next_in_ktree(ctx);
+
+			if (ss_item) {
+				do_id = DATA_SHARESUMMARY(ss_item)->workinfoid;
+				if (do_id >= workinfoid)
+					break;
+				if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW)
+					break;
+			}
+		} while (ss_item);
+		if (to_id == 0) {
+			if (last_attempted_id != age_id || ++repeat >= 10) {
+				// Approx once every 5min since workinfo defaults to ~30s
+				LOGWARNING("%s() Auto-age failed to age %"PRId64,
+					   __func__, age_id);
+				last_attempted_id = age_id;
+				repeat = 0;
+			}
+		} else {
+			char idrange[64];
+			char sharerange[256];
+			if (to_id != age_id) {
+				snprintf(idrange, sizeof(idrange),
+					 "from %"PRId64" to %"PRId64,
+					 age_id, to_id);
+			} else {
+				snprintf(idrange, sizeof(idrange),
+					 "%"PRId64, age_id);
+			}
+			tv_to_buf(&ss_first_min, min_buf, sizeof(min_buf));
+			if (tv_equal(&ss_first_min, &ss_last_max)) {
+				snprintf(sharerange, sizeof(sharerange),
+					 "share date %s", min_buf);
+			} else {
+				tv_to_buf(&ss_last_max, max_buf, sizeof(max_buf));
+				snprintf(sharerange, sizeof(sharerange),
+					 "share dates %s to %s",
+					 min_buf, max_buf);
+			}
+			LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") "
+				   "share%s %"PRId64" sharesummar%s %"PRId32" workinfoid%s "
+				   "%s %s",
+				   __func__,
+				   s_count_tot, s_diff_tot,
+				   (s_count_tot == 1) ? "" : "s",
+				   ss_count_tot,
+				   (ss_count_tot == 1) ? "y" : "ies",
+				   wid_count,
+				   (wid_count == 1) ? "" : "s",
+				   idrange, sharerange);
+		}
+	}
+}
+
 static bool workinfo_fill(PGconn *conn)
 {
 	ExecStatusType rescode;
@@ -6401,7 +6533,8 @@ static bool check_db_version(PGconn *conn)
 
 	PQclear(res);
 
-	LOGWARNING("%s(): DB version (%s) correct", __func__, DB_VERSION);
+	LOGWARNING("%s(): DB version (%s) correct (CKDB V%s)",
+		   __func__, DB_VERSION, CKDB_VERSION);
 
 	return true;
 }
@@ -7793,6 +7926,8 @@ seconf:
 		return strdup(reply);
 	} else if (strcasecmp(cmd, STR_AGEWORKINFO) == 0) {
 		K_ITEM *i_workinfoid, *i_poolinstance;
+		int64_t ss_count, s_count, s_diff;
+		tv_t ss_first, ss_last;
 		bool ok;
 
 		if (reloading && !confirm_sharesummary) {
@@ -7816,21 +7951,29 @@ seconf:
 		if (!i_poolinstance)
 			return strdup(reply);
 
-		ok = workinfo_age(conn, DATA_TRANSFER(i_workinfoid)->data,
+		TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid);
+
+		ok = workinfo_age(conn, workinfoid,
 				  DATA_TRANSFER(i_poolinstance)->data,
-				  by, code, inet, cd);
+				  by, code, inet, cd,
+				  &ss_first, &ss_last,
+				  &ss_count, &s_count, &s_diff);
 
 		if (!ok) {
 			LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
 			return strdup("failed.DATA");
+		} else {
+			// Don't slow down the reload - do them later
+			if (!reloading) {
+				// Aging is a queued item so the reply is ignored
+				auto_age_older(conn, workinfoid,
+					       DATA_TRANSFER(i_poolinstance)->data,
+					       by, code, inet, cd);
+			}
 		}
-		LOGDEBUG("%s.ok.aged %.*s",
-			 id, BIGINT_BUFSIZ,
-			 DATA_TRANSFER(i_workinfoid)->data);
+		LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);
 awconf:
-		snprintf(reply, siz, "ok.%.*s",
-			 BIGINT_BUFSIZ,
-			 DATA_TRANSFER(i_workinfoid)->data);
+		snprintf(reply, siz, "ok.%"PRId64, workinfoid);
 		return strdup(reply);
 	}
 