Browse Source

ckdb - summarise userstats at 1st level

master
kanoi 10 years ago
parent
commit
91430d565f
  1. 194
      src/ckdb.c
  2. 4
      src/klist.h

194
src/ckdb.c

@ -415,6 +415,14 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT };
STRNCPY(_row->createinet, _inet); \
} while (0)
#define SIMPLEDATEDEFAULT(_row, _now ) do { \
_row->createdate.tv_sec = (_now)->tv_sec; \
_row->createdate.tv_usec = (_now)->tv_usec; \
STRNCPY(_row->createby, (char *)"code"); \
STRNCPY(_row->createcode, (char *)__func__); \
STRNCPY(_row->createinet, (char *)"127.0.0.1"); \
} while (0)
// Override _row defaults if transfer fields are present
#define SIMPLEDATETRANSFER(_row) do { \
K_ITEM *item; \
@ -958,6 +966,8 @@ static K_LIST *userstats_list;
static K_STORE *userstats_store;
// Awaiting EOS
static K_STORE *userstats_eos_store;
// Temporary while summarising
static K_STORE *userstats_summ;
/* 1.5 x how often we expect to get user's stats from ckpool
* This is used when grouping the sub-worker stats into a single user
@ -4409,6 +4419,52 @@ static double cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
return c;
}
static bool userstats_add_db(PGconn *conn, USERSTATS *row)
{
ExecStatusType rescode;
PGresult *res;
char *ins;
bool ok = false;
char *params[9 + HISTORYDATECOUNT];
int par;
int n;
LOGDEBUG("%s(): store", __func__);
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->elapsed, NULL, 0);
params[par++] = double_to_buf(row->hashrate, NULL, 0);
params[par++] = double_to_buf(row->hashrate5m, NULL, 0);
params[par++] = double_to_buf(row->hashrate1hr, NULL, 0);
params[par++] = double_to_buf(row->hashrate24hr, NULL, 0);
params[par++] = str_to_buf(row->summarylevel, NULL, 0);
params[par++] = tv_to_buf(&(row->statsdate), NULL, 0);
SIMPLEDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into userstats "
"(userid,workername,elapsed,hashrate,hashrate5m,"
"hashrate1hr,hashrate24hr,summarylevel,statsdate"
HISTORYDATECONTROL ") values (" PQPARAM13 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
return ok;
}
static bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool idle,
@ -4896,6 +4952,7 @@ static bool setup_data()
userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_list);
userstats_eos_store = k_new_store(userstats_list);
userstats_summ = k_new_store(userstats_list);
userstats_root = new_ktree();
userstats_list->dsp_func = dsp_userstats;
@ -6323,12 +6380,141 @@ static void summarise_poolstats()
}
// TODO: daily
// TODO: consider limiting how much/long this processes each time
static void summarise_userstats()
{
// Hourly:
// get stats of interest (extract from tree/list)
// summarise them
// store them in the DB,tree/list
K_TREE_CTX ctx[1], ctx2[1];
K_ITEM *tail, *new, *prev, *tmp;
USERSTATS *userstats;
double statrange, factor;
bool locked, upgrade;
tv_t now, when;
PGconn *conn = NULL;
int count;
locked = false;
while (1764) {
setnow(&now);
upgrade = false;
locked = true;
K_ILOCK(userstats_list);
tail = last_in_ktree(userstats_root, ctx);
// Last non DB stat
while (tail && DATA_USERSTATS(tail)->poolinstance[0] == '\0')
tail = prev_in_ktree(ctx);
if (!tail)
break;
statrange = tvdiff(&now, &(DATA_USERSTATS(tail)->statsdate));
// Is there data ready for summarising?
if (statrange <= USERSTATS_AGE)
break;
memcpy(&when, &(DATA_USERSTATS(tail)->statsdate), sizeof(when));
/* Convent when to the start of the timeframe after the one it is in
* assume timeval ignores leapseconds ... */
when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S;
when.tv_usec = 0;
// Is the whole timerange up to before 'when' ready for summarising?
statrange = tvdiff(&now, &when);
if (statrange < USERSTATS_AGE)
break;
prev = prev_in_ktree(ctx);
upgrade = true;
K_ULOCK(userstats_list);
new = k_unlink_head(userstats_list);
userstats = DATA_USERSTATS(new);
memcpy(userstats, DATA_USERSTATS(tail), sizeof(USERSTATS));
remove_from_ktree(userstats_root, tail, cmp_userstats, ctx2);
k_unlink_item(userstats_store, tail);
k_add_head(userstats_summ, tail);
count = 1;
while (prev) {
if (DATA_USERSTATS(prev)->userid != userstats->userid)
break;
if (strcmp(DATA_USERSTATS(prev)->workername, userstats->workername))
break;
statrange = tvdiff(&when, &(DATA_USERSTATS(prev)->statsdate));
if (statrange <= 0)
break;
count++;
userstats->hashrate += DATA_USERSTATS(prev)->hashrate;
userstats->hashrate5m += DATA_USERSTATS(prev)->hashrate5m;
userstats->hashrate1hr += DATA_USERSTATS(prev)->hashrate1hr;
userstats->hashrate24hr += DATA_USERSTATS(prev)->hashrate24hr;
if (userstats->elapsed > DATA_USERSTATS(prev)->elapsed)
userstats->elapsed = DATA_USERSTATS(prev)->elapsed;
tmp = prev_in_ktree(ctx);
remove_from_ktree(userstats_root, prev, cmp_userstats, ctx2);
k_unlink_item(userstats_store, prev);
k_add_head(userstats_summ, prev);
prev = tmp;
}
if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0)
userstats->idle = false;
else
userstats->idle = true;
userstats->summarylevel[0] = SUMMARY_DB;
userstats->summarylevel[1] = '\0';
// Expect 6 per instance
factor = (double)count / 6.0;
userstats->hashrate *= factor;
userstats->hashrate5m *= factor;
userstats->hashrate1hr *= factor;
userstats->hashrate24hr *= factor;
memcpy(&(userstats->statsdate), &when, sizeof(when));
// Stats to the end of this timeframe
userstats->statsdate.tv_sec -= 1;
userstats->statsdate.tv_usec = 999999;
SIMPLEDATEDEFAULT(userstats, &now);
if (!conn)
conn = dbconnect();
// TODO: Consider releasing the lock for the DB insert?
if (!userstats_add_db(conn, userstats)) {
// Put them back and cancel the summarisation
tmp = userstats_summ->head;
while (tmp) {
add_to_ktree(userstats_root, tmp, cmp_userstats);
tmp = tmp->next;
}
k_list_transfer_to_tail(userstats_summ, userstats_store);
break;
}
k_list_transfer_to_tail(userstats_summ, userstats_list);
add_to_ktree(userstats_root, new, cmp_userstats);
if (upgrade)
K_WUNLOCK(userstats_list);
else
K_IUNLOCK(userstats_list);
locked = false;
}
if (locked) {
if (upgrade)
K_WUNLOCK(userstats_list);
else
K_IUNLOCK(userstats_list);
}
if (conn)
PQfinish(conn);
}
static void *summariser(__maybe_unused void *arg)

4
src/klist.h

@ -64,6 +64,10 @@ typedef struct k_list {
#define K_WUNLOCK(_list) ck_wunlock(_list->lock)
#define K_RLOCK(_list) ck_rlock(_list->lock)
#define K_RUNLOCK(_list) ck_runlock(_list->lock)
#define K_ILOCK(_list) ck_ilock(_list->lock)
#define K_IUNLOCK(_list) ck_ilock(_list->lock)
// Upgrade I to W
#define K_ULOCK(_list) ck_ulock(_list->lock)
extern K_STORE *k_new_store(K_LIST *list);
extern K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit, bool do_tail, KLIST_FFL_ARGS);

Loading…
Cancel
Save