Browse Source

ckdb - fix incorrect userstats reload date

master
kanoi 10 years ago
parent
commit
91d37f5894
  1. 2
      sql/reloadstatus.sql
  2. 166
      src/ckdb.c

2
sql/reloadstatus.sql

@ -2,4 +2,4 @@ select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete first
select 'workinfo' as "workinfo",max(createdate) as "max createdate" from workinfo; select 'workinfo' as "workinfo",max(createdate) as "max createdate" from workinfo;
select 'auths' as "auths",max(createdate) as "max createdate" from auths; select 'auths' as "auths",max(createdate) as "max createdate" from auths;
select 'poolstats' as "poolstats",max(createdate) as "max createdate" from poolstats; select 'poolstats' as "poolstats",max(createdate) as "max createdate" from poolstats;
select 'userstats' as "userstats",max(statsdate) as "max statsdate - start of this hour" ,userid,workername from userstats group by userid,workername; select 'userstats' as "userstats",max(statsdate) as "max statsdate - start of this hour" from userstats;

166
src/ckdb.c

@ -153,10 +153,10 @@ static char *restorefrom;
* TODO: subtract how much we need in RAM of the 'between' * TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting * non db records - will depend on TODO: pool stats reporting
* requirements * requirements
* DB+RAM userstats: for each pool/user/worker it would be the start * DB+RAM userstats: start of the time band of the latest DB record,
* of the time band containing the latest DB statsdate, * since all data before this has been summarised to the DB
* since all previous data was summarised to the DB and RAM and * The userstats summarisation always processes the oldest
* deleted - use the oldest of these for all pools/users/workers * RAM data to the DB
* TODO: multiple pools is not yet handled by ckdb * TODO: multiple pools is not yet handled by ckdb
* TODO: handle a pool restart with a different instance name * TODO: handle a pool restart with a different instance name
* since this would always make the userstats reload point * since this would always make the userstats reload point
@ -216,17 +216,11 @@ typedef struct loadstatus {
tv_t newest_createdate_workinfo; tv_t newest_createdate_workinfo;
tv_t newest_createdate_auths; tv_t newest_createdate_auths;
tv_t newest_createdate_poolstats; tv_t newest_createdate_poolstats;
tv_t userstats; tv_t newest_starttimeband_userstats;
tv_t newest_createdate_blocks; tv_t newest_createdate_blocks;
} LOADSTATUS; } LOADSTATUS;
static LOADSTATUS dbstatus; static LOADSTATUS dbstatus;
/* Temporary while doing reload - it (of course) contains the fields
* required to track the newest userstats per user/worker
*/
static K_TREE *userstats_db_root;
static K_STORE *userstats_db;
// size limit on the command string // size limit on the command string
#define CMD_SIZ 31 #define CMD_SIZ 31
#define ID_SIZ 31 #define ID_SIZ 31
@ -1895,8 +1889,6 @@ static void workerstatus_ready()
ws_item = next_in_ktree(ws_ctx); ws_item = next_in_ktree(ws_ctx);
} }
free_ktree(userstats_workerstatus_root, NULL);
} }
static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats, static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats,
@ -4844,7 +4836,8 @@ static double cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b)
} }
/* order by userid asc,workername asc,statsdate asc,poolinstance asc /* order by userid asc,workername asc,statsdate asc,poolinstance asc
built during data load to update workerstatus at the end of the load */ built during data load to update workerstatus at the end of the load
and used during reload to discard stats already in the DB */
static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b) static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b)
{ {
double c = (double)(DATA_USERSTATS(a)->userid - double c = (double)(DATA_USERSTATS(a)->userid -
@ -4916,10 +4909,10 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *hashrate1hr, char *hashrate24hr, bool idle, char *hashrate1hr, char *hashrate24hr, bool idle,
bool eos, char *by, char *code, char *inet, tv_t *cd) bool eos, char *by, char *code, char *inet, tv_t *cd)
{ {
K_ITEM *us_item, *u_item, *us_match, *us_next, *db_match; K_ITEM *us_item, *u_item, *us_match, *us_next, look;
K_TREE_CTX ctx[1];
USERSTATS *row;
tv_t eosdate; tv_t eosdate;
USERSTATS *row, cmp;
K_TREE_CTX ctx[1];
LOGDEBUG("%s(): add", __func__); LOGDEBUG("%s(): add", __func__);
@ -4955,22 +4948,28 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
} }
if (reloading) { if (reloading) {
/* If the db load said the statsdate for this userid+workername memcpy(&cmp, row, sizeof(cmp));
* is already summarised then we discard it */ look.data = (void *)(&cmp);
db_match = find_in_ktree(userstats_db_root, us_item, // Just zero it to ensure the DB record is after it, not equal to it
cmp_userstats_workername, ctx); cmp.statsdate.tv_usec = 0;
if (db_match && /* If there is a matching user+worker DB record summarising this row,
!tv_newer(&(DATA_USERSTATS(db_match)->statsdate), cd)) { * or a matching user+worker DB record next after this row, discard it */
K_WLOCK(userstats_free); us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
k_add_head(userstats_free, us_item); cmp_userstats_workerstatus, ctx);
K_WUNLOCK(userstats_free); if (us_match &&
DATA_USERSTATS(us_match)->userid == row->userid &&
/* If this was an eos record and eos_store has data, strcmp(DATA_USERSTATS(us_match)->workername, row->workername) == 0 &&
* it means we need to process the eos_store */ DATA_USERSTATS(us_match)->summarylevel[0] != SUMMARY_NONE) {
if (eos && userstats_eos_store->count > 0) K_WLOCK(userstats_free);
goto advancetogo; k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
return true; /* If this was an eos record and eos_store has data,
* it means we need to process the eos_store */
if (eos && userstats_eos_store->count > 0)
goto advancetogo;
return true;
} }
} }
@ -5038,53 +5037,31 @@ advancetogo:
return true; return true;
} }
// Requires K_WLOCK(userstats_free) static bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate)
static void userstats_update_ccl(USERSTATS *row)
{ {
USERSTATS userstats, *tmp;
K_TREE_CTX ctx[1];
K_ITEM look, *item;
char buf[DATE_BUFSIZ+1]; char buf[DATE_BUFSIZ+1];
userstats.userid = row->userid; copy_tv(statsdate, &(row->statsdate));
STRNCPY(userstats.workername, row->workername);
copy_tv(&(userstats.statsdate), &(row->statsdate));
// Start of this timeband // Start of this timeband
switch (row->summarylevel[0]) { switch (row->summarylevel[0]) {
case SUMMARY_DB: case SUMMARY_DB:
userstats.statsdate.tv_sec -= userstats.statsdate.tv_sec % USERSTATS_DB_S; statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S;
userstats.statsdate.tv_usec = 0; statsdate->tv_usec = 0;
break; break;
case SUMMARY_FULL: case SUMMARY_FULL:
userstats.statsdate.tv_sec -= userstats.statsdate.tv_sec % USERSTATS_DB_DS; statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS;
userstats.statsdate.tv_usec = 0; statsdate->tv_usec = 0;
break; break;
default: default:
tv_to_buf(&(row->statsdate), buf, sizeof(buf)); tv_to_buf(statsdate, buf, sizeof(buf));
// Bad userstats are not fatal // Bad userstats are not fatal
LOGERR("Unknown userstats summarylevel '%c' " LOGERR("Unknown userstats summarylevel 0x%02x '%c' "
"userid "PRId64" workername %s statsdate %s", "userid %"PRId64" workername %s statsdate %s",
row->summarylevel[0], row->userid, row->summarylevel[0], row->summarylevel[0],
row->workername, buf); row->userid, row->workername, buf);
return; return false;
}
look.data = (void *)(&userstats);
item = find_in_ktree(userstats_db_root, &look, cmp_userstats_workername, ctx);
if (item) {
tmp = DATA_USERSTATS(item);
if (tv_newer(&(tmp->statsdate), &(userstats.statsdate)))
copy_tv(&(tmp->statsdate), &(userstats.statsdate));
} else {
item = k_unlink_head(userstats_free);
tmp = DATA_USERSTATS(item);
bzero(tmp, sizeof(*tmp));
tmp->userid = userstats.userid;
STRNCPY(tmp->workername, userstats.workername);
copy_tv(&(tmp->statsdate), &(userstats.statsdate));
userstats_db_root = add_to_ktree(userstats_db_root, item,
cmp_userstats_workername);
k_add_head(userstats_db, item);
} }
return true;
} }
// TODO: data selection - only require ? // TODO: data selection - only require ?
@ -5095,6 +5072,7 @@ static bool userstats_fill(PGconn *conn)
K_ITEM *item; K_ITEM *item;
int n, i; int n, i;
USERSTATS *row; USERSTATS *row;
tv_t statsdate;
char *field; char *field;
char *sel; char *sel;
int fields = 10; int fields = 10;
@ -5202,7 +5180,10 @@ static bool userstats_fill(PGconn *conn)
k_add_head(userstats_store, item); k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row, NULL); workerstatus_update(NULL, NULL, row, NULL);
userstats_update_ccl(row); if (userstats_starttimeband(row, &statsdate)) {
if (tv_newer(&(dbstatus.newest_starttimeband_userstats), &statsdate))
copy_tv(&(dbstatus.newest_starttimeband_userstats), &statsdate);
}
} }
if (!ok) if (!ok)
k_add_head(userstats_free, item); k_add_head(userstats_free, item);
@ -5348,10 +5329,10 @@ static bool reload()
{ {
char buf[DATE_BUFSIZ+1]; char buf[DATE_BUFSIZ+1];
char *filename; char *filename;
K_ITEM *ccl;
tv_t start; tv_t start;
char *reason;
FILE *fp; FILE *fp;
bool ok; bool ok = true;
tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf);
@ -5361,30 +5342,32 @@ static bool reload()
LOGWARNING("%s(): %s newest DB auths", __func__, buf); LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_starttimeband_userstats), buf, sizeof(buf));
ccl = userstats_db->head; LOGWARNING("%s(): %s newest DB userstats start timeband", __func__, buf);
// oldest in ccl
while (ccl) {
if (dbstatus.userstats.tv_sec == 0 ||
!tv_newer(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate)))
copy_tv(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate));
ccl = ccl->next;
}
tv_to_buf(&(dbstatus.userstats), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf)); tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks", __func__, buf); LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
copy_tv(&start, &(dbstatus.oldest_sharesummary_firstshare_n)); copy_tv(&start, &(dbstatus.oldest_sharesummary_firstshare_n));
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) reason = "sharesummary";
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) {
copy_tv(&start, &(dbstatus.newest_createdate_workinfo)); copy_tv(&start, &(dbstatus.newest_createdate_workinfo));
if (!tv_newer(&start, &(dbstatus.newest_createdate_auths))) reason = "workinfo";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_auths))) {
copy_tv(&start, &(dbstatus.newest_createdate_auths)); copy_tv(&start, &(dbstatus.newest_createdate_auths));
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) reason = "auths";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) {
copy_tv(&start, &(dbstatus.newest_createdate_poolstats)); copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
if (!tv_newer(&start, &(dbstatus.userstats))) reason = "poolstats";
copy_tv(&start, &(dbstatus.userstats)); }
if (!tv_newer(&start, &(dbstatus.newest_starttimeband_userstats))) {
copy_tv(&start, &(dbstatus.newest_starttimeband_userstats));
reason = "userstats";
}
tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
if (start.tv_sec < DATE_BEGIN) { if (start.tv_sec < DATE_BEGIN) {
start.tv_sec = DATE_BEGIN; start.tv_sec = DATE_BEGIN;
@ -5407,9 +5390,6 @@ static bool reload()
} }
ok = reload_from(&start); ok = reload_from(&start);
free_ktree(userstats_db_root, NULL);
k_list_transfer_to_head(userstats_db, userstats_free);
return ok; return ok;
} }
@ -5578,12 +5558,10 @@ static bool setup_data()
userstats_store = k_new_store(userstats_free); userstats_store = k_new_store(userstats_free);
userstats_eos_store = k_new_store(userstats_free); userstats_eos_store = k_new_store(userstats_free);
userstats_summ = k_new_store(userstats_free); userstats_summ = k_new_store(userstats_free);
userstats_db = k_new_store(userstats_free);
userstats_root = new_ktree(); userstats_root = new_ktree();
userstats_statsdate_root = new_ktree(); userstats_statsdate_root = new_ktree();
userstats_workerstatus_root = new_ktree(); userstats_workerstatus_root = new_ktree();
userstats_free->dsp_func = dsp_userstats; userstats_free->dsp_func = dsp_userstats;
userstats_db_root = new_ktree();
workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS),
ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true);
@ -5600,6 +5578,8 @@ static bool setup_data()
workerstatus_ready(); workerstatus_ready();
userstats_workerstatus_root = free_ktree(userstats_workerstatus_root, NULL);
workinfo_current = last_in_ktree(workinfo_height_root, ctx); workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) { if (workinfo_current) {
STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1); STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1);

Loading…
Cancel
Save