Browse Source

ckdb - begin aging code, startup stats, handle idle

master
kanoi 10 years ago
parent
commit
512e734523
  1. 375
      src/ckdb.c

375
src/ckdb.c

@ -93,14 +93,22 @@ ASSERT2(sizeof(int64_t) == 8);
(_res) == PGRES_TUPLES_OK || \
(_res) == PGRES_EMPTY_QUERY)
/* Return a malloc()ed, single line copy of the connection's current
 * libpq error message: trailing CR/LF are removed and any CR, LF or TAB
 * left in the text is replaced with a space
 * The caller must free() the result
 * Returns NULL only if the copy can't be allocated */
static char *pqerrmsg(PGconn *conn)
{
	char *buf = strdup(PQerrorMessage(conn));
	size_t len, i;

	if (!buf)
		return NULL;

	// Track a length so we never form a pointer before the start of buf
	len = strlen(buf);
	while (len > 0 && (buf[len-1] == '\n' || buf[len-1] == '\r'))
		buf[--len] = '\0';

	// Covers the last remaining char also, so a trailing TAB is replaced
	for (i = 0; i < len; i++) {
		if (buf[i] == '\n' || buf[i] == '\r' || buf[i] == '\t')
			buf[i] = ' ';
	}

	return buf;
}
#define PGLOG(__LOG, __str, __rescode, __conn) do { \
char *__ptr, *__buf = strdup(PQerrorMessage(__conn)); \
__ptr = __buf + strlen(__buf) - 1; \
while (__ptr >= __buf && (*__ptr == '\n' || *__ptr == '\r')) \
*(__ptr--) = '\0'; \
while (--__ptr >= __buf) \
if (*__ptr == '\n' || *__ptr == '\r' || *__ptr == '\t') \
*__ptr = ' '; \
char *__buf = pqerrmsg(__conn); \
__LOG("%s(): %s failed (%d) '%s'", __func__, \
__str, (int)rescode, __buf); \
free(__buf); \
@ -469,6 +477,11 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT };
} \
} while (0)
// Tell the summarizer data load is complete
// (set by listener() after setup_data() succeeds, polled by summariser())
static bool summarizer_go = false;
// Tell the summarizer to die
// NOTE(review): both flags are plain bools that summariser() polls from
// its own thread - confirm a plain bool is sufficient or switch to atomics
static bool summarizer_die = false;
static const char *userpatt = "^[!-~]*$"; // no spaces
static const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.]*[A-Za-z0-9]$";
static const char *idpatt = "^[_A-Za-z][_A-Za-z0-9]*$";
@ -478,7 +491,7 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$";
#define JSON_TRANSFER "json="
#define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1)
// Methods for sharelog (common function for all 3)
// Methods for sharelog (common function for all)
#define STR_WORKINFO "workinfo"
#define STR_SHARES "shares"
#define STR_SHAREERRORS "shareerror"
@ -779,6 +792,7 @@ typedef struct sharesummary {
int64_t errorcount;
int64_t countlastupdate; // non-DB field
bool inserted; // non-DB field
bool saveaged; // non-DB field
tv_t firstshare;
tv_t lastshare;
char complete[TXT_FLAG+1];
@ -912,7 +926,6 @@ static K_STORE *poolstats_store;
// USERSTATS userstats.id.json={...}
// Pool sends each user (staggered) once per 10m
// TODO: When to discard?
typedef struct userstats {
char poolinstance[TXT_BIG+1];
int64_t userid;
@ -922,15 +935,17 @@ typedef struct userstats {
double hashrate5m;
double hashrate1hr;
double hashrate24hr;
bool idle; // Non-db field
char summarylevel[TXT_FLAG+1]; // Initially '0' in the DB
SIMPLEDATECONTROLFIELDS;
} USERSTATS;
/* USERSATS protocol includes a boolean 'eos' that when true,
/* USERSTATS protocol includes a boolean 'eos' that when true,
* we have received the full set of data for the given
* createdate batch, and thus can move all (complete) records
* matching the createdate in userstats_eos_store into the tree */
* matching the createdate from userstats_eos_store into the tree */
#define ALLOC_USERSTATS 1000
#define ALLOC_USERSTATS 10000
#define LIMIT_USERSTATS 0
#define DATA_USERSTATS(_item) ((USERSTATS *)(_item->data))
@ -944,11 +959,79 @@ static K_STORE *userstats_eos_store;
* This is used when grouping the sub-worker stats into a single user
* We add each worker's latest stats to the total - except we ignore
* any worker with newest stats being older than USERSTATS_PER_S */
#define USERSTATS_PER_S (int)(600 * 1.5)
#define USERSTATS_PER_S 900
/* on the allusers page, show any with stats in the last 1hr */
/* on the allusers page, show any with stats in the last ... */
#define ALLUSERS_LIMIT_S 3600
/* Userstats get stored in the DB for each time band of this
* amount from midnight (UTC+00)
* Thus we simply put each stats value in the time band of the
* stat's timestamp
* Userstats are summarised in the same userstats table
* If USERSTATS_DB_S is close to the expected time per USERSTATS
* then it will have higher variance i.e. obviously: a higher
* average of stats per sample will mean a lower SD of the number
* of stats per sample
* The #if below ensures USERSTATS_DB_S times an integer = a day
* so the last band is the same size as the rest -
* and will graph easily
* Obvious WARNING - the smaller this is, the more stats in the DB
* This is summary level '1'
*/
#define USERSTATS_DB_S 3600
#if (((24*60*60) % USERSTATS_DB_S) != 0)
#error "USERSTATS_DB_S times an integer must = a day"
#endif
/* We summarise and discard userstats that are older than the
* maximum of USERSTATS_DB_S, USERSTATS_PER_S, ALLUSERS_LIMIT_S
*/
#if (USERSTATS_PER_S > ALLUSERS_LIMIT_S)
#if (USERSTATS_PER_S > USERSTATS_DB_S)
#define USERSTATS_AGE USERSTATS_PER_S
#else
#define USERSTATS_AGE USERSTATS_DB_S
#endif
#else
#if (ALLUSERS_LIMIT_S > USERSTATS_DB_S)
#define USERSTATS_AGE ALLUSERS_LIMIT_S
#else
#define USERSTATS_AGE USERSTATS_DB_S
#endif
#endif
/* Summarisation of the userstats after this many days is done
 * at the day level and the above stats are deleted from the db
* Obvious WARNING - the larger this is, the more stats in the DB
* This is summary level '2'
*/
#define USERSTATS_DB_D 7
/* True when the time at *_new is strictly later than the time at *_old
 * Seconds decide first; microseconds only break a tie on seconds
 * Both arguments are evaluated more than once */
#define tv_newer(_old, _new) \
	(((_old)->tv_sec < (_new)->tv_sec) || \
	 (((_old)->tv_sec == (_new)->tv_sec) && \
	  ((_old)->tv_usec < (_new)->tv_usec)))
// zzz TODO
// WORKERSTATUS from various incoming data
// One record per userid+workername (the cmp_workerstatus() key) holding
// the newest createdate seen for each kind of event, as maintained by
// workerstatus_update()
typedef struct workerstatus {
int64_t userid; // key part 1
char workername[TXT_BIG+1]; // key part 2
tv_t auth; // newest auths createdate
tv_t share; // newest shares createdate
tv_t stats; // newest userstats createdate where idle is false
tv_t idle; // newest userstats createdate where idle is true
} WORKERSTATUS;
// Passed to k_new_list() when workerstatus_list is created
#define ALLOC_WORKERSTATUS 1000
#define LIMIT_WORKERSTATUS 0
// Access the WORKERSTATUS stored in a K_ITEM
#define DATA_WORKERSTATUS(_item) ((WORKERSTATUS *)(_item->data))
// Tree of the in-use items, ordered by cmp_workerstatus()
static K_TREE *workerstatus_root;
// New items are unlinked from this list, it is also what gets locked
static K_LIST *workerstatus_list;
// In-use items are added to this store
static K_STORE *workerstatus_store;
static char logname[512];
#define LOGFILE(_msg) rotating_log(logname, _msg)
@ -1410,7 +1493,7 @@ static PGconn *dbconnect()
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK)
quithere(1, "ERR: Failed to connect to db '%s'", PQerrorMessage(conn));
quithere(1, "ERR: Failed to connect to db '%s'", pqerrmsg(conn));
return conn;
}
@ -1491,6 +1574,91 @@ cleanup:
return lastid;
}
/* Tree compare for WORKERSTATUS items
 * order by userid asc,workername asc
 * Returns <0, 0 or >0 as a sorts before, equal to or after b */
static double cmp_workerstatus(K_ITEM *a, K_ITEM *b)
{
	WORKERSTATUS *wa = DATA_WORKERSTATUS(a);
	WORKERSTATUS *wb = DATA_WORKERSTATUS(b);
	double diff = (double)(wa->userid - wb->userid);

	// userid decides unless both are the same user
	if (diff != 0.0)
		return diff;

	return (double)strcmp(wa->workername, wb->workername);
}
static K_ITEM *find_workerstatus(int64_t userid, char *workername)
{
WORKERSTATUS workerstatus;
K_TREE_CTX ctx[1];
K_ITEM look;
workerstatus.userid = userid;
STRNCPY(workerstatus.workername, workername);
look.data = (void *)(&workerstatus);
return find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx);
}
/* Find the WORKERSTATUS item for userid+workername and, when create is
 * true and there isn't one, add a zeroed item with that key to the tree
 * and the store
 * Returns the item found or created, or the failed lookup result when
 * create is false
 * NOTE(review): the lookup is done before workerstatus_list is locked -
 * confirm only one thread can be adding items */
static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool create)
{
	WORKERSTATUS *ws;
	K_ITEM *found;

	found = find_workerstatus(userid, workername);
	if (found || !create)
		return found;

	K_WLOCK(workerstatus_list);
	found = k_unlink_head(workerstatus_list);
	ws = DATA_WORKERSTATUS(found);
	// All the timestamps start at zero, i.e. older than anything real
	bzero(ws, sizeof(*ws));
	ws->userid = userid;
	STRNCPY(ws->workername, workername);
	workerstatus_root = add_to_ktree(workerstatus_root, found, cmp_workerstatus);
	k_add_head(workerstatus_store, found);
	K_WUNLOCK(workerstatus_list);

	return found;
}
/* Wrappers used by the code below
 * From here on the name find_workerstatus is this macro, not the function
 * defined above - _find_create_workerstatus() precedes the macro so its
 * own call still goes to that function */
#define find_create_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, true)
#define find_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, false)
/* Move a worker's WORKERSTATUS timestamps forward from incoming data
 * Each non-NULL argument is handled on its own: the WORKERSTATUS for its
 * userid+workername is found (or created) and the matching timestamp is
 * replaced by the argument's createdate when that is newer:
 *  auths -> auth, shares -> share,
 *  userstats -> idle if userstats->idle is set, otherwise stats
 * Any argument may be NULL to skip it */
static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats)
{
	WORKERSTATUS *row;
	K_ITEM *item;

	LOGDEBUG("%s()", __func__);

	if (auths) {
		item = find_create_workerstatus(auths->userid, auths->workername);
		row = DATA_WORKERSTATUS(item);
		if (tv_newer(&(row->auth), &(auths->createdate)))
			memcpy(&(row->auth), &(auths->createdate), sizeof(row->auth));
	}

	if (shares) {
		item = find_create_workerstatus(shares->userid, shares->workername);
		row = DATA_WORKERSTATUS(item);
		if (tv_newer(&(row->share), &(shares->createdate)))
			memcpy(&(row->share), &(shares->createdate), sizeof(row->share));
	}

	if (userstats) {
		item = find_create_workerstatus(userstats->userid, userstats->workername);
		row = DATA_WORKERSTATUS(item);
		if (userstats->idle) {
			if (tv_newer(&(row->idle), &(userstats->createdate)))
				memcpy(&(row->idle), &(userstats->createdate), sizeof(row->idle));
		} else {
			// Size the copy by its own destination (was sizeof(row->idle))
			if (tv_newer(&(row->stats), &(userstats->createdate)))
				memcpy(&(row->stats), &(userstats->createdate), sizeof(row->stats));
		}
	}
}
// default tree order by username asc,expirydate desc
static double cmp_users(K_ITEM *a, K_ITEM *b)
{
@ -1716,10 +1884,12 @@ static bool users_fill(PGconn *conn)
K_WUNLOCK(users_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d users records", __func__, n);
}
return true;
return ok;
}
void users_reload()
@ -2140,10 +2310,12 @@ static bool workers_fill(PGconn *conn)
K_WUNLOCK(workers_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d workers records", __func__, n);
}
return true;
return ok;
}
void workers_reload()
@ -2281,10 +2453,12 @@ static bool payments_fill(PGconn *conn)
K_WUNLOCK(payments_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d payments records", __func__, n);
}
return true;
return ok;
}
void payments_reload()
@ -2669,10 +2843,12 @@ static bool workinfo_fill(PGconn *conn)
K_WUNLOCK(workinfo_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d workinfo records", __func__, n);
}
return true;
return ok;
}
void workinfo_reload()
@ -2766,6 +2942,8 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char
if (!w_item)
goto unitem;
workerstatus_update(NULL, shares, NULL);
sharesummary_update(NULL, shares, NULL, NULL, now, by, code, inet);
ok = true;
@ -3015,6 +3193,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
row->sharehi = row->sharerej = 0.0;
row->sharecount = row->errorcount = row->countlastupdate = 0;
row->inserted = false;
row->saveaged = false;
row->firstshare.tv_sec = sharecreatedate->tv_sec;
row->firstshare.tv_usec = sharecreatedate->tv_usec;
row->lastshare.tv_sec = row->firstshare.tv_sec;
@ -3131,6 +3310,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
row->countlastupdate = row->sharecount + row->errorcount;
row->inserted = true;
if (row->complete[0] == SUMMARY_AGED)
row->saveaged = true;
} else {
bool stats_update = false;
@ -3178,6 +3359,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
goto unparam;
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_AGED)
row->saveaged = true;
} else {
if (!must_update) {
ok = true;
@ -3202,6 +3385,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
goto unparam;
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_AGED)
row->saveaged = true;
}
}
}
@ -3382,10 +3567,12 @@ static bool sharesummary_fill(PGconn *conn)
K_WUNLOCK(sharesummary_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d sharesummary records", __func__, n);
}
return true;
return ok;
}
void sharesummary_reload()
@ -3698,10 +3885,12 @@ static bool blocks_fill(PGconn *conn)
K_WUNLOCK(blocks_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d blocks records", __func__, n);
}
return true;
return ok;
}
void blocks_reload()
@ -3778,6 +3967,9 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username,
HISTORYDATEINIT(row, now, by, code, inet);
HISTORYDATETRANSFER(row);
// Update even if DB fails
workerstatus_update(row, NULL, NULL);
row->authid = nextid(conn, "authid", (int64_t)1, now, by, code, inet);
if (row->authid == 0)
goto unitem;
@ -3914,10 +4106,12 @@ static bool auths_fill(PGconn *conn)
K_WUNLOCK(auths_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d auth records", __func__, n);
}
return true;
return ok;
}
void auths_reload()
@ -4115,10 +4309,12 @@ static bool poolstats_fill(PGconn *conn)
K_WUNLOCK(poolstats_list);
PQclear(res);
if (ok)
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d poolstats records", __func__, n);
}
return true;
return ok;
}
void poolstats_reload()
@ -4195,8 +4391,8 @@ static double cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
static bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool eos,
tv_t *now, char *by, char *code, char *inet)
char *hashrate1hr, char *hashrate24hr, bool idle,
bool eos, tv_t *now, char *by, char *code, char *inet)
{
K_ITEM *us_item, *u_item, *us_match, *us_next;
USERSTATS *row;
@ -4221,6 +4417,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
TXT_TO_DOUBLE("hashrate5m", hashrate5m, row->hashrate5m);
TXT_TO_DOUBLE("hashrate1hr", hashrate1hr, row->hashrate1hr);
TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr);
row->idle = idle;
SIMPLEDATEINIT(row, now, by, code, inet);
SIMPLEDATETRANSFER(row);
@ -4230,6 +4427,8 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
createdate.tv_usec = row->createdate.tv_usec;
}
workerstatus_update(NULL, NULL, row);
/* group at full key: userid,createdate,poolinstance,workername
i.e. ignore instance and group together down at workername */
us_match = userstats_eos_store->head;
@ -4353,29 +4552,39 @@ static bool check_db_version(PGconn *conn)
}
PQclear(res);
LOGWARNING("%s(): DB version (%s) correct", __func__, DB_VERSION);
return true;
}
/* Run the startup data load over one connection from dbconnect()
 * The DB version is checked first, then each *_fill() step is run
 * The ok/matane path stops at the first step that returns false and
 * returns that result
 * PQfinish() is called on the connection before returning */
static bool getdata()
{
PGconn *conn = dbconnect();
if (!check_db_version(conn)) {
PQfinish(conn);
return false;
}
users_fill(conn);
workers_fill(conn);
payments_fill(conn);
workinfo_fill(conn);
shares_fill();
shareerrors_fill();
auths_fill(conn);
poolstats_fill(conn);
bool ok = true;
if (!(ok = check_db_version(conn)))
goto matane;
if (!(ok = users_fill(conn)))
goto matane;
if (!(ok = workers_fill(conn)))
goto matane;
if (!(ok = payments_fill(conn)))
goto matane;
if (!(ok = workinfo_fill(conn)))
goto matane;
if (!(ok = shares_fill()))
goto matane;
if (!(ok = shareerrors_fill()))
goto matane;
if (!(ok = auths_fill(conn)))
goto matane;
ok = poolstats_fill(conn);
// Common exit: every path above ends up here to close the connection
matane:
PQfinish(conn);
return true;
return ok;
}
/* TODO:
@ -4532,6 +4741,10 @@ static bool setup_data()
userstats_root = new_ktree();
userstats_list->dsp_func = dsp_userstats;
workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true);
workerstatus_store = k_new_store(workerstatus_list);
workerstatus_root = new_ktree();
if (!getdata())
return false;
@ -4727,8 +4940,8 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
K_ITEM *i_poolinstance, *i_elapsed, *i_username, *i_workername;
K_ITEM *i_hashrate, *i_hashrate5m, *i_hashrate1hr, *i_hashrate24hr;
K_ITEM *i_eos;
bool ok = false, eos;
K_ITEM *i_eos, *i_idle;
bool ok = false, idle, eos;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -4764,6 +4977,12 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
if (!i_hashrate24hr)
return strdup(reply);
i_idle = require_name("idle", 1, NULL, reply, siz);
if (!i_idle)
return strdup(reply);
idle = (strcasecmp(DATA_TRANSFER(i_idle)->data, TRUE_STR) == 0);
i_eos = require_name("eos", 1, NULL, reply, siz);
if (!i_eos)
return strdup(reply);
@ -4778,7 +4997,7 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
DATA_TRANSFER(i_hashrate5m)->data,
DATA_TRANSFER(i_hashrate1hr)->data,
DATA_TRANSFER(i_hashrate24hr)->data,
eos, now, by, code, inet);
idle, eos, now, by, code, inet);
if (!ok) {
LOGERR("%s.failed.DATA", id);
@ -4920,7 +5139,7 @@ static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet)
{
K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item;
K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item, *ws_item;
K_TREE_CTX w_ctx[1], us_ctx[1];
WORKERS workers;
USERSTATS userstats;
@ -4982,9 +5201,14 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
int64_t w_elapsed;
tv_t w_lastshare;
w_lastshare.tv_sec = 0;
w_hashrate5m = w_hashrate1hr = 0.0;
w_elapsed = -1;
w_lastshare.tv_sec = 0;
ws_item = find_workerstatus(DATA_USERS(u_item)->userid,
DATA_WORKERS(w_item)->workername);
if (ws_item)
w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->share.tv_sec;
// find last stored userid record
userstats.userid = DATA_USERS(u_item)->userid;
@ -4997,10 +5221,6 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
us_item = find_before_in_ktree(userstats_root, &uslook, cmp_userstats, us_ctx);
while (us_item && DATA_USERSTATS(us_item)->userid == userstats.userid) {
if (strcmp(DATA_USERSTATS(us_item)->workername, DATA_WORKERS(w_item)->workername) == 0) {
// first found is the newest share
if (w_lastshare.tv_sec == 0)
w_lastshare.tv_sec = DATA_USERSTATS(us_item)->createdate.tv_sec;
if (tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < USERSTATS_PER_S) {
// TODO: add together the latest per pool instance (this is the latest per worker)
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx)) {
@ -5935,6 +6155,36 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
return cmds[*which_cmds].cmd_val;
}
/* Placeholder - does nothing yet
 * Called by summariser() on each pass of its loop */
static void summarise_poolstats()
{
// TODO
}
/* Placeholder - does nothing yet
 * Called by summariser() on each pass of its loop, after
 * summarise_poolstats() */
static void summarise_userstats()
{
// TODO
}
/* Summariser thread body (arg is unused)
 * Detaches itself, waits until summarizer_go or summarizer_die is set,
 * then until summarizer_die is set: sleeps 19s and runs each summarise
 * step, rechecking summarizer_die before each one
 * Always returns NULL */
static void *summariser(__maybe_unused void *arg)
{
	pthread_detach(pthread_self());

	// Hold off until we are told to go, or to die
	while (!(summarizer_die || summarizer_go))
		cksleep_ms(42);

	for (;;) {
		if (summarizer_die)
			break;

		sleep(19);

		// We may have been told to die during the sleep or a step
		if (!summarizer_die)
			summarise_poolstats();

		if (!summarizer_die)
			summarise_userstats();
	}

	return NULL;
}
// TODO: equivalent of api_allow
static void *listener(void *arg)
{
@ -5944,10 +6194,13 @@ static void *listener(void *arg)
char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1];
enum cmd_values cmdnum;
int sockd, which_cmds;
pthread_t summzer;
K_ITEM *item;
size_t siz;
tv_t now;
create_pthread(&summzer, summariser, NULL);
rename_proc(pi->sockname);
if (!setup_data()) {
@ -5955,6 +6208,10 @@ static void *listener(void *arg)
return NULL;
}
LOGWARNING("%s(): ckdb ready", __func__);
summarizer_go = true;
while (true) {
dealloc(buf);
sockd = accept(us->sockd, NULL, NULL);
@ -6044,6 +6301,7 @@ int main(int argc, char **argv)
ckpool_t ckp;
int c, ret;
char *kill;
tv_t now;
feenableexcept(FE_DIVBYZERO | FE_INVALID | FE_OVERFLOW);
@ -6144,7 +6402,8 @@ int main(int argc, char **argv)
write_namepid(&ckp.main);
create_process_unixsock(&ckp.main);
srand((unsigned int)time(NULL));
setnow(&now);
srand((unsigned int)(now.tv_usec * 4096 + now.tv_sec % 4096));
create_pthread(&ckp.pth_listener, listener, &ckp.main);
handler.sa_flags = 0;

Loading…
Cancel
Save