From 74b5e817a3d6ecc70d7b7ef1aa83ff99c03c09a3 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 31 Jul 2014 14:09:33 +1000 Subject: [PATCH] ckdb - calculate restart dates and related fixes --- src/ckdb.c | 167 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 23 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index ebb091f1..161dcb4a 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -119,7 +119,7 @@ static char *restorefrom; * idcontrol: only userid reuse is critical and the user is added * immeditately to the DB before replying to the add message * - * Tables that are/will be written straight to the DB, so ar OK: + * Tables that are/will be written straight to the DB, so are OK: * users, useraccounts, paymentaddresses, payments, * accountadjustment, optioncontrol, miningpayouts, * eventlog @@ -128,7 +128,7 @@ static char *restorefrom; typedef struct loadstatus { tv_t oldest_sharesummary_firstshare_n; tv_t newest_createdate_workinfo; - tv_t newest_createdate_auth; + tv_t newest_createdate_auths; tv_t newest_createdate_poolstats; } LOADSTATUS; static LOADSTATUS dbstatus; @@ -136,6 +136,7 @@ static LOADSTATUS dbstatus; /* Temporary while doing restart - it (of course) contains the fields * required to track the newest userstats per user/worker */ +static K_TREE *userstats_ccl_root; static K_STORE *userstats_ccl; // size limit on the command string @@ -1096,13 +1097,15 @@ static K_STORE *userstats_summ; #endif #endif -/* summarisation of the userstats after this many days are done +/* TODO: summarisation of the userstats after this many days are done * at the day level and the above stats are deleted from the db * Obvious WARNING - the larger this is, the more stats in the DB - * This is summary level '2' (TODO) + * This is summary level '2' */ #define USERSTATS_DB_D 7 +#define USERSTATS_DB_DS (USERSTATS_DB_D * (60*60*24)) +// true if _new is newer, i.e. _old is before _new #define tv_newer(_old, _new) (((_old)->tv_sec == (_new)->tv_sec) ? \ ((_old)->tv_usec < (_new)->tv_usec) : \ ((_old)->tv_sec < (_new)->tv_sec)) @@ -2938,6 +2941,11 @@ static bool workinfo_fill(PGconn *conn) workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo); workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height); k_add_head(workinfo_store, item); + + if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) { + memcpy(&(dbstatus.newest_createdate_workinfo), &(row->createdate), + sizeof(row->createdate)); + } } if (!ok) k_add_head(workinfo_list, item); @@ -3534,7 +3542,7 @@ static bool sharesummary_fill(PGconn *conn) // TODO: limit how far back sel = "select " - "userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi" + "userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," "sharecount,errorcount,firstshare,lastshare,complete" MODIFYDATECONTROL @@ -3666,6 +3674,13 @@ static bool sharesummary_fill(PGconn *conn) k_add_head(sharesummary_store, item); workerstatus_update(NULL, NULL, NULL, row); + + if (tolower(row->complete[0]) == SUMMARY_NEW && + (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 || + !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare)))) { + memcpy(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare), + sizeof(row->createdate)); + } } if (!ok) k_add_head(sharesummary_list, item); @@ -4206,6 +4221,11 @@ static bool auths_fill(PGconn *conn) auths_root = add_to_ktree(auths_root, item, cmp_auths); k_add_head(auths_store, item); workerstatus_update(row, NULL, NULL, NULL); + + if (tv_newer(&(dbstatus.newest_createdate_auths), &(row->createdate))) { + memcpy(&(dbstatus.newest_createdate_auths), &(row->createdate), + sizeof(row->createdate)); + } } if (!ok) k_add_head(auths_list, item); @@ -4299,7 +4319,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, ins = "insert into poolstats " "(poolinstance,elapsed,users,workers,hashrate," "hashrate5m,hashrate1hr,hashrate24hr" - SIMPLEDATECONTROL ") values (" PQPARAM11 ")"; + SIMPLEDATECONTROL ") values (" PQPARAM12 ")"; res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); @@ -4413,8 +4433,17 @@ static bool poolstats_fill(PGconn *conn) break; TXT_TO_DOUBLE("hashrate24hr", field, row->hashrate24hr); + SIMPLEDATEFLDS(res, i, row, ok); + if (!ok) + break; + poolstats_root = add_to_ktree(poolstats_root, item, cmp_poolstats); k_add_head(poolstats_store, item); + + if (tv_newer(&(dbstatus.newest_createdate_poolstats), &(row->createdate))) { + memcpy(&(dbstatus.newest_createdate_poolstats), &(row->createdate), + sizeof(row->createdate)); + } } if (!ok) k_add_head(poolstats_list, item); @@ -4646,6 +4675,54 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, return true; } +static void userstats_update_ccl(USERSTATS *row) +{ + USERSTATS userstats, *tmp; + K_TREE_CTX ctx[1]; + K_ITEM look, *item; + char buf[DATE_BUFSIZ+1]; + + userstats.userid = row->userid; + STRNCPY(userstats.workername, row->workername); + memcpy(&(userstats.statsdate), &(row->statsdate), sizeof(row->statsdate)); + // Start of the next timeband after this row + switch (row->summarylevel[0]) { + case SUMMARY_DB: + userstats.statsdate.tv_sec += USERSTATS_DB_S; + break; + case SUMMARY_FULL: + userstats.statsdate.tv_sec += USERSTATS_DB_DS; + break; + default: + tv_to_buf(&(row->statsdate), buf, sizeof(buf)); + // Bad userstats are not fatal + LOGERR("Unknown userstats summarylevel '%c' " + "userid "PRId64" workername %s statsdate %s", + row->summarylevel[0], row->userid, + row->workername, buf); + return; + } + look.data = (void *)(&userstats); + item = find_in_ktree(userstats_ccl_root, &look, cmp_userstats_workername, ctx); + if (item) { + tmp = DATA_USERSTATS(item); + if (tv_newer(&(tmp->statsdate), &(userstats.statsdate))) + memcpy(&(tmp->statsdate), &(userstats.statsdate), sizeof(tmp->statsdate)); + } else { + K_WLOCK(userstats_list); + item = k_unlink_head(userstats_list); + tmp = DATA_USERSTATS(item); + bzero(tmp, sizeof(*tmp)); + tmp->userid = userstats.userid; + STRNCPY(tmp->workername, userstats.workername); + memcpy(&(tmp->statsdate), &(userstats.statsdate), sizeof(tmp->statsdate)); + userstats_ccl_root = add_to_ktree(userstats_ccl_root, item, + cmp_userstats_workername); + k_add_head(userstats_ccl, item); + K_WUNLOCK(userstats_list); + } +} + // TODO: data selection - only require ? static bool userstats_fill(PGconn *conn) { @@ -4748,6 +4825,7 @@ static bool userstats_fill(PGconn *conn) k_add_head(userstats_store, item); workerstatus_update(NULL, NULL, row, NULL); + userstats_update_ccl(row); } if (!ok) k_add_head(userstats_list, item); @@ -4870,6 +4948,10 @@ static bool getdata() goto matane; if (!(ok = shareerrors_fill())) goto matane; + if (!(ok = sharesummary_fill(conn))) + goto matane; + if (!(ok = blocks_fill(conn))) + goto matane; if (!(ok = auths_fill(conn))) goto matane; if (!(ok = poolstats_fill(conn))) @@ -4975,76 +5057,115 @@ static void clean_up(ckpool_t *ckp) static bool setup_data() { K_TREE_CTX ctx[1]; - K_ITEM look, *found; + K_ITEM look, *found, *ccl; WORKINFO wi; + char buf[DATE_BUFSIZ+1]; + tv_t statsdate; - transfer_list = k_new_list("Transfer", sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); + transfer_list = k_new_list("Transfer", sizeof(TRANSFER), + ALLOC_TRANSFER, LIMIT_TRANSFER, true); transfer_store = k_new_store(transfer_list); transfer_root = new_ktree(); transfer_list->dsp_func = dsp_transfer; - users_list = k_new_list("Users", sizeof(USERS), ALLOC_USERS, LIMIT_USERS, true); + users_list = k_new_list("Users", sizeof(USERS), + ALLOC_USERS, LIMIT_USERS, true); users_store = k_new_store(users_list); users_root = new_ktree(); userid_root = new_ktree(); - workers_list = k_new_list("Workers", sizeof(WORKERS), ALLOC_WORKERS, LIMIT_WORKERS, true); + workers_list = k_new_list("Workers", sizeof(WORKERS), + ALLOC_WORKERS, LIMIT_WORKERS, true); workers_store = k_new_store(workers_list); workers_root = new_ktree(); - payments_list = k_new_list("Payments", sizeof(PAYMENTS), ALLOC_PAYMENTS, LIMIT_PAYMENTS, true); + payments_list = k_new_list("Payments", sizeof(PAYMENTS), + ALLOC_PAYMENTS, LIMIT_PAYMENTS, true); payments_store = k_new_store(payments_list); payments_root = new_ktree(); - idcontrol_list = k_new_list("IDControl", sizeof(IDCONTROL), ALLOC_IDCONTROL, LIMIT_IDCONTROL, true); + idcontrol_list = k_new_list("IDControl", sizeof(IDCONTROL), + ALLOC_IDCONTROL, LIMIT_IDCONTROL, true); idcontrol_store = k_new_store(idcontrol_list); - workinfo_list = k_new_list("WorkInfo", sizeof(WORKINFO), ALLOC_WORKINFO, LIMIT_WORKINFO, true); + workinfo_list = k_new_list("WorkInfo", sizeof(WORKINFO), + ALLOC_WORKINFO, LIMIT_WORKINFO, true); workinfo_store = k_new_store(workinfo_list); workinfo_root = new_ktree(); workinfo_height_root = new_ktree(); - shares_list = k_new_list("Shares", sizeof(SHARES), ALLOC_SHARES, LIMIT_SHARES, true); + shares_list = k_new_list("Shares", sizeof(SHARES), + ALLOC_SHARES, LIMIT_SHARES, true); shares_store = k_new_store(shares_list); shares_root = new_ktree(); - shareerrors_list = k_new_list("ShareErrors", sizeof(SHAREERRORS), ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true); + shareerrors_list = k_new_list("ShareErrors", sizeof(SHAREERRORS), + ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true); shareerrors_store = k_new_store(shareerrors_list); shareerrors_root = new_ktree(); - sharesummary_list = k_new_list("ShareSummary", sizeof(SHARESUMMARY), ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true); + sharesummary_list = k_new_list("ShareSummary", sizeof(SHARESUMMARY), + ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true); sharesummary_store = k_new_store(sharesummary_list); sharesummary_root = new_ktree(); sharesummary_workinfoid_root = new_ktree(); sharesummary_list->dsp_func = dsp_sharesummary; - blocks_list = k_new_list("Blocks", sizeof(BLOCKS), ALLOC_BLOCKS, LIMIT_BLOCKS, true); + blocks_list = k_new_list("Blocks", sizeof(BLOCKS), + ALLOC_BLOCKS, LIMIT_BLOCKS, true); blocks_store = k_new_store(blocks_list); blocks_root = new_ktree(); - auths_list = k_new_list("Auths", sizeof(AUTHS), ALLOC_AUTHS, LIMIT_AUTHS, true); + auths_list = k_new_list("Auths", sizeof(AUTHS), + ALLOC_AUTHS, LIMIT_AUTHS, true); auths_store = k_new_store(auths_list); auths_root = new_ktree(); - poolstats_list = k_new_list("PoolStats", sizeof(POOLSTATS), ALLOC_POOLSTATS, LIMIT_POOLSTATS, true); + poolstats_list = k_new_list("PoolStats", sizeof(POOLSTATS), + ALLOC_POOLSTATS, LIMIT_POOLSTATS, true); poolstats_store = k_new_store(poolstats_list); poolstats_root = new_ktree(); - userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true); + userstats_list = k_new_list("UserStats", sizeof(USERSTATS), + ALLOC_USERSTATS, LIMIT_USERSTATS, true); userstats_store = k_new_store(userstats_list); userstats_eos_store = k_new_store(userstats_list); userstats_summ = k_new_store(userstats_list); userstats_ccl = k_new_store(userstats_list); userstats_root = new_ktree(); userstats_list->dsp_func = dsp_userstats; + userstats_ccl_root = new_ktree(); - workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); + workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), + ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); workerstatus_store = k_new_store(workerstatus_list); workerstatus_root = new_ktree(); if (!getdata()) return false; + tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); + LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB workinfo", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_auths), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB auths", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); + + bzero(&statsdate, sizeof(statsdate)); + ccl = userstats_ccl->head; + while (ccl) { + if (statsdate.tv_sec == 0 || + !tv_newer(&statsdate, &(DATA_USERSTATS(ccl)->statsdate))) + memcpy(&statsdate, &(DATA_USERSTATS(ccl)->statsdate), sizeof(statsdate)); + ccl = ccl->next; + } + tv_to_buf(&statsdate, buf, sizeof(buf)); + LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf); + free_ktree(userstats_ccl_root, NULL); + k_list_transfer_to_head(userstats_ccl, userstats_list); + workinfo_current = last_in_ktree(workinfo_height_root, ctx); if (workinfo_current) { STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1); @@ -5201,7 +5322,7 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char * if (!i_createdate) return strdup(reply); TXT_TO_CTV("createdate", DATA_TRANSFER(i_createdate)->data, createdate); - if (tvdiff(&createdate, &(row.createdate)) > STATS_PER) + if (tvdiff(&createdate, &(DATA_POOLSTATS(ps)->createdate)) > STATS_PER) store = true; else store = false; @@ -6494,7 +6615,7 @@ static void summarise_userstats() break; memcpy(&when, &(DATA_USERSTATS(tail)->statsdate), sizeof(when)); - /* Convent when to the start of the timeframe after the one it is in + /* Convert when to the start of the timeframe after the one it is in * assume timeval ignores leapseconds ... */ when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S; when.tv_usec = 0;