Browse Source

ckdb - add summary of shares/markers for the pool

master
kanoi 10 years ago
parent
commit
12a3fc5130
  1. 19
      src/ckdb.c
  2. 32
      src/ckdb.h
  3. 2
      src/ckdb_cmd.c
  4. 33
      src/ckdb_data.c
  5. 379
      src/ckdb_dbio.c

19
src/ckdb.c

@ -215,6 +215,9 @@ bool confirm_sharesummary;
/* Optional workinfoid range -Y to supply when confirming sharesummaries
* N.B. if you specify -Y it will enable -y, so -y isn't also required
*
* TODO: update to include markersummaries
* -Y/-y isn't currently usable since it won't work without the update
*
* Default (NULL) is to confirm all aged sharesummaries
* Default should normally be used every time
* The below options are mainly for debugging or
@ -394,6 +397,9 @@ K_TREE *sharesummary_root;
K_TREE *sharesummary_workinfoid_root;
K_LIST *sharesummary_free;
K_STORE *sharesummary_store;
// Pool total sharesummary stats
K_TREE *sharesummary_pool_root;
K_STORE *sharesummary_pool_store;
// BLOCKS block.id.json={...}
const char *blocks_new = "New";
@ -454,6 +460,9 @@ K_TREE *markersummary_root;
K_TREE *markersummary_userid_root;
K_LIST *markersummary_free;
K_STORE *markersummary_store;
// Pool total markersummary stats
K_TREE *markersummary_pool_root;
K_STORE *markersummary_pool_store;
// WORKMARKERS
K_TREE *workmarkers_root;
@ -1016,6 +1025,8 @@ static void alloc_storage()
sharesummary_root = new_ktree();
sharesummary_workinfoid_root = new_ktree();
sharesummary_free->dsp_func = dsp_sharesummary;
sharesummary_pool_store = k_new_store(sharesummary_free);
sharesummary_pool_root = new_ktree();
blocks_free = k_new_list("Blocks", sizeof(BLOCKS),
ALLOC_BLOCKS, LIMIT_BLOCKS, true);
@ -1062,6 +1073,8 @@ static void alloc_storage()
markersummary_root = new_ktree();
markersummary_userid_root = new_ktree();
markersummary_free->dsp_func = dsp_markersummary;
markersummary_pool_store = k_new_store(markersummary_free);
markersummary_pool_root = new_ktree();
workmarkers_free = k_new_list("WorkMarkers", sizeof(WORKMARKERS),
ALLOC_WORKMARKERS, LIMIT_WORKMARKERS, true);
@ -1134,6 +1147,9 @@ static void dealloc_storage()
LOGWARNING("%s() markersummary ...", __func__);
FREE_TREE(markersummary_pool);
k_list_transfer_to_tail(markersummary_pool_store, markersummary_store);
FREE_STORE(markersummary_pool);
FREE_TREE(markersummary_userid);
FREE_TREE(markersummary);
FREE_STORE_DATA(markersummary);
@ -1159,6 +1175,9 @@ static void dealloc_storage()
LOGWARNING("%s() sharesummary ...", __func__);
FREE_TREE(sharesummary_pool);
k_list_transfer_to_tail(sharesummary_pool_store, sharesummary_store);
FREE_STORE(sharesummary_pool);
FREE_TREE(sharesummary_workinfoid);
FREE_TREE(sharesummary);
FREE_STORE_DATA(sharesummary);

32
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.052"
#define CKDB_VERSION DB_VERSION"-1.060"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1135,6 +1135,9 @@ extern K_TREE *sharesummary_root;
extern K_TREE *sharesummary_workinfoid_root;
extern K_LIST *sharesummary_free;
extern K_STORE *sharesummary_store;
// Pool total sharesummary stats
extern K_TREE *sharesummary_pool_root;
extern K_STORE *sharesummary_pool_store;
// BLOCKS block.id.json={...}
typedef struct blocks {
@ -1537,6 +1540,9 @@ extern K_TREE *markersummary_root;
extern K_TREE *markersummary_userid_root;
extern K_LIST *markersummary_free;
extern K_STORE *markersummary_store;
// Pool total markersummary stats
extern K_TREE *markersummary_pool_root;
extern K_STORE *markersummary_pool_store;
// WORKMARKERS
typedef struct workmarkers {
@ -1832,8 +1838,16 @@ extern void dsp_sharesummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b);
extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff);
extern K_ITEM *find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid);
#define find_sharesummary(_userid, _workername, _workinfoid) \
_find_sharesummary(_userid, _workername, _workinfoid, false)
#define find_sharesummary_p(_workinfoid) \
_find_sharesummary(KANO, EMPTY, _workinfoid, true)
#define POOL_SS(_row) do { \
_row->userid = KANO; \
_row->workername = strdup(EMPTY); \
} while (0)
extern K_ITEM *_find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid, bool pool);
extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername);
extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd);
@ -1876,8 +1890,16 @@ extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername,
K_TREE_CTX *ctx);
extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid,
char *workername);
#define find_markersummary(_workinfoid, _userid, _workername) \
_find_markersummary(0, _workinfoid, _userid, _workername, false)
#define find_markersummary_p(_markerid) \
_find_markersummary(_markerid, 0, KANO, EMPTY, true)
#define POOL_MS(_row) do { \
_row->userid = KANO; \
_row->workername = strdup(EMPTY); \
} while (0)
extern K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
int64_t userid, char *workername, bool pool);
extern bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
extern void dsp_workmarkers(K_ITEM *item, FILE *stream);

2
src/ckdb_cmd.c

@ -5158,8 +5158,10 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(shares, 2, 1);
// Trees don't share items so count as 1 tree
USEINFO(shareerrors, 2, 1);
// _pool doesn't share items so is included
USEINFO(sharesummary, 1, 2);
USEINFO(workmarkers, 1, 2);
// _pool doesn't share items so is included
USEINFO(markersummary, 1, 2);
USEINFO(marks, 1, 1);
USEINFO(blocks, 1, 1);

33
src/ckdb_data.c

@ -1943,7 +1943,7 @@ void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff)
row->complete[1] = '\0';
}
K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid)
K_ITEM *_find_sharesummary(int64_t userid, char *workername, int64_t workinfoid, bool pool)
{
SHARESUMMARY sharesummary;
K_TREE_CTX ctx[1];
@ -1955,7 +1955,13 @@ K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid)
INIT_SHARESUMMARY(&look);
look.data = (void *)(&sharesummary);
return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx);
if (pool) {
return find_in_ktree(sharesummary_pool_root, &look,
cmp_sharesummary, ctx);
} else {
return find_in_ktree(sharesummary_root, &look,
cmp_sharesummary, ctx);
}
}
K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
@ -3719,23 +3725,40 @@ K_ITEM *find_markersummary_userid(int64_t userid, char *workername,
return ms_item;
}
K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername)
K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
int64_t userid, char *workername, bool pool)
{
K_ITEM look, *wm_item, *ms_item = NULL;
MARKERSUMMARY markersummary;
WORKMARKERS *wm;
K_TREE_CTX ctx[1];
if (markerid == 0) {
wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED);
if (wm_item) {
DATA_WORKMARKERS(wm, wm_item);
markersummary.markerid = wm->markerid;
markerid = wm->markerid;
}
} else {
wm_item = find_workmarkerid(markerid, false, MARKER_PROCESSED);
if (!wm_item)
markerid = 0;
}
if (markerid != 0) {
markersummary.markerid = markerid;
markersummary.userid = userid;
markersummary.workername = workername;
INIT_MARKERSUMMARY(&look);
look.data = (void *)(&markersummary);
ms_item = find_in_ktree(markersummary_root, &look, cmp_markersummary, ctx);
if (pool) {
ms_item = find_in_ktree(markersummary_pool_root, &look,
cmp_markersummary, ctx);
} else {
ms_item = find_in_ktree(markersummary_root, &look,
cmp_markersummary, ctx);
}
}
return ms_item;

379
src/ckdb_dbio.c

@ -539,6 +539,7 @@ K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress,
K_WUNLOCK(users_free);
DATA_USERS(row, item);
bzero(row, sizeof(*row));
STRNCPY(row->username, username);
username_trim(row);
@ -682,6 +683,7 @@ bool users_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(users_free);
DATA_USERS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -891,6 +893,7 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname,
item = k_unlink_head(useratts_free);
K_WUNLOCK(useratts_free);
DATA_USERATTS(row, item);
bzero(row, sizeof(*row));
K_RLOCK(users_free);
u_item = find_users(username);
@ -1061,6 +1064,7 @@ bool useratts_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(useratts_free);
DATA_USERATTS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -1436,6 +1440,7 @@ bool workers_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(workers_free);
DATA_WORKERS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -1787,6 +1792,7 @@ bool paymentaddresses_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(paymentaddresses_free);
DATA_PAYMENTADDRESSES(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -2022,6 +2028,7 @@ bool payments_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(payments_free);
DATA_PAYMENTS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -2318,6 +2325,7 @@ K_ITEM *optioncontrol_add(PGconn *conn, char *optionname, char *optionvalue,
K_WUNLOCK(optioncontrol_free);
DATA_OPTIONCONTROL(row, item);
bzero(row, sizeof(*row));
STRNCPY(row->optionname, optionname);
row->optionvalue = strdup(optionvalue);
@ -2386,6 +2394,7 @@ bool optioncontrol_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(optioncontrol_free);
DATA_OPTIONCONTROL(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -2401,6 +2410,7 @@ bool optioncontrol_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_BLOB("optionvalue", field, row->optionvalue);
LIST_MEM_ADD(optioncontrol_free, row->optionvalue);
PQ_GET_FLD(res, i, "activationdate", field, ok);
if (!ok)
@ -2472,15 +2482,18 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance,
K_WUNLOCK(workinfo_free);
DATA_WORKINFO(row, item);
bzero(row, sizeof(*row));
TXT_TO_BIGINT("workinfoid", workinfoidstr, row->workinfoid);
STRNCPY(row->poolinstance, poolinstance);
row->transactiontree = strdup(transactiontree);
if (!(row->transactiontree))
quithere(1, "malloc (%d) OOM", (int)strlen(transactiontree));
LIST_MEM_ADD(workinfo_free, row->transactiontree);
row->merklehash = strdup(merklehash);
if (!(row->merklehash))
quithere(1, "malloc (%d) OOM", (int)strlen(merklehash));
LIST_MEM_ADD(workinfo_free, row->merklehash);
STRNCPY(row->prevhash, prevhash);
STRNCPY(row->coinbase1, coinbase1);
STRNCPY(row->coinbase2, coinbase2);
@ -2495,7 +2508,9 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance,
K_WLOCK(workinfo_free);
if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) {
LIST_MEM_SUB(workinfo_free, row->transactiontree);
FREENULL(row->transactiontree);
LIST_MEM_SUB(workinfo_free, row->merklehash);
FREENULL(row->merklehash);
workinfoid = row->workinfoid;
k_add_head(workinfo_free, item);
@ -2558,14 +2573,18 @@ unparam:
K_WLOCK(workinfo_free);
if (workinfoid == -1) {
LIST_MEM_SUB(workinfo_free, row->transactiontree);
FREENULL(row->transactiontree);
LIST_MEM_SUB(workinfo_free, row->merklehash);
FREENULL(row->merklehash);
k_add_head(workinfo_free, item);
} else {
if (row->transactiontree && *(row->transactiontree)) {
// Not currently needed in RAM
LIST_MEM_SUB(workinfo_free, row->transactiontree);
free(row->transactiontree);
row->transactiontree = strdup(EMPTY);
LIST_MEM_ADD(workinfo_free, row->transactiontree);
}
hex2bin(ndiffbin, row->bits, 4);
@ -2660,10 +2679,11 @@ bool workinfo_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(workinfo_free);
//K_WLOCK(workinfo_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(workinfo_free);
DATA_WORKINFO(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -2687,11 +2707,13 @@ bool workinfo_fill(PGconn *conn)
TXT_TO_BLOB("transactiontree", field, row->transactiontree);
*/
row->transactiontree = strdup(EMPTY);
LIST_MEM_ADD(workinfo_free, row->transactiontree);
PQ_GET_FLD(res, i, "merklehash", field, ok);
if (!ok)
break;
TXT_TO_BLOB("merklehash", field, row->merklehash);
LIST_MEM_ADD(workinfo_free, row->merklehash);
PQ_GET_FLD(res, i, "prevhash", field, ok);
if (!ok)
@ -2743,10 +2765,13 @@ bool workinfo_fill(PGconn *conn)
tick();
}
if (!ok)
if (!ok) {
//FREENULL(row->transactiontree);
FREENULL(row->merklehash);
k_add_head(workinfo_free, item);
}
K_WUNLOCK(workinfo_free);
//K_WUNLOCK(workinfo_free);
PQclear(res);
if (ok) {
@ -2966,6 +2991,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
K_WUNLOCK(shares_free);
DATA_SHARES(shares, s_item);
bzero(shares, sizeof(*shares));
K_RLOCK(users_free);
u_item = find_users(username);
@ -3273,6 +3299,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
K_WUNLOCK(shareerrors_free);
DATA_SHAREERRORS(shareerrors, s_item);
bzero(shareerrors, sizeof(*shareerrors));
K_RLOCK(users_free);
u_item = find_users(username);
@ -3366,6 +3393,30 @@ bool shareerrors_fill()
return true;
}
static void markersummary_to_pool(MARKERSUMMARY *p_row, MARKERSUMMARY *row)
{
p_row->diffacc += row->diffacc;
p_row->diffsta += row->diffsta;
p_row->diffdup += row->diffdup;
p_row->diffhi += row->diffhi;
p_row->diffrej += row->diffrej;
p_row->shareacc += row->shareacc;
p_row->sharesta += row->sharesta;
p_row->sharedup += row->sharedup;
p_row->sharehi += row->sharehi;
p_row->sharerej += row->sharerej;
p_row->sharecount += row->sharecount;
p_row->errorcount += row->errorcount;
if (!p_row->firstshare.tv_sec ||
!tv_newer(&(p_row->firstshare), &(row->firstshare))) {
copy_tv(&(p_row->firstshare), &(row->firstshare));
}
if (tv_newer(&(p_row->lastshare), &(row->lastshare))) {
copy_tv(&(p_row->lastshare), &(row->lastshare));
p_row->lastdiffacc = row->lastdiffacc;
}
}
/* TODO: what to do about a failure?
* since it will repeat every ~13s
* Of course manual intervention is possible via cmd_marks,
@ -3385,8 +3436,9 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
PGresult *res;
K_TREE_CTX ss_ctx[1], ms_ctx[1];
SHARESUMMARY *sharesummary, looksharesummary;
MARKERSUMMARY *markersummary, lookmarkersummary;
MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary;
K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look;
K_ITEM *p_ss_item, *p_ms_item;
bool ok = false, conned = false;
int64_t diffacc, shareacc;
char *reason = NULL, *tuples = NULL;
@ -3631,13 +3683,31 @@ flail:
K_WLOCK(markersummary_free);
ms_item = new_markersummary_store->head;
while (ms_item) {
// Move the new markersummaries into the trees/stores
// move the new markersummaries into the trees/stores
markersummary_root = add_to_ktree(markersummary_root,
ms_item,
cmp_markersummary);
markersummary_userid_root = add_to_ktree(markersummary_userid_root,
ms_item,
cmp_markersummary_userid);
// create/update the pool markersummaries
DATA_MARKERSUMMARY(markersummary, ms_item);
p_ms_item = find_markersummary_p(markersummary->markerid);
if (!p_ms_item) {
p_ms_item = k_unlink_head(markersummary_free);
DATA_MARKERSUMMARY(p_markersummary, p_ms_item);
bzero(p_markersummary, sizeof(*p_markersummary));
p_markersummary->markerid = markersummary->markerid;
POOL_MS(p_markersummary);
LIST_MEM_ADD(markersummary_free, p_markersummary->workername);
markersummary_pool_root = add_to_ktree(markersummary_pool_root,
p_ms_item,
cmp_markersummary);
k_add_head(markersummary_pool_store, p_ms_item);
}
markersummary_to_pool(p_markersummary, markersummary);
ms_item = ms_item->next;
}
k_list_transfer_to_head(new_markersummary_store, markersummary_store);
@ -3653,8 +3723,20 @@ flail:
sharesummary_workinfoid_root = remove_from_ktree(sharesummary_workinfoid_root,
ss_item,
cmp_sharesummary_workinfoid);
free_sharesummary_data(ss_item);
// remove the pool sharesummaries
DATA_SHARESUMMARY(sharesummary, ss_item);
p_ss_item = find_sharesummary_p(sharesummary->workinfoid);
if (p_ss_item) {
sharesummary_pool_root = remove_from_ktree(sharesummary_pool_root,
p_ss_item,
cmp_sharesummary);
k_unlink_item(sharesummary_pool_store, p_ss_item);
free_sharesummary_data(p_ss_item);
k_add_head(sharesummary_free, p_ss_item);
}
free_sharesummary_data(ss_item);
ss_item = ss_item->next;
}
k_list_transfer_to_head(old_sharesummary_store, sharesummary_free);
@ -3678,6 +3760,87 @@ flail:
return ok;
}
static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row)
{
p_row->diffacc += row->diffacc;
p_row->diffsta += row->diffsta;
p_row->diffdup += row->diffdup;
p_row->diffhi += row->diffhi;
p_row->diffrej += row->diffrej;
p_row->shareacc += row->shareacc;
p_row->sharesta += row->sharesta;
p_row->sharedup += row->sharedup;
p_row->sharehi += row->sharehi;
p_row->sharerej += row->sharerej;
p_row->sharecount += row->sharecount;
p_row->errorcount += row->errorcount;
if (!p_row->firstshare.tv_sec ||
!tv_newer(&(p_row->firstshare), &(row->firstshare))) {
copy_tv(&(p_row->firstshare), &(row->firstshare));
}
if (tv_newer(&(p_row->lastshare), &(row->lastshare))) {
copy_tv(&(p_row->lastshare), &(row->lastshare));
p_row->lastdiffacc = row->lastdiffacc;
}
}
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new,
double *tdf, double *tdl)
{
tv_t *createdate;
double diff;
if (s_row) {
createdate = &(s_row->createdate);
diff = s_row->diff;
} else {
createdate = &(e_row->createdate);
diff = 0;
}
if (new)
zero_sharesummary(row, createdate, diff);
if (s_row) {
row->sharecount += 1;
switch (s_row->errn) {
case SE_NONE:
row->diffacc += s_row->diff;
row->shareacc++;
break;
case SE_STALE:
row->diffsta += s_row->diff;
row->sharesta++;
break;
case SE_DUPE:
row->diffdup += s_row->diff;
row->sharedup++;
break;
case SE_HIGH_DIFF:
row->diffhi += s_row->diff;
row->sharehi++;
break;
default:
row->diffrej += s_row->diff;
row->sharerej++;
break;
}
} else
row->errorcount += 1;
if (!new) {
*tdf = tvdiff(createdate, &(row->firstshare));
if (*tdf < 0.0)
copy_tv(&(row->firstshare), createdate);
*tdl = tvdiff(createdate, &(row->lastshare));
if (*tdl >= 0.0) {
copy_tv(&(row->lastshare), createdate);
row->lastdiffacc = diff;
}
}
}
/* Keep some simple stats on how often shares are out of order
* and how often they produce a WARNING due to OOOLIMIT */
static int64_t ooof0, ooof, oool0, oool;
@ -3701,29 +3864,29 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
ExecStatusType rescode;
PGresult *res = NULL;
WORKMARKERS *wm;
SHARESUMMARY *row;
K_ITEM *item, *wm_item;
SHARESUMMARY *row, *p_row;
K_ITEM *item, *wm_item, *p_item;
char *ins, *upd;
bool ok = false, new;
bool ok = false, new = false, p_new = false;
char *params[19 + MODIFYDATECOUNT];
int n, par = 0;
int64_t userid, workinfoid;
char *workername;
tv_t *sharecreatedate;
tv_t *createdate;
bool must_update = false, conned = false;
double diff = 0;
char *st = NULL, *db = NULL;
char ooo_buf[256];
double tdf, tdl;
LOGDEBUG("%s(): update", __func__);
// this will never be a pool_ summary
if (ss_item) {
if (s_row || e_row) {
quithere(1, "ERR: only one of s_row, e_row and "
"ss_item allowed" WHERE_FFL,
WHERE_FFL_PASS);
}
new = false;
item = ss_item;
DATA_SHARESUMMARY(row, item);
must_update = true;
@ -3739,8 +3902,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
userid = s_row->userid;
workername = s_row->workername;
workinfoid = s_row->workinfoid;
diff = s_row->diff;
sharecreatedate = &(s_row->createdate);
createdate = &(s_row->createdate);
} else {
if (!e_row) {
quithere(1, "ERR: all s_row, e_row and "
@ -3750,7 +3912,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
userid = e_row->userid;
workername = e_row->workername;
workinfoid = e_row->workinfoid;
sharecreatedate = &(e_row->createdate);
createdate = &(e_row->createdate);
}
K_RLOCK(workmarkers_free);
@ -3763,7 +3925,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
" but processed workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, st = safe_text(workername),
db = ctv_to_buf(sharecreatedate, NULL, 0),
db = ctv_to_buf(createdate, NULL, 0),
wm->markerid);
FREENULL(st);
FREENULL(db);
@ -3772,9 +3934,10 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
K_RLOCK(sharesummary_free);
item = find_sharesummary(userid, workername, workinfoid);
p_item = find_sharesummary_p(workinfoid);
K_RUNLOCK(sharesummary_free);
if (item) {
new = false;
DATA_SHARESUMMARY(row, item);
} else {
new = true;
@ -3782,52 +3945,25 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(row, item);
bzero(row, sizeof(*row));
row->userid = userid;
row->workername = strdup(workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
row->workinfoid = workinfoid;
zero_sharesummary(row, sharecreatedate, diff);
row->inserted = false;
row->saveaged = false;
}
if (e_row)
row->errorcount += 1;
else {
row->sharecount += 1;
switch (s_row->errn) {
case SE_NONE:
row->diffacc += s_row->diff;
row->shareacc++;
break;
case SE_STALE:
row->diffsta += s_row->diff;
row->sharesta++;
break;
case SE_DUPE:
row->diffdup += s_row->diff;
row->sharedup++;
break;
case SE_HIGH_DIFF:
row->diffhi += s_row->diff;
row->sharehi++;
break;
default:
row->diffrej += s_row->diff;
row->sharerej++;
break;
}
}
// N.B. this directly updates the non-key data
set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl);
if (!new) {
double td;
td = tvdiff(sharecreatedate, &(row->firstshare));
// don't LOG '=' in case shares come from ckpool with the same timestamp
if (td < 0.0) {
if (tdf < 0.0) {
char *tmp1, *tmp2;
int level = LOG_DEBUG;
// WARNING for shares exceeding the OOOLIMIT but not during startup
if (td < OOOLIMIT) {
if (tdf < OOOLIMIT) {
ooof++;
if (startup_complete)
level = LOG_WARNING;
@ -3836,26 +3972,19 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
LOGMSG(level, "%s(): OoO %s createdate (%s) is < summary"
" firstshare (%s) (%s)",
__func__, s_row ? "shares" : "shareerrors",
(tmp1 = ctv_to_buf(sharecreatedate, NULL, 0)),
(tmp1 = ctv_to_buf(createdate, NULL, 0)),
(tmp2 = ctv_to_buf(&(row->firstshare), NULL, 0)),
ooo_status(ooo_buf, sizeof(ooo_buf)));
free(tmp2);
free(tmp1);
row->firstshare.tv_sec = sharecreatedate->tv_sec;
row->firstshare.tv_usec = sharecreatedate->tv_usec;
// Don't change lastdiffacc
}
td = tvdiff(sharecreatedate, &(row->lastshare));
// don't LOG '=' in case shares come from ckpool with the same timestamp
if (td >= 0.0) {
row->lastshare.tv_sec = sharecreatedate->tv_sec;
row->lastshare.tv_usec = sharecreatedate->tv_usec;
row->lastdiffacc = diff;
} else {
if (tdl < 0.0) {
char *tmp1, *tmp2;
int level = LOG_DEBUG;
// WARNING for shares exceeding the OOOLIMIT but not during startup
if (td < OOOLIMIT) {
if (tdl < OOOLIMIT) {
oool++;
if (startup_complete)
level = LOG_WARNING;
@ -3864,12 +3993,13 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
LOGMSG(level, "%s(): OoO %s createdate (%s) is < summary"
" lastshare (%s) (%s)",
__func__, s_row ? "shares" : "shareerrors",
(tmp1 = ctv_to_buf(sharecreatedate, NULL, 0)),
(tmp1 = ctv_to_buf(createdate, NULL, 0)),
(tmp2 = ctv_to_buf(&(row->lastshare), NULL, 0)),
ooo_status(ooo_buf, sizeof(ooo_buf)));
free(tmp2);
free(tmp1);
}
if (row->complete[0] != SUMMARY_NEW) {
LOGDEBUG("%s(): updating sharesummary not '%c'"
" %"PRId64"/%s/%"PRId64"/%s",
@ -3879,6 +4009,23 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
FREENULL(st);
}
}
// p_items are ram only
if (p_item) {
DATA_SHARESUMMARY(p_row, p_item);
} else {
p_new = true;
K_WLOCK(sharesummary_free);
p_item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
POOL_SS(p_row);
LIST_MEM_ADD(sharesummary_free, p_row->workername);
p_row->workinfoid = workinfoid;
}
set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl);
}
// During startup, don't save 'new' sharesummaries, to reduce DB I/O
@ -4036,13 +4183,21 @@ late:
PQfinish(conn);
// We keep the new item no matter what 'ok' is, since it will be inserted later
if (new) {
if (new || p_new) {
K_WLOCK(sharesummary_free);
if (new) {
sharesummary_root = add_to_ktree(sharesummary_root, item, cmp_sharesummary);
sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root,
item,
cmp_sharesummary_workinfoid);
k_add_head(sharesummary_store, item);
}
if (p_new) {
sharesummary_pool_root = add_to_ktree(sharesummary_pool_root,
p_item,
cmp_sharesummary);
k_add_head(sharesummary_pool_store, p_item);
}
K_WUNLOCK(sharesummary_free);
}
@ -4054,9 +4209,9 @@ bool sharesummary_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ctx[1];
K_ITEM *item, *m_item;
int n, i, par = 0;
SHARESUMMARY *row;
K_ITEM *item, *m_item, *p_item;
int n, i, par = 0, p_n;
SHARESUMMARY *row, *p_row;
MARKS *marks;
char *params[2];
char *field;
@ -4092,7 +4247,6 @@ bool sharesummary_fill(PGconn *conn)
"and pplns calculations may be wrong");
}
// TODO: limit how far back
sel = "select "
"userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
@ -4123,10 +4277,11 @@ bool sharesummary_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
//K_WLOCK(sharesummary_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(row, item);
row->workername = NULL;
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -4143,7 +4298,7 @@ bool sharesummary_fill(PGconn *conn)
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
row->workername = strdup(field);
TXT_TO_PTR("workername", field, row->workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
PQ_GET_FLD(res, i, "workinfoid", field, ok);
@ -4269,22 +4424,39 @@ bool sharesummary_fill(PGconn *conn)
}
}
p_item = find_sharesummary_p(row->workinfoid);
if (!p_item) {
p_item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
POOL_SS(p_row);
LIST_MEM_ADD(sharesummary_free, p_row->workername);
p_row->workinfoid = row->workinfoid;
sharesummary_pool_root = add_to_ktree(sharesummary_pool_root,
p_item,
cmp_sharesummary);
k_add_head(sharesummary_pool_store, p_item);
} else {
DATA_SHARESUMMARY(p_row, p_item);
}
sharesummary_to_pool(p_row, row);
tick();
}
if (!ok) {
DATA_SHARESUMMARY(row, item);
if (row->workername) {
LIST_MEM_SUB(sharesummary_free, row->workername);
FREENULL(row->workername);
}
k_add_head(sharesummary_free, item);
}
p_n = sharesummary_pool_store->count;
//K_WUNLOCK(sharesummary_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d sharesummary records", __func__, n);
LOGWARNING("%s(): created %d sharesummary pool records", __func__, p_n);
}
return ok;
@ -4468,6 +4640,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash,
K_WUNLOCK(blocks_free);
DATA_BLOCKS(row, b_item);
bzero(row, sizeof(*row));
TXT_TO_INT("height", height, row->height);
STRNCPY(row->blockhash, blockhash);
@ -4848,9 +5021,11 @@ bool blocks_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(blocks_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(blocks_free);
DATA_BLOCKS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -4960,6 +5135,7 @@ bool blocks_fill(PGconn *conn)
if (!ok)
k_add_head(blocks_free, item);
K_WUNLOCK(blocks_free);
PQclear(res);
if (ok) {
@ -5137,6 +5313,7 @@ bool miningpayouts_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(miningpayouts_free);
DATA_MININGPAYOUTS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -5377,6 +5554,7 @@ bool payouts_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(payouts_free);
DATA_PAYOUTS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -5447,6 +5625,7 @@ bool payouts_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_BLOB("stats", field, row->stats);
LIST_MEM_ADD(payouts_free, row->stats);
HISTORYDATEFLDS(res, i, row, ok);
if (!ok)
@ -5458,8 +5637,10 @@ bool payouts_fill(PGconn *conn)
tick();
}
if (!ok)
if (!ok) {
FREENULL(row->stats);
k_add_head(payouts_free, item);
}
K_WUNLOCK(payouts_free);
PQclear(res);
@ -5493,6 +5674,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
K_WUNLOCK(auths_free);
DATA_AUTHS(row, a_item);
bzero(row, sizeof(*row));
K_RLOCK(users_free);
u_item = find_users(username);
@ -5605,6 +5787,7 @@ bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
K_WUNLOCK(poolstats_free);
DATA_POOLSTATS(row, p_item);
bzero(row, sizeof(*row));
row->stored = false;
@ -5767,6 +5950,7 @@ bool poolstats_fill(PGconn *conn)
for (i = 0; i < n; i++) {
item = k_unlink_head(poolstats_free);
DATA_POOLSTATS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -5862,6 +6046,7 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
K_WUNLOCK(userstats_free);
DATA_USERSTATS(row, us_item);
bzero(row, sizeof(*row));
STRNCPY(row->poolinstance, poolinstance);
TXT_TO_BIGINT("elapsed", elapsed, row->elapsed);
@ -5970,6 +6155,7 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username,
K_WUNLOCK(userstats_free);
DATA_USERSTATS(row, us_item);
bzero(row, sizeof(*row));
STRNCPY(row->poolinstance, poolinstance);
TXT_TO_BIGINT("elapsed", elapsed, row->elapsed);
@ -6108,9 +6294,9 @@ bool markersummary_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
MARKERSUMMARY *row;
K_ITEM *item, *p_item;
int n, i, p_n;
MARKERSUMMARY *row, *p_row;
char *field;
char *sel;
int fields = 18;
@ -6145,9 +6331,11 @@ bool markersummary_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
//K_WLOCK(markersummary_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(markersummary_free);
DATA_MARKERSUMMARY(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -6168,6 +6356,7 @@ bool markersummary_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_PTR("workername", field, row->workername);
LIST_MEM_ADD(markersummary_free, row->workername);
PQ_GET_FLD(res, i, "diffacc", field, ok);
if (!ok)
@ -6252,16 +6441,41 @@ bool markersummary_fill(PGconn *conn)
markersummary_userid_root = add_to_ktree(markersummary_userid_root, item, cmp_markersummary_userid);
k_add_head(markersummary_store, item);
p_item = find_markersummary_p(row->markerid);
if (!p_item) {
/* N.B. this could be false due to the markerid
* having the wrong status TODO: deal with that? */
p_item = k_unlink_head(markersummary_free);
DATA_MARKERSUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
p_row->markerid = row->markerid;
POOL_MS(p_row);
LIST_MEM_ADD(markersummary_free, p_row->workername);
markersummary_pool_root = add_to_ktree(markersummary_pool_root,
p_item,
cmp_markersummary);
k_add_head(markersummary_pool_store, p_item);
} else {
DATA_MARKERSUMMARY(p_row, p_item);
}
markersummary_to_pool(p_row, row);
tick();
}
if (!ok)
if (!ok) {
FREENULL(row->workername);
k_add_head(markersummary_free, item);
}
p_n = markersummary_pool_store->count;
//K_WUNLOCK(markersummary_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d markersummary records", __func__, n);
LOGWARNING("%s(): created %d markersummary pool records", __func__, p_n);
}
return ok;
@ -6549,9 +6763,11 @@ bool workmarkers_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(workmarkers_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(workmarkers_free);
DATA_WORKMARKERS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -6567,6 +6783,7 @@ bool workmarkers_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_PTR("poolinstance", field, row->poolinstance);
LIST_MEM_ADD(workmarkers_free, row->poolinstance);
PQ_GET_FLD(res, i, "workinfoidend", field, ok);
if (!ok)
@ -6582,6 +6799,7 @@ bool workmarkers_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_PTR("description", field, row->description);
LIST_MEM_ADD(workmarkers_free, row->description);
PQ_GET_FLD(res, i, "status", field, ok);
if (!ok)
@ -6602,6 +6820,7 @@ bool workmarkers_fill(PGconn *conn)
if (!ok)
k_add_head(workmarkers_free, item);
K_WUNLOCK(workmarkers_free);
PQclear(res);
if (ok) {
@ -6848,9 +7067,11 @@ bool marks_fill(PGconn *conn)
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(marks_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(marks_free);
DATA_MARKS(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
@ -6861,6 +7082,7 @@ bool marks_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_PTR("poolinstance", field, row->poolinstance);
LIST_MEM_ADD(marks_free, row->poolinstance);
PQ_GET_FLD(res, i, "workinfoid", field, ok);
if (!ok)
@ -6871,11 +7093,13 @@ bool marks_fill(PGconn *conn)
if (!ok)
break;
TXT_TO_PTR("description", field, row->description);
LIST_MEM_ADD(marks_free, row->description);
PQ_GET_FLD(res, i, "extra", field, ok);
if (!ok)
break;
TXT_TO_PTR("extra", field, row->extra);
LIST_MEM_ADD(marks_free, row->extra);
PQ_GET_FLD(res, i, "marktype", field, ok);
if (!ok)
@ -6899,6 +7123,7 @@ bool marks_fill(PGconn *conn)
if (!ok)
k_add_head(marks_free, item);
K_WUNLOCK(marks_free);
PQclear(res);
if (ok) {

Loading…
Cancel
Save