Browse Source

ckdb - v0.5 userstats chanegs and summarisation fixes

master
kanoi 11 years ago
parent
commit
01f4cd9335
  1. 1
      sql/ckdb.sql
  2. 42
      sql/v0.4-v0.5.sql
  3. 84
      src/ckdb.c

1
sql/ckdb.sql

@ -374,6 +374,7 @@ CREATE TABLE userstats (
hashrate1hr float NOT NULL, hashrate1hr float NOT NULL,
hashrate24hr float NOT NULL, hashrate24hr float NOT NULL,
summarylevel char NOT NULL, summarylevel char NOT NULL,
summarycount integer NOT NULL,
statsdate timestamp with time zone NOT NULL, statsdate timestamp with time zone NOT NULL,
createdate timestamp with time zone NOT NULL, createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL, createby character varying(64) DEFAULT ''::character varying NOT NULL,

42
sql/v0.4-v0.5.sql

@ -0,0 +1,42 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.5' where vlock=1 and version='0.4';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.4" - found "%"', ver;
END $$;
DROP TABLE userstats;
CREATE TABLE userstats (
userid bigint NOT NULL,
workername character varying(256) NOT NULL,
elapsed bigint NOT NULL,
hashrate float NOT NULL,
hashrate5m float NOT NULL,
hashrate1hr float NOT NULL,
hashrate24hr float NOT NULL,
summarylevel char NOT NULL,
summarycount integer NOT NULL,
statsdate timestamp with time zone NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
PRIMARY KEY (userid, workername, summarylevel, statsdate)
);
END transaction;

84
src/ckdb.c

@ -44,7 +44,7 @@
* */ * */
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.4" #define DB_VERSION "0.5"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1021,6 +1021,7 @@ typedef struct userstats {
double hashrate24hr; double hashrate24hr;
bool idle; // Non-db field bool idle; // Non-db field
char summarylevel[TXT_FLAG+1]; // Initially SUMMARY_NONE in RAM char summarylevel[TXT_FLAG+1]; // Initially SUMMARY_NONE in RAM
int32_t summarycount;
tv_t statsdate; tv_t statsdate;
SIMPLEDATECONTROLFIELDS; SIMPLEDATECONTROLFIELDS;
} USERSTATS; } USERSTATS;
@ -4470,7 +4471,7 @@ void poolstats_reload()
static void dsp_userstats(K_ITEM *item, FILE *stream) static void dsp_userstats(K_ITEM *item, FILE *stream)
{ {
USERSTATS *u = NULL; USERSTATS *u = NULL;
char *createdate_buf; char *statsdate_buf, *createdate_buf;
if (!stream) if (!stream)
LOGERR("%s() called with (null) stream", __func__); LOGERR("%s() called with (null) stream", __func__);
@ -4480,13 +4481,16 @@ static void dsp_userstats(K_ITEM *item, FILE *stream)
else { else {
u = DATA_USERSTATS(item); u = DATA_USERSTATS(item);
statsdate_buf = tv_to_buf(&(u->statsdate), NULL, 0);
createdate_buf = tv_to_buf(&(u->createdate), NULL, 0); createdate_buf = tv_to_buf(&(u->createdate), NULL, 0);
fprintf(stream, " pi='%s' e=%"PRId64" uid=%"PRId64" w='%s' Hs=%f " fprintf(stream, " pi='%s' uid=%"PRId64" w='%s' e=%"PRId64" Hs=%f "
"Hs5m=%f Hs1hr=%f Hs24hr=%f cd=%s\n", "Hs5m=%f Hs1hr=%f Hs24hr=%f sd=%s cd=%s\n",
u->poolinstance, u->elapsed, u->userid, u->poolinstance, u->userid, u->workername,
u->workername, u->hashrate, u->hashrate5m, u->elapsed, u->hashrate, u->hashrate5m,
u->hashrate1hr, u->hashrate24hr, createdate_buf); u->hashrate1hr, u->hashrate24hr,
statsdate_buf, createdate_buf);
free(createdate_buf); free(createdate_buf);
free(statsdate_buf);
} }
} }
} }
@ -4552,7 +4556,7 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row)
PGresult *res; PGresult *res;
char *ins; char *ins;
bool ok = false; bool ok = false;
char *params[9 + SIMPLEDATECOUNT]; char *params[10 + SIMPLEDATECOUNT];
int par; int par;
int n; int n;
@ -4567,14 +4571,15 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row)
params[par++] = double_to_buf(row->hashrate1hr, NULL, 0); params[par++] = double_to_buf(row->hashrate1hr, NULL, 0);
params[par++] = double_to_buf(row->hashrate24hr, NULL, 0); params[par++] = double_to_buf(row->hashrate24hr, NULL, 0);
params[par++] = str_to_buf(row->summarylevel, NULL, 0); params[par++] = str_to_buf(row->summarylevel, NULL, 0);
params[par++] = int_to_buf(row->summarycount, NULL, 0);
params[par++] = tv_to_buf(&(row->statsdate), NULL, 0); params[par++] = tv_to_buf(&(row->statsdate), NULL, 0);
SIMPLEDATEPARAMS(params, par, row); SIMPLEDATEPARAMS(params, par, row);
PARCHK(par, params); PARCHK(par, params);
ins = "insert into userstats " ins = "insert into userstats "
"(userid,workername,elapsed,hashrate,hashrate5m," "(userid,workername,elapsed,hashrate,hashrate5m,hashrate1hr,"
"hashrate1hr,hashrate24hr,summarylevel,statsdate" "hashrate24hr,summarylevel,summarycount,statsdate"
SIMPLEDATECONTROL ") values (" PQPARAM13 ")"; SIMPLEDATECONTROL ") values (" PQPARAM14 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
@ -4623,6 +4628,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
row->idle = idle; row->idle = idle;
row->summarylevel[0] = SUMMARY_NONE; row->summarylevel[0] = SUMMARY_NONE;
row->summarylevel[1] = '\0'; row->summarylevel[1] = '\0';
row->summarycount = 1;
SIMPLEDATEINIT(row, now, by, code, inet); SIMPLEDATEINIT(row, now, by, code, inet);
SIMPLEDATETRANSFER(row); SIMPLEDATETRANSFER(row);
copy_tv(&(row->statsdate), &(row->createdate)); copy_tv(&(row->statsdate), &(row->createdate));
@ -4692,6 +4698,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
return true; return true;
} }
// Requires K_WLOCK(userstats_list)
static void userstats_update_ccl(USERSTATS *row) static void userstats_update_ccl(USERSTATS *row)
{ {
USERSTATS userstats, *tmp; USERSTATS userstats, *tmp;
@ -4728,7 +4735,6 @@ static void userstats_update_ccl(USERSTATS *row)
if (tv_newer(&(tmp->statsdate), &(userstats.statsdate))) if (tv_newer(&(tmp->statsdate), &(userstats.statsdate)))
copy_tv(&(tmp->statsdate), &(userstats.statsdate)); copy_tv(&(tmp->statsdate), &(userstats.statsdate));
} else { } else {
K_WLOCK(userstats_list);
item = k_unlink_head(userstats_list); item = k_unlink_head(userstats_list);
tmp = DATA_USERSTATS(item); tmp = DATA_USERSTATS(item);
bzero(tmp, sizeof(*tmp)); bzero(tmp, sizeof(*tmp));
@ -4738,7 +4744,6 @@ static void userstats_update_ccl(USERSTATS *row)
userstats_ccl_root = add_to_ktree(userstats_ccl_root, item, userstats_ccl_root = add_to_ktree(userstats_ccl_root, item,
cmp_userstats_workername); cmp_userstats_workername);
k_add_head(userstats_ccl, item); k_add_head(userstats_ccl, item);
K_WUNLOCK(userstats_list);
} }
} }
@ -4752,14 +4757,14 @@ static bool userstats_fill(PGconn *conn)
USERSTATS *row; USERSTATS *row;
char *field; char *field;
char *sel; char *sel;
int fields = 9; int fields = 10;
bool ok; bool ok;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
sel = "select " sel = "select "
"userid,workername,elapsed,hashrate,hashrate5m,hashrate1hr," "userid,workername,elapsed,hashrate,hashrate5m,hashrate1hr,"
"hashrate24hr,summarylevel,statsdate" "hashrate24hr,summarylevel,summarycount,statsdate"
SIMPLEDATECONTROL SIMPLEDATECONTROL
" from userstats"; " from userstats";
res = PQexec(conn, sel); res = PQexec(conn, sel);
@ -4829,6 +4834,11 @@ static bool userstats_fill(PGconn *conn)
break; break;
TXT_TO_STR("summarylevel", field, row->summarylevel); TXT_TO_STR("summarylevel", field, row->summarylevel);
PQ_GET_FLD(res, i, "summarycount", field, ok);
if (!ok)
break;
TXT_TO_INT("summarycount", field, row->summarycount);
PQ_GET_FLD(res, i, "statsdate", field, ok); PQ_GET_FLD(res, i, "statsdate", field, ok);
if (!ok) if (!ok)
break; break;
@ -6618,10 +6628,13 @@ static void summarise_userstats()
tv_t now, when; tv_t now, when;
PGconn *conn = NULL; PGconn *conn = NULL;
int count; int count;
char error[1024];
char tvbuf1[DATE_BUFSIZ], tvbuf2[DATE_BUFSIZ];
upgrade = false; upgrade = false;
locked = false; locked = false;
while (1764) { while (1764) {
error[0] = '\0';
setnow(&now); setnow(&now);
upgrade = false; upgrade = false;
locked = true; locked = true;
@ -6659,8 +6672,8 @@ static void summarise_userstats()
userstats = DATA_USERSTATS(new); userstats = DATA_USERSTATS(new);
memcpy(userstats, DATA_USERSTATS(first), sizeof(USERSTATS)); memcpy(userstats, DATA_USERSTATS(first), sizeof(USERSTATS));
remove_from_ktree(userstats_root, first, cmp_userstats, ctx2); userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats, ctx2);
remove_from_ktree(userstats_statsdate_root, first, cmp_userstats_statsdate, ctx2); userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first, cmp_userstats_statsdate, ctx2);
k_unlink_item(userstats_store, first); k_unlink_item(userstats_store, first);
k_add_head(userstats_summ, first); k_add_head(userstats_summ, first);
@ -6672,7 +6685,9 @@ static void summarise_userstats()
tmp = next_in_ktree(ctx); tmp = next_in_ktree(ctx);
if (DATA_USERSTATS(next)->userid == userstats->userid &&
if (DATA_USERSTATS(next)->summarylevel[0] == SUMMARY_NONE &&
DATA_USERSTATS(next)->userid == userstats->userid &&
strcmp(DATA_USERSTATS(next)->workername, userstats->workername) == 0) { strcmp(DATA_USERSTATS(next)->workername, userstats->workername) == 0) {
count++; count++;
userstats->hashrate += DATA_USERSTATS(next)->hashrate; userstats->hashrate += DATA_USERSTATS(next)->hashrate;
@ -6681,9 +6696,10 @@ static void summarise_userstats()
userstats->hashrate24hr += DATA_USERSTATS(next)->hashrate24hr; userstats->hashrate24hr += DATA_USERSTATS(next)->hashrate24hr;
if (userstats->elapsed > DATA_USERSTATS(next)->elapsed) if (userstats->elapsed > DATA_USERSTATS(next)->elapsed)
userstats->elapsed = DATA_USERSTATS(next)->elapsed; userstats->elapsed = DATA_USERSTATS(next)->elapsed;
userstats->summarycount += DATA_USERSTATS(next)->summarycount;
remove_from_ktree(userstats_root, next, cmp_userstats, ctx2); userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats, ctx2);
remove_from_ktree(userstats_statsdate_root, next, cmp_userstats_statsdate, ctx2); userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next, cmp_userstats_statsdate, ctx2);
k_unlink_item(userstats_store, next); k_unlink_item(userstats_store, next);
k_add_head(userstats_summ, next); k_add_head(userstats_summ, next);
} }
@ -6717,20 +6733,22 @@ static void summarise_userstats()
// TODO: Consider releasing the lock for the DB insert? // TODO: Consider releasing the lock for the DB insert?
if (!userstats_add_db(conn, userstats)) { if (!userstats_add_db(conn, userstats)) {
// Put them back and cancel the summarisation /* This should only happen if a restart finds data
tmp = userstats_summ->head; that wasn't found during the reload but is in
while (tmp) { the same timeframe as DB data
add_to_ktree(userstats_root, tmp, cmp_userstats); i.e. it shouldn't happen, but keep the summary */
add_to_ktree(userstats_statsdate_root, tmp, cmp_userstats_statsdate); when.tv_sec -= USERSTATS_DB_S;
tmp = tmp->next; tv_to_buf(&when, tvbuf1, sizeof(tvbuf1));
} tv_to_buf(&(userstats->statsdate), tvbuf2, sizeof(tvbuf2));
k_list_transfer_to_tail(userstats_summ, userstats_store); snprintf(error, sizeof(error),
break; "Userid %"PRId64" %d userstats records discarded "
"from %s to %s",
userstats->userid, count, tvbuf1, tvbuf2);
} }
k_list_transfer_to_tail(userstats_summ, userstats_list); k_list_transfer_to_tail(userstats_summ, userstats_list);
add_to_ktree(userstats_root, new, cmp_userstats); userstats_root = add_to_ktree(userstats_root, new, cmp_userstats);
add_to_ktree(userstats_statsdate_root, new, cmp_userstats_statsdate); userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new, cmp_userstats_statsdate);
if (upgrade) if (upgrade)
K_WUNLOCK(userstats_list); K_WUNLOCK(userstats_list);
@ -6738,6 +6756,8 @@ static void summarise_userstats()
K_IUNLOCK(userstats_list); K_IUNLOCK(userstats_list);
upgrade = false; upgrade = false;
locked = false; locked = false;
if (error[0])
LOGERR(error);
} }
if (locked) { if (locked) {
@ -6839,7 +6859,7 @@ static void *listener(void *arg)
* and we get the same request repeatedly, this will * and we get the same request repeatedly, this will
* reduce the load - thus always send the same reply * reduce the load - thus always send the same reply
* Pool: must not process it, must send back the same reply * Pool: must not process it, must send back the same reply
* TODO: remember last message/reply per source * TODO: remember last message+reply per source
*/ */
if (last_msg && strcmp(last_msg, buf) == 0) { if (last_msg && strcmp(last_msg, buf) == 0) {
dup = true; dup = true;

Loading…
Cancel
Save