From 0abbed2cf6e7c3a43b5a88734069a81b531b5a6a Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 2 Oct 2014 11:24:31 +1000 Subject: [PATCH] ckdb - move remaining data code into ckdb_data --- src/ckdb.c | 353 ------------------------------------- src/ckdb.h | 26 ++- src/ckdb_data.c | 456 ++++++++++++++++++++++++++++++++++++++++++++++-- src/ckdb_dbio.c | 81 --------- 4 files changed, 457 insertions(+), 459 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 82e30cfd..4242eaa1 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -642,359 +642,6 @@ PGconn *dbconnect() return conn; } -void zero_on_new_block() -{ - WORKERSTATUS *workerstatus; - K_TREE_CTX ctx[1]; - K_ITEM *ws_item; - - K_WLOCK(workerstatus_free); - pool.diffacc = pool.diffinv = pool.shareacc = - pool.shareinv = pool.best_sdiff = 0; - ws_item = first_in_ktree(workerstatus_root, ctx); - while (ws_item) { - DATA_WORKERSTATUS(workerstatus, ws_item); - workerstatus->diffacc = workerstatus->diffinv = - workerstatus->diffsta = workerstatus->diffdup = - workerstatus->diffhi = workerstatus->diffrej = - workerstatus->shareacc = workerstatus->shareinv = - workerstatus->sharesta = workerstatus->sharedup = - workerstatus->sharehi = workerstatus->sharerej = 0.0; - ws_item = next_in_ktree(ctx); - } - K_WUNLOCK(workerstatus_free); - -} - -/* Currently only used at the end of the startup - * Will need to add locking if it's used, later, after startup completes */ -static void set_block_share_counters() -{ - K_TREE_CTX ctx[1]; - K_ITEM *ss_item, ss_look, *ws_item; - WORKERSTATUS *workerstatus; - SHARESUMMARY *sharesummary, looksharesummary; - - INIT_SHARESUMMARY(&ss_look); - - zero_on_new_block(); - - ws_item = NULL; - /* From the end backwards so we can skip the workinfoid's we don't - * want by jumping back to just before the current worker when the - * workinfoid goes below the limit */ - K_RLOCK(sharesummary_free); - ss_item = last_in_ktree(sharesummary_root, ctx); - while (ss_item) { - DATA_SHARESUMMARY(sharesummary, ss_item); - if (sharesummary->workinfoid < pool.workinfoid) { - // Skip back to the next worker - looksharesummary.userid = sharesummary->userid; - STRNCPY(looksharesummary.workername, - sharesummary->workername); - looksharesummary.workinfoid = -1; - ss_look.data = (void *)(&looksharesummary); - ss_item = find_before_in_ktree(sharesummary_root, &ss_look, - cmp_sharesummary, ctx); - continue; - } - - /* Check for user/workername change for new workerstatus - * The tree has user/workername grouped together in order - * so this will only be once per user/workername */ - if (!ws_item || - sharesummary->userid != workerstatus->userid || - strcmp(sharesummary->workername, workerstatus->workername)) { - /* This is to trigger a console error if it is missing - * since it should always exist - * However, it is simplest to simply create it - * and keep going */ - K_RUNLOCK(sharesummary_free); - ws_item = find_workerstatus(sharesummary->userid, - sharesummary->workername, - __FILE__, __func__, __LINE__); - if (!ws_item) { - ws_item = find_create_workerstatus(sharesummary->userid, - sharesummary->workername, - __FILE__, __func__, __LINE__); - } - K_RLOCK(sharesummary_free); - DATA_WORKERSTATUS(workerstatus, ws_item); - } - - pool.diffacc += sharesummary->diffacc; - pool.diffinv += sharesummary->diffsta + sharesummary->diffdup + - sharesummary->diffhi + sharesummary->diffrej; - workerstatus->diffacc += sharesummary->diffacc; - workerstatus->diffinv += sharesummary->diffsta + sharesummary->diffdup + - sharesummary->diffhi + sharesummary->diffrej; - workerstatus->diffsta += sharesummary->diffsta; - workerstatus->diffdup += sharesummary->diffdup; - workerstatus->diffhi += sharesummary->diffhi; - workerstatus->diffrej += sharesummary->diffrej; - workerstatus->shareacc += sharesummary->shareacc; - workerstatus->shareinv += sharesummary->sharesta + sharesummary->sharedup + - sharesummary->sharehi + sharesummary->sharerej; - workerstatus->sharesta += sharesummary->sharesta; - workerstatus->sharedup += sharesummary->sharedup; - workerstatus->sharehi += sharesummary->sharehi; - workerstatus->sharerej += sharesummary->sharerej; - - ss_item = prev_in_ktree(ctx); - } - K_RUNLOCK(sharesummary_free); -} - -/* All data is loaded, now update workerstatus fields - TODO: combine set_block_share_counters() with this? */ -static void workerstatus_ready() -{ - K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1]; - K_ITEM *ws_item, us_look, ss_look, *us_item, *ss_item; - USERSTATS lookuserstats, *userstats; - SHARESUMMARY looksharesummary, *sharesummary; - WORKERSTATUS *workerstatus; - - INIT_USERSTATS(&us_look); - INIT_SHARESUMMARY(&ss_look); - ws_item = first_in_ktree(workerstatus_root, ws_ctx); - while (ws_item) { - DATA_WORKERSTATUS(workerstatus, ws_item); - lookuserstats.userid = workerstatus->userid; - STRNCPY(lookuserstats.workername, workerstatus->workername); - lookuserstats.statsdate.tv_sec = date_eot.tv_sec; - lookuserstats.statsdate.tv_usec = date_eot.tv_usec; - us_look.data = (void *)(&lookuserstats); - us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look, - cmp_userstats_workerstatus, us_ctx); - if (us_item) { - DATA_USERSTATS(userstats, us_item); - if (userstats->idle) { - if (tv_newer(&(workerstatus->last_idle), - &(userstats->statsdate))) { - copy_tv(&(workerstatus->last_idle), - &(userstats->statsdate)); - } - } else { - if (tv_newer(&(workerstatus->last_stats), - &(userstats->statsdate))) { - copy_tv(&(workerstatus->last_stats), - &(userstats->statsdate)); - } - } - } - - looksharesummary.userid = workerstatus->userid; - STRNCPY(looksharesummary.workername, workerstatus->workername); - looksharesummary.workinfoid = MAXID; - ss_look.data = (void *)(&looksharesummary); - K_RLOCK(sharesummary_free); - ss_item = find_before_in_ktree(sharesummary_root, &ss_look, - cmp_sharesummary, ss_ctx); - K_RUNLOCK(sharesummary_free); - if (ss_item) { - DATA_SHARESUMMARY(sharesummary, ss_item); - if (tv_newer(&(workerstatus->last_share), - &(sharesummary->lastshare))) { - copy_tv(&(workerstatus->last_share), - &(sharesummary->lastshare)); - workerstatus->last_diff = - sharesummary->lastdiffacc; - } - } - - ws_item = next_in_ktree(ws_ctx); - } -} - -void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd) -{ - static int64_t last_attempted_id = -1; - static int64_t prev_found = 0; - static int repeat; - - char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ]; - int64_t ss_count_tot, s_count_tot, s_diff_tot; - int64_t ss_count, s_count, s_diff; - tv_t ss_first_min, ss_last_max; - tv_t ss_first, ss_last; - int32_t wid_count; - SHARESUMMARY looksharesummary, *sharesummary; - K_TREE_CTX ctx[1]; - K_ITEM look, *ss_item; - int64_t age_id, do_id, to_id; - bool ok, found; - - LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); - - age_id = prev_found; - - // Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found - looksharesummary.workinfoid = prev_found; - looksharesummary.userid = -1; - looksharesummary.workername[0] = '\0'; - INIT_SHARESUMMARY(&look); - look.data = (void *)(&looksharesummary); - - K_RLOCK(sharesummary_free); - ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, - cmp_sharesummary_workinfoid, ctx); - DATA_SHARESUMMARY_NULL(sharesummary, ss_item); - - ss_first_min.tv_sec = ss_first_min.tv_usec = - ss_last_max.tv_sec = ss_last_max.tv_usec = 0; - ss_count_tot = s_count_tot = s_diff_tot = 0; - - found = false; - while (ss_item && sharesummary->workinfoid < workinfoid) { - if (sharesummary->complete[0] == SUMMARY_NEW) { - age_id = sharesummary->workinfoid; - prev_found = age_id; - found = true; - break; - } - ss_item = next_in_ktree(ctx); - DATA_SHARESUMMARY_NULL(sharesummary, ss_item); - } - K_RUNLOCK(sharesummary_free); - - LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); - // Don't repeat searching old items to avoid accessing their ram - if (!found) - prev_found = workinfoid; - else { - /* Process all the consecutive sharesummaries that's aren't aged - * This way we find each oldest 'batch' of sharesummaries that have - * been missed and can report the range of data that was aged, - * which would normally just be an approx 10min set of workinfoids - * from the last time ckpool stopped - * Each next group of unaged sharesummaries following this, will be - * picked up by each next aging */ - wid_count = 0; - do_id = age_id; - to_id = 0; - do { - ok = workinfo_age(conn, do_id, poolinstance, - by, code, inet, cd, - &ss_first, &ss_last, - &ss_count, &s_count, &s_diff); - - ss_count_tot += ss_count; - s_count_tot += s_count; - s_diff_tot += s_diff; - if (ss_first_min.tv_sec == 0 || !tv_newer(&ss_first_min, &ss_first)) - copy_tv(&ss_first_min, &ss_first); - if (tv_newer(&ss_last_max, &ss_last)) - copy_tv(&ss_last_max, &ss_last); - - if (!ok) - break; - - to_id = do_id; - wid_count++; - K_RLOCK(sharesummary_free); - while (ss_item && sharesummary->workinfoid == to_id) { - ss_item = next_in_ktree(ctx); - DATA_SHARESUMMARY_NULL(sharesummary, ss_item); - } - K_RUNLOCK(sharesummary_free); - - if (ss_item) { - do_id = sharesummary->workinfoid; - if (do_id >= workinfoid) - break; - if (sharesummary->complete[0] != SUMMARY_NEW) - break; - } - } while (ss_item); - if (to_id == 0) { - if (last_attempted_id != age_id || ++repeat >= 10) { - // Approx once every 5min since workinfo defaults to ~30s - LOGWARNING("%s() Auto-age failed to age %"PRId64, - __func__, age_id); - last_attempted_id = age_id; - repeat = 0; - } - } else { - char idrange[64]; - char sharerange[256]; - if (to_id != age_id) { - snprintf(idrange, sizeof(idrange), - "from %"PRId64" to %"PRId64, - age_id, to_id); - } else { - snprintf(idrange, sizeof(idrange), - "%"PRId64, age_id); - } - tv_to_buf(&ss_first_min, min_buf, sizeof(min_buf)); - if (tv_equal(&ss_first_min, &ss_last_max)) { - snprintf(sharerange, sizeof(sharerange), - "share date %s", min_buf); - } else { - tv_to_buf(&ss_last_max, max_buf, sizeof(max_buf)); - snprintf(sharerange, sizeof(sharerange), - "share dates %s to %s", - min_buf, max_buf); - } - LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") " - "share%s %d sharesummar%s %d workinfoid%s " - "%s %s", - __func__, - s_count_tot, s_diff_tot, - (s_count_tot == 1) ? "" : "s", - ss_count_tot, - (ss_count_tot == 1) ? "y" : "ies", - wid_count, - (wid_count == 1) ? "" : "s", - idrange, sharerange); - } - } -} - -void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff) -{ - row->diffacc = row->diffsta = row->diffdup = row->diffhi = - row->diffrej = row->shareacc = row->sharesta = row->sharedup = - row->sharehi = row->sharerej = 0.0; - row->sharecount = row->errorcount = row->countlastupdate = 0; - row->reset = false; - row->firstshare.tv_sec = cd->tv_sec; - row->firstshare.tv_usec = cd->tv_usec; - row->lastshare.tv_sec = row->firstshare.tv_sec; - row->lastshare.tv_usec = row->firstshare.tv_usec; - row->lastdiffacc = diff; - row->complete[0] = SUMMARY_NEW; - row->complete[1] = '\0'; -} - -bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate) -{ - char buf[DATE_BUFSIZ+1]; - - copy_tv(statsdate, &(row->statsdate)); - // Start of this timeband - switch (row->summarylevel[0]) { - case SUMMARY_DB: - statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S; - statsdate->tv_usec = 0; - break; - case SUMMARY_FULL: - statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS; - statsdate->tv_usec = 0; - break; - default: - tv_to_buf(statsdate, buf, sizeof(buf)); - // Bad userstats are not fatal - LOGERR("Unknown userstats summarylevel 0x%02x '%c' " - "userid %"PRId64" workername %s statsdate %s", - row->summarylevel[0], row->summarylevel[0], - row->userid, row->workername, buf); - return false; - } - return true; -} - /* Load tables required to support auths,adduser,chkpass and newid * N.B. idcontrol is DB internal so is always ready * OptionControl is loaded first in case it is needed by other loads diff --git a/src/ckdb.h b/src/ckdb.h index 390149ef..f27fafdc 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.401" +#define CKDB_VERSION DB_VERSION"-0.402" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1151,15 +1151,6 @@ extern K_STORE *workerstatus_store; extern void logmsg(int loglevel, const char *fmt, ...); extern void tick(); extern PGconn *dbconnect(); -extern void zero_on_new_block(); -#define workerstatus_update(_auths, _shares, _userstats) \ - _workerstatus_update(_auths, _shares, _userstats, WHERE_FFL_HERE) -extern void _workerstatus_update(AUTHS *auths, SHARES *shares, - USERSTATS *userstats, WHERE_FFL_ARGS); -extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd); -extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff); -extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate); // *** // *** ckdb_data.c *** @@ -1211,10 +1202,6 @@ extern char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS); */ extern char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS); -// *** -// *** klist/ktree search/compare fucntions ckdb_data.c *** -// *** - extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS); extern void dsp_transfer(K_ITEM *item, FILE *stream); extern cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b); @@ -1240,6 +1227,11 @@ extern K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool create, const char *file2, const char *func2, const int line2, WHERE_FFL_ARGS); +extern void workerstatus_ready(); +#define workerstatus_update(_auths, _shares, _userstats) \ + _workerstatus_update(_auths, _shares, _userstats, WHERE_FFL_HERE) +extern void _workerstatus_update(AUTHS *auths, SHARES *shares, + USERSTATS *userstats, WHERE_FFL_ARGS); extern cmp_t cmp_users(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userid(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_users(char *username); @@ -1278,13 +1270,18 @@ extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b); extern void dsp_sharesummary(K_ITEM *item, FILE *stream); extern cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b); +extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff); extern K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid); +extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, + char *by, char *code, char *inet, tv_t *cd); extern void dsp_hash(char *hash, char *buf, size_t siz); extern void dsp_blocks(K_ITEM *item, FILE *stream); extern cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_blocks(int32_t height, char *blockhash); extern K_ITEM *find_prev_blocks(int32_t height); extern const char *blocks_confirmed(char *confirmed); +extern void zero_on_new_block(); +extern void set_block_share_counters(); extern cmp_t cmp_miningpayouts(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_auths(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_poolstats(K_ITEM *a, K_ITEM *b); @@ -1293,6 +1290,7 @@ extern cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b); +extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate); // *** // *** PostgreSQL functions ckdb_dbio.c diff --git a/src/ckdb_data.c b/src/ckdb_data.c index cb04f5fa..5c0ae560 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -319,17 +319,6 @@ char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS) return _data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz, WHERE_FFL_PASS); } -// TODO: do this better ... :) -void dsp_hash(char *hash, char *buf, size_t siz) -{ - char *ptr; - - ptr = hash + strlen(hash) - (siz - 1) - 8; - if (ptr < hash) - ptr = hash; - STRNCPYSIZ(buf, ptr, siz); -} - // For mutiple variable function calls that need the data char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS) { @@ -574,6 +563,149 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, return item; } +/* All data is loaded, now update workerstatus fields + TODO: combine set_block_share_counters() with this? */ +void workerstatus_ready() +{ + K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1]; + K_ITEM *ws_item, us_look, ss_look, *us_item, *ss_item; + USERSTATS lookuserstats, *userstats; + SHARESUMMARY looksharesummary, *sharesummary; + WORKERSTATUS *workerstatus; + + INIT_USERSTATS(&us_look); + INIT_SHARESUMMARY(&ss_look); + ws_item = first_in_ktree(workerstatus_root, ws_ctx); + while (ws_item) { + DATA_WORKERSTATUS(workerstatus, ws_item); + lookuserstats.userid = workerstatus->userid; + STRNCPY(lookuserstats.workername, workerstatus->workername); + lookuserstats.statsdate.tv_sec = date_eot.tv_sec; + lookuserstats.statsdate.tv_usec = date_eot.tv_usec; + us_look.data = (void *)(&lookuserstats); + us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look, + cmp_userstats_workerstatus, us_ctx); + if (us_item) { + DATA_USERSTATS(userstats, us_item); + if (userstats->idle) { + if (tv_newer(&(workerstatus->last_idle), + &(userstats->statsdate))) { + copy_tv(&(workerstatus->last_idle), + &(userstats->statsdate)); + } + } else { + if (tv_newer(&(workerstatus->last_stats), + &(userstats->statsdate))) { + copy_tv(&(workerstatus->last_stats), + &(userstats->statsdate)); + } + } + } + + looksharesummary.userid = workerstatus->userid; + STRNCPY(looksharesummary.workername, workerstatus->workername); + looksharesummary.workinfoid = MAXID; + ss_look.data = (void *)(&looksharesummary); + K_RLOCK(sharesummary_free); + ss_item = find_before_in_ktree(sharesummary_root, &ss_look, + cmp_sharesummary, ss_ctx); + K_RUNLOCK(sharesummary_free); + if (ss_item) { + DATA_SHARESUMMARY(sharesummary, ss_item); + if (tv_newer(&(workerstatus->last_share), + &(sharesummary->lastshare))) { + copy_tv(&(workerstatus->last_share), + &(sharesummary->lastshare)); + workerstatus->last_diff = + sharesummary->lastdiffacc; + } + } + + ws_item = next_in_ktree(ws_ctx); + } +} + +void _workerstatus_update(AUTHS *auths, SHARES *shares, + USERSTATS *userstats, WHERE_FFL_ARGS) +{ + WORKERSTATUS *row; + K_ITEM *item; + + if (auths) { + item = find_workerstatus(auths->userid, auths->workername, + file, func, line); + if (item) { + DATA_WORKERSTATUS(row, item); + if (tv_newer(&(row->last_auth), &(auths->createdate))) + copy_tv(&(row->last_auth), &(auths->createdate)); + } + } + + if (startup_complete && shares) { + if (shares->errn == SE_NONE) { + pool.diffacc += shares->diff; + pool.shareacc++; + } else { + pool.diffinv += shares->diff; + pool.shareinv++; + } + item = find_workerstatus(shares->userid, shares->workername, + file, func, line); + if (item) { + DATA_WORKERSTATUS(row, item); + if (tv_newer(&(row->last_share), &(shares->createdate))) { + copy_tv(&(row->last_share), &(shares->createdate)); + row->last_diff = shares->diff; + } + switch (shares->errn) { + case SE_NONE: + row->diffacc += shares->diff; + row->shareacc++; + break; + case SE_STALE: + row->diffinv += shares->diff; + row->shareinv++; + row->diffsta += shares->diff; + row->sharesta++; + break; + case SE_DUPE: + row->diffinv += shares->diff; + row->shareinv++; + row->diffdup += shares->diff; + row->sharedup++; + break; + case SE_HIGH_DIFF: + row->diffinv += shares->diff; + row->shareinv++; + row->diffhi += shares->diff; + row->sharehi++; + break; + default: + row->diffinv += shares->diff; + row->shareinv++; + row->diffrej += shares->diff; + row->sharerej++; + break; + } + } + } + + if (startup_complete && userstats) { + item = find_workerstatus(userstats->userid, userstats->workername, + file, func, line); + if (item) { + DATA_WORKERSTATUS(row, item); + if (userstats->idle) { + if (tv_newer(&(row->last_idle), &(userstats->statsdate))) + copy_tv(&(row->last_idle), &(userstats->statsdate)); + } else { + if (tv_newer(&(row->last_stats), &(userstats->statsdate))) + copy_tv(&(row->last_stats), &(userstats->statsdate)); + } + } + } +} + // default tree order by username asc,expirydate desc cmp_t cmp_users(K_ITEM *a, K_ITEM *b) { @@ -1310,6 +1442,22 @@ cmp_t cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b) return c; } +void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff) +{ + row->diffacc = row->diffsta = row->diffdup = row->diffhi = + row->diffrej = row->shareacc = row->sharesta = row->sharedup = + row->sharehi = row->sharerej = 0.0; + row->sharecount = row->errorcount = row->countlastupdate = 0; + row->reset = false; + row->firstshare.tv_sec = cd->tv_sec; + row->firstshare.tv_usec = cd->tv_usec; + row->lastshare.tv_sec = row->firstshare.tv_sec; + row->lastshare.tv_usec = row->firstshare.tv_usec; + row->lastdiffacc = diff; + row->complete[0] = SUMMARY_NEW; + row->complete[1] = '\0'; +} + K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid) { SHARESUMMARY sharesummary; @@ -1325,6 +1473,162 @@ K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid) return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx); } +void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, + char *by, char *code, char *inet, tv_t *cd) +{ + static int64_t last_attempted_id = -1; + static int64_t prev_found = 0; + static int repeat; + + char min_buf[DATE_BUFSIZ], max_buf[DATE_BUFSIZ]; + int64_t ss_count_tot, s_count_tot, s_diff_tot; + int64_t ss_count, s_count, s_diff; + tv_t ss_first_min, ss_last_max; + tv_t ss_first, ss_last; + int32_t wid_count; + SHARESUMMARY looksharesummary, *sharesummary; + K_TREE_CTX ctx[1]; + K_ITEM look, *ss_item; + int64_t age_id, do_id, to_id; + bool ok, found; + + LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); + + age_id = prev_found; + + // Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found + looksharesummary.workinfoid = prev_found; + looksharesummary.userid = -1; + looksharesummary.workername[0] = '\0'; + INIT_SHARESUMMARY(&look); + look.data = (void *)(&looksharesummary); + + K_RLOCK(sharesummary_free); + ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, + cmp_sharesummary_workinfoid, ctx); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + + ss_first_min.tv_sec = ss_first_min.tv_usec = + ss_last_max.tv_sec = ss_last_max.tv_usec = 0; + ss_count_tot = s_count_tot = s_diff_tot = 0; + + found = false; + while (ss_item && sharesummary->workinfoid < workinfoid) { + if (sharesummary->complete[0] == SUMMARY_NEW) { + age_id = sharesummary->workinfoid; + prev_found = age_id; + found = true; + break; + } + ss_item = next_in_ktree(ctx); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + } + K_RUNLOCK(sharesummary_free); + + LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); + // Don't repeat searching old items to avoid accessing their ram + if (!found) + prev_found = workinfoid; + else { + /* Process all the consecutive sharesummaries that's aren't aged + * This way we find each oldest 'batch' of sharesummaries that have + * been missed and can report the range of data that was aged, + * which would normally just be an approx 10min set of workinfoids + * from the last time ckpool stopped + * Each next group of unaged sharesummaries following this, will be + * picked up by each next aging */ + wid_count = 0; + do_id = age_id; + to_id = 0; + do { + ok = workinfo_age(conn, do_id, poolinstance, + by, code, inet, cd, + &ss_first, &ss_last, + &ss_count, &s_count, &s_diff); + + ss_count_tot += ss_count; + s_count_tot += s_count; + s_diff_tot += s_diff; + if (ss_first_min.tv_sec == 0 || !tv_newer(&ss_first_min, &ss_first)) + copy_tv(&ss_first_min, &ss_first); + if (tv_newer(&ss_last_max, &ss_last)) + copy_tv(&ss_last_max, &ss_last); + + if (!ok) + break; + + to_id = do_id; + wid_count++; + K_RLOCK(sharesummary_free); + while (ss_item && sharesummary->workinfoid == to_id) { + ss_item = next_in_ktree(ctx); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + } + K_RUNLOCK(sharesummary_free); + + if (ss_item) { + do_id = sharesummary->workinfoid; + if (do_id >= workinfoid) + break; + if (sharesummary->complete[0] != SUMMARY_NEW) + break; + } + } while (ss_item); + if (to_id == 0) { + if (last_attempted_id != age_id || ++repeat >= 10) { + // Approx once every 5min since workinfo defaults to ~30s + LOGWARNING("%s() Auto-age failed to age %"PRId64, + __func__, age_id); + last_attempted_id = age_id; + repeat = 0; + } + } else { + char idrange[64]; + char sharerange[256]; + if (to_id != age_id) { + snprintf(idrange, sizeof(idrange), + "from %"PRId64" to %"PRId64, + age_id, to_id); + } else { + snprintf(idrange, sizeof(idrange), + "%"PRId64, age_id); + } + tv_to_buf(&ss_first_min, min_buf, sizeof(min_buf)); + if (tv_equal(&ss_first_min, &ss_last_max)) { + snprintf(sharerange, sizeof(sharerange), + "share date %s", min_buf); + } else { + tv_to_buf(&ss_last_max, max_buf, sizeof(max_buf)); + snprintf(sharerange, sizeof(sharerange), + "share dates %s to %s", + min_buf, max_buf); + } + LOGWARNING("%s() Auto-aged %"PRId64"(%"PRId64") " + "share%s %d sharesummar%s %d workinfoid%s " + "%s %s", + __func__, + s_count_tot, s_diff_tot, + (s_count_tot == 1) ? "" : "s", + ss_count_tot, + (ss_count_tot == 1) ? "y" : "ies", + wid_count, + (wid_count == 1) ? "" : "s", + idrange, sharerange); + } + } +} + +// TODO: do this better ... :) +void dsp_hash(char *hash, char *buf, size_t siz) +{ + char *ptr; + + ptr = hash + strlen(hash) - (siz - 1) - 8; + if (ptr < hash) + ptr = hash; + STRNCPYSIZ(buf, ptr, siz); +} + void dsp_blocks(K_ITEM *item, FILE *stream) { char createdate_buf[DATE_BUFSIZ], expirydate_buf[DATE_BUFSIZ]; @@ -1425,6 +1729,109 @@ const char *blocks_confirmed(char *confirmed) return blocks_unknown; } +void zero_on_new_block() +{ + WORKERSTATUS *workerstatus; + K_TREE_CTX ctx[1]; + K_ITEM *ws_item; + + K_WLOCK(workerstatus_free); + pool.diffacc = pool.diffinv = pool.shareacc = + pool.shareinv = pool.best_sdiff = 0; + ws_item = first_in_ktree(workerstatus_root, ctx); + while (ws_item) { + DATA_WORKERSTATUS(workerstatus, ws_item); + workerstatus->diffacc = workerstatus->diffinv = + workerstatus->diffsta = workerstatus->diffdup = + workerstatus->diffhi = workerstatus->diffrej = + workerstatus->shareacc = workerstatus->shareinv = + workerstatus->sharesta = workerstatus->sharedup = + workerstatus->sharehi = workerstatus->sharerej = 0.0; + ws_item = next_in_ktree(ctx); + } + K_WUNLOCK(workerstatus_free); + +} + +/* Currently only used at the end of the startup + * Will need to add locking if it's used, later, after startup completes */ +void set_block_share_counters() +{ + K_TREE_CTX ctx[1]; + K_ITEM *ss_item, ss_look, *ws_item; + WORKERSTATUS *workerstatus; + SHARESUMMARY *sharesummary, looksharesummary; + + INIT_SHARESUMMARY(&ss_look); + + zero_on_new_block(); + + ws_item = NULL; + /* From the end backwards so we can skip the workinfoid's we don't + * want by jumping back to just before the current worker when the + * workinfoid goes below the limit */ + K_RLOCK(sharesummary_free); + ss_item = last_in_ktree(sharesummary_root, ctx); + while (ss_item) { + DATA_SHARESUMMARY(sharesummary, ss_item); + if (sharesummary->workinfoid < pool.workinfoid) { + // Skip back to the next worker + looksharesummary.userid = sharesummary->userid; + STRNCPY(looksharesummary.workername, + sharesummary->workername); + looksharesummary.workinfoid = -1; + ss_look.data = (void *)(&looksharesummary); + ss_item = find_before_in_ktree(sharesummary_root, &ss_look, + cmp_sharesummary, ctx); + continue; + } + + /* Check for user/workername change for new workerstatus + * The tree has user/workername grouped together in order + * so this will only be once per user/workername */ + if (!ws_item || + sharesummary->userid != workerstatus->userid || + strcmp(sharesummary->workername, workerstatus->workername)) { + /* This is to trigger a console error if it is missing + * since it should always exist + * However, it is simplest to simply create it + * and keep going */ + K_RUNLOCK(sharesummary_free); + ws_item = find_workerstatus(sharesummary->userid, + sharesummary->workername, + __FILE__, __func__, __LINE__); + if (!ws_item) { + ws_item = find_create_workerstatus(sharesummary->userid, + sharesummary->workername, + __FILE__, __func__, __LINE__); + } + K_RLOCK(sharesummary_free); + DATA_WORKERSTATUS(workerstatus, ws_item); + } + + pool.diffacc += sharesummary->diffacc; + pool.diffinv += sharesummary->diffsta + sharesummary->diffdup + + sharesummary->diffhi + sharesummary->diffrej; + workerstatus->diffacc += sharesummary->diffacc; + workerstatus->diffinv += sharesummary->diffsta + sharesummary->diffdup + + sharesummary->diffhi + sharesummary->diffrej; + workerstatus->diffsta += sharesummary->diffsta; + workerstatus->diffdup += sharesummary->diffdup; + workerstatus->diffhi += sharesummary->diffhi; + workerstatus->diffrej += sharesummary->diffrej; + workerstatus->shareacc += sharesummary->shareacc; + workerstatus->shareinv += sharesummary->sharesta + sharesummary->sharedup + + sharesummary->sharehi + sharesummary->sharerej; + workerstatus->sharesta += sharesummary->sharesta; + workerstatus->sharedup += sharesummary->sharedup; + workerstatus->sharehi += sharesummary->sharehi; + workerstatus->sharerej += sharesummary->sharerej; + + ss_item = prev_in_ktree(ctx); + } + K_RUNLOCK(sharesummary_free); +} + /* order by height asc,userid asc,expirydate asc * i.e. only one payout amount per block per user */ cmp_t cmp_miningpayouts(K_ITEM *a, K_ITEM *b) @@ -1561,3 +1968,30 @@ cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b) } return c; } + +bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate) +{ + char buf[DATE_BUFSIZ+1]; + + copy_tv(statsdate, &(row->statsdate)); + // Start of this timeband + switch (row->summarylevel[0]) { + case SUMMARY_DB: + statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S; + statsdate->tv_usec = 0; + break; + case SUMMARY_FULL: + statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS; + statsdate->tv_usec = 0; + break; + default: + tv_to_buf(statsdate, buf, sizeof(buf)); + // Bad userstats are not fatal + LOGERR("Unknown userstats summarylevel 0x%02x '%c' " + "userid %"PRId64" workername %s statsdate %s", + row->summarylevel[0], row->summarylevel[0], + row->userid, row->workername, buf); + return false; + } + return true; +} diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 1e4cd8c8..b4ad1773 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -304,87 +304,6 @@ cleanup: return lastid; } -void _workerstatus_update(AUTHS *auths, SHARES *shares, - USERSTATS *userstats, WHERE_FFL_ARGS) -{ - WORKERSTATUS *row; - K_ITEM *item; - - if (auths) { - item = find_workerstatus(auths->userid, auths->workername, - file, func, line); - if (item) { - DATA_WORKERSTATUS(row, item); - if (tv_newer(&(row->last_auth), &(auths->createdate))) - copy_tv(&(row->last_auth), &(auths->createdate)); - } - } - - if (startup_complete && shares) { - if (shares->errn == SE_NONE) { - pool.diffacc += shares->diff; - pool.shareacc++; - } else { - pool.diffinv += shares->diff; - pool.shareinv++; - } - item = find_workerstatus(shares->userid, shares->workername, - file, func, line); - if (item) { - DATA_WORKERSTATUS(row, item); - if (tv_newer(&(row->last_share), &(shares->createdate))) { - copy_tv(&(row->last_share), &(shares->createdate)); - row->last_diff = shares->diff; - } - switch (shares->errn) { - case SE_NONE: - row->diffacc += shares->diff; - row->shareacc++; - break; - case SE_STALE: - row->diffinv += shares->diff; - row->shareinv++; - row->diffsta += shares->diff; - row->sharesta++; - break; - case SE_DUPE: - row->diffinv += shares->diff; - row->shareinv++; - row->diffdup += shares->diff; - row->sharedup++; - break; - case SE_HIGH_DIFF: - row->diffinv += shares->diff; - row->shareinv++; - row->diffhi += shares->diff; - row->sharehi++; - break; - default: - row->diffinv += shares->diff; - row->shareinv++; - row->diffrej += shares->diff; - row->sharerej++; - break; - } - } - } - - if (startup_complete && userstats) { - item = find_workerstatus(userstats->userid, userstats->workername, - file, func, line); - if (item) { - DATA_WORKERSTATUS(row, item); - if (userstats->idle) { - if (tv_newer(&(row->last_idle), &(userstats->statsdate))) - copy_tv(&(row->last_idle), &(userstats->statsdate)); - } else { - if (tv_newer(&(row->last_stats), &(userstats->statsdate))) - copy_tv(&(row->last_stats), &(userstats->statsdate)); - } - } - } -} - bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash, char *newhash, char *email, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root)