From 91430d565f80724d05ac1ddc20c51783f2a1aa08 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sun, 27 Jul 2014 19:27:46 +1000 Subject: [PATCH] ckdb - summarise userstats at 1st level --- src/ckdb.c | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++-- src/klist.h | 4 ++ 2 files changed, 194 insertions(+), 4 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index e0918005..5df75a7c 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -415,6 +415,14 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; STRNCPY(_row->createinet, _inet); \ } while (0) +#define SIMPLEDATEDEFAULT(_row, _now ) do { \ + _row->createdate.tv_sec = (_now)->tv_sec; \ + _row->createdate.tv_usec = (_now)->tv_usec; \ + STRNCPY(_row->createby, (char *)"code"); \ + STRNCPY(_row->createcode, (char *)__func__); \ + STRNCPY(_row->createinet, (char *)"127.0.0.1"); \ + } while (0) + // Override _row defaults if transfer fields are present #define SIMPLEDATETRANSFER(_row) do { \ K_ITEM *item; \ @@ -958,6 +966,8 @@ static K_LIST *userstats_list; static K_STORE *userstats_store; // Awaiting EOS static K_STORE *userstats_eos_store; +// Temporary while summarising +static K_STORE *userstats_summ; /* 1.5 x how often we expect to get user's stats from ckpool * This is used when grouping the sub-worker stats into a single user @@ -4409,6 +4419,52 @@ static double cmp_userstats_workername(K_ITEM *a, K_ITEM *b) return c; } +static bool userstats_add_db(PGconn *conn, USERSTATS *row) +{ + ExecStatusType rescode; + PGresult *res; + char *ins; + bool ok = false; + char *params[9 + HISTORYDATECOUNT]; + int par; + int n; + + LOGDEBUG("%s(): store", __func__); + + par = 0; + params[par++] = bigint_to_buf(row->userid, NULL, 0); + params[par++] = str_to_buf(row->workername, NULL, 0); + params[par++] = bigint_to_buf(row->elapsed, NULL, 0); + params[par++] = double_to_buf(row->hashrate, NULL, 0); + params[par++] = double_to_buf(row->hashrate5m, NULL, 0); + params[par++] = double_to_buf(row->hashrate1hr, NULL, 0); + params[par++] = double_to_buf(row->hashrate24hr, NULL, 0); + params[par++] = str_to_buf(row->summarylevel, NULL, 0); + params[par++] = tv_to_buf(&(row->statsdate), NULL, 0); + SIMPLEDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into userstats " + "(userid,workername,elapsed,hashrate,hashrate5m," + "hashrate1hr,hashrate24hr,summarylevel,statsdate" + HISTORYDATECONTROL ") values (" PQPARAM13 ")"; + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto unparam; + } + + ok = true; +unparam: + PQclear(res); + for (n = 0; n < par; n++) + free(params[n]); + + return ok; +} + static bool userstats_add(char *poolinstance, char *elapsed, char *username, char *workername, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, bool idle, @@ -4896,6 +4952,7 @@ static bool setup_data() userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true); userstats_store = k_new_store(userstats_list); userstats_eos_store = k_new_store(userstats_list); + userstats_summ = k_new_store(userstats_list); userstats_root = new_ktree(); userstats_list->dsp_func = dsp_userstats; @@ -6323,12 +6380,141 @@ static void summarise_poolstats() } // TODO: daily +// TODO: consider limiting how much/long this processes each time static void summarise_userstats() { -// Hourly: -// get stats of interest (extract from tree/list) -// summarise them -// store them in the DB,tree/list + K_TREE_CTX ctx[1], ctx2[1]; + K_ITEM *tail, *new, *prev, *tmp; + USERSTATS *userstats; + double statrange, factor; + bool locked, upgrade; + tv_t now, when; + PGconn *conn = NULL; + int count; + + locked = false; + while (1764) { + setnow(&now); + upgrade = false; + locked = true; + K_ILOCK(userstats_list); + tail = last_in_ktree(userstats_root, ctx); + // Last non DB stat + while (tail && DATA_USERSTATS(tail)->poolinstance[0] == '\0') + tail = prev_in_ktree(ctx); + + if (!tail) + break; + + statrange = tvdiff(&now, &(DATA_USERSTATS(tail)->statsdate)); + // Is there data ready for summarising? + if (statrange <= USERSTATS_AGE) + break; + + memcpy(&when, &(DATA_USERSTATS(tail)->statsdate), sizeof(when)); + /* Convent when to the start of the timeframe after the one it is in + * assume timeval ignores leapseconds ... */ + when.tv_sec = when.tv_sec - (when.tv_sec % USERSTATS_DB_S) + USERSTATS_DB_S; + when.tv_usec = 0; + + // Is the whole timerange up to before 'when' ready for summarising? + statrange = tvdiff(&now, &when); + if (statrange < USERSTATS_AGE) + break; + + prev = prev_in_ktree(ctx); + + upgrade = true; + K_ULOCK(userstats_list); + new = k_unlink_head(userstats_list); + userstats = DATA_USERSTATS(new); + memcpy(userstats, DATA_USERSTATS(tail), sizeof(USERSTATS)); + + remove_from_ktree(userstats_root, tail, cmp_userstats, ctx2); + k_unlink_item(userstats_store, tail); + k_add_head(userstats_summ, tail); + + count = 1; + while (prev) { + if (DATA_USERSTATS(prev)->userid != userstats->userid) + break; + if (strcmp(DATA_USERSTATS(prev)->workername, userstats->workername)) + break; + statrange = tvdiff(&when, &(DATA_USERSTATS(prev)->statsdate)); + if (statrange <= 0) + break; + + count++; + userstats->hashrate += DATA_USERSTATS(prev)->hashrate; + userstats->hashrate5m += DATA_USERSTATS(prev)->hashrate5m; + userstats->hashrate1hr += DATA_USERSTATS(prev)->hashrate1hr; + userstats->hashrate24hr += DATA_USERSTATS(prev)->hashrate24hr; + if (userstats->elapsed > DATA_USERSTATS(prev)->elapsed) + userstats->elapsed = DATA_USERSTATS(prev)->elapsed; + + tmp = prev_in_ktree(ctx); + remove_from_ktree(userstats_root, prev, cmp_userstats, ctx2); + k_unlink_item(userstats_store, prev); + k_add_head(userstats_summ, prev); + prev = tmp; + } + + if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0) + userstats->idle = false; + else + userstats->idle = true; + + userstats->summarylevel[0] = SUMMARY_DB; + userstats->summarylevel[1] = '\0'; + + // Expect 6 per instance + factor = (double)count / 6.0; + userstats->hashrate *= factor; + userstats->hashrate5m *= factor; + userstats->hashrate1hr *= factor; + userstats->hashrate24hr *= factor; + + memcpy(&(userstats->statsdate), &when, sizeof(when)); + // Stats to the end of this timeframe + userstats->statsdate.tv_sec -= 1; + userstats->statsdate.tv_usec = 999999; + + SIMPLEDATEDEFAULT(userstats, &now); + + if (!conn) + conn = dbconnect(); + + // TODO: Consider releasing the lock for the DB insert? + if (!userstats_add_db(conn, userstats)) { + // Put them back and cancel the summarisation + tmp = userstats_summ->head; + while (tmp) { + add_to_ktree(userstats_root, tmp, cmp_userstats); + tmp = tmp->next; + } + k_list_transfer_to_tail(userstats_summ, userstats_store); + break; + } + + k_list_transfer_to_tail(userstats_summ, userstats_list); + add_to_ktree(userstats_root, new, cmp_userstats); + + if (upgrade) + K_WUNLOCK(userstats_list); + else + K_IUNLOCK(userstats_list); + locked = false; + } + + if (locked) { + if (upgrade) + K_WUNLOCK(userstats_list); + else + K_IUNLOCK(userstats_list); + } + + if (conn) + PQfinish(conn); } static void *summariser(__maybe_unused void *arg) diff --git a/src/klist.h b/src/klist.h index 023b805b..e0072aec 100644 --- a/src/klist.h +++ b/src/klist.h @@ -64,6 +64,10 @@ typedef struct k_list { #define K_WUNLOCK(_list) ck_wunlock(_list->lock) #define K_RLOCK(_list) ck_rlock(_list->lock) #define K_RUNLOCK(_list) ck_runlock(_list->lock) +#define K_ILOCK(_list) ck_ilock(_list->lock) +#define K_IUNLOCK(_list) ck_ilock(_list->lock) +// Upgrade I to W +#define K_ULOCK(_list) ck_ulock(_list->lock) extern K_STORE *k_new_store(K_LIST *list); extern K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit, bool do_tail, KLIST_FFL_ARGS);