diff --git a/src/ckdb.c b/src/ckdb.c
index adde4b43..b290ea60 100644
--- a/src/ckdb.c
+++ b/src/ckdb.c
@@ -640,6 +640,11 @@ K_TREE *optioncontrol_root;
 K_LIST *optioncontrol_free;
 K_STORE *optioncontrol_store;
 
+// ESM (Early Share/Shareerror Messages)
+K_TREE *esm_root;
+K_LIST *esm_free;
+K_STORE *esm_store;
+
 // WORKINFO workinfo.id.json={...}
 K_TREE *workinfo_root;
 // created during data load then destroyed since not needed later
@@ -2044,6 +2049,10 @@ static void alloc_storage()
 					ALLOC_IDCONTROL, LIMIT_IDCONTROL, true);
 	idcontrol_store = k_new_store(idcontrol_free);
 
+	esm_free = k_new_list("ESM", sizeof(ESM), ALLOC_ESM, LIMIT_ESM, true);
+	esm_store = k_new_store(esm_free);
+	esm_root = new_ktree(NULL, cmp_esm, esm_free);
+
 	workinfo_free = k_new_list("WorkInfo", sizeof(WORKINFO),
 					ALLOC_WORKINFO, LIMIT_WORKINFO, true);
 	workinfo_store = k_new_store(workinfo_free);
@@ -2256,6 +2265,7 @@ static void alloc_storage()
 	DLPRIO(userstats, 10);
 
 	// Don't currently nest any locks in these:
+	DLPRIO(esm, PRIO_TERMINAL);
 	DLPRIO(workers, PRIO_TERMINAL);
 	DLPRIO(idcontrol, PRIO_TERMINAL);
 	DLPRIO(paymentaddresses, PRIO_TERMINAL);
@@ -2556,6 +2566,12 @@ static void dealloc_storage()
 
 	LOGWARNING("%s() etc ...", __func__);
 
+	if (esm_store->count > 0) {
+		LOGWARNING("%s() ***ESM had %d records ...",
+			   __func__, esm_store->count);
+	}
+	FREE_ALL(esm);
+
 	FREE_LISTS(idcontrol);
 	FREE_ALL(accountbalance);
 	FREE_ALL(payments);
diff --git a/src/ckdb.h b/src/ckdb.h
index 7a34a3d4..a812a33c 100644
--- a/src/ckdb.h
+++ b/src/ckdb.h
@@ -58,7 +58,7 @@
 
 #define DB_VLOCK "1"
 #define DB_VERSION "1.0.7"
-#define CKDB_VERSION DB_VERSION"-2.441"
+#define CKDB_VERSION DB_VERSION"-2.442"
 
 #define WHERE_FFL " - from %s %s() line %d"
 #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@@ -1913,7 +1913,38 @@ typedef struct oc_trigger {
 	void (*func)(OPTIONCONTROL *, const char *);
 } OC_TRIGGER;
 
-// TODO: discarding workinfo,shares
+// ESM (Early Share/Shareerror Messages)
+typedef struct esm {
+	int64_t workinfoid;
+	int queued;
+	int procured;
+	int discarded;
+	int errqueued;
+	int errprocured;
+	int errdiscarded;
+	tv_t createdate;
+} ESM;
+
+/* This is to reduce the number of console Early messages
+ * The first queued message is displayed LOG_ERR
+ * then the rest, for the given workinfoid, are LOG_NOTICE
+ * A final summary for the workinfoid will be displayed later with LOG_WARNING,
+ * if there were any */
+#define ALLOC_ESM 10
+#define LIMIT_ESM 0
+#define INIT_ESM(_item) INIT_GENERIC(_item, esm)
+#define DATA_ESM(_var, _item) DATA_GENERIC(_var, _item, esm, true)
+#define DATA_ESM_NULL(_var, _item) DATA_GENERIC(_var, _item, esm, false)
+
+extern K_TREE *esm_root;
+extern K_LIST *esm_free;
+extern K_STORE *esm_store;
+
+/* Age limit, in seconds, before displaying ESM summary messages and
+ * deleting the associated ESM record */
+#define ESM_LIMIT 60.0
+
+// TODO: discarding workinfo
 // WORKINFO workinfo.id.json={...}
 typedef struct workinfo {
 	int64_t workinfoid;
@@ -3213,6 +3244,10 @@ extern K_ITEM *find_optioncontrol(char *optionname, const tv_t *now, int32_t hei
 #define sys_setting(_name, _def, _now) user_sys_setting(0, _name, _def, _now)
 extern int64_t user_sys_setting(int64_t userid, char *setting_name,
 				int64_t setting_default, const tv_t *now);
+extern cmp_t cmp_esm(K_ITEM *a, K_ITEM *b);
+extern K_ITEM *find_esm(int64_t workinfoid);
+extern bool esm_flag(int64_t workinfoid, bool error, bool procured);
+extern void esm_check(tv_t *now);
 extern cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b);
 #define coinbase1height(_wi) _coinbase1height(_wi, WHERE_FFL_HERE)
 extern int32_t _coinbase1height(WORKINFO *wi, WHERE_FFL_ARGS);
@@ -3220,6 +3255,8 @@ extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
 #define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx);
 extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx);
 extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
+extern K_ITEM *find_workinfo_esm(int64_t workinfoid, bool error, bool *created,
+				 tv_t *createdate);
 extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
 			 tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
 			 int64_t *s_count, int64_t *s_diff);
diff --git a/src/ckdb_data.c b/src/ckdb_data.c
index 88e81cd8..8e24aa0c 100644
--- a/src/ckdb_data.c
+++ b/src/ckdb_data.c
@@ -2126,6 +2126,156 @@ int64_t user_sys_setting(int64_t userid, char *setting_name,
 	return setting_default;
 }
 
+// order by workinfoid asc
+cmp_t cmp_esm(K_ITEM *a, K_ITEM *b)
+{
+	ESM *ea, *eb;
+	DATA_ESM(ea, a);
+	DATA_ESM(eb, b);
+	return CMP_BIGINT(ea->workinfoid, eb->workinfoid);
+}
+
+// must be locked before calling since data access must also be under lock
+K_ITEM *find_esm(int64_t workinfoid)
+{
+	K_TREE_CTX ctx[1];
+	K_ITEM look;
+	ESM lookesm;
+
+	lookesm.workinfoid = workinfoid;
+	INIT_ESM(&look);
+	look.data = (void *)(&lookesm);
+	return find_in_ktree(esm_root, &look, ctx);
+}
+
+/* Count a procured or discarded early share (shareerror if error is true)
+ * in the workinfoid's ESM record; returns true if there is no ESM record */
+bool esm_flag(int64_t workinfoid, bool error, bool procured)
+{
+	K_ITEM *esm_item = NULL;
+	ESM *esm = NULL;
+	bool failed;
+
+	K_WLOCK(esm_free);
+	esm_item = find_esm(workinfoid);
+	if (!esm_item) {
+		/* This isn't fatal since the message will be logged anyway
+		 * It just means the esm workinfoid summary was early and
+		 * incorrect (which shouldn't happen) */
+		failed = true;
+	} else {
+		DATA_ESM(esm, esm_item);
+		if (!error && procured)
+			esm->procured++;
+		else if (!error && !procured)
+			esm->discarded++;
+		else if (error && procured)
+			esm->errprocured++;
+		else if (error && !procured)
+			esm->errdiscarded++;
+		failed = false;
+	}
+	K_WUNLOCK(esm_free);
+
+	return failed;
+}
+
+// called under workinfo lock
+static bool find_create_esm(int64_t workinfoid, bool error, tv_t *createdate)
+{
+	K_ITEM look, *esm_item;
+	K_TREE_CTX ctx[1];
+	ESM lookesm, *esm;
+	bool created;
+
+	lookesm.workinfoid = workinfoid;
+	INIT_ESM(&look);
+	look.data = (void *)(&lookesm);
+	K_WLOCK(esm_free);
+	esm_item = find_in_ktree(esm_root, &look, ctx);
+	if (!esm_item) {
+		created = true;
+		esm_item = k_unlink_head_zero(esm_free);
+		DATA_ESM(esm, esm_item);
+		esm->workinfoid = workinfoid;
+		copy_tv(&(esm->createdate), createdate);
+		// add it so find_esm() and esm_check() can see it
+		add_to_ktree(esm_root, esm_item);
+		k_add_head(esm_store, esm_item);
+	} else {
+		created = false;
+		DATA_ESM(esm, esm_item);
+	}
+	if (error)
+		esm->errqueued++;
+	else
+		esm->queued++;
+	K_WUNLOCK(esm_free);
+
+	return created;
+}
+
+/* Log a summary of, then recycle, each ESM record whose createdate is
+ * more than ESM_LIMIT before now */
+void esm_check(tv_t *now)
+{
+	K_ITEM *esm_item = NULL;
+	ESM *esm = NULL;
+	bool had = true;
+
+	while (had) {
+		// don't reprocess the item released by the last pass
+		esm_item = NULL;
+		K_WLOCK(esm_free);
+		if (esm_store->count == 0)
+			had = false;
+		else {
+			// items should be rare and few, so just loop thru them
+			esm_item = STORE_WHEAD(esm_store);
+			while (esm_item) {
+				DATA_ESM(esm, esm_item);
+				if (tvdiff(now, &(esm->createdate)) > ESM_LIMIT) {
+					// drop the tree ref too
+					remove_from_ktree(esm_root, esm_item);
+					k_unlink_item(esm_store, esm_item);
+					break;
+				}
+				esm_item = esm_item->next;
+			}
+		}
+		K_WUNLOCK(esm_free);
+		if (!esm_item)
+			had = false;
+		else {
+			if (esm->queued || esm->procured || esm->discarded) {
+				int diff = esm->queued - esm->procured -
+					   esm->discarded;
+				LOGWARNING("%s() %s%d wid=%"PRId64" early "
+					   "shares=%d procured=%d discarded=%d",
+					   __func__, diff ? "DIFF " : EMPTY,
+					   diff, esm->workinfoid,
+					   esm->queued, esm->procured,
+					   esm->discarded);
+			}
+			if (esm->errqueued || esm->errprocured ||
+			    esm->errdiscarded) {
+				int diff = esm->errqueued - esm->errprocured -
+					   esm->errdiscarded;
+				LOGWARNING("%s() %s%d wid=%"PRId64" early "
+					   "shareerrors=%d procured=%d "
+					   "discarded=%d",
+					   __func__, diff ? "DIFF " : EMPTY,
+					   diff, esm->workinfoid,
+					   esm->errqueued, esm->errprocured,
+					   esm->errdiscarded);
+			}
+			K_WLOCK(esm_free);
+			k_add_head(esm_free, esm_item);
+			K_WUNLOCK(esm_free);
+		}
+	}
+}
+
 // order by workinfoid asc,expirydate asc
 cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b)
 {
@@ -2243,6 +2393,28 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
 	return item;
 }
 
+// create the esm record inside the workinfo lock
+K_ITEM *find_workinfo_esm(int64_t workinfoid, bool error, bool *created, tv_t *createdate)
+{
+	WORKINFO workinfo;
+	K_TREE_CTX ctx[1];
+	K_ITEM look, *wi_item;
+
+	*created = false;
+	workinfo.workinfoid = workinfoid;
+	workinfo.expirydate.tv_sec = default_expiry.tv_sec;
+	workinfo.expirydate.tv_usec = default_expiry.tv_usec;
+
+	INIT_WORKINFO(&look);
+	look.data = (void *)(&workinfo);
+	K_RLOCK(workinfo_free);
+	wi_item = find_in_ktree(workinfo_root, &look, ctx);
+	if (!wi_item)
+		*created = find_create_esm(workinfoid, error, createdate);
+	K_RUNLOCK(workinfo_free);
+	return wi_item;
+}
+
 #define DISCARD_ALL -1
 /* No longer required since we already discard the shares after being added
  * to the sharesummary */
diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c
index a9580be1..41b43ef7 100644
--- a/src/ckdb_dbio.c
+++ b/src/ckdb_dbio.c
@@ -3636,7 +3636,7 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
 	char *st = NULL;
 	char tmp[1024];
 	double delta;
-	bool ok;
+	bool ok, failed;
 
 	LOGDEBUG("%s() add", __func__);
 
@@ -3645,7 +3645,7 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
 	if (shares_early_store->count == 0) {
 		K_WUNLOCK(shares_free);
 		// None
-		return;
+		goto out;
 	}
 	es_item = last_in_ktree(shares_early_root, ctx);
 	if (es_item) {
@@ -3696,42 +3696,49 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
 			}
 		}
 	}
-	return;
+	goto out;
 redo:
 	K_WLOCK(shares_free);
 	add_to_ktree(shares_early_root, es_item);
 	k_add_tail(shares_early_store, es_item);
 	K_WUNLOCK(shares_free);
-	return;
+	goto out;
 keep:
+	failed = esm_flag(early_shares->workinfoid, false, true);
 	btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
-	LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
-		" Early share procured",
-		__func__, early_shares->workinfoid,
-		st = safe_text_nonull(early_shares->workername),
-		early_shares->createdate.tv_sec,
-		early_shares->createdate.tv_usec, cd_buf,
-		early_shares->oldcount, early_shares->redo);
+	LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
+		  " Early share procured",
+		  __func__, failed ? "***ESM " : EMPTY,
+		  early_shares->workinfoid,
+		  st = safe_text_nonull(early_shares->workername),
+		  early_shares->createdate.tv_sec,
+		  early_shares->createdate.tv_usec, cd_buf,
+		  early_shares->oldcount, early_shares->redo);
 	FREENULL(st);
 	K_WLOCK(shares_free);
 	// Discard it, it's been processed
 	k_add_head(shares_free, es_item);
 	K_WUNLOCK(shares_free);
-	return;
+	goto out;
 discard:
+	failed = esm_flag(early_shares->workinfoid, false, false);
 	btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
-	LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
-		" Early share discarded!%s",
-		__func__, early_shares->workinfoid,
-		st = safe_text_nonull(early_shares->workername),
-		early_shares->createdate.tv_sec,
-		early_shares->createdate.tv_usec, cd_buf,
-		early_shares->oldcount, early_shares->redo, why);
+	LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
+		  " Early share discarded!%s",
+		  __func__, failed ? "***ESM " : EMPTY,
+		  early_shares->workinfoid,
+		  st = safe_text_nonull(early_shares->workername),
+		  early_shares->createdate.tv_sec,
+		  early_shares->createdate.tv_usec, cd_buf,
+		  early_shares->oldcount, early_shares->redo, why);
 	FREENULL(st);
 	K_WLOCK(shares_free);
 	k_add_head(shares_free, es_item);
 	K_WUNLOCK(shares_free);
-	return;
+out:
+	// accessed outside lock, but esm_check() uses the lock
+	if (esm_store->count)
+		esm_check(good_cd);
 }
 
 static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
@@ -3750,7 +3757,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
 	SHARES *shares = NULL, *shares2 = NULL;
 	double sdiff_amt;
 	USERS *users;
-	bool ok = false, dup = false;
+	bool ok = false, dup = false, created;
 	char *st = NULL;
 	tv_t share_cd;
 
@@ -3827,10 +3834,12 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
 		memcpy(shares2, shares, sizeof(*shares2));
 	}
 
-	wi_item = find_workinfo(shares->workinfoid, NULL);
+	wi_item = find_workinfo_esm(shares->workinfoid, false, &created,
+				    &(shares->createdate));
 	if (!wi_item) {
+		int sta = (created ? LOG_ERR : LOG_NOTICE);
 		btv_to_buf(cd, cd_buf, sizeof(cd_buf));
-		LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
+		LOGMSG(sta, "%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
 			"Early share queued!",
 			__func__, shares->workinfoid,
 			st = safe_text_nonull(workername),
@@ -4338,7 +4347,7 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
 	char *st = NULL;
 	char tmp[1024];
 	double delta;
-	bool ok;
+	bool ok, failed;
 
 	LOGDEBUG("%s() add", __func__);
 
@@ -4346,7 +4355,7 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
 	if (shareerrors_early_store->count == 0) {
 		K_WUNLOCK(shareerrors_free);
 		// None
-		return;
+		goto out;
 	}
 	es_item = last_in_ktree(shareerrors_early_root, ctx);
 	if (es_item) {
@@ -4399,42 +4408,49 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
 			}
 		}
 	}
-	return;
+	goto out;
 redo:
 	K_WLOCK(shareerrors_free);
 	add_to_ktree(shareerrors_early_root, es_item);
 	k_add_tail(shareerrors_early_store, es_item);
 	K_WUNLOCK(shareerrors_free);
-	return;
+	goto out;
 keep:
+	failed = esm_flag(early_shareerrors->workinfoid, true, true);
 	btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
-	LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
-		" Early share procured",
-		__func__, early_shareerrors->workinfoid,
-		st = safe_text_nonull(early_shareerrors->workername),
-		early_shareerrors->createdate.tv_sec,
-		early_shareerrors->createdate.tv_usec, cd_buf,
-		early_shareerrors->oldcount, early_shareerrors->redo);
+	LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
+		  " Early shareerror procured",
+		  __func__, failed ? "***ESM " : EMPTY,
+		  early_shareerrors->workinfoid,
+		  st = safe_text_nonull(early_shareerrors->workername),
+		  early_shareerrors->createdate.tv_sec,
+		  early_shareerrors->createdate.tv_usec, cd_buf,
+		  early_shareerrors->oldcount, early_shareerrors->redo);
 	FREENULL(st);
 	K_WLOCK(shareerrors_free);
 	add_to_ktree(shareerrors_root, es_item);
 	k_add_head(shareerrors_store, es_item);
 	K_WUNLOCK(shareerrors_free);
-	return;
+	goto out;
 discard:
+	failed = esm_flag(early_shareerrors->workinfoid, true, false);
 	btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
-	LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
-		" Early share discarded!%s",
-		__func__, early_shareerrors->workinfoid,
-		st = safe_text_nonull(early_shareerrors->workername),
-		early_shareerrors->createdate.tv_sec,
-		early_shareerrors->createdate.tv_usec, cd_buf,
-		early_shareerrors->oldcount, early_shareerrors->redo, why);
+	LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
+		  " Early shareerror discarded!%s",
+		  __func__, failed ? "***ESM " : EMPTY,
+		  early_shareerrors->workinfoid,
+		  st = safe_text_nonull(early_shareerrors->workername),
+		  early_shareerrors->createdate.tv_sec,
+		  early_shareerrors->createdate.tv_usec, cd_buf,
+		  early_shareerrors->oldcount, early_shareerrors->redo, why);
 	FREENULL(st);
 	K_WLOCK(shareerrors_free);
 	k_add_head(shareerrors_free, es_item);
 	K_WUNLOCK(shareerrors_free);
-	return;
+out:
+	// accessed outside lock, but esm_check() uses the lock
+	if (esm_store->count)
+		esm_check(good_cd);
 }
 
 // Memory (and log file) only
@@ -4447,7 +4463,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
 	char cd_buf[DATE_BUFSIZ];
 	SHAREERRORS *shareerrors = NULL;
 	USERS *users;
-	bool ok = false;
+	bool ok = false, created;
 	char *st = NULL;
 
 	LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
@@ -4501,10 +4517,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
 	HISTORYDATEINIT(shareerrors, cd, by, code, inet);
 	HISTORYDATETRANSFER(trf_root, shareerrors);
 
-	wi_item = find_workinfo(shareerrors->workinfoid, NULL);
+	wi_item = find_workinfo_esm(shareerrors->workinfoid, true, &created,
+				    &(shareerrors->createdate));
 	if (!wi_item) {
+		int sta = (created ? LOG_ERR : LOG_NOTICE);
 		btv_to_buf(cd, cd_buf, sizeof(cd_buf));
-		LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
+		LOGMSG(sta, "%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
 			"Early shareerror queued!",
 			__func__, shareerrors->workinfoid,
 			st = safe_text_nonull(workername),