Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 11 years ago
parent
commit
dbd86bd57c
  1. 3
      sql/reloadstatus.sql
  2. 195
      src/ckdb.c

3
sql/reloadstatus.sql

@ -2,4 +2,5 @@ select 'sharesummary' as "sharesummary",min(firstshare) as "min incomplete first
select 'workinfo' as "workinfo",max(createdate) as "max createdate" from workinfo;
select 'auths' as "auths",max(createdate) as "max createdate" from auths;
select 'poolstats' as "poolstats",max(createdate) as "max createdate" from poolstats;
select 'userstats' as "userstats",max(statsdate) as "max statsdate - start of this hour" ,userid,workername from userstats group by userid,workername;
select 'userstats' as "userstats",max(statsdate) as "max statsdate - start of this hour" from userstats;
select 'blocks' as "blocks",max(createdate) as "max createdate" from blocks;

195
src/ckdb.c

@ -153,10 +153,10 @@ static char *restorefrom;
* TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting
* requirements
* DB+RAM userstats: for each pool/user/worker it would be the start
* of the time band containing the latest DB statsdate,
* since all previous data was summarised to the DB and RAM and
* deleted - use the oldest of these for all pools/users/workers
* DB+RAM userstats: start of the time band of the latest DB record,
* since all data before this has been summarised to the DB
* The userstats summarisation always processes the oldest
* RAM data to the DB
* TODO: multiple pools is not yet handled by ckdb
* TODO: handle a pool restart with a different instance name
* since this would always make the userstats reload point
@ -216,17 +216,11 @@ typedef struct loadstatus {
tv_t newest_createdate_workinfo;
tv_t newest_createdate_auths;
tv_t newest_createdate_poolstats;
tv_t userstats;
tv_t newest_starttimeband_userstats;
tv_t newest_createdate_blocks;
} LOADSTATUS;
static LOADSTATUS dbstatus;
/* Temporary while doing reload - it (of course) contains the fields
* required to track the newest userstats per user/worker
*/
static K_TREE *userstats_db_root;
static K_STORE *userstats_db;
// size limit on the command string
#define CMD_SIZ 31
#define ID_SIZ 31
@ -1895,8 +1889,6 @@ static void workerstatus_ready()
ws_item = next_in_ktree(ws_ctx);
}
free_ktree(userstats_workerstatus_root, NULL);
}
static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats,
@ -4844,7 +4836,8 @@ static double cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b)
}
/* order by userid asc,workername asc,statsdate asc,poolinstance asc
built during data load to update workerstatus at the end of the load */
built during data load to update workerstatus at the end of the load
and used during reload to discard stats already in the DB */
static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b)
{
double c = (double)(DATA_USERSTATS(a)->userid -
@ -4916,10 +4909,10 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *hashrate1hr, char *hashrate24hr, bool idle,
bool eos, char *by, char *code, char *inet, tv_t *cd)
{
K_ITEM *us_item, *u_item, *us_match, *us_next, *db_match;
K_TREE_CTX ctx[1];
USERSTATS *row;
K_ITEM *us_item, *u_item, *us_match, *us_next, look;
tv_t eosdate;
USERSTATS *row, cmp;
K_TREE_CTX ctx[1];
LOGDEBUG("%s(): add", __func__);
@ -4955,22 +4948,28 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
}
if (reloading) {
/* If the db load said the statsdate for this userid+workername
* is already summarised then we discard it */
db_match = find_in_ktree(userstats_db_root, us_item,
cmp_userstats_workername, ctx);
if (db_match &&
!tv_newer(&(DATA_USERSTATS(db_match)->statsdate), cd)) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
/* If this was an eos record and eos_store has data,
* it means we need to process the eos_store */
if (eos && userstats_eos_store->count > 0)
goto advancetogo;
memcpy(&cmp, row, sizeof(cmp));
look.data = (void *)(&cmp);
// Just zero it to ensure the DB record is after it, not equal to it
cmp.statsdate.tv_usec = 0;
/* If there is a matching user+worker DB record summarising this row,
* or a matching user+worker DB record next after this row, discard it */
us_match = find_after_in_ktree(userstats_workerstatus_root, &look,
cmp_userstats_workerstatus, ctx);
if (us_match &&
DATA_USERSTATS(us_match)->userid == row->userid &&
strcmp(DATA_USERSTATS(us_match)->workername, row->workername) == 0 &&
DATA_USERSTATS(us_match)->summarylevel[0] != SUMMARY_NONE) {
K_WLOCK(userstats_free);
k_add_head(userstats_free, us_item);
K_WUNLOCK(userstats_free);
return true;
/* If this was an eos record and eos_store has data,
* it means we need to process the eos_store */
if (eos && userstats_eos_store->count > 0)
goto advancetogo;
return true;
}
}
@ -5038,53 +5037,31 @@ advancetogo:
return true;
}
// Requires K_WLOCK(userstats_free)
static void userstats_update_ccl(USERSTATS *row)
static bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate)
{
USERSTATS userstats, *tmp;
K_TREE_CTX ctx[1];
K_ITEM look, *item;
char buf[DATE_BUFSIZ+1];
userstats.userid = row->userid;
STRNCPY(userstats.workername, row->workername);
copy_tv(&(userstats.statsdate), &(row->statsdate));
copy_tv(statsdate, &(row->statsdate));
// Start of this timeband
switch (row->summarylevel[0]) {
case SUMMARY_DB:
userstats.statsdate.tv_sec -= userstats.statsdate.tv_sec % USERSTATS_DB_S;
userstats.statsdate.tv_usec = 0;
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_S;
statsdate->tv_usec = 0;
break;
case SUMMARY_FULL:
userstats.statsdate.tv_sec -= userstats.statsdate.tv_sec % USERSTATS_DB_DS;
userstats.statsdate.tv_usec = 0;
statsdate->tv_sec -= statsdate->tv_sec % USERSTATS_DB_DS;
statsdate->tv_usec = 0;
break;
default:
tv_to_buf(&(row->statsdate), buf, sizeof(buf));
tv_to_buf(statsdate, buf, sizeof(buf));
// Bad userstats are not fatal
LOGERR("Unknown userstats summarylevel '%c' "
"userid "PRId64" workername %s statsdate %s",
row->summarylevel[0], row->userid,
row->workername, buf);
return;
}
look.data = (void *)(&userstats);
item = find_in_ktree(userstats_db_root, &look, cmp_userstats_workername, ctx);
if (item) {
tmp = DATA_USERSTATS(item);
if (tv_newer(&(tmp->statsdate), &(userstats.statsdate)))
copy_tv(&(tmp->statsdate), &(userstats.statsdate));
} else {
item = k_unlink_head(userstats_free);
tmp = DATA_USERSTATS(item);
bzero(tmp, sizeof(*tmp));
tmp->userid = userstats.userid;
STRNCPY(tmp->workername, userstats.workername);
copy_tv(&(tmp->statsdate), &(userstats.statsdate));
userstats_db_root = add_to_ktree(userstats_db_root, item,
cmp_userstats_workername);
k_add_head(userstats_db, item);
LOGERR("Unknown userstats summarylevel 0x%02x '%c' "
"userid %"PRId64" workername %s statsdate %s",
row->summarylevel[0], row->summarylevel[0],
row->userid, row->workername, buf);
return false;
}
return true;
}
// TODO: data selection - only require ?
@ -5095,6 +5072,7 @@ static bool userstats_fill(PGconn *conn)
K_ITEM *item;
int n, i;
USERSTATS *row;
tv_t statsdate;
char *field;
char *sel;
int fields = 10;
@ -5202,7 +5180,10 @@ static bool userstats_fill(PGconn *conn)
k_add_head(userstats_store, item);
workerstatus_update(NULL, NULL, row, NULL);
userstats_update_ccl(row);
if (userstats_starttimeband(row, &statsdate)) {
if (tv_newer(&(dbstatus.newest_starttimeband_userstats), &statsdate))
copy_tv(&(dbstatus.newest_starttimeband_userstats), &statsdate);
}
}
if (!ok)
k_add_head(userstats_free, item);
@ -5348,10 +5329,10 @@ static bool reload()
{
char buf[DATE_BUFSIZ+1];
char *filename;
K_ITEM *ccl;
tv_t start;
char *reason;
FILE *fp;
bool ok;
bool ok = true;
tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf);
@ -5361,30 +5342,32 @@ static bool reload()
LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
ccl = userstats_db->head;
// oldest in ccl
while (ccl) {
if (dbstatus.userstats.tv_sec == 0 ||
!tv_newer(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate)))
copy_tv(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate));
ccl = ccl->next;
}
tv_to_buf(&(dbstatus.userstats), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_starttimeband_userstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB userstats start timeband", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks", __func__, buf);
LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
copy_tv(&start, &(dbstatus.oldest_sharesummary_firstshare_n));
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo)))
reason = "sharesummary";
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) {
copy_tv(&start, &(dbstatus.newest_createdate_workinfo));
if (!tv_newer(&start, &(dbstatus.newest_createdate_auths)))
reason = "workinfo";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_auths))) {
copy_tv(&start, &(dbstatus.newest_createdate_auths));
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats)))
reason = "auths";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) {
copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
if (!tv_newer(&start, &(dbstatus.userstats)))
copy_tv(&start, &(dbstatus.userstats));
reason = "poolstats";
}
if (!tv_newer(&start, &(dbstatus.newest_starttimeband_userstats))) {
copy_tv(&start, &(dbstatus.newest_starttimeband_userstats));
reason = "userstats";
}
tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
if (start.tv_sec < DATE_BEGIN) {
start.tv_sec = DATE_BEGIN;
@ -5407,9 +5390,6 @@ static bool reload()
}
ok = reload_from(&start);
free_ktree(userstats_db_root, NULL);
k_list_transfer_to_head(userstats_db, userstats_free);
return ok;
}
@ -5578,12 +5558,10 @@ static bool setup_data()
userstats_store = k_new_store(userstats_free);
userstats_eos_store = k_new_store(userstats_free);
userstats_summ = k_new_store(userstats_free);
userstats_db = k_new_store(userstats_free);
userstats_root = new_ktree();
userstats_statsdate_root = new_ktree();
userstats_workerstatus_root = new_ktree();
userstats_free->dsp_func = dsp_userstats;
userstats_db_root = new_ktree();
workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS),
ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true);
@ -5600,6 +5578,8 @@ static bool setup_data()
workerstatus_ready();
userstats_workerstatus_root = free_ktree(userstats_workerstatus_root, NULL);
workinfo_current = last_in_ktree(workinfo_height_root, ctx);
if (workinfo_current) {
STRNCPY(wi.coinbase1, DATA_WORKINFO(workinfo_current)->coinbase1);
@ -7382,6 +7362,24 @@ static void *summariser(__maybe_unused void *arg)
return NULL;
}
static uint64_t ticks;
static time_t last_tick;
static void tick()
{
time_t now;
char ch;
now = time(NULL);
if (now > last_tick) {
last_tick = now;
ch = status_chars[ticks++ & 0x3];
putchar(ch);
putchar('\r');
fflush(stdout);
}
}
static bool reload_line(char *filename, uint64_t count, char *buf)
{
char cmd[CMD_SIZ+1], id[ID_SIZ+1];
@ -7449,10 +7447,7 @@ static bool reload_line(char *filename, uint64_t count, char *buf)
}
}
char ch = status_chars[count & 0x3];
putchar(ch);
putchar('\r');
fflush(stdout);
tick();
K_WLOCK(transfer_free);
transfer_root = free_ktree(transfer_root, NULL);
@ -7720,10 +7715,8 @@ static void *listener(void *arg)
}
close(sockd);
char ch = status_chars[(counter++) & 0x3];
putchar(ch);
putchar('\r');
fflush(stdout);
counter++;
tick();
if (cmdnum == CMD_SHUTDOWN)
break;

Loading…
Cancel
Save