Browse Source

ckdb/sql - include markersummary in share calculations and alter the sharesummary index for fast marker processing - db 0.9.4

master
kanoi 10 years ago
parent
commit
d09c7f9c6b
  1. 4
      sql/ckdb.sql
  2. 1
      sql/v0.9.2-v0.9.3.sql
  3. 113
      src/ckdb.c
  4. 17
      src/ckdb.h
  5. 17
      src/ckdb_cmd.c
  6. 304
      src/ckdb_data.c
  7. 46
      src/ckdb_dbio.c

4
sql/ckdb.sql

@ -247,7 +247,7 @@ CREATE TABLE sharesummary ( -- per workinfo for each user+worker
modifyby character varying(64) NOT NULL,
modifycode character varying(128) NOT NULL,
modifyinet character varying(128) NOT NULL,
PRIMARY KEY (userid, workername, workinfoid)
PRIMARY KEY (workinfoid, userid, workername)
);
@ -417,4 +417,4 @@ CREATE TABLE version (
PRIMARY KEY (vlock)
);
insert into version (vlock,version) values (1,'0.9.3');
insert into version (vlock,version) values (1,'0.9.4');

1
sql/v0.9.2-v0.9.3.sql

@ -68,5 +68,4 @@ CREATE TABLE markersummary ( -- sum of sharesummary for a workinfo range
PRIMARY KEY (markerid, userid, workername)
);
END transaction;

113
src/ckdb.c

@ -40,8 +40,8 @@
* This error would be very rare and also not an issue
* To avoid this, we start the ckpool message queue after loading
* the users, auths, idcontrol and workers DB tables, before loading the
* much larger sharesummary, workinfo, userstats and poolstats DB tables
* so that ckdb is effectively ready for messages almost immediately
* much larger DB tables so that ckdb is effectively ready for messages
* almost immediately
* The first ckpool message also allows us to know where ckpool is up to
* in the CCLs and thus where to stop processing the CCLs to stay in
* sync with ckpool
@ -121,7 +121,7 @@
* Tables that are/will be written straight to the DB, so are OK:
* users, useraccounts, paymentaddresses, payments,
* accountadjustment, optioncontrol, miningpayouts,
* eventlog
* eventlog, workmarkers, markersummary
*
* The code deals with the issue of 'now' when reloading by:
* createdate is considered 'now' for all data during a reload and is
@ -148,6 +148,8 @@
* 3) ageworkinfo records are also handled by the shares date
* while processing, any records already aged are not updated
* and a warning is displayed if there were any matching shares
* Any ageworkinfos that match a workmarker are ignored with an error
* message
*/
static bool socketer_using_data;
@ -437,11 +439,13 @@ K_STORE *workerstatus_store;
// MARKERSUMMARY
K_TREE *markersummary_root;
K_TREE *markersummary_userid_root;
K_LIST *markersummary_free;
K_STORE *markersummary_store;
// WORKMARKERS
K_TREE *workmarkers_root;
K_TREE *workmarkers_workinfoid_root;
K_LIST *workmarkers_free;
K_STORE *workmarkers_store;
@ -1017,11 +1021,15 @@ static void alloc_storage()
ALLOC_MARKERSUMMARY, LIMIT_MARKERSUMMARY, true);
markersummary_store = k_new_store(markersummary_free);
markersummary_root = new_ktree();
markersummary_userid_root = new_ktree();
markersummary_free->dsp_func = dsp_markersummary;
workmarkers_free = k_new_list("WorkMarkers", sizeof(WORKMARKERS),
ALLOC_WORKMARKERS, LIMIT_WORKMARKERS, true);
workmarkers_store = k_new_store(workmarkers_free);
workmarkers_root = new_ktree();
workmarkers_workinfoid_root = new_ktree();
workmarkers_free->dsp_func = dsp_workmarkers;
}
static void free_workinfo_data(K_ITEM *item)
@ -1123,10 +1131,12 @@ static void dealloc_storage()
{
FREE_LISTS(logqueue);
FREE_TREE(workmarkers_workinfoid);
FREE_TREE(workmarkers);
FREE_STORE_DATA(workmarkers);
FREE_LIST_DATA(workmarkers);
FREE_TREE(markersummary_userid);
FREE_TREE(markersummary);
FREE_STORE_DATA(markersummary);
FREE_LIST_DATA(markersummary);
@ -1499,15 +1509,18 @@ static void check_blocks()
static void summarise_blocks()
{
K_ITEM *b_item, *b_prev, *wi_item, ss_look, *ss_item;
K_TREE_CTX ctx[1], ss_ctx[1];
K_ITEM wm_look, *wm_item, ms_look, *ms_item;
K_TREE_CTX ctx[1], ss_ctx[1], ms_ctx[1];
double diffacc, diffinv, shareacc, shareinv;
tv_t now, elapsed_start, elapsed_finish;
int64_t elapsed, wi_start, wi_finish;
BLOCKS *blocks, *prev_blocks;
WORKINFO *prev_workinfo;
SHARESUMMARY looksharesummary, *sharesummary;
WORKMARKERS lookworkmarkers, *workmarkers;
MARKERSUMMARY lookmarkersummary, *markersummary;
bool has_ss = false, has_ms = false, ok;
int32_t hi, prev_hi;
bool ok;
setnow(&now);
@ -1576,26 +1589,24 @@ static void summarise_blocks()
looksharesummary.workername[0] = '\0';
INIT_SHARESUMMARY(&ss_look);
ss_look.data = (void *)(&looksharesummary);
// For now, just lock all 3
K_RLOCK(sharesummary_free);
K_RLOCK(workmarkers_free);
K_RLOCK(markersummary_free);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look,
cmp_sharesummary_workinfoid, ss_ctx);
if (!ss_item) {
K_RUNLOCK(sharesummary_free);
// This will repeat each call here until fixed ...
LOGERR("%s() block %d, prev %d no sharesummaries "
"on or before %"PRId64,
__func__, blocks->height,
prev_hi, wi_finish);
return;
}
DATA_SHARESUMMARY(sharesummary, ss_item);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid > wi_start) {
if (sharesummary->complete[0] == SUMMARY_NEW) {
// Not aged yet
K_RUNLOCK(markersummary_free);
K_RUNLOCK(workmarkers_free);
K_RUNLOCK(sharesummary_free);
return;
}
has_ss = true;
if (elapsed_start.tv_sec == 0 ||
!tv_newer(&elapsed_start, &(sharesummary->firstshare))) {
copy_tv(&elapsed_start, &(sharesummary->firstshare));
@ -1613,8 +1624,77 @@ static void summarise_blocks()
ss_item = prev_in_ktree(ss_ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
// Add in the workmarkers...markersummaries
lookworkmarkers.expirydate.tv_sec = default_expiry.tv_sec;
lookworkmarkers.expirydate.tv_usec = default_expiry.tv_usec;
lookworkmarkers.workinfoidend = wi_finish+1;
INIT_WORKMARKERS(&wm_look);
wm_look.data = (void *)(&lookworkmarkers);
wm_item = find_before_in_ktree(workmarkers_workinfoid_root, &wm_look,
cmp_workmarkers_workinfoid, ctx);
DATA_WORKMARKERS_NULL(workmarkers, wm_item);
while (wm_item &&
CURRENT(&(workmarkers->expirydate)) &&
workmarkers->workinfoidend > wi_start) {
if (workmarkers->workinfoidstart < wi_start) {
LOGEMERG("%s() workmarkers %"PRId64"/%s/%"PRId64
"/%"PRId64"/%s/%s crosses block "
"%"PRId32"/%"PRId64" boundary",
__func__, workmarkers->markerid,
workmarkers->poolinstance,
workmarkers->workinfoidstart,
workmarkers->workinfoidend,
workmarkers->description,
workmarkers->status, hi, wi_finish);
}
if (WMREADY(workmarkers->status)) {
lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = MAXID;
lookmarkersummary.workername[0] = '\0';
INIT_MARKERSUMMARY(&ms_look);
ms_look.data = (void *)(&lookmarkersummary);
ms_item = find_before_in_ktree(markersummary_root, &ms_look,
cmp_markersummary, ms_ctx);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
while (ms_item && markersummary->markerid == workmarkers->markerid) {
has_ms = true;
if (elapsed_start.tv_sec == 0 ||
!tv_newer(&elapsed_start, &(markersummary->firstshare))) {
copy_tv(&elapsed_start, &(markersummary->firstshare));
}
if (tv_newer(&elapsed_finish, &(markersummary->lastshare)))
copy_tv(&elapsed_finish, &(markersummary->lastshare));
diffacc += markersummary->diffacc;
diffinv += markersummary->diffsta + markersummary->diffdup +
markersummary->diffhi + markersummary-> diffrej;
shareacc += markersummary->shareacc;
shareinv += markersummary->sharesta + markersummary->sharedup +
markersummary->sharehi + markersummary-> sharerej;
ms_item = prev_in_ktree(ms_ctx);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
}
}
wm_item = prev_in_ktree(ctx);
DATA_WORKMARKERS_NULL(workmarkers, wm_item);
}
K_RUNLOCK(markersummary_free);
K_RUNLOCK(workmarkers_free);
K_RUNLOCK(sharesummary_free);
if (!has_ss && !has_ms) {
// This will repeat each call here until fixed ...
LOGERR("%s() block %d, after block %d, no sharesummaries "
"or markersummaries after %"PRId64" up to %"PRId64,
__func__, blocks->height,
prev_hi, wi_start, wi_finish);
return;
}
elapsed = (int64_t)(tvdiff(&elapsed_finish, &elapsed_start) + 0.5);
ok = blocks_stats(NULL, blocks->height, blocks->blockhash,
diffacc, diffinv, shareacc, shareinv, elapsed,
@ -3184,6 +3264,7 @@ static void confirm_reload()
true, false);
}
// TODO: handle workmarkers/markersummaries
static void confirm_summaries()
{
pthread_t log_pt;

17
src/ckdb.h

@ -51,8 +51,8 @@
*/
#define DB_VLOCK "1"
#define DB_VERSION "0.9.3"
#define CKDB_VERSION DB_VERSION"-0.610"
#define DB_VERSION "0.9.4"
#define CKDB_VERSION DB_VERSION"-0.630"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1294,6 +1294,7 @@ typedef struct markersummary {
#define DATA_MARKERSUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, markersummary, false)
extern K_TREE *markersummary_root;
extern K_TREE *markersummary_userid_root;
extern K_LIST *markersummary_free;
extern K_STORE *markersummary_store;
@ -1315,10 +1316,12 @@ typedef struct workmarkers {
#define DATA_WORKMARKERS_NULL(_var, _item) DATA_GENERIC(_var, _item, workmarkers, false)
extern K_TREE *workmarkers_root;
extern K_TREE *workmarkers_workinfoid_root;
extern K_LIST *workmarkers_free;
extern K_STORE *workmarkers_store;
#define MARKER_COMPLETE 'x'
#define WMREADY(_status) (tolower(_status[0]) == MARKER_COMPLETE)
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now);
@ -1472,8 +1475,18 @@ extern cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate);
extern void dsp_markersummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid,
char *workername);
extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername,
K_TREE_CTX *ctx);
extern void dsp_workmarkers(K_ITEM *item, FILE *stream);
extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_workmarkers(int64_t workinfoid);
extern K_ITEM *find_workmarkerid(int64_t markerid);
// ***
// *** PostgreSQL functions ckdb_dbio.c

17
src/ckdb_cmd.c

@ -2920,10 +2920,12 @@ static K_TREE *upd_add_mu(K_TREE *mu_root, K_STORE *mu_store, int64_t userid, in
up to the createdate of the last share
The user average hashrate would be:
diffacc_user * 2^32 / pplns_elapsed
PPLNS fraction of the block would be:
PPLNS fraction of the payout would be:
diffacc_user / diffacc_total
*/
/* TODO: redesign to include workmarkers
* ... before next payout that extends into a markersummary ... */
static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
@ -3283,7 +3285,14 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd,
dsp_ktree(sharesummary_free, sharesummary_root,
transfer_data(i_file), NULL);
dsp_ktree(userstats_free, userstats_root, transfer_data(i_file), NULL);
dsp_ktree(userstats_free, userstats_root,
transfer_data(i_file), NULL);
dsp_ktree(markersummary_free, markersummary_root,
transfer_data(i_file), NULL);
dsp_ktree(workmarkers_free, workmarkers_root,
transfer_data(i_file), NULL);
LOGDEBUG("%s.ok.dsp.file='%s'", id, transfer_data(i_file));
return strdup("ok.dsp");
@ -3341,8 +3350,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(shares, 1, 1);
USEINFO(shareerrors, 1, 1);
USEINFO(sharesummary, 1, 2);
USEINFO(workmarkers, 1, 1);
USEINFO(markersummary, 1, 1);
USEINFO(workmarkers, 1, 2);
USEINFO(markersummary, 1, 2);
USEINFO(blocks, 1, 1);
USEINFO(miningpayouts, 1, 1);
USEINFO(auths, 1, 1);

304
src/ckdb_data.c

@ -596,24 +596,33 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername,
TODO: combine set_block_share_counters() with this? */
void workerstatus_ready()
{
K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1];
K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1], ms_ctx[1];
K_ITEM *ws_item, us_look, ss_look, *us_item, *ss_item;
K_ITEM *ms_item, ms_look, *wm_item;
USERSTATS lookuserstats, *userstats;
SHARESUMMARY looksharesummary, *sharesummary;
MARKERSUMMARY *markersummary;
WORKERSTATUS *workerstatus;
LOGWARNING("%s(): Updating workerstatus...", __func__);
INIT_USERSTATS(&us_look);
INIT_MARKERSUMMARY(&ms_look);
INIT_SHARESUMMARY(&ss_look);
ws_item = first_in_ktree(workerstatus_root, ws_ctx);
while (ws_item) {
DATA_WORKERSTATUS(workerstatus, ws_item);
// The last one
lookuserstats.userid = workerstatus->userid;
STRNCPY(lookuserstats.workername, workerstatus->workername);
lookuserstats.statsdate.tv_sec = date_eot.tv_sec;
lookuserstats.statsdate.tv_usec = date_eot.tv_usec;
us_look.data = (void *)(&lookuserstats);
K_RLOCK(userstats_free);
us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look,
cmp_userstats_workerstatus, us_ctx);
K_RUNLOCK(userstats_free);
if (us_item) {
DATA_USERSTATS(userstats, us_item);
if (userstats->idle) {
@ -631,6 +640,26 @@ void workerstatus_ready()
}
}
K_RLOCK(markersummary_free);
// This is the last one
ms_item = find_markersummary_userid(workerstatus->userid,
workerstatus->workername, ms_ctx);
K_RUNLOCK(markersummary_free);
if (ms_item) {
DATA_MARKERSUMMARY(markersummary, ms_item);
K_RLOCK(workmarkers_free);
wm_item = find_workmarkerid(markersummary->markerid);
K_RUNLOCK(workmarkers_free);
if (wm_item &&
tv_newer(&(workerstatus->last_share), &(markersummary->lastshare))) {
copy_tv(&(workerstatus->last_share),
&(markersummary->lastshare));
workerstatus->last_diff =
markersummary->lastdiffacc;
}
}
// The last one
looksharesummary.userid = workerstatus->userid;
STRNCPY(looksharesummary.workername, workerstatus->workername);
looksharesummary.workinfoid = MAXID;
@ -652,6 +681,8 @@ void workerstatus_ready()
ws_item = next_in_ktree(ws_ctx);
}
LOGWARNING("%s(): Update workerstatus complete", __func__);
}
void _workerstatus_update(AUTHS *auths, SHARES *shares,
@ -1240,7 +1271,8 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item;
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item;
K_ITEM *wm_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
@ -1276,6 +1308,19 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
goto bye;
}
K_RLOCK(markersummary_free);
wm_item = find_workmarkers(workinfoid);
K_RUNLOCK(markersummary_free);
// Should never happen?
if (wm_item && !reloading) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s attempt to age a "
"workmarker! Age ignored!",
__func__, workinfoid, poolinstance,
cd->tv_sec, cd->tv_usec, cd_buf);
goto bye;
}
INIT_SHARESUMMARY(&ss_look);
INIT_SHARES(&s_look);
@ -1521,6 +1566,8 @@ K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid)
return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx);
}
/* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */
void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd)
{
@ -1835,12 +1882,17 @@ void zero_on_new_block()
* Will need to add locking if it's used, later, after startup completes */
void set_block_share_counters()
{
K_TREE_CTX ctx[1];
K_ITEM *ss_item, ss_look, *ws_item;
K_TREE_CTX ctx[1], ctx_ms[1];
K_ITEM *ss_item, ss_look, *ws_item, *wm_item, *ms_item, ms_look;
WORKERSTATUS *workerstatus;
SHARESUMMARY *sharesummary, looksharesummary;
WORKMARKERS *workmarkers;
MARKERSUMMARY *markersummary, lookmarkersummary;
LOGWARNING("%s(): Updating block sharesummary counters...", __func__);
INIT_SHARESUMMARY(&ss_look);
INIT_MARKERSUMMARY(&ms_look);
zero_on_new_block();
@ -1852,7 +1904,7 @@ void set_block_share_counters()
ss_item = last_in_ktree(sharesummary_root, ctx);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->workinfoid < pool.workinfoid) {
if (sharesummary->workinfoid <= pool.workinfoid) {
// Skip back to the next worker
looksharesummary.userid = sharesummary->userid;
STRNCPY(looksharesummary.workername,
@ -1908,6 +1960,89 @@ void set_block_share_counters()
ss_item = prev_in_ktree(ctx);
}
K_RUNLOCK(sharesummary_free);
LOGWARNING("%s(): Updating block markersummary counters...", __func__);
// workmarkers after the workinfoid of the last pool block
// TODO: tune the loop layout if needed
ws_item = NULL;
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
DATA_WORKMARKERS_NULL(workmarkers, wm_item);
while (wm_item &&
CURRENT(&(workmarkers->expirydate)) &&
workmarkers->workinfoidend > pool.workinfoid) {
if (WMREADY(workmarkers->status))
{
// Should never be true
if (workmarkers->workinfoidstart <= pool.workinfoid) {
LOGEMERG("%s(): ERROR workmarker %"PRId64" has an invalid"
" workinfoid range start=%"PRId64" end=%"PRId64
" due to pool lastblock=%"PRId32
" workinfoid="PRId64,
__func__, workmarkers->markerid,
workmarkers->workinfoidstart,
workmarkers->workinfoidend,
pool.height, pool.workinfoid);
}
lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = MAXID;
lookmarkersummary.workername = EMPTY;
ms_look.data = (void *)(&lookmarkersummary);
ms_item = find_before_in_ktree(markersummary_root, &ms_look, cmp_markersummary, ctx_ms);
while (ms_item) {
DATA_MARKERSUMMARY(markersummary, ms_item);
if (markersummary->markerid != workmarkers->markerid)
break;
/* Check for user/workername change for new workerstatus
* The tree has user/workername grouped together in order
* so this will only be once per user/workername */
if (!ws_item ||
markersummary->userid != workerstatus->userid ||
strcmp(markersummary->workername, workerstatus->workername)) {
/* This is to trigger a console error if it is missing
* since it should always exist
* However, it is simplest to simply create it
* and keep going */
ws_item = find_workerstatus(markersummary->userid,
markersummary->workername,
__FILE__, __func__, __LINE__);
if (!ws_item) {
ws_item = find_create_workerstatus(markersummary->userid,
markersummary->workername,
__FILE__, __func__, __LINE__);
}
DATA_WORKERSTATUS(workerstatus, ws_item);
}
pool.diffacc += markersummary->diffacc;
pool.diffinv += markersummary->diffsta + markersummary->diffdup +
markersummary->diffhi + markersummary->diffrej;
workerstatus->diffacc += markersummary->diffacc;
workerstatus->diffinv += markersummary->diffsta + markersummary->diffdup +
markersummary->diffhi + markersummary->diffrej;
workerstatus->diffsta += markersummary->diffsta;
workerstatus->diffdup += markersummary->diffdup;
workerstatus->diffhi += markersummary->diffhi;
workerstatus->diffrej += markersummary->diffrej;
workerstatus->shareacc += markersummary->shareacc;
workerstatus->shareinv += markersummary->sharesta + markersummary->sharedup +
markersummary->sharehi + markersummary->sharerej;
workerstatus->sharesta += markersummary->sharesta;
workerstatus->sharedup += markersummary->sharedup;
workerstatus->sharehi += markersummary->sharehi;
workerstatus->sharerej += markersummary->sharerej;
ms_item = prev_in_ktree(ctx_ms);
}
}
wm_item = prev_in_ktree(ctx);
DATA_WORKMARKERS_NULL(workmarkers, wm_item);
}
LOGWARNING("%s(): Update block counters complete", __func__);
}
/* order by height asc,userid asc,expirydate asc
@ -2074,6 +2209,23 @@ bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate)
return true;
}
void dsp_markersummary(K_ITEM *item, FILE *stream)
{
MARKERSUMMARY *ms;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_MARKERSUMMARY(ms, item);
fprintf(stream, " markerid=%"PRId64" userid=%"PRId64
" worker='%s' " "diffacc=%f shares=%"PRId64
" errs=%"PRId64" lastdiff=%f\n",
ms->markerid, ms->userid, ms->workername,
ms->diffacc, ms->sharecount, ms->errorcount,
ms->lastdiffacc);
}
}
// order by markerid asc,userid asc,workername asc
cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b)
{
@ -2089,18 +2241,148 @@ cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b)
return c;
}
// order by markerid asc,workinfoidend asc,expirydate desc
// order by userid asc,workername asc,lastshare asc
cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b)
{
MARKERSUMMARY *ma, *mb;
DATA_MARKERSUMMARY(ma, a);
DATA_MARKERSUMMARY(mb, b);
cmp_t c = CMP_BIGINT(ma->userid, mb->userid);
if (c == 0) {
c = CMP_STR(ma->workername, mb->workername);
if (c == 0)
c = CMP_TV(ma->lastshare, mb->lastshare);
}
return c;
}
// Finds the last markersummary for the worker but also returns the CTX
K_ITEM *find_markersummary_userid(int64_t userid, char *workername, K_TREE_CTX *ctx)
{
K_ITEM look, *ms_item = NULL;
MARKERSUMMARY markersummary, *ms;
markersummary.userid = userid;
markersummary.workername = workername;
markersummary.lastshare.tv_sec = DATE_S_EOT;
INIT_MARKERSUMMARY(&look);
look.data = (void *)(&markersummary);
ms_item = find_before_in_ktree(markersummary_userid_root, &look, cmp_markersummary_userid, ctx);
if (ms_item) {
DATA_MARKERSUMMARY(ms, ms_item);
if (ms->userid != userid || strcmp(ms->workername, workername))
ms_item = NULL;
}
return ms_item;
}
K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername)
{
K_ITEM look, *wm_item, *ms_item = NULL;
MARKERSUMMARY markersummary;
WORKMARKERS *wm;
K_TREE_CTX ctx[1];
wm_item = find_workmarkers(workinfoid);
if (wm_item) {
DATA_WORKMARKERS(wm, wm_item);
markersummary.markerid = wm->markerid;
markersummary.userid = userid;
markersummary.workername = workername;
INIT_MARKERSUMMARY(&look);
look.data = (void *)(&markersummary);
ms_item = find_in_ktree(markersummary_root, &look, cmp_markersummary, ctx);
}
return ms_item;
}
void dsp_workmarkers(K_ITEM *item, FILE *stream)
{
WORKMARKERS *wm;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_WORKMARKERS(wm, item);
fprintf(stream, " id=%"PRId64" pi='%s' end=%"PRId64" stt=%"
PRId64" sta='%s' des='%s'\n",
wm->markerid, wm->poolinstance,
wm->workinfoidend, wm->workinfoidstart,
wm->status, wm->description);
}
}
// order by expirydate asc,markerid asc
cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b)
{
WORKMARKERS *wa, *wb;
DATA_WORKMARKERS(wa, a);
DATA_WORKMARKERS(wb, b);
cmp_t c = CMP_BIGINT(wa->markerid, wb->markerid);
if (c == 0) {
cmp_t c = CMP_TV(wa->expirydate, wb->expirydate);
if (c == 0)
c = CMP_BIGINT(wa->markerid, wb->markerid);
return c;
}
// order by expirydate asc,workinfoidend asc
// TODO: add poolinstance
cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b)
{
WORKMARKERS *wa, *wb;
DATA_WORKMARKERS(wa, a);
DATA_WORKMARKERS(wb, b);
cmp_t c = CMP_TV(wa->expirydate, wb->expirydate);
if (c == 0)
c = CMP_BIGINT(wa->workinfoidend, wb->workinfoidend);
if (c == 0)
c = CMP_TV(wb->expirydate, wa->expirydate);
}
return c;
}
K_ITEM *find_workmarkers(int64_t workinfoid)
{
WORKMARKERS workmarkers, *wm;
K_TREE_CTX ctx[1];
K_ITEM look, *wm_item;
workmarkers.expirydate.tv_sec = default_expiry.tv_sec;
workmarkers.expirydate.tv_usec = default_expiry.tv_usec;
workmarkers.workinfoidend = workinfoid-1;
INIT_WORKMARKERS(&look);
look.data = (void *)(&workmarkers);
wm_item = find_after_in_ktree(workmarkers_workinfoid_root, &look, cmp_workmarkers_workinfoid, ctx);
if (wm_item) {
DATA_WORKMARKERS(wm, wm_item);
if (!CURRENT(&(wm->expirydate)) ||
!WMREADY(wm->status) ||
workinfoid < wm->workinfoidstart ||
workinfoid > wm->workinfoidend)
wm_item = NULL;
}
return wm_item;
}
K_ITEM *find_workmarkerid(int64_t markerid)
{
WORKMARKERS workmarkers, *wm;
K_TREE_CTX ctx[1];
K_ITEM look, *wm_item;
workmarkers.expirydate.tv_sec = default_expiry.tv_sec;
workmarkers.expirydate.tv_usec = default_expiry.tv_usec;
workmarkers.markerid = markerid;
INIT_WORKMARKERS(&look);
look.data = (void *)(&workmarkers);
wm_item = find_in_ktree(workmarkers_root, &look, cmp_workmarkers, ctx);
if (wm_item) {
DATA_WORKMARKERS(wm, wm_item);
if (!CURRENT(&(wm->expirydate)) ||
!WMREADY(wm->status))
wm_item = NULL;
}
return wm_item;
}

46
src/ckdb_dbio.c

@ -2393,7 +2393,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
char *nonce, char *diff, char *sdiff, char *secondaryuserid,
char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item;
K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item;
char cd_buf[DATE_BUFSIZ];
SHARESUMMARY *sharesummary;
SHARES *shares;
@ -2451,6 +2451,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
wi_item = find_workinfo(shares->workinfoid);
if (!wi_item) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
// TODO: store it for a few workinfoid changes
LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Share discarded!",
__func__, shares->workinfoid, workername,
cd->tv_sec, cd->tv_usec, cd_buf);
@ -2463,6 +2464,14 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
goto unitem;
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is ready
wm_item = find_workmarkers(shares->workinfoid);
if (wm_item) {
K_WLOCK(shares_free);
k_add_head(shares_free, s_item);
K_WUNLOCK(shares_free);
return true;
}
ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
@ -2506,7 +2515,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
char *error, char *secondaryuserid, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item;
K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item;
char cd_buf[DATE_BUFSIZ];
SHARESUMMARY *sharesummary;
SHAREERRORS *shareerrors;
@ -2572,6 +2581,14 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
goto unitem;
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is ready
wm_item = find_workmarkers(shareerrors->workinfoid);
if (wm_item) {
K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, s_item);
K_WUNLOCK(shareerrors_free);
return true;
}
ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
@ -2615,8 +2632,9 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
{
ExecStatusType rescode;
PGresult *res = NULL;
WORKMARKERS *wm;
SHARESUMMARY *row;
K_ITEM *item;
K_ITEM *item, *wm_item;
char *ins, *upd;
bool ok = false, new;
char *params[19 + MODIFYDATECOUNT];
@ -2665,6 +2683,23 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
sharecreatedate = &(e_row->createdate);
}
K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(workinfoid);
K_RUNLOCK(workmarkers_free);
if (wm_item) {
char *tmp;
DATA_WORKMARKERS(wm, wm_item);
LOGERR("%s(): attempt to update sharesummary "
"with %s %"PRId64"/%"PRId64"/%s createdate %s"
" but ready workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, workername,
(tmp = ctv_to_buf(sharecreatedate, NULL, 0)),
wm->markerid);
free(tmp);
return false;
}
K_RLOCK(sharesummary_free);
item = find_sharesummary(userid, workername, workinfoid);
K_RUNLOCK(sharesummary_free);
@ -3082,7 +3117,7 @@ bool sharesummary_fill(PGconn *conn)
sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root, item, cmp_sharesummary_workinfoid);
k_add_head(sharesummary_store, item);
// A share summary is currently only shares in a single workinfo, at all 3 levels n,a,y
// A share summary is shares in a single workinfo, at all 3 levels n,a,y
if (tolower(row->complete[0]) == SUMMARY_NEW) {
if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) {
@ -4933,6 +4968,7 @@ bool markersummary_fill(PGconn *conn)
break;
markersummary_root = add_to_ktree(markersummary_root, item, cmp_markersummary);
markersummary_userid_root = add_to_ktree(markersummary_userid_root, item, cmp_markersummary_userid);
k_add_head(markersummary_store, item);
tick();
@ -5033,6 +5069,8 @@ bool workmarkers_fill(PGconn *conn)
break;
workmarkers_root = add_to_ktree(workmarkers_root, item, cmp_workmarkers);
workmarkers_workinfoid_root = add_to_ktree(workmarkers_workinfoid_root,
item, cmp_workmarkers_workinfoid);
k_add_head(workmarkers_store, item);
tick();

Loading…
Cancel
Save