Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 10 years ago
parent
commit
d2baf22052
  1. 252
      src/ckdb.c
  2. 30
      src/ckdb.h
  3. 253
      src/ckdb_cmd.c
  4. 107
      src/ckdb_data.c
  5. 379
      src/ckdb_dbio.c

252
src/ckdb.c

@ -69,9 +69,6 @@
* ckdb aborting and needing a complete restart resolves it
* The users table, required for the authorise messages, is always updated
* immediately and is not affected by ckpool messages until we
* During the reload, when checking the timeframe for summarisation, we
* use the current last userstats createdate as 'now' to avoid touching a
* timeframe where data could still be waiting to be loaded
*/
/* Reload data needed
@ -97,16 +94,7 @@
* TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting
* requirements
* DB+RAM userstats: start of the time band of the latest DB record,
* since all data before this has been summarised to the DB
* The userstats summarisation always processes the oldest
* RAM data to the DB
* TODO: multiple pools is not yet handled by ckdb
* TODO: handle a pool restart with a different instance name
* since this would always make the userstats reload point
* just after the instance name was changed - however
* this can be handled for now by simply ignoring the
* poolinstance
* RAM userstats: none (we simply store the last one found)
* DB+RAM workers: created by auths so auths will resolve it
* DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any)
* will be after the last DB workinfo
@ -137,15 +125,12 @@
* Any data that matches the reload stamp is processed with an
* ignore duplicates flag for all except as below.
* Any data after the stamp, is processed normally for all except:
* 1) userstats: any record that falls in a DB userstats that would
* summarise that record is discarded,
* otherwise the userstats is processed normally
* 2) shares/shareerrors: any record that matches an incomplete DB
* 1) shares/shareerrors: any record that matches an incomplete DB
* sharesummary that hasn't been reset, will reset the sharesummary
* so that the sharesummary will be recalculated
* The record is processed normally with or without the reset
* If the sharesummary is complete, the record is discarded
* 3) ageworkinfo records are also handled by the shares date
* 2) ageworkinfo records are also handled by the shares date
* while processing, any records already aged are not updated
* and a warning is displayed if there were any matching shares
* Any ageworkinfos that match a workmarker are ignored with an error
@ -431,21 +416,16 @@ K_LIST *auths_free;
K_STORE *auths_store;
// POOLSTATS poolstats.id.json={...}
// TODO: redo like userstats, but every 10min
K_TREE *poolstats_root;
K_LIST *poolstats_free;
K_STORE *poolstats_store;
// USERSTATS userstats.id.json={...}
K_TREE *userstats_root;
K_TREE *userstats_statsdate_root; // ordered by statsdate first
K_TREE *userstats_workerstatus_root; // during data load
K_LIST *userstats_free;
K_STORE *userstats_store;
// Awaiting EOS
K_STORE *userstats_eos_store;
// Temporary while summarising
K_STORE *userstats_summ;
// WORKERSTATUS from various incoming data
K_TREE *workerstatus_root;
@ -779,9 +759,7 @@ static bool getdata3()
if (!confirm_sharesummary) {
if (!(ok = useratts_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = poolstats_fill(conn)) || everyone_die)
goto sukamudai;
ok = userstats_fill(conn);
ok = poolstats_fill(conn);
}
sukamudai:
@ -810,8 +788,6 @@ static bool reload()
LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_starttimeband_userstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB userstats start timeband", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
@ -834,10 +810,6 @@ static bool reload()
copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
reason = "poolstats";
}
if (!tv_newer(&start, &(dbstatus.newest_starttimeband_userstats))) {
copy_tv(&start, &(dbstatus.newest_starttimeband_userstats));
reason = "userstats";
}
tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
@ -1048,10 +1020,7 @@ static void alloc_storage()
ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_free);
userstats_eos_store = k_new_store(userstats_free);
userstats_summ = k_new_store(userstats_free);
userstats_root = new_ktree();
userstats_statsdate_root = new_ktree();
userstats_workerstatus_root = new_ktree();
userstats_free->dsp_func = dsp_userstats;
workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS),
@ -1141,10 +1110,6 @@ static void dealloc_storage()
LOGWARNING("%s() userstats ...", __func__);
FREE_TREE(userstats_workerstatus);
FREE_TREE(userstats_statsdate);
if (userstats_summ)
userstats_summ = k_free_store(userstats_summ);
FREE_STORE(userstats_eos);
FREE_ALL(userstats);
@ -1239,8 +1204,6 @@ static bool setup_data()
workerstatus_ready();
userstats_workerstatus_root = free_ktree(userstats_workerstatus_root, NULL);
workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) {
DATA_WORKINFO(wic, workinfo_current);
@ -1725,209 +1688,6 @@ static void summarise_poolstats()
// TODO
}
// TODO: daily
// TODO: consider limiting how much/how long this processes each time
static void summarise_userstats()
{
K_TREE_CTX ctx[1];
K_ITEM *first, *last, *new, *next, *tmp;
USERSTATS *userstats, *us_first, *us_last, *us_next;
double statrange, factor;
bool locked, upgrade, issix, sixdiff;
tv_t now, process, when;
PGconn *conn = NULL;
int count, sixcount;
char error[1024];
char tvbuf1[DATE_BUFSIZ], tvbuf2[DATE_BUFSIZ];
upgrade = false;
locked = false;
while (1764) {
error[0] = '\0';
setnow(&now);
upgrade = false;
locked = true;
K_ILOCK(userstats_free);
// confirm_summaries() doesn't call this
if (!reloading)
copy_tv(&process, &now);
else {
// During reload, base the check date on the newest statsdate
last = last_in_ktree(userstats_statsdate_root, ctx);
if (!last)
break;
DATA_USERSTATS(us_last, last);
copy_tv(&process, &us_last->statsdate);
}
first = first_in_ktree(userstats_statsdate_root, ctx);
DATA_USERSTATS_NULL(us_first, first);
// Oldest non DB stat
// TODO: make the index start with summarylevel? so can find faster
while (first && us_first->summarylevel[0] != SUMMARY_NONE) {
first = next_in_ktree(ctx);
DATA_USERSTATS_NULL(us_first, first);
}
if (!first)
break;
statrange = tvdiff(&process, &(us_first->statsdate));
// Is there data ready for summarising?
if (statrange <= USERSTATS_AGE)
break;
copy_tv(&when, &(us_first->statsdate));
/* Convert when to the start of the timeframe after the one it is in
* assume timeval ignores leapseconds ... */
when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S;
when.tv_usec = 0;
// Is the whole timerange up to before 'when' ready for summarising?
statrange = tvdiff(&process, &when);
if (statrange < USERSTATS_AGE)
break;
next = next_in_ktree(ctx);
upgrade = true;
issix = us_first->six;
sixdiff = false;
K_ULOCK(userstats_free);
new = k_unlink_head(userstats_free);
DATA_USERSTATS(userstats, new);
memcpy(userstats, us_first, sizeof(USERSTATS));
userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first,
cmp_userstats_statsdate);
k_unlink_item(userstats_store, first);
k_add_head(userstats_summ, first);
count = 1;
sixcount = issix ? 1 : 0;
while (next) {
DATA_USERSTATS(us_next, next);
statrange = tvdiff(&when, &(us_next->statsdate));
if (statrange <= 0)
break;
tmp = next_in_ktree(ctx);
if (us_next->summarylevel[0] == SUMMARY_NONE &&
us_next->userid == userstats->userid &&
strcmp(us_next->workername, userstats->workername) == 0) {
if (us_next->six != issix)
sixdiff = true;
count++;
sixcount += us_next->six ? 1 : 0;
userstats->hashrate += us_next->hashrate;
userstats->hashrate5m += us_next->hashrate5m;
userstats->hashrate1hr += us_next->hashrate1hr;
userstats->hashrate24hr += us_next->hashrate24hr;
if (userstats->elapsed > us_next->elapsed)
userstats->elapsed = us_next->elapsed;
userstats->summarycount += us_next->summarycount;
userstats_root = remove_from_ktree(userstats_root,
next, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root,
next,
cmp_userstats_statsdate);
k_unlink_item(userstats_store, next);
k_add_head(userstats_summ, next);
}
next = tmp;
}
// Can temporarily release the lock since all our data is now not part of the lock
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
upgrade = false;
locked = false;
if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0)
userstats->idle = false;
else
userstats->idle = true;
userstats->summarylevel[0] = SUMMARY_DB;
userstats->summarylevel[1] = '\0';
if (issix && !sixdiff) {
// Expect 6 per poolinstance
factor = (double)count / 6.0;
} else {
// For now ... new format is still 6 per hour
factor = (double)count / 6.0;
}
userstats->hashrate *= factor;
userstats->hashrate5m *= factor;
userstats->hashrate1hr *= factor;
userstats->hashrate24hr *= factor;
copy_tv(&(userstats->statsdate), &when);
// Stats to the end of this timeframe
userstats->statsdate.tv_sec -= 1;
userstats->statsdate.tv_usec = 999999;
// This is simply when it was written, so 'now' is fine
SIMPLEDATEDEFAULT(userstats, &now);
if (!conn)
conn = dbconnect();
if (!userstats_add_db(conn, userstats)) {
/* This should only happen if a restart finds data
that wasn't found during the reload but is in
the same timeframe as DB data
i.e. it shouldn't happen, but keep the summary anyway */
when.tv_sec -= USERSTATS_DB_S;
tv_to_buf(&when, tvbuf1, sizeof(tvbuf1));
tv_to_buf(&(userstats->statsdate), tvbuf2, sizeof(tvbuf2));
snprintf(error, sizeof(error),
"Userid %"PRId64" Worker %s, %d userstats record%s "
"discarded from %s to %s",
userstats->userid,
userstats->workername,
count, (count == 1 ? "" : "s"),
tvbuf1, tvbuf2);
}
// The flags are not needed
//upgrade = true;
//locked = true;
K_WLOCK(userstats_free);
k_list_transfer_to_tail(userstats_summ, userstats_free);
k_add_head(userstats_store, new);
userstats_root = add_to_ktree(userstats_root, new, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new,
cmp_userstats_statsdate);
K_WUNLOCK(userstats_free);
//locked = false;
//upgrade = false;
if (error[0])
LOGERR("%s", error);
}
if (locked) {
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
}
if (conn)
PQfinish(conn);
}
static void *summariser(__maybe_unused void *arg)
{
int i;
@ -1968,10 +1728,6 @@ static void *summariser(__maybe_unused void *arg)
if (!everyone_die)
sleep(1);
}
if (everyone_die)
break;
else
summarise_userstats();
}
summariser_using_data = false;

30
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.837"
#define CKDB_VERSION DB_VERSION"-0.841"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1196,8 +1196,14 @@ extern K_TREE *poolstats_root;
extern K_LIST *poolstats_free;
extern K_STORE *poolstats_store;
// USERSTATS userstats.id.json={...}
// Pool sends each user (staggered) once per 10m
/* USERSTATS userstats.id.json={...}
* Pool sends each user (staggered) once per 10m
* As from CKDB V0.840 we don't store any userstats
* other than the last one we received,
* and we don't store them in the DB at all.
* Historical stats will come from markersummary
* Most of the #defines for USERSTATS are no longer needed
* but are left here for now for historical reference */
typedef struct userstats {
char poolinstance[TXT_BIG+1];
int64_t userid;
@ -1208,16 +1214,15 @@ typedef struct userstats {
double hashrate1hr;
double hashrate24hr;
bool idle; // Non-db field
bool six; // Non-db field
char summarylevel[TXT_FLAG+1]; // Initially SUMMARY_NONE in RAM
char summarylevel[TXT_FLAG+1]; // SUMMARY_NONE in RAM
int32_t summarycount;
tv_t statsdate;
SIMPLEDATECONTROLFIELDS;
} USERSTATS;
/* USERSTATS protocol includes a boolean 'eos' that when true,
* we have received the full set of data for the given
* createdate batch, and thus can move all (complete) records
/* The old USERSTATS protocol included a boolean 'eos' that when true,
* we had received the full set of data for the given
* createdate batch, and thus could move all (complete) records
* matching the createdate from userstats_eos_store into the tree */
#define ALLOC_USERSTATS 10000
@ -1227,14 +1232,10 @@ typedef struct userstats {
#define DATA_USERSTATS_NULL(_var, _item) DATA_GENERIC(_var, _item, userstats, false)
extern K_TREE *userstats_root;
extern K_TREE *userstats_statsdate_root; // ordered by statsdate first
extern K_TREE *userstats_workerstatus_root; // during data load
extern K_LIST *userstats_free;
extern K_STORE *userstats_store;
// Awaiting EOS
extern K_STORE *userstats_eos_store;
// Temporary while summarising
extern K_STORE *userstats_summ;
/* 1.5 x how often we expect to get user's stats from ckpool
* This is used when grouping the sub-worker stats into a single user
@ -1659,10 +1660,7 @@ extern cmp_t cmp_auths(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_poolstats(K_ITEM *a, K_ITEM *b);
extern void dsp_userstats(K_ITEM *item, FILE *stream);
extern cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate);
extern K_ITEM *find_userstats(int64_t userid, char *workername);
extern void dsp_markersummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b);

253
src/ckdb_cmd.c

@ -1147,17 +1147,15 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
{
K_ITEM w_look, *w_item, us_look, *us_item, *ws_item;
K_TREE_CTX w_ctx[1], us_ctx[1], pay_ctx[1];
K_TREE_CTX w_ctx[1], pay_ctx[1];
WORKERS lookworkers, *workers;
WORKERSTATUS *workerstatus;
USERSTATS lookuserstats, *userstats;
USERSTATS *userstats;
char tmp[1024];
char *buf;
size_t len, off;
int rows;
K_TREE *userstats_workername_root = new_ktree();
K_TREE_CTX usw_ctx[1];
double t_hashrate5m = 0, t_hashrate1hr = 0;
double t_hashrate24hr = 0;
double t_diffacc = 0, t_diffinv = 0;
@ -1214,36 +1212,19 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
t_sharerej += workerstatus->sharerej;
}
// find last stored userstats record
lookuserstats.userid = users->userid;
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
// find/cmp doesn't get to here
lookuserstats.poolinstance[0] = '\0';
lookuserstats.workername[0] = '\0';
us_look.data = (void *)(&lookuserstats);
/* TODO: workers_root userid+worker is ordered
* so no 'find' should be needed -
* just cmp to last 'unused us_item' userid+worker
* then step it forward to be the next ready 'unused' */
K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &us_look, cmp_userstats, us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
while (us_item && userstats->userid == lookuserstats.userid) {
if (strcmp(userstats->workername, workers->workername) == 0) {
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
if (!find_in_ktree(userstats_workername_root, us_item,
cmp_userstats_workername, usw_ctx)) {
t_hashrate5m += userstats->hashrate5m;
t_hashrate1hr += userstats->hashrate1hr;
t_hashrate24hr += userstats->hashrate24hr;
userstats_workername_root =
add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
}
} else
break;
us_item = find_userstats(users->userid, workers->workername);
if (us_item) {
DATA_USERSTATS(userstats, us_item);
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
t_hashrate5m += userstats->hashrate5m;
t_hashrate1hr += userstats->hashrate1hr;
t_hashrate24hr += userstats->hashrate24hr;
}
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
}
K_RUNLOCK(userstats_free);
}
@ -1251,8 +1232,6 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
DATA_WORKERS_NULL(workers, w_item);
}
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
// Calculate total payratio
paytotal = 0;
K_RLOCK(paymentaddresses_free);
@ -1394,11 +1373,11 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root)
{
K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item;
K_ITEM *ua_item, us_look, *us_item, *ws_item;
K_TREE_CTX w_ctx[1], us_ctx[1];
K_ITEM *ua_item, *us_item, *ws_item;
K_TREE_CTX w_ctx[1];
WORKERS lookworkers, *workers;
WORKERSTATUS *workerstatus;
USERSTATS lookuserstats, *userstats;
USERSTATS *userstats;
USERS *users;
char reply[1024] = "";
char tmp[1024];
@ -1450,7 +1429,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp);
INIT_WORKERS(&w_look);
INIT_USERSTATS(&us_look);
lookworkers.userid = users->userid;
lookworkers.workername[0] = '\0';
@ -1479,8 +1457,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp);
if (stats) {
K_TREE *userstats_workername_root = new_ktree();
K_TREE_CTX usw_ctx[1];
double w_hashrate5m, w_hashrate1hr;
double w_hashrate24hr;
int64_t w_elapsed;
@ -1523,39 +1499,23 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
w_sharerej = workerstatus->sharerej;
}
// find last stored userid record
lookuserstats.userid = users->userid;
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
// find/cmp doesn't get to here
lookuserstats.poolinstance[0] = '\0';
lookuserstats.workername[0] = '\0';
us_look.data = (void *)(&lookuserstats);
/* TODO: workers_root userid+worker is ordered
* so no 'find' should be needed -
* just cmp to last 'unused us_item' userid+worker
* then step it forward to be the next ready 'unused' */
K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &us_look, cmp_userstats, us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
while (us_item && userstats->userid == lookuserstats.userid) {
if (strcmp(userstats->workername, workers->workername) == 0) {
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
// TODO: add together the latest per pool instance (this is the latest per worker)
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx)) {
w_hashrate5m += userstats->hashrate5m;
w_hashrate1hr += userstats->hashrate1hr;
w_hashrate24hr += userstats->hashrate24hr;
if (w_elapsed == -1 || w_elapsed > userstats->elapsed)
w_elapsed = userstats->elapsed;
userstats_workername_root = add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
}
} else
break;
us_item = find_userstats(users->userid, workers->workername);
if (us_item) {
DATA_USERSTATS(userstats, us_item);
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
w_hashrate5m += userstats->hashrate5m;
w_hashrate1hr += userstats->hashrate1hr;
w_hashrate24hr += userstats->hashrate24hr;
if (w_elapsed == -1 || w_elapsed > userstats->elapsed)
w_elapsed = userstats->elapsed;
}
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
}
K_RUNLOCK(userstats_free);
double_to_buf(w_hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "w_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
@ -1628,9 +1588,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
double_to_buf(w_sharerej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "w_sharerej:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_RUNLOCK(userstats_free);
}
rows++;
@ -1665,125 +1622,88 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd,
__maybe_unused K_TREE *trf_root)
{
K_TREE *userstats_workername_root = new_ktree();
K_ITEM *us_item, *usw_item, *tmp_item, *u_item;
K_TREE_CTX us_ctx[1], usw_ctx[1];
USERSTATS *userstats, *userstats_w;
K_STORE *usu_store = k_new_store(userstats_free);
K_ITEM *us_item, *usu_item, *u_item;
K_TREE_CTX us_ctx[1];
USERSTATS *userstats, *userstats_u = NULL;
USERS *users;
char reply[1024] = "";
char tmp[1024];
char *buf;
size_t len, off;
int rows;
int64_t userid = -1;
double u_hashrate5m = 0.0;
double u_hashrate1hr = 0.0;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
// TODO: this really should just get the last value of each client_id (within the time limit)
// Find last records for each user/worker in ALLUSERS_LIMIT_S
// TODO: include pool_instance
/* Sum up all recent userstats without workername
* i.e. userstasts per username */
K_WLOCK(userstats_free);
us_item = last_in_ktree(userstats_statsdate_root, us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
while (us_item && tvdiff(now, &(userstats->statsdate)) < ALLUSERS_LIMIT_S) {
usw_item = find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx);
if (!usw_item) {
usw_item = k_unlink_head(userstats_free);
DATA_USERSTATS(userstats_w, usw_item);
userstats_w->userid = userstats->userid;
strcpy(userstats_w->workername, userstats->workername);
userstats_w->hashrate5m = userstats->hashrate5m;
userstats_w->hashrate1hr = userstats->hashrate1hr;
userstats_workername_root = add_to_ktree(userstats_workername_root, usw_item, cmp_userstats_workername);
us_item = first_in_ktree(userstats_root, us_ctx);
while (us_item) {
DATA_USERSTATS(userstats, us_item);
if (tvdiff(now, &(userstats->statsdate)) < ALLUSERS_LIMIT_S) {
if (!userstats_u || userstats->userid != userstats_u->userid) {
usu_item = k_unlink_head(userstats_free);
DATA_USERSTATS(userstats_u, usu_item);
userstats_u->userid = userstats->userid;
/* Remember the first workername for if we ever
* get the missing user LOGERR message below */
STRNCPY(userstats_u->workername, userstats->workername);
userstats_u->hashrate5m = userstats->hashrate5m;
userstats_u->hashrate1hr = userstats->hashrate1hr;
k_add_head(usu_store, usu_item);
} else {
userstats_u->hashrate5m += userstats->hashrate5m;
userstats_u->hashrate1hr += userstats->hashrate1hr;
}
}
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
us_item = next_in_ktree(us_ctx);
}
K_WUNLOCK(userstats_free);
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok.");
rows = 0;
// Add up per user
usw_item = first_in_ktree(userstats_workername_root, usw_ctx);
while (usw_item) {
DATA_USERSTATS(userstats_w, usw_item);
if (userstats_w->userid != userid) {
if (userid != -1) {
K_RLOCK(users_free);
u_item = find_userid(userid);
K_RUNLOCK(users_free);
if (!u_item) {
LOGERR("%s() userid %"PRId64" ignored - userstats but not users",
__func__, userid);
} else {
DATA_USERS(users, u_item);
str_to_buf(users->username, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "username:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
bigint_to_buf(userid, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "userid:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate1hr, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate1hr:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows++;
}
}
userid = userstats_w->userid;
u_hashrate5m = 0;
u_hashrate1hr = 0;
}
u_hashrate5m += userstats_w->hashrate5m;
u_hashrate1hr += userstats_w->hashrate1hr;
tmp_item = usw_item;
usw_item = next_in_ktree(usw_ctx);
k_add_head(userstats_free, tmp_item);
}
if (userid != -1) {
usu_item = usu_store->head;
while (usu_item) {
DATA_USERSTATS(userstats_u, usu_item);
K_RLOCK(users_free);
u_item = find_userid(userid);
u_item = find_userid(userstats_u->userid);
K_RUNLOCK(users_free);
if (!u_item) {
LOGERR("%s() userid %"PRId64" ignored - userstats but not users",
__func__, userid);
LOGERR("%s() userstats, but not users, "
"ignored %"PRId64"/%s",
__func__, userstats_u->userid,
userstats_u->workername);
} else {
DATA_USERS(users, u_item);
str_to_buf(users->username, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "username:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
bigint_to_buf(userid, reply, sizeof(reply));
bigint_to_buf(users->userid, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "userid:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate5m, reply, sizeof(reply));
double_to_buf(userstats_u->hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate1hr, reply, sizeof(reply));
double_to_buf(userstats_u->hashrate1hr, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate1hr:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows++;
}
usu_item = usu_item->next;
}
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_WLOCK(userstats_free);
k_list_transfer_to_head(usu_store, userstats_free);
K_WUNLOCK(userstats_free);
k_free_store(usu_store);
snprintf(tmp, sizeof(tmp),
"rows=%d%cflds=%s%c",
@ -2496,7 +2416,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
BLOCKS *blocks;
USERS *users;
int64_t u_elapsed;
K_TREE_CTX ctx[1], w_ctx[1];
K_TREE_CTX ctx[1];
size_t len, off;
bool has_uhr;
@ -2660,40 +2580,29 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
has_uhr = false;
if (p_item && u_item) {
K_TREE *userstats_workername_root = new_ktree();
u_hashrate5m = u_hashrate1hr = 0.0;
u_elapsed = -1;
// find last stored userid record
lookuserstats.userid = users->userid;
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
// find/cmp doesn't get to here
STRNCPY(lookuserstats.poolinstance, EMPTY);
/* find last matching userid record - before userid+1
* Use 'before' in case there is (unexpectedly) a userstats
* with an empty workername */
lookuserstats.userid = users->userid+1;
STRNCPY(lookuserstats.workername, EMPTY);
INIT_USERSTATS(&look);
look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &look, cmp_userstats, ctx);
DATA_USERSTATS_NULL(userstats, us_item);
while (us_item && userstats->userid == lookuserstats.userid &&
tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
// TODO: add the latest per pool instance (this is the latest per worker)
// Ignore summarised data from the DB, it should be old so irrelevant
if (userstats->poolinstance[0] &&
!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, w_ctx)) {
while (us_item && userstats->userid == users->userid) {
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
u_hashrate5m += userstats->hashrate5m;
u_hashrate1hr += userstats->hashrate1hr;
if (u_elapsed == -1 || u_elapsed > userstats->elapsed)
u_elapsed = userstats->elapsed;
has_uhr = true;
userstats_workername_root = add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
}
us_item = prev_in_ktree(ctx);
DATA_USERSTATS_NULL(userstats, us_item);
}
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_RUNLOCK(userstats_free);
}
@ -4017,7 +3926,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(miningpayouts, 1, 1);
USEINFO(auths, 1, 1);
USEINFO(poolstats, 1, 1);
USEINFO(userstats, 4, 2);
USEINFO(userstats, 2, 1);
USEINFO(workerstatus, 1, 1);
USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0);

107
src/ckdb_data.c

@ -722,10 +722,10 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
TODO: combine set_block_share_counters() with this? */
void workerstatus_ready()
{
K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1], ms_ctx[1];
K_TREE_CTX ws_ctx[1], ss_ctx[1], ms_ctx[1];
K_ITEM *ws_item, us_look, ss_look, *us_item, *ss_item;
K_ITEM *ms_item, ms_look, *wm_item;
USERSTATS lookuserstats, *userstats;
USERSTATS *userstats;
SHARESUMMARY looksharesummary, *sharesummary;
MARKERSUMMARY *markersummary;
WORKERSTATUS *workerstatus;
@ -739,15 +739,10 @@ void workerstatus_ready()
while (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item);
// The last one
lookuserstats.userid = workerstatus->userid;
STRNCPY(lookuserstats.workername, workerstatus->workername);
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
us_look.data = (void *)(&lookuserstats);
// Zero or one
K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look,
cmp_userstats_workerstatus, us_ctx);
us_item = find_userstats(workerstatus->userid,
workerstatus->workername);
K_RUNLOCK(userstats_free);
if (us_item) {
DATA_USERSTATS(userstats, us_item);
@ -2279,28 +2274,8 @@ void dsp_userstats(K_ITEM *item, FILE *stream)
}
}
/* order by userid asc,statsdate asc,poolinstance asc,workername asc
as per required for userstats homepage summarisation */
/* order by userid asc,workername asc */
cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b);
cmp_t c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0) {
c = CMP_STR(ua->poolinstance, ub->poolinstance);
if (c == 0)
c = CMP_STR(ua->workername, ub->workername);
}
}
return c;
}
/* order by userid asc,workername asc
temporary tree for summing userstats when sending user homepage info */
cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
@ -2311,70 +2286,18 @@ cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
return c;
}
/* order by statsdate,userid asc,statsdate asc,workername asc,poolinstance asc
as per required for DB summarisation */
cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b)
K_ITEM *find_userstats(int64_t userid, char *workername)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b);
cmp_t c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0) {
c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_STR(ua->workername, ub->workername);
if (c == 0)
c = CMP_STR(ua->poolinstance, ub->poolinstance);
}
}
return c;
}
/* order by userid asc,workername asc,statsdate asc,poolinstance asc
built during data load to update workerstatus at the end of the load
and used during reload to discard stats already in the DB */
cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b);
cmp_t c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_STR(ua->workername, ub->workername);
if (c == 0) {
c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0)
c = CMP_STR(ua->poolinstance, ub->poolinstance);
}
}
return c;
}
USERSTATS userstats;
K_TREE_CTX ctx[1];
K_ITEM look;
bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate)
{
char buf[DATE_BUFSIZ+1];
userstats.userid = userid;
STRNCPY(userstats.workername, workername);
copy_tv(statsdate, &(row->statsdate));
// Start of this timeband
switch (row->summarylevel[0]) {
case SUMMARY_DB:
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S;
statsdate->tv_usec = 0;
break;
case SUMMARY_FULL:
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS;
statsdate->tv_usec = 0;
break;
default:
tv_to_buf(statsdate, buf, sizeof(buf));
// Bad userstats are not fatal
LOGERR("Unknown userstats summarylevel 0x%02x '%c' "
"userid %"PRId64" workername %s statsdate %s",
row->summarylevel[0], row->summarylevel[0],
row->userid, row->workername, buf);
return false;
}
return true;
INIT_USERSTATS(&look);
look.data = (void *)(&userstats);
return find_in_ktree(userstats_root, &look, cmp_userstats, ctx);
}
void dsp_markersummary(K_ITEM *item, FILE *stream)

379
src/ckdb_dbio.c

@ -5047,70 +5047,15 @@ clean:
return ok;
}
bool userstats_add_db(PGconn *conn, USERSTATS *row)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
char *ins;
bool ok = false;
char *params[10 + SIMPLEDATECOUNT];
int n, par = 0;
LOGDEBUG("%s(): store", __func__);
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->elapsed, NULL, 0);
params[par++] = double_to_buf(row->hashrate, NULL, 0);
params[par++] = double_to_buf(row->hashrate5m, NULL, 0);
params[par++] = double_to_buf(row->hashrate1hr, NULL, 0);
params[par++] = double_to_buf(row->hashrate24hr, NULL, 0);
params[par++] = str_to_buf(row->summarylevel, NULL, 0);
params[par++] = int_to_buf(row->summarycount, NULL, 0);
params[par++] = tv_to_buf(&(row->statsdate), NULL, 0);
SIMPLEDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into userstats "
"(userid,workername,elapsed,hashrate,hashrate5m,hashrate1hr,"
"hashrate24hr,summarylevel,summarycount,statsdate"
SIMPLEDATECONTROL ") values (" PQPARAM14 ")";
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
free(params[n]);
return ok;
}
// This is to RAM. The summariser calls the DB I/O functions for userstats
// To RAM
bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool idle,
bool eos, char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
K_ITEM *us_item, *u_item, *us_match, *us_next, look;
tv_t eosdate;
USERSTATS *row, cmp, *match, *next;
K_ITEM *us_item, *u_item, *us_match, *us_next;
USERSTATS *row, *match, *next;
USERS *users;
K_TREE_CTX ctx[1];
@ -5149,47 +5094,10 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
SIMPLEDATEINIT(row, cd, by, code, inet);
SIMPLEDATETRANSFER(trf_root, row);
copy_tv(&(row->statsdate), &(row->createdate));
row->six = true;
if (eos) {
// Save it for end processing
eosdate.tv_sec = row->createdate.tv_sec;
eosdate.tv_usec = row->createdate.tv_usec;
}
// confirm_summaries() doesn't call this
if (reloading) {
memcpy(&cmp, row, sizeof(cmp));
INIT_USERSTATS(&look);
look.data = (void *)(&cmp);
// Just zero it to ensure the DB record is after it, not equal to it
cmp.statsdate.tv_usec = 0;
/* If there is a matching user+worker DB record summarising this row,
* or a matching user+worker DB record next after this row, discard it */
us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, ctx);
DATA_USERSTATS_NULL(match, us_match);
if (us_match &&
match->userid == row->userid &&
strcmp(match->workername, row->workername) == 0 &&
match->summarylevel[0] != SUMMARY_NONE) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
/* If this was an eos record and eos_store has data,
* it means we need to process the eos_store */
if (eos && userstats_eos_store->count > 0)
goto advancetogo;
return true;
}
}
workerstatus_update(NULL, NULL, row);
/* group at full key: userid,createdate,poolinstance,workername
i.e. ignore instance and group together down at workername */
/* group at: userid,workername */
us_match = userstats_eos_store->head;
while (us_match && cmp_userstats(us_item, us_match) != 0.0)
us_match = us_match->next;
@ -5208,39 +5116,35 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
} else {
// New worker
// New user+worker
K_WLOCK(userstats_free);
k_add_head(userstats_eos_store, us_item);
K_WUNLOCK(userstats_free);
}
if (eos) {
advancetogo:
K_WLOCK(userstats_free);
us_next = userstats_eos_store->head;
while (us_next) {
DATA_USERSTATS(next, us_next);
if (tvdiff(&(next->createdate), &eosdate) != 0.0) {
char date_buf[DATE_BUFSIZ];
LOGERR("userstats != eos '%s' discarded: %s/%"PRId64"/%s",
tv_to_buf(&eosdate, date_buf, DATE_BUFSIZ),
next->poolinstance,
next->userid,
next->workername);
us_next = us_next->next;
} else {
us_item = find_in_ktree(userstats_root, us_next,
cmp_userstats, ctx);
if (!us_item) {
// New user+worker - store it in RAM
us_match = us_next;
us_next = us_match->next;
k_unlink_item(userstats_eos_store, us_match);
userstats_root = add_to_ktree(userstats_root, us_match,
cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_match,
cmp_userstats_statsdate);
if (!startup_complete) {
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, us_match,
cmp_userstats_workerstatus);
}
k_add_head(userstats_store, us_match);
} else {
DATA_USERSTATS(next, us_next);
// Old user+worker - update RAM if us_item is newer
DATA_USERSTATS(row, us_item);
if (tv_newer(&(next->createdate), &(row->createdate))) {
// the tree index data is the same
memcpy(next, row, sizeof(*row));
}
us_next = us_next->next;
}
}
// Discard them
@ -5252,15 +5156,15 @@ advancetogo:
return true;
}
// This is to RAM. The summariser calls the DB I/O functions for userstats
// To RAM
bool workerstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool idle,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
K_ITEM *us_item, *u_item, *us_match, look;
USERSTATS *row, cmp, *match;
K_ITEM *us_item, *u_item, *us_match;
USERSTATS *row, *match;
USERS *users;
K_TREE_CTX ctx[1];
@ -5278,11 +5182,13 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *txt;
LOGERR("%s(): unknown user '%s'",
char *usr, *wrk;
LOGERR("%s(): unknown user '%s' (worker=%s)",
__func__,
txt = safe_text(username));
free(txt);
usr = safe_text(username),
wrk = safe_text(workername));
free(usr);
free(wrk);
return false;
}
DATA_USERS(users, u_item);
@ -5299,232 +5205,29 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username,
SIMPLEDATEINIT(row, cd, by, code, inet);
SIMPLEDATETRANSFER(trf_root, row);
copy_tv(&(row->statsdate), &(row->createdate));
row->six = false;
// confirm_summaries() doesn't call this
if (reloading) {
memcpy(&cmp, row, sizeof(cmp));
INIT_USERSTATS(&look);
look.data = (void *)(&cmp);
// Just zero it to ensure the DB record is after it, not equal to it
cmp.statsdate.tv_usec = 0;
/* If there is a matching user+worker DB record summarising this row,
* or a matching user+worker DB record next after this row, discard it */
us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, ctx);
DATA_USERSTATS_NULL(match, us_match);
if (us_match &&
match->userid == row->userid &&
strcmp(match->workername, row->workername) == 0 &&
match->summarylevel[0] != SUMMARY_NONE) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
return true;
}
}
workerstatus_update(NULL, NULL, row);
K_WLOCK(userstats_free);
userstats_root = add_to_ktree(userstats_root, us_item, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_item,
cmp_userstats_statsdate);
if (!startup_complete) {
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root,
us_item,
cmp_userstats_workerstatus);
}
k_add_head(userstats_store, us_item);
K_WUNLOCK(userstats_free);
return true;
}
// TODO: data selection - only require ?
bool userstats_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
struct tm tm;
time_t now_t;
char tzinfo[16], stamp[128];
USERSTATS *row;
tv_t statsdate;
char *field;
char *sel = NULL;
size_t len, off;
int fields = 10;
long minoff, hroff;
char tzch;
bool ok;
LOGDEBUG("%s(): select", __func__);
// Temoprarily ... load last 24hrs worth
now_t = time(NULL);
now_t -= 24 * 60 * 60;
localtime_r(&now_t, &tm);
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '-';
minoff *= -1;
} else
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld:%02ld",
tzch, hroff, minoff % 60);
us_match = find_in_ktree(userstats_root, us_item,
cmp_userstats, ctx);
if (!us_match) {
// New user+worker - store it in RAM
userstats_root = add_to_ktree(userstats_root, us_item,
cmp_userstats);
k_add_head(userstats_store, us_item);
} else {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld",
tzch, hroff);
}
snprintf(stamp, sizeof(stamp),
"'%d-%02d-%02d %02d:%02d:%02d%s'",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
tzinfo);
APPEND_REALLOC_INIT(sel, off, len);
APPEND_REALLOC(sel, off, len,
"select "
"userid,workername,elapsed,hashrate,hashrate5m,"
"hashrate1hr,hashrate24hr,summarylevel,summarycount,"
"statsdate"
SIMPLEDATECONTROL
" from userstats where statsdate>");
APPEND_REALLOC(sel, off, len, stamp);
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
ok = false;
goto clean;
}
n = PQnfields(res);
if (n != (fields + SIMPLEDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + SIMPLEDATECOUNT, n);
PQclear(res);
ok = false;
goto clean;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(userstats_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(userstats_free);
DATA_USERSTATS(row, item);
if (everyone_die) {
ok = false;
break;
}
// Not a DB field
row->poolinstance[0] = '\0';
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_STR("workername", field, row->workername);
PQ_GET_FLD(res, i, "elapsed", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("elapsed", field, row->elapsed);
PQ_GET_FLD(res, i, "hashrate", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate", field, row->hashrate);
PQ_GET_FLD(res, i, "hashrate5m", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate5m", field, row->hashrate5m);
PQ_GET_FLD(res, i, "hashrate1hr", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate1hr", field, row->hashrate1hr);
PQ_GET_FLD(res, i, "hashrate24hr", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate24hr", field, row->hashrate24hr);
PQ_GET_FLD(res, i, "summarylevel", field, ok);
if (!ok)
break;
TXT_TO_STR("summarylevel", field, row->summarylevel);
PQ_GET_FLD(res, i, "summarycount", field, ok);
if (!ok)
break;
TXT_TO_INT("summarycount", field, row->summarycount);
PQ_GET_FLD(res, i, "statsdate", field, ok);
if (!ok)
break;
TXT_TO_TV("statsdate", field, row->statsdate);
// From DB - 1hr means it must have been idle > 10m
if (row->hashrate5m == 0.0 && row->hashrate1hr == 0.0)
row->idle = true;
else
row->idle = false;
SIMPLEDATEFLDS(res, i, row, ok);
if (!ok)
break;
userstats_root = add_to_ktree(userstats_root, item, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, item,
cmp_userstats_statsdate);
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, item,
cmp_userstats_workerstatus);
k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row);
if (userstats_starttimeband(row, &statsdate)) {
if (tv_newer(&(dbstatus.newest_starttimeband_userstats), &statsdate))
copy_tv(&(dbstatus.newest_starttimeband_userstats), &statsdate);
DATA_USERSTATS(match, us_match);
// Old user+worker - update RAM if us_item is newer
if (tv_newer(&(match->createdate), &(row->createdate))) {
// the tree index data is the same
memcpy(match, row, sizeof(*row));
}
tick();
k_add_head(userstats_free, us_item);
}
if (!ok)
k_add_head(userstats_free, item);
K_WUNLOCK(userstats_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d userstats records", __func__, n);
}
clean:
free(sel);
return ok;
return true;
}
bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,

Loading…
Cancel
Save