diff --git a/pool/base.php b/pool/base.php index 820dd68a..8bfa50ab 100644 --- a/pool/base.php +++ b/pool/base.php @@ -25,7 +25,10 @@ function btcfmt($amt) } # global $sipre; +# max of uint64 is ~1.845x10^19, 'Z' is above that (10^21) +# max of uint256 is ~1.158x10^77, which is well above 'Y' (10^24) $sipre = array('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'); +# function siprefmt($amt) { global $sipre; diff --git a/src/ckdb.c b/src/ckdb.c index 9d8a4952..11a47f5d 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -213,13 +213,20 @@ static char *restorefrom; typedef struct loadstatus { tv_t oldest_sharesummary_firstshare_n; - tv_t newest_sharesummary_firstshare; + tv_t newest_sharesummary_firstshare_a; + tv_t newest_sharesummary_firstshare_ay; tv_t sharesummary_firstshare; // whichever of above 2 used + tv_t oldest_sharesummary_firstshare_a; + tv_t newest_sharesummary_firstshare_y; tv_t newest_createdate_workinfo; tv_t newest_createdate_auths; tv_t newest_createdate_poolstats; tv_t newest_starttimeband_userstats; tv_t newest_createdate_blocks; + int64_t oldest_workinfoid_n; // of oldest firstshare sharesummary n + int64_t oldest_workinfoid_a; // of oldest firstshare sharesummary a + int64_t newest_workinfoid_a; // of newest firstshare sharesummary a + int64_t newest_workinfoid_y; // of newest firstshare sharesummary y } LOADSTATUS; static LOADSTATUS dbstatus; @@ -638,6 +645,17 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; #define BTC_TO_D(_amt) ((double)((_amt) / 100000000.0)) +// argv -y - don't run in ckdb mode, just confirm sharesummaries +static bool confirm_sharesummary; +// The workinfoid range we are processing +static int64_t confirm_first_workinfoid; +static int64_t confirm_last_workinfoid; +/* TODO: Stop the reload once we fully complete confirm_last_workinfoid + * and report a message saying so + * This isn't mandatory - but it's simply a waste of processing to continue + * reloading data since it will all be ignored anyway */ +static bool confirm_finished; + // DB users,workers,auth load is complete static bool db_auths_complete = false; // DB load is complete @@ -1053,7 +1071,7 @@ typedef struct sharesummary { #define DATA_SHARESUMMARY(_item) ((SHARESUMMARY *)(_item->data)) #define SUMMARY_NEW 'n' -#define SUMMARY_AGED 'a' +#define SUMMARY_COMPLETE 'a' #define SUMMARY_CONFIRM 'y' static K_TREE *sharesummary_root; @@ -1303,6 +1321,8 @@ static K_LIST *workerstatus_free; static K_STORE *workerstatus_store; static char logname[512]; +static char *dbcode; + #define LOGQUE(_msg) log_queue_message(_msg) #define LOGFILE(_msg) rotating_log_nolock(_msg) #define LOGDUP "dup." @@ -1412,6 +1432,46 @@ static void setnow(tv_t *now) now->tv_usec = spec.tv_nsec / 1000; } +#define CKPQ_READ true +#define CKPQ_WRITE false + +#define CKPQexec(_conn, _qry, _isread) _CKPQexec(_conn, _qry, _isread, WHERE_FFL_HERE) + +// Bug check to ensure no unexpected write txns occur +static PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS) +{ + // It would slow it down, but could check qry for insert/update/... + if (!isread && confirm_sharesummary) + quitfrom(1, file, func, line, "BUG: write txn during confirm"); + + return PQexec(conn, qry); +} + +#define CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \ + _CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \ + _isread, WHERE_FFL_HERE) + +static PGresult *_CKPQexecParams(PGconn *conn, const char *qry, + int nParams, + const Oid *paramTypes, + const char *const * paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + bool isread, WHERE_FFL_ARGS) +{ + // It would slow it down, but could check qry for insert/update/... + if (!isread && confirm_sharesummary) + quitfrom(1, file, func, line, "BUG: write txn during confirm"); + + return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths, + paramFormats, resultFormat); +} + +// Force use CKPQ... for PQ functions in use +#define PQexec CKPQexec +#define PQexecParams CKPQexecParams + static uint64_t ticks; static time_t last_tick; @@ -1885,7 +1945,7 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, conned = true; } - res = PQexec(conn, qry); + res = PQexec(conn, qry, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -1928,7 +1988,7 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, params[par++] = str_to_buf(inet, NULL, 0); PARCHK(par, params); - res = PQexecParams(conn, qry, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, qry, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); @@ -2198,7 +2258,8 @@ static bool users_pass(PGconn *conn, K_ITEM *u_item, char *oldhash, conned = true; } - res = PQexec(conn, "Begin"); + // Beginning of a write txn + res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); @@ -2206,7 +2267,7 @@ static bool users_pass(PGconn *conn, K_ITEM *u_item, char *oldhash, } PQclear(res); - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); @@ -2232,16 +2293,16 @@ static bool users_pass(PGconn *conn, K_ITEM *u_item, char *oldhash, "$4,$5,$6,$7,$8 from users where " "userid=$1 and expirydate=$2"; - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback"); + res = PQexec(conn, "Rollback", CKPQ_WRITE); goto unparam; } - res = PQexec(conn, "Commit"); + res = PQexec(conn, "Commit", CKPQ_WRITE); ok = true; unparam: PQclear(res); @@ -2337,7 +2398,7 @@ static bool users_add(PGconn *conn, char *username, char *emailaddress, conned = true; } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -2384,7 +2445,7 @@ static bool users_fill(PGconn *conn) "secondaryuserid" HISTORYDATECONTROL " from users"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -2590,7 +2651,7 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, "idlenotificationenabled,idlenotificationtime" HISTORYDATECONTROL ") values (" PQPARAM11 ")"; - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -2685,7 +2746,7 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, conned = true; } - res = PQexec(conn, "Begin"); + res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); @@ -2693,12 +2754,12 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, } PQclear(res); - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback"); + res = PQexec(conn, "Rollback", CKPQ_WRITE); goto unparam; } @@ -2725,16 +2786,16 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, HISTORYDATEPARAMS(params, par, row); PARCHK(par, params); - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback"); + res = PQexec(conn, "Rollback", CKPQ_WRITE); goto unparam; } - res = PQexec(conn, "Commit"); + res = PQexec(conn, "Commit", CKPQ_WRITE); } ok = true; @@ -2757,12 +2818,19 @@ static K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *worke item = find_workers(userid, workername); if (item) { - if (update) { + if (!confirm_sharesummary && update) { workers_update(conn, item, diffdef, idlenotificationenabled, idlenotificationtime, by, code, inet, cd, trf_root); } } else { + if (confirm_sharesummary) { + // Shouldn't be possible since the sharesummary is already aged + LOGERR("%s() %"PRId64"/%s workername not found during confirm", + __func__, userid, workername); + return NULL; + } + // TODO: limit how many? item = workers_add(conn, userid, workername, diffdef, idlenotificationenabled, idlenotificationtime, @@ -2774,22 +2842,9 @@ static K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *worke static K_ITEM *new_default_worker(PGconn *conn, bool update, int64_t userid, char *workername, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { - bool conned = false; - K_ITEM *item; - - if (conn == NULL) { - conn = dbconnect(); - conned = true; - } - - item = new_worker(conn, update, userid, workername, DIFFICULTYDEFAULT_DEF_STR, - IDLENOTIFICATIONENABLED_DEF, IDLENOTIFICATIONTIME_DEF_STR, - by, code, inet, cd, trf_root); - - if (conned) - PQfinish(conn); - - return item; + return new_worker(conn, update, userid, workername, DIFFICULTYDEFAULT_DEF_STR, + IDLENOTIFICATIONENABLED_DEF, IDLENOTIFICATIONTIME_DEF_STR, + by, code, inet, cd, trf_root); } /* unused @@ -2833,7 +2888,7 @@ static bool workers_fill(PGconn *conn) "idlenotificationenabled,idlenotificationtime" HISTORYDATECONTROL ",workerid from workers"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -2966,7 +3021,7 @@ static bool payments_fill(PGconn *conn) par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); PARCHK(par, params); - res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -3195,45 +3250,49 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc K_WUNLOCK(workinfo_free); par = 0; - params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); - params[par++] = str_to_buf(row->poolinstance, NULL, 0); - params[par++] = str_to_buf(row->transactiontree, NULL, 0); - params[par++] = str_to_buf(row->merklehash, NULL, 0); - params[par++] = str_to_buf(row->prevhash, NULL, 0); - params[par++] = str_to_buf(row->coinbase1, NULL, 0); - params[par++] = str_to_buf(row->coinbase2, NULL, 0); - params[par++] = str_to_buf(row->version, NULL, 0); - params[par++] = str_to_buf(row->bits, NULL, 0); - params[par++] = str_to_buf(row->ntime, NULL, 0); - params[par++] = bigint_to_buf(row->reward, NULL, 0); - HISTORYDATEPARAMS(params, par, row); - PARCHK(par, params); + if (!confirm_sharesummary) { + params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); + params[par++] = str_to_buf(row->poolinstance, NULL, 0); + params[par++] = str_to_buf(row->transactiontree, NULL, 0); + params[par++] = str_to_buf(row->merklehash, NULL, 0); + params[par++] = str_to_buf(row->prevhash, NULL, 0); + params[par++] = str_to_buf(row->coinbase1, NULL, 0); + params[par++] = str_to_buf(row->coinbase2, NULL, 0); + params[par++] = str_to_buf(row->version, NULL, 0); + params[par++] = str_to_buf(row->bits, NULL, 0); + params[par++] = str_to_buf(row->ntime, NULL, 0); + params[par++] = bigint_to_buf(row->reward, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHK(par, params); - ins = "insert into workinfo " - "(workinfoid,poolinstance,transactiontree,merklehash," - "prevhash,coinbase1,coinbase2,version,bits,ntime,reward" - HISTORYDATECONTROL ") values (" PQPARAM16 ")"; + ins = "insert into workinfo " + "(workinfoid,poolinstance,transactiontree,merklehash," + "prevhash,coinbase1,coinbase2,version,bits,ntime,reward" + HISTORYDATECONTROL ") values (" PQPARAM16 ")"; - if (!conn) { - conn = dbconnect(); - conned = true; - } + if (!conn) { + conn = dbconnect(); + conned = true; + } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); - goto unparam; + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto unparam; + } } workinfoid = row->workinfoid; unparam: - PQclear(res); - if (conned) - PQfinish(conn); - for (n = 0; n < par; n++) - free(params[n]); + if (par) { + PQclear(res); + if (conned) + PQfinish(conn); + for (n = 0; n < par; n++) + free(params[n]); + } K_WLOCK(workinfo_free); if (workinfoid == -1) { @@ -3241,9 +3300,11 @@ unparam: free(row->merklehash); k_add_head(workinfo_free, item); } else { - // Not currently needed in RAM - free(row->transactiontree); - row->transactiontree = strdup(EMPTY); + if (row->transactiontree && *(row->transactiontree)) { + // Not currently needed in RAM + free(row->transactiontree); + row->transactiontree = strdup(EMPTY); + } workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo); k_add_head(workinfo_store, item); @@ -3281,12 +3342,16 @@ static cmp_t cmp_shares(K_ITEM *a, K_ITEM *b); * all the matching sharesummary records in ckdb have certainly completed * ckdb would need to restart to get the updated DB information though it would * not affect current ckdb code + * TODO: This will happen until auto aging is added here - since ckpool can not reliably + * age all workinfo that was active when it exits (e.g. a crash) so best to not + * try, and get ckdb to auto age old unaged data */ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, char *by, char *code, char *inet, tv_t *cd) { K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item; K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1]; + char cd_buf[DATE_BUFSIZ]; int64_t workinfoid; SHARESUMMARY sharesummary; SHARES shares; @@ -3298,11 +3363,22 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, TXT_TO_BIGINT("workinfoid", workinfoidstr, workinfoid); wi_item = find_workinfo(workinfoid); - if (!wi_item) + if (!wi_item) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Age discarded!", + __func__, workinfoid, poolinstance, + cd->tv_sec, cd->tv_usec, cd_buf); goto bye; + } - if (strcmp(poolinstance, DATA_WORKINFO(wi_item)->poolinstance) != 0) + if (strcmp(poolinstance, DATA_WORKINFO(wi_item)->poolinstance) != 0) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s Poolinstance changed (from %s)! Age discarded!", + __func__, workinfoid, poolinstance, + cd->tv_sec, cd->tv_usec, cd_buf, + DATA_WORKINFO(wi_item)->poolinstance); goto bye; + } // Find the first matching sharesummary sharesummary.workinfoid = workinfoid; @@ -3315,13 +3391,23 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == workinfoid) { error[0] = '\0'; skipupdate = false; + /* Reloading during a confirm will not have any old data + * so finding an aged sharesummary here is an error */ if (reloading) { - if (DATA_SHARESUMMARY(ss_item)->complete[0] == SUMMARY_AGED) + if (DATA_SHARESUMMARY(ss_item)->complete[0] == SUMMARY_COMPLETE) { skipupdate = true; + if (confirm_sharesummary) { + LOGERR("%s(): Duplicate %s found during confirm %"PRId64"/%s/%"PRId64, + __func__, __func__, + DATA_SHARESUMMARY(ss_item)->userid, + DATA_SHARESUMMARY(ss_item)->workername, + DATA_SHARESUMMARY(ss_item)->workinfoid); + } + } } if (!skipupdate) { - if (conn == NULL) { + if (conn == NULL && !confirm_sharesummary) { conn = dbconnect(); conned = true; } @@ -3406,7 +3492,7 @@ static bool workinfo_fill(PGconn *conn) par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); PARCHK(par, params); - res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -3493,7 +3579,8 @@ static bool workinfo_fill(PGconn *conn) break; workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo); - workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height); + if (!confirm_sharesummary) + workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height); k_add_head(workinfo_store, item); if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) @@ -3585,6 +3672,7 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; + char cd_buf[DATE_BUFSIZ]; SHARES *shares; bool ok = false; @@ -3600,8 +3688,13 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor K_RLOCK(users_free); u_item = find_users(username); K_RUNLOCK(users_free); - if (!u_item) + if (!u_item) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %s/%ld,%ld %.19s no user! Share discarded!", + __func__, username, + cd->tv_sec, cd->tv_usec, cd_buf); goto unitem; + } shares->userid = DATA_USERS(u_item)->userid; @@ -3619,15 +3712,20 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor HISTORYDATETRANSFER(trf_root, shares); wi_item = find_workinfo(shares->workinfoid); - if (!wi_item) + if (!wi_item) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Share discarded!", + __func__, shares->workinfoid, workername, + cd->tv_sec, cd->tv_usec, cd_buf); goto unitem; + } w_item = new_default_worker(conn, false, shares->userid, shares->workername, by, code, inet, cd, trf_root); if (!w_item) goto unitem; - if (reloading) { + if (reloading && !confirm_sharesummary) { ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid); if (ss_item) { if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW) { @@ -3649,7 +3747,8 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor } } - workerstatus_update(NULL, shares, NULL, NULL); + if (!confirm_sharesummary) + workerstatus_update(NULL, shares, NULL, NULL); sharesummary_update(conn, shares, NULL, NULL, by, code, inet, cd); @@ -3703,6 +3802,7 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; + char cd_buf[DATE_BUFSIZ]; SHAREERRORS *shareerrors; bool ok = false; @@ -3718,8 +3818,13 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, K_RLOCK(users_free); u_item = find_users(username); K_RUNLOCK(users_free); - if (!u_item) + if (!u_item) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %s/%ld,%ld %.19s no user! Shareerror discarded!", + __func__, username, + cd->tv_sec, cd->tv_usec, cd_buf); goto unitem; + } shareerrors->userid = DATA_USERS(u_item)->userid; @@ -3734,15 +3839,20 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, HISTORYDATETRANSFER(trf_root, shareerrors); wi_item = find_workinfo(shareerrors->workinfoid); - if (!wi_item) + if (!wi_item) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Shareerror discarded!", + __func__, shareerrors->workinfoid, workername, + cd->tv_sec, cd->tv_usec, cd_buf); goto unitem; + } w_item = new_default_worker(NULL, false, shareerrors->userid, shareerrors->workername, by, code, inet, cd, trf_root); if (!w_item) goto unitem; - if (reloading) { + if (reloading && !confirm_sharesummary) { ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid); if (ss_item) { if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW) { @@ -3882,7 +3992,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row item = ss_item; row = DATA_SHARESUMMARY(item); must_update = true; - row->complete[0] = SUMMARY_AGED; + row->complete[0] = SUMMARY_COMPLETE; row->complete[1] = '\0'; } else { if (s_row) { @@ -4003,7 +4113,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row } } - if (conn == NULL) { + if (conn == NULL && !confirm_sharesummary) { conn = dbconnect(); conned = true; } @@ -4012,60 +4122,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row MODIFYDATEINIT(row, cd, by, code, inet); par = 0; - params[par++] = bigint_to_buf(row->userid, NULL, 0); - params[par++] = str_to_buf(row->workername, NULL, 0); - params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); - params[par++] = double_to_buf(row->diffacc, NULL, 0); - params[par++] = double_to_buf(row->diffsta, NULL, 0); - params[par++] = double_to_buf(row->diffdup, NULL, 0); - params[par++] = double_to_buf(row->diffhi, NULL, 0); - params[par++] = double_to_buf(row->diffrej, NULL, 0); - params[par++] = double_to_buf(row->shareacc, NULL, 0); - params[par++] = double_to_buf(row->sharesta, NULL, 0); - params[par++] = double_to_buf(row->sharedup, NULL, 0); - params[par++] = double_to_buf(row->sharehi, NULL, 0); - params[par++] = double_to_buf(row->sharerej, NULL, 0); - params[par++] = bigint_to_buf(row->sharecount, NULL, 0); - params[par++] = bigint_to_buf(row->errorcount, NULL, 0); - params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); - params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); - params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); - params[par++] = str_to_buf(row->complete, NULL, 0); - MODIFYDATEPARAMS(params, par, row); - PARCHK(par, params); - - ins = "insert into sharesummary " - "(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," - "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," - "sharecount,errorcount,firstshare,lastshare," - "lastdiffacc,complete" - MODIFYDATECONTROL ") values (" PQPARAM27 ")"; - - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); - goto unparam; - } - - row->countlastupdate = row->sharecount + row->errorcount; - row->inserted = true; - if (row->complete[0] == SUMMARY_AGED) - row->saveaged = true; - } else { - bool stats_update = false; - - MODIFYUPDATE(row, cd, by, code, inet); - - if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) < - (row->sharecount + row->errorcount)) - stats_update = true; - - if (must_update && row->countlastupdate < (row->sharecount + row->errorcount)) - stats_update = true; - - if (stats_update) { - par = 0; + if (!confirm_sharesummary) { params[par++] = bigint_to_buf(row->userid, NULL, 0); params[par++] = str_to_buf(row->workername, NULL, 0); params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -4079,55 +4136,114 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row params[par++] = double_to_buf(row->sharedup, NULL, 0); params[par++] = double_to_buf(row->sharehi, NULL, 0); params[par++] = double_to_buf(row->sharerej, NULL, 0); + params[par++] = bigint_to_buf(row->sharecount, NULL, 0); + params[par++] = bigint_to_buf(row->errorcount, NULL, 0); params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); params[par++] = str_to_buf(row->complete, NULL, 0); - MODIFYUPDATEPARAMS(params, par, row); - PARCHKVAL(par, 21, params); - - upd = "update sharesummary " - "set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8," - "shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12," - "sharerej=$13,firstshare=$14,lastshare=$15," - "lastdiffacc=$16,complete=$17" - ",modifydate=$18,modifyby=$19,modifycode=$20,modifyinet=$21 " - "where userid=$1 and workername=$2 and workinfoid=$3"; - - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); + MODIFYDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into sharesummary " + "(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," + "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," + "sharecount,errorcount,firstshare,lastshare," + "lastdiffacc,complete" + MODIFYDATECONTROL ") values (" PQPARAM27 ")"; + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { - PGLOGERR("Update", rescode, conn); + PGLOGERR("Insert", rescode, conn); goto unparam; } - row->countlastupdate = row->sharecount + row->errorcount; - if (row->complete[0] == SUMMARY_AGED) - row->saveaged = true; - } else { - if (!must_update) { - ok = true; - goto late; - } else { - par = 0; + } + + row->countlastupdate = row->sharecount + row->errorcount; + row->inserted = true; + if (row->complete[0] == SUMMARY_COMPLETE) + row->saveaged = true; + } else { + bool stats_update = false; + + MODIFYUPDATE(row, cd, by, code, inet); + + if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) < + (row->sharecount + row->errorcount)) + stats_update = true; + + if (must_update && row->countlastupdate < (row->sharecount + row->errorcount)) + stats_update = true; + + if (stats_update) { + par = 0; + if (!confirm_sharesummary) { params[par++] = bigint_to_buf(row->userid, NULL, 0); params[par++] = str_to_buf(row->workername, NULL, 0); params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); + params[par++] = double_to_buf(row->diffacc, NULL, 0); + params[par++] = double_to_buf(row->diffsta, NULL, 0); + params[par++] = double_to_buf(row->diffdup, NULL, 0); + params[par++] = double_to_buf(row->diffhi, NULL, 0); + params[par++] = double_to_buf(row->diffrej, NULL, 0); + params[par++] = double_to_buf(row->shareacc, NULL, 0); + params[par++] = double_to_buf(row->sharesta, NULL, 0); + params[par++] = double_to_buf(row->sharedup, NULL, 0); + params[par++] = double_to_buf(row->sharehi, NULL, 0); + params[par++] = double_to_buf(row->sharerej, NULL, 0); + params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); + params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); params[par++] = str_to_buf(row->complete, NULL, 0); MODIFYUPDATEPARAMS(params, par, row); - PARCHKVAL(par, 8, params); + PARCHKVAL(par, 21, params); upd = "update sharesummary " - "set complete=$4,modifydate=$5,modifyby=$6,modifycode=$7,modifyinet=$8 " + "set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8," + "shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12," + "sharerej=$13,firstshare=$14,lastshare=$15," + "lastdiffacc=$16,complete=$17" + ",modifydate=$18,modifyby=$19,modifycode=$20,modifyinet=$21 " "where userid=$1 and workername=$2 and workinfoid=$3"; - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { - PGLOGERR("MustUpdate", rescode, conn); + PGLOGERR("Update", rescode, conn); goto unparam; } + } + row->countlastupdate = row->sharecount + row->errorcount; + if (row->complete[0] == SUMMARY_COMPLETE) + row->saveaged = true; + } else { + if (!must_update) { + ok = true; + goto late; + } else { + par = 0; + if (!confirm_sharesummary) { + params[par++] = bigint_to_buf(row->userid, NULL, 0); + params[par++] = str_to_buf(row->workername, NULL, 0); + params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); + params[par++] = str_to_buf(row->complete, NULL, 0); + MODIFYUPDATEPARAMS(params, par, row); + PARCHKVAL(par, 8, params); + + upd = "update sharesummary " + "set complete=$4,modifydate=$5,modifyby=$6,modifycode=$7,modifyinet=$8 " + "where userid=$1 and workername=$2 and workinfoid=$3"; + + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("MustUpdate", rescode, conn); + goto unparam; + } + } row->countlastupdate = row->sharecount + row->errorcount; - if (row->complete[0] == SUMMARY_AGED) + if (row->complete[0] == SUMMARY_COMPLETE) row->saveaged = true; } } @@ -4135,9 +4251,11 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row ok = true; unparam: - PQclear(res); - for (n = 0; n < par; n++) - free(params[n]); + if (par) { + PQclear(res); + for (n = 0; n < par; n++) + free(params[n]); + } late: if (conned) PQfinish(conn); @@ -4178,7 +4296,7 @@ static bool sharesummary_fill(PGconn *conn) "lastdiffacc,complete" MODIFYDATECONTROL " from sharesummary"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -4311,12 +4429,33 @@ static bool sharesummary_fill(PGconn *conn) workerstatus_update(NULL, NULL, NULL, row); + // A share summary is currently only shares in a single workinfo, at all 3 levels n,a,y if (tolower(row->complete[0]) == SUMMARY_NEW) { if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 || - !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) + !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) { copy_tv(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare)); - } else if (tv_newer(&(dbstatus.newest_sharesummary_firstshare), &(row->firstshare))) - copy_tv(&(dbstatus.newest_sharesummary_firstshare), &(row->firstshare)); + dbstatus.oldest_workinfoid_n = row->workinfoid; + } + } else { + if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare))) + copy_tv(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare)); + if (tolower(row->complete[0]) == SUMMARY_COMPLETE) { + if (dbstatus.oldest_sharesummary_firstshare_a.tv_sec == 0 || + !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare))) { + copy_tv(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare)); + dbstatus.oldest_workinfoid_a = row->workinfoid; + } + if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare))) { + copy_tv(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare)); + dbstatus.newest_workinfoid_a = row->workinfoid; + } + } else /* SUMMARY_CONFIRM */ { + if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare))) { + copy_tv(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare)); + dbstatus.newest_workinfoid_y = row->workinfoid; + } + } + } if (row->workinfoid >= pool.workinfoid) { pool.diffacc += row->diffacc; @@ -4505,7 +4644,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, conned = true; } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -4565,7 +4704,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, HISTORYDATETRANSFER(trf_root, row); - res = PQexec(conn, "Begin"); + res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); @@ -4573,12 +4712,12 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, } PQclear(res); - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback"); + res = PQexec(conn, "Rollback", CKPQ_WRITE); goto unparam; } @@ -4601,18 +4740,18 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, "$3,$4,$5,$6,$7,$8 from blocks where " "blockhash=$1 and expirydate=$2"; - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback"); + res = PQexec(conn, "Rollback", CKPQ_WRITE); goto unparam; } update_old = true; - res = PQexec(conn, "Commit"); + res = PQexec(conn, "Commit", CKPQ_WRITE); break; default: K_WUNLOCK(blocks_free); @@ -4689,7 +4828,7 @@ static bool blocks_fill(PGconn *conn) "clientid,enonce1,nonce2,nonce,reward,confirmed" HISTORYDATECONTROL " from blocks"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -4917,7 +5056,7 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, "(authid,poolinstance,userid,workername,clientid,enonce1,useragent" HISTORYDATECONTROL ") values (" PQPARAM12 ")"; - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -4969,7 +5108,7 @@ static bool auths_fill(PGconn *conn) par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); PARCHK(par, params); - res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -5147,7 +5286,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, conned = true; } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -5199,7 +5338,7 @@ static bool poolstats_fill(PGconn *conn) "hashrate1hr,hashrate24hr" SIMPLEDATECONTROL " from poolstats"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -5440,7 +5579,7 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row) conned = true; } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -5504,6 +5643,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, eosdate.tv_usec = row->createdate.tv_usec; } + // confirm_summaries() doesn't call this if (reloading) { memcpy(&cmp, row, sizeof(cmp)); look.data = (void *)(&cmp); @@ -5642,7 +5782,7 @@ static bool userstats_fill(PGconn *conn) "hashrate24hr,summarylevel,summarycount,statsdate" SIMPLEDATECONTROL " from userstats"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -5786,7 +5926,7 @@ static bool check_db_version(PGconn *conn) LOGDEBUG("%s(): select", __func__); sel = "select * from version;"; - res = PQexec(conn, sel); + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGEMERG("Select", rescode, conn); @@ -5861,7 +6001,8 @@ static bool getdata1() goto matane; if (!(ok = workers_fill(conn))) goto matane; - ok = auths_fill(conn); + if (!confirm_sharesummary) + ok = auths_fill(conn); matane: @@ -5876,8 +6017,10 @@ static bool getdata2() if (!(ok = blocks_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = payments_fill(conn)) || everyone_die) - goto sukamudai; + if (!confirm_sharesummary) { + if (!(ok = payments_fill(conn)) || everyone_die) + goto sukamudai; + } if (!(ok = workinfo_fill(conn)) || everyone_die) goto sukamudai; if (!(ok = shares_fill()) || everyone_die) @@ -5886,9 +6029,11 @@ static bool getdata2() goto sukamudai; if (!(ok = sharesummary_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = poolstats_fill(conn)) || everyone_die) - goto sukamudai; - ok = userstats_fill(conn); + if (!confirm_sharesummary) { + if (!(ok = poolstats_fill(conn)) || everyone_die) + goto sukamudai; + ok = userstats_fill(conn); + } sukamudai: @@ -5908,7 +6053,7 @@ static bool reload() tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); - tv_to_buf(&(dbstatus.newest_sharesummary_firstshare), buf, sizeof(buf)); + tv_to_buf(&(dbstatus.newest_sharesummary_firstshare_ay), buf, sizeof(buf)); LOGWARNING("%s(): %s newest DB complete sharesummary", __func__, buf); tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf)); LOGWARNING("%s(): %s newest DB workinfo", __func__, buf); @@ -5924,7 +6069,7 @@ static bool reload() if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec) copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.oldest_sharesummary_firstshare_n)); else - copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.newest_sharesummary_firstshare)); + copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.newest_sharesummary_firstshare_ay)); copy_tv(&start, &(dbstatus.sharesummary_firstshare)); reason = "sharesummary"; @@ -6060,15 +6205,9 @@ static void clean_up(ckpool_t *ckp) fclose(ckp->logfp); } -static bool setup_data() +// TODO: skip ones not needed for confirm_summaries() +static void alloc_storage() { - K_TREE_CTX ctx[1]; - K_ITEM look, *found; - WORKINFO wi; - - cklock_init(&fpm_lock); - cksem_init(&socketer_sem); - workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); workqueue_store = k_new_store(workqueue_free); @@ -6101,7 +6240,8 @@ static bool setup_data() ALLOC_WORKINFO, LIMIT_WORKINFO, true); workinfo_store = k_new_store(workinfo_free); workinfo_root = new_ktree(); - workinfo_height_root = new_ktree(); + if (!confirm_sharesummary) + workinfo_height_root = new_ktree(); shares_free = k_new_list("Shares", sizeof(SHARES), ALLOC_SHARES, LIMIT_SHARES, true); @@ -6149,6 +6289,18 @@ static bool setup_data() ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); workerstatus_store = k_new_store(workerstatus_free); workerstatus_root = new_ktree(); +} + +static bool setup_data() +{ + K_TREE_CTX ctx[1]; + K_ITEM look, *found; + WORKINFO wi; + + cklock_init(&fpm_lock); + cksem_init(&socketer_sem); + + alloc_storage(); if (!getdata1() || everyone_die) return false; @@ -6215,7 +6367,7 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, by, code, inet, now, trf_root); if (!ok) { - LOGERR("%s.failed.DBE", id); + LOGERR("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_username)->data); @@ -6391,7 +6543,7 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, by, code, inet, cd, igndup, trf_root); if (!ok) { - LOGERR("%s.failed.DBE", id); + LOGERR("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } LOGDEBUG("%s.ok.", id); @@ -6406,6 +6558,7 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, { bool igndup = false; + // confirm_summaries() doesn't call this if (reloading) { if (tv_equal(cd, &(dbstatus.newest_createdate_blocks))) igndup = true; @@ -6487,7 +6640,7 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, idle, eos, by, code, inet, cd, trf_root); if (!ok) { - LOGERR("%s.failed.DATA", id); + LOGERR("%s() %s.failed.DATA", __func__, id); return strdup("failed.DATA"); } LOGDEBUG("%s.ok.", id); @@ -6546,7 +6699,7 @@ static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, conned = true; } - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); @@ -6566,7 +6719,7 @@ foil: K_WUNLOCK(idcontrol_free); if (!ok) { - LOGERR("%s.failed.DBE", id); + LOGERR("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } LOGDEBUG("%s.ok.added %s %"PRId64, id, DATA_TRANSFER(i_idname)->data, row->lastid); @@ -6912,6 +7065,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, { char reply[1024] = ""; size_t siz = sizeof(reply); + int64_t workinfoid; // log to logfile with processing success/failure code @@ -6921,10 +7075,9 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, K_ITEM *i_workinfoid, *i_poolinstance, *i_transactiontree, *i_merklehash; K_ITEM *i_prevhash, *i_coinbase1, *i_coinbase2, *i_version, *i_bits; K_ITEM *i_ntime, *i_reward; - int64_t workinfoid; bool igndup = false; - if (reloading) { + if (reloading && !confirm_sharesummary) { if (tv_equal(cd, &(dbstatus.newest_createdate_workinfo))) igndup = true; else if (tv_newer(cd, &(dbstatus.newest_createdate_workinfo))) @@ -6935,6 +7088,14 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, if (!i_workinfoid) return strdup(reply); + if (confirm_sharesummary) { + TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid); + + if (workinfoid < confirm_first_workinfoid || + workinfoid > confirm_last_workinfoid) + goto wiconf; + } + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); @@ -6989,10 +7150,11 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, by, code, inet, cd, igndup, trf_root); if (workinfoid == -1) { - LOGERR("%s.failed.DBE", id); + LOGERR("%s(%s) %s.failed.DBE", __func__, cmd, id); return strdup("failed.DBE"); } LOGDEBUG("%s.ok.added %"PRId64, id, workinfoid); +wiconf: snprintf(reply, siz, "ok.%"PRId64, workinfoid); return strdup(reply); } else if (strcasecmp(cmd, STR_SHARES) == 0) { @@ -7001,7 +7163,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, bool ok; // This just excludes the shares we certainly don't need - if (reloading) { + if (reloading && !confirm_sharesummary) { if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) return NULL; } @@ -7010,6 +7172,14 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, if (!i_workinfoid) return strdup(reply); + if (confirm_sharesummary) { + TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid); + + if (workinfoid < confirm_first_workinfoid || + workinfoid > confirm_last_workinfoid) + goto sconf; + } + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); @@ -7059,10 +7229,11 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, by, code, inet, cd, trf_root); if (!ok) { - LOGERR("%s.failed.DATA", id); + LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); return strdup("failed.DATA"); } LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_nonce)->data); +sconf: snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_nonce)->data); return strdup(reply); } else if (strcasecmp(cmd, STR_SHAREERRORS) == 0) { @@ -7071,7 +7242,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, bool ok; // This just excludes the shareerrors we certainly don't need - if (reloading) { + if (reloading && !confirm_sharesummary) { if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) return NULL; } @@ -7080,6 +7251,14 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, if (!i_workinfoid) return strdup(reply); + if (confirm_sharesummary) { + TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid); + + if (workinfoid < confirm_first_workinfoid || + workinfoid > confirm_last_workinfoid) + goto seconf; + } + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); @@ -7113,17 +7292,18 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, DATA_TRANSFER(i_secondaryuserid)->data, by, code, inet, cd, trf_root); if (!ok) { - LOGERR("%s.failed.DATA", id); + LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); return strdup("failed.DATA"); } LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_username)->data); +seconf: snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_username)->data); return strdup(reply); } else if (strcasecmp(cmd, STR_AGEWORKINFO) == 0) { K_ITEM *i_workinfoid, *i_poolinstance; bool ok; - if (reloading) { + if (reloading && !confirm_sharesummary) { if (tv_newer(cd, &(dbstatus.sharesummary_firstshare))) return NULL; } @@ -7132,6 +7312,14 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, if (!i_workinfoid) return strdup(reply); + if (confirm_sharesummary) { + TXT_TO_BIGINT("workinfoid", DATA_TRANSFER(i_workinfoid)->data, workinfoid); + + if (workinfoid < confirm_first_workinfoid || + workinfoid > confirm_last_workinfoid) + goto awconf; + } + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); @@ -7141,12 +7329,13 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, by, code, inet, cd); if (!ok) { - LOGERR("%s.failed.DATA", id); + LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); return strdup("failed.DATA"); } LOGDEBUG("%s.ok.aged %.*s", id, BIGINT_BUFSIZ, DATA_TRANSFER(i_workinfoid)->data); +awconf: snprintf(reply, siz, "ok.%.*s", BIGINT_BUFSIZ, DATA_TRANSFER(i_workinfoid)->data); @@ -7250,7 +7439,7 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, } if (!ok) { - LOGERR("%s.failed.DBE", id); + LOGERR("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } @@ -7266,6 +7455,7 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id, { bool igndup = false; + // confirm_summaries() doesn't call this if (reloading) { if (tv_equal(cd, &(dbstatus.newest_createdate_blocks))) igndup = true; @@ -7321,7 +7511,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, by, code, inet, cd, igndup, trf_root); if (!secuserid) { - LOGDEBUG("%s.failed.DBE", id); + LOGDEBUG("%s() %s.failed.DBE", __func__, id); return strdup("failed.DBE"); } @@ -7337,6 +7527,7 @@ static char *cmd_auth(PGconn *conn, char *cmd, char *id, { bool igndup = false; + // confirm_summaries() doesn't call this if (reloading) { if (tv_equal(cd, &(dbstatus.newest_createdate_auths))) igndup = true; @@ -7911,6 +8102,7 @@ static void summarise_userstats() locked = true; K_ILOCK(userstats_free); + // confirm_summaries() doesn't call this if (!reloading) copy_tv(&process, &now); else { @@ -8100,7 +8292,8 @@ static void *logger(__maybe_unused void *arg) pthread_detach(pthread_self()); - rename_proc("db_logger"); + snprintf(buf, sizeof(buf), "db%s_logger", dbcode); + rename_proc(buf); setnow(&now); snprintf(buf, sizeof(buf), "logstart.%ld,%ld", @@ -8512,11 +8705,13 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", __func__, count, cmd); break; - case CMD_SHARELOG: case CMD_AUTH: case CMD_POOLSTAT: case CMD_USERSTAT: case CMD_BLOCK: + if (confirm_sharesummary) + break; + case CMD_SHARELOG: ans = cmds[which_cmds].func(conn, cmd, id, &now, by_default, (char *)__func__, @@ -8560,7 +8755,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) static char *reload_buf; /* If the reload start file is missing and -r was specified correctly: - * touch the filename reported in "Failed to open 'filename'" + * touch the filename reported in "Failed to open 'filename'", * if ckdb aborts at the beginning of the reload, then start again */ static bool reload_from(tv_t *start) { @@ -8604,7 +8799,7 @@ static bool reload_from(tv_t *start) processing++; count = 0; - while (!matched && fgets_unlocked(reload_buf, MAX_READ, fp)) + while (!matched && !confirm_finished && fgets_unlocked(reload_buf, MAX_READ, fp)) matched = reload_line(conn, filename, ++count, reload_buf); if (ferror(fp)) { @@ -8620,7 +8815,7 @@ static bool reload_from(tv_t *start) total += count; fclose(fp); free(filename); - if (matched) + if (matched || confirm_finished) break; start->tv_sec += ROLL_S; filename = rotating_filename(restorefrom, start->tv_sec); @@ -8680,7 +8875,7 @@ static bool reload_from(tv_t *start) if (everyone_die) return true; - if (!matched) { + if (!matched && !confirm_finished) { ck_wlock(&fpm_lock); if (first_pool_message) { LOGERR("%s() reload completed without finding ckpool queue match '%.32s'...", @@ -8797,6 +8992,224 @@ static void *listener(void *arg) return NULL; } +/* TODO: This will be way faster traversing both trees simultaneously + * rather than traversing one and searching the other, then repeating + * in reverse. Will change it later */ +static void compare_summaries(K_TREE *leftsum, char *leftname, + K_TREE *rightsum, char *rightname, + bool show_missing, bool show_diff) +{ + K_TREE_CTX ctxl[1], ctxr[1]; + K_ITEM look, *lss, *rss; + char cd_buf[DATE_BUFSIZ]; + SHARESUMMARY sharesummary; + uint64_t total, ok, missing, diff; + uint64_t first_used = 0, last_used = 0; + + sharesummary.workinfoid = confirm_first_workinfoid; + sharesummary.userid = -1; + sharesummary.workername[0] = '\0'; + look.data = (void *)(&sharesummary); + + total = ok = missing = diff = 0; + lss = find_after_in_ktree(leftsum, &look, cmp_sharesummary_workinfoid, ctxl); + while (lss) { + if (DATA_SHARESUMMARY(lss)->workinfoid > confirm_last_workinfoid) + break; + + total++; + + if (first_used == 0) + first_used = DATA_SHARESUMMARY(lss)->workinfoid; + last_used = DATA_SHARESUMMARY(lss)->workinfoid; + + rss = find_in_ktree(rightsum, lss, cmp_sharesummary_workinfoid, ctxr); + if (!rss) { + missing++; + if (show_missing) { + LOGERR("ERROR: %s %"PRId64"/%s/%ld,%ld %.19s missing from %s", + leftname, + DATA_SHARESUMMARY(lss)->workinfoid, + DATA_SHARESUMMARY(lss)->workername, + DATA_SHARESUMMARY(lss)->createdate.tv_sec, + DATA_SHARESUMMARY(lss)->createdate.tv_usec, + tv_to_buf(&(DATA_SHARESUMMARY(lss)->createdate), cd_buf, sizeof(cd_buf)), + rightname); + } + } else if (DATA_SHARESUMMARY(rss)->diffacc != DATA_SHARESUMMARY(lss)->diffacc) { + diff++; + if (show_diff) { + LOGERR("ERROR: %"PRId64"/%s/%ld,%ld %.19s - diffacc: %s: %.0f %s: %.0f", + DATA_SHARESUMMARY(lss)->workinfoid, + DATA_SHARESUMMARY(lss)->workername, + DATA_SHARESUMMARY(lss)->createdate.tv_sec, + DATA_SHARESUMMARY(lss)->createdate.tv_usec, + tv_to_buf(&(DATA_SHARESUMMARY(lss)->createdate), cd_buf, sizeof(cd_buf)), + leftname, + DATA_SHARESUMMARY(lss)->diffacc, + rightname, + DATA_SHARESUMMARY(rss)->diffacc); + } + } else + ok++; + + lss = next_in_ktree(ctxl); + } + + LOGERR("RESULT: %s->%s Total %"PRIu64" workinfoid %"PRId64"-%"PRId64 + " missing: %"PRIu64" different: %"PRIu64, + leftname, rightname, total, first_used, last_used, + missing, diff); +} + +static void confirm_reload() +{ + K_TREE *sharesummary_workinfoid_save; + __maybe_unused K_TREE *sharesummary_save; + __maybe_unused K_TREE *workinfo_save; + K_ITEM look, *wi_item; + WORKINFO workinfo; + K_TREE_CTX ctx[1]; + char buf[DATE_BUFSIZ+1]; + char *first_reason; + char *last_reason; + char *filename; + tv_t start; + FILE *fp; + +// TODO: // abort reload when we get an age after the end of a workinfo after the Xs after the last workinfo before the end + + /* The first workinfo we should process + * With no y records we should start from the beginning (0) + * With any y records, we should start from the oldest of: y+1 and a + * which can produce y records as reload a's, if a is used */ + if (dbstatus.newest_workinfoid_y > 0) { + confirm_first_workinfoid = dbstatus.newest_workinfoid_y + 1; + if (confirm_first_workinfoid > dbstatus.oldest_workinfoid_a) { + confirm_first_workinfoid = dbstatus.oldest_workinfoid_a; + first_reason = "oldest aged"; + } else + first_reason = "newest confirmed+1"; + } else + first_reason = "0 - none confirmed"; + + /* The last workinfo we should process + * The reason for going past the last 'a' up to before + * the first 'n' is in case there were shares missed between them - + * but that should only be the case with a code bug - so it checks that + * TODO: auto aging will clear 'n' sections inside the 'a's that + * will always occur when ckpool restarts, since ckpool will never + * send workinfo age records for workinfo that's active at shutdown - + * Aging these can (for now) easily be done manually in psql */ + if (dbstatus.newest_workinfoid_a > 0) { + confirm_last_workinfoid = dbstatus.newest_workinfoid_a; + last_reason = "newest aged"; + } + if (confirm_last_workinfoid < dbstatus.oldest_workinfoid_n) { + confirm_last_workinfoid = dbstatus.oldest_workinfoid_n - 1; + last_reason = "oldest new-1"; + } + if (confirm_last_workinfoid == 0) { + LOGWARNING("%s(): there are no unconfirmed sharesummary records in the DB", + __func__, buf); + return; + } + + workinfo.workinfoid = confirm_first_workinfoid + 1; + workinfo.expirydate.tv_sec = default_expiry.tv_sec; + workinfo.expirydate.tv_usec = default_expiry.tv_usec; + + look.data = (void *)(&workinfo); + wi_item = find_before_in_ktree(workinfo_root, &look, cmp_workinfo, ctx); + if (wi_item) { + copy_tv(&start, &(DATA_WORKINFO(wi_item)->createdate)); + if (DATA_WORKINFO(wi_item)->workinfoid != confirm_first_workinfoid) { + LOGWARNING("%s() start workinfo not found ... using %"PRId64, + __func__, DATA_WORKINFO(wi_item)->workinfoid); + } + } else { + start.tv_sec = 0; + LOGWARNING("%s() no start workinfo found ... using 0", __func__); + } + + LOGWARNING("%s() workinfo range: %"PRId64" to %"PRId64" ('%s' to '%s')", + __func__, confirm_first_workinfoid, confirm_last_workinfoid, + first_reason, last_reason); + + tv_to_buf(&start, buf, sizeof(buf)); + LOGWARNING("%s() load start timestamp %s", __func__, buf); + + /* Save the DB info for comparing to the reload + * i.e. the reload will generate from scratch all the + * sharesummaries and workinfo from the CCLs */ + sharesummary_workinfoid_save = sharesummary_workinfoid_root; + sharesummary_save = sharesummary_root; + workinfo_save = workinfo_root; + + sharesummary_workinfoid_root = new_ktree(); + sharesummary_root = new_ktree(); + workinfo_root = new_ktree(); + + if (start.tv_sec < DATE_BEGIN) { + start.tv_sec = DATE_BEGIN; + start.tv_usec = 0L; + filename = rotating_filename(restorefrom, start.tv_sec); + fp = fopen(filename, "r"); + if (fp) + fclose(fp); + else { + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + int fd = open(filename, O_CREAT|O_RDONLY, mode); + if (fd == -1) { + int ern = errno; + quithere(1, "Couldn't create '%s' (%d) %s", + filename, ern, strerror(ern)); + } + close(fd); + } + free(filename); + } + + if (!reload_from(&start)) { + LOGEMERG("%s() ABORTING from reload_from()", __func__); + return; + } + + compare_summaries(sharesummary_workinfoid_save, "DB", + sharesummary_workinfoid_root, "ReLoad", + true, true); + compare_summaries(sharesummary_workinfoid_root, "ReLoad", + sharesummary_workinfoid_save, "DB", + true, false); +} + +static void confirm_summaries() +{ + pthread_t log_pt; + + logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), + ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); + logqueue_store = k_new_store(logqueue_free); + + create_pthread(&log_pt, logger, NULL); + + rename_proc("dby_confirmer"); + + alloc_storage(); + + if (!getdata1()) { + LOGEMERG("%s() ABORTING from getdata1()", __func__); + return; + } + + if (!getdata2()) { + LOGEMERG("%s() ABORTING from getdata2()", __func__); + return; + } + + confirm_reload(); +} + #define RELOADFILES "ckdb" static void check_restore_dir() @@ -8839,7 +9252,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt(argc, argv, "c:d:kl:n:p:r:s:u:")) != -1) { + while ((c = getopt(argc, argv, "c:d:kl:n:p:r:s:u:y")) != -1) { switch(c) { case 'c': ckp.config = strdup(optarg); @@ -8883,9 +9296,18 @@ int main(int argc, char **argv) while (*kill) *(kill++) = '\0'; break; + case 'y': + // TODO: allow a range e.g. block or workinfo range or ... + confirm_sharesummary = true; + break; } } + if (confirm_sharesummary) + dbcode = "y"; + else + dbcode = ""; + check_restore_dir(); if (!db_name) @@ -8894,7 +9316,7 @@ int main(int argc, char **argv) db_user = "postgres"; if (!ckp.name) ckp.name = "ckdb"; - snprintf(buf, 15, "%s", ckp.name); + snprintf(buf, 15, "%s%s", ckp.name, dbcode); prctl(PR_SET_NAME, buf, 0, 0, 0); memset(buf, 0, 15); @@ -8928,32 +9350,39 @@ int main(int argc, char **argv) quit(1, "Failed to make log directory %s", ckp.logdir); /* Create the logfile */ - sprintf(buf, "%s%s.log", ckp.logdir, ckp.name); + sprintf(buf, "%s%s%s.log", ckp.logdir, ckp.name, dbcode); ckp.logfp = fopen(buf, "a"); if (!ckp.logfp) quit(1, "Failed to open log file %s", buf); ckp.logfd = fileno(ckp.logfp); - snprintf(logname, sizeof(logname), "%s%s-db-", - ckp.logdir, ckp.name); + snprintf(logname, sizeof(logname), "%s%s-db%s-", + ckp.logdir, ckp.name, dbcode); + + setnow(&now); + srand((unsigned int)(now.tv_usec * 4096 + now.tv_sec % 4096)); ckp.main.ckp = &ckp; ckp.main.processname = strdup("main"); - ckp.main.sockname = strdup("listener"); - write_namepid(&ckp.main); - create_process_unixsock(&ckp.main); - setnow(&now); - srand((unsigned int)(now.tv_usec * 4096 + now.tv_sec % 4096)); - create_pthread(&ckp.pth_listener, listener, &ckp.main); + if (confirm_sharesummary) { + // TODO: add a system lock to stop running 2 at once? + confirm_summaries(); + } else { + ckp.main.sockname = strdup("listener"); + write_namepid(&ckp.main); + create_process_unixsock(&ckp.main); - handler.sa_flags = 0; - sigemptyset(&handler.sa_mask); - sigaction(SIGTERM, &handler, NULL); - sigaction(SIGINT, &handler, NULL); + create_pthread(&ckp.pth_listener, listener, &ckp.main); - /* Shutdown from here if the listener is sent a shutdown message */ - join_pthread(ckp.pth_listener); + handler.sa_flags = 0; + sigemptyset(&handler.sa_mask); + sigaction(SIGTERM, &handler, NULL); + sigaction(SIGINT, &handler, NULL); + + /* Shutdown from here if the listener is sent a shutdown message */ + join_pthread(ckp.pth_listener); + } clean_up(&ckp);