diff --git a/sql/ckdb.sql b/sql/ckdb.sql index d03bd39d..f9bf6d73 100644 --- a/sql/ckdb.sql +++ b/sql/ckdb.sql @@ -247,7 +247,7 @@ CREATE TABLE sharesummary ( -- per workinfo for each user+worker modifyby character varying(64) NOT NULL, modifycode character varying(128) NOT NULL, modifyinet character varying(128) NOT NULL, - PRIMARY KEY (userid, workername, workinfoid) + PRIMARY KEY (workinfoid, userid, workername) ); @@ -417,4 +417,4 @@ CREATE TABLE version ( PRIMARY KEY (vlock) ); -insert into version (vlock,version) values (1,'0.9.3'); +insert into version (vlock,version) values (1,'0.9.4'); diff --git a/sql/v0.9.2-v0.9.3.sql b/sql/v0.9.2-v0.9.3.sql index 9ca93b06..e20d9582 100644 --- a/sql/v0.9.2-v0.9.3.sql +++ b/sql/v0.9.2-v0.9.3.sql @@ -68,5 +68,4 @@ CREATE TABLE markersummary ( -- sum of sharesummary for a workinfo range PRIMARY KEY (markerid, userid, workername) ); - END transaction; diff --git a/src/ckdb.c b/src/ckdb.c index a167a231..3554a11e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -40,8 +40,8 @@ * This error would be very rare and also not an issue * To avoid this, we start the ckpool message queue after loading * the users, auths, idcontrol and workers DB tables, before loading the - * much larger sharesummary, workinfo, userstats and poolstats DB tables - * so that ckdb is effectively ready for messages almost immediately + * much larger DB tables so that ckdb is effectively ready for messages + * almost immediately * The first ckpool message also allows us to know where ckpool is up to * in the CCLs and thus where to stop processing the CCLs to stay in * sync with ckpool @@ -121,7 +121,7 @@ * Tables that are/will be written straight to the DB, so are OK: * users, useraccounts, paymentaddresses, payments, * accountadjustment, optioncontrol, miningpayouts, - * eventlog + * eventlog, workmarkers, markersummary * * The code deals with the issue of 'now' when reloading by: * createdate is considered 'now' for all data during a reload and is @@ -148,6 +148,8 @@ * 3) ageworkinfo records are also handled by the shares date * while processing, any records already aged are not updated * and a warning is displayed if there were any matching shares + * Any ageworkinfos that match a workmarker are ignored with an error + * message */ static bool socketer_using_data; @@ -437,11 +439,13 @@ K_STORE *workerstatus_store; // MARKERSUMMARY K_TREE *markersummary_root; +K_TREE *markersummary_userid_root; K_LIST *markersummary_free; K_STORE *markersummary_store; // WORKMARKERS K_TREE *workmarkers_root; +K_TREE *workmarkers_workinfoid_root; K_LIST *workmarkers_free; K_STORE *workmarkers_store; @@ -1017,11 +1021,15 @@ static void alloc_storage() ALLOC_MARKERSUMMARY, LIMIT_MARKERSUMMARY, true); markersummary_store = k_new_store(markersummary_free); markersummary_root = new_ktree(); + markersummary_userid_root = new_ktree(); + markersummary_free->dsp_func = dsp_markersummary; workmarkers_free = k_new_list("WorkMarkers", sizeof(WORKMARKERS), ALLOC_WORKMARKERS, LIMIT_WORKMARKERS, true); workmarkers_store = k_new_store(workmarkers_free); workmarkers_root = new_ktree(); + workmarkers_workinfoid_root = new_ktree(); + workmarkers_free->dsp_func = dsp_workmarkers; } static void free_workinfo_data(K_ITEM *item) @@ -1123,10 +1131,12 @@ static void dealloc_storage() { FREE_LISTS(logqueue); + FREE_TREE(workmarkers_workinfoid); FREE_TREE(workmarkers); FREE_STORE_DATA(workmarkers); FREE_LIST_DATA(workmarkers); + FREE_TREE(markersummary_userid); FREE_TREE(markersummary); FREE_STORE_DATA(markersummary); FREE_LIST_DATA(markersummary); @@ -1499,15 +1509,18 @@ static void check_blocks() static void summarise_blocks() { K_ITEM *b_item, *b_prev, *wi_item, ss_look, *ss_item; - K_TREE_CTX ctx[1], ss_ctx[1]; + K_ITEM wm_look, *wm_item, ms_look, *ms_item; + K_TREE_CTX ctx[1], ss_ctx[1], ms_ctx[1]; double diffacc, diffinv, shareacc, shareinv; tv_t now, elapsed_start, elapsed_finish; int64_t elapsed, wi_start, wi_finish; BLOCKS *blocks, *prev_blocks; WORKINFO *prev_workinfo; SHARESUMMARY looksharesummary, *sharesummary; + WORKMARKERS lookworkmarkers, *workmarkers; + MARKERSUMMARY lookmarkersummary, *markersummary; + bool has_ss = false, has_ms = false, ok; int32_t hi, prev_hi; - bool ok; setnow(&now); @@ -1576,26 +1589,24 @@ static void summarise_blocks() looksharesummary.workername[0] = '\0'; INIT_SHARESUMMARY(&ss_look); ss_look.data = (void *)(&looksharesummary); + + // For now, just lock all 3 K_RLOCK(sharesummary_free); + K_RLOCK(workmarkers_free); + K_RLOCK(markersummary_free); + ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx); - - if (!ss_item) { - K_RUNLOCK(sharesummary_free); - // This will repeat each call here until fixed ... - LOGERR("%s() block %d, prev %d no sharesummaries " - "on or before %"PRId64, - __func__, blocks->height, - prev_hi, wi_finish); - return; - } - DATA_SHARESUMMARY(sharesummary, ss_item); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); while (ss_item && sharesummary->workinfoid > wi_start) { if (sharesummary->complete[0] == SUMMARY_NEW) { // Not aged yet + K_RUNLOCK(markersummary_free); + K_RUNLOCK(workmarkers_free); K_RUNLOCK(sharesummary_free); return; } + has_ss = true; if (elapsed_start.tv_sec == 0 || !tv_newer(&elapsed_start, &(sharesummary->firstshare))) { copy_tv(&elapsed_start, &(sharesummary->firstshare)); @@ -1613,8 +1624,77 @@ static void summarise_blocks() ss_item = prev_in_ktree(ss_ctx); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); } + + // Add in the workmarkers...markersummaries + lookworkmarkers.expirydate.tv_sec = default_expiry.tv_sec; + lookworkmarkers.expirydate.tv_usec = default_expiry.tv_usec; + lookworkmarkers.workinfoidend = wi_finish+1; + INIT_WORKMARKERS(&wm_look); + wm_look.data = (void *)(&lookworkmarkers); + wm_item = find_before_in_ktree(workmarkers_workinfoid_root, &wm_look, + cmp_workmarkers_workinfoid, ctx); + DATA_WORKMARKERS_NULL(workmarkers, wm_item); + while (wm_item && + CURRENT(&(workmarkers->expirydate)) && + workmarkers->workinfoidend > wi_start) { + + if (workmarkers->workinfoidstart < wi_start) { + LOGEMERG("%s() workmarkers %"PRId64"/%s/%"PRId64 + "/%"PRId64"/%s/%s crosses block " + "%"PRId32"/%"PRId64" boundary", + __func__, workmarkers->markerid, + workmarkers->poolinstance, + workmarkers->workinfoidstart, + workmarkers->workinfoidend, + workmarkers->description, + workmarkers->status, hi, wi_finish); + } + if (WMREADY(workmarkers->status)) { + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = MAXID; + lookmarkersummary.workername[0] = '\0'; + INIT_MARKERSUMMARY(&ms_look); + ms_look.data = (void *)(&lookmarkersummary); + ms_item = find_before_in_ktree(markersummary_root, &ms_look, + cmp_markersummary, ms_ctx); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + while (ms_item && markersummary->markerid == workmarkers->markerid) { + has_ms = true; + if (elapsed_start.tv_sec == 0 || + !tv_newer(&elapsed_start, &(markersummary->firstshare))) { + copy_tv(&elapsed_start, &(markersummary->firstshare)); + } + if (tv_newer(&elapsed_finish, &(markersummary->lastshare))) + copy_tv(&elapsed_finish, &(markersummary->lastshare)); + + diffacc += markersummary->diffacc; + diffinv += markersummary->diffsta + markersummary->diffdup + + markersummary->diffhi + markersummary-> diffrej; + shareacc += markersummary->shareacc; + shareinv += markersummary->sharesta + markersummary->sharedup + + markersummary->sharehi + markersummary-> sharerej; + + ms_item = prev_in_ktree(ms_ctx); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + } + } + wm_item = prev_in_ktree(ctx); + DATA_WORKMARKERS_NULL(workmarkers, wm_item); + } + + K_RUNLOCK(markersummary_free); + K_RUNLOCK(workmarkers_free); K_RUNLOCK(sharesummary_free); + if (!has_ss && !has_ms) { + // This will repeat each call here until fixed ... + LOGERR("%s() block %d, after block %d, no sharesummaries " + "or markersummaries after %"PRId64" up to %"PRId64, + __func__, blocks->height, + prev_hi, wi_start, wi_finish); + return; + } + elapsed = (int64_t)(tvdiff(&elapsed_finish, &elapsed_start) + 0.5); ok = blocks_stats(NULL, blocks->height, blocks->blockhash, diffacc, diffinv, shareacc, shareinv, elapsed, @@ -3184,6 +3264,7 @@ static void confirm_reload() true, false); } +// TODO: handle workmarkers/markersummaries static void confirm_summaries() { pthread_t log_pt; diff --git a/src/ckdb.h b/src/ckdb.h index 33227a3a..c49852f6 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,8 +51,8 @@ */ #define DB_VLOCK "1" -#define DB_VERSION "0.9.3" -#define CKDB_VERSION DB_VERSION"-0.610" +#define DB_VERSION "0.9.4" +#define CKDB_VERSION DB_VERSION"-0.630" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1294,6 +1294,7 @@ typedef struct markersummary { #define DATA_MARKERSUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, markersummary, false) extern K_TREE *markersummary_root; +extern K_TREE *markersummary_userid_root; extern K_LIST *markersummary_free; extern K_STORE *markersummary_store; @@ -1315,10 +1316,12 @@ typedef struct workmarkers { #define DATA_WORKMARKERS_NULL(_var, _item) DATA_GENERIC(_var, _item, workmarkers, false) extern K_TREE *workmarkers_root; +extern K_TREE *workmarkers_workinfoid_root; extern K_LIST *workmarkers_free; extern K_STORE *workmarkers_store; #define MARKER_COMPLETE 'x' +#define WMREADY(_status) (tolower(_status[0]) == MARKER_COMPLETE) extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); @@ -1472,8 +1475,18 @@ extern cmp_t cmp_userstats_workername(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats_statsdate(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b); extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate); +extern void dsp_markersummary(K_ITEM *item, FILE *stream); extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b); +extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b); +extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, + char *workername); +extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername, + K_TREE_CTX *ctx); +extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); +extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); +extern K_ITEM *find_workmarkers(int64_t workinfoid); +extern K_ITEM *find_workmarkerid(int64_t markerid); // *** // *** PostgreSQL functions ckdb_dbio.c diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index fff59f2c..c7c580c8 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -2920,10 +2920,12 @@ static K_TREE *upd_add_mu(K_TREE *mu_root, K_STORE *mu_store, int64_t userid, in up to the createdate of the last share The user average hashrate would be: diffacc_user * 2^32 / pplns_elapsed - PPLNS fraction of the block would be: + PPLNS fraction of the payout would be: diffacc_user / diffacc_total */ +/* TODO: redesign to include workmarkers + * ... before next payout that extends into a markersummary ... */ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, @@ -3283,7 +3285,14 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, dsp_ktree(sharesummary_free, sharesummary_root, transfer_data(i_file), NULL); - dsp_ktree(userstats_free, userstats_root, transfer_data(i_file), NULL); + dsp_ktree(userstats_free, userstats_root, + transfer_data(i_file), NULL); + + dsp_ktree(markersummary_free, markersummary_root, + transfer_data(i_file), NULL); + + dsp_ktree(workmarkers_free, workmarkers_root, + transfer_data(i_file), NULL); LOGDEBUG("%s.ok.dsp.file='%s'", id, transfer_data(i_file)); return strdup("ok.dsp"); @@ -3341,8 +3350,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(shares, 1, 1); USEINFO(shareerrors, 1, 1); USEINFO(sharesummary, 1, 2); - USEINFO(workmarkers, 1, 1); - USEINFO(markersummary, 1, 1); + USEINFO(workmarkers, 1, 2); + USEINFO(markersummary, 1, 2); USEINFO(blocks, 1, 1); USEINFO(miningpayouts, 1, 1); USEINFO(auths, 1, 1); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index f786f78a..e712f520 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -596,24 +596,33 @@ K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, TODO: combine set_block_share_counters() with this? */ void workerstatus_ready() { - K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1]; + K_TREE_CTX ws_ctx[1], us_ctx[1], ss_ctx[1], ms_ctx[1]; K_ITEM *ws_item, us_look, ss_look, *us_item, *ss_item; + K_ITEM *ms_item, ms_look, *wm_item; USERSTATS lookuserstats, *userstats; SHARESUMMARY looksharesummary, *sharesummary; + MARKERSUMMARY *markersummary; WORKERSTATUS *workerstatus; + LOGWARNING("%s(): Updating workerstatus...", __func__); + INIT_USERSTATS(&us_look); + INIT_MARKERSUMMARY(&ms_look); INIT_SHARESUMMARY(&ss_look); ws_item = first_in_ktree(workerstatus_root, ws_ctx); while (ws_item) { DATA_WORKERSTATUS(workerstatus, ws_item); + + // The last one lookuserstats.userid = workerstatus->userid; STRNCPY(lookuserstats.workername, workerstatus->workername); lookuserstats.statsdate.tv_sec = date_eot.tv_sec; lookuserstats.statsdate.tv_usec = date_eot.tv_usec; us_look.data = (void *)(&lookuserstats); + K_RLOCK(userstats_free); us_item = find_before_in_ktree(userstats_workerstatus_root, &us_look, cmp_userstats_workerstatus, us_ctx); + K_RUNLOCK(userstats_free); if (us_item) { DATA_USERSTATS(userstats, us_item); if (userstats->idle) { @@ -631,6 +640,26 @@ void workerstatus_ready() } } + K_RLOCK(markersummary_free); + // This is the last one + ms_item = find_markersummary_userid(workerstatus->userid, + workerstatus->workername, ms_ctx); + K_RUNLOCK(markersummary_free); + if (ms_item) { + DATA_MARKERSUMMARY(markersummary, ms_item); + K_RLOCK(workmarkers_free); + wm_item = find_workmarkerid(markersummary->markerid); + K_RUNLOCK(workmarkers_free); + if (wm_item && + tv_newer(&(workerstatus->last_share), &(markersummary->lastshare))) { + copy_tv(&(workerstatus->last_share), + &(markersummary->lastshare)); + workerstatus->last_diff = + markersummary->lastdiffacc; + } + } + + // The last one looksharesummary.userid = workerstatus->userid; STRNCPY(looksharesummary.workername, workerstatus->workername); looksharesummary.workinfoid = MAXID; @@ -652,6 +681,8 @@ void workerstatus_ready() ws_item = next_in_ktree(ws_ctx); } + + LOGWARNING("%s(): Update workerstatus complete", __func__); } void _workerstatus_update(AUTHS *auths, SHARES *shares, @@ -1240,7 +1271,8 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, int64_t *s_count, int64_t *s_diff) { - K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item; + K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item; + K_ITEM *wm_item, *tmp_item; K_TREE_CTX ss_ctx[1], s_ctx[1]; char cd_buf[DATE_BUFSIZ]; int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped; @@ -1276,6 +1308,19 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, goto bye; } + K_RLOCK(markersummary_free); + wm_item = find_workmarkers(workinfoid); + K_RUNLOCK(markersummary_free); + // Should never happen? + if (wm_item && !reloading) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s attempt to age a " + "workmarker! Age ignored!", + __func__, workinfoid, poolinstance, + cd->tv_sec, cd->tv_usec, cd_buf); + goto bye; + } + INIT_SHARESUMMARY(&ss_look); INIT_SHARES(&s_look); @@ -1521,6 +1566,8 @@ K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid) return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx); } +/* TODO: markersummary checking? + * However, there should be no issues since the sharesummaries are removed */ void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, char *by, char *code, char *inet, tv_t *cd) { @@ -1835,12 +1882,17 @@ void zero_on_new_block() * Will need to add locking if it's used, later, after startup completes */ void set_block_share_counters() { - K_TREE_CTX ctx[1]; - K_ITEM *ss_item, ss_look, *ws_item; + K_TREE_CTX ctx[1], ctx_ms[1]; + K_ITEM *ss_item, ss_look, *ws_item, *wm_item, *ms_item, ms_look; WORKERSTATUS *workerstatus; SHARESUMMARY *sharesummary, looksharesummary; + WORKMARKERS *workmarkers; + MARKERSUMMARY *markersummary, lookmarkersummary; + + LOGWARNING("%s(): Updating block sharesummary counters...", __func__); INIT_SHARESUMMARY(&ss_look); + INIT_MARKERSUMMARY(&ms_look); zero_on_new_block(); @@ -1852,7 +1904,7 @@ void set_block_share_counters() ss_item = last_in_ktree(sharesummary_root, ctx); while (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); - if (sharesummary->workinfoid < pool.workinfoid) { + if (sharesummary->workinfoid <= pool.workinfoid) { // Skip back to the next worker looksharesummary.userid = sharesummary->userid; STRNCPY(looksharesummary.workername, @@ -1908,6 +1960,89 @@ void set_block_share_counters() ss_item = prev_in_ktree(ctx); } K_RUNLOCK(sharesummary_free); + + LOGWARNING("%s(): Updating block markersummary counters...", __func__); + + // workmarkers after the workinfoid of the last pool block + // TODO: tune the loop layout if needed + ws_item = NULL; + wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); + DATA_WORKMARKERS_NULL(workmarkers, wm_item); + while (wm_item && + CURRENT(&(workmarkers->expirydate)) && + workmarkers->workinfoidend > pool.workinfoid) { + + if (WMREADY(workmarkers->status)) + { + // Should never be true + if (workmarkers->workinfoidstart <= pool.workinfoid) { + LOGEMERG("%s(): ERROR workmarker %"PRId64" has an invalid" + " workinfoid range start=%"PRId64" end=%"PRId64 + " due to pool lastblock=%"PRId32 + " workinfoid="PRId64, + __func__, workmarkers->markerid, + workmarkers->workinfoidstart, + workmarkers->workinfoidend, + pool.height, pool.workinfoid); + } + + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = MAXID; + lookmarkersummary.workername = EMPTY; + ms_look.data = (void *)(&lookmarkersummary); + ms_item = find_before_in_ktree(markersummary_root, &ms_look, cmp_markersummary, ctx_ms); + while (ms_item) { + DATA_MARKERSUMMARY(markersummary, ms_item); + if (markersummary->markerid != workmarkers->markerid) + break; + + /* Check for user/workername change for new workerstatus + * The tree has user/workername grouped together in order + * so this will only be once per user/workername */ + if (!ws_item || + markersummary->userid != workerstatus->userid || + strcmp(markersummary->workername, workerstatus->workername)) { + /* This is to trigger a console error if it is missing + * since it should always exist + * However, it is simplest to simply create it + * and keep going */ + ws_item = find_workerstatus(markersummary->userid, + markersummary->workername, + __FILE__, __func__, __LINE__); + if (!ws_item) { + ws_item = find_create_workerstatus(markersummary->userid, + markersummary->workername, + __FILE__, __func__, __LINE__); + } + DATA_WORKERSTATUS(workerstatus, ws_item); + } + + pool.diffacc += markersummary->diffacc; + pool.diffinv += markersummary->diffsta + markersummary->diffdup + + markersummary->diffhi + markersummary->diffrej; + workerstatus->diffacc += markersummary->diffacc; + workerstatus->diffinv += markersummary->diffsta + markersummary->diffdup + + markersummary->diffhi + markersummary->diffrej; + workerstatus->diffsta += markersummary->diffsta; + workerstatus->diffdup += markersummary->diffdup; + workerstatus->diffhi += markersummary->diffhi; + workerstatus->diffrej += markersummary->diffrej; + workerstatus->shareacc += markersummary->shareacc; + workerstatus->shareinv += markersummary->sharesta + markersummary->sharedup + + markersummary->sharehi + markersummary->sharerej; + workerstatus->sharesta += markersummary->sharesta; + workerstatus->sharedup += markersummary->sharedup; + workerstatus->sharehi += markersummary->sharehi; + workerstatus->sharerej += markersummary->sharerej; + + ms_item = prev_in_ktree(ctx_ms); + } + } + wm_item = prev_in_ktree(ctx); + DATA_WORKMARKERS_NULL(workmarkers, wm_item); + } + + LOGWARNING("%s(): Update block counters complete", __func__); } /* order by height asc,userid asc,expirydate asc @@ -2074,6 +2209,23 @@ bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate) return true; } +void dsp_markersummary(K_ITEM *item, FILE *stream) +{ + MARKERSUMMARY *ms; + + if (!item) + fprintf(stream, "%s() called with (null) item\n", __func__); + else { + DATA_MARKERSUMMARY(ms, item); + fprintf(stream, " markerid=%"PRId64" userid=%"PRId64 + " worker='%s' " "diffacc=%f shares=%"PRId64 + " errs=%"PRId64" lastdiff=%f\n", + ms->markerid, ms->userid, ms->workername, + ms->diffacc, ms->sharecount, ms->errorcount, + ms->lastdiffacc); + } +} + // order by markerid asc,userid asc,workername asc cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b) { @@ -2089,18 +2241,148 @@ cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b) return c; } -// order by markerid asc,workinfoidend asc,expirydate desc +// order by userid asc,workername asc,lastshare asc +cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b) +{ + MARKERSUMMARY *ma, *mb; + DATA_MARKERSUMMARY(ma, a); + DATA_MARKERSUMMARY(mb, b); + cmp_t c = CMP_BIGINT(ma->userid, mb->userid); + if (c == 0) { + c = CMP_STR(ma->workername, mb->workername); + if (c == 0) + c = CMP_TV(ma->lastshare, mb->lastshare); + } + return c; +} + +// Finds the last markersummary for the worker but also returns the CTX +K_ITEM *find_markersummary_userid(int64_t userid, char *workername, K_TREE_CTX *ctx) +{ + K_ITEM look, *ms_item = NULL; + MARKERSUMMARY markersummary, *ms; + + markersummary.userid = userid; + markersummary.workername = workername; + markersummary.lastshare.tv_sec = DATE_S_EOT; + + INIT_MARKERSUMMARY(&look); + look.data = (void *)(&markersummary); + ms_item = find_before_in_ktree(markersummary_userid_root, &look, cmp_markersummary_userid, ctx); + if (ms_item) { + DATA_MARKERSUMMARY(ms, ms_item); + if (ms->userid != userid || strcmp(ms->workername, workername)) + ms_item = NULL; + } + return ms_item; +} + +K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername) +{ + K_ITEM look, *wm_item, *ms_item = NULL; + MARKERSUMMARY markersummary; + WORKMARKERS *wm; + K_TREE_CTX ctx[1]; + + wm_item = find_workmarkers(workinfoid); + if (wm_item) { + DATA_WORKMARKERS(wm, wm_item); + markersummary.markerid = wm->markerid; + markersummary.userid = userid; + markersummary.workername = workername; + + INIT_MARKERSUMMARY(&look); + look.data = (void *)(&markersummary); + ms_item = find_in_ktree(markersummary_root, &look, cmp_markersummary, ctx); + } + + return ms_item; +} + +void dsp_workmarkers(K_ITEM *item, FILE *stream) +{ + WORKMARKERS *wm; + + if (!item) + fprintf(stream, "%s() called with (null) item\n", __func__); + else { + DATA_WORKMARKERS(wm, item); + fprintf(stream, " id=%"PRId64" pi='%s' end=%"PRId64" stt=%" + PRId64" sta='%s' des='%s'\n", + wm->markerid, wm->poolinstance, + wm->workinfoidend, wm->workinfoidstart, + wm->status, wm->description); + } +} + +// order by expirydate asc,markerid asc cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b) { WORKMARKERS *wa, *wb; DATA_WORKMARKERS(wa, a); DATA_WORKMARKERS(wb, b); - cmp_t c = CMP_BIGINT(wa->markerid, wb->markerid); - if (c == 0) { + cmp_t c = CMP_TV(wa->expirydate, wb->expirydate); + if (c == 0) + c = CMP_BIGINT(wa->markerid, wb->markerid); + return c; +} + +// order by expirydate asc,workinfoidend asc +// TODO: add poolinstance +cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b) +{ + WORKMARKERS *wa, *wb; + DATA_WORKMARKERS(wa, a); + DATA_WORKMARKERS(wb, b); + cmp_t c = CMP_TV(wa->expirydate, wb->expirydate); + if (c == 0) c = CMP_BIGINT(wa->workinfoidend, wb->workinfoidend); - if (c == 0) - c = CMP_TV(wb->expirydate, wa->expirydate); - } return c; } +K_ITEM *find_workmarkers(int64_t workinfoid) +{ + WORKMARKERS workmarkers, *wm; + K_TREE_CTX ctx[1]; + K_ITEM look, *wm_item; + + workmarkers.expirydate.tv_sec = default_expiry.tv_sec; + workmarkers.expirydate.tv_usec = default_expiry.tv_usec; + workmarkers.workinfoidend = workinfoid-1; + + INIT_WORKMARKERS(&look); + look.data = (void *)(&workmarkers); + wm_item = find_after_in_ktree(workmarkers_workinfoid_root, &look, cmp_workmarkers_workinfoid, ctx); + if (wm_item) { + DATA_WORKMARKERS(wm, wm_item); + if (!CURRENT(&(wm->expirydate)) || + !WMREADY(wm->status) || + workinfoid < wm->workinfoidstart || + workinfoid > wm->workinfoidend) + wm_item = NULL; + } + return wm_item; +} + +K_ITEM *find_workmarkerid(int64_t markerid) +{ + WORKMARKERS workmarkers, *wm; + K_TREE_CTX ctx[1]; + K_ITEM look, *wm_item; + + workmarkers.expirydate.tv_sec = default_expiry.tv_sec; + workmarkers.expirydate.tv_usec = default_expiry.tv_usec; + workmarkers.markerid = markerid; + + INIT_WORKMARKERS(&look); + look.data = (void *)(&workmarkers); + wm_item = find_in_ktree(workmarkers_root, &look, cmp_workmarkers, ctx); + if (wm_item) { + DATA_WORKMARKERS(wm, wm_item); + if (!CURRENT(&(wm->expirydate)) || + !WMREADY(wm->status)) + wm_item = NULL; + } + return wm_item; +} + diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 36b2e76d..8c0b3d52 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -2393,7 +2393,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername char *nonce, char *diff, char *sdiff, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { - K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; + K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item; char cd_buf[DATE_BUFSIZ]; SHARESUMMARY *sharesummary; SHARES *shares; @@ -2451,6 +2451,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername wi_item = find_workinfo(shares->workinfoid); if (!wi_item) { tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + // TODO: store it for a few workinfoid changes LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Share discarded!", __func__, shares->workinfoid, workername, cd->tv_sec, cd->tv_usec, cd_buf); @@ -2463,6 +2464,14 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername goto unitem; if (reloading && !confirm_sharesummary) { + // We only need to know if the workmarker is ready + wm_item = find_workmarkers(shares->workinfoid); + if (wm_item) { + K_WLOCK(shares_free); + k_add_head(shares_free, s_item); + K_WUNLOCK(shares_free); + return true; + } ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); @@ -2506,7 +2515,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *error, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { - K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; + K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item; char cd_buf[DATE_BUFSIZ]; SHARESUMMARY *sharesummary; SHAREERRORS *shareerrors; @@ -2572,6 +2581,14 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, goto unitem; if (reloading && !confirm_sharesummary) { + // We only need to know if the workmarker is ready + wm_item = find_workmarkers(shareerrors->workinfoid); + if (wm_item) { + K_WLOCK(shareerrors_free); + k_add_head(shareerrors_free, s_item); + K_WUNLOCK(shareerrors_free); + return true; + } ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); @@ -2615,8 +2632,9 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE { ExecStatusType rescode; PGresult *res = NULL; + WORKMARKERS *wm; SHARESUMMARY *row; - K_ITEM *item; + K_ITEM *item, *wm_item; char *ins, *upd; bool ok = false, new; char *params[19 + MODIFYDATECOUNT]; @@ -2665,6 +2683,23 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE sharecreatedate = &(e_row->createdate); } + K_RLOCK(workmarkers_free); + wm_item = find_workmarkers(workinfoid); + K_RUNLOCK(workmarkers_free); + if (wm_item) { + char *tmp; + DATA_WORKMARKERS(wm, wm_item); + LOGERR("%s(): attempt to update sharesummary " + "with %s %"PRId64"/%"PRId64"/%s createdate %s" + " but ready workmarkers %"PRId64" exists", + __func__, s_row ? "shares" : "shareerrors", + workinfoid, userid, workername, + (tmp = ctv_to_buf(sharecreatedate, NULL, 0)), + wm->markerid); + free(tmp); + return false; + } + K_RLOCK(sharesummary_free); item = find_sharesummary(userid, workername, workinfoid); K_RUNLOCK(sharesummary_free); @@ -3082,7 +3117,7 @@ bool sharesummary_fill(PGconn *conn) sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root, item, cmp_sharesummary_workinfoid); k_add_head(sharesummary_store, item); - // A share summary is currently only shares in a single workinfo, at all 3 levels n,a,y + // A share summary is shares in a single workinfo, at all 3 levels n,a,y if (tolower(row->complete[0]) == SUMMARY_NEW) { if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 || !tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) { @@ -4933,6 +4968,7 @@ bool markersummary_fill(PGconn *conn) break; markersummary_root = add_to_ktree(markersummary_root, item, cmp_markersummary); + markersummary_userid_root = add_to_ktree(markersummary_userid_root, item, cmp_markersummary_userid); k_add_head(markersummary_store, item); tick(); @@ -5033,6 +5069,8 @@ bool workmarkers_fill(PGconn *conn) break; workmarkers_root = add_to_ktree(workmarkers_root, item, cmp_workmarkers); + workmarkers_workinfoid_root = add_to_ktree(workmarkers_workinfoid_root, + item, cmp_workmarkers_workinfoid); k_add_head(workmarkers_store, item); tick();