From d8f5ceb0d23ae7115e735138b7169e228a9d1d6b Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 25 Mar 2015 03:18:35 +1100 Subject: [PATCH] ckdb - retain shares with an unknown workinfoid and recheck for up to 60s --- src/ckdb.c | 53 ++++ src/ckdb.h | 44 +++- src/ckdb_cmd.c | 6 +- src/ckdb_data.c | 14 ++ src/ckdb_dbio.c | 629 ++++++++++++++++++++++++++++++++++++++---------- 5 files changed, 604 insertions(+), 142 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 6d12e221..fcc41e71 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -379,11 +379,15 @@ double current_ndiff; K_TREE *shares_root; K_LIST *shares_free; K_STORE *shares_store; +K_TREE *shares_early_root; +K_STORE *shares_early_store; // SHAREERRORS shareerrors.id.json={...} K_TREE *shareerrors_root; K_LIST *shareerrors_free; K_STORE *shareerrors_store; +K_TREE *shareerrors_early_root; +K_STORE *shareerrors_early_store; // SHARESUMMARY K_TREE *sharesummary_root; @@ -995,12 +999,16 @@ static void alloc_storage() shares_free = k_new_list("Shares", sizeof(SHARES), ALLOC_SHARES, LIMIT_SHARES, true); shares_store = k_new_store(shares_free); + shares_early_store = k_new_store(shares_free); shares_root = new_ktree(); + shares_early_root = new_ktree(); shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS), ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true); shareerrors_store = k_new_store(shareerrors_free); + shareerrors_early_store = k_new_store(shareerrors_free); shareerrors_root = new_ktree(); + shareerrors_early_root = new_ktree(); sharesummary_free = k_new_list("ShareSummary", sizeof(SHARESUMMARY), ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true); @@ -1106,6 +1114,11 @@ static void alloc_storage() static void dealloc_storage() { + SHAREERRORS *shareerrors; + SHARES *shares; + K_ITEM *s_item; + char *st = NULL; + LOGWARNING("%s() logqueue ...", __func__); FREE_LISTS(logqueue); @@ -1151,7 +1164,47 @@ static void dealloc_storage() FREE_STORE_DATA(sharesummary); FREE_LIST_DATA(sharesummary); + if (shareerrors_early_store->count > 0) { + LOGERR("%s() *** shareerrors_early count %d ***", + __func__, shareerrors_early_store->count); + s_item = shareerrors_early_store->head; + while (s_item) { + DATA_SHAREERRORS(shareerrors, s_item); + LOGERR("%s(): %"PRId64"/%s/%"PRId32"/%s/%ld,%ld", + __func__, + shareerrors->workinfoid, + st = safe_text(shareerrors->workername), + shareerrors->errn, + shareerrors->error, + shareerrors->createdate.tv_sec, + shareerrors->createdate.tv_usec); + FREENULL(st); + s_item = s_item->next; + } + } + FREE_TREE(shareerrors_early); + FREE_STORE(shareerrors_early); FREE_ALL(shareerrors); + if (shares_early_store->count > 0) { + LOGERR("%s() *** shares_early count %d ***", + __func__, shares_early_store->count); + s_item = shares_early_store->head; + while (s_item) { + DATA_SHARES(shares, s_item); + LOGERR("%s(): %"PRId64"/%s/%s/%"PRId32"/%ld,%ld", + __func__, + shares->workinfoid, + st = safe_text(shares->workername), + shares->nonce, + shares->errn, + shares->createdate.tv_sec, + shares->createdate.tv_usec); + FREENULL(st); + s_item = s_item->next; + } + } + FREE_TREE(shares_early); + FREE_STORE(shares_early); FREE_ALL(shares); LOGWARNING("%s() workinfo ...", __func__); diff --git a/src/ckdb.h b/src/ckdb.h index ed9d615d..a0fdb2a1 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.035" +#define CKDB_VERSION DB_VERSION"-1.040" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -226,6 +226,7 @@ enum data_type { TYPE_BIGINT, TYPE_INT, TYPE_TV, + TYPE_BTV, TYPE_TVS, TYPE_CTV, TYPE_FTV, @@ -721,7 +722,7 @@ typedef struct transfer { // Suggest malloc use MMAP - 1913 = largest under 2MB #define ALLOC_TRANSFER 1913 #define LIMIT_TRANSFER 0 -#define CULL_TRANSFER 16 +#define CULL_TRANSFER 64 #define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer) #define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true) @@ -747,7 +748,7 @@ extern tv_t missing_secuser_max; typedef struct users { int64_t userid; char username[TXT_BIG+1]; - char usertrim[TXT_BIG+1]; // Non DB field + char usertrim[TXT_BIG+1]; // non-DB field // Anything in 'status' fails mining authentication char status[TXT_BIG+1]; char emailaddress[TXT_BIG+1]; @@ -843,7 +844,7 @@ typedef struct paymentaddresses { char payaddress[TXT_BIG+1]; int32_t payratio; HISTORYDATECONTROLFIELDS; - bool match; // Non-db field + bool match; // non-DB field } PAYMENTADDRESSES; #define ALLOC_PAYMENTADDRESSES 1024 @@ -873,7 +874,7 @@ typedef struct payments { char committxn[TXT_BIG+1]; char commitblockhash[TXT_BIG+1]; HISTORYDATECONTROLFIELDS; - K_ITEM *old_item; // Non-db field + K_ITEM *old_item; // non-DB field } PAYMENTS; #define ALLOC_PAYMENTS 1024 @@ -1024,6 +1025,8 @@ typedef struct shares { char error[TXT_SML+1]; char secondaryuserid[TXT_SML+1]; HISTORYDATECONTROLFIELDS; + int32_t redo; // non-DB field + int32_t oldcount; // non-DB field } SHARES; #define ALLOC_SHARES 10000 @@ -1034,6 +1037,13 @@ typedef struct shares { extern K_TREE *shares_root; extern K_LIST *shares_free; extern K_STORE *shares_store; +// shares unexpectedly before the workinfo +extern K_TREE *shares_early_root; +extern K_STORE *shares_early_store; + +/* Once a share is this old, it can only once more be + check for it's workinfoid and then be discarded */ +#define EARLYSHARESLIMIT 60.0 // SHAREERRORS shareerrors.id.json={...} typedef struct shareerrors { @@ -1045,9 +1055,11 @@ typedef struct shareerrors { char error[TXT_SML+1]; char secondaryuserid[TXT_SML+1]; HISTORYDATECONTROLFIELDS; + int32_t redo; // non-DB field + int32_t oldcount; // non-DB field } SHAREERRORS; -#define ALLOC_SHAREERRORS 10000 +#define ALLOC_SHAREERRORS 1000 #define LIMIT_SHAREERRORS 0 #define INIT_SHAREERRORS(_item) INIT_GENERIC(_item, shareerrors) #define DATA_SHAREERRORS(_var, _item) DATA_GENERIC(_var, _item, shareerrors, true) @@ -1055,6 +1067,9 @@ typedef struct shareerrors { extern K_TREE *shareerrors_root; extern K_LIST *shareerrors_free; extern K_STORE *shareerrors_store; +// shareerrors unexpectedly before the workinfo +extern K_TREE *shareerrors_early_root; +extern K_STORE *shareerrors_early_store; // SHARESUMMARY typedef struct sharesummary { @@ -1123,12 +1138,12 @@ typedef struct blocks { int64_t elapsed; char statsconfirmed[TXT_FLAG+1]; HISTORYDATECONTROLFIELDS; - bool ignore; // Non DB field + bool ignore; // non-DB field // Calculated only when = 0 double netdiff; - /* Non DB fields for the web page + /* non-DB fields for the web page * Calculate them once off/recalc them when required */ double blockdiffratio; double blockcdf; @@ -1193,7 +1208,7 @@ typedef struct miningpayouts { double diffacc; int64_t amount; HISTORYDATECONTROLFIELDS; - K_ITEM *old_item; // Non-db field + K_ITEM *old_item; // non-DB field } MININGPAYOUTS; #define ALLOC_MININGPAYOUTS 1000 @@ -1305,11 +1320,11 @@ typedef struct poolstats { double hashrate5m; double hashrate1hr; double hashrate24hr; - bool stored; // Non-db field + bool stored; // non-DB field SIMPLEDATECONTROLFIELDS; } POOLSTATS; -#define ALLOC_POOLSTATS 10000 +#define ALLOC_POOLSTATS 1000 #define LIMIT_POOLSTATS 0 #define INIT_POOLSTATS(_item) INIT_GENERIC(_item, poolstats) #define DATA_POOLSTATS(_var, _item) DATA_GENERIC(_var, _item, poolstats, true) @@ -1336,7 +1351,7 @@ typedef struct userstats { double hashrate5m; double hashrate1hr; double hashrate24hr; - bool idle; // Non-db field + bool idle; // non-DB field char summarylevel[TXT_FLAG+1]; // SUMMARY_NONE in RAM int32_t summarycount; tv_t statsdate; @@ -1348,7 +1363,7 @@ typedef struct userstats { * createdate batch, and thus could move all (complete) records * matching the createdate from userstats_eos_store into the tree */ -#define ALLOC_USERSTATS 10000 +#define ALLOC_USERSTATS 1000 #define LIMIT_USERSTATS 0 #define INIT_USERSTATS(_item) INIT_GENERIC(_item, userstats) #define DATA_USERSTATS(_var, _item) DATA_GENERIC(_var, _item, userstats, true) @@ -1689,6 +1704,7 @@ extern char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, #define ctv_to_buf(_data, _buf, _siz) _ctv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define ftv_to_buf(_data, _buf, _siz) _ftv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define tvs_to_buf(_data, _buf, _siz) _tvs_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define btv_to_buf(_data, _buf, _siz) _btv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) //#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) @@ -1702,6 +1718,8 @@ extern char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_ftv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); // Convert tv to seconds (ignore uS) extern char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); +// Convert tv to (brief) DD HH:MM:SS +extern char *_btv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); /* unused yet extern char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS); */ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 6a0e1cc3..1796bc4c 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -4902,8 +4902,10 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(idcontrol, 1, 0); USEINFO(optioncontrol, 1, 1); USEINFO(workinfo, 1, 1); - USEINFO(shares, 1, 1); - USEINFO(shareerrors, 1, 1); + // Trees don't share items so count as 1 tree + USEINFO(shares, 2, 1); + // Trees don't share items so count as 1 tree + USEINFO(shareerrors, 2, 1); USEINFO(sharesummary, 1, 2); USEINFO(workmarkers, 1, 2); USEINFO(markersummary, 1, 2); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index f1303265..13641b8d 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -417,6 +417,14 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_ tm.tm_sec, (((tv_t *)data)->tv_usec)); break; + case TYPE_BTV: + gmtime_r(&(((tv_t *)data)->tv_sec), &tm); + snprintf(buf, siz, "%02d %02d:%02d:%02d", + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec); + break; case TYPE_CTV: snprintf(buf, siz, "%ld,%ld", (((tv_t *)data)->tv_sec), @@ -476,6 +484,12 @@ char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) return _data_to_buf(TYPE_TVS, (void *)data, buf, siz, WHERE_FFL_PASS); } +// Convert tv to (brief) DD HH:MM:SS +char *_btv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + return _data_to_buf(TYPE_BTV, (void *)data, buf, siz, WHERE_FFL_PASS); +} + /* unused yet char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS) { diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 033112af..02db38a1 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -883,6 +883,7 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname, USERATTS *row; USERS *users; bool ok = false; + char *st = NULL; LOGDEBUG("%s(): add", __func__); @@ -895,11 +896,10 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - char *txt; LOGERR("%s(): unknown user '%s'", __func__, - txt = safe_text(username)); - free(txt); + st = safe_text(username)); + FREENULL(st); goto unitem; } DATA_USERS(users, u_item); @@ -2757,20 +2757,208 @@ bool workinfo_fill(PGconn *conn) return ok; } +static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) +{ + K_ITEM *w_item, *wm_item, *ss_item; + SHARESUMMARY *sharesummary; + char *st = NULL; + + LOGDEBUG("%s() add", __func__); + + w_item = new_default_worker(conn, false, shares->userid, + shares->workername, shares->createby, + shares->createcode, shares->createinet, + &(shares->createdate), trf_root); + if (!w_item) { + LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld", + __func__, shares->userid, + st = safe_text(shares->workername), + shares->createdate.tv_sec, shares->createdate.tv_usec); + FREENULL(st); + return false; + } + + if (reloading && !confirm_sharesummary) { + // We only need to know if the workmarker is processed + wm_item = find_workmarkers(shares->workinfoid, false, + MARKER_PROCESSED); + if (wm_item) { + LOGDEBUG("%s(): workmarker exists for wid %"PRId64 + " %"PRId64"/%s/%ld,%ld", + __func__, shares->workinfoid, shares->userid, + st = safe_text(shares->workername), + shares->createdate.tv_sec, + shares->createdate.tv_usec); + FREENULL(st); + return false; + } + + ss_item = find_sharesummary(shares->userid, shares->workername, + shares->workinfoid); + if (ss_item) { + DATA_SHARESUMMARY(sharesummary, ss_item); + if (sharesummary->complete[0] != SUMMARY_NEW) { + LOGDEBUG("%s(): '%s' sharesummary exists " + "%"PRId64" %"PRId64"/%s/%ld,%ld", + __func__, sharesummary->complete, + shares->workinfoid, shares->userid, + st = safe_text(shares->workername), + shares->createdate.tv_sec, + shares->createdate.tv_usec); + FREENULL(st); + return false; + } + + if (!sharesummary->reset) { + zero_sharesummary(sharesummary, + &(shares->createdate), + shares->diff); + sharesummary->reset = true; + } + } + } + + if (!confirm_sharesummary) + workerstatus_update(NULL, shares, NULL); + + sharesummary_update(conn, shares, NULL, NULL, shares->createby, + shares->createcode, shares->createinet, + &(shares->createdate)); + + return true; +} + +// If it exists and it can be processed, process the oldest early share +static void shares_process_early(PGconn *conn, int64_t good_wid, tv_t *good_cd, + K_TREE *trf_root) +{ + K_TREE_CTX ctx[1]; + K_ITEM *es_item, *wi_item; + SHARES *early_shares; + char cd_buf[DATE_BUFSIZ]; + char *why = EMPTY; + char *st = NULL; + char tmp[1024]; + double delta; + bool ok; + + LOGDEBUG("%s() add", __func__); + + K_WLOCK(shares_free); + if (shares_early_store->count == 0) { + K_WUNLOCK(shares_free); + // None + return; + } + es_item = last_in_ktree(shares_early_root, ctx); + if (es_item) { + shares_early_root = remove_from_ktree(shares_early_root, + es_item, + cmp_shares); + k_unlink_item(shares_early_store, es_item); + } + K_WUNLOCK(shares_free); + if (es_item) { + DATA_SHARES(early_shares, es_item); + /* If the last (oldest) is newer than the + * current workinfo, leave it til later */ + if (early_shares->workinfoid > good_wid) + goto redo; + + /* If it matches the 'ok' share we just processed, + * we don't need to check the workinfoid */ + if (early_shares->workinfoid == good_wid) { + ok = shares_process(conn, early_shares, trf_root); + if (ok) + goto keep; + else + goto discard; + } else { + wi_item = find_workinfo(early_shares->workinfoid, NULL); + if (!wi_item) { + // good_cd is 'now' + delta = tvdiff(good_cd, + &(early_shares->createdate)); + if (early_shares->oldcount > 0) { + snprintf(tmp, sizeof(tmp), + " too old (%.1fs/%"PRId32")", + delta, + early_shares->oldcount); + why = tmp; + goto discard; + } + if (delta > EARLYSHARESLIMIT) + early_shares->oldcount++; + early_shares->redo++; + goto redo; + } else { + ok = shares_process(conn, early_shares, trf_root); + if (ok) + goto keep; + else + goto discard; + } + } + } + return; +redo: + K_WLOCK(shares_free); + shares_early_root = add_to_ktree(shares_early_root, es_item, cmp_shares); + k_add_tail(shares_early_store, es_item); + K_WUNLOCK(shares_free); + return; +keep: + btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 + " Early share procured", + __func__, early_shares->workinfoid, + st = safe_text(early_shares->workername), + early_shares->createdate.tv_sec, + early_shares->createdate.tv_usec, cd_buf, + early_shares->oldcount, early_shares->redo); + FREENULL(st); + K_WLOCK(shares_free); + shares_root = add_to_ktree(shares_root, es_item, cmp_shares); + k_add_head(shares_store, es_item); + K_WUNLOCK(shares_free); + return; +discard: + btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 + " Early share discarded!%s", + __func__, early_shares->workinfoid, + st = safe_text(early_shares->workername), + early_shares->createdate.tv_sec, + early_shares->createdate.tv_usec, cd_buf, + early_shares->oldcount, early_shares->redo, why); + FREENULL(st); + K_WLOCK(shares_free); + k_add_head(shares_free, es_item); + K_WUNLOCK(shares_free); + return; +} + +static void shareerrors_process_early(PGconn *conn, int64_t good_wid, + tv_t *good_cd, K_TREE *trf_root); + // Memory (and log file) only bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { - K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item; + K_ITEM *s_item = NULL, *u_item, *wi_item; char cd_buf[DATE_BUFSIZ]; - SHARESUMMARY *sharesummary; - SHARES *shares; + SHARES *shares = NULL; USERS *users; bool ok = false; + char *st = NULL; - LOGDEBUG("%s(): add", __func__); + LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", + __func__, + workinfoid, st = safe_text(workername), nonce, + errn, cd->tv_sec, cd->tv_usec); + FREENULL(st); K_WLOCK(shares_free); s_item = k_unlink_head(shares_free); @@ -2782,13 +2970,15 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - char *txt; - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - LOGERR("%s() %s/%ld,%ld %.19s no user! Share discarded!", - __func__, txt = safe_text(username), + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + /* This should never happen unless there's a bug in ckpool + or the authentication information got to ckdb after + the shares ... which shouldn't ever happen */ + LOGERR("%s() %s/%ld,%ld %s no user! Share discarded!", + __func__, st = safe_text(username), cd->tv_sec, cd->tv_usec, cd_buf); - free(txt); - goto unitem; + FREENULL(st); + goto tisbad; } DATA_USERS(users, u_item); @@ -2809,11 +2999,12 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername STRNCPY(shares->secondaryuserid, users->secondaryuserid); if (!tv_newer(&missing_secuser_min, cd) || !tv_newer(cd, &missing_secuser_max)) { - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - LOGERR("%s() %s/%ld,%ld %.19s missing secondaryuserid! " + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! " "Share corrected", - __func__, username, + __func__, st = safe_text(username), cd->tv_sec, cd->tv_usec, cd_buf); + FREENULL(st); } } @@ -2822,80 +3013,259 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername wi_item = find_workinfo(shares->workinfoid, NULL); if (!wi_item) { - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - // TODO: store it for a few workinfoid changes - LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Share discarded!", - __func__, shares->workinfoid, workername, + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " + "Early share queued!", + __func__, shares->workinfoid, + st = safe_text(workername), cd->tv_sec, cd->tv_usec, cd_buf); - goto unitem; + FREENULL(st); + shares->redo = 0; + shares->oldcount = 0; + K_WLOCK(shares_free); + // They need to be sorted by workinfoid + shares_early_root = add_to_ktree(shares_early_root, s_item, + cmp_shares); + k_add_head(shares_early_store, s_item); + K_WUNLOCK(shares_free); + /* It was all OK except the missing workinfoid + * and it was queued, so most likely OK */ + return true; } - w_item = new_default_worker(conn, false, shares->userid, shares->workername, - by, code, inet, cd, trf_root); - if (!w_item) - goto unitem; + ok = shares_process(conn, shares, trf_root); + if (ok) { + K_WLOCK(shares_free); + shares_root = add_to_ktree(shares_root, s_item, cmp_shares); + k_add_head(shares_store, s_item); + K_WUNLOCK(shares_free); + + shares_process_early(conn, shares->workinfoid, + &(shares->createdate), trf_root); + // Call both since shareerrors may be rare + shareerrors_process_early(conn, shares->workinfoid, + &(shares->createdate), trf_root); + + // The original share was ok + return true; + } + +tisbad: + K_WLOCK(shares_free); + k_add_head(shares_free, s_item); + K_WUNLOCK(shares_free); + return false; +} + +static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, + K_TREE *trf_root) +{ + K_ITEM *w_item, *wm_item, *ss_item; + SHARESUMMARY *sharesummary; + char *st = NULL; + + LOGDEBUG("%s() add", __func__); + + w_item = new_default_worker(conn, false, shareerrors->userid, + shareerrors->workername, + shareerrors->createby, + shareerrors->createcode, + shareerrors->createinet, + &(shareerrors->createdate), trf_root); + if (!w_item) { + LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld", + __func__, shareerrors->userid, + st = safe_text(shareerrors->workername), + shareerrors->createdate.tv_sec, + shareerrors->createdate.tv_usec); + FREENULL(st); + return false; + } if (reloading && !confirm_sharesummary) { // We only need to know if the workmarker is processed - wm_item = find_workmarkers(shares->workinfoid, false, + wm_item = find_workmarkers(shareerrors->workinfoid, false, MARKER_PROCESSED); if (wm_item) { - K_WLOCK(shares_free); - k_add_head(shares_free, s_item); - K_WUNLOCK(shares_free); - return true; + LOGDEBUG("%s(): workmarker exists for wid %"PRId64 + " %"PRId64"/%s/%ld,%ld", + __func__, shareerrors->workinfoid, + shareerrors->userid, + st = safe_text(shareerrors->workername), + shareerrors->createdate.tv_sec, + shareerrors->createdate.tv_usec); + FREENULL(st); + return false; } - ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid); + + ss_item = find_sharesummary(shareerrors->userid, + shareerrors->workername, + shareerrors->workinfoid); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); if (sharesummary->complete[0] != SUMMARY_NEW) { - K_WLOCK(shares_free); - k_add_head(shares_free, s_item); - K_WUNLOCK(shares_free); - return true; + LOGDEBUG("%s(): '%s' sharesummary exists " + "%"PRId64" %"PRId64"/%s/%ld,%ld", + __func__, sharesummary->complete, + shareerrors->workinfoid, + shareerrors->userid, + st = safe_text(shareerrors->workername), + shareerrors->createdate.tv_sec, + shareerrors->createdate.tv_usec); + FREENULL(st); + return false; } if (!sharesummary->reset) { - zero_sharesummary(sharesummary, cd, shares->diff); + zero_sharesummary(sharesummary, + &(shareerrors->createdate), + 0.0); sharesummary->reset = true; } } } - if (!confirm_sharesummary) - workerstatus_update(NULL, shares, NULL); + sharesummary_update(conn, NULL, shareerrors, NULL, + shareerrors->createby, + shareerrors->createcode, + shareerrors->createinet, + &(shareerrors->createdate)); - sharesummary_update(conn, shares, NULL, NULL, by, code, inet, cd); + return true; +} - ok = true; -unitem: - K_WLOCK(shares_free); - if (!ok) - k_add_head(shares_free, s_item); - else { - shares_root = add_to_ktree(shares_root, s_item, cmp_shares); - k_add_head(shares_store, s_item); - } - K_WUNLOCK(shares_free); +// If it exists and it can be processed, process the oldest early shareerror +static void shareerrors_process_early(PGconn *conn, int64_t good_wid, + tv_t *good_cd, K_TREE *trf_root) +{ + K_TREE_CTX ctx[1]; + K_ITEM *es_item, *wi_item; + SHAREERRORS *early_shareerrors; + char cd_buf[DATE_BUFSIZ]; + char *why = EMPTY; + char *st = NULL; + char tmp[1024]; + double delta; + bool ok; - return ok; + LOGDEBUG("%s() add", __func__); + + K_WLOCK(shareerrors_free); + if (shareerrors_early_store->count == 0) { + K_WUNLOCK(shareerrors_free); + // None + return; + } + es_item = last_in_ktree(shareerrors_early_root, ctx); + if (es_item) { + shareerrors_early_root = remove_from_ktree(shareerrors_early_root, + es_item, + cmp_shareerrors); + k_unlink_item(shareerrors_early_store, es_item); + } + K_WUNLOCK(shareerrors_free); + if (es_item) { + DATA_SHAREERRORS(early_shareerrors, es_item); + /* If the last (oldest) is newer than the + * current workinfo, leave it til later */ + if (early_shareerrors->workinfoid > good_wid) + goto redo; + + /* If it matches the 'ok' share/shareerror we just processed, + * we don't need to check the workinfoid */ + if (early_shareerrors->workinfoid == good_wid) { + ok = shareerrors_process(conn, early_shareerrors, + trf_root); + if (ok) + goto keep; + else + goto discard; + } else { + wi_item = find_workinfo(early_shareerrors->workinfoid, NULL); + if (!wi_item) { + // good_cd is 'now' + delta = tvdiff(good_cd, + &(early_shareerrors->createdate)); + if (early_shareerrors->oldcount > 0) { + snprintf(tmp, sizeof(tmp), + " too old (%.1fs/%"PRId32")", + delta, + early_shareerrors->oldcount); + why = tmp; + goto discard; + } + if (delta > EARLYSHARESLIMIT) + early_shareerrors->oldcount++; + early_shareerrors->redo++; + goto redo; + } else { + ok = shareerrors_process(conn, + early_shareerrors, + trf_root); + if (ok) + goto keep; + else + goto discard; + } + } + } + return; +redo: + K_WLOCK(shareerrors_free); + shareerrors_early_root = add_to_ktree(shareerrors_early_root, es_item, + cmp_shareerrors); + k_add_tail(shareerrors_early_store, es_item); + K_WUNLOCK(shareerrors_free); + return; +keep: + btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 + " Early share procured", + __func__, early_shareerrors->workinfoid, + st = safe_text(early_shareerrors->workername), + early_shareerrors->createdate.tv_sec, + early_shareerrors->createdate.tv_usec, cd_buf, + early_shareerrors->oldcount, early_shareerrors->redo); + FREENULL(st); + K_WLOCK(shareerrors_free); + shareerrors_root = add_to_ktree(shareerrors_root, es_item, cmp_shareerrors); + k_add_head(shareerrors_store, es_item); + K_WUNLOCK(shareerrors_free); + return; +discard: + btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 + " Early share discarded!%s", + __func__, early_shareerrors->workinfoid, + st = safe_text(early_shareerrors->workername), + early_shareerrors->createdate.tv_sec, + early_shareerrors->createdate.tv_usec, cd_buf, + early_shareerrors->oldcount, early_shareerrors->redo, why); + FREENULL(st); + K_WLOCK(shareerrors_free); + k_add_head(shareerrors_free, es_item); + K_WUNLOCK(shareerrors_free); + return; } // Memory (and log file) only -// TODO: handle shareerrors that appear after a workinfoid is aged or doesn't exist? bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *error, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) { - K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item; + K_ITEM *s_item = NULL, *u_item, *wi_item; char cd_buf[DATE_BUFSIZ]; - SHARESUMMARY *sharesummary; - SHAREERRORS *shareerrors; + SHAREERRORS *shareerrors = NULL; USERS *users; bool ok = false; + char *st = NULL; - LOGDEBUG("%s(): add", __func__); + LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", + __func__, + workinfoid, st = safe_text(workername), errn, + error, cd->tv_sec, cd->tv_usec); + FREENULL(st); K_WLOCK(shareerrors_free); s_item = k_unlink_head(shareerrors_free); @@ -2907,13 +3277,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - char *txt; - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - LOGERR("%s() %s/%ld,%ld %.19s no user! Shareerror discarded!", - __func__, txt = safe_text(username), + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %s/%ld,%ld %s no user! Shareerror discarded!", + __func__, st = safe_text(username), cd->tv_sec, cd->tv_usec, cd_buf); - free(txt); - goto unitem; + FREENULL(st); + goto tisbad; } DATA_USERS(users, u_item); @@ -2930,11 +3299,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, STRNCPY(shareerrors->secondaryuserid, users->secondaryuserid); if (!tv_newer(&missing_secuser_min, cd) || !tv_newer(cd, &missing_secuser_max)) { - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - LOGERR("%s() %s/%ld,%ld %.19s missing secondaryuserid! " + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! " "Sharerror corrected", - __func__, username, + __func__, st = safe_text(username), cd->tv_sec, cd->tv_usec, cd_buf); + FREENULL(st); } } @@ -2943,59 +3313,51 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, wi_item = find_workinfo(shareerrors->workinfoid, NULL); if (!wi_item) { - tv_to_buf(cd, cd_buf, sizeof(cd_buf)); - LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Shareerror discarded!", - __func__, shareerrors->workinfoid, workername, + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " + "Early shareerror queued!", + __func__, shareerrors->workinfoid, + st = safe_text(workername), cd->tv_sec, cd->tv_usec, cd_buf); - goto unitem; + FREENULL(st); + shareerrors->redo = 0; + shareerrors->oldcount = 0; + K_WLOCK(shareerrors_free); + // They need to be sorted by workinfoid + shareerrors_early_root = add_to_ktree(shareerrors_early_root, + s_item, + cmp_shareerrors); + k_add_head(shareerrors_early_store, s_item); + K_WUNLOCK(shareerrors_free); + /* It was all OK except the missing workinfoid + * and it was queued, so most likely OK */ + return true; } - w_item = new_default_worker(NULL, false, shareerrors->userid, shareerrors->workername, - by, code, inet, cd, trf_root); - if (!w_item) - goto unitem; + ok = shareerrors_process(conn, shareerrors, trf_root); + if (ok) { + K_WLOCK(shareerrors_free); + shareerrors_root = add_to_ktree(shareerrors_root, s_item, + cmp_shareerrors); + k_add_head(shareerrors_store, s_item); + K_WUNLOCK(shareerrors_free); - if (reloading && !confirm_sharesummary) { - // We only need to know if the workmarker is processed - wm_item = find_workmarkers(shareerrors->workinfoid, false, - MARKER_PROCESSED); - if (wm_item) { - K_WLOCK(shareerrors_free); - k_add_head(shareerrors_free, s_item); - K_WUNLOCK(shareerrors_free); - return true; - } - ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid); - if (ss_item) { - DATA_SHARESUMMARY(sharesummary, ss_item); - if (sharesummary->complete[0] != SUMMARY_NEW) { - K_WLOCK(shareerrors_free); - k_add_head(shareerrors_free, s_item); - K_WUNLOCK(shareerrors_free); - return true; - } + shareerrors_process_early(conn, shareerrors->workinfoid, + &(shareerrors->createdate), + trf_root); + // Call both in case we are only getting errors on bad work + shares_process_early(conn, shareerrors->workinfoid, + &(shareerrors->createdate), trf_root); - if (!sharesummary->reset) { - zero_sharesummary(sharesummary, cd, 0.0); - sharesummary->reset = true; - } - } + // The original share was ok + return true; } - sharesummary_update(conn, NULL, shareerrors, NULL, by, code, inet, cd); - - ok = true; -unitem: +tisbad: K_WLOCK(shareerrors_free); - if (!ok) - k_add_head(shareerrors_free, s_item); - else { - shareerrors_root = add_to_ktree(shareerrors_root, s_item, cmp_shareerrors); - k_add_head(shareerrors_store, s_item); - } + k_add_head(shareerrors_free, s_item); K_WUNLOCK(shareerrors_free); - - return ok; + return false; } bool shareerrors_fill() @@ -3030,6 +3392,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, char *params[2]; int n, par = 0, deleted = -7; int ss_count, ms_count; + char *st = NULL; char *del; LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/" @@ -3120,7 +3483,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, LOGDEBUG("%s() new ms %"PRId64"/%"PRId64"/%s", shortname, markersummary->markerid, markersummary->userid, - markersummary->workername); + st = safe_text(markersummary->workername)); + FREENULL(st); } else { DATA_MARKERSUMMARY(markersummary, ms_item); } @@ -3335,6 +3699,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE tv_t *sharecreatedate; bool must_update = false, conned = false; double diff = 0; + char *st = NULL, *db = NULL; LOGDEBUG("%s(): update", __func__); @@ -3378,16 +3743,16 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED); K_RUNLOCK(workmarkers_free); if (wm_item) { - char *tmp; DATA_WORKMARKERS(wm, wm_item); LOGERR("%s(): attempt to update sharesummary " "with %s %"PRId64"/%"PRId64"/%s createdate %s" " but processed workmarkers %"PRId64" exists", __func__, s_row ? "shares" : "shareerrors", - workinfoid, userid, workername, - (tmp = ctv_to_buf(sharecreatedate, NULL, 0)), + workinfoid, userid, st = safe_text(workername), + db = ctv_to_buf(sharecreatedate, NULL, 0), wm->markerid); - free(tmp); + FREENULL(st); + FREENULL(db); return false; } @@ -3496,9 +3861,12 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE free(tmp1); } if (row->complete[0] != SUMMARY_NEW) { - LOGDEBUG("%s(): updating sharesummary not '%c' %"PRId64"/%s/%"PRId64"/%s", - __func__, SUMMARY_NEW, row->userid, row->workername, + LOGDEBUG("%s(): updating sharesummary not '%c'" + " %"PRId64"/%s/%"PRId64"/%s", + __func__, SUMMARY_NEW, row->userid, + st = safe_text(row->workername), row->workinfoid, row->complete); + FREENULL(st); } } } @@ -4081,6 +4449,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash, bool ok = false, update_old = false; int n, par = 0; char want = '?'; + char *st = NULL; LOGDEBUG("%s(): add", __func__); @@ -4404,8 +4773,9 @@ flail: snprintf(tmp, sizeof(tmp), " Reward: %f, Worker: %s, ShareEst: %.1f %s%s%% UTC:%s", BTC_TO_D(row->reward), - row->workername, + st = safe_text(row->workername), pool.diffacc, est, pct, cd_buf); + FREENULL(st); if (pool.workinfoid < row->workinfoid) { pool.workinfoid = row->workinfoid; pool.height = row->height; @@ -5104,6 +5474,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, char cd_buf[DATE_BUFSIZ]; AUTHS *row; bool ok = false; + char *st = NULL; LOGDEBUG("%s(): add", __func__); @@ -5121,11 +5492,10 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, u_item = users_add(conn, username, EMPTY, EMPTY, by, code, inet, cd, trf_root); } else { - char *txt; LOGDEBUG("%s(): unknown user '%s'", __func__, - txt = safe_text(username)); - free(txt); + st = safe_text(username)); + FREENULL(st); } if (!u_item) goto unitem; @@ -5165,7 +5535,9 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, // Shouldn't actually be possible unless twice in the logs tv_to_buf(cd, cd_buf, sizeof(cd_buf)); LOGERR("%s(): Duplicate auths ignored %s/%s/%s", - __func__, poolinstance, workername, cd_buf); + __func__, poolinstance, st = safe_text(workername), + cd_buf); + FREENULL(st); /* Let them mine, that's what matters :) * though this would normally only be during a reload */ @@ -5471,6 +5843,7 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username, USERSTATS *row, *match, *next; USERS *users; K_TREE_CTX ctx[1]; + char *st = NULL; LOGDEBUG("%s(): add", __func__); @@ -5486,11 +5859,10 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - char *txt; LOGERR("%s(): unknown user '%s'", __func__, - txt = safe_text(username)); - free(txt); + st = safe_text(username)); + FREENULL(st); return false; } DATA_USERS(users, u_item); @@ -5595,13 +5967,13 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - char *usr, *wrk; + char *usr = NULL, *wrk = NULL; LOGERR("%s(): unknown user '%s' (worker=%s)", __func__, usr = safe_text(username), wrk = safe_text(workername)); - free(usr); - free(wrk); + FREENULL(usr); + FREENULL(wrk); return false; } DATA_USERS(users, u_item); @@ -5654,6 +6026,7 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, int n, par = 0; char *ins; bool ok = false; + char *st = NULL; LOGDEBUG("%s(): add", __func__); @@ -5691,8 +6064,10 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, MODIFYDATECONTROL ") values (" PQPARAM26 ")"; LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f", - __func__, row->markerid, row->userid, row->workername, + __func__, row->markerid, row->userid, + st = safe_text(row->workername), row->diffacc); + FREENULL(st); if (!conn) { conn = dbconnect();