Browse Source

ckdb - auto aging

master
kanoi 11 years ago
parent
commit
7c00327693
  1. 4
      sql/reloadstatus.sql
  2. 179
      src/ckdb.c

4
sql/reloadstatus.sql

@ -1,5 +1,5 @@
select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete firstshare" from sharesummary where complete = 'n';
select 'sharesummary' as "sharesummary",max(firstshare) as "max complete firstshare" from sharesummary where complete != 'n';
select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete firstshare",max(firstshare) as "(max incomplete firstshare)" from sharesummary where complete = 'n';
select 'sharesummary' as "sharesummary",max(firstshare) as "max complete firstshare",min(firstshare) as "(min complete firstshare)" from sharesummary where complete != 'n';
select 'workinfo' as "workinfo",max(createdate) as "max createdate" from workinfo;
select 'auths' as "auths",max(createdate) as "max createdate" from auths;
select 'poolstats' as "poolstats",max(createdate) as "max createdate" from poolstats;

179
src/ckdb.c

@ -47,7 +47,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.7"
#define CKDB_VERSION DB_VERSION"-0.101"
#define CKDB_VERSION DB_VERSION"-0.102"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -3484,24 +3484,27 @@ static cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
* not affect current ckdb code
* TODO: This will happen until auto aging is added here - since ckpool can not reliably
* age all workinfo that was active when it exits (e.g. a crash) so best to not
* try, and get ckdb to auto age old unaged data
* try, but get ckdb to auto age old unaged data
*/
static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd)
static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t workinfoid;
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
SHARESUMMARY sharesummary;
SHARES shares;
bool ok = false, conned = false, skipupdate;
char error[1024];
LOGDEBUG("%s(): complete", __func__);
LOGDEBUG("%s(): age", __func__);
TXT_TO_BIGINT("workinfoid", workinfoidstr, workinfoid);
ss_first->tv_sec = ss_first->tv_usec =
ss_last->tv_sec = ss_last->tv_usec = 0;
*ss_count = *s_count = *s_diff = 0;
wi_item = find_workinfo(workinfoid);
if (!wi_item) {
@ -3563,6 +3566,15 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
DATA_SHARESUMMARY(ss_item)->workername,
DATA_SHARESUMMARY(ss_item)->workinfoid);
ok = false;
} else {
(*ss_count)++;
*s_count += DATA_SHARESUMMARY(ss_item)->sharecount;
*s_diff += DATA_SHARESUMMARY(ss_item)->diffacc;
if (ss_first->tv_sec == 0 ||
!tv_newer(ss_first, &(DATA_SHARESUMMARY(ss_item)->firstshare)))
copy_tv(ss_first, &(DATA_SHARESUMMARY(ss_item)->firstshare));
if (tv_newer(ss_last, &(DATA_SHARESUMMARY(ss_item)->lastshare)))
copy_tv(ss_last, &(DATA_SHARESUMMARY(ss_item)->lastshare));
}
}
@ -3612,10 +3624,10 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
/* If all were already aged, and no shares
* then we don't want a message */
if (!(ss_already == ss_tot && shares_tot == 0)) {
LOGERR("%s(): Summary aging of %s/%s sstotal=%"PRId64
LOGERR("%s(): Summary aging of %"PRId64"/%s sstotal=%"PRId64
" already=%"PRId64" failed=%"PRId64
", sharestotal=%"PRId64" dumped=%"PRId64,
__func__, workinfoidstr, poolinstance, ss_tot,
__func__, workinfoid, poolinstance, ss_tot,
ss_already, ss_failed, shares_tot,
shares_dumped);
}
@ -3624,6 +3636,126 @@ bye:
return ok;
}
/* Age the oldest consecutive run of unaged sharesummaries that precede
 * the given workinfoid, and log a summary of what was aged.
 *
 * conn, poolinstance, by, code, inet and cd are passed through unchanged
 * to each workinfo_age() call.
 * workinfoid is an exclusive upper bound: only sharesummaries with a
 * workinfoid below it are considered.
 *
 * Walks sharesummary_workinfoid_root from its first item to find the first
 * sharesummary flagged SUMMARY_NEW, then calls workinfo_age() for that
 * workinfoid and each following one, stopping at the first workinfo_age()
 * failure, the first workinfoid >= the bound, or the first following
 * workinfoid whose leading sharesummary is not SUMMARY_NEW.
 *
 * Nothing is returned; the outcome is only reported via LOGWARNING.
 * The function keeps static state (last_attempted_id, repeat) to limit how
 * often the same failure is logged. */
static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
			   char *by, char *code, char *inet, tv_t *cd)
{
	static int64_t last_attempted_id = -1;
	static int repeat;
	char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ];
	int64_t ss_count_tot, s_count_tot, s_diff_tot;
	int64_t ss_count, s_count, s_diff;
	tv_t ss_first_min, ss_last_max;
	tv_t ss_first, ss_last;
	int32_t wid_count;
	K_TREE_CTX ctx[1];
	K_ITEM *ss_item;
	int64_t age_id, do_id, to_id;
	bool ok;

	LOGDEBUG("%s(): %"PRId64, __func__, workinfoid);

	ss_first_min.tv_sec = ss_first_min.tv_usec =
	ss_last_max.tv_sec = ss_last_max.tv_usec = 0;
	ss_count_tot = s_count_tot = s_diff_tot = 0;

	age_id = 0;

	// Find the oldest 'unaged' sharesummary < workinfoid
	ss_item = first_in_ktree(sharesummary_workinfoid_root, ctx);
	while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid < workinfoid) {
		if (DATA_SHARESUMMARY(ss_item)->complete[0] == SUMMARY_NEW) {
			age_id = DATA_SHARESUMMARY(ss_item)->workinfoid;
			break;
		}
		ss_item = next_in_ktree(ctx);
	}

	LOGDEBUG("%s(): age_id=%"PRId64, __func__, age_id);

	/* Process all the consecutive sharesummaries that aren't aged
	 * This way we find each oldest 'batch' of sharesummaries that have
	 * been missed and can report the range of data that was aged,
	 * which would normally just be an approx 10min set of workinfoids
	 * from the last time ckpool stopped
	 * Each next group of unaged sharesummaries following this, will be
	 * picked up by each next aging */
	if (age_id) {
		wid_count = 0;
		do_id = age_id;
		to_id = 0;
		do {
			ok = workinfo_age(conn, do_id, poolinstance,
					  by, code, inet, cd,
					  &ss_first, &ss_last,
					  &ss_count, &s_count, &s_diff);

			// Totals include whatever was aged before a failure
			ss_count_tot += ss_count;
			s_count_tot += s_count;
			s_diff_tot += s_diff;

			/* Only merge the share time range when this call
			 * actually counted a sharesummary - otherwise
			 * ss_first is still zero and, never being newer
			 * than the current minimum, would overwrite a
			 * valid ss_first_min with zero */
			if (ss_count > 0) {
				if (ss_first_min.tv_sec == 0 ||
				    !tv_newer(&ss_first_min, &ss_first))
					copy_tv(&ss_first_min, &ss_first);
				if (tv_newer(&ss_last_max, &ss_last))
					copy_tv(&ss_last_max, &ss_last);
			}

			if (!ok)
				break;

			// do_id was aged - remember it as the end of the range
			to_id = do_id;
			wid_count++;

			// Step past every sharesummary of the workinfoid just aged
			while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == to_id)
				ss_item = next_in_ktree(ctx);

			if (ss_item) {
				do_id = DATA_SHARESUMMARY(ss_item)->workinfoid;
				if (do_id >= workinfoid)
					break;
				if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW)
					break;
			}
		} while (ss_item);

		if (to_id == 0) {
			// Not even the first workinfoid could be aged
			if (last_attempted_id != age_id || ++repeat >= 10) {
				// Approx once every 5min since workinfo defaults to ~30s
				LOGWARNING("%s() Auto-age failed to age %"PRId64,
					   __func__, age_id);
				last_attempted_id = age_id;
				repeat = 0;
			}
		} else {
			char idrange[64];
			char sharerange[256];

			if (to_id != age_id) {
				snprintf(idrange, sizeof(idrange),
					 "from %"PRId64" to %"PRId64,
					 age_id, to_id);
			} else {
				snprintf(idrange, sizeof(idrange),
					 "%"PRId64, age_id);
			}

			tv_to_buf(&ss_first_min, min_buf, sizeof(min_buf));
			if (tv_equal(&ss_first_min, &ss_last_max)) {
				snprintf(sharerange, sizeof(sharerange),
					 "share date %s", min_buf);
			} else {
				tv_to_buf(&ss_last_max, max_buf, sizeof(max_buf));
				snprintf(sharerange, sizeof(sharerange),
					 "share dates %s to %s",
					 min_buf, max_buf);
			}

			/* ss_count_tot is int64_t so it must be printed with
			 * PRId64 - %d would mismatch the argument type and
			 * could corrupt the arguments that follow it
			 * wid_count is int32_t, so %d is correct for it */
			LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") "
				   "share%s %"PRId64" sharesummar%s %d workinfoid%s "
				   "%s %s",
				   __func__,
				   s_count_tot, s_diff_tot,
				   (s_count_tot == 1) ? "" : "s",
				   ss_count_tot,
				   (ss_count_tot == 1) ? "y" : "ies",
				   wid_count,
				   (wid_count == 1) ? "" : "s",
				   idrange, sharerange);
		}
	}
}
static bool workinfo_fill(PGconn *conn)
{
ExecStatusType rescode;
@ -6401,7 +6533,8 @@ static bool check_db_version(PGconn *conn)
PQclear(res);
LOGWARNING("%s(): DB version (%s) correct", __func__, DB_VERSION);
LOGWARNING("%s(): DB version (%s) correct (CKDB V%s)",
__func__, DB_VERSION, CKDB_VERSION);
return true;
}
@ -7793,6 +7926,8 @@ seconf:
return strdup(reply);
} else if (strcasecmp(cmd, STR_AGEWORKINFO) == 0) {
K_ITEM *i_workinfoid, *i_poolinstance;
int64_t ss_count, s_count, s_diff;
tv_t ss_first, ss_last;
bool ok;
if (reloading && !confirm_sharesummary) {
@ -7816,21 +7951,29 @@ seconf:
if (!i_poolinstance)
return strdup(reply);
ok = workinfo_age(conn, DATA_TRANSFER(i_workinfoid)->data,
TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid);
ok = workinfo_age(conn, workinfoid,
DATA_TRANSFER(i_poolinstance)->data,
by, code, inet, cd);
by, code, inet, cd,
&ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
if (!ok) {
LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
return strdup("failed.DATA");
} else {
// Don't slow down the reload - do them later
if (!reloading) {
// Aging is a queued item so the reply is ignored
auto_age_older(conn, workinfoid,
DATA_TRANSFER(i_poolinstance)->data,
by, code, inet, cd);
}
}
LOGDEBUG("%s.ok.aged %.*s",
id, BIGINT_BUFSIZ,
DATA_TRANSFER(i_workinfoid)->data);
LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);
awconf:
snprintf(reply, siz, "ok.%.*s",
BIGINT_BUFSIZ,
DATA_TRANSFER(i_workinfoid)->data);
snprintf(reply, siz, "ok.%"PRId64, workinfoid);
return strdup(reply);
}

Loading…
Cancel
Save