From fcfe965ba3af9254da565cf553c7243ec191fa0e Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 12 May 2015 09:20:38 +1000 Subject: [PATCH] ckdb - don't store sharesummaries in the DB and delay mark processing until sync has completed --- src/ckdb.c | 73 +++---- src/ckdb.h | 43 ++--- src/ckdb_cmd.c | 108 ++++++----- src/ckdb_data.c | 38 ++-- src/ckdb_dbio.c | 497 +++++------------------------------------------- 5 files changed, 163 insertions(+), 596 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index d3ee6728..bd66ef31 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -268,7 +268,7 @@ bool dbload_only_sharesummary = false; * markersummaries and pplns payouts may not be correct */ bool sharesummary_marks_limit = false; -// DB users,workers load is complete +// DB optioncontrol,users,workers,useratts load is complete bool db_users_complete = false; // DB load is complete bool db_load_complete = false; @@ -758,7 +758,9 @@ static bool getdata1() goto matane; if (!(ok = users_fill(conn))) goto matane; - ok = workers_fill(conn); + if (!(ok = workers_fill(conn))) + goto matane; + ok = useratts_fill(conn); matane: @@ -796,21 +798,15 @@ static bool getdata3() } if (!(ok = workinfo_fill(conn)) || everyone_die) goto sukamudai; - /* marks must be loaded before sharesummary - * since sharesummary looks at the marks data */ if (!(ok = marks_fill(conn)) || everyone_die) goto sukamudai; + /* must be after workinfo */ if (!(ok = workmarkers_fill(conn)) || everyone_die) goto sukamudai; if (!(ok = markersummary_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = sharesummary_fill(conn)) || everyone_die) - goto sukamudai; - if (!confirm_sharesummary) { - if (!(ok = useratts_fill(conn)) || everyone_die) - goto sukamudai; + if (!confirm_sharesummary) ok = poolstats_fill(conn); - } sukamudai: @@ -828,32 +824,25 @@ static bool reload() char *reason; FILE *fp; - tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); - LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); - tv_to_buf(&(dbstatus.newest_sharesummary_firstshare_ay), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB complete sharesummary", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_workmarker_workinfo), + buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB workmarker wid %"PRId64, + __func__, buf, + dbstatus.newest_workmarker_workinfoid); tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB workinfo", __func__, buf); + LOGWARNING("%s(): %s newest DB workinfo wid %"PRId64, + __func__, buf, dbstatus.newest_workinfoid); tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); + LOGWARNING("%s(): %s newest DB poolstats (ignored)", __func__, buf); tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf)); LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf); - if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec) - copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.oldest_sharesummary_firstshare_n)); - else - copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.newest_sharesummary_firstshare_ay)); - - copy_tv(&start, &(dbstatus.sharesummary_firstshare)); - reason = "sharesummary"; + copy_tv(&start, &(dbstatus.newest_createdate_workmarker_workinfo)); + reason = "workmarkers"; if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) { copy_tv(&start, &(dbstatus.newest_createdate_workinfo)); reason = "workinfo"; } - if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) { - copy_tv(&start, &(dbstatus.newest_createdate_poolstats)); - reason = "poolstats"; - } tv_to_buf(&start, buf, sizeof(buf)); LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason); @@ -901,15 +890,18 @@ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid) fclose(fp); if (ret == 1 && !(kill(oldpid, 0))) { if (!ckp->killold) { - LOGEMERG("Process %s pid %d still exists, start ckpool with -k if you wish to kill it", + LOGEMERG("Process %s pid %d still exists, start" + " ckpool with -k if you wish to kill it", path, oldpid); return false; } if (kill(oldpid, 9)) { - LOGEMERG("Unable to kill old process %s pid %d", path, oldpid); + LOGEMERG("Unable to kill old process %s pid %d", + path, oldpid); return false; } - LOGWARNING("Killing off old process %s pid %d", path, oldpid); + LOGWARNING("Killing off old process %s pid %d", + path, oldpid); } } fp = fopen(path, "we"); @@ -3076,7 +3068,7 @@ static void *summariser(__maybe_unused void *arg) rename_proc("db_summariser"); - while (!everyone_die && !startup_complete) + while (!everyone_die && !reload_queue_complete) cksleep_ms(42); summariser_using_data = true; @@ -3568,7 +3560,7 @@ static void *marker(__maybe_unused void *arg) rename_proc("db_marker"); - while (!everyone_die && !startup_complete) + while (!everyone_die && !reload_queue_complete) cksleep_ms(42); if (sharesummary_marks_limit) { @@ -3579,16 +3571,6 @@ static void *marker(__maybe_unused void *arg) marker_using_data = true; -/* TODO: trigger this every workinfo change? - * note that history catch up would also mean the tigger would - * catch up at most 100 missing marks per shift - * however, also, a workinfo change means a sharesummary DB update, - * so would be best to (usually) wait until that is done - * OR: avoid writing the sharesummaries to the DB at all - * and only write the markersummaries? - since 100 workinfoid shifts - * will usually mean that markersummaries are less than every hour - * (and a reload processes more than an hour) */ - while (!everyone_die) { for (i = 0; i < 5; i++) { if (!everyone_die) @@ -4439,6 +4421,7 @@ static bool reload_from(tv_t *start) reloading = true; copy_tv(&reload_timestamp, start); + // Go back further - one reload file reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S; tv_to_buf(start, buf, sizeof(buf)); @@ -4790,6 +4773,7 @@ static void *listener(void *arg) return NULL; } +#if 0 /* TODO: This will be way faster traversing both trees simultaneously * rather than traversing one and searching the other, then repeating * in reverse. Will change it later */ @@ -4893,6 +4877,7 @@ static void compare_summaries(K_TREE *leftsum, char *leftname, diff_first, diff_last, cd_buf1, cd_buf2); } } +#endif /* TODO: have a seperate option to find/store missing workinfo/shares/etc * from the reload files, in a supplied UTC time range @@ -4905,6 +4890,9 @@ static void compare_summaries(K_TREE *leftsum, char *leftname, * and the payment is now wrong */ static void confirm_reload() { +#if 0 + TODO: redo this using workmarkers + K_TREE *sharesummary_workinfoid_save; __maybe_unused K_TREE *sharesummary_save; __maybe_unused K_TREE *workinfo_save; @@ -5219,6 +5207,7 @@ static void confirm_reload() compare_summaries(sharesummary_workinfoid_root, "ReLoad", sharesummary_workinfoid_save, "DB", true, false); +#endif } // TODO: handle workmarkers/markersummaries diff --git a/src/ckdb.h b/src/ckdb.h index be133eae..9395b2b3 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.094" +#define CKDB_VERSION DB_VERSION"-1.100" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -125,20 +125,12 @@ extern const char *addrpatt; #define MAX_PAYADDR '~' typedef struct loadstatus { - tv_t oldest_sharesummary_firstshare_n; - tv_t newest_sharesummary_firstshare_a; - tv_t newest_sharesummary_firstshare_ay; - tv_t sharesummary_firstshare; // whichever of above 2 used - tv_t oldest_sharesummary_firstshare_a; - tv_t newest_sharesummary_firstshare_y; + int64_t newest_workmarker_workinfoid; + int64_t newest_workinfoid; + tv_t newest_createdate_workmarker_workinfo; tv_t newest_createdate_workinfo; tv_t newest_createdate_poolstats; - tv_t newest_starttimeband_userstats; tv_t newest_createdate_blocks; - int64_t oldest_workinfoid_n; // of oldest firstshare sharesummary n - int64_t oldest_workinfoid_a; // of oldest firstshare sharesummary a - int64_t newest_workinfoid_a; // of newest firstshare sharesummary a - int64_t newest_workinfoid_y; // of newest firstshare sharesummary y } LOADSTATUS; extern LOADSTATUS dbstatus; @@ -1366,10 +1358,6 @@ typedef struct sharesummary { double sharerej; int64_t sharecount; int64_t errorcount; - int64_t countlastupdate; // non-DB field - bool inserted; // non-DB field - bool saveaged; // non-DB field - bool reset; // non-DB field tv_t firstshare; tv_t lastshare; double lastdiffacc; @@ -2131,10 +2119,10 @@ extern cmp_t _cmp_height(char *coinbase1a, char *coinbase1b, WHERE_FFL_ARGS); extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_workinfo(int64_t workinfoid, K_TREE_CTX *ctx); extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx); -extern bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd, - tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, - int64_t *s_count, int64_t *s_diff); +extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, + char *code, char *inet, tv_t *cd, tv_t *ss_first, + tv_t *ss_last, int64_t *ss_count, int64_t *s_count, + int64_t *s_diff); extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b); extern void dsp_sharesummary(K_ITEM *item, FILE *stream); @@ -2152,8 +2140,8 @@ extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff); extern K_ITEM *_find_sharesummary(int64_t userid, char *workername, int64_t workinfoid, bool pool); extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername); -extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd); +extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, + char *code, char *inet, tv_t *cd); #define dbhash2btchash(_hash, _buf, _siz) \ _dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE) void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS); @@ -2245,6 +2233,8 @@ extern void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock); (_res) == PGRES_TUPLES_OK || \ (_res) == PGRES_EMPTY_QUERY) +#define SQL_UNIQUE_VIOLATION "23505" + #define CKPQ_READ true #define CKPQ_WRITE false @@ -2348,11 +2338,12 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); extern char *ooo_status(char *buf, size_t siz); -#define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ - _sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ +#define sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ + _sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ WHERE_FFL_HERE) -extern bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, - char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS); +extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, + char *by, char *code, char *inet, tv_t *cd, + WHERE_FFL_ARGS); extern bool sharesummary_fill(PGconn *conn); extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, double diffacc, double diffinv, double shareacc, diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 0ec64f35..9d83bd24 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -683,7 +683,8 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, by, code, inet, cd, igndup, trf_root); if (!ok) { - LOGERR("%s() %s.failed.DBE", __func__, id); + if (!igndup) + LOGERR("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } LOGDEBUG("%s.ok.", id); @@ -698,13 +699,11 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, { bool igndup = false; - // confirm_summaries() doesn't call this - if (reloading) { - if (tv_equal(cd, &(dbstatus.newest_createdate_blocks))) - igndup = true; - else if (tv_newer(cd, &(dbstatus.newest_createdate_blocks))) - return NULL; - } + /* confirm_summaries() doesn't call this + * We don't care about dups during reload since poolstats_fill() + * doesn't load all the data */ + if (reloading) + igndup = true; return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } @@ -1965,20 +1964,18 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, K_ITEM *i_ntime, *i_reward; bool igndup = false; - if (reloading && !confirm_sharesummary) { - if (tv_equal(cd, &(dbstatus.newest_createdate_workinfo))) - igndup = true; - else if (tv_newer(cd, &(dbstatus.newest_createdate_workinfo))) - return NULL; - } - i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - if (confirm_sharesummary) { - TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + if (reloading && !confirm_sharesummary) { + if (workinfoid <= dbstatus.newest_workinfoid) + igndup = true; + } + + if (confirm_sharesummary) { if (workinfoid < confirm_first_workinfoid || workinfoid > confirm_last_workinfoid) goto wiconf; @@ -2056,12 +2053,6 @@ wiconf: K_ITEM *i_secondaryuserid; bool ok; - // This just excludes the shares we certainly don't need - if (reloading && !confirm_sharesummary) { - if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) - return NULL; - } - i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); if (!i_nonce) return strdup(reply); @@ -2070,9 +2061,28 @@ wiconf: if (!i_workinfoid) return strdup(reply); - if (confirm_sharesummary) { - TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + if (reloading && !confirm_sharesummary) { + /* ISDR (Ignored shares during reload) + * This will discard any shares older than the newest + * workinfoidend of any workmarker - including ready + * but not processed workmarkers + * This means that if a workmarker needs re-processing + * and all of it's shares need to be redone, that will + * require a seperate procedure to the reload + * This would be the (as yet non-existant) + * confirm_markersummary which will replace the + * now unusable confirm_sharesummary code + * However, if the workmarker simply just needs to be + * flagged as processed, this avoids the problem of + * duplicating shares before flagging it + */ + if (workinfoid <= dbstatus.newest_workmarker_workinfoid) + return NULL; + } + + if (confirm_sharesummary) { if (workinfoid < confirm_first_workinfoid || workinfoid > confirm_last_workinfoid) goto sconf; @@ -2151,12 +2161,6 @@ sconf: K_ITEM *i_error, *i_secondaryuserid; bool ok; - // This just excludes the shareerrors we certainly don't need - if (reloading && !confirm_sharesummary) { - if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) - return NULL; - } - i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); @@ -2165,6 +2169,13 @@ sconf: if (!i_workinfoid) return strdup(reply); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + + if (reloading && !confirm_sharesummary) { + // See comment 'ISDR' above for shares + if (workinfoid <= dbstatus.newest_workmarker_workinfoid) + return NULL; + } if (confirm_sharesummary) { TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); @@ -2216,15 +2227,18 @@ seconf: tv_t ss_first, ss_last; bool ok; - if (reloading && !confirm_sharesummary) { - if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) - return NULL; - } - i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + + if (reloading && !confirm_sharesummary) { + // This excludes any already summarised + if (workinfoid <= dbstatus.newest_workmarker_workinfoid) + return NULL; + } + if (confirm_sharesummary) { TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); @@ -2237,27 +2251,19 @@ seconf: if (!i_poolinstance) return strdup(reply); - TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); - - ok = workinfo_age(conn, workinfoid, - transfer_data(i_poolinstance), - by, code, inet, cd, - &ss_first, &ss_last, - &ss_count, &s_count, &s_diff); - + ok = workinfo_age(workinfoid, transfer_data(i_poolinstance), + by, code, inet, cd, &ss_first, &ss_last, + &ss_count, &s_count, &s_diff); if (!ok) { LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); return strdup("failed.DATA"); } else { - /* Don't slow down the reload - do them later - * N.B. this means if you abort/terminate the reload, - * next restart will again go back to the oldest - * unaged sharesummary due to a pool terminate */ + /* Don't slow down the reload - do them later */ if (!reloading) { // Aging is a queued item thus the reply is ignored - auto_age_older(conn, workinfoid, - transfer_data(i_poolinstance), - by, code, inet, cd); + auto_age_older(workinfoid, + transfer_data(i_poolinstance), + by, code, inet, cd); } } LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index eedaff9b..e4e620cb 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -1764,10 +1764,10 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx) return item; } -bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd, - tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, - int64_t *s_count, int64_t *s_diff) +// Duplicates during a reload are set to not show messages +bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, + char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last, + int64_t *ss_count, int64_t *s_count, int64_t *s_diff) { K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item; K_ITEM *wm_item, *tmp_item; @@ -1778,7 +1778,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, SHARESUMMARY looksharesummary, *sharesummary; WORKINFO *workinfo; SHARES lookshares, *shares; - bool ok = false, conned = false, skipupdate; + bool ok = false, skipupdate; char error[1024]; LOGDEBUG("%s(): age", __func__); @@ -1857,14 +1857,9 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, } if (!skipupdate) { - if (conn == NULL && !confirm_sharesummary) { - conn = dbconnect(); - conned = true; - } - - if (!sharesummary_update(conn, NULL, NULL, ss_item, by, code, inet, cd)) { + if (!sharesummary_update(NULL, NULL, ss_item, by, code, inet, cd)) { ss_failed++; - LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64, + LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64, __func__, sharesummary->userid, sharesummary->workername, sharesummary->workinfoid); @@ -1929,9 +1924,6 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, LOGERR("%s(): %s", __func__, error); } - if (conned) - PQfinish(conn); - if (ss_already || ss_failed || shares_dumped) { /* If all were already aged, and no shares * then we don't want a message */ @@ -2046,8 +2038,7 @@ void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff) row->diffacc = row->diffsta = row->diffdup = row->diffhi = row->diffrej = row->shareacc = row->sharesta = row->sharedup = row->sharehi = row->sharerej = 0.0; - row->sharecount = row->errorcount = row->countlastupdate = 0; - row->reset = false; + row->sharecount = row->errorcount = 0; row->firstshare.tv_sec = cd->tv_sec; row->firstshare.tv_usec = cd->tv_usec; row->lastshare.tv_sec = row->firstshare.tv_sec; @@ -2102,8 +2093,8 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername) /* TODO: markersummary checking? * However, there should be no issues since the sharesummaries are removed */ -void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, - char *by, char *code, char *inet, tv_t *cd) +void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, + char *code, char *inet, tv_t *cd) { static int64_t last_attempted_id = -1; static int64_t prev_found = 0; @@ -2170,10 +2161,9 @@ void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, do_id = age_id; to_id = 0; do { - ok = workinfo_age(conn, do_id, poolinstance, - by, code, inet, cd, - &ss_first, &ss_last, - &ss_count, &s_count, &s_diff); + ok = workinfo_age(do_id, poolinstance, by, code, inet, + cd, &ss_first, &ss_last, &ss_count, + &s_count, &s_diff); ss_count_tot += ss_count; s_count_tot += s_count; @@ -4611,5 +4601,5 @@ void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock) } else row->orphans++; if (lock) - K_WLOCK(userinfo_free); + K_WUNLOCK(userinfo_free); } diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 8f119e93..3a251aa4 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -2760,8 +2760,10 @@ bool workinfo_fill(PGconn *conn) workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height); k_add_head(workinfo_store, item); - if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) + if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) { copy_tv(&(dbstatus.newest_createdate_workinfo), &(row->createdate)); + dbstatus.newest_workinfoid = row->workinfoid; + } tick(); } @@ -2834,16 +2836,6 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) // Reloading a share already summarised return true; } - - if (!sharesummary->reset) { - _userinfo_update(NULL, sharesummary, NULL, - true, true); - - zero_sharesummary(sharesummary, - &(shares->createdate), - shares->diff); - sharesummary->reset = true; - } } } @@ -2852,7 +2844,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) userinfo_update(shares, NULL, NULL); } - sharesummary_update(conn, shares, NULL, NULL, shares->createby, + sharesummary_update(shares, NULL, NULL, shares->createby, shares->createcode, shares->createinet, &(shares->createdate)); @@ -3147,20 +3139,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, FREENULL(st); return false; } - - if (!sharesummary->reset) { - _userinfo_update(NULL, sharesummary, NULL, - true, true); - - zero_sharesummary(sharesummary, - &(shareerrors->createdate), - 0.0); - sharesummary->reset = true; - } } } - sharesummary_update(conn, NULL, shareerrors, NULL, + sharesummary_update(NULL, shareerrors, NULL, shareerrors->createby, shareerrors->createcode, shareerrors->createinet, @@ -3768,6 +3750,8 @@ flail: return ok; } +// no longer used +#if 0 static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row) { p_row->diffacc += row->diffacc; @@ -3791,6 +3775,7 @@ static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row) p_row->lastdiffacc = row->lastdiffacc; } } +#endif static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, SHAREERRORS *e_row, bool new, @@ -3866,22 +3851,18 @@ char *ooo_status(char *buf, size_t siz) return buf; } -bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, - char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) +// No longer stored in the DB but fields are updated as before +bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, + char *by, char *code, char *inet, tv_t *cd, + WHERE_FFL_ARGS) { - ExecStatusType rescode; - PGresult *res = NULL; WORKMARKERS *wm; SHARESUMMARY *row, *p_row; K_ITEM *item, *wm_item, *p_item = NULL; - char *ins, *upd; - bool ok = false, new = false, p_new = false; - char *params[19 + MODIFYDATECOUNT]; - int n, par = 0; + bool new = false, p_new = false; int64_t userid, workinfoid; char *workername; tv_t *createdate; - bool must_update = false, conned = false; char *st = NULL, *db = NULL; char ooo_buf[256]; double tdf, tdl; @@ -3897,7 +3878,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE } item = ss_item; DATA_SHARESUMMARY(row, item); - must_update = true; row->complete[0] = SUMMARY_COMPLETE; row->complete[1] = '\0'; } else { @@ -3958,8 +3938,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE row->workername = strdup(workername); LIST_MEM_ADD(sharesummary_free, row->workername); row->workinfoid = workinfoid; - row->inserted = false; - row->saveaged = false; } // N.B. this directly updates the non-key data @@ -4018,7 +3996,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE } } - // p_items are ram only if (p_item) { DATA_SHARESUMMARY(p_row, p_item); } else { @@ -4036,161 +4013,9 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl); } - // During startup, don't save 'new' sharesummaries, to reduce DB I/O - // ... and also during normal processing - if (row->complete[0] == SUMMARY_NEW) - goto startupskip; - - if (conn == NULL && !confirm_sharesummary) { - conn = dbconnect(); - conned = true; - } - - if (new || !(row->inserted)) { - MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); - - if (!confirm_sharesummary) { - par = 0; - params[par++] = bigint_to_buf(row->userid, NULL, 0); - params[par++] = str_to_buf(row->workername, NULL, 0); - params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); - params[par++] = double_to_buf(row->diffacc, NULL, 0); - params[par++] = double_to_buf(row->diffsta, NULL, 0); - params[par++] = double_to_buf(row->diffdup, NULL, 0); - params[par++] = double_to_buf(row->diffhi, NULL, 0); - params[par++] = double_to_buf(row->diffrej, NULL, 0); - params[par++] = double_to_buf(row->shareacc, NULL, 0); - params[par++] = double_to_buf(row->sharesta, NULL, 0); - params[par++] = double_to_buf(row->sharedup, NULL, 0); - params[par++] = double_to_buf(row->sharehi, NULL, 0); - params[par++] = double_to_buf(row->sharerej, NULL, 0); - params[par++] = bigint_to_buf(row->sharecount, NULL, 0); - params[par++] = bigint_to_buf(row->errorcount, NULL, 0); - params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); - params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); - params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); - params[par++] = str_to_buf(row->complete, NULL, 0); - MODIFYDATEPARAMS(params, par, row); - PARCHK(par, params); - - ins = "insert into sharesummary " - "(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," - "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," - "sharecount,errorcount,firstshare,lastshare," - "lastdiffacc,complete" - MODIFYDATECONTROL ") values (" PQPARAM27 ")"; - - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); - goto unparam; - } - } - - row->countlastupdate = row->sharecount + row->errorcount; - row->inserted = true; - if (row->complete[0] == SUMMARY_COMPLETE) - row->saveaged = true; - } else { - bool stats_update = false; - - MODIFYUPDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); - - if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) < - (row->sharecount + row->errorcount)) - stats_update = true; - - if (must_update && row->countlastupdate < (row->sharecount + row->errorcount)) - stats_update = true; + MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); - if (stats_update) { - if (!confirm_sharesummary) { - par = 0; - params[par++] = bigint_to_buf(row->userid, NULL, 0); - params[par++] = str_to_buf(row->workername, NULL, 0); - params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); - params[par++] = double_to_buf(row->diffacc, NULL, 0); - params[par++] = double_to_buf(row->diffsta, NULL, 0); - params[par++] = double_to_buf(row->diffdup, NULL, 0); - params[par++] = double_to_buf(row->diffhi, NULL, 0); - params[par++] = double_to_buf(row->diffrej, NULL, 0); - params[par++] = double_to_buf(row->shareacc, NULL, 0); - params[par++] = double_to_buf(row->sharesta, NULL, 0); - params[par++] = double_to_buf(row->sharedup, NULL, 0); - params[par++] = double_to_buf(row->sharehi, NULL, 0); - params[par++] = double_to_buf(row->sharerej, NULL, 0); - params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); - params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); - params[par++] = bigint_to_buf(row->sharecount, NULL, 0); - params[par++] = bigint_to_buf(row->errorcount, NULL, 0); - params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); - params[par++] = str_to_buf(row->complete, NULL, 0); - MODIFYUPDATEPARAMS(params, par, row); - PARCHKVAL(par, 23, params); - - upd = "update sharesummary " - "set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8," - "shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12," - "sharerej=$13,firstshare=$14,lastshare=$15," - "sharecount=$16,errorcount=$17,lastdiffacc=$18,complete=$19" - ","MDDB"=$20,"MBYDB"=$21,"MCODEDB"=$22,"MINETDB"=$23 " - "where userid=$1 and workername=$2 and workinfoid=$3"; - - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Update", rescode, conn); - goto unparam; - } - } - row->countlastupdate = row->sharecount + row->errorcount; - if (row->complete[0] == SUMMARY_COMPLETE) - row->saveaged = true; - } else { - if (!must_update) { - ok = true; - goto late; - } else { - if (!confirm_sharesummary) { - par = 0; - params[par++] = bigint_to_buf(row->userid, NULL, 0); - params[par++] = str_to_buf(row->workername, NULL, 0); - params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); - params[par++] = str_to_buf(row->complete, NULL, 0); - MODIFYUPDATEPARAMS(params, par, row); - PARCHKVAL(par, 8, params); - - upd = "update sharesummary " - "set complete=$4,"MDDB"=$5,"MBYDB"=$6,"MCODEDB"=$7,"MINETDB"=$8 " - "where userid=$1 and workername=$2 and workinfoid=$3"; - - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("MustUpdate", rescode, conn); - goto unparam; - } - } - row->countlastupdate = row->sharecount + row->errorcount; - if (row->complete[0] == SUMMARY_COMPLETE) - row->saveaged = true; - } - } - } -startupskip: - ok = true; -unparam: - if (par) { - PQclear(res); - for (n = 0; n < par; n++) - free(params[n]); - } -late: - if (conned) - PQfinish(conn); - - // We keep the new item no matter what 'ok' is, since it will be inserted later + // Store either new item if (new || p_new) { K_WLOCK(sharesummary_free); if (new) { @@ -4209,267 +4034,7 @@ late: K_WUNLOCK(sharesummary_free); } - return ok; -} - -bool sharesummary_fill(PGconn *conn) -{ - ExecStatusType rescode; - PGresult *res; - K_TREE_CTX ctx[1]; - K_ITEM *item, *m_item, *p_item; - int n, i, par = 0, p_n; - SHARESUMMARY *row, *p_row; - MARKS *marks; - char *params[2]; - char *field; - char *sel; - int fields = 19; - bool ok; - - LOGDEBUG("%s(): select", __func__); - - /* Load needs to go back to the last marks workinfoid(+1) - * If it is later than that, we can't create markersummaries - * since some of the required data is missing - - * thus we also can't make the shift markersummaries */ - m_item = last_in_ktree(marks_root, ctx); - if (!m_item) { - if (dbload_workinfoid_start != -1) { - sharesummary_marks_limit = true; - LOGWARNING("WARNING: dbload -w start used " - "but there are no marks ..."); - } - } else { - DATA_MARKS(marks, m_item); - if (dbload_workinfoid_start > marks->workinfoid) { - sharesummary_marks_limit = true; - LOGWARNING("WARNING: dbload -w start %"PRId64 - " is after the last mark %"PRId64" ...", - dbload_workinfoid_start, - marks->workinfoid); - } - } - if (sharesummary_marks_limit) { - LOGWARNING("WARNING: ... markersummaries cannot be created " - "and pplns calculations may be wrong"); - } - - sel = "select " - "userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," - "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," - "sharecount,errorcount,firstshare,lastshare," - "lastdiffacc,complete" - MODIFYDATECONTROL - " from sharesummary where workinfoid>=$1 and workinfoid<=$2"; - par = 0; - params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); - params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0); - PARCHK(par, params); - res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); - PQclear(res); - return false; - } - - n = PQnfields(res); - if (n != (fields + MODIFYDATECOUNT)) { - LOGERR("%s(): Invalid field count - should be %d, but is %d", - __func__, fields + MODIFYDATECOUNT, n); - PQclear(res); - return false; - } - - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); - ok = true; - //K_WLOCK(sharesummary_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(sharesummary_free); - DATA_SHARESUMMARY(row, item); - bzero(row, sizeof(*row)); - - if (everyone_die) { - ok = false; - break; - } - - row->inserted = true; - - PQ_GET_FLD(res, i, "userid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("userid", field, row->userid); - - PQ_GET_FLD(res, i, "workername", field, ok); - if (!ok) - break; - TXT_TO_PTR("workername", field, row->workername); - LIST_MEM_ADD(sharesummary_free, row->workername); - - PQ_GET_FLD(res, i, "workinfoid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("workinfoid", field, row->workinfoid); - - PQ_GET_FLD(res, i, "diffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffacc", field, row->diffacc); - - PQ_GET_FLD(res, i, "diffsta", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffsta", field, row->diffsta); - - PQ_GET_FLD(res, i, "diffdup", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffdup", field, row->diffdup); - - PQ_GET_FLD(res, i, "diffhi", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffhi", field, row->diffhi); - - PQ_GET_FLD(res, i, "diffrej", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffrej", field, row->diffrej); - - PQ_GET_FLD(res, i, "shareacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("shareacc", field, row->shareacc); - - PQ_GET_FLD(res, i, "sharesta", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharesta", field, row->sharesta); - - PQ_GET_FLD(res, i, "sharedup", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharedup", field, row->sharedup); - - PQ_GET_FLD(res, i, "sharehi", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharehi", field, row->sharehi); - - PQ_GET_FLD(res, i, "sharerej", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharerej", field, row->sharerej); - - PQ_GET_FLD(res, i, "sharecount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("sharecount", field, row->sharecount); - - PQ_GET_FLD(res, i, "errorcount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("errorcount", field, row->errorcount); - - row->countlastupdate = row->sharecount + row->errorcount; - - PQ_GET_FLD(res, i, "firstshare", field, ok); - if (!ok) - break; - TXT_TO_TV("firstshare", field, row->firstshare); - - PQ_GET_FLD(res, i, "lastshare", field, ok); - if (!ok) - break; - TXT_TO_TV("lastshare", field, row->lastshare); - - PQ_GET_FLD(res, i, "lastdiffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc); - - PQ_GET_FLD(res, i, "complete", field, ok); - if (!ok) - break; - TXT_TO_STR("complete", field, row->complete); - - MODIFYDATEFLDPOINTERS(sharesummary_free, res, i, row, ok); - if (!ok) - break; - - sharesummary_root = add_to_ktree(sharesummary_root, item, cmp_sharesummary); - sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root, item, cmp_sharesummary_workinfoid); - k_add_head(sharesummary_store, item); - - // A share summary is shares in a single workinfo, at all 3 levels n,a,y - if (tolower(row->complete[0]) == SUMMARY_NEW) { - if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 || - !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) { - copy_tv(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare)); - dbstatus.oldest_workinfoid_n = row->workinfoid; - } - } else { - if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare))) - copy_tv(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare)); - if (tolower(row->complete[0]) == SUMMARY_COMPLETE) { - if (dbstatus.oldest_sharesummary_firstshare_a.tv_sec == 0 || - !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare))) { - copy_tv(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare)); - dbstatus.oldest_workinfoid_a = row->workinfoid; - } - if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare))) { - copy_tv(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare)); - dbstatus.newest_workinfoid_a = row->workinfoid; - } - } else /* SUMMARY_CONFIRM */ { - if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare))) { - copy_tv(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare)); - dbstatus.newest_workinfoid_y = row->workinfoid; - } - } - } - - p_item = find_sharesummary_p(row->workinfoid); - if (!p_item) { - p_item = k_unlink_head(sharesummary_free); - DATA_SHARESUMMARY(p_row, p_item); - bzero(p_row, sizeof(*p_row)); - POOL_SS(p_row); - LIST_MEM_ADD(sharesummary_free, p_row->workername); - p_row->workinfoid = row->workinfoid; - sharesummary_pool_root = add_to_ktree(sharesummary_pool_root, - p_item, - cmp_sharesummary); - k_add_head(sharesummary_pool_store, p_item); - } else { - DATA_SHARESUMMARY(p_row, p_item); - } - - sharesummary_to_pool(p_row, row); - - _userinfo_update(NULL, row, NULL, false, false); - - tick(); - } - if (!ok) { - FREENULL(row->workername); - k_add_head(sharesummary_free, item); - } - - p_n = sharesummary_pool_store->count; - //K_WUNLOCK(sharesummary_free); - PQclear(res); - - if (ok) { - LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d sharesummary records", __func__, n); - LOGWARNING("%s(): created %d sharesummary pool records", __func__, p_n); - } - - return ok; + return true; } bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, @@ -5855,7 +5420,15 @@ bool poolstats_add(PGconn *conn, bool store, char *poolinstance, res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); + bool show_msg = true; + char *code; + if (igndup) { + code = PQresultErrorField(res, PG_DIAG_SQLSTATE); + if (code && strcmp(code, SQL_UNIQUE_VIOLATION) == 0) + show_msg = false; + } + if (show_msg) + PGLOGERR("Insert", rescode, conn); goto unparam; } @@ -6750,7 +6323,8 @@ bool workmarkers_fill(PGconn *conn) { ExecStatusType rescode; PGresult *res; - K_ITEM *item; + K_ITEM *item, *wi_item; + WORKINFO *workinfo; int n, i; WORKMARKERS *row; char *field; @@ -6837,6 +6411,23 @@ bool workmarkers_fill(PGconn *conn) item, cmp_workmarkers_workinfoid); k_add_head(workmarkers_store, item); + if (dbstatus.newest_workmarker_workinfoid < row->workinfoidend) { + dbstatus.newest_workmarker_workinfoid = row->workinfoidend; + wi_item = find_workinfo(row->workinfoidend, NULL); + if (!wi_item) { + LOGEMERG("%s(): FAILURE workmarkerid %"PRId64 + " wid end %"PRId64" doesn't exist! " + "You should abort ckdb and fix it, " + " since the reload may skip some data", + __func__, row->markerid, + row->workinfoidend); + } else { + DATA_WORKINFO(workinfo, wi_item); + copy_tv(&(dbstatus.newest_createdate_workmarker_workinfo), + &(workinfo->createdate)); + } + } + tick(); } if (!ok)