Browse Source

ckdb - limit console early share messages

master
kanoi 8 years ago
parent
commit
929c30df06
  1. 16
      src/ckdb.c
  2. 41
      src/ckdb.h
  3. 161
      src/ckdb_data.c
  4. 74
      src/ckdb_dbio.c

16
src/ckdb.c

@ -640,6 +640,11 @@ K_TREE *optioncontrol_root;
K_LIST *optioncontrol_free; K_LIST *optioncontrol_free;
K_STORE *optioncontrol_store; K_STORE *optioncontrol_store;
// ESM (Early Share/Shareerror Messages)
K_TREE *esm_root;
K_LIST *esm_free;
K_STORE *esm_store;
// WORKINFO workinfo.id.json={...} // WORKINFO workinfo.id.json={...}
K_TREE *workinfo_root; K_TREE *workinfo_root;
// created during data load then destroyed since not needed later // created during data load then destroyed since not needed later
@ -2044,6 +2049,10 @@ static void alloc_storage()
ALLOC_IDCONTROL, LIMIT_IDCONTROL, true); ALLOC_IDCONTROL, LIMIT_IDCONTROL, true);
idcontrol_store = k_new_store(idcontrol_free); idcontrol_store = k_new_store(idcontrol_free);
esm_free = k_new_list("ESM", sizeof(ESM), ALLOC_ESM, LIMIT_ESM, true);
esm_store = k_new_store(esm_free);
esm_root = new_ktree(NULL, cmp_esm, esm_free);
workinfo_free = k_new_list("WorkInfo", sizeof(WORKINFO), workinfo_free = k_new_list("WorkInfo", sizeof(WORKINFO),
ALLOC_WORKINFO, LIMIT_WORKINFO, true); ALLOC_WORKINFO, LIMIT_WORKINFO, true);
workinfo_store = k_new_store(workinfo_free); workinfo_store = k_new_store(workinfo_free);
@ -2256,6 +2265,7 @@ static void alloc_storage()
DLPRIO(userstats, 10); DLPRIO(userstats, 10);
// Don't currently nest any locks in these: // Don't currently nest any locks in these:
DLPRIO(esm, PRIO_TERMINAL);
DLPRIO(workers, PRIO_TERMINAL); DLPRIO(workers, PRIO_TERMINAL);
DLPRIO(idcontrol, PRIO_TERMINAL); DLPRIO(idcontrol, PRIO_TERMINAL);
DLPRIO(paymentaddresses, PRIO_TERMINAL); DLPRIO(paymentaddresses, PRIO_TERMINAL);
@ -2556,6 +2566,12 @@ static void dealloc_storage()
LOGWARNING("%s() etc ...", __func__); LOGWARNING("%s() etc ...", __func__);
if (esm_store->count > 0) {
LOGWARNING("%s() ***ESM had %d records ...",
__func__, esm_store->count);
}
FREE_ALL(esm);
FREE_LISTS(idcontrol); FREE_LISTS(idcontrol);
FREE_ALL(accountbalance); FREE_ALL(accountbalance);
FREE_ALL(payments); FREE_ALL(payments);

41
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.441" #define CKDB_VERSION DB_VERSION"-2.442"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1913,7 +1913,38 @@ typedef struct oc_trigger {
void (*func)(OPTIONCONTROL *, const char *); void (*func)(OPTIONCONTROL *, const char *);
} OC_TRIGGER; } OC_TRIGGER;
// TODO: discarding workinfo,shares // ESM (Early Share/Shareerror Messages)
typedef struct esm {
int64_t workinfoid;
int queued;
int procured;
int discarded;
int errqueued;
int errprocured;
int errdiscarded;
tv_t createdate;
} ESM;
/* This is to reduce the number of console Early messages
* The first queued message is displayed LOG_ERR
* then the rest, for the given workinfoid, are LOG_NOTICE
* A final summary for the workinfoid will be displayed later with LOG_ERR,
* if there were any */
#define ALLOC_ESM 10
#define LIMIT_ESM 0
#define INIT_ESM(_item) INIT_GENERIC(_item, esm)
#define DATA_ESM(_var, _item) DATA_GENERIC(_var, _item, esm, true)
#define DATA_ESM_NULL(_var, _item) DATA_GENERIC(_var, _item, esm, false)
extern K_TREE *esm_root;
extern K_LIST *esm_free;
extern K_STORE *esm_store;
/* Age limit, in seconds, before displaying ESM summary messages and
* deleting the associated ESM record */
#define ESM_LIMIT 60.0
// TODO: discarding workinfo
// WORKINFO workinfo.id.json={...} // WORKINFO workinfo.id.json={...}
typedef struct workinfo { typedef struct workinfo {
int64_t workinfoid; int64_t workinfoid;
@ -3213,6 +3244,10 @@ extern K_ITEM *find_optioncontrol(char *optionname, const tv_t *now, int32_t hei
#define sys_setting(_name, _def, _now) user_sys_setting(0, _name, _def, _now) #define sys_setting(_name, _def, _now) user_sys_setting(0, _name, _def, _now)
extern int64_t user_sys_setting(int64_t userid, char *setting_name, extern int64_t user_sys_setting(int64_t userid, char *setting_name,
int64_t setting_default, const tv_t *now); int64_t setting_default, const tv_t *now);
extern cmp_t cmp_esm(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_esm(int64_t workinfoid);
extern bool esm_flag(int64_t workinfoid, bool error, bool procured);
extern void esm_check(tv_t *now);
extern cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b);
#define coinbase1height(_wi) _coinbase1height(_wi, WHERE_FFL_HERE) #define coinbase1height(_wi) _coinbase1height(_wi, WHERE_FFL_HERE)
extern int32_t _coinbase1height(WORKINFO *wi, WHERE_FFL_ARGS); extern int32_t _coinbase1height(WORKINFO *wi, WHERE_FFL_ARGS);
@ -3220,6 +3255,8 @@ extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
#define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx); #define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx);
extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx); extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx);
extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx); extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern K_ITEM *find_workinfo_esm(int64_t workinfoid, bool error, bool *created,
tv_t *createdate);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd, extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff); int64_t *s_count, int64_t *s_diff);

161
src/ckdb_data.c

@ -2126,6 +2126,145 @@ int64_t user_sys_setting(int64_t userid, char *setting_name,
return setting_default; return setting_default;
} }
// order by workinfoid asc
cmp_t cmp_esm(K_ITEM *a, K_ITEM *b)
{
ESM *ea, *eb;
DATA_ESM(ea, a);
DATA_ESM(eb, b);
return CMP_BIGINT(ea->workinfoid, eb->workinfoid);
}
// must be locked before calling since data access must also be under lock
K_ITEM *find_esm(int64_t workinfoid)
{
K_TREE_CTX ctx[1];
K_ITEM look;
ESM lookesm;
lookesm.workinfoid = workinfoid;
INIT_ESM(&look);
look.data = (void *)(&lookesm);
return find_in_ktree(esm_root, &look, ctx);
}
bool esm_flag(int64_t workinfoid, bool error, bool procured)
{
K_ITEM *esm_item = NULL;
ESM *esm = NULL;
bool failed;
K_WLOCK(esm_free);
esm_item = find_esm(workinfoid);
if (!esm_item) {
/* This isn't fatal since the message will be logged anyway
* It just means the esm workinfoid summary was early and
* incorrect (which shouldn't happen) */
failed = true;
} else {
DATA_ESM(esm, esm_item);
if (!error && procured)
esm->procured++;
else if (!error && !procured)
esm->discarded++;
else if (error && procured)
esm->errprocured++;
else if (error && !procured)
esm->errdiscarded++;
failed = false;
}
K_WUNLOCK(esm_free);
return failed;
}
// called under workinfo lock
static bool find_create_esm(int64_t workinfoid, bool error, tv_t *createdate)
{
K_ITEM look, *esm_item;
K_TREE_CTX ctx[1];
ESM lookesm, *esm;
bool created;
lookesm.workinfoid = workinfoid;
INIT_ESM(&look);
look.data = (void *)(&lookesm);
K_WLOCK(esm_free);
esm_item = find_in_ktree(esm_root, &look, ctx);
if (!esm_item) {
created = true;
esm_item = k_unlink_head_zero(esm_free);
DATA_ESM(esm, esm_item);
esm->workinfoid = workinfoid;
copy_tv(&(esm->createdate), createdate);
} else {
created = false;
DATA_ESM(esm, esm_item);
}
if (error)
esm->errqueued++;
else
esm->queued++;
K_WUNLOCK(esm_free);
return created;
}
void esm_check(tv_t *now)
{
K_ITEM *esm_item = NULL;
ESM *esm = NULL;
bool had = true;
while (had) {
K_WLOCK(esm_free);
if (esm_store->count == 0)
had = false;
else {
// items should be rare and few, so just loop thru them
esm_item = STORE_WHEAD(esm_store);
while (esm_item) {
DATA_ESM(esm, esm_item);
if (tvdiff(now, &(esm->createdate)) > ESM_LIMIT) {
k_unlink_item(esm_store, esm_item);
break;
}
esm_item = esm_item->next;
}
}
K_WUNLOCK(esm_free);
if (!esm_item)
had = false;
else {
if (esm->queued || esm->procured || esm->discarded) {
int diff = esm->queued - esm->procured -
esm->discarded;
LOGWARNING("%s() %s%d wid=%"PRId64" early "
"shares=%d procured=%d discarded=%d",
__func__, diff ? "DIFF " : EMPTY,
diff, esm->workinfoid,
esm->queued, esm->procured,
esm->discarded);
}
if (esm->errqueued || esm->errprocured ||
esm->errdiscarded) {
int diff = esm->errqueued - esm->errprocured -
esm->errdiscarded;
LOGWARNING("%s() %s%d wid=%"PRId64" early "
"shareerrors=%d procured=%d "
"discarded=%d",
__func__, diff ? "DIFF " : EMPTY,
diff, esm->workinfoid,
esm->errqueued, esm->errprocured,
esm->errdiscarded);
}
K_WLOCK(esm_free);
k_add_head(esm_free, esm_item);
K_WUNLOCK(esm_free);
}
}
}
// order by workinfoid asc,expirydate asc // order by workinfoid asc,expirydate asc
cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b) cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b)
{ {
@ -2243,6 +2382,28 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
return item; return item;
} }
// create the esm record inside the workinfo lock
K_ITEM *find_workinfo_esm(int64_t workinfoid, bool error, bool *created, tv_t *createdate)
{
WORKINFO workinfo;
K_TREE_CTX ctx[1];
K_ITEM look, *wi_item;
*created = false;
workinfo.workinfoid = workinfoid;
workinfo.expirydate.tv_sec = default_expiry.tv_sec;
workinfo.expirydate.tv_usec = default_expiry.tv_usec;
INIT_WORKINFO(&look);
look.data = (void *)(&workinfo);
K_RLOCK(workinfo_free);
wi_item = find_in_ktree(workinfo_root, &look, ctx);
if (!wi_item)
*created = find_create_esm(workinfoid, error, createdate);
K_RUNLOCK(workinfo_free);
return wi_item;
}
#define DISCARD_ALL -1 #define DISCARD_ALL -1
/* No longer required since we already discard the shares after being added /* No longer required since we already discard the shares after being added
* to the sharesummary */ * to the sharesummary */

74
src/ckdb_dbio.c

@ -3636,7 +3636,7 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
char *st = NULL; char *st = NULL;
char tmp[1024]; char tmp[1024];
double delta; double delta;
bool ok; bool ok, failed;
LOGDEBUG("%s() add", __func__); LOGDEBUG("%s() add", __func__);
@ -3645,7 +3645,7 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
if (shares_early_store->count == 0) { if (shares_early_store->count == 0) {
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
// None // None
return; goto out;
} }
es_item = last_in_ktree(shares_early_root, ctx); es_item = last_in_ktree(shares_early_root, ctx);
if (es_item) { if (es_item) {
@ -3696,18 +3696,20 @@ static void shares_process_early(PGconn *conn, K_ITEM *wi, tv_t *good_cd,
} }
} }
} }
return; goto out;
redo: redo:
K_WLOCK(shares_free); K_WLOCK(shares_free);
add_to_ktree(shares_early_root, es_item); add_to_ktree(shares_early_root, es_item);
k_add_tail(shares_early_store, es_item); k_add_tail(shares_early_store, es_item);
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
return; goto out;
keep: keep:
failed = esm_flag(early_shares->workinfoid, false, true);
btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf)); btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share procured", " Early share procured",
__func__, early_shares->workinfoid, __func__, failed ? "***ESM " : EMPTY,
early_shares->workinfoid,
st = safe_text_nonull(early_shares->workername), st = safe_text_nonull(early_shares->workername),
early_shares->createdate.tv_sec, early_shares->createdate.tv_sec,
early_shares->createdate.tv_usec, cd_buf, early_shares->createdate.tv_usec, cd_buf,
@ -3717,12 +3719,14 @@ keep:
// Discard it, it's been processed // Discard it, it's been processed
k_add_head(shares_free, es_item); k_add_head(shares_free, es_item);
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
return; goto out;
discard: discard:
failed = esm_flag(early_shares->workinfoid, false, false);
btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf)); btv_to_buf(&(early_shares->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share discarded!%s", " Early share discarded!%s",
__func__, early_shares->workinfoid, __func__, failed ? "***ESM " : EMPTY,
early_shares->workinfoid,
st = safe_text_nonull(early_shares->workername), st = safe_text_nonull(early_shares->workername),
early_shares->createdate.tv_sec, early_shares->createdate.tv_sec,
early_shares->createdate.tv_usec, cd_buf, early_shares->createdate.tv_usec, cd_buf,
@ -3731,7 +3735,10 @@ discard:
K_WLOCK(shares_free); K_WLOCK(shares_free);
k_add_head(shares_free, es_item); k_add_head(shares_free, es_item);
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
return; out:
// accessed outside lock, but esm_check() uses the lock
if (esm_store->count)
esm_check(good_cd);
} }
static void shareerrors_process_early(PGconn *conn, int64_t good_wid, static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
@ -3750,7 +3757,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
SHARES *shares = NULL, *shares2 = NULL; SHARES *shares = NULL, *shares2 = NULL;
double sdiff_amt; double sdiff_amt;
USERS *users; USERS *users;
bool ok = false, dup = false; bool ok = false, dup = false, created;
char *st = NULL; char *st = NULL;
tv_t share_cd; tv_t share_cd;
@ -3827,10 +3834,12 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
memcpy(shares2, shares, sizeof(*shares2)); memcpy(shares2, shares, sizeof(*shares2));
} }
wi_item = find_workinfo(shares->workinfoid, NULL); wi_item = find_workinfo_esm(shares->workinfoid, false, &created,
&(shares->createdate));
if (!wi_item) { if (!wi_item) {
int sta = (created ? LOG_ERR : LOG_NOTICE);
btv_to_buf(cd, cd_buf, sizeof(cd_buf)); btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " LOGMSG(sta, "%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
"Early share queued!", "Early share queued!",
__func__, shares->workinfoid, __func__, shares->workinfoid,
st = safe_text_nonull(workername), st = safe_text_nonull(workername),
@ -4338,7 +4347,7 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
char *st = NULL; char *st = NULL;
char tmp[1024]; char tmp[1024];
double delta; double delta;
bool ok; bool ok, failed;
LOGDEBUG("%s() add", __func__); LOGDEBUG("%s() add", __func__);
@ -4346,7 +4355,7 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
if (shareerrors_early_store->count == 0) { if (shareerrors_early_store->count == 0) {
K_WUNLOCK(shareerrors_free); K_WUNLOCK(shareerrors_free);
// None // None
return; goto out;
} }
es_item = last_in_ktree(shareerrors_early_root, ctx); es_item = last_in_ktree(shareerrors_early_root, ctx);
if (es_item) { if (es_item) {
@ -4399,18 +4408,20 @@ static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
} }
} }
} }
return; goto out;
redo: redo:
K_WLOCK(shareerrors_free); K_WLOCK(shareerrors_free);
add_to_ktree(shareerrors_early_root, es_item); add_to_ktree(shareerrors_early_root, es_item);
k_add_tail(shareerrors_early_store, es_item); k_add_tail(shareerrors_early_store, es_item);
K_WUNLOCK(shareerrors_free); K_WUNLOCK(shareerrors_free);
return; goto out;
keep: keep:
failed = esm_flag(early_shareerrors->workinfoid, true, true);
btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf)); btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share procured", " Early shareerror procured",
__func__, early_shareerrors->workinfoid, __func__, failed ? "***ESM " : EMPTY,
early_shareerrors->workinfoid,
st = safe_text_nonull(early_shareerrors->workername), st = safe_text_nonull(early_shareerrors->workername),
early_shareerrors->createdate.tv_sec, early_shareerrors->createdate.tv_sec,
early_shareerrors->createdate.tv_usec, cd_buf, early_shareerrors->createdate.tv_usec, cd_buf,
@ -4420,12 +4431,14 @@ keep:
add_to_ktree(shareerrors_root, es_item); add_to_ktree(shareerrors_root, es_item);
k_add_head(shareerrors_store, es_item); k_add_head(shareerrors_store, es_item);
K_WUNLOCK(shareerrors_free); K_WUNLOCK(shareerrors_free);
return; goto out;
discard: discard:
failed = esm_flag(early_shareerrors->workinfoid, true, false);
btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf)); btv_to_buf(&(early_shareerrors->createdate), cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 LOGNOTICE("%s() %s%"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32
" Early share discarded!%s", " Early shareerror discarded!%s",
__func__, early_shareerrors->workinfoid, __func__, failed ? "***ESM " : EMPTY,
early_shareerrors->workinfoid,
st = safe_text_nonull(early_shareerrors->workername), st = safe_text_nonull(early_shareerrors->workername),
early_shareerrors->createdate.tv_sec, early_shareerrors->createdate.tv_sec,
early_shareerrors->createdate.tv_usec, cd_buf, early_shareerrors->createdate.tv_usec, cd_buf,
@ -4434,7 +4447,10 @@ discard:
K_WLOCK(shareerrors_free); K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, es_item); k_add_head(shareerrors_free, es_item);
K_WUNLOCK(shareerrors_free); K_WUNLOCK(shareerrors_free);
return; out:
// accessed outside lock, but esm_check() uses the lock
if (esm_store->count)
esm_check(good_cd);
} }
// Memory (and log file) only // Memory (and log file) only
@ -4447,7 +4463,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
char cd_buf[DATE_BUFSIZ]; char cd_buf[DATE_BUFSIZ];
SHAREERRORS *shareerrors = NULL; SHAREERRORS *shareerrors = NULL;
USERS *users; USERS *users;
bool ok = false; bool ok = false, created;
char *st = NULL; char *st = NULL;
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
@ -4501,10 +4517,12 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
HISTORYDATEINIT(shareerrors, cd, by, code, inet); HISTORYDATEINIT(shareerrors, cd, by, code, inet);
HISTORYDATETRANSFER(trf_root, shareerrors); HISTORYDATETRANSFER(trf_root, shareerrors);
wi_item = find_workinfo(shareerrors->workinfoid, NULL); wi_item = find_workinfo_esm(shareerrors->workinfoid, true, &created,
&(shareerrors->createdate));
if (!wi_item) { if (!wi_item) {
int sta = (created ? LOG_ERR : LOG_NOTICE);
btv_to_buf(cd, cd_buf, sizeof(cd_buf)); btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " LOGMSG(sta, "%s() %"PRId64"/%s/%ld,%ld %s no workinfo! "
"Early shareerror queued!", "Early shareerror queued!",
__func__, shareerrors->workinfoid, __func__, shareerrors->workinfoid,
st = safe_text_nonull(workername), st = safe_text_nonull(workername),

Loading…
Cancel
Save