diff --git a/src/ckdb.c b/src/ckdb.c index fc0e0ea4..d46608cf 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -93,14 +93,22 @@ ASSERT2(sizeof(int64_t) == 8); (_res) == PGRES_TUPLES_OK || \ (_res) == PGRES_EMPTY_QUERY) +static char *pqerrmsg(PGconn *conn) +{ + char *ptr, *buf = strdup(PQerrorMessage(conn)); + + ptr = buf + strlen(buf) - 1; + while (ptr >= buf && (*ptr == '\n' || *ptr == '\r')) + *(ptr--) = '\0'; + while (--ptr >= buf) { + if (*ptr == '\n' || *ptr == '\r' || *ptr == '\t') + *ptr = ' '; + } + return buf; +} + #define PGLOG(__LOG, __str, __rescode, __conn) do { \ - char *__ptr, *__buf = strdup(PQerrorMessage(__conn)); \ - __ptr = __buf + strlen(__buf) - 1; \ - while (__ptr >= __buf && (*__ptr == '\n' || *__ptr == '\r')) \ - *(__ptr--) = '\0'; \ - while (--__ptr >= __buf) \ - if (*__ptr == '\n' || *__ptr == '\r' || *__ptr == '\t') \ - *__ptr = ' '; \ + char *__buf = pqerrmsg(__conn); \ __LOG("%s(): %s failed (%d) '%s'", __func__, \ __str, (int)rescode, __buf); \ free(__buf); \ @@ -469,6 +477,11 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; } \ } while (0) +// Tell the summarizer data load is complete +static bool summarizer_go = false; +// Tell the summarizer to die +static bool summarizer_die = false; + static const char *userpatt = "^[!-~]*$"; // no spaces static const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.]*[A-Za-z0-9]$"; static const char *idpatt = "^[_A-Za-z][_A-Za-z0-9]*$"; @@ -478,7 +491,7 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$"; #define JSON_TRANSFER "json=" #define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1) -// Methods for sharelog (common function for all 3) +// Methods for sharelog (common function for all) #define STR_WORKINFO "workinfo" #define STR_SHARES "shares" #define STR_SHAREERRORS "shareerror" @@ -779,6 +792,7 @@ typedef struct sharesummary { int64_t errorcount; int64_t countlastupdate; // non-DB field bool inserted; // non-DB field + bool saveaged; // non-DB field tv_t firstshare; tv_t lastshare; char complete[TXT_FLAG+1]; @@ -912,7 +926,6 @@ static K_STORE *poolstats_store; // USERSTATS userstats.id.json={...} // Pool sends each user (staggered) once per 10m -// TODO: When to discard? typedef struct userstats { char poolinstance[TXT_BIG+1]; int64_t userid; @@ -922,15 +935,17 @@ typedef struct userstats { double hashrate5m; double hashrate1hr; double hashrate24hr; + bool idle; // Non-db field + char summarylevel[TXT_FLAG+1]; // Initially '0' in the DB SIMPLEDATECONTROLFIELDS; } USERSTATS; -/* USERSATS protocol includes a boolean 'eos' that when true, +/* USERSTATS protocol includes a boolean 'eos' that when true, * we have received the full set of data for the given * createdate batch, and thus can move all (complete) records - * matching the createdate in userstats_eos_store into the tree */ + * matching the createdate from userstats_eos_store into the tree */ -#define ALLOC_USERSTATS 1000 +#define ALLOC_USERSTATS 10000 #define LIMIT_USERSTATS 0 #define DATA_USERSTATS(_item) ((USERSTATS *)(_item->data)) @@ -944,11 +959,79 @@ static K_STORE *userstats_eos_store; * This is used when grouping the sub-worker stats into a single user * We add each worker's latest stats to the total - except we ignore * any worker with newest stats being older than USERSTATS_PER_S */ -#define USERSTATS_PER_S (int)(600 * 1.5) +#define USERSTATS_PER_S 900 -/* on the allusers page, show any with stats in the last 1hr */ +/* on the allusers page, show any with stats in the last ... */ #define ALLUSERS_LIMIT_S 3600 +/* Userstats get stored in the DB for each time band of this + * amount from midnight (UTC+00) + * Thus we simply put each stats value in the time band of the + * stat's timestamp + * Userstats are sumarised in the the same userstats table + * If USERSTATS_DB_S is close to the expected time per USERSTATS + * then it will have higher variance i.e. obviously: a higher + * average of stats per sample will mean a lower SD of the number + * of stats per sample + * The #if below ensures USERSTATS_DB_S times an integer = a day + * so the last band is the same size as the rest - + * and will graph easily + * Obvious WARNING - the smaller this is, the more stats in the DB + * This is summary level '1' + */ +#define USERSTATS_DB_S 3600 + +#if (((24*60*60) % USERSTATS_DB_S) != 0) +#error "USERSTATS_DB_S times an integer must = a day" +#endif + +/* We summarise and discard userstats that are older than the + * maximum of USERSTATS_DB_S, USERSTATS_PER_S, ALLUSERS_LIMIT_S + */ +#if (USERSTATS_PER_S > ALLUSERS_LIMIT_S) + #if (USERSTATS_PER_S > USERSTATS_DB_S) + #define USERSTATS_AGE USERSTATS_PER_S + #else + #define USERSTATS_AGE USERSTATS_DB_S + #endif +#else + #if (ALLUSERS_LIMIT_S > USERSTATS_DB_S) + #define USERSTATS_AGE ALLUSERS_LIMIT_S + #else + #define USERSTATS_AGE USERSTATS_DB_S + #endif +#endif + +/* summarisation of the userstats after this many days are done + * at the day level and the above stats are deleted from the db + * Obvious WARNING - the larger this is, the more stats in the DB + * This is summary level '2' + */ +#define USERSTATS_DB_D 7 + +#define tv_newer(_old, _new) (((_old)->tv_sec == (_new)->tv_sec) ? \ + ((_old)->tv_usec < (_new)->tv_usec) : \ + ((_old)->tv_sec < (_new)->tv_sec)) + +// zzz TODO +// WORKERSTATUS from various incoming data +typedef struct workerstatus { + int64_t userid; + char workername[TXT_BIG+1]; + tv_t auth; + tv_t share; + tv_t stats; + tv_t idle; +} WORKERSTATUS; + +#define ALLOC_WORKERSTATUS 1000 +#define LIMIT_WORKERSTATUS 0 +#define DATA_WORKERSTATUS(_item) ((WORKERSTATUS *)(_item->data)) + +static K_TREE *workerstatus_root; +static K_LIST *workerstatus_list; +static K_STORE *workerstatus_store; + static char logname[512]; #define LOGFILE(_msg) rotating_log(logname, _msg) @@ -1410,7 +1493,7 @@ static PGconn *dbconnect() conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK) - quithere(1, "ERR: Failed to connect to db '%s'", PQerrorMessage(conn)); + quithere(1, "ERR: Failed to connect to db '%s'", pqerrmsg(conn)); return conn; } @@ -1491,6 +1574,91 @@ cleanup: return lastid; } +// order by userid asc,workername asc +static double cmp_workerstatus(K_ITEM *a, K_ITEM *b) +{ + double c = (double)(DATA_WORKERSTATUS(a)->userid - + DATA_WORKERSTATUS(b)->userid); + if (c == 0.0) { + c = strcmp(DATA_WORKERSTATUS(a)->workername, + DATA_WORKERSTATUS(b)->workername); + } + return c; +} + +static K_ITEM *find_workerstatus(int64_t userid, char *workername) +{ + WORKERSTATUS workerstatus; + K_TREE_CTX ctx[1]; + K_ITEM look; + + workerstatus.userid = userid; + STRNCPY(workerstatus.workername, workername); + + look.data = (void *)(&workerstatus); + return find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx); +} + +static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool create) +{ + K_ITEM *item = NULL; + WORKERSTATUS *row; + + item = find_workerstatus(userid, workername); + if (!item && create) { + K_WLOCK(workerstatus_list); + item = k_unlink_head(workerstatus_list); + + row = DATA_WORKERSTATUS(item); + + bzero(row, sizeof(*row)); + row->userid = userid; + STRNCPY(row->workername, workername); + + workerstatus_root = add_to_ktree(workerstatus_root, item, cmp_workerstatus); + k_add_head(workerstatus_store, item); + K_WUNLOCK(workerstatus_list); + } + return item; +} + +#define find_create_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, true) +#define find_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, false) + +static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats) +{ + WORKERSTATUS *row; + K_ITEM *item; + + LOGDEBUG("%s()", __func__); + + if (auths) { + item = find_create_workerstatus(auths->userid, auths->workername); + row = DATA_WORKERSTATUS(item); + if (tv_newer(&(row->auth), &(auths->createdate))) + memcpy(&(row->auth), &(auths->createdate), sizeof(row->auth)); + } + + if (shares) { + item = find_create_workerstatus(shares->userid, shares->workername); + row = DATA_WORKERSTATUS(item); + if (tv_newer(&(row->share), &(shares->createdate))) + memcpy(&(row->share), &(shares->createdate), sizeof(row->share)); + } + + if (userstats) { + item = find_create_workerstatus(userstats->userid, userstats->workername); + row = DATA_WORKERSTATUS(item); + if (userstats->idle) { + if (tv_newer(&(row->idle), &(userstats->createdate))) + memcpy(&(row->idle), &(userstats->createdate), sizeof(row->idle)); + } else { + if (tv_newer(&(row->stats), &(userstats->createdate))) + memcpy(&(row->stats), &(userstats->createdate), sizeof(row->idle)); + } + } +} + // default tree order by username asc,expirydate desc static double cmp_users(K_ITEM *a, K_ITEM *b) { @@ -1716,10 +1884,12 @@ static bool users_fill(PGconn *conn) K_WUNLOCK(users_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d users records", __func__, n); + } - return true; + return ok; } void users_reload() @@ -2140,10 +2310,12 @@ static bool workers_fill(PGconn *conn) K_WUNLOCK(workers_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d workers records", __func__, n); + } - return true; + return ok; } void workers_reload() @@ -2281,10 +2453,12 @@ static bool payments_fill(PGconn *conn) K_WUNLOCK(payments_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d payments records", __func__, n); + } - return true; + return ok; } void payments_reload() @@ -2669,10 +2843,12 @@ static bool workinfo_fill(PGconn *conn) K_WUNLOCK(workinfo_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d workinfo records", __func__, n); + } - return true; + return ok; } void workinfo_reload() @@ -2766,6 +2942,8 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char if (!w_item) goto unitem; + workerstatus_update(NULL, shares, NULL); + sharesummary_update(NULL, shares, NULL, NULL, now, by, code, inet); ok = true; @@ -3015,6 +3193,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, row->sharehi = row->sharerej = 0.0; row->sharecount = row->errorcount = row->countlastupdate = 0; row->inserted = false; + row->saveaged = false; row->firstshare.tv_sec = sharecreatedate->tv_sec; row->firstshare.tv_usec = sharecreatedate->tv_usec; row->lastshare.tv_sec = row->firstshare.tv_sec; @@ -3131,6 +3310,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, row->countlastupdate = row->sharecount + row->errorcount; row->inserted = true; + if (row->complete[0] == SUMMARY_AGED) + row->saveaged = true; } else { bool stats_update = false; @@ -3178,6 +3359,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, goto unparam; } row->countlastupdate = row->sharecount + row->errorcount; + if (row->complete[0] == SUMMARY_AGED) + row->saveaged = true; } else { if (!must_update) { ok = true; @@ -3202,6 +3385,8 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, goto unparam; } row->countlastupdate = row->sharecount + row->errorcount; + if (row->complete[0] == SUMMARY_AGED) + row->saveaged = true; } } } @@ -3382,10 +3567,12 @@ static bool sharesummary_fill(PGconn *conn) K_WUNLOCK(sharesummary_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d sharesummary records", __func__, n); + } - return true; + return ok; } void sharesummary_reload() @@ -3698,10 +3885,12 @@ static bool blocks_fill(PGconn *conn) K_WUNLOCK(blocks_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d blocks records", __func__, n); + } - return true; + return ok; } void blocks_reload() @@ -3778,6 +3967,9 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, HISTORYDATEINIT(row, now, by, code, inet); HISTORYDATETRANSFER(row); + // Update even if DB fails + workerstatus_update(row, NULL, NULL); + row->authid = nextid(conn, "authid", (int64_t)1, now, by, code, inet); if (row->authid == 0) goto unitem; @@ -3914,10 +4106,12 @@ static bool auths_fill(PGconn *conn) K_WUNLOCK(auths_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d auth records", __func__, n); + } - return true; + return ok; } void auths_reload() @@ -4115,10 +4309,12 @@ static bool poolstats_fill(PGconn *conn) K_WUNLOCK(poolstats_list); PQclear(res); - if (ok) + if (ok) { LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d poolstats records", __func__, n); + } - return true; + return ok; } void poolstats_reload() @@ -4195,8 +4391,8 @@ static double cmp_userstats_workername(K_ITEM *a, K_ITEM *b) static bool userstats_add(char *poolinstance, char *elapsed, char *username, char *workername, char *hashrate, char *hashrate5m, - char *hashrate1hr, char *hashrate24hr, bool eos, - tv_t *now, char *by, char *code, char *inet) + char *hashrate1hr, char *hashrate24hr, bool idle, + bool eos, tv_t *now, char *by, char *code, char *inet) { K_ITEM *us_item, *u_item, *us_match, *us_next; USERSTATS *row; @@ -4221,6 +4417,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, TXT_TO_DOUBLE("hashrate5m", hashrate5m, row->hashrate5m); TXT_TO_DOUBLE("hashrate1hr", hashrate1hr, row->hashrate1hr); TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr); + row->idle = idle; SIMPLEDATEINIT(row, now, by, code, inet); SIMPLEDATETRANSFER(row); @@ -4230,6 +4427,8 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, createdate.tv_usec = row->createdate.tv_usec; } + workerstatus_update(NULL, NULL, row); + /* group at full key: userid,createdate,poolinstance,workername i.e. ignore instance and group together down at workername */ us_match = userstats_eos_store->head; @@ -4353,29 +4552,39 @@ static bool check_db_version(PGconn *conn) } PQclear(res); + + LOGWARNING("%s(): DB version (%s) correct", __func__, DB_VERSION); + return true; } static bool getdata() { PGconn *conn = dbconnect(); - - if (!check_db_version(conn)) { - PQfinish(conn); - return false; - } - - users_fill(conn); - workers_fill(conn); - payments_fill(conn); - workinfo_fill(conn); - shares_fill(); - shareerrors_fill(); - auths_fill(conn); - poolstats_fill(conn); + bool ok = true; + + if (!(ok = check_db_version(conn))) + goto matane; + if (!(ok = users_fill(conn))) + goto matane; + if (!(ok = workers_fill(conn))) + goto matane; + if (!(ok = payments_fill(conn))) + goto matane; + if (!(ok = workinfo_fill(conn))) + goto matane; + if (!(ok = shares_fill())) + goto matane; + if (!(ok = shareerrors_fill())) + goto matane; + if (!(ok = auths_fill(conn))) + goto matane; + ok = poolstats_fill(conn); + +matane: PQfinish(conn); - return true; + return ok; } /* TODO: @@ -4532,6 +4741,10 @@ static bool setup_data() userstats_root = new_ktree(); userstats_list->dsp_func = dsp_userstats; + workerstatus_list = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); + workerstatus_store = k_new_store(workerstatus_list); + workerstatus_root = new_ktree(); + if (!getdata()) return false; @@ -4727,8 +4940,8 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char * K_ITEM *i_poolinstance, *i_elapsed, *i_username, *i_workername; K_ITEM *i_hashrate, *i_hashrate5m, *i_hashrate1hr, *i_hashrate24hr; - K_ITEM *i_eos; - bool ok = false, eos; + K_ITEM *i_eos, *i_idle; + bool ok = false, idle, eos; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); @@ -4764,6 +4977,12 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char * if (!i_hashrate24hr) return strdup(reply); + i_idle = require_name("idle", 1, NULL, reply, siz); + if (!i_idle) + return strdup(reply); + + idle = (strcasecmp(DATA_TRANSFER(i_idle)->data, TRUE_STR) == 0); + i_eos = require_name("eos", 1, NULL, reply, siz); if (!i_eos) return strdup(reply); @@ -4778,7 +4997,7 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char * DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - eos, now, by, code, inet); + idle, eos, now, by, code, inet); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -4920,7 +5139,7 @@ static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet) { - K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item; + K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item, *ws_item; K_TREE_CTX w_ctx[1], us_ctx[1]; WORKERS workers; USERSTATS userstats; @@ -4982,9 +5201,14 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ int64_t w_elapsed; tv_t w_lastshare; - w_lastshare.tv_sec = 0; w_hashrate5m = w_hashrate1hr = 0.0; w_elapsed = -1; + w_lastshare.tv_sec = 0; + + ws_item = find_workerstatus(DATA_USERS(u_item)->userid, + DATA_WORKERS(w_item)->workername); + if (ws_item) + w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->share.tv_sec; // find last stored userid record userstats.userid = DATA_USERS(u_item)->userid; @@ -4997,10 +5221,6 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ us_item = find_before_in_ktree(userstats_root, &uslook, cmp_userstats, us_ctx); while (us_item && DATA_USERSTATS(us_item)->userid == userstats.userid) { if (strcmp(DATA_USERSTATS(us_item)->workername, DATA_WORKERS(w_item)->workername) == 0) { - // first found is the newest share - if (w_lastshare.tv_sec == 0) - w_lastshare.tv_sec = DATA_USERSTATS(us_item)->createdate.tv_sec; - if (tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < USERSTATS_PER_S) { // TODO: add together the latest per pool instance (this is the latest per worker) if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx)) { @@ -5935,6 +6155,36 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id return cmds[*which_cmds].cmd_val; } +static void summarise_poolstats() +{ +// TODO +} + +static void summarise_userstats() +{ +// TODO +} + +static void *summariser(__maybe_unused void *arg) +{ + pthread_detach(pthread_self()); + + while (!summarizer_die && !summarizer_go) + cksleep_ms(42); + + while (!summarizer_die) { + sleep(19); + + if (!summarizer_die) + summarise_poolstats(); + + if (!summarizer_die) + summarise_userstats(); + } + + return NULL; +} + // TODO: equivalent of api_allow static void *listener(void *arg) { @@ -5944,10 +6194,13 @@ static void *listener(void *arg) char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; enum cmd_values cmdnum; int sockd, which_cmds; + pthread_t summzer; K_ITEM *item; size_t siz; tv_t now; + create_pthread(&summzer, summariser, NULL); + rename_proc(pi->sockname); if (!setup_data()) { @@ -5955,6 +6208,10 @@ static void *listener(void *arg) return NULL; } + LOGWARNING("%s(): ckdb ready", __func__); + + summarizer_go = true; + while (true) { dealloc(buf); sockd = accept(us->sockd, NULL, NULL); @@ -6044,6 +6301,7 @@ int main(int argc, char **argv) ckpool_t ckp; int c, ret; char *kill; + tv_t now; feenableexcept(FE_DIVBYZERO | FE_INVALID | FE_OVERFLOW); @@ -6144,7 +6402,8 @@ int main(int argc, char **argv) write_namepid(&ckp.main); create_process_unixsock(&ckp.main); - srand((unsigned int)time(NULL)); + setnow(&now); + srand((unsigned int)(now.tv_usec * 4096 + now.tv_sec % 4096)); create_pthread(&ckp.pth_listener, listener, &ckp.main); handler.sa_flags = 0;