Browse Source

ckdb - calculate restart dates and related fixes

master
kanoi 11 years ago
parent
commit
74b5e817a3
  1. 167
      src/ckdb.c

167
src/ckdb.c

@ -119,7 +119,7 @@ static char *restorefrom;
* idcontrol: only userid reuse is critical and the user is added * idcontrol: only userid reuse is critical and the user is added
* immeditately to the DB before replying to the add message * immeditately to the DB before replying to the add message
* *
* Tables that are/will be written straight to the DB, so ar OK: * Tables that are/will be written straight to the DB, so are OK:
* users, useraccounts, paymentaddresses, payments, * users, useraccounts, paymentaddresses, payments,
* accountadjustment, optioncontrol, miningpayouts, * accountadjustment, optioncontrol, miningpayouts,
* eventlog * eventlog
@ -128,7 +128,7 @@ static char *restorefrom;
typedef struct loadstatus { typedef struct loadstatus {
tv_t oldest_sharesummary_firstshare_n; tv_t oldest_sharesummary_firstshare_n;
tv_t newest_createdate_workinfo; tv_t newest_createdate_workinfo;
tv_t newest_createdate_auth; tv_t newest_createdate_auths;
tv_t newest_createdate_poolstats; tv_t newest_createdate_poolstats;
} LOADSTATUS; } LOADSTATUS;
static LOADSTATUS dbstatus; static LOADSTATUS dbstatus;
@ -136,6 +136,7 @@ static LOADSTATUS dbstatus;
/* Temporary while doing restart - it (of course) contains the fields /* Temporary while doing restart - it (of course) contains the fields
* required to track the newest userstats per user/worker * required to track the newest userstats per user/worker
*/ */
static K_TREE *userstats_ccl_root;
static K_STORE *userstats_ccl; static K_STORE *userstats_ccl;
// size limit on the command string // size limit on the command string
@ -1096,13 +1097,15 @@ static K_STORE *userstats_summ;
#endif #endif
#endif #endif
/* summarisation of the userstats after this many days are done /* TODO: summarisation of the userstats after this many days are done
* at the day level and the above stats are deleted from the db * at the day level and the above stats are deleted from the db
* Obvious WARNING - the larger this is, the more stats in the DB * Obvious WARNING - the larger this is, the more stats in the DB
* This is summary level '2' (TODO) * This is summary level '2'
*/ */
#define USERSTATS_DB_D 7 #define USERSTATS_DB_D 7
#define USERSTATS_DB_DS (USERSTATS_DB_D * (60*60*24))
// true if _new is newer, i.e. _old is before _new
#define tv_newer(_old, _new) (((_old)->tv_sec == (_new)->tv_sec) ? \ #define tv_newer(_old, _new) (((_old)->tv_sec == (_new)->tv_sec) ? \
((_old)->tv_usec < (_new)->tv_usec) : \ ((_old)->tv_usec < (_new)->tv_usec) : \
((_old)->tv_sec < (_new)->tv_sec)) ((_old)->tv_sec < (_new)->tv_sec))
@ -2938,6 +2941,11 @@ static bool workinfo_fill(PGconn *conn)
workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo); workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo);
workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height); workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height);
k_add_head(workinfo_store, item); k_add_head(workinfo_store, item);
if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) {
memcpy(&(dbstatus.newest_createdate_workinfo), &(row->createdate),
sizeof(row->createdate));
}
} }
if (!ok) if (!ok)
k_add_head(workinfo_list, item); k_add_head(workinfo_list, item);
@ -3534,7 +3542,7 @@ static bool sharesummary_fill(PGconn *conn)
// TODO: limit how far back // TODO: limit how far back
sel = "select " sel = "select "
"userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi" "userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,complete" "sharecount,errorcount,firstshare,lastshare,complete"
MODIFYDATECONTROL MODIFYDATECONTROL
@ -3666,6 +3674,13 @@ static bool sharesummary_fill(PGconn *conn)
k_add_head(sharesummary_store, item); k_add_head(sharesummary_store, item);
workerstatus_update(NULL, NULL, NULL, row); workerstatus_update(NULL, NULL, NULL, row);
if (tolower(row->complete[0]) == SUMMARY_NEW &&
(dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare)))) {
memcpy(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare),
sizeof(row->createdate));
}
} }
if (!ok) if (!ok)
k_add_head(sharesummary_list, item); k_add_head(sharesummary_list, item);
@ -4206,6 +4221,11 @@ static bool auths_fill(PGconn *conn)
auths_root = add_to_ktree(auths_root, item, cmp_auths); auths_root = add_to_ktree(auths_root, item, cmp_auths);
k_add_head(auths_store, item); k_add_head(auths_store, item);
workerstatus_update(row, NULL, NULL, NULL); workerstatus_update(row, NULL, NULL, NULL);
if (tv_newer(&(dbstatus.newest_createdate_auths), &(row->createdate))) {
memcpy(&(dbstatus.newest_createdate_auths), &(row->createdate),
sizeof(row->createdate));
}
} }
if (!ok) if (!ok)
k_add_head(auths_list, item); k_add_head(auths_list, item);
@ -4299,7 +4319,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
ins = "insert into poolstats " ins = "insert into poolstats "
"(poolinstance,elapsed,users,workers,hashrate," "(poolinstance,elapsed,users,workers,hashrate,"
"hashrate5m,hashrate1hr,hashrate24hr" "hashrate5m,hashrate1hr,hashrate24hr"
SIMPLEDATECONTROL ") values (" PQPARAM11 ")"; SIMPLEDATECONTROL ") values (" PQPARAM12 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
@ -4413,8 +4433,17 @@ static bool poolstats_fill(PGconn *conn)
break; break;
TXT_TO_DOUBLE("hashrate24hr", field, row->hashrate24hr); TXT_TO_DOUBLE("hashrate24hr", field, row->hashrate24hr);
SIMPLEDATEFLDS(res, i, row, ok);
if (!ok)
break;
poolstats_root = add_to_ktree(poolstats_root, item, cmp_poolstats); poolstats_root = add_to_ktree(poolstats_root, item, cmp_poolstats);
k_add_head(poolstats_store, item); k_add_head(poolstats_store, item);
if (tv_newer(&(dbstatus.newest_createdate_poolstats), &(row->createdate))) {
memcpy(&(dbstatus.newest_createdate_poolstats), &(row->createdate),
sizeof(row->createdate));
}
} }
if (!ok) if (!ok)
k_add_head(poolstats_list, item); k_add_head(poolstats_list, item);
@ -4646,6 +4675,54 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
return true; return true;
} }
static void userstats_update_ccl(USERSTATS *row)
{
USERSTATS userstats, *tmp;
K_TREE_CTX ctx[1];
K_ITEM look, *item;
char buf[DATE_BUFSIZ+1];
userstats.userid = row->userid;
STRNCPY(userstats.workername, row->workername);
memcpy(&(userstats.statsdate), &(row->statsdate), sizeof(row->statsdate));
// Start of the next timeband after this row
switch (row->summarylevel[0]) {
case SUMMARY_DB:
userstats.statsdate.tv_sec += USERSTATS_DB_S;
break;
case SUMMARY_FULL:
userstats.statsdate.tv_sec += USERSTATS_DB_DS;
break;
default:
tv_to_buf(&(row->statsdate), buf, sizeof(buf));
// Bad userstats are not fatal
LOGERR("Unknown userstats summarylevel '%c' "
"userid "PRId64" workername %s statsdate %s",
row->summarylevel[0], row->userid,
row->workername, buf);
return;
}
look.data = (void *)(&userstats);
item = find_in_ktree(userstats_ccl_root, &look, cmp_userstats_workername, ctx);
if (item) {
tmp = DATA_USERSTATS(item);
if (tv_newer(&(tmp->statsdate), &(userstats.statsdate)))
memcpy(&(tmp->statsdate), &(userstats.statsdate), sizeof(tmp->statsdate));
} else {
K_WLOCK(userstats_list);
item = k_unlink_head(userstats_list);
tmp = DATA_USERSTATS(item);
bzero(tmp, sizeof(*tmp));
tmp->userid = userstats.userid;
STRNCPY(tmp->workername, userstats.workername);
memcpy(&(tmp->statsdate), &(userstats.statsdate), sizeof(tmp->statsdate));
userstats_ccl_root = add_to_ktree(userstats_ccl_root, item,
cmp_userstats_workername);
k_add_head(userstats_ccl, item);
K_WUNLOCK(userstats_list);
}
}
// TODO: data selection - only require ? // TODO: data selection - only require ?
static bool userstats_fill(PGconn *conn) static bool userstats_fill(PGconn *conn)
{ {
@ -4748,6 +4825,7 @@ static bool userstats_fill(PGconn *conn)
k_add_head(userstats_store, item); k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row, NULL); workerstatus_update(NULL, NULL, row, NULL);
userstats_update_ccl(row);
} }
if (!ok) if (!ok)
k_add_head(userstats_list, item); k_add_head(userstats_list, item);
@ -4870,6 +4948,10 @@ static bool getdata()
goto matane; goto matane;
if (!(ok = shareerrors_fill())) if (!(ok = shareerrors_fill()))
goto matane; goto matane;
if (!(ok = sharesummary_fill(conn)))
goto matane;
if (!(ok = blocks_fill(conn)))
goto matane;
if (!(ok = auths_fill(conn))) if (!(ok = auths_fill(conn)))
goto matane; goto matane;
if (!(ok = poolstats_fill(conn))) if (!(ok = poolstats_fill(conn)))
@ -4975,76 +5057,115 @@ static void clean_up(ckpool_t *ckp)
static bool setup_data() static bool setup_data()
{ {
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
K_ITEM look, *found; K_ITEM look, *found, *ccl;
WORKINFO wi; WORKINFO wi;
char buf[DATE_BUFSIZ+1];
tv_t statsdate;
transfer_list = k_new_list("Transfer", sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); transfer_list = k_new_list("Transfer", sizeof(TRANSFER),
ALLOC_TRANSFER, LIMIT_TRANSFER, true);
transfer_store = k_new_store(transfer_list); transfer_store = k_new_store(transfer_list);
transfer_root = new_ktree(); transfer_root = new_ktree();
transfer_list->dsp_func = dsp_transfer; transfer_list->dsp_func = dsp_transfer;
users_list = k_new_list("Users", sizeof(USERS), ALLOC_USERS, LIMIT_USERS, true); users_list = k_new_list("Users", sizeof(USERS),
ALLOC_USERS, LIMIT_USERS, true);
users_store = k_new_store(users_list); users_store = k_new_store(users_list);
users_root = new_ktree(); users_root = new_ktree();
userid_root = new_ktree(); userid_root = new_ktree();
workers_list = k_new_list("Workers", sizeof(WORKERS), ALLOC_WORKERS, LIMIT_WORKERS, true); workers_list = k_new_list("Workers", sizeof(WORKERS),
ALLOC_WORKERS, LIMIT_WORKERS, true);
workers_store = k_new_store(workers_list); workers_store = k_new_store(workers_list);
workers_root = new_ktree(); workers_root = new_ktree();
payments_list = k_new_list("Payments", sizeof(PAYMENTS), ALLOC_PAYMENTS, LIMIT_PAYMENTS, true); payments_list = k_new_list("Payments", sizeof(PAYMENTS),
ALLOC_PAYMENTS, LIMIT_PAYMENTS, true);
payments_store = k_new_store(payments_list); payments_store = k_new_store(payments_list);
payments_root = new_ktree(); payments_root = new_ktree();
idcontrol_list = k_new_list("IDControl", sizeof(IDCONTROL), ALLOC_IDCONTROL, LIMIT_IDCONTROL, true); idcontrol_list = k_new_list("IDControl", sizeof(IDCONTROL),
ALLOC_IDCONTROL, LIMIT_IDCONTROL, true);
idcontrol_store = k_new_store(idcontrol_list); idcontrol_store = k_new_store(idcontrol_list);
workinfo_list = k_new_list("WorkInfo", sizeof(WORKINFO), ALLOC_WORKINFO, LIMIT_WORKINFO, true); workinfo_list = k_new_list("WorkInfo", sizeof(WORKINFO),
ALLOC_WORKINFO, LIMIT_WORKINFO, true);
workinfo_store = k_new_store(workinfo_list); workinfo_store = k_new_store(workinfo_list);
workinfo_root = new_ktree(); workinfo_root = new_ktree();
workinfo_height_root = new_ktree(); workinfo_height_root = new_ktree();
shares_list = k_new_list("Shares", sizeof(SHARES), ALLOC_SHARES, LIMIT_SHARES, true); shares_list = k_new_list("Shares", sizeof(SHARES),
ALLOC_SHARES, LIMIT_SHARES, true);
shares_store = k_new_store(shares_list); shares_store = k_new_store(shares_list);
shares_root = new_ktree(); shares_root = new_ktree();
shareerrors_list = k_new_list("ShareErrors", sizeof(SHAREERRORS), ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true); shareerrors_list = k_new_list("ShareErrors", sizeof(SHAREERRORS),
ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true);
shareerrors_store = k_new_store(shareerrors_list); shareerrors_store = k_new_store(shareerrors_list);
shareerrors_root = new_ktree(); shareerrors_root = new_ktree();
sharesummary_list = k_new_list("ShareSummary", sizeof(SHARESUMMARY), ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true); sharesummary_list = k_new_list("ShareSummary", sizeof(SHARESUMMARY),
ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true);
sharesummary_store = k_new_store(sharesummary_list); sharesummary_store = k_new_store(sharesummary_list);
sharesummary_root = new_ktree(); sharesummary_root = new_ktree();
sharesummary_workinfoid_root = new_ktree(); sharesummary_workinfoid_root = new_ktree();
sharesummary_list->dsp_func = dsp_sharesummary; sharesummary_list->dsp_func = dsp_sharesummary;
blocks_list = k_new_list("Blocks", sizeof(BLOCKS), ALLOC_BLOCKS, LIMIT_BLOCKS, true); blocks_list = k_new_list("Blocks", sizeof(BLOCKS),
ALLOC_BLOCKS, LIMIT_BLOCKS, true);
blocks_store = k_new_store(blocks_list); blocks_store = k_new_store(blocks_list);
blocks_root = new_ktree(); blocks_root = new_ktree();
auths_list = k_new_list("Auths", sizeof(AUTHS), ALLOC_AUTHS, LIMIT_AUTHS, true); auths_list = k_new_list("Auths", sizeof(AUTHS),
ALLOC_AUTHS, LIMIT_AUTHS, true);
auths_store = k_new_store(auths_list); auths_store = k_new_store(auths_list);
auths_root = new_ktree(); auths_root = new_ktree();
poolstats_list = k_new_list("PoolStats", sizeof(POOLSTATS), ALLOC_POOLSTATS, LIMIT_POOLSTATS, true); poolstats_list = k_new_list("PoolStats", sizeof(POOLSTATS),
ALLOC_POOLSTATS, LIMIT_POOLSTATS, true);
poolstats_store = k_new_store(poolstats_list); poolstats_store = k_new_store(poolstats_list);
poolstats_root = new_ktree(); poolstats_root = new_ktree();
userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true); userstats_list = k_new_list("UserStats", sizeof(USERSTATS),
ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_list); userstats_store = k_new_store(userstats_list);
userstats_eos_store = k_new_store(userstats_list); userstats_eos_store = k_new_store(userstats_list);
userstats_summ = k_new_store(userstats_list); userstats_summ = k_new_store(userstats_list);
userstats_ccl = k_new_store(userstats_list); userstats_ccl = k_new_store(userstats_list);
userstats_root = new_ktree(); userstats_root = new_ktree();
userstats_list->dsp_func = dsp_userstats; userstats_list->dsp_func = dsp_userstats;
userstats_ccl_root = new_ktree();
workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS),
ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true);
workerstatus_store = k_new_store(workerstatus_list); workerstatus_store = k_new_store(workerstatus_list);
workerstatus_root = new_ktree(); workerstatus_root = new_ktree();
if (!getdata()) if (!getdata())
return false; return false;
tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workinfo", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_auths), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
bzero(&statsdate, sizeof(statsdate));
ccl = userstats_ccl->head;
while (ccl) {
if (statsdate.tv_sec == 0 ||
!tv_newer(&statsdate, &(DATA_USERSTATS(ccl)->statsdate)))
memcpy(&statsdate, &(DATA_USERSTATS(ccl)->statsdate), sizeof(statsdate));
ccl = ccl->next;
}
tv_to_buf(&statsdate, buf, sizeof(buf));
LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf);
free_ktree(userstats_ccl_root, NULL);
k_list_transfer_to_head(userstats_ccl, userstats_list);
workinfo_current = last_in_ktree(workinfo_height_root, ctx); workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) { if (workinfo_current) {
STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1); STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1);
@ -5201,7 +5322,7 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
if (!i_createdate) if (!i_createdate)
return strdup(reply); return strdup(reply);
TXT_TO_CTV("createdate", DATA_TRANSFER(i_createdate)->data, createdate); TXT_TO_CTV("createdate", DATA_TRANSFER(i_createdate)->data, createdate);
if (tvdiff(&createdate, &(row.createdate)) > STATS_PER) if (tvdiff(&createdate, &(DATA_POOLSTATS(ps)->createdate)) > STATS_PER)
store = true; store = true;
else else
store = false; store = false;
@ -6494,7 +6615,7 @@ static void summarise_userstats()
break; break;
memcpy(&when, &(DATA_USERSTATS(tail)->statsdate), sizeof(when)); memcpy(&when, &(DATA_USERSTATS(tail)->statsdate), sizeof(when));
/* Convent when to the start of the timeframe after the one it is in /* Convert when to the start of the timeframe after the one it is in
* assume timeval ignores leapseconds ... */ * assume timeval ignores leapseconds ... */
when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S; when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S;
when.tv_usec = 0; when.tv_usec = 0;

Loading…
Cancel
Save