Browse Source

ckdb - add keysummary code, ako and stop core dumping on a missing : in oc_ips()

master
kanoi 9 years ago
parent
commit
b530704419
  1. 52
      src/ckdb.c
  2. 287
      src/ckdb.h
  3. 2
      src/ckdb_cmd.c
  4. 166
      src/ckdb_data.c
  5. 525
      src/ckdb_dbio.c
  6. 4
      src/ktree.c

52
src/ckdb.c

@ -711,6 +711,16 @@ K_STORE *markersummary_pool_store;
char mark_start_type = '\0';
int64_t mark_start = -1;
// KEYSHARESUMMARY
K_TREE *keysharesummary_root;
K_LIST *keysharesummary_free;
K_STORE *keysharesummary_store;
// KEYSUMMARY
K_TREE *keysummary_root;
K_LIST *keysummary_free;
K_STORE *keysummary_store;
// WORKMARKERS
K_TREE *workmarkers_root;
K_TREE *workmarkers_workinfoid_root;
@ -1617,6 +1627,20 @@ static void alloc_storage()
cmp_markersummary,
markersummary_free);
keysharesummary_free = k_new_list("KeyShareSummary",
sizeof(KEYSHARESUMMARY),
ALLOC_KEYSHARESUMMARY,
LIMIT_KEYSHARESUMMARY, true);
keysharesummary_store = k_new_store(keysharesummary_free);
keysharesummary_root = new_ktree(NULL, cmp_keysharesummary,
keysharesummary_free);
keysummary_free = k_new_list("KeySummary", sizeof(KEYSUMMARY),
ALLOC_KEYSUMMARY, LIMIT_KEYSUMMARY,
true);
keysummary_store = k_new_store(keysummary_free);
keysummary_root = new_ktree(NULL, cmp_keysummary, keysummary_free);
workmarkers_free = k_new_list("WorkMarkers", sizeof(WORKMARKERS),
ALLOC_WORKMARKERS, LIMIT_WORKMARKERS, true);
workmarkers_store = k_new_store(workmarkers_free);
@ -1649,8 +1673,10 @@ static void alloc_storage()
DLPRIO(workerstatus, 69);
DLPRIO(sharesummary, 68);
DLPRIO(markersummary, 67);
DLPRIO(workmarkers, 66);
DLPRIO(keysharesummary, 67);
DLPRIO(markersummary, 65);
DLPRIO(keysummary, 64);
DLPRIO(workmarkers, 62);
DLPRIO(marks, 60);
@ -1855,9 +1881,17 @@ static void dealloc_storage()
FREE_LIST_DATA(workmarkers);
if (free_mode != FREE_MODE_ALL)
LOGWARNING("%s() markersummary skipped", __func__);
LOGWARNING("%s() key/markersummary skipped", __func__);
else {
LOGWARNING("%s() markersummary ...", __func__);
LOGWARNING("%s() key/markersummary ...", __func__);
FREE_TREE(keysummary);
FREE_STORE_DATA(keysummary);
FREE_LIST(keysummary);
FREE_TREE(keysharesummary);
FREE_STORE_DATA(keysharesummary);
FREE_LIST(keysharesummary);
FREE_TREE(markersummary_pool);
k_list_transfer_to_tail_nolock(markersummary_pool_store,
@ -4161,7 +4195,7 @@ static void *summariser(__maybe_unused void *arg)
#define SHIFT_WORDS 26
static char *shift_words[] =
{
"akatsuki",
"ako",
"belldandy",
"charlotte",
"darkchii",
@ -4277,7 +4311,9 @@ static void make_a_shift_mark()
}
/* Find the last !new sharesummary workinfoid
* If the shift needs to go beyond this, then it's not ready yet */
* If the shift needs to go beyond this, then it's not ready yet
* keysharesummaries will have the same workinfoid,
* so don't need checking */
ss_age_wid = 0;
K_RLOCK(sharesummary_free);
ss_item = first_in_ktree(sharesummary_workinfoid_root, ss_ctx);
@ -4499,7 +4535,9 @@ static void make_a_shift_mark()
was_block = true;
break;
}
// Does workinfo have (aged) sharesummaries?
/* Does workinfo have (aged) sharesummaries?
* keysharesummary will be the same,
* so doesn't need checking (and doesn't matter) */
looksharesummary.workinfoid = workinfo->workinfoid;
looksharesummary.userid = MAXID;
looksharesummary.workername = EMPTY;

287
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.114"
#define CKDB_VERSION DB_VERSION"-2.200"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -830,26 +830,47 @@ enum cmd_values {
char createby[TXT_SML+1]; \
char createcode[TXT_MED+1]; \
char createinet[TXT_MED+1]; \
tv_t expirydate
tv_t expirydate; \
bool buffers
#define HISTORYDATECONTROLPOINTERS \
tv_t createdate; \
char *createby; \
char *createcode; \
char *createinet; \
tv_t expirydate; \
bool pointers
#define HISTORYDATEINIT(_row, _cd, _by, _code, _inet) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->createby, _by); \
STRNCPY(_row->createcode, _code); \
STRNCPY(_row->createinet, _inet); \
_row->expirydate.tv_sec = default_expiry.tv_sec; \
_row->expirydate.tv_usec = default_expiry.tv_usec; \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->createby, _by); \
STRNCPY((_row)->createcode, _code); \
STRNCPY((_row)->createinet, _inet); \
(_row)->expirydate.tv_sec = default_expiry.tv_sec; \
(_row)->expirydate.tv_usec = default_expiry.tv_usec; \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define HISTORYDATEDEFAULT(_row, _cd) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->createby, by_default); \
STRNCPY(_row->createcode, (char *)__func__); \
STRNCPY(_row->createinet, inet_default); \
_row->expirydate.tv_sec = default_expiry.tv_sec; \
_row->expirydate.tv_usec = default_expiry.tv_usec; \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->createby, by_default); \
STRNCPY((_row)->createcode, (char *)__func__); \
STRNCPY((_row)->createinet, inet_default); \
(_row)->expirydate.tv_sec = default_expiry.tv_sec; \
(_row)->expirydate.tv_usec = default_expiry.tv_usec; \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define HISTORYDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
SET_CREATEBY(_list, (_row)->createby, _by); \
SET_CREATECODE(_list, (_row)->createcode, _code); \
SET_CREATEINET(_list, (_row)->createinet, _inet); \
(_row)->expirydate.tv_sec = default_expiry.tv_sec; \
(_row)->expirydate.tv_usec = default_expiry.tv_usec; \
(_row)->pointers = (_row)->pointers; \
} while (0)
/* Override _row defaults if transfer fields are present
@ -875,6 +896,7 @@ enum cmd_values {
DATA_TRANSFER(__transfer, __item); \
STRNCPY((_row)->createinet, __transfer->mvalue); \
} \
(_row)->buffers = (_row)->buffers; \
} \
} while (0)
@ -891,7 +913,7 @@ enum cmd_values {
char modifyby[TXT_SML+1]; \
char modifycode[TXT_MED+1]; \
char modifyinet[TXT_MED+1]; \
bool buffers;
bool buffers
#define MODIFYDATECONTROLPOINTERS \
tv_t createdate; \
char *createby; \
@ -901,50 +923,50 @@ enum cmd_values {
char *modifyby; \
char *modifycode; \
char *modifyinet; \
bool pointers;
bool pointers
#define MODIFYDATEINIT(_row, _cd, _by, _code, _inet) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->createby, _by); \
STRNCPY(_row->createcode, _code); \
STRNCPY(_row->createinet, _inet); \
DATE_ZERO(&(_row->modifydate)); \
_row->modifyby[0] = '\0'; \
_row->modifycode[0] = '\0'; \
_row->modifyinet[0] = '\0'; \
_row->buffers = _row->buffers; \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->createby, _by); \
STRNCPY((_row)->createcode, _code); \
STRNCPY((_row)->createinet, _inet); \
DATE_ZERO(&((_row)->modifydate)); \
(_row)->modifyby[0] = '\0'; \
(_row)->modifycode[0] = '\0'; \
(_row)->modifyinet[0] = '\0'; \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define MODIFYUPDATE(_row, _cd, _by, _code, _inet) do { \
_row->modifydate.tv_sec = (_cd)->tv_sec; \
_row->modifydate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->modifyby, _by); \
STRNCPY(_row->modifycode, _code); \
STRNCPY(_row->modifyinet, _inet); \
_row->buffers = _row->buffers; \
(_row)->modifydate.tv_sec = (_cd)->tv_sec; \
(_row)->modifydate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->modifyby, _by); \
STRNCPY((_row)->modifycode, _code); \
STRNCPY((_row)->modifyinet, _inet); \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define MODIFYDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
SET_CREATEBY(_list, _row->createby, _by); \
SET_CREATECODE(_list, _row->createcode, _code); \
SET_CREATEINET(_list, _row->createinet, _inet); \
DATE_ZERO(&(_row->modifydate)); \
SET_MODIFYBY(_list, _row->modifyby, EMPTY); \
SET_MODIFYCODE(_list, _row->modifycode, EMPTY); \
SET_MODIFYINET(_list, _row->modifyinet, EMPTY); \
_row->pointers = _row->pointers; \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
SET_CREATEBY(_list, (_row)->createby, _by); \
SET_CREATECODE(_list, (_row)->createcode, _code); \
SET_CREATEINET(_list, (_row)->createinet, _inet); \
DATE_ZERO(&((_row)->modifydate)); \
SET_MODIFYBY(_list, (_row)->modifyby, EMPTY); \
SET_MODIFYCODE(_list, (_row)->modifycode, EMPTY); \
SET_MODIFYINET(_list, (_row)->modifyinet, EMPTY); \
(_row)->pointers = (_row)->pointers; \
} while (0)
#define MODIFYUPDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \
_row->modifydate.tv_sec = (_cd)->tv_sec; \
_row->modifydate.tv_usec = (_cd)->tv_usec; \
SET_MODIFYBY(_list, _row->modifyby, _by); \
SET_MODIFYCODE(_list, _row->modifycode, _code); \
SET_MODIFYINET(_list, _row->modifyinet, _inet); \
_row->pointers = _row->pointers; \
(_row)->modifydate.tv_sec = (_cd)->tv_sec; \
(_row)->modifydate.tv_usec = (_cd)->tv_usec; \
SET_MODIFYBY(_list, (_row)->modifyby, _by); \
SET_MODIFYCODE(_list, (_row)->modifycode, _code); \
SET_MODIFYINET(_list, (_row)->modifyinet, _inet); \
(_row)->pointers = (_row)->pointers; \
} while (0)
/* Override _row defaults if transfer fields are present
@ -959,19 +981,19 @@ enum cmd_values {
__item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \
SET_CREATEBY(_list, (_row)->createby, __transfer->mvalue); \
} \
__item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \
SET_CREATECODE(_list, (_row)->createcode, __transfer->mvalue); \
} \
__item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \
SET_CREATEINET(_list, (_row)->createinet, __transfer->mvalue); \
} \
_row->pointers = _row->pointers; \
(_row)->pointers = (_row)->pointers; \
} \
} while (0)
@ -981,22 +1003,40 @@ enum cmd_values {
tv_t createdate; \
char createby[TXT_SML+1]; \
char createcode[TXT_MED+1]; \
char createinet[TXT_MED+1]
char createinet[TXT_MED+1]; \
bool buffers
#define SIMPLEDATECONTROLPOINTERS \
tv_t createdate; \
char *createby; \
char *createcode; \
char *createinet; \
bool pointers
#define SIMPLEDATEINIT(_row, _cd, _by, _code, _inet) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->createby, _by); \
STRNCPY(_row->createcode, _code); \
STRNCPY(_row->createinet, _inet); \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->createby, _by); \
STRNCPY((_row)->createcode, _code); \
STRNCPY((_row)->createinet, _inet); \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define SIMPLEDATEDEFAULT(_row, _cd) do { \
_row->createdate.tv_sec = (_cd)->tv_sec; \
_row->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY(_row->createby, by_default); \
STRNCPY(_row->createcode, (char *)__func__); \
STRNCPY(_row->createinet, inet_default); \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
STRNCPY((_row)->createby, by_default); \
STRNCPY((_row)->createcode, (char *)__func__); \
STRNCPY((_row)->createinet, inet_default); \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define SIMPLEDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \
(_row)->createdate.tv_sec = (_cd)->tv_sec; \
(_row)->createdate.tv_usec = (_cd)->tv_usec; \
SET_CREATEBY(_list, (_row)->createby, _by); \
SET_CREATECODE(_list, (_row)->createcode, _code); \
SET_CREATEINET(_list, (_row)->createinet, _inet); \
(_row)->pointers = (_row)->pointers; \
} while (0)
/* Override _row defaults if transfer fields are present
@ -1009,18 +1049,42 @@ enum cmd_values {
__item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
STRNCPY(_row->createby, __transfer->mvalue); \
STRNCPY((_row)->createby, __transfer->mvalue); \
} \
__item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
STRNCPY((_row)->createcode, __transfer->mvalue); \
} \
__item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
STRNCPY((_row)->createinet, __transfer->mvalue); \
} \
(_row)->buffers = (_row)->buffers; \
} while (0)
#define SIMPLEDATEPTRTRANSFER(_list, _root, _row) do { \
char __reply[16]; \
size_t __siz = sizeof(__reply); \
K_ITEM *__item; \
TRANSFER *__transfer; \
__item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATEBY(_list, (_row)->createby, __transfer->mvalue); \
} \
__item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
STRNCPY(_row->createcode, __transfer->mvalue); \
SET_CREATECODE(_list, (_row)->createcode, __transfer->mvalue); \
} \
__item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
STRNCPY(_row->createinet, __transfer->mvalue); \
SET_CREATEINET(_list, (_row)->createinet, __transfer->mvalue); \
} \
(_row)->pointers = (_row)->pointers; \
} while (0)
// LOGQUEUE
@ -2601,6 +2665,84 @@ extern K_STORE *markersummary_pool_store;
extern char mark_start_type;
extern int64_t mark_start;
// KEYSHARESUMMARY
typedef struct keysharesummary {
int64_t workinfoid;
char keytype[TXT_FLAG+1];
char *key;
double diffacc;
double diffsta;
double diffdup;
double diffhi;
double diffrej;
double shareacc;
double sharesta;
double sharedup;
double sharehi;
double sharerej;
int64_t sharecount;
int64_t errorcount;
tv_t firstshare;
tv_t lastshare;
tv_t firstshareacc;
tv_t lastshareacc;
double lastdiffacc;
char complete[TXT_FLAG+1];
SIMPLEDATECONTROLPOINTERS;
} KEYSHARESUMMARY;
#define ALLOC_KEYSHARESUMMARY 1000
#define LIMIT_KEYSHARESUMMARY 0
#define INIT_KEYSHARESUMMARY(_item) INIT_GENERIC(_item, keysharesummary)
#define DATA_KEYSHARESUMMARY(_var, _item) DATA_GENERIC(_var, _item, keysharesummary, true)
#define DATA_KEYSHARESUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, keysharesummary, false)
extern K_TREE *keysharesummary_root;
extern K_LIST *keysharesummary_free;
extern K_STORE *keysharesummary_store;
// KEYSUMMARY
typedef struct keysummary {
int64_t markerid;
char keytype[TXT_FLAG+1];
char *key;
double diffacc;
double diffsta;
double diffdup;
double diffhi;
double diffrej;
double shareacc;
double sharesta;
double sharedup;
double sharehi;
double sharerej;
int64_t sharecount;
int64_t errorcount;
tv_t firstshare;
tv_t lastshare;
tv_t firstshareacc;
tv_t lastshareacc;
double lastdiffacc;
SIMPLEDATECONTROLPOINTERS;
} KEYSUMMARY;
#define ALLOC_KEYSUMMARY 1000
#define LIMIT_KEYSUMMARY 0
#define INIT_KEYSUMMARY(_item) INIT_GENERIC(_item, keysummary)
#define DATA_KEYSUMMARY(_var, _item) DATA_GENERIC(_var, _item, keysummary, true)
#define DATA_KEYSUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, keysummary, false)
extern K_TREE *keysummary_root;
extern K_LIST *keysummary_free;
extern K_STORE *keysummary_store;
#define KEYTYPE_IP 'i'
#define KEYTYPE_IP_STR "i"
#define KEYIP(_keytype) (tolower((_keytype)[0]) == KEYTYPE_IP)
#define KEYTYPE_AGENT 'a'
#define KEYTYPE_AGENT_STR "a"
#define KEYAGENT(_keytype) (tolower((_keytype)[0]) == KEYTYPE_AGENT)
// WORKMARKERS
typedef struct workmarkers {
int64_t markerid;
@ -2811,6 +2953,8 @@ extern void free_payouts_data(K_ITEM *item);
extern void free_ips_data(K_ITEM *item);
extern void free_optioncontrol_data(K_ITEM *item);
extern void free_markersummary_data(K_ITEM *item);
extern void free_keysharesummary_data(K_ITEM *item);
extern void free_keysummary_data(K_ITEM *item);
extern void free_workmarkers_data(K_ITEM *item);
extern void free_marks_data(K_ITEM *item);
#define free_seqset_data(_item) _free_seqset_data(_item)
@ -3101,6 +3245,10 @@ extern K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
int64_t userid, char *workername, bool pool);
extern bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
extern cmp_t cmp_keysharesummary(K_ITEM *a, K_ITEM *b);
extern void zero_keysharesummary(KEYSHARESUMMARY *row);
extern K_ITEM *find_keysharesummary(int64_t workinfoid, char keytype, char *key);
extern cmp_t cmp_keysummary(K_ITEM *a, K_ITEM *b);
extern void dsp_workmarkers(K_ITEM *item, FILE *stream);
extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b);
@ -3264,6 +3412,7 @@ extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
_sharesummary_age(_ss_item, _by, _code, _inet, _cd, WHERE_FFL_HERE)
extern bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
tv_t *cd, WHERE_FFL_ARGS);
extern bool keysharesummary_age(K_ITEM *kss_item);
extern bool sharesummary_fill(PGconn *conn);
extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
double diffacc, double diffinv, double shareacc,
@ -3325,6 +3474,8 @@ extern bool userstats_fill(PGconn *conn);
extern bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root);
extern bool markersummary_fill(PGconn *conn);
extern bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root);
#define workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \
_workinfoidend, _workinfoidstart, _description, \
_status, _by, _code, _inet, _cd, _trf_root) \

2
src/ckdb_cmd.c

@ -6476,7 +6476,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
* Of course if you don't have ALL the necessary shares in
* the CCLs then you'd lose data doing this
*
* SS_to_MS will complain if any markersummaries already exist
* K/SS_to_K/MS will complain if any markersummaries already exist
* when processing a workmarker
* Normally you would use 'processed' if the markersummaries
* are OK, and just the workmarker failed to be updated to

166
src/ckdb_data.c

@ -121,6 +121,30 @@ void free_markersummary_data(K_ITEM *item)
SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY);
}
void free_keysharesummary_data(K_ITEM *item)
{
KEYSHARESUMMARY *keysharesummary;
DATA_KEYSHARESUMMARY(keysharesummary, item);
LIST_MEM_SUB(keysharesummary_free, keysharesummary->key);
FREENULL(keysharesummary->key);
SET_CREATEBY(keysharesummary_free, keysharesummary->createby, EMPTY);
SET_CREATECODE(keysharesummary_free, keysharesummary->createcode, EMPTY);
SET_CREATEINET(keysharesummary_free, keysharesummary->createinet, EMPTY);
}
void free_keysummary_data(K_ITEM *item)
{
KEYSUMMARY *keysummary;
DATA_KEYSUMMARY(keysummary, item);
LIST_MEM_SUB(keysummary_free, keysummary->key);
FREENULL(keysummary->key);
SET_CREATEBY(keysummary_free, keysummary->createby, EMPTY);
SET_CREATECODE(keysummary_free, keysummary->createcode, EMPTY);
SET_CREATEINET(keysummary_free, keysummary->createinet, EMPTY);
}
void free_workmarkers_data(K_ITEM *item)
{
WORKMARKERS *workmarkers;
@ -2111,15 +2135,17 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item;
K_ITEM *wm_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1];
K_ITEM ks_look, *ks_item, *wm_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1], ks_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
int64_t ks_tot, ks_already, ks_failed;
int64_t diff_tot;
KEYSHARESUMMARY lookkeysharesummary, *keysharesummary;
SHARESUMMARY looksharesummary, *sharesummary;
WORKINFO *workinfo;
SHARES lookshares, *shares;
bool ok = false, skipupdate;
bool ok = false, ksok = false, skipupdate;
char error[1024];
LOGDEBUG("%s(): age", __func__);
@ -2280,8 +2306,70 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
shares_dumped, diff_tot);
}
}
INIT_KEYSHARESUMMARY(&ks_look);
// Find the first matching keysharesummary
lookkeysharesummary.workinfoid = workinfoid;
lookkeysharesummary.keytype[0] = '\0';
lookkeysharesummary.key = EMPTY;
ksok = true;
ks_tot = ks_already = ks_failed = 0;
ks_look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free);
ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx);
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
while (ks_item && keysharesummary->workinfoid == workinfoid) {
ks_tot++;
skipupdate = false;
/* Reloading during a confirm will not have any old data
* so finding an aged keysharesummary here is an error
* N.B. this can only happen with (very) old reload files */
if (reloading) {
if (keysharesummary->complete[0] == SUMMARY_COMPLETE) {
ks_already++;
skipupdate = true;
if (confirm_sharesummary) {
LOGERR("%s(): Duplicate %s found during confirm %"PRId64"/%s/%s",
__func__, __func__,
keysharesummary->workinfoid,
keysharesummary->keytype,
keysharesummary->key);
}
}
}
if (!skipupdate) {
if (!keysharesummary_age(ks_item)) {
ks_failed++;
LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s",
__func__, keysharesummary->workinfoid,
keysharesummary->keytype,
keysharesummary->key);
ksok = false;
}
}
/* All shares should have been discarded during sharesummary
* processing above */
K_RLOCK(keysharesummary_free);
ks_item = next_in_ktree(ks_ctx);
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
}
if (ks_already) {
LOGNOTICE("%s(): Keysummary aging of %"PRId64"/%s "
"kstotal=%"PRId64" already=%"PRId64" failed=%"PRId64,
__func__, workinfoid, poolinstance,
ks_tot, ks_already, ks_failed);
}
bye:
return ok;
return (ok && ksok);
}
// Block height coinbase reward value
@ -2533,7 +2621,8 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
age_id = prev_found;
// Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found
/* Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found
* Unaged keysharesummaries will have the same workinfoids */
looksharesummary.workinfoid = prev_found;
looksharesummary.userid = -1;
looksharesummary.workername = EMPTY;
@ -2571,6 +2660,7 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
* been missed and can report the range of data that was aged,
* which would normally just be an approx 10min set of workinfoids
* from the last time ckpool stopped
* workinfo_age also processes the matching keysharesummaries
* Each next group of unaged sharesummaries following this, will be
* picked up by each next aging */
wid_count = 0;
@ -2870,7 +2960,8 @@ void set_block_share_counters()
ws_item = NULL;
/* From the end backwards so we can skip the workinfoid's we don't
* want by jumping back to just before the current worker when the
* workinfoid goes below the limit */
* workinfoid goes below the limit
* N.B. keysharesummaries duplicate the totals, so are ignored */
ss_item = last_in_ktree(sharesummary_root, ctx);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
@ -5657,6 +5748,69 @@ flailed:
return ok;
}
// order by workinfoid asc,keytype asc,key asc (has no expirydate)
cmp_t cmp_keysharesummary(K_ITEM *a, K_ITEM *b)
{
KEYSHARESUMMARY *ka, *kb;
DATA_KEYSHARESUMMARY(ka, a);
DATA_KEYSHARESUMMARY(kb, b);
cmp_t c = CMP_BIGINT(ka->workinfoid, kb->workinfoid);
if (c == 0) {
c = CMP_STR(ka->keytype, kb->keytype);
if (c == 0)
c = CMP_STR(ka->key, kb->key);
}
return c;
}
void zero_keysharesummary(KEYSHARESUMMARY *row)
{
LIST_WRITE(keysharesummary_free);
row->diffacc = row->diffsta = row->diffdup = row->diffhi =
row->diffrej = row->shareacc = row->sharesta = row->sharedup =
row->sharehi = row->sharerej = 0.0;
row->sharecount = row->errorcount = 0;
DATE_ZERO(&(row->firstshare));
DATE_ZERO(&(row->lastshare));
DATE_ZERO(&(row->firstshareacc));
DATE_ZERO(&(row->lastshareacc));
row->lastdiffacc = 0;
row->complete[0] = SUMMARY_NEW;
row->complete[1] = '\0';
}
// Must be R or W locked
K_ITEM *find_keysharesummary(int64_t workinfoid, char keytype, char *key)
{
KEYSHARESUMMARY keysharesummary;
K_TREE_CTX ctx[1];
K_ITEM look;
keysharesummary.workinfoid = workinfoid;
keysharesummary.keytype[0] = keytype;
keysharesummary.keytype[1] = '\0';
keysharesummary.key = key;
INIT_KEYSHARESUMMARY(&look);
look.data = (void *)(&keysharesummary);
return find_in_ktree(keysharesummary_root, &look, ctx);
}
// order by markerid asc,keytype asc,key asc (has no expirydate)
cmp_t cmp_keysummary(K_ITEM *a, K_ITEM *b)
{
KEYSUMMARY *ka, *kb;
DATA_KEYSUMMARY(ka, a);
DATA_KEYSUMMARY(kb, b);
cmp_t c = CMP_BIGINT(ka->markerid, kb->markerid);
if (c == 0) {
c = CMP_STR(ka->keytype, kb->keytype);
if (c == 0)
c = CMP_STR(ka->key, kb->key);
}
return c;
}
void dsp_workmarkers(K_ITEM *item, FILE *stream)
{
WORKMARKERS *wm;

525
src/ckdb_dbio.c

@ -187,7 +187,8 @@ char *pqerrmsg(PGconn *conn)
#define PQPARAM18 PQPARAM16 ",$17,$18"
#define PQPARAM21 PQPARAM16 ",$17,$18,$19,$20,$21"
#define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22"
#define PQPARAM23 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23"
#define PQPARAM23 PQPARAM22 ",$23"
#define PQPARAM24 PQPARAM22 ",$23,$24"
#define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26"
#define PQPARAM27 PQPARAM26 ",$27"
#define PQPARAM28 PQPARAM26 ",$27,$28"
@ -2636,6 +2637,7 @@ void oc_ips(OPTIONCONTROL *oc, const char *from)
if (!colon) {
LOGERR("%s(%s): ERR: Missing ':' after IP '%s' name '%s'",
from, __func__, oc->optionvalue, oc->optionname);
return;
}
STRNCPY(ips.eventname, colon+1);
@ -3177,14 +3179,9 @@ bool workinfo_fill(PGconn *conn)
printf(TICK_PREFIX"wi 0\r");
fflush(stdout);
// TODO: select the data based on sharesummary since old data isn't needed
// however, the ageing rules for workinfo will decide that also
// keep the last block + current? Rules will depend on payout scheme also
APPEND_REALLOC_INIT(sel, off, len);
APPEND_REALLOC(sel, off, len,
"declare wi cursor for select "
// "workinfoid,poolinstance,transactiontree,merklehash,prevhash,"
"workinfoid,poolinstance,merklehash,prevhash,"
"coinbase1,coinbase2,version,bits,ntime,reward"
HISTORYDATECONTROL
@ -3285,13 +3282,6 @@ bool workinfo_fill(PGconn *conn)
break;
TXT_TO_BIGINT("workinfoid", field, row->workinfoid);
/* Not currently needed in RAM
PQ_GET_FLD(res, i, "transactiontree", field, ok);
if (!ok)
break;
TXT_TO_BLOB("transactiontree", field, row->transactiontree);
LIST_MEM_ADD(workinfo_free, row->transactiontree);
*/
row->transactiontree = EMPTY;
PQ_GET_FLD(res, i, "merklehash", field, ok);
@ -3386,8 +3376,10 @@ bool workinfo_fill(PGconn *conn)
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
for (i = 0; i < par; i++)
free(params[i]);
par = 0;
free(sel);
if (ok) {
@ -3642,7 +3634,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
SHARES *shares = NULL, *shares2 = NULL;
double sdiff_amt;
USERS *users;
bool ok = false;
bool ok = false, dup = false;
char *st = NULL;
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
@ -3733,7 +3725,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
add_to_ktree(shares_early_root, s_item);
k_add_head(shares_early_store, s_item);
if (s2_item) {
/* Just ignore duplicates - this matches the DB index
/* Discard duplicates - this matches the DB index
N.B. a duplicate share doesn't have to be SE_DUPE,
two shares can be SE_NONE and SE_STALE */
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
@ -3743,11 +3735,23 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
add_to_ktree(shares_db_root, s2_item);
k_add_head(shares_hi_store, s2_item);
} else {
dup = true;
k_add_head(shares_free, s2_item);
s2_item = NULL;
}
}
K_WUNLOCK(shares_free);
if (dup) {
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGWARNING("%s() duplicate DB share discarded: "
"%"PRId64"/%s/%"PRId32"/%s/%.0f/%"PRId32"/%ld,%ld %s",
__func__, shares->workinfoid,
st = safe_text_nonull(workername),
shares->clientid, shares->nonce,
shares->sdiff, shares->errn,
cd->tv_sec, cd->tv_usec, cd_buf);
FREENULL(st);
}
/* It was all OK except the missing workinfoid
* and it was queued, so most likely OK */
return true;
@ -3759,18 +3763,30 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
add_to_ktree(shares_root, s_item);
k_add_head(shares_store, s_item);
if (s2_item) {
// Just ignore duplicates
// Discard duplicates
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
if (tmp_item == NULL) {
add_to_ktree(shares_hi_root, s2_item);
add_to_ktree(shares_db_root, s2_item);
k_add_head(shares_hi_store, s2_item);
} else {
dup = true;
k_add_head(shares_free, s2_item);
s2_item = NULL;
}
}
K_WUNLOCK(shares_free);
if (dup) {
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGWARNING("%s() duplicate DB share discarded: "
"%"PRId64"/%s/%"PRId32"/%s/%.0f/%"PRId32"/%ld,%ld %s",
__func__, shares->workinfoid,
st = safe_text_nonull(workername),
shares->clientid, shares->nonce,
shares->sdiff, shares->errn,
cd->tv_sec, cd->tv_usec, cd_buf);
FREENULL(st);
}
shares_process_early(conn, wi_item, &(shares->createdate),
trf_root);
@ -3874,10 +3890,11 @@ bool shares_fill(PGconn *conn)
char *field;
char *sel = NULL;
char *params[1];
int fields = 14, par = 0;
int fields = 16, par = 0;
bool ok = false;
int64_t workinfoid;
tv_t old;
int no_addr = 0, no_agent = 0;
LOGDEBUG("%s(): select", __func__);
@ -3916,7 +3933,8 @@ bool shares_fill(PGconn *conn)
sel = "declare sh cursor for select "
"workinfoid,userid,workername,clientid,enonce1,nonce2,nonce,"
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff"
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff,agent,"
"address"
HISTORYDATECONTROL
" from shares where workinfoid>=$1";
par = 0;
@ -4053,6 +4071,20 @@ bool shares_fill(PGconn *conn)
if (!ok)
break;
PQ_GET_FLD(res, i, "agent", field, ok);
if (!ok)
break;
if (!(*field))
no_agent++;
TXT_TO_STR("agent", field, row->agent);
PQ_GET_FLD(res, i, "address", field, ok);
if (!ok)
break;
if (!(*field))
no_addr++;
TXT_TO_STR("address", field, row->address);
add_to_ktree(shares_db_root, item);
k_add_head(shares_hi_store, item);
@ -4086,6 +4118,15 @@ flail:
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): fetched %d shares records", __func__, n);
if (no_addr || no_agent) {
if (no_addr == no_agent) {
LOGWARNING(" %d had no address and agent",
no_addr);
} else {
LOGWARNING(" %d had no address %d had no agent",
no_addr, no_agent);
}
}
}
return ok;
@ -4435,25 +4476,37 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_TREE *trf_root)
{
// shorter name for log messages
static const char *shortname = "SS_to_MS";
static const char *shortname = "K/SS_to_K/MS";
static const char *sshortname = "SS_to_MS";
static const char *kshortname = "KSS_to_KS";
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ss_ctx[1], ms_ctx[1];
K_TREE_CTX ss_ctx[1], kss_ctx[1], ms_ctx[1], ks_ctx[1];
SHARESUMMARY *sharesummary, looksharesummary;
KEYSHARESUMMARY *keysharesummary, lookkeysharesummary;
MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary = NULL;
K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look;
K_ITEM *p_ss_item, *p_ms_item;
KEYSUMMARY *keysummary, lookkeysummary;
K_ITEM *ss_item, *ss_prev, ss_look;
K_ITEM *kss_item, *kss_prev, kss_look;
K_ITEM *ms_item, ms_look, *p_ss_item, *p_ms_item;
K_ITEM *ks_item, ks_look;
bool ok = false, conned = false;
int64_t diffacc, shareacc;
int64_t diffacc = 0, shareacc = 0;
int64_t kdiffacc= 0, kshareacc = 0;
char *reason = NULL;
int ss_count, ms_count;
int ss_count, kss_count, ms_count, ks_count;
char *st = NULL;
tv_t add_stt, add_fin, db_stt, db_fin, lck_stt, lck_got, lck_fin;
tv_t kadd_stt, kadd_fin, kdb_stt, kdb_fin;
DATE_ZERO(&add_stt);
DATE_ZERO(&add_fin);
DATE_ZERO(&db_stt);
DATE_ZERO(&db_fin);
DATE_ZERO(&kadd_stt);
DATE_ZERO(&kadd_fin);
DATE_ZERO(&kdb_stt);
DATE_ZERO(&kdb_fin);
LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s",
@ -4462,12 +4515,16 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
workmarkers->description, workmarkers->status);
K_STORE *old_sharesummary_store = k_new_store(sharesummary_free);
K_STORE *old_keysharesummary_store = k_new_store(keysharesummary_free);
K_STORE *new_markersummary_store = k_new_store(markersummary_free);
K_STORE *new_keysummary_store = k_new_store(keysummary_free);
/* Use the master size for this local tree since
* it's large and doesn't get created often */
K_TREE *ms_root = new_ktree_local(shortname, cmp_markersummary,
/* Use the master size for this local trees since
* they're large and doesn't get created often */
K_TREE *ms_root = new_ktree_local(sshortname, cmp_markersummary,
markersummary_free);
K_TREE *ks_root = new_ktree_local(kshortname, cmp_keysummary,
keysummary_free);
if (!CURRENT(&(workmarkers->expirydate))) {
reason = "unexpired";
@ -4480,7 +4537,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
}
setnow(&add_stt);
// Check there aren't already any matching markersummaries
/* Check there aren't already any matching markersummaries
* and assume keysummaries are the same */
lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = 0;
lookmarkersummary.workername = EMPTY;
@ -4505,7 +4563,6 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
goto flail;
}
diffacc = shareacc = 0;
ms_item = NULL;
looksharesummary.workinfoid = workmarkers->workinfoidend;
@ -4603,6 +4660,106 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
}
setnow(&add_fin);
setnow(&kadd_stt);
ks_item = NULL;
lookkeysharesummary.workinfoid = workmarkers->workinfoidend;
lookkeysharesummary.keytype[0] = '\0';;
lookkeysharesummary.key = EMPTY;
INIT_KEYSHARESUMMARY(&kss_look);
kss_look.data = (void *)(&lookkeysharesummary);
/* Since shares come in from ckpool at a high rate,
* we don't want to lock keysharesummary for long
* Those incoming shares will not be touching the keysharesummaries
* we are processing here */
K_RLOCK(keysharesummary_free);
kss_item = find_before_in_ktree(keysharesummary_root,
&kss_look, kss_ctx);
K_RUNLOCK(keysharesummary_free);
while (kss_item) {
DATA_KEYSHARESUMMARY(keysharesummary, kss_item);
if (keysharesummary->workinfoid < workmarkers->workinfoidstart)
break;
K_RLOCK(keysharesummary_free);
kss_prev = prev_in_ktree(kss_ctx);
K_RUNLOCK(keysharesummary_free);
// Find/create the keysummary only once per key change
if (!ks_item || strcmp(keysummary->keytype, keysharesummary->keytype) != 0 ||
strcmp(keysummary->key, keysharesummary->key) != 0) {
lookkeysummary.markerid = workmarkers->markerid;
lookkeysummary.keytype[0] = keysharesummary->keytype[0];
lookkeysummary.keytype[1] = keysharesummary->keytype[1];
lookkeysummary.key = keysharesummary->key;
ks_look.data = (void *)(&lookkeysummary);
ks_item = find_in_ktree_nolock(ks_root, &ks_look, ks_ctx);
if (!ks_item) {
K_WLOCK(keysummary_free);
ks_item = k_unlink_head(keysummary_free);
K_WUNLOCK(keysummary_free);
k_add_head_nolock(new_keysummary_store, ks_item);
DATA_KEYSUMMARY(keysummary, ks_item);
bzero(keysummary, sizeof(*keysummary));
keysummary->markerid = workmarkers->markerid;
keysummary->keytype[0] = keysharesummary->keytype[0];
keysummary->keytype[1] = '\0';
DUP_POINTER(keysummary_free,
keysummary->key,
keysharesummary->key);
add_to_ktree_nolock(ks_root, ks_item);
LOGDEBUG("%s() new ks %"PRId64"/%s/%s",
shortname, keysummary->markerid,
keysummary->keytype,
st = safe_text(keysummary->key));
FREENULL(st);
} else {
DATA_KEYSUMMARY(keysummary, ks_item);
}
}
keysummary->diffacc += keysharesummary->diffacc;
keysummary->diffsta += keysharesummary->diffsta;
keysummary->diffdup += keysharesummary->diffdup;
keysummary->diffhi += keysharesummary->diffhi;
keysummary->diffrej += keysharesummary->diffrej;
keysummary->shareacc += keysharesummary->shareacc;
keysummary->sharesta += keysharesummary->sharesta;
keysummary->sharedup += keysharesummary->sharedup;
keysummary->sharehi += keysharesummary->sharehi;
keysummary->sharerej += keysharesummary->sharerej;
keysummary->sharecount += keysharesummary->sharecount;
keysummary->errorcount += keysharesummary->errorcount;
if (!keysummary->firstshare.tv_sec ||
!tv_newer(&(keysummary->firstshare), &(keysharesummary->firstshare))) {
copy_tv(&(keysummary->firstshare), &(keysharesummary->firstshare));
}
if (tv_newer(&(keysummary->lastshare), &(keysharesummary->lastshare)))
copy_tv(&(keysummary->lastshare), &(keysharesummary->lastshare));
if (keysharesummary->diffacc > 0) {
if (!keysummary->firstshareacc.tv_sec ||
!tv_newer(&(keysummary->firstshareacc), &(keysharesummary->firstshareacc))) {
copy_tv(&(keysummary->firstshareacc), &(keysharesummary->firstshareacc));
}
if (tv_newer(&(keysummary->lastshareacc), &(keysharesummary->lastshareacc))) {
copy_tv(&(keysummary->lastshareacc), &(keysharesummary->lastshareacc));
keysummary->lastdiffacc = keysharesummary->lastdiffacc;
}
}
kdiffacc += keysharesummary->diffacc;
kshareacc += keysharesummary->shareacc;
K_WLOCK(keysharesummary_free);
k_unlink_item(keysharesummary_store, kss_item);
K_WUNLOCK(keysharesummary_free);
k_add_head_nolock(old_keysharesummary_store, kss_item);
kss_item = kss_prev;
}
setnow(&kadd_fin);
setnow(&db_stt);
if (conn == NULL) {
conn = dbconnect();
@ -4628,6 +4785,19 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
}
ms_item = ms_item->next;
}
setnow(&db_fin);
setnow(&kdb_stt);
ks_item = STORE_HEAD_NOLOCK(new_keysummary_store);
while (ks_item) {
if (!(keysummary_add(conn, ks_item, by, code, inet,
cd, trf_root))) {
reason = "db error";
setnow(&kdb_fin);
goto rollback;
}
ks_item = ks_item->next;
}
ok = workmarkers_process(conn, true, true,
workmarkers->markerid,
@ -4637,7 +4807,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
workmarkers->description,
MARKER_PROCESSED_STR,
by, code, inet, cd, trf_root);
setnow(&db_fin);
setnow(&kdb_fin);
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
@ -4652,10 +4822,11 @@ flail:
if (reason) {
// already displayed the full workmarkers detail at the top
LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s add=%.3fs "
"db=%.3fs",
"kadd=%.3fs db=%.3fs kdb=%.3fs",
shortname, reason, workmarkers->markerid,
workmarkers->description, workmarkers->status,
tvdiff(&add_fin, &add_stt), tvdiff(&db_fin, &db_stt));
tvdiff(&add_fin, &add_stt), tvdiff(&kadd_fin, &kadd_stt),
tvdiff(&db_fin, &db_stt), tvdiff(&kdb_fin, &kdb_stt));
ok = false;
}
@ -4678,13 +4849,34 @@ flail:
k_list_transfer_to_head(old_sharesummary_store, sharesummary_store);
K_WUNLOCK(sharesummary_free);
}
if (new_keysummary_store->count > 0) {
// Throw them away (they don't exist anywhere else)
ks_item = STORE_HEAD_NOLOCK(new_keysummary_store);
while (ks_item) {
free_keysummary_data(ks_item);
ks_item = ks_item->next;
}
K_WLOCK(keysummary_free);
k_list_transfer_to_head(new_keysummary_store, keysummary_free);
K_WUNLOCK(keysummary_free);
}
if (old_keysharesummary_store->count > 0) {
// Put them back in the store where they came from
K_WLOCK(keysharesummary_free);
k_list_transfer_to_head(old_keysharesummary_store, keysharesummary_store);
K_WUNLOCK(keysharesummary_free);
}
} else {
ms_count = new_markersummary_store->count;
ks_count = new_keysummary_store->count;
ss_count = old_sharesummary_store->count;
kss_count = old_keysharesummary_store->count;
setnow(&lck_stt);
K_WLOCK(sharesummary_free);
K_WLOCK(keysharesummary_free);
K_WLOCK(markersummary_free);
K_WLOCK(keysummary_free);
K_RLOCK(workmarkers_free);
setnow(&lck_got);
ms_item = STORE_HEAD_NOLOCK(new_markersummary_store);
@ -4711,6 +4903,14 @@ flail:
}
k_list_transfer_to_head(new_markersummary_store, markersummary_store);
ks_item = STORE_HEAD_NOLOCK(new_keysummary_store);
while (ks_item) {
// move the new keysummaries into the tree
add_to_ktree(keysummary_root, ks_item);
ks_item = ks_item->next;
}
k_list_transfer_to_head(new_keysummary_store, keysummary_store);
/* For normal shift processing this wont be very quick
* so it will be a 'long' LOCK */
ss_item = STORE_HEAD_NOLOCK(old_sharesummary_store);
@ -4733,32 +4933,54 @@ flail:
ss_item = ss_item->next;
}
k_list_transfer_to_head(old_sharesummary_store, sharesummary_free);
/* For normal shift processing this wont be very quick
* so it will be a 'long' LOCK */
kss_item = STORE_HEAD_NOLOCK(old_keysharesummary_store);
while (kss_item) {
// remove the old keysharesummaries from the trees
remove_from_ktree(keysharesummary_root, kss_item);
free_keysharesummary_data(kss_item);
kss_item = kss_item->next;
}
k_list_transfer_to_head(old_keysharesummary_store, keysharesummary_free);
K_RUNLOCK(workmarkers_free);
K_WUNLOCK(keysummary_free);
K_WUNLOCK(markersummary_free);
K_WUNLOCK(keysharesummary_free);
K_WUNLOCK(sharesummary_free);
setnow(&lck_fin);
LOGWARNING("%s() Processed: %d ms %d ss %"PRId64" shares "
"%"PRId64" diff for workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s add=%.3f "
"db=%.3fs lck=%.3fs+%.3f",
shortname, ms_count, ss_count, shareacc, diffacc,
LOGWARNING("%s() Processed: %d ms %d ks %d ss %d kss "
"%"PRId64" shares %"PRId64" diff "
"k(%"PRId64"/%"PRId64") for workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s add=%.3fs "
"kadd=%.3fs db=%.3fs kdb=%.3fs lck=%.3f+%.3fs",
shortname, ms_count, ks_count, ss_count, kss_count,
shareacc, diffacc, kshareacc, kdiffacc,
workmarkers->markerid, workmarkers->poolinstance,
workmarkers->workinfoidend,
workmarkers->workinfoidstart,
workmarkers->description,
workmarkers->status, tvdiff(&add_fin, &add_stt),
tvdiff(&kadd_fin, &kadd_stt),
tvdiff(&db_fin, &db_stt),
tvdiff(&kdb_fin, &kdb_stt),
tvdiff(&lck_got, &lck_stt),
tvdiff(&lck_fin, &lck_got));
}
free_ktree(ms_root, NULL);
free_ktree(ks_root, NULL);
new_markersummary_store = k_free_store(new_markersummary_store);
new_keysummary_store = k_free_store(new_keysummary_store);
old_sharesummary_store = k_free_store(old_sharesummary_store);
old_keysharesummary_store = k_free_store(old_keysharesummary_store);
return ok;
}
// TODO: keysummaries ...
bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm)
{
// shorter name for log messages
@ -4988,6 +5210,62 @@ static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
K_WUNLOCK(sharesummary_free);
}
static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row,
bool new)
{
K_WLOCK(keysharesummary_free);
if (new) {
zero_keysharesummary(row);
copy_tv(&(row->firstshare), &(s_row->createdate));
copy_tv(&(row->lastshare), &(s_row->createdate));
} else {
if (!row->firstshare.tv_sec ||
!tv_newer(&(row->firstshare), &(s_row->createdate))) {
copy_tv(&(row->firstshare), &(s_row->createdate));
}
if (tv_newer(&(row->lastshare), &(s_row->createdate)))
copy_tv(&(row->lastshare), &(s_row->createdate));
}
row->sharecount += 1;
switch (s_row->errn) {
case SE_NONE:
row->diffacc += s_row->diff;
row->shareacc++;
// should always be true
if (s_row->diff > 0) {
if (!row->firstshareacc.tv_sec ||
!tv_newer(&(row->firstshareacc), &(s_row->createdate))) {
copy_tv(&(row->firstshareacc), &(s_row->createdate));
}
if (tv_newer(&(row->lastshareacc), &(s_row->createdate))) {
copy_tv(&(row->lastshareacc), &(s_row->createdate));
row->lastdiffacc = s_row->diff;
}
}
break;
case SE_STALE:
row->diffsta += s_row->diff;
row->sharesta++;
break;
case SE_DUPE:
row->diffdup += s_row->diff;
row->sharedup++;
break;
case SE_HIGH_DIFF:
row->diffhi += s_row->diff;
row->sharehi++;
break;
default:
row->diffrej += s_row->diff;
row->sharerej++;
break;
}
K_WUNLOCK(keysharesummary_free);
}
/* Keep some simple stats on how often shares are out of order
* and how often they produce a WARNING due to OOOLIMIT */
static int64_t ooof0, ooof, oool0, oool;
@ -5005,16 +5283,18 @@ char *ooo_status(char *buf, size_t siz)
return buf;
}
// No longer stored in the DB but fields are updated as before
/* sharesummaries are no longer stored in the DB but fields are updated as b4
* This creates/updates both the sharesummaries and the keysharesummaries */
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS)
{
WORKMARKERS *wm;
SHARESUMMARY *row, *p_row;
K_ITEM *ss_item, *wm_item, *p_item = NULL;
bool new = false, p_new = false;
KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL;
K_ITEM *ss_item, *kiss_item, *kass_item, *wm_item, *p_item = NULL;
bool new = false, p_new = false, ki_new = false, ka_new = false;
int64_t userid, workinfoid;
char *workername;
char *workername, *address = NULL, *agent = NULL;
char *st = NULL, *db = NULL;
char ooo_buf[256];
double tdf, tdl;
@ -5030,6 +5310,8 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
userid = s_row->userid;
workername = s_row->workername;
workinfoid = s_row->workinfoid;
address = s_row->address;
agent = s_row->agent;
} else {
if (!e_row) {
quithere(1, "ERR: both s_row and e_row are NULL"
@ -5082,10 +5364,54 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data
set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl);
// Ignore shareerrors for keysummaries
if (s_row) {
K_RLOCK(keysharesummary_free);
kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address);
kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent);
K_RUNLOCK(keysharesummary_free);
if (kiss_item) {
DATA_KEYSHARESUMMARY(ki_row, kiss_item);
} else {
ki_new = true;
K_WLOCK(keysharesummary_free);
kiss_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ki_row, kiss_item);
bzero(ki_row, sizeof(*ki_row));
ki_row->workinfoid = workinfoid;
ki_row->keytype[0] = KEYTYPE_IP;
ki_row->keytype[1] = '\0';
DUP_POINTER(keysharesummary_free, ki_row->key, address);
}
// N.B. this directly updates the non-key data
set_keysharesummary_stats(ki_row, s_row, ki_new);
if (kass_item) {
DATA_KEYSHARESUMMARY(ka_row, kass_item);
} else {
ka_new = true;
K_WLOCK(keysharesummary_free);
kass_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ka_row, kass_item);
bzero(ka_row, sizeof(*ka_row));
ka_row->workinfoid = workinfoid;
ka_row->keytype[0] = KEYTYPE_AGENT;
ka_row->keytype[1] = '\0';
DUP_POINTER(keysharesummary_free, ka_row->key, agent);
}
// N.B. this directly updates the non-key data
set_keysharesummary_stats(ka_row, s_row, ka_new);
}
if (!new) {
// don't LOG '=' in case shares come from ckpool with the same timestamp
if (tdf < 0.0) {
char *tmp1, *tmp2;
char *tmp1 = NULL, *tmp2 = NULL;
int level = LOG_DEBUG;
// WARNING for shares exceeding the OOOLIMIT but not during startup
if (tdf < OOOLIMIT) {
@ -5106,7 +5432,7 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// don't LOG '=' in case shares come from ckpool with the same timestamp
if (tdl < 0.0) {
char *tmp1, *tmp2;
char *tmp1 = NULL, *tmp2 = NULL;
int level = LOG_DEBUG;
// WARNING for shares exceeding the OOOLIMIT but not during startup
if (tdl < OOOLIMIT) {
@ -5167,6 +5493,19 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
K_WUNLOCK(sharesummary_free);
}
if (ki_new || ka_new) {
K_WLOCK(keysharesummary_free);
if (ki_new) {
add_to_ktree(keysharesummary_root, kiss_item);
k_add_head(keysharesummary_store, kiss_item);
}
if (ka_new) {
add_to_ktree(keysharesummary_root, kass_item);
k_add_head(keysharesummary_store, kass_item);
}
K_WUNLOCK(keysharesummary_free);
}
return true;
}
@ -5187,6 +5526,20 @@ bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
return true;
}
// No key fields are modified
bool keysharesummary_age(K_ITEM *kss_item)
{
KEYSHARESUMMARY *row;
LOGDEBUG("%s(): update", __func__);
DATA_KEYSHARESUMMARY(row, kss_item);
row->complete[0] = SUMMARY_COMPLETE;
row->complete[1] = '\0';
return true;
}
bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
double diffacc, double diffinv, double shareacc,
double shareinv, int64_t elapsed,
@ -8135,6 +8488,88 @@ flail:
return ok;
}
bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
KEYSUMMARY *row;
char *params[20 + SIMPLEDATECOUNT];
int n, par = 0;
char *ins;
bool ok = false;
char *st = NULL;
LOGDEBUG("%s(): add", __func__);
DATA_KEYSUMMARY(row, ks_item);
SIMPLEDATEPOINTERS(markersummary_free, row, cd, by, code, inet);
SIMPLEDATEPTRTRANSFER(markersummary_free, trf_root, row);
par = 0;
params[par++] = bigint_to_buf(row->markerid, NULL, 0);
params[par++] = str_to_buf(row->keytype, NULL, 0);
params[par++] = str_to_buf(row->key, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = tv_to_buf(&(row->firstshareacc), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshareacc), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
SIMPLEDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into keysummary "
"(markerid,keytype,key,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,firstshareacc,"
"lastshareacc,lastdiffacc"
SIMPLEDATECONTROL ") values (" PQPARAM24 ")";
LOGDEBUG("%s() adding ks %"PRId64"/%s/%s/%.0f",
__func__, row->markerid, row->keytype,
st = safe_text_nonull(row->key),
row->diffacc);
FREENULL(st);
if (!conn) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
free(params[n]);
// caller must do tree/list/store changes
return ok;
}
/* Already means there is a transaction already in progress
* so don't begin or commit/rollback
* Add means create a new one and expire the old one if it exists,

4
src/ktree.c

@ -9,8 +9,8 @@
#include "ktree.h"
static const int dbg = 0;
#define DBG if (dbg != 0) printf
//static const int dbg = 0;
//#define DBG if (dbg != 0) printf
#define FAIL(fmt, ...) do \
{ \

Loading…
Cancel
Save