Browse Source

ckdb - retain shares with an unknown workinfoid and recheck for up to 60s

master
kanoi 10 years ago
parent
commit
d8f5ceb0d2
  1. 53
      src/ckdb.c
  2. 44
      src/ckdb.h
  3. 6
      src/ckdb_cmd.c
  4. 14
      src/ckdb_data.c
  5. 629
      src/ckdb_dbio.c

53
src/ckdb.c

@ -379,11 +379,15 @@ double current_ndiff;
K_TREE *shares_root;
K_LIST *shares_free;
K_STORE *shares_store;
K_TREE *shares_early_root;
K_STORE *shares_early_store;
// SHAREERRORS shareerrors.id.json={...}
K_TREE *shareerrors_root;
K_LIST *shareerrors_free;
K_STORE *shareerrors_store;
K_TREE *shareerrors_early_root;
K_STORE *shareerrors_early_store;
// SHARESUMMARY
K_TREE *sharesummary_root;
@ -995,12 +999,16 @@ static void alloc_storage()
shares_free = k_new_list("Shares", sizeof(SHARES),
ALLOC_SHARES, LIMIT_SHARES, true);
shares_store = k_new_store(shares_free);
shares_early_store = k_new_store(shares_free);
shares_root = new_ktree();
shares_early_root = new_ktree();
shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS),
ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, true);
shareerrors_store = k_new_store(shareerrors_free);
shareerrors_early_store = k_new_store(shareerrors_free);
shareerrors_root = new_ktree();
shareerrors_early_root = new_ktree();
sharesummary_free = k_new_list("ShareSummary", sizeof(SHARESUMMARY),
ALLOC_SHARESUMMARY, LIMIT_SHARESUMMARY, true);
@ -1106,6 +1114,11 @@ static void alloc_storage()
static void dealloc_storage()
{
SHAREERRORS *shareerrors;
SHARES *shares;
K_ITEM *s_item;
char *st = NULL;
LOGWARNING("%s() logqueue ...", __func__);
FREE_LISTS(logqueue);
@ -1151,7 +1164,47 @@ static void dealloc_storage()
FREE_STORE_DATA(sharesummary);
FREE_LIST_DATA(sharesummary);
if (shareerrors_early_store->count > 0) {
LOGERR("%s() *** shareerrors_early count %d ***",
__func__, shareerrors_early_store->count);
s_item = shareerrors_early_store->head;
while (s_item) {
DATA_SHAREERRORS(shareerrors, s_item);
LOGERR("%s(): %"PRId64"/%s/%"PRId32"/%s/%ld,%ld",
__func__,
shareerrors->workinfoid,
st = safe_text(shareerrors->workername),
shareerrors->errn,
shareerrors->error,
shareerrors->createdate.tv_sec,
shareerrors->createdate.tv_usec);
FREENULL(st);
s_item = s_item->next;
}
}
FREE_TREE(shareerrors_early);
FREE_STORE(shareerrors_early);
FREE_ALL(shareerrors);
if (shares_early_store->count > 0) {
LOGERR("%s() *** shares_early count %d ***",
__func__, shares_early_store->count);
s_item = shares_early_store->head;
while (s_item) {
DATA_SHARES(shares, s_item);
LOGERR("%s(): %"PRId64"/%s/%s/%"PRId32"/%ld,%ld",
__func__,
shares->workinfoid,
st = safe_text(shares->workername),
shares->nonce,
shares->errn,
shares->createdate.tv_sec,
shares->createdate.tv_usec);
FREENULL(st);
s_item = s_item->next;
}
}
FREE_TREE(shares_early);
FREE_STORE(shares_early);
FREE_ALL(shares);
LOGWARNING("%s() workinfo ...", __func__);

44
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.035"
#define CKDB_VERSION DB_VERSION"-1.040"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -226,6 +226,7 @@ enum data_type {
TYPE_BIGINT,
TYPE_INT,
TYPE_TV,
TYPE_BTV,
TYPE_TVS,
TYPE_CTV,
TYPE_FTV,
@ -721,7 +722,7 @@ typedef struct transfer {
// Suggest malloc use MMAP - 1913 = largest under 2MB
#define ALLOC_TRANSFER 1913
#define LIMIT_TRANSFER 0
#define CULL_TRANSFER 16
#define CULL_TRANSFER 64
#define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer)
#define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true)
@ -747,7 +748,7 @@ extern tv_t missing_secuser_max;
typedef struct users {
int64_t userid;
char username[TXT_BIG+1];
char usertrim[TXT_BIG+1]; // Non DB field
char usertrim[TXT_BIG+1]; // non-DB field
// Anything in 'status' fails mining authentication
char status[TXT_BIG+1];
char emailaddress[TXT_BIG+1];
@ -843,7 +844,7 @@ typedef struct paymentaddresses {
char payaddress[TXT_BIG+1];
int32_t payratio;
HISTORYDATECONTROLFIELDS;
bool match; // Non-db field
bool match; // non-DB field
} PAYMENTADDRESSES;
#define ALLOC_PAYMENTADDRESSES 1024
@ -873,7 +874,7 @@ typedef struct payments {
char committxn[TXT_BIG+1];
char commitblockhash[TXT_BIG+1];
HISTORYDATECONTROLFIELDS;
K_ITEM *old_item; // Non-db field
K_ITEM *old_item; // non-DB field
} PAYMENTS;
#define ALLOC_PAYMENTS 1024
@ -1024,6 +1025,8 @@ typedef struct shares {
char error[TXT_SML+1];
char secondaryuserid[TXT_SML+1];
HISTORYDATECONTROLFIELDS;
int32_t redo; // non-DB field
int32_t oldcount; // non-DB field
} SHARES;
#define ALLOC_SHARES 10000
@ -1034,6 +1037,13 @@ typedef struct shares {
extern K_TREE *shares_root;
extern K_LIST *shares_free;
extern K_STORE *shares_store;
// shares unexpectedly before the workinfo
extern K_TREE *shares_early_root;
extern K_STORE *shares_early_store;
/* Once a share is this old, it can only once more be
check for it's workinfoid and then be discarded */
#define EARLYSHARESLIMIT 60.0
// SHAREERRORS shareerrors.id.json={...}
typedef struct shareerrors {
@ -1045,9 +1055,11 @@ typedef struct shareerrors {
char error[TXT_SML+1];
char secondaryuserid[TXT_SML+1];
HISTORYDATECONTROLFIELDS;
int32_t redo; // non-DB field
int32_t oldcount; // non-DB field
} SHAREERRORS;
#define ALLOC_SHAREERRORS 10000
#define ALLOC_SHAREERRORS 1000
#define LIMIT_SHAREERRORS 0
#define INIT_SHAREERRORS(_item) INIT_GENERIC(_item, shareerrors)
#define DATA_SHAREERRORS(_var, _item) DATA_GENERIC(_var, _item, shareerrors, true)
@ -1055,6 +1067,9 @@ typedef struct shareerrors {
extern K_TREE *shareerrors_root;
extern K_LIST *shareerrors_free;
extern K_STORE *shareerrors_store;
// shareerrors unexpectedly before the workinfo
extern K_TREE *shareerrors_early_root;
extern K_STORE *shareerrors_early_store;
// SHARESUMMARY
typedef struct sharesummary {
@ -1123,12 +1138,12 @@ typedef struct blocks {
int64_t elapsed;
char statsconfirmed[TXT_FLAG+1];
HISTORYDATECONTROLFIELDS;
bool ignore; // Non DB field
bool ignore; // non-DB field
// Calculated only when = 0
double netdiff;
/* Non DB fields for the web page
/* non-DB fields for the web page
* Calculate them once off/recalc them when required */
double blockdiffratio;
double blockcdf;
@ -1193,7 +1208,7 @@ typedef struct miningpayouts {
double diffacc;
int64_t amount;
HISTORYDATECONTROLFIELDS;
K_ITEM *old_item; // Non-db field
K_ITEM *old_item; // non-DB field
} MININGPAYOUTS;
#define ALLOC_MININGPAYOUTS 1000
@ -1305,11 +1320,11 @@ typedef struct poolstats {
double hashrate5m;
double hashrate1hr;
double hashrate24hr;
bool stored; // Non-db field
bool stored; // non-DB field
SIMPLEDATECONTROLFIELDS;
} POOLSTATS;
#define ALLOC_POOLSTATS 10000
#define ALLOC_POOLSTATS 1000
#define LIMIT_POOLSTATS 0
#define INIT_POOLSTATS(_item) INIT_GENERIC(_item, poolstats)
#define DATA_POOLSTATS(_var, _item) DATA_GENERIC(_var, _item, poolstats, true)
@ -1336,7 +1351,7 @@ typedef struct userstats {
double hashrate5m;
double hashrate1hr;
double hashrate24hr;
bool idle; // Non-db field
bool idle; // non-DB field
char summarylevel[TXT_FLAG+1]; // SUMMARY_NONE in RAM
int32_t summarycount;
tv_t statsdate;
@ -1348,7 +1363,7 @@ typedef struct userstats {
* createdate batch, and thus could move all (complete) records
* matching the createdate from userstats_eos_store into the tree */
#define ALLOC_USERSTATS 10000
#define ALLOC_USERSTATS 1000
#define LIMIT_USERSTATS 0
#define INIT_USERSTATS(_item) INIT_GENERIC(_item, userstats)
#define DATA_USERSTATS(_var, _item) DATA_GENERIC(_var, _item, userstats, true)
@ -1689,6 +1704,7 @@ extern char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz,
#define ctv_to_buf(_data, _buf, _siz) _ctv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define ftv_to_buf(_data, _buf, _siz) _ftv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define tvs_to_buf(_data, _buf, _siz) _tvs_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define btv_to_buf(_data, _buf, _siz) _btv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
//#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
@ -1702,6 +1718,8 @@ extern char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
extern char *_ftv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
// Convert tv to seconds (ignore uS)
extern char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
// Convert tv to (brief) DD HH:MM:SS
extern char *_btv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
/* unused yet
extern char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS);
*/

6
src/ckdb_cmd.c

@ -4902,8 +4902,10 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(idcontrol, 1, 0);
USEINFO(optioncontrol, 1, 1);
USEINFO(workinfo, 1, 1);
USEINFO(shares, 1, 1);
USEINFO(shareerrors, 1, 1);
// Trees don't share items so count as 1 tree
USEINFO(shares, 2, 1);
// Trees don't share items so count as 1 tree
USEINFO(shareerrors, 2, 1);
USEINFO(sharesummary, 1, 2);
USEINFO(workmarkers, 1, 2);
USEINFO(markersummary, 1, 2);

14
src/ckdb_data.c

@ -417,6 +417,14 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_
tm.tm_sec,
(((tv_t *)data)->tv_usec));
break;
case TYPE_BTV:
gmtime_r(&(((tv_t *)data)->tv_sec), &tm);
snprintf(buf, siz, "%02d %02d:%02d:%02d",
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
break;
case TYPE_CTV:
snprintf(buf, siz, "%ld,%ld",
(((tv_t *)data)->tv_sec),
@ -476,6 +484,12 @@ char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
return _data_to_buf(TYPE_TVS, (void *)data, buf, siz, WHERE_FFL_PASS);
}
// Convert tv to (brief) DD HH:MM:SS
char *_btv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_BTV, (void *)data, buf, siz, WHERE_FFL_PASS);
}
/* unused yet
char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{

629
src/ckdb_dbio.c

@ -883,6 +883,7 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname,
USERATTS *row;
USERS *users;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
@ -895,11 +896,10 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *txt;
LOGERR("%s(): unknown user '%s'",
__func__,
txt = safe_text(username));
free(txt);
st = safe_text(username));
FREENULL(st);
goto unitem;
}
DATA_USERS(users, u_item);
@ -2757,20 +2757,208 @@ bool workinfo_fill(PGconn *conn)
return ok;
}
static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root)
{
K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary;
char *st = NULL;
LOGDEBUG("%s() add", __func__);
w_item = new_default_worker(conn, false, shares->userid,
shares->workername, shares->createby,
shares->createcode, shares->createinet,
&(shares->createdate), trf_root);
if (!w_item) {
LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld",
__func__, shares->userid,
st = safe_text(shares->workername),
shares->createdate.tv_sec, shares->createdate.tv_usec);
FREENULL(st);
return false;
}
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is processed
wm_item = find_workmarkers(shares->workinfoid, false,
MARKER_PROCESSED);
if (wm_item) {
LOGDEBUG("%s(): workmarker exists for wid %"PRId64
" %"PRId64"/%s/%ld,%ld",
__func__, shares->workinfoid, shares->userid,
st = safe_text(shares->workername),
shares->createdate.tv_sec,
shares->createdate.tv_usec);
FREENULL(st);
return false;
}
ss_item = find_sharesummary(shares->userid, shares->workername,
shares->workinfoid);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) {
LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete,
shares->workinfoid, shares->userid,
st = safe_text(shares->workername),
shares->createdate.tv_sec,
shares->createdate.tv_usec);
FREENULL(st);
return false;
}
if (!sharesummary->reset) {
zero_sharesummary(sharesummary,
&(shares->createdate),
shares->diff);
sharesummary->reset = true;
}
}
}
if (!confirm_sharesummary)
workerstatus_update(NULL, shares, NULL);
sharesummary_update(conn, shares, NULL, NULL, shares->createby,
shares->createcode, shares->createinet,
&(shares->createdate));
return true;
}
// If it exists and it can be processed, process the oldest early share
static void shares_process_early(PGconn *conn, int64_t good_wid, tv_t *good_cd,
K_TREE *trf_root)
{
K_TREE_CTX ctx[1];
K_ITEM *es_item, *wi_item;
SHARES *early_shares;
char cd_buf[DATE_BUFSIZ];
char *why = EMPTY;
char *st = NULL;
char tmp[1024];
double delta;
bool ok;
LOGDEBUG("%s() add", __func__);
K_WLOCK(shares_free);
if (shares_early_store->count == 0) {
K_WUNLOCK(shares_free);
// None
return;
}
es_item = last_in_ktree(shares_early_root, ctx);
if (es_item) {
shares_early_root = remove_from_ktree(shares_early_root,
es_item,
cmp_shares);
k_unlink_item(shares_early_store, es_item);
}
K_WUNLOCK(shares_free);
if (es_item) {
DATA_SHARES(early_shares, es_item);
/* If the last (oldest) is newer than the
* current workinfo, leave it til later */
if (early_shares->workinfoid > good_wid)
goto redo;
/* If it matches the 'ok' share we just processed,
* we don't need to check the workinfoid */
if (early_shares->workinfoid == good_wid) {
ok = shares_process(conn, early_shares, trf_root);
if (ok)
goto keep;
else
goto discard;
} else {
wi_item = find_workinfo(early_shares->workinfoid, NULL);
if (!wi_item) {
// good_cd is 'now'
delta = tvdiff(good_cd,
&(early_shares->createdate));
if (early_shares->oldcount > 0) {
snprintf(tmp, sizeof(tmp),
" too old (%.1fs/%"PRId32")",
delta,
early_shares->oldcount);
why = tmp;
goto discard;
}
if (delta > EARLYSHARESLIMIT)
early_shares->oldcount++;
early_shares->redo++;
goto redo;
} else {
ok = shares_process(conn, early_shares, trf_root);
if (ok)
goto keep;
else
goto discard;
}
}
}
return;
redo:
K_WLOCK(shares_free);
shares_early_root = add_to_ktree(shares_early_root, es_item, cmp_shares);
k_add_tail(shares_early_store, es_item);
K_WUNLOCK(shares_free);
return;
keep:
btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share procured",
__func__, early_shares->workinfoid,
st = safe_text(early_shares->workername),
early_shares->createdate.tv_sec,
early_shares->createdate.tv_usec, cd_buf,
early_shares->oldcount, early_shares->redo);
FREENULL(st);
K_WLOCK(shares_free);
shares_root = add_to_ktree(shares_root, es_item, cmp_shares);
k_add_head(shares_store, es_item);
K_WUNLOCK(shares_free);
return;
discard:
btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share discarded!%s",
__func__, early_shares->workinfoid,
st = safe_text(early_shares->workername),
early_shares->createdate.tv_sec,
early_shares->createdate.tv_usec, cd_buf,
early_shares->oldcount, early_shares->redo, why);
FREENULL(st);
K_WLOCK(shares_free);
k_add_head(shares_free, es_item);
K_WUNLOCK(shares_free);
return;
}
static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
tv_t *good_cd, K_TREE *trf_root);
// Memory (and log file) only
bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername,
char *clientid, char *errn, char *enonce1, char *nonce2,
char *nonce, char *diff, char *sdiff, char *secondaryuserid,
char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item;
K_ITEM *s_item = NULL, *u_item, *wi_item;
char cd_buf[DATE_BUFSIZ];
SHARESUMMARY *sharesummary;
SHARES *shares;
SHARES *shares = NULL;
USERS *users;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
__func__,
workinfoid, st = safe_text(workername), nonce,
errn, cd->tv_sec, cd->tv_usec);
FREENULL(st);
K_WLOCK(shares_free);
s_item = k_unlink_head(shares_free);
@ -2782,13 +2970,15 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *txt;
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %.19s no user! Share discarded!",
__func__, txt = safe_text(username),
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
/* This should never happen unless there's a bug in ckpool
or the authentication information got to ckdb after
the shares ... which shouldn't ever happen */
LOGERR("%s() %s/%ld,%ld %s no user! Share discarded!",
__func__, st = safe_text(username),
cd->tv_sec, cd->tv_usec, cd_buf);
free(txt);
goto unitem;
FREENULL(st);
goto tisbad;
}
DATA_USERS(users, u_item);
@ -2809,11 +2999,12 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
STRNCPY(shares->secondaryuserid, users->secondaryuserid);
if (!tv_newer(&missing_secuser_min, cd) ||
!tv_newer(cd, &missing_secuser_max)) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %.19s missing secondaryuserid! "
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! "
"Share corrected",
__func__, username,
__func__, st = safe_text(username),
cd->tv_sec, cd->tv_usec, cd_buf);
FREENULL(st);
}
}
@ -2822,80 +3013,259 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
wi_item = find_workinfo(shares->workinfoid, NULL);
if (!wi_item) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
// TODO: store it for a few workinfoid changes
LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Share discarded!",
__func__, shares->workinfoid, workername,
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
"Early share queued!",
__func__, shares->workinfoid,
st = safe_text(workername),
cd->tv_sec, cd->tv_usec, cd_buf);
goto unitem;
FREENULL(st);
shares->redo = 0;
shares->oldcount = 0;
K_WLOCK(shares_free);
// They need to be sorted by workinfoid
shares_early_root = add_to_ktree(shares_early_root, s_item,
cmp_shares);
k_add_head(shares_early_store, s_item);
K_WUNLOCK(shares_free);
/* It was all OK except the missing workinfoid
* and it was queued, so most likely OK */
return true;
}
w_item = new_default_worker(conn, false, shares->userid, shares->workername,
by, code, inet, cd, trf_root);
if (!w_item)
goto unitem;
ok = shares_process(conn, shares, trf_root);
if (ok) {
K_WLOCK(shares_free);
shares_root = add_to_ktree(shares_root, s_item, cmp_shares);
k_add_head(shares_store, s_item);
K_WUNLOCK(shares_free);
shares_process_early(conn, shares->workinfoid,
&(shares->createdate), trf_root);
// Call both since shareerrors may be rare
shareerrors_process_early(conn, shares->workinfoid,
&(shares->createdate), trf_root);
// The original share was ok
return true;
}
tisbad:
K_WLOCK(shares_free);
k_add_head(shares_free, s_item);
K_WUNLOCK(shares_free);
return false;
}
static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
K_TREE *trf_root)
{
K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary;
char *st = NULL;
LOGDEBUG("%s() add", __func__);
w_item = new_default_worker(conn, false, shareerrors->userid,
shareerrors->workername,
shareerrors->createby,
shareerrors->createcode,
shareerrors->createinet,
&(shareerrors->createdate), trf_root);
if (!w_item) {
LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld",
__func__, shareerrors->userid,
st = safe_text(shareerrors->workername),
shareerrors->createdate.tv_sec,
shareerrors->createdate.tv_usec);
FREENULL(st);
return false;
}
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is processed
wm_item = find_workmarkers(shares->workinfoid, false,
wm_item = find_workmarkers(shareerrors->workinfoid, false,
MARKER_PROCESSED);
if (wm_item) {
K_WLOCK(shares_free);
k_add_head(shares_free, s_item);
K_WUNLOCK(shares_free);
return true;
LOGDEBUG("%s(): workmarker exists for wid %"PRId64
" %"PRId64"/%s/%ld,%ld",
__func__, shareerrors->workinfoid,
shareerrors->userid,
st = safe_text(shareerrors->workername),
shareerrors->createdate.tv_sec,
shareerrors->createdate.tv_usec);
FREENULL(st);
return false;
}
ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid);
ss_item = find_sharesummary(shareerrors->userid,
shareerrors->workername,
shareerrors->workinfoid);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) {
K_WLOCK(shares_free);
k_add_head(shares_free, s_item);
K_WUNLOCK(shares_free);
return true;
LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete,
shareerrors->workinfoid,
shareerrors->userid,
st = safe_text(shareerrors->workername),
shareerrors->createdate.tv_sec,
shareerrors->createdate.tv_usec);
FREENULL(st);
return false;
}
if (!sharesummary->reset) {
zero_sharesummary(sharesummary, cd, shares->diff);
zero_sharesummary(sharesummary,
&(shareerrors->createdate),
0.0);
sharesummary->reset = true;
}
}
}
if (!confirm_sharesummary)
workerstatus_update(NULL, shares, NULL);
sharesummary_update(conn, NULL, shareerrors, NULL,
shareerrors->createby,
shareerrors->createcode,
shareerrors->createinet,
&(shareerrors->createdate));
sharesummary_update(conn, shares, NULL, NULL, by, code, inet, cd);
return true;
}
ok = true;
unitem:
K_WLOCK(shares_free);
if (!ok)
k_add_head(shares_free, s_item);
else {
shares_root = add_to_ktree(shares_root, s_item, cmp_shares);
k_add_head(shares_store, s_item);
}
K_WUNLOCK(shares_free);
// If it exists and it can be processed, process the oldest early shareerror
static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
tv_t *good_cd, K_TREE *trf_root)
{
K_TREE_CTX ctx[1];
K_ITEM *es_item, *wi_item;
SHAREERRORS *early_shareerrors;
char cd_buf[DATE_BUFSIZ];
char *why = EMPTY;
char *st = NULL;
char tmp[1024];
double delta;
bool ok;
return ok;
LOGDEBUG("%s() add", __func__);
K_WLOCK(shareerrors_free);
if (shareerrors_early_store->count == 0) {
K_WUNLOCK(shareerrors_free);
// None
return;
}
es_item = last_in_ktree(shareerrors_early_root, ctx);
if (es_item) {
shareerrors_early_root = remove_from_ktree(shareerrors_early_root,
es_item,
cmp_shareerrors);
k_unlink_item(shareerrors_early_store, es_item);
}
K_WUNLOCK(shareerrors_free);
if (es_item) {
DATA_SHAREERRORS(early_shareerrors, es_item);
/* If the last (oldest) is newer than the
* current workinfo, leave it til later */
if (early_shareerrors->workinfoid > good_wid)
goto redo;
/* If it matches the 'ok' share/shareerror we just processed,
* we don't need to check the workinfoid */
if (early_shareerrors->workinfoid == good_wid) {
ok = shareerrors_process(conn, early_shareerrors,
trf_root);
if (ok)
goto keep;
else
goto discard;
} else {
wi_item = find_workinfo(early_shareerrors->workinfoid, NULL);
if (!wi_item) {
// good_cd is 'now'
delta = tvdiff(good_cd,
&(early_shareerrors->createdate));
if (early_shareerrors->oldcount > 0) {
snprintf(tmp, sizeof(tmp),
" too old (%.1fs/%"PRId32")",
delta,
early_shareerrors->oldcount);
why = tmp;
goto discard;
}
if (delta > EARLYSHARESLIMIT)
early_shareerrors->oldcount++;
early_shareerrors->redo++;
goto redo;
} else {
ok = shareerrors_process(conn,
early_shareerrors,
trf_root);
if (ok)
goto keep;
else
goto discard;
}
}
}
return;
redo:
K_WLOCK(shareerrors_free);
shareerrors_early_root = add_to_ktree(shareerrors_early_root, es_item,
cmp_shareerrors);
k_add_tail(shareerrors_early_store, es_item);
K_WUNLOCK(shareerrors_free);
return;
keep:
btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share procured",
__func__, early_shareerrors->workinfoid,
st = safe_text(early_shareerrors->workername),
early_shareerrors->createdate.tv_sec,
early_shareerrors->createdate.tv_usec, cd_buf,
early_shareerrors->oldcount, early_shareerrors->redo);
FREENULL(st);
K_WLOCK(shareerrors_free);
shareerrors_root = add_to_ktree(shareerrors_root, es_item, cmp_shareerrors);
k_add_head(shareerrors_store, es_item);
K_WUNLOCK(shareerrors_free);
return;
discard:
btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share discarded!%s",
__func__, early_shareerrors->workinfoid,
st = safe_text(early_shareerrors->workername),
early_shareerrors->createdate.tv_sec,
early_shareerrors->createdate.tv_usec, cd_buf,
early_shareerrors->oldcount, early_shareerrors->redo, why);
FREENULL(st);
K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, es_item);
K_WUNLOCK(shareerrors_free);
return;
}
// Memory (and log file) only
// TODO: handle shareerrors that appear after a workinfoid is aged or doesn't exist?
bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
char *workername, char *clientid, char *errn,
char *error, char *secondaryuserid, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
K_ITEM *s_item, *u_item, *wi_item, *w_item, *wm_item, *ss_item;
K_ITEM *s_item = NULL, *u_item, *wi_item;
char cd_buf[DATE_BUFSIZ];
SHARESUMMARY *sharesummary;
SHAREERRORS *shareerrors;
SHAREERRORS *shareerrors = NULL;
USERS *users;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
__func__,
workinfoid, st = safe_text(workername), errn,
error, cd->tv_sec, cd->tv_usec);
FREENULL(st);
K_WLOCK(shareerrors_free);
s_item = k_unlink_head(shareerrors_free);
@ -2907,13 +3277,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *txt;
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %.19s no user! Shareerror discarded!",
__func__, txt = safe_text(username),
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %s no user! Shareerror discarded!",
__func__, st = safe_text(username),
cd->tv_sec, cd->tv_usec, cd_buf);
free(txt);
goto unitem;
FREENULL(st);
goto tisbad;
}
DATA_USERS(users, u_item);
@ -2930,11 +3299,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
STRNCPY(shareerrors->secondaryuserid, users->secondaryuserid);
if (!tv_newer(&missing_secuser_min, cd) ||
!tv_newer(cd, &missing_secuser_max)) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %.19s missing secondaryuserid! "
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! "
"Sharerror corrected",
__func__, username,
__func__, st = safe_text(username),
cd->tv_sec, cd->tv_usec, cd_buf);
FREENULL(st);
}
}
@ -2943,59 +3313,51 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
wi_item = find_workinfo(shareerrors->workinfoid, NULL);
if (!wi_item) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %.19s no workinfo! Shareerror discarded!",
__func__, shareerrors->workinfoid, workername,
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
"Early shareerror queued!",
__func__, shareerrors->workinfoid,
st = safe_text(workername),
cd->tv_sec, cd->tv_usec, cd_buf);
goto unitem;
FREENULL(st);
shareerrors->redo = 0;
shareerrors->oldcount = 0;
K_WLOCK(shareerrors_free);
// They need to be sorted by workinfoid
shareerrors_early_root = add_to_ktree(shareerrors_early_root,
s_item,
cmp_shareerrors);
k_add_head(shareerrors_early_store, s_item);
K_WUNLOCK(shareerrors_free);
/* It was all OK except the missing workinfoid
* and it was queued, so most likely OK */
return true;
}
w_item = new_default_worker(NULL, false, shareerrors->userid, shareerrors->workername,
by, code, inet, cd, trf_root);
if (!w_item)
goto unitem;
ok = shareerrors_process(conn, shareerrors, trf_root);
if (ok) {
K_WLOCK(shareerrors_free);
shareerrors_root = add_to_ktree(shareerrors_root, s_item,
cmp_shareerrors);
k_add_head(shareerrors_store, s_item);
K_WUNLOCK(shareerrors_free);
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is processed
wm_item = find_workmarkers(shareerrors->workinfoid, false,
MARKER_PROCESSED);
if (wm_item) {
K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, s_item);
K_WUNLOCK(shareerrors_free);
return true;
}
ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) {
K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, s_item);
K_WUNLOCK(shareerrors_free);
return true;
}
shareerrors_process_early(conn, shareerrors->workinfoid,
&(shareerrors->createdate),
trf_root);
// Call both in case we are only getting errors on bad work
shares_process_early(conn, shareerrors->workinfoid,
&(shareerrors->createdate), trf_root);
if (!sharesummary->reset) {
zero_sharesummary(sharesummary, cd, 0.0);
sharesummary->reset = true;
}
}
// The original share was ok
return true;
}
sharesummary_update(conn, NULL, shareerrors, NULL, by, code, inet, cd);
ok = true;
unitem:
tisbad:
K_WLOCK(shareerrors_free);
if (!ok)
k_add_head(shareerrors_free, s_item);
else {
shareerrors_root = add_to_ktree(shareerrors_root, s_item, cmp_shareerrors);
k_add_head(shareerrors_store, s_item);
}
k_add_head(shareerrors_free, s_item);
K_WUNLOCK(shareerrors_free);
return ok;
return false;
}
bool shareerrors_fill()
@ -3030,6 +3392,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
char *params[2];
int n, par = 0, deleted = -7;
int ss_count, ms_count;
char *st = NULL;
char *del;
LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/"
@ -3120,7 +3483,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
LOGDEBUG("%s() new ms %"PRId64"/%"PRId64"/%s",
shortname, markersummary->markerid,
markersummary->userid,
markersummary->workername);
st = safe_text(markersummary->workername));
FREENULL(st);
} else {
DATA_MARKERSUMMARY(markersummary, ms_item);
}
@ -3335,6 +3699,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
tv_t *sharecreatedate;
bool must_update = false, conned = false;
double diff = 0;
char *st = NULL, *db = NULL;
LOGDEBUG("%s(): update", __func__);
@ -3378,16 +3743,16 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED);
K_RUNLOCK(workmarkers_free);
if (wm_item) {
char *tmp;
DATA_WORKMARKERS(wm, wm_item);
LOGERR("%s(): attempt to update sharesummary "
"with %s %"PRId64"/%"PRId64"/%s createdate %s"
" but processed workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, workername,
(tmp = ctv_to_buf(sharecreatedate, NULL, 0)),
workinfoid, userid, st = safe_text(workername),
db = ctv_to_buf(sharecreatedate, NULL, 0),
wm->markerid);
free(tmp);
FREENULL(st);
FREENULL(db);
return false;
}
@ -3496,9 +3861,12 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
free(tmp1);
}
if (row->complete[0] != SUMMARY_NEW) {
LOGDEBUG("%s(): updating sharesummary not '%c' %"PRId64"/%s/%"PRId64"/%s",
__func__, SUMMARY_NEW, row->userid, row->workername,
LOGDEBUG("%s(): updating sharesummary not '%c'"
" %"PRId64"/%s/%"PRId64"/%s",
__func__, SUMMARY_NEW, row->userid,
st = safe_text(row->workername),
row->workinfoid, row->complete);
FREENULL(st);
}
}
}
@ -4081,6 +4449,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash,
bool ok = false, update_old = false;
int n, par = 0;
char want = '?';
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
@ -4404,8 +4773,9 @@ flail:
snprintf(tmp, sizeof(tmp),
" Reward: %f, Worker: %s, ShareEst: %.1f %s%s%% UTC:%s",
BTC_TO_D(row->reward),
row->workername,
st = safe_text(row->workername),
pool.diffacc, est, pct, cd_buf);
FREENULL(st);
if (pool.workinfoid < row->workinfoid) {
pool.workinfoid = row->workinfoid;
pool.height = row->height;
@ -5104,6 +5474,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
char cd_buf[DATE_BUFSIZ];
AUTHS *row;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
@ -5121,11 +5492,10 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
u_item = users_add(conn, username, EMPTY, EMPTY,
by, code, inet, cd, trf_root);
} else {
char *txt;
LOGDEBUG("%s(): unknown user '%s'",
__func__,
txt = safe_text(username));
free(txt);
st = safe_text(username));
FREENULL(st);
}
if (!u_item)
goto unitem;
@ -5165,7 +5535,9 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
// Shouldn't actually be possible unless twice in the logs
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s(): Duplicate auths ignored %s/%s/%s",
__func__, poolinstance, workername, cd_buf);
__func__, poolinstance, st = safe_text(workername),
cd_buf);
FREENULL(st);
/* Let them mine, that's what matters :)
* though this would normally only be during a reload */
@ -5471,6 +5843,7 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
USERSTATS *row, *match, *next;
USERS *users;
K_TREE_CTX ctx[1];
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
@ -5486,11 +5859,10 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *txt;
LOGERR("%s(): unknown user '%s'",
__func__,
txt = safe_text(username));
free(txt);
st = safe_text(username));
FREENULL(st);
return false;
}
DATA_USERS(users, u_item);
@ -5595,13 +5967,13 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
char *usr, *wrk;
char *usr = NULL, *wrk = NULL;
LOGERR("%s(): unknown user '%s' (worker=%s)",
__func__,
usr = safe_text(username),
wrk = safe_text(workername));
free(usr);
free(wrk);
FREENULL(usr);
FREENULL(wrk);
return false;
}
DATA_USERS(users, u_item);
@ -5654,6 +6026,7 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
int n, par = 0;
char *ins;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
@ -5691,8 +6064,10 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
MODIFYDATECONTROL ") values (" PQPARAM26 ")";
LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f",
__func__, row->markerid, row->userid, row->workername,
__func__, row->markerid, row->userid,
st = safe_text(row->workername),
row->diffacc);
FREENULL(st);
if (!conn) {
conn = dbconnect();

Loading…
Cancel
Save