Browse Source

ckdb - calculate most of workerstatus after data load

master
kanoi 10 years ago
parent
commit
0c308a926a
  1. 136
      src/ckdb.c

136
src/ckdb.c

@ -85,8 +85,8 @@ static char *restorefrom;
* sharesummary complete='n', but for any such sharesummary
* we reset it back to the first share found and it will
* correct itself during the CCL reload
* Verify that all DB sharesummaries with complete='n' have
* done this
* TODO: Verify that all DB sharesummaries with complete='n'
* have done this
* DB+RAM workinfo: start from newest DB createdate workinfo
* DB+RAM auths: start from newest DB createdate auths
* DB+RAM poolstats: newest createdate poolstats
@ -107,7 +107,7 @@ static char *restorefrom;
* DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any)
* will be after the last DB workinfo
* DB+RAM accountbalance (TODO): resolved by shares/workinfo/blocks
* RAM workerstats: last_auth, last_share, last_stats all handled by
* RAM workerstatus: last_auth, last_share, last_stats all handled by
* DB load up to whatever the CCL restart point is, and then
* corrected with the CCL reload
* last_idle will be the last idle userstats in the CCL load or 0
@ -144,7 +144,8 @@ static char *restorefrom;
* 2) shares/shareerrors: any record that matches an incomplete DB
* sharesummary that hasn't been reset, will reset the sharesummary
* so that the sharesummary will be recalculated
* The record is processed normally in both situations
* The record is processed normally with or without the reset
* If the sharesummary is complete, the record is discarded
* 3) ageworkinfo records are also handled by the shares date
* while processing, any records already aged are not updated
* and a warning is displayed if there were any matching shares
@ -567,8 +568,8 @@ static const tv_t date_first = { DATE_DEBUG, 0L };
// Different input data handling
static bool reloading = false;
// Tell the summarizer data load is complete
static bool summarizer_go = false;
// Data load is complete
static bool startup_complete = false;
// Tell the summarizer to die
static bool summarizer_die = false;
@ -1057,6 +1058,7 @@ typedef struct userstats {
static K_TREE *userstats_root;
static K_TREE *userstats_statsdate_root; // ordered by statsdate first
static K_TREE *userstats_workerstatus_root; // during data load
static K_LIST *userstats_free;
static K_STORE *userstats_store;
// Awaiting EOS
@ -1770,11 +1772,63 @@ static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool
#define find_create_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, true)
#define find_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, false)
/* TODO: Change this to calculate after the DB load rather than during
for: shares, userstats and sharesummary
auths will fully populate the workerstats tree to use for the
reverse search back into the shares, userstats and sharesummary
for the latest records */
static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
static double cmp_sharesummary(K_ITEM *a, K_ITEM *b);
/* All data is loaded, now update workerstatus last_share, last_idle, last_stats
* shares are all part of a sharesummary so no need to search shares
*/
static void workerstatus_ready()
{
K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1];
K_ITEM *ws_item, look, *us_item, *ss_item;
USERSTATS userstats;
SHARESUMMARY sharesummary;
ws_item = first_in_ktree(workerstatus_root, ws_ctx);
while (ws_item) {
userstats.userid = DATA_WORKERSTATUS(ws_item)->userid;
STRNCPY(userstats.workername, DATA_WORKERSTATUS(ws_item)->workername);
userstats.statsdate.tv_sec = date_eot.tv_sec;
userstats.statsdate.tv_usec = date_eot.tv_usec;
look.data = (void *)(&userstats);
us_item = find_before_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, us_ctx);
if (us_item) {
if (DATA_USERSTATS(us_item)->idle) {
if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_idle),
&(DATA_USERSTATS(us_item)->statsdate))) {
copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_idle),
&(DATA_USERSTATS(us_item)->statsdate));
}
} else {
if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_stats),
&(DATA_USERSTATS(us_item)->statsdate))) {
copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_stats),
&(DATA_USERSTATS(us_item)->statsdate));
}
}
}
sharesummary.userid = DATA_WORKERSTATUS(ws_item)->userid;
STRNCPY(sharesummary.workername, DATA_WORKERSTATUS(ws_item)->workername);
sharesummary.workinfoid = 0x7fffffffffffffffLL;
look.data = (void *)(&sharesummary);
ss_item = find_before_in_ktree(sharesummary_root, &look, cmp_sharesummary, ss_ctx);
if (ss_item) {
if (tv_newer(&(DATA_WORKERSTATUS(ws_item)->last_share),
&(DATA_SHARESUMMARY(ss_item)->lastshare))) {
copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_share),
&(DATA_SHARESUMMARY(ss_item)->lastshare));
}
}
ws_item = next_in_ktree(ws_ctx);
}
free_ktree(userstats_workerstatus_root, NULL);
}
static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats,
SHARESUMMARY *sharesummary)
{
@ -1790,14 +1844,14 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta
copy_tv(&(row->last_auth), &(auths->createdate));
}
if (shares) {
if (startup_complete && shares) {
item = find_create_workerstatus(shares->userid, shares->workername);
row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->last_share), &(shares->createdate)))
copy_tv(&(row->last_share), &(shares->createdate));
}
if (userstats) {
if (startup_complete && userstats) {
item = find_create_workerstatus(userstats->userid, userstats->workername);
row = DATA_WORKERSTATUS(item);
if (userstats->idle) {
@ -1809,7 +1863,7 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta
}
}
if (sharesummary) {
if (startup_complete && sharesummary) {
item = find_create_workerstatus(sharesummary->userid, sharesummary->workername);
row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->last_share), &(sharesummary->lastshare)))
@ -2822,6 +2876,17 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
static double cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b);
static double cmp_shares(K_ITEM *a, K_ITEM *b);
/* N.B. a DB check can be done to find sharesummaries that were missed being
* aged (and a possible problem with the aging process):
* e.g. for a date D in the past of at least a few hours
* select count(*) from sharesummary where createdate<'D' and complete='n';
* and can be easily corrected:
* update sharesummary set complete='a' where createdate<'D' and complete='n';
* It's important to make sure the D value is far enough in the past such that
* all the matching sharesummary records in ckdb have certainly completed
* ckdb would need a restart to get the updated DB information though it would
* not affect current ckdb code
*/
static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd)
{
@ -4708,6 +4773,27 @@ static double cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b)
return c;
}
/* order by userid asc,workername asc,statsdate asc,poolinstance asc
built during data load to update workerstatus at the end of the load */
static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b)
{
double c = (double)(DATA_USERSTATS(a)->userid -
DATA_USERSTATS(b)->userid);
if (c == 0) {
c = (double)strcmp(DATA_USERSTATS(a)->workername,
DATA_USERSTATS(b)->workername);
if (c == 0) {
c = tvdiff(&(DATA_USERSTATS(a)->statsdate),
&(DATA_USERSTATS(b)->statsdate));
if (c == 0) {
c = (double)strcmp(DATA_USERSTATS(a)->poolinstance,
DATA_USERSTATS(b)->poolinstance);
}
}
}
return c;
}
static bool userstats_add_db(PGconn *conn, USERSTATS *row)
{
ExecStatusType rescode;
@ -4866,6 +4952,10 @@ advancetogo:
cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_match,
cmp_userstats_statsdate);
if (!startup_complete) {
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, us_match,
cmp_userstats_workerstatus);
}
k_add_head(userstats_store, us_match);
}
}
@ -5037,6 +5127,8 @@ static bool userstats_fill(PGconn *conn)
userstats_root = add_to_ktree(userstats_root, item, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, item,
cmp_userstats_statsdate);
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, item,
cmp_userstats_workerstatus);
k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row, NULL);
@ -5402,6 +5494,7 @@ static bool setup_data()
userstats_db = k_new_store(userstats_free);
userstats_root = new_ktree();
userstats_statsdate_root = new_ktree();
userstats_workerstatus_root = new_ktree();
userstats_free->dsp_func = dsp_userstats;
userstats_db_root = new_ktree();
@ -5416,6 +5509,8 @@ static bool setup_data()
if (!reload())
return false;
workerstatus_ready();
workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) {
STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1);
@ -7052,7 +7147,8 @@ static void summarise_userstats()
memcpy(userstats, DATA_USERSTATS(first), sizeof(USERSTATS));
userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats, ctx2);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first, cmp_userstats_statsdate, ctx2);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first,
cmp_userstats_statsdate, ctx2);
k_unlink_item(userstats_store, first);
k_add_head(userstats_summ, first);
@ -7077,7 +7173,8 @@ static void summarise_userstats()
userstats->summarycount += DATA_USERSTATS(next)->summarycount;
userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats, ctx2);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next, cmp_userstats_statsdate, ctx2);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next,
cmp_userstats_statsdate, ctx2);
k_unlink_item(userstats_store, next);
k_add_head(userstats_summ, next);
}
@ -7128,7 +7225,8 @@ static void summarise_userstats()
k_list_transfer_to_tail(userstats_summ, userstats_free);
k_add_head(userstats_store, new);
userstats_root = add_to_ktree(userstats_root, new, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new, cmp_userstats_statsdate);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new,
cmp_userstats_statsdate);
if (upgrade)
K_WUNLOCK(userstats_free);
@ -7155,7 +7253,7 @@ static void *summariser(__maybe_unused void *arg)
{
pthread_detach(pthread_self());
while (!summarizer_die && !summarizer_go)
while (!summarizer_die && !startup_complete)
cksleep_ms(42);
while (!summarizer_die) {
@ -7395,7 +7493,7 @@ static void *listener(void *arg)
LOGWARNING("%s(): ckdb ready", __func__);
summarizer_go = true;
startup_complete = true;
while (true) {
dealloc(buf);

Loading…
Cancel
Save