Browse Source

ckdb - don't store sharesummaries in the DB and delay mark processing until sync has completed

master
kanoi 10 years ago
parent
commit
fcfe965ba3
  1. 73
      src/ckdb.c
  2. 43
      src/ckdb.h
  3. 108
      src/ckdb_cmd.c
  4. 38
      src/ckdb_data.c
  5. 497
      src/ckdb_dbio.c

73
src/ckdb.c

@ -268,7 +268,7 @@ bool dbload_only_sharesummary = false;
* markersummaries and pplns payouts may not be correct */
bool sharesummary_marks_limit = false;
// DB users,workers load is complete
// DB optioncontrol,users,workers,useratts load is complete
bool db_users_complete = false;
// DB load is complete
bool db_load_complete = false;
@ -758,7 +758,9 @@ static bool getdata1()
goto matane;
if (!(ok = users_fill(conn)))
goto matane;
ok = workers_fill(conn);
if (!(ok = workers_fill(conn)))
goto matane;
ok = useratts_fill(conn);
matane:
@ -796,21 +798,15 @@ static bool getdata3()
}
if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai;
/* marks must be loaded before sharesummary
* since sharesummary looks at the marks data */
if (!(ok = marks_fill(conn)) || everyone_die)
goto sukamudai;
/* must be after workinfo */
if (!(ok = workmarkers_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = sharesummary_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary) {
if (!(ok = useratts_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary)
ok = poolstats_fill(conn);
}
sukamudai:
@ -828,32 +824,25 @@ static bool reload()
char *reason;
FILE *fp;
tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_sharesummary_firstshare_ay), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB complete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_workmarker_workinfo),
buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workmarker wid %"PRId64,
__func__, buf,
dbstatus.newest_workmarker_workinfoid);
tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workinfo", __func__, buf);
LOGWARNING("%s(): %s newest DB workinfo wid %"PRId64,
__func__, buf, dbstatus.newest_workinfoid);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
LOGWARNING("%s(): %s newest DB poolstats (ignored)", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec)
copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.oldest_sharesummary_firstshare_n));
else
copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.newest_sharesummary_firstshare_ay));
copy_tv(&start, &(dbstatus.sharesummary_firstshare));
reason = "sharesummary";
copy_tv(&start, &(dbstatus.newest_createdate_workmarker_workinfo));
reason = "workmarkers";
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) {
copy_tv(&start, &(dbstatus.newest_createdate_workinfo));
reason = "workinfo";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) {
copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
reason = "poolstats";
}
tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
@ -901,15 +890,18 @@ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid)
fclose(fp);
if (ret == 1 && !(kill(oldpid, 0))) {
if (!ckp->killold) {
LOGEMERG("Process %s pid %d still exists, start ckpool with -k if you wish to kill it",
LOGEMERG("Process %s pid %d still exists, start"
" ckpool with -k if you wish to kill it",
path, oldpid);
return false;
}
if (kill(oldpid, 9)) {
LOGEMERG("Unable to kill old process %s pid %d", path, oldpid);
LOGEMERG("Unable to kill old process %s pid %d",
path, oldpid);
return false;
}
LOGWARNING("Killing off old process %s pid %d", path, oldpid);
LOGWARNING("Killing off old process %s pid %d",
path, oldpid);
}
}
fp = fopen(path, "we");
@ -3076,7 +3068,7 @@ static void *summariser(__maybe_unused void *arg)
rename_proc("db_summariser");
while (!everyone_die && !startup_complete)
while (!everyone_die && !reload_queue_complete)
cksleep_ms(42);
summariser_using_data = true;
@ -3568,7 +3560,7 @@ static void *marker(__maybe_unused void *arg)
rename_proc("db_marker");
while (!everyone_die && !startup_complete)
while (!everyone_die && !reload_queue_complete)
cksleep_ms(42);
if (sharesummary_marks_limit) {
@ -3579,16 +3571,6 @@ static void *marker(__maybe_unused void *arg)
marker_using_data = true;
/* TODO: trigger this every workinfo change?
* note that history catch up would also mean the tigger would
* catch up at most 100 missing marks per shift
* however, also, a workinfo change means a sharesummary DB update,
* so would be best to (usually) wait until that is done
* OR: avoid writing the sharesummaries to the DB at all
* and only write the markersummaries? - since 100 workinfoid shifts
* will usually mean that markersummaries are less than every hour
* (and a reload processes more than an hour) */
while (!everyone_die) {
for (i = 0; i < 5; i++) {
if (!everyone_die)
@ -4439,6 +4421,7 @@ static bool reload_from(tv_t *start)
reloading = true;
copy_tv(&reload_timestamp, start);
// Go back further - one reload file
reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S;
tv_to_buf(start, buf, sizeof(buf));
@ -4790,6 +4773,7 @@ static void *listener(void *arg)
return NULL;
}
#if 0
/* TODO: This will be way faster traversing both trees simultaneously
* rather than traversing one and searching the other, then repeating
* in reverse. Will change it later */
@ -4893,6 +4877,7 @@ static void compare_summaries(K_TREE *leftsum, char *leftname,
diff_first, diff_last, cd_buf1, cd_buf2);
}
}
#endif
/* TODO: have a seperate option to find/store missing workinfo/shares/etc
* from the reload files, in a supplied UTC time range
@ -4905,6 +4890,9 @@ static void compare_summaries(K_TREE *leftsum, char *leftname,
* and the payment is now wrong */
static void confirm_reload()
{
#if 0
TODO: redo this using workmarkers
K_TREE *sharesummary_workinfoid_save;
__maybe_unused K_TREE *sharesummary_save;
__maybe_unused K_TREE *workinfo_save;
@ -5219,6 +5207,7 @@ static void confirm_reload()
compare_summaries(sharesummary_workinfoid_root, "ReLoad",
sharesummary_workinfoid_save, "DB",
true, false);
#endif
}
// TODO: handle workmarkers/markersummaries

43
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.094"
#define CKDB_VERSION DB_VERSION"-1.100"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -125,20 +125,12 @@ extern const char *addrpatt;
#define MAX_PAYADDR '~'
typedef struct loadstatus {
tv_t oldest_sharesummary_firstshare_n;
tv_t newest_sharesummary_firstshare_a;
tv_t newest_sharesummary_firstshare_ay;
tv_t sharesummary_firstshare; // whichever of above 2 used
tv_t oldest_sharesummary_firstshare_a;
tv_t newest_sharesummary_firstshare_y;
int64_t newest_workmarker_workinfoid;
int64_t newest_workinfoid;
tv_t newest_createdate_workmarker_workinfo;
tv_t newest_createdate_workinfo;
tv_t newest_createdate_poolstats;
tv_t newest_starttimeband_userstats;
tv_t newest_createdate_blocks;
int64_t oldest_workinfoid_n; // of oldest firstshare sharesummary n
int64_t oldest_workinfoid_a; // of oldest firstshare sharesummary a
int64_t newest_workinfoid_a; // of newest firstshare sharesummary a
int64_t newest_workinfoid_y; // of newest firstshare sharesummary y
} LOADSTATUS;
extern LOADSTATUS dbstatus;
@ -1366,10 +1358,6 @@ typedef struct sharesummary {
double sharerej;
int64_t sharecount;
int64_t errorcount;
int64_t countlastupdate; // non-DB field
bool inserted; // non-DB field
bool saveaged; // non-DB field
bool reset; // non-DB field
tv_t firstshare;
tv_t lastshare;
double lastdiffacc;
@ -2131,10 +2119,10 @@ extern cmp_t _cmp_height(char *coinbase1a, char *coinbase1b, WHERE_FFL_ARGS);
extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd, tv_t *ss_first,
tv_t *ss_last, int64_t *ss_count, int64_t *s_count,
int64_t *s_diff);
extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b);
extern void dsp_sharesummary(K_ITEM *item, FILE *stream);
@ -2152,8 +2140,8 @@ extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff);
extern K_ITEM *_find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid, bool pool);
extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername);
extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd);
extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd);
#define dbhash2btchash(_hash, _buf, _siz) \
_dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
@ -2245,6 +2233,8 @@ extern void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock);
(_res) == PGRES_TUPLES_OK || \
(_res) == PGRES_EMPTY_QUERY)
#define SQL_UNIQUE_VIOLATION "23505"
#define CKPQ_READ true
#define CKPQ_WRITE false
@ -2348,11 +2338,12 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar
char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
extern char *ooo_status(char *buf, size_t siz);
#define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \
_sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \
#define sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \
_sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \
WHERE_FFL_HERE)
extern bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS);
extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS);
extern bool sharesummary_fill(PGconn *conn);
extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
double diffacc, double diffinv, double shareacc,

108
src/ckdb_cmd.c

@ -683,7 +683,8 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by,
by, code, inet, cd, igndup, trf_root);
if (!ok) {
LOGERR("%s() %s.failed.DBE", __func__, id);
if (!igndup)
LOGERR("%s() %s.failed.DBE", __func__, id);
return strdup("failed.DBE");
}
LOGDEBUG("%s.ok.", id);
@ -698,13 +699,11 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id,
{
bool igndup = false;
// confirm_summaries() doesn't call this
if (reloading) {
if (tv_equal(cd, &(dbstatus.newest_createdate_blocks)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_blocks)))
return NULL;
}
/* confirm_summaries() doesn't call this
* We don't care about dups during reload since poolstats_fill()
* doesn't load all the data */
if (reloading)
igndup = true;
return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root);
}
@ -1965,20 +1964,18 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id,
K_ITEM *i_ntime, *i_reward;
bool igndup = false;
if (reloading && !confirm_sharesummary) {
if (tv_equal(cd, &(dbstatus.newest_createdate_workinfo)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_workinfo)))
return NULL;
}
i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz);
if (!i_workinfoid)
return strdup(reply);
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
if (workinfoid <= dbstatus.newest_workinfoid)
igndup = true;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
goto wiconf;
@ -2056,12 +2053,6 @@ wiconf:
K_ITEM *i_secondaryuserid;
bool ok;
// This just excludes the shares we certainly don't need
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz);
if (!i_nonce)
return strdup(reply);
@ -2070,9 +2061,28 @@ wiconf:
if (!i_workinfoid)
return strdup(reply);
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
/* ISDR (Ignored shares during reload)
* This will discard any shares older than the newest
* workinfoidend of any workmarker - including ready
* but not processed workmarkers
* This means that if a workmarker needs re-processing
* and all of it's shares need to be redone, that will
* require a seperate procedure to the reload
* This would be the (as yet non-existant)
* confirm_markersummary which will replace the
* now unusable confirm_sharesummary code
* However, if the workmarker simply just needs to be
* flagged as processed, this avoids the problem of
* duplicating shares before flagging it
*/
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
goto sconf;
@ -2151,12 +2161,6 @@ sconf:
K_ITEM *i_error, *i_secondaryuserid;
bool ok;
// This just excludes the shareerrors we certainly don't need
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_username = require_name(trf_root, "username", 1, NULL, reply, siz);
if (!i_username)
return strdup(reply);
@ -2165,6 +2169,13 @@ sconf:
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
// See comment 'ISDR' above for shares
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
@ -2216,15 +2227,18 @@ seconf:
tv_t ss_first, ss_last;
bool ok;
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
// This excludes any already summarised
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
@ -2237,27 +2251,19 @@ seconf:
if (!i_poolinstance)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
ok = workinfo_age(conn, workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd,
&ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
ok = workinfo_age(workinfoid, transfer_data(i_poolinstance),
by, code, inet, cd, &ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
if (!ok) {
LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
return strdup("failed.DATA");
} else {
/* Don't slow down the reload - do them later
* N.B. this means if you abort/terminate the reload,
* next restart will again go back to the oldest
* unaged sharesummary due to a pool terminate */
/* Don't slow down the reload - do them later */
if (!reloading) {
// Aging is a queued item thus the reply is ignored
auto_age_older(conn, workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd);
auto_age_older(workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd);
}
}
LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);

38
src/ckdb_data.c

@ -1764,10 +1764,10 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
return item;
}
bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff)
// Duplicates during a reload are set to not show messages
bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item;
K_ITEM *wm_item, *tmp_item;
@ -1778,7 +1778,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
SHARESUMMARY looksharesummary, *sharesummary;
WORKINFO *workinfo;
SHARES lookshares, *shares;
bool ok = false, conned = false, skipupdate;
bool ok = false, skipupdate;
char error[1024];
LOGDEBUG("%s(): age", __func__);
@ -1857,14 +1857,9 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
}
if (!skipupdate) {
if (conn == NULL && !confirm_sharesummary) {
conn = dbconnect();
conned = true;
}
if (!sharesummary_update(conn, NULL, NULL, ss_item, by, code, inet, cd)) {
if (!sharesummary_update(NULL, NULL, ss_item, by, code, inet, cd)) {
ss_failed++;
LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64,
LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64,
__func__, sharesummary->userid,
sharesummary->workername,
sharesummary->workinfoid);
@ -1929,9 +1924,6 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
LOGERR("%s(): %s", __func__, error);
}
if (conned)
PQfinish(conn);
if (ss_already || ss_failed || shares_dumped) {
/* If all were already aged, and no shares
* then we don't want a message */
@ -2046,8 +2038,7 @@ void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff)
row->diffacc = row->diffsta = row->diffdup = row->diffhi =
row->diffrej = row->shareacc = row->sharesta = row->sharedup =
row->sharehi = row->sharerej = 0.0;
row->sharecount = row->errorcount = row->countlastupdate = 0;
row->reset = false;
row->sharecount = row->errorcount = 0;
row->firstshare.tv_sec = cd->tv_sec;
row->firstshare.tv_usec = cd->tv_usec;
row->lastshare.tv_sec = row->firstshare.tv_sec;
@ -2102,8 +2093,8 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
/* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */
void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd)
void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd)
{
static int64_t last_attempted_id = -1;
static int64_t prev_found = 0;
@ -2170,10 +2161,9 @@ void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
do_id = age_id;
to_id = 0;
do {
ok = workinfo_age(conn, do_id, poolinstance,
by, code, inet, cd,
&ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
ok = workinfo_age(do_id, poolinstance, by, code, inet,
cd, &ss_first, &ss_last, &ss_count,
&s_count, &s_diff);
ss_count_tot += ss_count;
s_count_tot += s_count;
@ -4611,5 +4601,5 @@ void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock)
} else
row->orphans++;
if (lock)
K_WLOCK(userinfo_free);
K_WUNLOCK(userinfo_free);
}

497
src/ckdb_dbio.c

@ -2760,8 +2760,10 @@ bool workinfo_fill(PGconn *conn)
workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height);
k_add_head(workinfo_store, item);
if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate)))
if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) {
copy_tv(&(dbstatus.newest_createdate_workinfo), &(row->createdate));
dbstatus.newest_workinfoid = row->workinfoid;
}
tick();
}
@ -2834,16 +2836,6 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root)
// Reloading a share already summarised
return true;
}
if (!sharesummary->reset) {
_userinfo_update(NULL, sharesummary, NULL,
true, true);
zero_sharesummary(sharesummary,
&(shares->createdate),
shares->diff);
sharesummary->reset = true;
}
}
}
@ -2852,7 +2844,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root)
userinfo_update(shares, NULL, NULL);
}
sharesummary_update(conn, shares, NULL, NULL, shares->createby,
sharesummary_update(shares, NULL, NULL, shares->createby,
shares->createcode, shares->createinet,
&(shares->createdate));
@ -3147,20 +3139,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
FREENULL(st);
return false;
}
if (!sharesummary->reset) {
_userinfo_update(NULL, sharesummary, NULL,
true, true);
zero_sharesummary(sharesummary,
&(shareerrors->createdate),
0.0);
sharesummary->reset = true;
}
}
}
sharesummary_update(conn, NULL, shareerrors, NULL,
sharesummary_update(NULL, shareerrors, NULL,
shareerrors->createby,
shareerrors->createcode,
shareerrors->createinet,
@ -3768,6 +3750,8 @@ flail:
return ok;
}
// no longer used
#if 0
static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row)
{
p_row->diffacc += row->diffacc;
@ -3791,6 +3775,7 @@ static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row)
p_row->lastdiffacc = row->lastdiffacc;
}
}
#endif
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new,
@ -3866,22 +3851,18 @@ char *ooo_status(char *buf, size_t siz)
return buf;
}
bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS)
// No longer stored in the DB but fields are updated as before
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS)
{
ExecStatusType rescode;
PGresult *res = NULL;
WORKMARKERS *wm;
SHARESUMMARY *row, *p_row;
K_ITEM *item, *wm_item, *p_item = NULL;
char *ins, *upd;
bool ok = false, new = false, p_new = false;
char *params[19 + MODIFYDATECOUNT];
int n, par = 0;
bool new = false, p_new = false;
int64_t userid, workinfoid;
char *workername;
tv_t *createdate;
bool must_update = false, conned = false;
char *st = NULL, *db = NULL;
char ooo_buf[256];
double tdf, tdl;
@ -3897,7 +3878,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
}
item = ss_item;
DATA_SHARESUMMARY(row, item);
must_update = true;
row->complete[0] = SUMMARY_COMPLETE;
row->complete[1] = '\0';
} else {
@ -3958,8 +3938,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
row->workername = strdup(workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
row->workinfoid = workinfoid;
row->inserted = false;
row->saveaged = false;
}
// N.B. this directly updates the non-key data
@ -4018,7 +3996,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
}
}
// p_items are ram only
if (p_item) {
DATA_SHARESUMMARY(p_row, p_item);
} else {
@ -4036,161 +4013,9 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl);
}
// During startup, don't save 'new' sharesummaries, to reduce DB I/O
// ... and also during normal processing
if (row->complete[0] == SUMMARY_NEW)
goto startupskip;
if (conn == NULL && !confirm_sharesummary) {
conn = dbconnect();
conned = true;
}
if (new || !(row->inserted)) {
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into sharesummary "
"(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL ") values (" PQPARAM27 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
row->inserted = true;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
} else {
bool stats_update = false;
MODIFYUPDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) <
(row->sharecount + row->errorcount))
stats_update = true;
if (must_update && row->countlastupdate < (row->sharecount + row->errorcount))
stats_update = true;
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
if (stats_update) {
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 23, params);
upd = "update sharesummary "
"set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8,"
"shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12,"
"sharerej=$13,firstshare=$14,lastshare=$15,"
"sharecount=$16,errorcount=$17,lastdiffacc=$18,complete=$19"
","MDDB"=$20,"MBYDB"=$21,"MCODEDB"=$22,"MINETDB"=$23 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
} else {
if (!must_update) {
ok = true;
goto late;
} else {
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 8, params);
upd = "update sharesummary "
"set complete=$4,"MDDB"=$5,"MBYDB"=$6,"MCODEDB"=$7,"MINETDB"=$8 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("MustUpdate", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
}
}
}
startupskip:
ok = true;
unparam:
if (par) {
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
}
late:
if (conned)
PQfinish(conn);
// We keep the new item no matter what 'ok' is, since it will be inserted later
// Store either new item
if (new || p_new) {
K_WLOCK(sharesummary_free);
if (new) {
@ -4209,267 +4034,7 @@ late:
K_WUNLOCK(sharesummary_free);
}
return ok;
}
bool sharesummary_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ctx[1];
K_ITEM *item, *m_item, *p_item;
int n, i, par = 0, p_n;
SHARESUMMARY *row, *p_row;
MARKS *marks;
char *params[2];
char *field;
char *sel;
int fields = 19;
bool ok;
LOGDEBUG("%s(): select", __func__);
/* Load needs to go back to the last marks workinfoid(+1)
* If it is later than that, we can't create markersummaries
* since some of the required data is missing -
* thus we also can't make the shift markersummaries */
m_item = last_in_ktree(marks_root, ctx);
if (!m_item) {
if (dbload_workinfoid_start != -1) {
sharesummary_marks_limit = true;
LOGWARNING("WARNING: dbload -w start used "
"but there are no marks ...");
}
} else {
DATA_MARKS(marks, m_item);
if (dbload_workinfoid_start > marks->workinfoid) {
sharesummary_marks_limit = true;
LOGWARNING("WARNING: dbload -w start %"PRId64
" is after the last mark %"PRId64" ...",
dbload_workinfoid_start,
marks->workinfoid);
}
}
if (sharesummary_marks_limit) {
LOGWARNING("WARNING: ... markersummaries cannot be created "
"and pplns calculations may be wrong");
}
sel = "select "
"userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL
" from sharesummary where workinfoid>=$1 and workinfoid<=$2";
par = 0;
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
return false;
}
n = PQnfields(res);
if (n != (fields + MODIFYDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + MODIFYDATECOUNT, n);
PQclear(res);
return false;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
//K_WLOCK(sharesummary_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
break;
}
row->inserted = true;
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_PTR("workername", field, row->workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
PQ_GET_FLD(res, i, "workinfoid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("workinfoid", field, row->workinfoid);
PQ_GET_FLD(res, i, "diffacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffacc", field, row->diffacc);
PQ_GET_FLD(res, i, "diffsta", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffsta", field, row->diffsta);
PQ_GET_FLD(res, i, "diffdup", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffdup", field, row->diffdup);
PQ_GET_FLD(res, i, "diffhi", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffhi", field, row->diffhi);
PQ_GET_FLD(res, i, "diffrej", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffrej", field, row->diffrej);
PQ_GET_FLD(res, i, "shareacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("shareacc", field, row->shareacc);
PQ_GET_FLD(res, i, "sharesta", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharesta", field, row->sharesta);
PQ_GET_FLD(res, i, "sharedup", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharedup", field, row->sharedup);
PQ_GET_FLD(res, i, "sharehi", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharehi", field, row->sharehi);
PQ_GET_FLD(res, i, "sharerej", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharerej", field, row->sharerej);
PQ_GET_FLD(res, i, "sharecount", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("sharecount", field, row->sharecount);
PQ_GET_FLD(res, i, "errorcount", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("errorcount", field, row->errorcount);
row->countlastupdate = row->sharecount + row->errorcount;
PQ_GET_FLD(res, i, "firstshare", field, ok);
if (!ok)
break;
TXT_TO_TV("firstshare", field, row->firstshare);
PQ_GET_FLD(res, i, "lastshare", field, ok);
if (!ok)
break;
TXT_TO_TV("lastshare", field, row->lastshare);
PQ_GET_FLD(res, i, "lastdiffacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc);
PQ_GET_FLD(res, i, "complete", field, ok);
if (!ok)
break;
TXT_TO_STR("complete", field, row->complete);
MODIFYDATEFLDPOINTERS(sharesummary_free, res, i, row, ok);
if (!ok)
break;
sharesummary_root = add_to_ktree(sharesummary_root, item, cmp_sharesummary);
sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root, item, cmp_sharesummary_workinfoid);
k_add_head(sharesummary_store, item);
// A share summary is shares in a single workinfo, at all 3 levels n,a,y
if (tolower(row->complete[0]) == SUMMARY_NEW) {
if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) {
copy_tv(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare));
dbstatus.oldest_workinfoid_n = row->workinfoid;
}
} else {
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare)))
copy_tv(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare));
if (tolower(row->complete[0]) == SUMMARY_COMPLETE) {
if (dbstatus.oldest_sharesummary_firstshare_a.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare))) {
copy_tv(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare));
dbstatus.oldest_workinfoid_a = row->workinfoid;
}
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare))) {
copy_tv(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare));
dbstatus.newest_workinfoid_a = row->workinfoid;
}
} else /* SUMMARY_CONFIRM */ {
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare))) {
copy_tv(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare));
dbstatus.newest_workinfoid_y = row->workinfoid;
}
}
}
p_item = find_sharesummary_p(row->workinfoid);
if (!p_item) {
p_item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
POOL_SS(p_row);
LIST_MEM_ADD(sharesummary_free, p_row->workername);
p_row->workinfoid = row->workinfoid;
sharesummary_pool_root = add_to_ktree(sharesummary_pool_root,
p_item,
cmp_sharesummary);
k_add_head(sharesummary_pool_store, p_item);
} else {
DATA_SHARESUMMARY(p_row, p_item);
}
sharesummary_to_pool(p_row, row);
_userinfo_update(NULL, row, NULL, false, false);
tick();
}
if (!ok) {
FREENULL(row->workername);
k_add_head(sharesummary_free, item);
}
p_n = sharesummary_pool_store->count;
//K_WUNLOCK(sharesummary_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d sharesummary records", __func__, n);
LOGWARNING("%s(): created %d sharesummary pool records", __func__, p_n);
}
return ok;
return true;
}
bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
@ -5855,7 +5420,15 @@ bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
bool show_msg = true;
char *code;
if (igndup) {
code = PQresultErrorField(res, PG_DIAG_SQLSTATE);
if (code && strcmp(code, SQL_UNIQUE_VIOLATION) == 0)
show_msg = false;
}
if (show_msg)
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
@ -6750,7 +6323,8 @@ bool workmarkers_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
K_ITEM *item, *wi_item;
WORKINFO *workinfo;
int n, i;
WORKMARKERS *row;
char *field;
@ -6837,6 +6411,23 @@ bool workmarkers_fill(PGconn *conn)
item, cmp_workmarkers_workinfoid);
k_add_head(workmarkers_store, item);
if (dbstatus.newest_workmarker_workinfoid < row->workinfoidend) {
dbstatus.newest_workmarker_workinfoid = row->workinfoidend;
wi_item = find_workinfo(row->workinfoidend, NULL);
if (!wi_item) {
LOGEMERG("%s(): FAILURE workmarkerid %"PRId64
" wid end %"PRId64" doesn't exist! "
"You should abort ckdb and fix it, "
" since the reload may skip some data",
__func__, row->markerid,
row->workinfoidend);
} else {
DATA_WORKINFO(workinfo, wi_item);
copy_tv(&(dbstatus.newest_createdate_workmarker_workinfo),
&(workinfo->createdate));
}
}
tick();
}
if (!ok)

Loading…
Cancel
Save