Browse Source

ckdb - workinfo aging

master
kanoi 11 years ago
parent
commit
7c81e35de9
  1. 242
      src/ckdb.c

242
src/ckdb.c

@ -479,9 +479,10 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$";
#define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1)
// Methods for sharelog (common function for all 3)
#define METHOD_WORKINFO "workinfo"
#define METHOD_SHARES "shares"
#define METHOD_SHAREERRORS "shareerror"
#define STR_WORKINFO "workinfo"
#define STR_SHARES "shares"
#define STR_SHAREERRORS "shareerror"
#define STR_AGEWORKINFO "ageworkinfo"
// TRANSFER
#define NAME_SIZE 63
@ -793,11 +794,10 @@ typedef struct sharesummary {
#define DATA_SHARESUMMARY(_item) ((SHARESUMMARY *)(_item->data))
#define SUMMARY_NEW 'n'
#define SUMMARY_AGED 'a'
#define SUMMARY_CONFIRM 'y'
static K_TREE *sharesummary_root;
/* TODO: for 'complete' flagging i.e.
older items should be flagged if we flag a newer item */
static K_TREE *sharesummary_workinfoid_root;
static K_LIST *sharesummary_list;
static K_STORE *sharesummary_store;
@ -2468,6 +2468,87 @@ unparam:
return workinfoid;
}
static double cmp_shares(K_ITEM *a, K_ITEM *b);
static double cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b);
static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
tv_t *now, char *by, char *code, char *inet);
static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance,
tv_t *now, char *by, char *code, char *inet)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1];
int64_t workinfoid;
SHARESUMMARY sharesummary;
SHARES shares;
bool ok = false, conned = false;
LOGDEBUG("%s(): complete", __func__);
TXT_TO_BIGINT("workinfoid", workinfoidstr, workinfoid);
wi_item = find_workinfo(workinfoid);
if (!wi_item)
goto bye;
if (strcmp(poolinstance, DATA_WORKINFO(wi_item)->poolinstance) != 0)
goto bye;
// Find the first matching sharesummary
sharesummary.workinfoid = workinfoid;
sharesummary.userid = -1;
sharesummary.workername[0] = '\0';
ok = true;
ss_look.data = (void *)(&sharesummary);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx);
while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == workinfoid) {
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
if (!sharesummary_update(conn, NULL, NULL, ss_item, now, by, code, inet)) {
LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64,
__func__, DATA_SHARESUMMARY(ss_item)->userid,
DATA_SHARESUMMARY(ss_item)->workername,
DATA_SHARESUMMARY(ss_item)->workinfoid);
ok = false;
}
// Discard the shares either way
shares.workinfoid = workinfoid;
shares.userid = DATA_SHARESUMMARY(ss_item)->userid;
strcpy(shares.workername, DATA_SHARESUMMARY(ss_item)->workername);
shares.createdate.tv_sec = 0;
shares.createdate.tv_usec = 0;
s_look.data = (void *)(&shares);
s_item = find_after_in_ktree(shares_root, &s_look, cmp_shares, s_ctx);
K_WLOCK(shares_list);
while (s_item) {
if (DATA_SHARES(s_item)->workinfoid != workinfoid ||
DATA_SHARES(s_item)->userid != shares.userid ||
strcmp(DATA_SHARES(s_item)->workername, shares.workername) != 0)
break;
tmp_item = next_in_ktree(s_ctx);
shares_root = remove_from_ktree(shares_root, s_item, cmp_shares, tmp_ctx);
k_unlink_item(shares_store, s_item);
k_add_head(shares_list, s_item);
s_item = tmp_item;
}
K_WUNLOCK(shares_list);
ss_item = next_in_ktree(ss_ctx);
}
if (conned)
PQfinish(conn);
bye:
return ok;
}
static bool workinfo_fill(PGconn *conn)
{
ExecStatusType rescode;
@ -2611,7 +2692,7 @@ void workinfo_reload()
*/
}
// order by workinfoid asc,userid asc,createdate asc,nonce asc,expirydate desc
// order by workinfoid asc,userid asc,workername asc,createdate asc,nonce asc,expirydate desc
static double cmp_shares(K_ITEM *a, K_ITEM *b)
{
double c = (double)(DATA_SHARES(a)->workinfoid -
@ -2619,6 +2700,9 @@ static double cmp_shares(K_ITEM *a, K_ITEM *b)
if (c == 0) {
c = (double)(DATA_SHARES(a)->userid -
DATA_SHARES(b)->userid);
if (c == 0) {
c = strcmp(DATA_SHARES(a)->workername,
DATA_SHARES(b)->workername);
if (c == 0) {
c = tvdiff(&(DATA_SHARES(a)->createdate),
&(DATA_SHARES(b)->createdate));
@ -2632,12 +2716,10 @@ static double cmp_shares(K_ITEM *a, K_ITEM *b)
}
}
}
}
return c;
}
static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
tv_t *now, char *by, char *code, char *inet);
// Memory (and log file) only
static bool shares_add(char *workinfoid, char *username, char *workername, char *clientid,
char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff,
@ -2684,7 +2766,7 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char
if (!w_item)
goto unitem;
sharesummary_update(NULL, shares, NULL, now, by, code, inet);
sharesummary_update(NULL, shares, NULL, NULL, now, by, code, inet);
ok = true;
unitem:
@ -2771,7 +2853,7 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername,
if (!w_item)
goto unitem;
sharesummary_update(NULL, NULL, shareerrors, now, by, code, inet);
sharesummary_update(NULL, NULL, shareerrors, NULL, now, by, code, inet);
ok = true;
unitem:
@ -2865,10 +2947,7 @@ static K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t worki
return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx);
}
// TODO: discard shares,shareerrors in caller if this returns true?
// or keep them for a specific history so can check the errors?
// or error checking should use the log files?
static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
tv_t *now, char *by, char *code, char *inet)
{
ExecStatusType rescode;
@ -2882,13 +2961,25 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
int64_t userid, workinfoid;
char *workername;
tv_t *sharecreatedate;
bool conned = false;
bool must_update = false, conned = false;
LOGDEBUG("%s(): add", __func__);
LOGDEBUG("%s(): update", __func__);
if (ss_item) {
if (s_row || e_row) {
quithere(1, "ERR: %s() only one of s_row, e_row and ss_item allowed",
__func__);
}
new = false;
item = ss_item;
row = DATA_SHARESUMMARY(item);
must_update = true;
row->complete[0] = SUMMARY_AGED;
row->complete[1] = '\0';
} else {
if (s_row) {
if (e_row) {
quithere(1, "ERR: %s() one of s_row and e_row must be NULL",
quithere(1, "ERR: %s() only one of s_row, e_row (and ss_item) allowed",
__func__);
}
userid = s_row->userid;
@ -2897,7 +2988,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
sharecreatedate = &(s_row->createdate);
} else {
if (!e_row) {
quithere(1, "ERR: %s() both s_row and e_row are NULL",
quithere(1, "ERR: %s() all s_row, e_row and ss_item are NULL",
__func__);
}
userid = e_row->userid;
@ -2993,6 +3084,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
row->workinfoid, row->complete);
}
}
}
if (conn == NULL) {
conn = dbconnect();
@ -3040,10 +3132,18 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
row->countlastupdate = row->sharecount + row->errorcount;
row->inserted = true;
} else {
bool stats_update = false;
MODIFYUPDATE(row, now, by, code, inet);
if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) <
(row->sharecount + row->errorcount)) {
(row->sharecount + row->errorcount))
stats_update = true;
if (must_update && row->countlastupdate < (row->sharecount + row->errorcount))
stats_update = true;
if (stats_update) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
@ -3060,26 +3160,49 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row,
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 19, params);
PARCHKVAL(par, 20, params);
upd = "update sharesummary "
"set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8,"
"shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12,"
"sharerej=$13,firstshare=$14,lastshare=$15"
",modifydate=$16,modifyby=$17,modifycode=$18,modifyinet=$19 "
"sharerej=$13,firstshare=$14,lastshare=$15,complete=$16"
",modifydate=$17,modifyby=$18,modifycode=$19,modifyinet=$20 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
PGLOGERR("Update", rescode, conn);
goto unparam;
}
row->countlastupdate = row->sharecount + row->errorcount;
} else {
if (!must_update) {
ok = true;
goto late;
} else {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 8, params);
upd = "update sharesummary "
"set complete=$4,modifydate=$5,modifyby=$6,modifycode=$7,modifyinet=$8 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("MustUpdate", rescode, conn);
goto unparam;
}
row->countlastupdate = row->sharecount + row->errorcount;
}
}
}
@ -3092,7 +3215,7 @@ late:
if (conned)
PQfinish(conn);
// We keep the new item anyway since it can be inserted next time
// We keep the new item no matter what 'ok' is, since it will be inserted later
K_WLOCK(sharesummary_list);
if (new) {
sharesummary_root = add_to_ktree(sharesummary_root, item, cmp_sharesummary);
@ -4798,7 +4921,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
__maybe_unused char *code, __maybe_unused char *inet)
{
K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item;
K_TREE_CTX wctx[1], usctx[1];
K_TREE_CTX w_ctx[1], us_ctx[1];
WORKERS workers;
USERSTATS userstats;
char reply[1024] = "";
@ -4830,7 +4953,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
workers.expirydate.tv_sec = 0;
workers.expirydate.tv_usec = 0;
wlook.data = (void *)(&workers);
w_item = find_after_in_ktree(workers_root, &wlook, cmp_workers, wctx);
w_item = find_after_in_ktree(workers_root, &wlook, cmp_workers, w_ctx);
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok.");
rows = 0;
@ -4854,7 +4977,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
if (stats) {
K_TREE *userstats_workername_root = new_ktree();
K_TREE_CTX uswctx[1];
K_TREE_CTX usw_ctx[1];
double w_hashrate5m, w_hashrate1hr;
int64_t w_elapsed;
tv_t w_lastshare;
@ -4871,7 +4994,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
userstats.poolinstance[0] = '\0';
userstats.workername[0] = '\0';
uslook.data = (void *)(&userstats);
us_item = find_before_in_ktree(userstats_root, &uslook, cmp_userstats, usctx);
us_item = find_before_in_ktree(userstats_root, &uslook, cmp_userstats, us_ctx);
while (us_item && DATA_USERSTATS(us_item)->userid == userstats.userid) {
if (strcmp(DATA_USERSTATS(us_item)->workername, DATA_WORKERS(w_item)->workername) == 0) {
// first found is the newest share
@ -4880,7 +5003,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
if (tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < USERSTATS_PER_S) {
// TODO: add together the latest per pool instance (this is the latest per worker)
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, uswctx)) {
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx)) {
w_hashrate5m += DATA_USERSTATS(us_item)->hashrate5m;
w_hashrate1hr += DATA_USERSTATS(us_item)->hashrate1hr;
if (w_elapsed == -1 || w_elapsed > DATA_USERSTATS(us_item)->elapsed)
@ -4894,7 +5017,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
break;
}
us_item = prev_in_ktree(usctx);
us_item = prev_in_ktree(us_ctx);
}
double_to_buf(w_hashrate5m, reply, sizeof(reply));
@ -4918,7 +5041,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
rows++;
}
w_item = next_in_ktree(wctx);
w_item = next_in_ktree(w_ctx);
}
snprintf(tmp, sizeof(tmp), "rows=%d", rows);
APPEND_REALLOC(buf, off, len, tmp);
@ -4932,7 +5055,7 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
{
K_TREE *userstats_workername_root = new_ktree();
K_ITEM *us_item, *usw_item, *tmp_item, *u_item;
K_TREE_CTX usctx[1], uswctx[1];
K_TREE_CTX us_ctx[1], usw_ctx[1];
char reply[1024] = "";
char tmp[1024];
char *buf;
@ -4945,9 +5068,9 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
// Find last records for each user/worker in ALLUSERS_LIMIT_S
// TODO: include pool_instance
us_item = last_in_ktree(userstats_root, usctx);
us_item = last_in_ktree(userstats_root, us_ctx);
while (us_item && tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < ALLUSERS_LIMIT_S) {
usw_item = find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, uswctx);
usw_item = find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, usw_ctx);
if (!usw_item) {
K_WLOCK(userstats_list);
usw_item = k_unlink_head(userstats_list);
@ -4966,7 +5089,7 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
APPEND_REALLOC(buf, off, len, "ok.");
rows = 0;
// Add up per user
usw_item = first_in_ktree(userstats_workername_root, uswctx);
usw_item = first_in_ktree(userstats_workername_root, usw_ctx);
while (usw_item) {
if (DATA_USERSTATS(usw_item)->userid != userid) {
if (userid != -1) {
@ -4996,7 +5119,7 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
u_hashrate1hr += DATA_USERSTATS(usw_item)->hashrate1hr;
tmp_item = usw_item;
usw_item = tmp_item->next;
usw_item = next_in_ktree(usw_ctx);
K_WLOCK(userstats_list);
k_add_head(userstats_list, tmp_item);
@ -5043,7 +5166,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code,
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
if (strcasecmp(cmd, METHOD_WORKINFO) == 0) {
if (strcasecmp(cmd, STR_WORKINFO) == 0) {
K_ITEM *i_workinfoid, *i_poolinstance, *i_transactiontree, *i_merklehash;
K_ITEM *i_prevhash, *i_coinbase1, *i_coinbase2, *i_version, *i_bits;
K_ITEM *i_ntime, *i_reward;
@ -5115,7 +5238,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code,
LOGDEBUG("%s.ok.added %"PRId64, id, workinfoid);
snprintf(reply, siz, "ok.%"PRId64, workinfoid);
return strdup(reply);
} else if (strcasecmp(cmd, METHOD_SHARES) == 0) {
} else if (strcasecmp(cmd, STR_SHARES) == 0) {
K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_enonce1;
K_ITEM *i_nonce2, *i_nonce, *i_diff, *i_sdiff, *i_secondaryuserid;
bool ok;
@ -5179,7 +5302,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code,
LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_nonce)->data);
snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_nonce)->data);
return strdup(reply);
} else if (strcasecmp(cmd, METHOD_SHAREERRORS) == 0) {
} else if (strcasecmp(cmd, STR_SHAREERRORS) == 0) {
K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_errn;
K_ITEM *i_error, *i_secondaryuserid;
bool ok;
@ -5226,6 +5349,34 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code,
}
LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_username)->data);
snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_username)->data);
return strdup(reply);
} else if (strcasecmp(cmd, STR_AGEWORKINFO) == 0) {
K_ITEM *i_workinfoid, *i_poolinstance;
bool ok;
i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz);
if (!i_workinfoid)
return strdup(reply);
i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz);
if (!i_poolinstance)
return strdup(reply);
ok = workinfo_age(NULL, DATA_TRANSFER(i_workinfoid)->data,
DATA_TRANSFER(i_poolinstance)->data,
now, by, code, inet);
if (!ok) {
LOGERR("%s.failed.DATA", id);
return strdup("failed.DATA");
}
LOGDEBUG("%s.ok.aged %.*s",
id, BIGINT_BUFSIZ,
DATA_TRANSFER(i_workinfoid)->data);
snprintf(reply, siz, "ok.%.*s",
BIGINT_BUFSIZ,
DATA_TRANSFER(i_workinfoid)->data);
return strdup(reply);
}
LOGERR("%s.bad.cmd %s", cmd);
@ -5398,7 +5549,7 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
char reply[1024], tmp[1024], *buf;
USERSTATS userstats;
int64_t u_elapsed;
K_TREE_CTX ctx[1], wctx[1];
K_TREE_CTX ctx[1], w_ctx[1];
size_t len, off;
bool has_uhr;
@ -5483,7 +5634,7 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
DATA_USERSTATS(us_item)->userid == userstats.userid &&
tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < USERSTATS_PER_S) {
// TODO: add the latest per pool instance (this is the latest per worker)
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, wctx)) {
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, w_ctx)) {
u_hashrate5m += DATA_USERSTATS(us_item)->hashrate5m;
u_hashrate1hr += DATA_USERSTATS(us_item)->hashrate1hr;
if (u_elapsed == -1 ||
@ -5586,9 +5737,10 @@ static struct CMDS {
} cmds[] = {
{ CMD_SHUTDOWN, "shutdown", NULL, ACCESS_SYSTEM },
{ CMD_PING, "ping", NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB },
{ CMD_SHARELOG, "workinfo", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shares", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shareerrors", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, STR_WORKINFO, cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, STR_SHARES, cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, STR_SHAREERRORS, cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, STR_AGEWORKINFO, cmd_sharelog, ACCESS_POOL },
{ CMD_AUTH, "authorise", cmd_auth, ACCESS_POOL },
{ CMD_ADDUSER, "adduser", cmd_adduser, ACCESS_WEB },
{ CMD_CHKPASS, "chkpass", cmd_chkpass, ACCESS_WEB },

Loading…
Cancel
Save