Browse Source

ckdb - dont store user/workerstats in the DB and only the last one per worker in RAM

master
kanoi 10 years ago
parent
commit
a111f51dda
  1. 252
      src/ckdb.c
  2. 30
      src/ckdb.h
  3. 233
      src/ckdb_cmd.c
  4. 103
      src/ckdb_data.c
  5. 367
      src/ckdb_dbio.c

252
src/ckdb.c

@ -69,9 +69,6 @@
* ckdb aborting and needing a complete restart resolves it * ckdb aborting and needing a complete restart resolves it
* The users table, required for the authorise messages, is always updated * The users table, required for the authorise messages, is always updated
* immediately and is not affected by ckpool messages until we * immediately and is not affected by ckpool messages until we
* During the reload, when checking the timeframe for summarisation, we
* use the current last userstats createdate as 'now' to avoid touching a
* timeframe where data could still be waiting to be loaded
*/ */
/* Reload data needed /* Reload data needed
@ -97,16 +94,7 @@
* TODO: subtract how much we need in RAM of the 'between' * TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting * non db records - will depend on TODO: pool stats reporting
* requirements * requirements
* DB+RAM userstats: start of the time band of the latest DB record, * RAM userstats: none (we simply store the last one found)
* since all data before this has been summarised to the DB
* The userstats summarisation always processes the oldest
* RAM data to the DB
* TODO: multiple pools is not yet handled by ckdb
* TODO: handle a pool restart with a different instance name
* since this would always make the userstats reload point
* just after the instance name was changed - however
* this can be handled for now by simply ignoring the
* poolinstance
* DB+RAM workers: created by auths so auths will resolve it * DB+RAM workers: created by auths so auths will resolve it
* DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any) * DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any)
* will be after the last DB workinfo * will be after the last DB workinfo
@ -137,15 +125,12 @@
* Any data that matches the reload stamp is processed with an * Any data that matches the reload stamp is processed with an
* ignore duplicates flag for all except as below. * ignore duplicates flag for all except as below.
* Any data after the stamp, is processed normally for all except: * Any data after the stamp, is processed normally for all except:
* 1) userstats: any record that falls in a DB userstats that would * 1) shares/shareerrors: any record that matches an incomplete DB
* summarise that record is discarded,
* otherwise the userstats is processed normally
* 2) shares/shareerrors: any record that matches an incomplete DB
* sharesummary that hasn't been reset, will reset the sharesummary * sharesummary that hasn't been reset, will reset the sharesummary
* so that the sharesummary will be recalculated * so that the sharesummary will be recalculated
* The record is processed normally with or without the reset * The record is processed normally with or without the reset
* If the sharesummary is complete, the record is discarded * If the sharesummary is complete, the record is discarded
* 3) ageworkinfo records are also handled by the shares date * 2) ageworkinfo records are also handled by the shares date
* while processing, any records already aged are not updated * while processing, any records already aged are not updated
* and a warning is displayed if there were any matching shares * and a warning is displayed if there were any matching shares
* Any ageworkinfos that match a workmarker are ignored with an error * Any ageworkinfos that match a workmarker are ignored with an error
@ -431,21 +416,16 @@ K_LIST *auths_free;
K_STORE *auths_store; K_STORE *auths_store;
// POOLSTATS poolstats.id.json={...} // POOLSTATS poolstats.id.json={...}
// TODO: redo like userstats, but every 10min
K_TREE *poolstats_root; K_TREE *poolstats_root;
K_LIST *poolstats_free; K_LIST *poolstats_free;
K_STORE *poolstats_store; K_STORE *poolstats_store;
// USERSTATS userstats.id.json={...} // USERSTATS userstats.id.json={...}
K_TREE *userstats_root; K_TREE *userstats_root;
K_TREE *userstats_statsdate_root; // ordered by statsdate first
K_TREE *userstats_workerstatus_root; // during data load
K_LIST *userstats_free; K_LIST *userstats_free;
K_STORE *userstats_store; K_STORE *userstats_store;
// Awaiting EOS // Awaiting EOS
K_STORE *userstats_eos_store; K_STORE *userstats_eos_store;
// Temporary while summarising
K_STORE *userstats_summ;
// WORKERSTATUS from various incoming data // WORKERSTATUS from various incoming data
K_TREE *workerstatus_root; K_TREE *workerstatus_root;
@ -779,9 +759,7 @@ static bool getdata3()
if (!confirm_sharesummary) { if (!confirm_sharesummary) {
if (!(ok = useratts_fill(conn)) || everyone_die) if (!(ok = useratts_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
if (!(ok = poolstats_fill(conn)) || everyone_die) ok = poolstats_fill(conn);
goto sukamudai;
ok = userstats_fill(conn);
} }
sukamudai: sukamudai:
@ -810,8 +788,6 @@ static bool reload()
LOGWARNING("%s(): %s newest DB auths", __func__, buf); LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_starttimeband_userstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB userstats start timeband", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf)); tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf); LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
@ -834,10 +810,6 @@ static bool reload()
copy_tv(&start, &(dbstatus.newest_createdate_poolstats)); copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
reason = "poolstats"; reason = "poolstats";
} }
if (!tv_newer(&start, &(dbstatus.newest_starttimeband_userstats))) {
copy_tv(&start, &(dbstatus.newest_starttimeband_userstats));
reason = "userstats";
}
tv_to_buf(&start, buf, sizeof(buf)); tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason); LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
@ -1048,10 +1020,7 @@ static void alloc_storage()
ALLOC_USERSTATS, LIMIT_USERSTATS, true); ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_free); userstats_store = k_new_store(userstats_free);
userstats_eos_store = k_new_store(userstats_free); userstats_eos_store = k_new_store(userstats_free);
userstats_summ = k_new_store(userstats_free);
userstats_root = new_ktree(); userstats_root = new_ktree();
userstats_statsdate_root = new_ktree();
userstats_workerstatus_root = new_ktree();
userstats_free->dsp_func = dsp_userstats; userstats_free->dsp_func = dsp_userstats;
workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS),
@ -1141,10 +1110,6 @@ static void dealloc_storage()
LOGWARNING("%s() userstats ...", __func__); LOGWARNING("%s() userstats ...", __func__);
FREE_TREE(userstats_workerstatus);
FREE_TREE(userstats_statsdate);
if (userstats_summ)
userstats_summ = k_free_store(userstats_summ);
FREE_STORE(userstats_eos); FREE_STORE(userstats_eos);
FREE_ALL(userstats); FREE_ALL(userstats);
@ -1239,8 +1204,6 @@ static bool setup_data()
workerstatus_ready(); workerstatus_ready();
userstats_workerstatus_root = free_ktree(userstats_workerstatus_root, NULL);
workinfo_current = last_in_ktree(workinfo_height_root, ctx); workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) { if (workinfo_current) {
DATA_WORKINFO(wic, workinfo_current); DATA_WORKINFO(wic, workinfo_current);
@ -1725,209 +1688,6 @@ static void summarise_poolstats()
// TODO // TODO
} }
// TODO: daily
// TODO: consider limiting how much/how long this processes each time
static void summarise_userstats()
{
K_TREE_CTX ctx[1];
K_ITEM *first, *last, *new, *next, *tmp;
USERSTATS *userstats, *us_first, *us_last, *us_next;
double statrange, factor;
bool locked, upgrade, issix, sixdiff;
tv_t now, process, when;
PGconn *conn = NULL;
int count, sixcount;
char error[1024];
char tvbuf1[DATE_BUFSIZ], tvbuf2[DATE_BUFSIZ];
upgrade = false;
locked = false;
while (1764) {
error[0] = '\0';
setnow(&now);
upgrade = false;
locked = true;
K_ILOCK(userstats_free);
// confirm_summaries() doesn't call this
if (!reloading)
copy_tv(&process, &now);
else {
// During reload, base the check date on the newest statsdate
last = last_in_ktree(userstats_statsdate_root, ctx);
if (!last)
break;
DATA_USERSTATS(us_last, last);
copy_tv(&process, &us_last->statsdate);
}
first = first_in_ktree(userstats_statsdate_root, ctx);
DATA_USERSTATS_NULL(us_first, first);
// Oldest non DB stat
// TODO: make the index start with summarylevel? so can find faster
while (first && us_first->summarylevel[0] != SUMMARY_NONE) {
first = next_in_ktree(ctx);
DATA_USERSTATS_NULL(us_first, first);
}
if (!first)
break;
statrange = tvdiff(&process, &(us_first->statsdate));
// Is there data ready for summarising?
if (statrange <= USERSTATS_AGE)
break;
copy_tv(&when, &(us_first->statsdate));
/* Convert when to the start of the timeframe after the one it is in
* assume timeval ignores leapseconds ... */
when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S;
when.tv_usec = 0;
// Is the whole timerange up to before 'when' ready for summarising?
statrange = tvdiff(&process, &when);
if (statrange < USERSTATS_AGE)
break;
next = next_in_ktree(ctx);
upgrade = true;
issix = us_first->six;
sixdiff = false;
K_ULOCK(userstats_free);
new = k_unlink_head(userstats_free);
DATA_USERSTATS(userstats, new);
memcpy(userstats, us_first, sizeof(USERSTATS));
userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first,
cmp_userstats_statsdate);
k_unlink_item(userstats_store, first);
k_add_head(userstats_summ, first);
count = 1;
sixcount = issix ? 1 : 0;
while (next) {
DATA_USERSTATS(us_next, next);
statrange = tvdiff(&when, &(us_next->statsdate));
if (statrange <= 0)
break;
tmp = next_in_ktree(ctx);
if (us_next->summarylevel[0] == SUMMARY_NONE &&
us_next->userid == userstats->userid &&
strcmp(us_next->workername, userstats->workername) == 0) {
if (us_next->six != issix)
sixdiff = true;
count++;
sixcount += us_next->six ? 1 : 0;
userstats->hashrate += us_next->hashrate;
userstats->hashrate5m += us_next->hashrate5m;
userstats->hashrate1hr += us_next->hashrate1hr;
userstats->hashrate24hr += us_next->hashrate24hr;
if (userstats->elapsed > us_next->elapsed)
userstats->elapsed = us_next->elapsed;
userstats->summarycount += us_next->summarycount;
userstats_root = remove_from_ktree(userstats_root,
next, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root,
next,
cmp_userstats_statsdate);
k_unlink_item(userstats_store, next);
k_add_head(userstats_summ, next);
}
next = tmp;
}
// Can temporarily release the lock since all our data is now not part of the lock
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
upgrade = false;
locked = false;
if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0)
userstats->idle = false;
else
userstats->idle = true;
userstats->summarylevel[0] = SUMMARY_DB;
userstats->summarylevel[1] = '\0';
if (issix && !sixdiff) {
// Expect 6 per poolinstance
factor = (double)count / 6.0;
} else {
// For now ... new format is still 6 per hour
factor = (double)count / 6.0;
}
userstats->hashrate *= factor;
userstats->hashrate5m *= factor;
userstats->hashrate1hr *= factor;
userstats->hashrate24hr *= factor;
copy_tv(&(userstats->statsdate), &when);
// Stats to the end of this timeframe
userstats->statsdate.tv_sec -= 1;
userstats->statsdate.tv_usec = 999999;
// This is simply when it was written, so 'now' is fine
SIMPLEDATEDEFAULT(userstats, &now);
if (!conn)
conn = dbconnect();
if (!userstats_add_db(conn, userstats)) {
/* This should only happen if a restart finds data
that wasn't found during the reload but is in
the same timeframe as DB data
i.e. it shouldn't happen, but keep the summary anyway */
when.tv_sec -= USERSTATS_DB_S;
tv_to_buf(&when, tvbuf1, sizeof(tvbuf1));
tv_to_buf(&(userstats->statsdate), tvbuf2, sizeof(tvbuf2));
snprintf(error, sizeof(error),
"Userid %"PRId64" Worker %s, %d userstats record%s "
"discarded from %s to %s",
userstats->userid,
userstats->workername,
count, (count == 1 ? "" : "s"),
tvbuf1, tvbuf2);
}
// The flags are not needed
//upgrade = true;
//locked = true;
K_WLOCK(userstats_free);
k_list_transfer_to_tail(userstats_summ, userstats_free);
k_add_head(userstats_store, new);
userstats_root = add_to_ktree(userstats_root, new, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new,
cmp_userstats_statsdate);
K_WUNLOCK(userstats_free);
//locked = false;
//upgrade = false;
if (error[0])
LOGERR("%s", error);
}
if (locked) {
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
}
if (conn)
PQfinish(conn);
}
static void *summariser(__maybe_unused void *arg) static void *summariser(__maybe_unused void *arg)
{ {
int i; int i;
@ -1968,10 +1728,6 @@ static void *summariser(__maybe_unused void *arg)
if (!everyone_die) if (!everyone_die)
sleep(1); sleep(1);
} }
if (everyone_die)
break;
else
summarise_userstats();
} }
summariser_using_data = false; summariser_using_data = false;

30
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.6" #define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.838" #define CKDB_VERSION DB_VERSION"-0.840"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1196,8 +1196,14 @@ extern K_TREE *poolstats_root;
extern K_LIST *poolstats_free; extern K_LIST *poolstats_free;
extern K_STORE *poolstats_store; extern K_STORE *poolstats_store;
// USERSTATS userstats.id.json={...} /* USERSTATS userstats.id.json={...}
// Pool sends each user (staggered) once per 10m * Pool sends each user (staggered) once per 10m
* As from CKDB V0.840 we don't store any userstats
* other than the last one we received,
* and we don't store them in the DB at all.
* Historical stats will come from markersummary
* Most of the #defines for USERSTATS are no longer needed
* but are left here for now for historical reference */
typedef struct userstats { typedef struct userstats {
char poolinstance[TXT_BIG+1]; char poolinstance[TXT_BIG+1];
int64_t userid; int64_t userid;
@ -1208,16 +1214,15 @@ typedef struct userstats {
double hashrate1hr; double hashrate1hr;
double hashrate24hr; double hashrate24hr;
bool idle; // Non-db field bool idle; // Non-db field
bool six; // Non-db field char summarylevel[TXT_FLAG+1]; // SUMMARY_NONE in RAM
char summarylevel[TXT_FLAG+1]; // Initially SUMMARY_NONE in RAM
int32_t summarycount; int32_t summarycount;
tv_t statsdate; tv_t statsdate;
SIMPLEDATECONTROLFIELDS; SIMPLEDATECONTROLFIELDS;
} USERSTATS; } USERSTATS;
/* USERSTATS protocol includes a boolean 'eos' that when true, /* The old USERSTATS protocol included a boolean 'eos' that when true,
* we have received the full set of data for the given * we had received the full set of data for the given
* createdate batch, and thus can move all (complete) records * createdate batch, and thus could move all (complete) records
* matching the createdate from userstats_eos_store into the tree */ * matching the createdate from userstats_eos_store into the tree */
#define ALLOC_USERSTATS 10000 #define ALLOC_USERSTATS 10000
@ -1227,14 +1232,10 @@ typedef struct userstats {
#define DATA_USERSTATS_NULL(_var, _item) DATA_GENERIC(_var, _item, userstats, false) #define DATA_USERSTATS_NULL(_var, _item) DATA_GENERIC(_var, _item, userstats, false)
extern K_TREE *userstats_root; extern K_TREE *userstats_root;
extern K_TREE *userstats_statsdate_root; // ordered by statsdate first
extern K_TREE *userstats_workerstatus_root; // during data load
extern K_LIST *userstats_free; extern K_LIST *userstats_free;
extern K_STORE *userstats_store; extern K_STORE *userstats_store;
// Awaiting EOS // Awaiting EOS
extern K_STORE *userstats_eos_store; extern K_STORE *userstats_eos_store;
// Temporary while summarising
extern K_STORE *userstats_summ;
/* 1.5 x how often we expect to get user's stats from ckpool /* 1.5 x how often we expect to get user's stats from ckpool
* This is used when grouping the sub-worker stats into a single user * This is used when grouping the sub-worker stats into a single user
@ -1659,10 +1660,7 @@ extern cmp_t cmp_auths(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_poolstats(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_poolstats(K_ITEM *a, K_ITEM *b);
extern void dsp_userstats(K_ITEM *item, FILE *stream); extern void dsp_userstats(K_ITEM *item, FILE *stream);
extern cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_userstats(int64_t userid, char *workername);
extern cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate);
extern void dsp_markersummary(K_ITEM *item, FILE *stream); extern void dsp_markersummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b);

233
src/ckdb_cmd.c

@ -1147,17 +1147,15 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users) static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
{ {
K_ITEM w_look, *w_item, us_look, *us_item, *ws_item; K_ITEM w_look, *w_item, us_look, *us_item, *ws_item;
K_TREE_CTX w_ctx[1], us_ctx[1], pay_ctx[1]; K_TREE_CTX w_ctx[1], pay_ctx[1];
WORKERS lookworkers, *workers; WORKERS lookworkers, *workers;
WORKERSTATUS *workerstatus; WORKERSTATUS *workerstatus;
USERSTATS lookuserstats, *userstats; USERSTATS *userstats;
char tmp[1024]; char tmp[1024];
char *buf; char *buf;
size_t len, off; size_t len, off;
int rows; int rows;
K_TREE *userstats_workername_root = new_ktree();
K_TREE_CTX usw_ctx[1];
double t_hashrate5m = 0, t_hashrate1hr = 0; double t_hashrate5m = 0, t_hashrate1hr = 0;
double t_hashrate24hr = 0; double t_hashrate24hr = 0;
double t_diffacc = 0, t_diffinv = 0; double t_diffacc = 0, t_diffinv = 0;
@ -1214,36 +1212,19 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
t_sharerej += workerstatus->sharerej; t_sharerej += workerstatus->sharerej;
} }
// find last stored userstats record /* TODO: workers_root userid+worker is ordered
lookuserstats.userid = users->userid; * so no 'find' should be needed -
lookuserstats.statsdate.tv_sec = date_eot.tv_sec; * just cmp to last 'unused us_item' userid+worker
lookuserstats.statsdate.tv_usec = date_eot.tv_usec; * then step it forward to be the next ready 'unused' */
// find/cmp doesn't get to here
lookuserstats.poolinstance[0] = '\0';
lookuserstats.workername[0] = '\0';
us_look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free); K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &us_look, cmp_userstats, us_ctx); us_item = find_userstats(users->userid, workers->workername);
DATA_USERSTATS_NULL(userstats, us_item); if (us_item) {
while (us_item && userstats->userid == lookuserstats.userid) { DATA_USERSTATS(userstats, us_item);
if (strcmp(userstats->workername, workers->workername) == 0) {
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) { if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
if (!find_in_ktree(userstats_workername_root, us_item,
cmp_userstats_workername, usw_ctx)) {
t_hashrate5m += userstats->hashrate5m; t_hashrate5m += userstats->hashrate5m;
t_hashrate1hr += userstats->hashrate1hr; t_hashrate1hr += userstats->hashrate1hr;
t_hashrate24hr += userstats->hashrate24hr; t_hashrate24hr += userstats->hashrate24hr;
userstats_workername_root =
add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
}
} else
break;
} }
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
} }
K_RUNLOCK(userstats_free); K_RUNLOCK(userstats_free);
} }
@ -1251,8 +1232,6 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
DATA_WORKERS_NULL(workers, w_item); DATA_WORKERS_NULL(workers, w_item);
} }
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
// Calculate total payratio // Calculate total payratio
paytotal = 0; paytotal = 0;
K_RLOCK(paymentaddresses_free); K_RLOCK(paymentaddresses_free);
@ -1394,11 +1373,11 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root) __maybe_unused tv_t *notcd, K_TREE *trf_root)
{ {
K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item; K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item;
K_ITEM *ua_item, us_look, *us_item, *ws_item; K_ITEM *ua_item, *us_item, *ws_item;
K_TREE_CTX w_ctx[1], us_ctx[1]; K_TREE_CTX w_ctx[1];
WORKERS lookworkers, *workers; WORKERS lookworkers, *workers;
WORKERSTATUS *workerstatus; WORKERSTATUS *workerstatus;
USERSTATS lookuserstats, *userstats; USERSTATS *userstats;
USERS *users; USERS *users;
char reply[1024] = ""; char reply[1024] = "";
char tmp[1024]; char tmp[1024];
@ -1450,7 +1429,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
INIT_WORKERS(&w_look); INIT_WORKERS(&w_look);
INIT_USERSTATS(&us_look);
lookworkers.userid = users->userid; lookworkers.userid = users->userid;
lookworkers.workername[0] = '\0'; lookworkers.workername[0] = '\0';
@ -1479,8 +1457,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
if (stats) { if (stats) {
K_TREE *userstats_workername_root = new_ktree();
K_TREE_CTX usw_ctx[1];
double w_hashrate5m, w_hashrate1hr; double w_hashrate5m, w_hashrate1hr;
double w_hashrate24hr; double w_hashrate24hr;
int64_t w_elapsed; int64_t w_elapsed;
@ -1523,39 +1499,23 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
w_sharerej = workerstatus->sharerej; w_sharerej = workerstatus->sharerej;
} }
// find last stored userid record /* TODO: workers_root userid+worker is ordered
lookuserstats.userid = users->userid; * so no 'find' should be needed -
lookuserstats.statsdate.tv_sec = date_eot.tv_sec; * just cmp to last 'unused us_item' userid+worker
lookuserstats.statsdate.tv_usec = date_eot.tv_usec; * then step it forward to be the next ready 'unused' */
// find/cmp doesn't get to here
lookuserstats.poolinstance[0] = '\0';
lookuserstats.workername[0] = '\0';
us_look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free); K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &us_look, cmp_userstats, us_ctx); us_item = find_userstats(users->userid, workers->workername);
DATA_USERSTATS_NULL(userstats, us_item); if (us_item) {
while (us_item && userstats->userid == lookuserstats.userid) { DATA_USERSTATS(userstats, us_item);
if (strcmp(userstats->workername, workers->workername) == 0) {
if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) { if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
// TODO: add together the latest per pool instance (this is the latest per worker)
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx)) {
w_hashrate5m += userstats->hashrate5m; w_hashrate5m += userstats->hashrate5m;
w_hashrate1hr += userstats->hashrate1hr; w_hashrate1hr += userstats->hashrate1hr;
w_hashrate24hr += userstats->hashrate24hr; w_hashrate24hr += userstats->hashrate24hr;
if (w_elapsed == -1 || w_elapsed > userstats->elapsed) if (w_elapsed == -1 || w_elapsed > userstats->elapsed)
w_elapsed = userstats->elapsed; w_elapsed = userstats->elapsed;
userstats_workername_root = add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
} }
} else
break;
}
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
} }
K_RUNLOCK(userstats_free);
double_to_buf(w_hashrate5m, reply, sizeof(reply)); double_to_buf(w_hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "w_hashrate5m:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "w_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
@ -1628,9 +1588,6 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
double_to_buf(w_sharerej, reply, sizeof(reply)); double_to_buf(w_sharerej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "w_sharerej:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "w_sharerej:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_RUNLOCK(userstats_free);
} }
rows++; rows++;
@ -1665,125 +1622,88 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, __maybe_unused tv_t *notcd,
__maybe_unused K_TREE *trf_root) __maybe_unused K_TREE *trf_root)
{ {
K_TREE *userstats_workername_root = new_ktree(); K_STORE *usu_store = k_new_store(userstats_free);
K_ITEM *us_item, *usw_item, *tmp_item, *u_item; K_ITEM *us_item, *usu_item, *u_item;
K_TREE_CTX us_ctx[1], usw_ctx[1]; K_TREE_CTX us_ctx[1];
USERSTATS *userstats, *userstats_w; USERSTATS *userstats, *userstats_u = NULL;
USERS *users; USERS *users;
char reply[1024] = ""; char reply[1024] = "";
char tmp[1024]; char tmp[1024];
char *buf; char *buf;
size_t len, off; size_t len, off;
int rows; int rows;
int64_t userid = -1;
double u_hashrate5m = 0.0;
double u_hashrate1hr = 0.0;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd); LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
// TODO: this really should just get the last value of each client_id (within the time limit) /* Sum up all recent userstats without workername
* i.e. userstasts per username */
// Find last records for each user/worker in ALLUSERS_LIMIT_S
// TODO: include pool_instance
K_WLOCK(userstats_free); K_WLOCK(userstats_free);
us_item = last_in_ktree(userstats_statsdate_root, us_ctx); us_item = first_in_ktree(userstats_root, us_ctx);
DATA_USERSTATS_NULL(userstats, us_item); while (us_item) {
while (us_item && tvdiff(now, &(userstats->statsdate)) < ALLUSERS_LIMIT_S) { DATA_USERSTATS(userstats, us_item);
usw_item = find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx); if (tvdiff(now, &(userstats->statsdate)) < ALLUSERS_LIMIT_S) {
if (!usw_item) { if (!userstats_u || userstats->userid != userstats_u->userid) {
usw_item = k_unlink_head(userstats_free); usu_item = k_unlink_head(userstats_free);
DATA_USERSTATS(userstats_w, usw_item); DATA_USERSTATS(userstats_u, usu_item);
userstats_w->userid = userstats->userid; userstats_u->userid = userstats->userid;
strcpy(userstats_w->workername, userstats->workername); /* Remember the first workername for if we ever
userstats_w->hashrate5m = userstats->hashrate5m; * get the missing user LOGERR message below */
userstats_w->hashrate1hr = userstats->hashrate1hr; STRNCPY(userstats_u->workername, userstats->workername);
userstats_u->hashrate5m = userstats->hashrate5m;
userstats_workername_root = add_to_ktree(userstats_workername_root, usw_item, cmp_userstats_workername); userstats_u->hashrate1hr = userstats->hashrate1hr;
k_add_head(usu_store, usu_item);
} else {
userstats_u->hashrate5m += userstats->hashrate5m;
userstats_u->hashrate1hr += userstats->hashrate1hr;
} }
us_item = prev_in_ktree(us_ctx);
DATA_USERSTATS_NULL(userstats, us_item);
} }
us_item = next_in_ktree(us_ctx);
}
K_WUNLOCK(userstats_free);
APPEND_REALLOC_INIT(buf, off, len); APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok."); APPEND_REALLOC(buf, off, len, "ok.");
rows = 0; rows = 0;
// Add up per user usu_item = usu_store->head;
usw_item = first_in_ktree(userstats_workername_root, usw_ctx); while (usu_item) {
while (usw_item) { DATA_USERSTATS(userstats_u, usu_item);
DATA_USERSTATS(userstats_w, usw_item);
if (userstats_w->userid != userid) {
if (userid != -1) {
K_RLOCK(users_free);
u_item = find_userid(userid);
K_RUNLOCK(users_free);
if (!u_item) {
LOGERR("%s() userid %"PRId64" ignored - userstats but not users",
__func__, userid);
} else {
DATA_USERS(users, u_item);
str_to_buf(users->username, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "username:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
bigint_to_buf(userid, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "userid:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate1hr, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate1hr:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows++;
}
}
userid = userstats_w->userid;
u_hashrate5m = 0;
u_hashrate1hr = 0;
}
u_hashrate5m += userstats_w->hashrate5m;
u_hashrate1hr += userstats_w->hashrate1hr;
tmp_item = usw_item;
usw_item = next_in_ktree(usw_ctx);
k_add_head(userstats_free, tmp_item);
}
if (userid != -1) {
K_RLOCK(users_free); K_RLOCK(users_free);
u_item = find_userid(userid); u_item = find_userid(userstats_u->userid);
K_RUNLOCK(users_free); K_RUNLOCK(users_free);
if (!u_item) { if (!u_item) {
LOGERR("%s() userid %"PRId64" ignored - userstats but not users", LOGERR("%s() userstats, but not users, "
__func__, userid); "ignored %"PRId64"/%s",
__func__, userstats_u->userid,
userstats_u->workername);
} else { } else {
DATA_USERS(users, u_item); DATA_USERS(users, u_item);
str_to_buf(users->username, reply, sizeof(reply)); str_to_buf(users->username, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "username:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "username:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
bigint_to_buf(userid, reply, sizeof(reply)); bigint_to_buf(users->userid, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "userid:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "userid:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate5m, reply, sizeof(reply)); double_to_buf(userstats_u->hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate5m:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "u_hashrate5m:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(u_hashrate1hr, reply, sizeof(reply)); double_to_buf(userstats_u->hashrate1hr, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate1hr:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "u_hashrate1hr:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
rows++; rows++;
} }
usu_item = usu_item->next;
} }
userstats_workername_root = free_ktree(userstats_workername_root, NULL); K_WLOCK(userstats_free);
k_list_transfer_to_head(usu_store, userstats_free);
K_WUNLOCK(userstats_free); K_WUNLOCK(userstats_free);
k_free_store(usu_store);
snprintf(tmp, sizeof(tmp), snprintf(tmp, sizeof(tmp),
"rows=%d%cflds=%s%c", "rows=%d%cflds=%s%c",
@ -2496,7 +2416,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
BLOCKS *blocks; BLOCKS *blocks;
USERS *users; USERS *users;
int64_t u_elapsed; int64_t u_elapsed;
K_TREE_CTX ctx[1], w_ctx[1]; K_TREE_CTX ctx[1];
size_t len, off; size_t len, off;
bool has_uhr; bool has_uhr;
@ -2660,40 +2580,29 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
has_uhr = false; has_uhr = false;
if (p_item && u_item) { if (p_item && u_item) {
K_TREE *userstats_workername_root = new_ktree();
u_hashrate5m = u_hashrate1hr = 0.0; u_hashrate5m = u_hashrate1hr = 0.0;
u_elapsed = -1; u_elapsed = -1;
// find last stored userid record /* find last matching userid record - before userid+1
lookuserstats.userid = users->userid; * Use 'before' in case there is (unexpectedly) a userstats
lookuserstats.statsdate.tv_sec = date_eot.tv_sec; * with an empty workername */
lookuserstats.statsdate.tv_usec = date_eot.tv_usec; lookuserstats.userid = users->userid+1;
// find/cmp doesn't get to here
STRNCPY(lookuserstats.poolinstance, EMPTY);
STRNCPY(lookuserstats.workername, EMPTY); STRNCPY(lookuserstats.workername, EMPTY);
INIT_USERSTATS(&look); INIT_USERSTATS(&look);
look.data = (void *)(&lookuserstats); look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free); K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_root, &look, cmp_userstats, ctx); us_item = find_before_in_ktree(userstats_root, &look, cmp_userstats, ctx);
DATA_USERSTATS_NULL(userstats, us_item); DATA_USERSTATS_NULL(userstats, us_item);
while (us_item && userstats->userid == lookuserstats.userid && while (us_item && userstats->userid == users->userid) {
tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) { if (tvdiff(now, &(userstats->statsdate)) < USERSTATS_PER_S) {
// TODO: add the latest per pool instance (this is the latest per worker)
// Ignore summarised data from the DB, it should be old so irrelevant
if (userstats->poolinstance[0] &&
!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, w_ctx)) {
u_hashrate5m += userstats->hashrate5m; u_hashrate5m += userstats->hashrate5m;
u_hashrate1hr += userstats->hashrate1hr; u_hashrate1hr += userstats->hashrate1hr;
if (u_elapsed == -1 || u_elapsed > userstats->elapsed) if (u_elapsed == -1 || u_elapsed > userstats->elapsed)
u_elapsed = userstats->elapsed; u_elapsed = userstats->elapsed;
has_uhr = true; has_uhr = true;
userstats_workername_root = add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
} }
us_item = prev_in_ktree(ctx); us_item = prev_in_ktree(ctx);
DATA_USERSTATS_NULL(userstats, us_item); DATA_USERSTATS_NULL(userstats, us_item);
} }
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_RUNLOCK(userstats_free); K_RUNLOCK(userstats_free);
} }
@ -4017,7 +3926,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(miningpayouts, 1, 1); USEINFO(miningpayouts, 1, 1);
USEINFO(auths, 1, 1); USEINFO(auths, 1, 1);
USEINFO(poolstats, 1, 1); USEINFO(poolstats, 1, 1);
USEINFO(userstats, 4, 2); USEINFO(userstats, 2, 1);
USEINFO(workerstatus, 1, 1); USEINFO(workerstatus, 1, 1);
USEINFO(workqueue, 1, 0); USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0); USEINFO(transfer, 0, 0);

103
src/ckdb_data.c

@ -739,15 +739,10 @@ void workerstatus_ready()
while (ws_item) { while (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item); DATA_WORKERSTATUS(workerstatus, ws_item);
// The last one // Zero or one
lookuserstats.userid = workerstatus->userid;
STRNCPY(lookuserstats.workername, workerstatus->workername);
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
us_look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free); K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look, us_item = find_userstats(workerstatus->userid,
cmp_userstats_workerstatus, us_ctx); workerstatus->workername);
K_RUNLOCK(userstats_free); K_RUNLOCK(userstats_free);
if (us_item) { if (us_item) {
DATA_USERSTATS(userstats, us_item); DATA_USERSTATS(userstats, us_item);
@ -2279,102 +2274,30 @@ void dsp_userstats(K_ITEM *item, FILE *stream)
} }
} }
/* order by userid asc,statsdate asc,poolinstance asc,workername asc /* order by userid asc,workername asc */
as per required for userstats homepage summarisation */
cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b) cmp_t cmp_userstats(K_ITEM *a, K_ITEM *b)
{ {
USERSTATS *ua, *ub; USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a); DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b); DATA_USERSTATS(ub, b);
cmp_t c = CMP_BIGINT(ua->userid, ub->userid); cmp_t c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0) {
c = CMP_STR(ua->poolinstance, ub->poolinstance);
if (c == 0) if (c == 0)
c = CMP_STR(ua->workername, ub->workername); c = CMP_STR(ua->workername, ub->workername);
}
}
return c; return c;
} }
/* order by userid asc,workername asc K_ITEM *find_userstats(int64_t userid, char *workername)
temporary tree for summing userstats when sending user homepage info */
cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
{ {
USERSTATS *ua, *ub; USERSTATS userstats;
DATA_USERSTATS(ua, a); K_TREE_CTX ctx[1];
DATA_USERSTATS(ub, b); K_ITEM look;
cmp_t c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0)
c = CMP_STR(ua->workername, ub->workername);
return c;
}
/* order by statsdate,userid asc,statsdate asc,workername asc,poolinstance asc
as per required for DB summarisation */
cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b);
cmp_t c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0) {
c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_STR(ua->workername, ub->workername);
if (c == 0)
c = CMP_STR(ua->poolinstance, ub->poolinstance);
}
}
return c;
}
/* order by userid asc,workername asc,statsdate asc,poolinstance asc
built during data load to update workerstatus at the end of the load
and used during reload to discard stats already in the DB */
cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b)
{
USERSTATS *ua, *ub;
DATA_USERSTATS(ua, a);
DATA_USERSTATS(ub, b);
cmp_t c = CMP_BIGINT(ua->userid, ub->userid);
if (c == 0) {
c = CMP_STR(ua->workername, ub->workername);
if (c == 0) {
c = CMP_TV(ua->statsdate, ub->statsdate);
if (c == 0)
c = CMP_STR(ua->poolinstance, ub->poolinstance);
}
}
return c;
}
bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate) userstats.userid = userid;
{ STRNCPY(userstats.workername, workername);
char buf[DATE_BUFSIZ+1];
copy_tv(statsdate, &(row->statsdate)); INIT_USERSTATS(&look);
// Start of this timeband look.data = (void *)(&userstats);
switch (row->summarylevel[0]) { return find_in_ktree(userstats_root, &look, cmp_userstats, ctx);
case SUMMARY_DB:
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S;
statsdate->tv_usec = 0;
break;
case SUMMARY_FULL:
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS;
statsdate->tv_usec = 0;
break;
default:
tv_to_buf(statsdate, buf, sizeof(buf));
// Bad userstats are not fatal
LOGERR("Unknown userstats summarylevel 0x%02x '%c' "
"userid %"PRId64" workername %s statsdate %s",
row->summarylevel[0], row->summarylevel[0],
row->userid, row->workername, buf);
return false;
}
return true;
} }
void dsp_markersummary(K_ITEM *item, FILE *stream) void dsp_markersummary(K_ITEM *item, FILE *stream)

367
src/ckdb_dbio.c

@ -5047,70 +5047,15 @@ clean:
return ok; return ok;
} }
bool userstats_add_db(PGconn *conn, USERSTATS *row) // To RAM
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
char *ins;
bool ok = false;
char *params[10 + SIMPLEDATECOUNT];
int n, par = 0;
LOGDEBUG("%s(): store", __func__);
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->elapsed, NULL, 0);
params[par++] = double_to_buf(row->hashrate, NULL, 0);
params[par++] = double_to_buf(row->hashrate5m, NULL, 0);
params[par++] = double_to_buf(row->hashrate1hr, NULL, 0);
params[par++] = double_to_buf(row->hashrate24hr, NULL, 0);
params[par++] = str_to_buf(row->summarylevel, NULL, 0);
params[par++] = int_to_buf(row->summarycount, NULL, 0);
params[par++] = tv_to_buf(&(row->statsdate), NULL, 0);
SIMPLEDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into userstats "
"(userid,workername,elapsed,hashrate,hashrate5m,hashrate1hr,"
"hashrate24hr,summarylevel,summarycount,statsdate"
SIMPLEDATECONTROL ") values (" PQPARAM14 ")";
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
free(params[n]);
return ok;
}
// This is to RAM. The summariser calls the DB I/O functions for userstats
bool userstats_add(char *poolinstance, char *elapsed, char *username, bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m, char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool idle, char *hashrate1hr, char *hashrate24hr, bool idle,
bool eos, char *by, char *code, char *inet, tv_t *cd, bool eos, char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root) K_TREE *trf_root)
{ {
K_ITEM *us_item, *u_item, *us_match, *us_next, look; K_ITEM *us_item, *u_item, *us_match, *us_next;
tv_t eosdate; USERSTATS *row, *match, *next;
USERSTATS *row, cmp, *match, *next;
USERS *users; USERS *users;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -5149,47 +5094,10 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
SIMPLEDATEINIT(row, cd, by, code, inet); SIMPLEDATEINIT(row, cd, by, code, inet);
SIMPLEDATETRANSFER(trf_root, row); SIMPLEDATETRANSFER(trf_root, row);
copy_tv(&(row->statsdate), &(row->createdate)); copy_tv(&(row->statsdate), &(row->createdate));
row->six = true;
if (eos) {
// Save it for end processing
eosdate.tv_sec = row->createdate.tv_sec;
eosdate.tv_usec = row->createdate.tv_usec;
}
// confirm_summaries() doesn't call this
if (reloading) {
memcpy(&cmp, row, sizeof(cmp));
INIT_USERSTATS(&look);
look.data = (void *)(&cmp);
// Just zero it to ensure the DB record is after it, not equal to it
cmp.statsdate.tv_usec = 0;
/* If there is a matching user+worker DB record summarising this row,
* or a matching user+worker DB record next after this row, discard it */
us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, ctx);
DATA_USERSTATS_NULL(match, us_match);
if (us_match &&
match->userid == row->userid &&
strcmp(match->workername, row->workername) == 0 &&
match->summarylevel[0] != SUMMARY_NONE) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
/* If this was an eos record and eos_store has data,
* it means we need to process the eos_store */
if (eos && userstats_eos_store->count > 0)
goto advancetogo;
return true;
}
}
workerstatus_update(NULL, NULL, row); workerstatus_update(NULL, NULL, row);
/* group at full key: userid,createdate,poolinstance,workername /* group at: userid,workername */
i.e. ignore instance and group together down at workername */
us_match = userstats_eos_store->head; us_match = userstats_eos_store->head;
while (us_match && cmp_userstats(us_item, us_match) != 0.0) while (us_match && cmp_userstats(us_item, us_match) != 0.0)
us_match = us_match->next; us_match = us_match->next;
@ -5208,39 +5116,35 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
k_add_head(userstats_free, us_item); k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free); K_WUNLOCK(userstats_free);
} else { } else {
// New worker // New user+worker
K_WLOCK(userstats_free); K_WLOCK(userstats_free);
k_add_head(userstats_eos_store, us_item); k_add_head(userstats_eos_store, us_item);
K_WUNLOCK(userstats_free); K_WUNLOCK(userstats_free);
} }
if (eos) { if (eos) {
advancetogo:
K_WLOCK(userstats_free); K_WLOCK(userstats_free);
us_next = userstats_eos_store->head; us_next = userstats_eos_store->head;
while (us_next) { while (us_next) {
DATA_USERSTATS(next, us_next); us_item = find_in_ktree(userstats_root, us_next,
if (tvdiff(&(next->createdate), &eosdate) != 0.0) { cmp_userstats, ctx);
char date_buf[DATE_BUFSIZ]; if (!us_item) {
LOGERR("userstats != eos '%s' discarded: %s/%"PRId64"/%s", // New user+worker - store it in RAM
tv_to_buf(&eosdate, date_buf, DATE_BUFSIZ),
next->poolinstance,
next->userid,
next->workername);
us_next = us_next->next;
} else {
us_match = us_next; us_match = us_next;
us_next = us_match->next; us_next = us_match->next;
k_unlink_item(userstats_eos_store, us_match); k_unlink_item(userstats_eos_store, us_match);
userstats_root = add_to_ktree(userstats_root, us_match, userstats_root = add_to_ktree(userstats_root, us_match,
cmp_userstats); cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_match,
cmp_userstats_statsdate);
if (!startup_complete) {
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, us_match,
cmp_userstats_workerstatus);
}
k_add_head(userstats_store, us_match); k_add_head(userstats_store, us_match);
} else {
DATA_USERSTATS(next, us_next);
// Old user+worker - update RAM if us_item is newer
DATA_USERSTATS(row, us_item);
if (tv_newer(&(next->createdate), &(row->createdate))) {
// the tree index data is the same
memcpy(next, row, sizeof(*row));
}
us_next = us_next->next;
} }
} }
// Discard them // Discard them
@ -5252,15 +5156,15 @@ advancetogo:
return true; return true;
} }
// This is to RAM. The summariser calls the DB I/O functions for userstats // To RAM
bool workerstats_add(char *poolinstance, char *elapsed, char *username, bool workerstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m, char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool idle, char *hashrate1hr, char *hashrate24hr, bool idle,
char *by, char *code, char *inet, tv_t *cd, char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root) K_TREE *trf_root)
{ {
K_ITEM *us_item, *u_item, *us_match, look; K_ITEM *us_item, *u_item, *us_match;
USERSTATS *row, cmp, *match; USERSTATS *row, *match;
USERS *users; USERS *users;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -5301,232 +5205,29 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username,
SIMPLEDATEINIT(row, cd, by, code, inet); SIMPLEDATEINIT(row, cd, by, code, inet);
SIMPLEDATETRANSFER(trf_root, row); SIMPLEDATETRANSFER(trf_root, row);
copy_tv(&(row->statsdate), &(row->createdate)); copy_tv(&(row->statsdate), &(row->createdate));
row->six = false;
// confirm_summaries() doesn't call this
if (reloading) {
memcpy(&cmp, row, sizeof(cmp));
INIT_USERSTATS(&look);
look.data = (void *)(&cmp);
// Just zero it to ensure the DB record is after it, not equal to it
cmp.statsdate.tv_usec = 0;
/* If there is a matching user+worker DB record summarising this row,
* or a matching user+worker DB record next after this row, discard it */
us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, ctx);
DATA_USERSTATS_NULL(match, us_match);
if (us_match &&
match->userid == row->userid &&
strcmp(match->workername, row->workername) == 0 &&
match->summarylevel[0] != SUMMARY_NONE) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
return true;
}
}
workerstatus_update(NULL, NULL, row); workerstatus_update(NULL, NULL, row);
K_WLOCK(userstats_free); K_WLOCK(userstats_free);
userstats_root = add_to_ktree(userstats_root, us_item, cmp_userstats); us_match = find_in_ktree(userstats_root, us_item,
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, us_item, cmp_userstats, ctx);
cmp_userstats_statsdate); if (!us_match) {
if (!startup_complete) { // New user+worker - store it in RAM
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, userstats_root = add_to_ktree(userstats_root, us_item,
us_item, cmp_userstats);
cmp_userstats_workerstatus);
}
k_add_head(userstats_store, us_item); k_add_head(userstats_store, us_item);
K_WUNLOCK(userstats_free);
return true;
}
// TODO: data selection - only require ?
bool userstats_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
struct tm tm;
time_t now_t;
char tzinfo[16], stamp[128];
USERSTATS *row;
tv_t statsdate;
char *field;
char *sel = NULL;
size_t len, off;
int fields = 10;
long minoff, hroff;
char tzch;
bool ok;
LOGDEBUG("%s(): select", __func__);
// Temoprarily ... load last 24hrs worth
now_t = time(NULL);
now_t -= 24 * 60 * 60;
localtime_r(&now_t, &tm);
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '-';
minoff *= -1;
} else
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld:%02ld",
tzch, hroff, minoff % 60);
} else { } else {
snprintf(tzinfo, sizeof(tzinfo), DATA_USERSTATS(match, us_match);
"%c%02ld", // Old user+worker - update RAM if us_item is newer
tzch, hroff); if (tv_newer(&(match->createdate), &(row->createdate))) {
} // the tree index data is the same
snprintf(stamp, sizeof(stamp), memcpy(match, row, sizeof(*row));
"'%d-%02d-%02d %02d:%02d:%02d%s'",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
tzinfo);
APPEND_REALLOC_INIT(sel, off, len);
APPEND_REALLOC(sel, off, len,
"select "
"userid,workername,elapsed,hashrate,hashrate5m,"
"hashrate1hr,hashrate24hr,summarylevel,summarycount,"
"statsdate"
SIMPLEDATECONTROL
" from userstats where statsdate>");
APPEND_REALLOC(sel, off, len, stamp);
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
ok = false;
goto clean;
}
n = PQnfields(res);
if (n != (fields + SIMPLEDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + SIMPLEDATECOUNT, n);
PQclear(res);
ok = false;
goto clean;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(userstats_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(userstats_free);
DATA_USERSTATS(row, item);
if (everyone_die) {
ok = false;
break;
}
// Not a DB field
row->poolinstance[0] = '\0';
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_STR("workername", field, row->workername);
PQ_GET_FLD(res, i, "elapsed", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("elapsed", field, row->elapsed);
PQ_GET_FLD(res, i, "hashrate", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate", field, row->hashrate);
PQ_GET_FLD(res, i, "hashrate5m", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate5m", field, row->hashrate5m);
PQ_GET_FLD(res, i, "hashrate1hr", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate1hr", field, row->hashrate1hr);
PQ_GET_FLD(res, i, "hashrate24hr", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("hashrate24hr", field, row->hashrate24hr);
PQ_GET_FLD(res, i, "summarylevel", field, ok);
if (!ok)
break;
TXT_TO_STR("summarylevel", field, row->summarylevel);
PQ_GET_FLD(res, i, "summarycount", field, ok);
if (!ok)
break;
TXT_TO_INT("summarycount", field, row->summarycount);
PQ_GET_FLD(res, i, "statsdate", field, ok);
if (!ok)
break;
TXT_TO_TV("statsdate", field, row->statsdate);
// From DB - 1hr means it must have been idle > 10m
if (row->hashrate5m == 0.0 && row->hashrate1hr == 0.0)
row->idle = true;
else
row->idle = false;
SIMPLEDATEFLDS(res, i, row, ok);
if (!ok)
break;
userstats_root = add_to_ktree(userstats_root, item, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, item,
cmp_userstats_statsdate);
userstats_workerstatus_root = add_to_ktree(userstats_workerstatus_root, item,
cmp_userstats_workerstatus);
k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row);
if (userstats_starttimeband(row, &statsdate)) {
if (tv_newer(&(dbstatus.newest_starttimeband_userstats), &statsdate))
copy_tv(&(dbstatus.newest_starttimeband_userstats), &statsdate);
} }
k_add_head(userstats_free, us_item);
tick();
} }
if (!ok)
k_add_head(userstats_free, item);
K_WUNLOCK(userstats_free); K_WUNLOCK(userstats_free);
PQclear(res);
if (ok) { return true;
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d userstats records", __func__, n);
}
clean:
free(sel);
return ok;
} }
bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,

Loading…
Cancel
Save