From b5307044198dcd0e66d6a8db60d0cb3f9911817c Mon Sep 17 00:00:00 2001 From: kanoi Date: Sat, 9 Jul 2016 23:40:51 +1000 Subject: [PATCH] ckdb - add keysummary code, ako and stop core dumping on a missing : in oc_ips() --- src/ckdb.c | 52 ++++- src/ckdb.h | 287 +++++++++++++++++++------- src/ckdb_cmd.c | 2 +- src/ckdb_data.c | 168 +++++++++++++++- src/ckdb_dbio.c | 525 +++++++++++++++++++++++++++++++++++++++++++----- src/ktree.c | 4 +- 6 files changed, 908 insertions(+), 130 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 454d8e5b..e1479991 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -711,6 +711,16 @@ K_STORE *markersummary_pool_store; char mark_start_type = '\0'; int64_t mark_start = -1; +// KEYSHARESUMMARY +K_TREE *keysharesummary_root; +K_LIST *keysharesummary_free; +K_STORE *keysharesummary_store; + +// KEYSUMMARY +K_TREE *keysummary_root; +K_LIST *keysummary_free; +K_STORE *keysummary_store; + // WORKMARKERS K_TREE *workmarkers_root; K_TREE *workmarkers_workinfoid_root; @@ -1617,6 +1627,20 @@ static void alloc_storage() cmp_markersummary, markersummary_free); + keysharesummary_free = k_new_list("KeyShareSummary", + sizeof(KEYSHARESUMMARY), + ALLOC_KEYSHARESUMMARY, + LIMIT_KEYSHARESUMMARY, true); + keysharesummary_store = k_new_store(keysharesummary_free); + keysharesummary_root = new_ktree(NULL, cmp_keysharesummary, + keysharesummary_free); + + keysummary_free = k_new_list("KeySummary", sizeof(KEYSUMMARY), + ALLOC_KEYSUMMARY, LIMIT_KEYSUMMARY, + true); + keysummary_store = k_new_store(keysummary_free); + keysummary_root = new_ktree(NULL, cmp_keysummary, keysummary_free); + workmarkers_free = k_new_list("WorkMarkers", sizeof(WORKMARKERS), ALLOC_WORKMARKERS, LIMIT_WORKMARKERS, true); workmarkers_store = k_new_store(workmarkers_free); @@ -1649,8 +1673,10 @@ static void alloc_storage() DLPRIO(workerstatus, 69); DLPRIO(sharesummary, 68); - DLPRIO(markersummary, 67); - DLPRIO(workmarkers, 66); + DLPRIO(keysharesummary, 67); + DLPRIO(markersummary, 65); + DLPRIO(keysummary, 64); + DLPRIO(workmarkers, 62); DLPRIO(marks, 60); @@ -1855,9 +1881,17 @@ static void dealloc_storage() FREE_LIST_DATA(workmarkers); if (free_mode != FREE_MODE_ALL) - LOGWARNING("%s() markersummary skipped", __func__); + LOGWARNING("%s() key/markersummary skipped", __func__); else { - LOGWARNING("%s() markersummary ...", __func__); + LOGWARNING("%s() key/markersummary ...", __func__); + + FREE_TREE(keysummary); + FREE_STORE_DATA(keysummary); + FREE_LIST(keysummary); + + FREE_TREE(keysharesummary); + FREE_STORE_DATA(keysharesummary); + FREE_LIST(keysharesummary); FREE_TREE(markersummary_pool); k_list_transfer_to_tail_nolock(markersummary_pool_store, @@ -4161,7 +4195,7 @@ static void *summariser(__maybe_unused void *arg) #define SHIFT_WORDS 26 static char *shift_words[] = { - "akatsuki", + "ako", "belldandy", "charlotte", "darkchii", @@ -4277,7 +4311,9 @@ static void make_a_shift_mark() } /* Find the last !new sharesummary workinfoid - * If the shift needs to go beyond this, then it's not ready yet */ + * If the shift needs to go beyond this, then it's not ready yet + * keysharesummaries will have the same workinfoid, + * so don't need checking */ ss_age_wid = 0; K_RLOCK(sharesummary_free); ss_item = first_in_ktree(sharesummary_workinfoid_root, ss_ctx); @@ -4499,7 +4535,9 @@ static void make_a_shift_mark() was_block = true; break; } - // Does workinfo have (aged) sharesummaries? + /* Does workinfo have (aged) sharesummaries? + * keysharesummary will be the same, + * so doesn't need checking (and doesn't matter) */ looksharesummary.workinfoid = workinfo->workinfoid; looksharesummary.userid = MAXID; looksharesummary.workername = EMPTY; diff --git a/src/ckdb.h b/src/ckdb.h index 69581185..aabf8285 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.114" +#define CKDB_VERSION DB_VERSION"-2.200" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -830,26 +830,47 @@ enum cmd_values { char createby[TXT_SML+1]; \ char createcode[TXT_MED+1]; \ char createinet[TXT_MED+1]; \ - tv_t expirydate + tv_t expirydate; \ + bool buffers +#define HISTORYDATECONTROLPOINTERS \ + tv_t createdate; \ + char *createby; \ + char *createcode; \ + char *createinet; \ + tv_t expirydate; \ + bool pointers #define HISTORYDATEINIT(_row, _cd, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, _by); \ - STRNCPY(_row->createcode, _code); \ - STRNCPY(_row->createinet, _inet); \ - _row->expirydate.tv_sec = default_expiry.tv_sec; \ - _row->expirydate.tv_usec = default_expiry.tv_usec; \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->createby, _by); \ + STRNCPY((_row)->createcode, _code); \ + STRNCPY((_row)->createinet, _inet); \ + (_row)->expirydate.tv_sec = default_expiry.tv_sec; \ + (_row)->expirydate.tv_usec = default_expiry.tv_usec; \ + (_row)->buffers = (_row)->buffers; \ } while (0) #define HISTORYDATEDEFAULT(_row, _cd) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, by_default); \ - STRNCPY(_row->createcode, (char *)__func__); \ - STRNCPY(_row->createinet, inet_default); \ - _row->expirydate.tv_sec = default_expiry.tv_sec; \ - _row->expirydate.tv_usec = default_expiry.tv_usec; \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->createby, by_default); \ + STRNCPY((_row)->createcode, (char *)__func__); \ + STRNCPY((_row)->createinet, inet_default); \ + (_row)->expirydate.tv_sec = default_expiry.tv_sec; \ + (_row)->expirydate.tv_usec = default_expiry.tv_usec; \ + (_row)->buffers = (_row)->buffers; \ + } while (0) + +#define HISTORYDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + SET_CREATEBY(_list, (_row)->createby, _by); \ + SET_CREATECODE(_list, (_row)->createcode, _code); \ + SET_CREATEINET(_list, (_row)->createinet, _inet); \ + (_row)->expirydate.tv_sec = default_expiry.tv_sec; \ + (_row)->expirydate.tv_usec = default_expiry.tv_usec; \ + (_row)->pointers = (_row)->pointers; \ } while (0) /* Override _row defaults if transfer fields are present @@ -875,6 +896,7 @@ enum cmd_values { DATA_TRANSFER(__transfer, __item); \ STRNCPY((_row)->createinet, __transfer->mvalue); \ } \ + (_row)->buffers = (_row)->buffers; \ } \ } while (0) @@ -891,7 +913,7 @@ enum cmd_values { char modifyby[TXT_SML+1]; \ char modifycode[TXT_MED+1]; \ char modifyinet[TXT_MED+1]; \ - bool buffers; + bool buffers #define MODIFYDATECONTROLPOINTERS \ tv_t createdate; \ char *createby; \ @@ -901,50 +923,50 @@ enum cmd_values { char *modifyby; \ char *modifycode; \ char *modifyinet; \ - bool pointers; + bool pointers #define MODIFYDATEINIT(_row, _cd, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, _by); \ - STRNCPY(_row->createcode, _code); \ - STRNCPY(_row->createinet, _inet); \ - DATE_ZERO(&(_row->modifydate)); \ - _row->modifyby[0] = '\0'; \ - _row->modifycode[0] = '\0'; \ - _row->modifyinet[0] = '\0'; \ - _row->buffers = _row->buffers; \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->createby, _by); \ + STRNCPY((_row)->createcode, _code); \ + STRNCPY((_row)->createinet, _inet); \ + DATE_ZERO(&((_row)->modifydate)); \ + (_row)->modifyby[0] = '\0'; \ + (_row)->modifycode[0] = '\0'; \ + (_row)->modifyinet[0] = '\0'; \ + (_row)->buffers = (_row)->buffers; \ } while (0) #define MODIFYUPDATE(_row, _cd, _by, _code, _inet) do { \ - _row->modifydate.tv_sec = (_cd)->tv_sec; \ - _row->modifydate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->modifyby, _by); \ - STRNCPY(_row->modifycode, _code); \ - STRNCPY(_row->modifyinet, _inet); \ - _row->buffers = _row->buffers; \ + (_row)->modifydate.tv_sec = (_cd)->tv_sec; \ + (_row)->modifydate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->modifyby, _by); \ + STRNCPY((_row)->modifycode, _code); \ + STRNCPY((_row)->modifyinet, _inet); \ + (_row)->buffers = (_row)->buffers; \ } while (0) #define MODIFYDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - SET_CREATEBY(_list, _row->createby, _by); \ - SET_CREATECODE(_list, _row->createcode, _code); \ - SET_CREATEINET(_list, _row->createinet, _inet); \ - DATE_ZERO(&(_row->modifydate)); \ - SET_MODIFYBY(_list, _row->modifyby, EMPTY); \ - SET_MODIFYCODE(_list, _row->modifycode, EMPTY); \ - SET_MODIFYINET(_list, _row->modifyinet, EMPTY); \ - _row->pointers = _row->pointers; \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + SET_CREATEBY(_list, (_row)->createby, _by); \ + SET_CREATECODE(_list, (_row)->createcode, _code); \ + SET_CREATEINET(_list, (_row)->createinet, _inet); \ + DATE_ZERO(&((_row)->modifydate)); \ + SET_MODIFYBY(_list, (_row)->modifyby, EMPTY); \ + SET_MODIFYCODE(_list, (_row)->modifycode, EMPTY); \ + SET_MODIFYINET(_list, (_row)->modifyinet, EMPTY); \ + (_row)->pointers = (_row)->pointers; \ } while (0) #define MODIFYUPDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \ - _row->modifydate.tv_sec = (_cd)->tv_sec; \ - _row->modifydate.tv_usec = (_cd)->tv_usec; \ - SET_MODIFYBY(_list, _row->modifyby, _by); \ - SET_MODIFYCODE(_list, _row->modifycode, _code); \ - SET_MODIFYINET(_list, _row->modifyinet, _inet); \ - _row->pointers = _row->pointers; \ + (_row)->modifydate.tv_sec = (_cd)->tv_sec; \ + (_row)->modifydate.tv_usec = (_cd)->tv_usec; \ + SET_MODIFYBY(_list, (_row)->modifyby, _by); \ + SET_MODIFYCODE(_list, (_row)->modifycode, _code); \ + SET_MODIFYINET(_list, (_row)->modifyinet, _inet); \ + (_row)->pointers = (_row)->pointers; \ } while (0) /* Override _row defaults if transfer fields are present @@ -959,19 +981,19 @@ enum cmd_values { __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \ + SET_CREATEBY(_list, (_row)->createby, __transfer->mvalue); \ } \ __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \ + SET_CREATECODE(_list, (_row)->createcode, __transfer->mvalue); \ } \ __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \ + SET_CREATEINET(_list, (_row)->createinet, __transfer->mvalue); \ } \ - _row->pointers = _row->pointers; \ + (_row)->pointers = (_row)->pointers; \ } \ } while (0) @@ -981,22 +1003,40 @@ enum cmd_values { tv_t createdate; \ char createby[TXT_SML+1]; \ char createcode[TXT_MED+1]; \ - char createinet[TXT_MED+1] + char createinet[TXT_MED+1]; \ + bool buffers +#define SIMPLEDATECONTROLPOINTERS \ + tv_t createdate; \ + char *createby; \ + char *createcode; \ + char *createinet; \ + bool pointers #define SIMPLEDATEINIT(_row, _cd, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, _by); \ - STRNCPY(_row->createcode, _code); \ - STRNCPY(_row->createinet, _inet); \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->createby, _by); \ + STRNCPY((_row)->createcode, _code); \ + STRNCPY((_row)->createinet, _inet); \ + (_row)->buffers = (_row)->buffers; \ } while (0) #define SIMPLEDATEDEFAULT(_row, _cd) do { \ - _row->createdate.tv_sec = (_cd)->tv_sec; \ - _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, by_default); \ - STRNCPY(_row->createcode, (char *)__func__); \ - STRNCPY(_row->createinet, inet_default); \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + STRNCPY((_row)->createby, by_default); \ + STRNCPY((_row)->createcode, (char *)__func__); \ + STRNCPY((_row)->createinet, inet_default); \ + (_row)->buffers = (_row)->buffers; \ + } while (0) + +#define SIMPLEDATEPOINTERS(_list, _row, _cd, _by, _code, _inet) do { \ + (_row)->createdate.tv_sec = (_cd)->tv_sec; \ + (_row)->createdate.tv_usec = (_cd)->tv_usec; \ + SET_CREATEBY(_list, (_row)->createby, _by); \ + SET_CREATECODE(_list, (_row)->createcode, _code); \ + SET_CREATEINET(_list, (_row)->createinet, _inet); \ + (_row)->pointers = (_row)->pointers; \ } while (0) /* Override _row defaults if transfer fields are present @@ -1009,18 +1049,42 @@ enum cmd_values { __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - STRNCPY(_row->createby, __transfer->mvalue); \ + STRNCPY((_row)->createby, __transfer->mvalue); \ + } \ + __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + STRNCPY((_row)->createcode, __transfer->mvalue); \ + } \ + __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + STRNCPY((_row)->createinet, __transfer->mvalue); \ + } \ + (_row)->buffers = (_row)->buffers; \ + } while (0) + +#define SIMPLEDATEPTRTRANSFER(_list, _root, _row) do { \ + char __reply[16]; \ + size_t __siz = sizeof(__reply); \ + K_ITEM *__item; \ + TRANSFER *__transfer; \ + __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATEBY(_list, (_row)->createby, __transfer->mvalue); \ } \ __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - STRNCPY(_row->createcode, __transfer->mvalue); \ + SET_CREATECODE(_list, (_row)->createcode, __transfer->mvalue); \ } \ __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ - STRNCPY(_row->createinet, __transfer->mvalue); \ + SET_CREATEINET(_list, (_row)->createinet, __transfer->mvalue); \ } \ + (_row)->pointers = (_row)->pointers; \ } while (0) // LOGQUEUE @@ -2601,6 +2665,84 @@ extern K_STORE *markersummary_pool_store; extern char mark_start_type; extern int64_t mark_start; +// KEYSHARESUMMARY +typedef struct keysharesummary { + int64_t workinfoid; + char keytype[TXT_FLAG+1]; + char *key; + double diffacc; + double diffsta; + double diffdup; + double diffhi; + double diffrej; + double shareacc; + double sharesta; + double sharedup; + double sharehi; + double sharerej; + int64_t sharecount; + int64_t errorcount; + tv_t firstshare; + tv_t lastshare; + tv_t firstshareacc; + tv_t lastshareacc; + double lastdiffacc; + char complete[TXT_FLAG+1]; + SIMPLEDATECONTROLPOINTERS; +} KEYSHARESUMMARY; + +#define ALLOC_KEYSHARESUMMARY 1000 +#define LIMIT_KEYSHARESUMMARY 0 +#define INIT_KEYSHARESUMMARY(_item) INIT_GENERIC(_item, keysharesummary) +#define DATA_KEYSHARESUMMARY(_var, _item) DATA_GENERIC(_var, _item, keysharesummary, true) +#define DATA_KEYSHARESUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, keysharesummary, false) + +extern K_TREE *keysharesummary_root; +extern K_LIST *keysharesummary_free; +extern K_STORE *keysharesummary_store; + +// KEYSUMMARY +typedef struct keysummary { + int64_t markerid; + char keytype[TXT_FLAG+1]; + char *key; + double diffacc; + double diffsta; + double diffdup; + double diffhi; + double diffrej; + double shareacc; + double sharesta; + double sharedup; + double sharehi; + double sharerej; + int64_t sharecount; + int64_t errorcount; + tv_t firstshare; + tv_t lastshare; + tv_t firstshareacc; + tv_t lastshareacc; + double lastdiffacc; + SIMPLEDATECONTROLPOINTERS; +} KEYSUMMARY; + +#define ALLOC_KEYSUMMARY 1000 +#define LIMIT_KEYSUMMARY 0 +#define INIT_KEYSUMMARY(_item) INIT_GENERIC(_item, keysummary) +#define DATA_KEYSUMMARY(_var, _item) DATA_GENERIC(_var, _item, keysummary, true) +#define DATA_KEYSUMMARY_NULL(_var, _item) DATA_GENERIC(_var, _item, keysummary, false) + +extern K_TREE *keysummary_root; +extern K_LIST *keysummary_free; +extern K_STORE *keysummary_store; + +#define KEYTYPE_IP 'i' +#define KEYTYPE_IP_STR "i" +#define KEYIP(_keytype) (tolower((_keytype)[0]) == KEYTYPE_IP) +#define KEYTYPE_AGENT 'a' +#define KEYTYPE_AGENT_STR "a" +#define KEYAGENT(_keytype) (tolower((_keytype)[0]) == KEYTYPE_AGENT) + // WORKMARKERS typedef struct workmarkers { int64_t markerid; @@ -2811,6 +2953,8 @@ extern void free_payouts_data(K_ITEM *item); extern void free_ips_data(K_ITEM *item); extern void free_optioncontrol_data(K_ITEM *item); extern void free_markersummary_data(K_ITEM *item); +extern void free_keysharesummary_data(K_ITEM *item); +extern void free_keysummary_data(K_ITEM *item); extern void free_workmarkers_data(K_ITEM *item); extern void free_marks_data(K_ITEM *item); #define free_seqset_data(_item) _free_seqset_data(_item) @@ -3101,6 +3245,10 @@ extern K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid, int64_t userid, char *workername, bool pool); extern bool make_markersummaries(bool msg, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); +extern cmp_t cmp_keysharesummary(K_ITEM *a, K_ITEM *b); +extern void zero_keysharesummary(KEYSHARESUMMARY *row); +extern K_ITEM *find_keysharesummary(int64_t workinfoid, char keytype, char *key); +extern cmp_t cmp_keysummary(K_ITEM *a, K_ITEM *b); extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); @@ -3264,6 +3412,7 @@ extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, _sharesummary_age(_ss_item, _by, _code, _inet, _cd, WHERE_FFL_HERE) extern bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS); +extern bool keysharesummary_age(K_ITEM *kss_item); extern bool sharesummary_fill(PGconn *conn); extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, double diffacc, double diffinv, double shareacc, @@ -3325,6 +3474,8 @@ extern bool userstats_fill(PGconn *conn); extern bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); extern bool markersummary_fill(PGconn *conn); +extern bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root); #define workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \ _workinfoidend, _workinfoidstart, _description, \ _status, _by, _code, _inet, _cd, _trf_root) \ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 19fc0b36..f348fedd 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -6476,7 +6476,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, * Of course if you don't have ALL the necessary shares in * the CCLs then you'd lose data doing this * - * SS_to_MS will complain if any markersummaries already exist + * K/SS_to_K/MS will complain if any markersummaries already exist * when processing a workmarker * Normally you would use 'processed' if the markersummaries * are OK, and just the workmarker failed to be updated to diff --git a/src/ckdb_data.c b/src/ckdb_data.c index f28fd314..c0d686b9 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -121,6 +121,30 @@ void free_markersummary_data(K_ITEM *item) SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY); } +void free_keysharesummary_data(K_ITEM *item) +{ + KEYSHARESUMMARY *keysharesummary; + + DATA_KEYSHARESUMMARY(keysharesummary, item); + LIST_MEM_SUB(keysharesummary_free, keysharesummary->key); + FREENULL(keysharesummary->key); + SET_CREATEBY(keysharesummary_free, keysharesummary->createby, EMPTY); + SET_CREATECODE(keysharesummary_free, keysharesummary->createcode, EMPTY); + SET_CREATEINET(keysharesummary_free, keysharesummary->createinet, EMPTY); +} + +void free_keysummary_data(K_ITEM *item) +{ + KEYSUMMARY *keysummary; + + DATA_KEYSUMMARY(keysummary, item); + LIST_MEM_SUB(keysummary_free, keysummary->key); + FREENULL(keysummary->key); + SET_CREATEBY(keysummary_free, keysummary->createby, EMPTY); + SET_CREATECODE(keysummary_free, keysummary->createcode, EMPTY); + SET_CREATEINET(keysummary_free, keysummary->createinet, EMPTY); +} + void free_workmarkers_data(K_ITEM *item) { WORKMARKERS *workmarkers; @@ -2111,15 +2135,17 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, int64_t *ss_count, int64_t *s_count, int64_t *s_diff) { K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item; - K_ITEM *wm_item, *tmp_item; - K_TREE_CTX ss_ctx[1], s_ctx[1]; + K_ITEM ks_look, *ks_item, *wm_item, *tmp_item; + K_TREE_CTX ss_ctx[1], s_ctx[1], ks_ctx[1]; char cd_buf[DATE_BUFSIZ]; int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped; + int64_t ks_tot, ks_already, ks_failed; int64_t diff_tot; + KEYSHARESUMMARY lookkeysharesummary, *keysharesummary; SHARESUMMARY looksharesummary, *sharesummary; WORKINFO *workinfo; SHARES lookshares, *shares; - bool ok = false, skipupdate; + bool ok = false, ksok = false, skipupdate; char error[1024]; LOGDEBUG("%s(): age", __func__); @@ -2280,8 +2306,70 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, shares_dumped, diff_tot); } } + + INIT_KEYSHARESUMMARY(&ks_look); + + // Find the first matching keysharesummary + lookkeysharesummary.workinfoid = workinfoid; + lookkeysharesummary.keytype[0] = '\0'; + lookkeysharesummary.key = EMPTY; + + ksok = true; + ks_tot = ks_already = ks_failed = 0; + ks_look.data = (void *)(&lookkeysharesummary); + K_RLOCK(keysharesummary_free); + ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx); + K_RUNLOCK(keysharesummary_free); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); + while (ks_item && keysharesummary->workinfoid == workinfoid) { + ks_tot++; + skipupdate = false; + /* Reloading during a confirm will not have any old data + * so finding an aged keysharesummary here is an error + * N.B. this can only happen with (very) old reload files */ + if (reloading) { + if (keysharesummary->complete[0] == SUMMARY_COMPLETE) { + ks_already++; + skipupdate = true; + if (confirm_sharesummary) { + LOGERR("%s(): Duplicate %s found during confirm %"PRId64"/%s/%s", + __func__, __func__, + keysharesummary->workinfoid, + keysharesummary->keytype, + keysharesummary->key); + } + } + } + + if (!skipupdate) { + if (!keysharesummary_age(ks_item)) { + ks_failed++; + LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s", + __func__, keysharesummary->workinfoid, + keysharesummary->keytype, + keysharesummary->key); + ksok = false; + } + } + + /* All shares should have been discarded during sharesummary + * processing above */ + + K_RLOCK(keysharesummary_free); + ks_item = next_in_ktree(ks_ctx); + K_RUNLOCK(keysharesummary_free); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); + } + + if (ks_already) { + LOGNOTICE("%s(): Keysummary aging of %"PRId64"/%s " + "kstotal=%"PRId64" already=%"PRId64" failed=%"PRId64, + __func__, workinfoid, poolinstance, + ks_tot, ks_already, ks_failed); + } + bye: - return ok; + return (ok && ksok); } // Block height coinbase reward value @@ -2533,7 +2621,8 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, age_id = prev_found; - // Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found + /* Find the oldest 'unaged' sharesummary < workinfoid and >= prev_found + * Unaged keysharesummaries will have the same workinfoids */ looksharesummary.workinfoid = prev_found; looksharesummary.userid = -1; looksharesummary.workername = EMPTY; @@ -2571,6 +2660,7 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, * been missed and can report the range of data that was aged, * which would normally just be an approx 10min set of workinfoids * from the last time ckpool stopped + * workinfo_age also processes the matching keysharesummaries * Each next group of unaged sharesummaries following this, will be * picked up by each next aging */ wid_count = 0; @@ -2869,8 +2959,9 @@ void set_block_share_counters() ws_item = NULL; /* From the end backwards so we can skip the workinfoid's we don't - * want by jumping back to just before the current worker when the - * workinfoid goes below the limit */ + * want by jumping back to just before the current worker when the + * workinfoid goes below the limit + * N.B. keysharesummaries duplicate the totals, so are ignored */ ss_item = last_in_ktree(sharesummary_root, ctx); while (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); @@ -5657,6 +5748,69 @@ flailed: return ok; } +// order by workinfoid asc,keytype asc,key asc (has no expirydate) +cmp_t cmp_keysharesummary(K_ITEM *a, K_ITEM *b) +{ + KEYSHARESUMMARY *ka, *kb; + DATA_KEYSHARESUMMARY(ka, a); + DATA_KEYSHARESUMMARY(kb, b); + cmp_t c = CMP_BIGINT(ka->workinfoid, kb->workinfoid); + if (c == 0) { + c = CMP_STR(ka->keytype, kb->keytype); + if (c == 0) + c = CMP_STR(ka->key, kb->key); + } + return c; +} + +void zero_keysharesummary(KEYSHARESUMMARY *row) +{ + LIST_WRITE(keysharesummary_free); + row->diffacc = row->diffsta = row->diffdup = row->diffhi = + row->diffrej = row->shareacc = row->sharesta = row->sharedup = + row->sharehi = row->sharerej = 0.0; + row->sharecount = row->errorcount = 0; + DATE_ZERO(&(row->firstshare)); + DATE_ZERO(&(row->lastshare)); + DATE_ZERO(&(row->firstshareacc)); + DATE_ZERO(&(row->lastshareacc)); + row->lastdiffacc = 0; + row->complete[0] = SUMMARY_NEW; + row->complete[1] = '\0'; +} + +// Must be R or W locked +K_ITEM *find_keysharesummary(int64_t workinfoid, char keytype, char *key) +{ + KEYSHARESUMMARY keysharesummary; + K_TREE_CTX ctx[1]; + K_ITEM look; + + keysharesummary.workinfoid = workinfoid; + keysharesummary.keytype[0] = keytype; + keysharesummary.keytype[1] = '\0'; + keysharesummary.key = key; + + INIT_KEYSHARESUMMARY(&look); + look.data = (void *)(&keysharesummary); + return find_in_ktree(keysharesummary_root, &look, ctx); +} + +// order by markerid asc,keytype asc,key asc (has no expirydate) +cmp_t cmp_keysummary(K_ITEM *a, K_ITEM *b) +{ + KEYSUMMARY *ka, *kb; + DATA_KEYSUMMARY(ka, a); + DATA_KEYSUMMARY(kb, b); + cmp_t c = CMP_BIGINT(ka->markerid, kb->markerid); + if (c == 0) { + c = CMP_STR(ka->keytype, kb->keytype); + if (c == 0) + c = CMP_STR(ka->key, kb->key); + } + return c; +} + void dsp_workmarkers(K_ITEM *item, FILE *stream) { WORKMARKERS *wm; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 7672a20a..c73b33f1 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -187,7 +187,8 @@ char *pqerrmsg(PGconn *conn) #define PQPARAM18 PQPARAM16 ",$17,$18" #define PQPARAM21 PQPARAM16 ",$17,$18,$19,$20,$21" #define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22" -#define PQPARAM23 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23" +#define PQPARAM23 PQPARAM22 ",$23" +#define PQPARAM24 PQPARAM22 ",$23,$24" #define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26" #define PQPARAM27 PQPARAM26 ",$27" #define PQPARAM28 PQPARAM26 ",$27,$28" @@ -2636,6 +2637,7 @@ void oc_ips(OPTIONCONTROL *oc, const char *from) if (!colon) { LOGERR("%s(%s): ERR: Missing ':' after IP '%s' name '%s'", from, __func__, oc->optionvalue, oc->optionname); + return; } STRNCPY(ips.eventname, colon+1); @@ -3177,14 +3179,9 @@ bool workinfo_fill(PGconn *conn) printf(TICK_PREFIX"wi 0\r"); fflush(stdout); - // TODO: select the data based on sharesummary since old data isn't needed - // however, the ageing rules for workinfo will decide that also - // keep the last block + current? Rules will depend on payout scheme also - APPEND_REALLOC_INIT(sel, off, len); APPEND_REALLOC(sel, off, len, "declare wi cursor for select " -// "workinfoid,poolinstance,transactiontree,merklehash,prevhash," "workinfoid,poolinstance,merklehash,prevhash," "coinbase1,coinbase2,version,bits,ntime,reward" HISTORYDATECONTROL @@ -3285,13 +3282,6 @@ bool workinfo_fill(PGconn *conn) break; TXT_TO_BIGINT("workinfoid", field, row->workinfoid); -/* Not currently needed in RAM - PQ_GET_FLD(res, i, "transactiontree", field, ok); - if (!ok) - break; - TXT_TO_BLOB("transactiontree", field, row->transactiontree); - LIST_MEM_ADD(workinfo_free, row->transactiontree); -*/ row->transactiontree = EMPTY; PQ_GET_FLD(res, i, "merklehash", field, ok); @@ -3386,8 +3376,10 @@ bool workinfo_fill(PGconn *conn) flail: res = PQexec(conn, "Commit", CKPQ_READ); PQclear(res); - for (n = 0; n < par; n++) - free(params[n]); + for (i = 0; i < par; i++) + free(params[i]); + par = 0; + free(sel); if (ok) { @@ -3642,7 +3634,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername SHARES *shares = NULL, *shares2 = NULL; double sdiff_amt; USERS *users; - bool ok = false; + bool ok = false, dup = false; char *st = NULL; LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", @@ -3733,7 +3725,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername add_to_ktree(shares_early_root, s_item); k_add_head(shares_early_store, s_item); if (s2_item) { - /* Just ignore duplicates - this matches the DB index + /* Discard duplicates - this matches the DB index N.B. a duplicate share doesn't have to be SE_DUPE, two shares can be SE_NONE and SE_STALE */ tmp_item = find_in_ktree(shares_db_root, s2_item, ctx); @@ -3743,11 +3735,23 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername add_to_ktree(shares_db_root, s2_item); k_add_head(shares_hi_store, s2_item); } else { + dup = true; k_add_head(shares_free, s2_item); s2_item = NULL; } } K_WUNLOCK(shares_free); + if (dup) { + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGWARNING("%s() duplicate DB share discarded: " + "%"PRId64"/%s/%"PRId32"/%s/%.0f/%"PRId32"/%ld,%ld %s", + __func__, shares->workinfoid, + st = safe_text_nonull(workername), + shares->clientid, shares->nonce, + shares->sdiff, shares->errn, + cd->tv_sec, cd->tv_usec, cd_buf); + FREENULL(st); + } /* It was all OK except the missing workinfoid * and it was queued, so most likely OK */ return true; @@ -3759,18 +3763,30 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername add_to_ktree(shares_root, s_item); k_add_head(shares_store, s_item); if (s2_item) { - // Just ignore duplicates + // Discard duplicates tmp_item = find_in_ktree(shares_db_root, s2_item, ctx); if (tmp_item == NULL) { add_to_ktree(shares_hi_root, s2_item); add_to_ktree(shares_db_root, s2_item); k_add_head(shares_hi_store, s2_item); } else { + dup = true; k_add_head(shares_free, s2_item); s2_item = NULL; } } K_WUNLOCK(shares_free); + if (dup) { + btv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGWARNING("%s() duplicate DB share discarded: " + "%"PRId64"/%s/%"PRId32"/%s/%.0f/%"PRId32"/%ld,%ld %s", + __func__, shares->workinfoid, + st = safe_text_nonull(workername), + shares->clientid, shares->nonce, + shares->sdiff, shares->errn, + cd->tv_sec, cd->tv_usec, cd_buf); + FREENULL(st); + } shares_process_early(conn, wi_item, &(shares->createdate), trf_root); @@ -3874,10 +3890,11 @@ bool shares_fill(PGconn *conn) char *field; char *sel = NULL; char *params[1]; - int fields = 14, par = 0; + int fields = 16, par = 0; bool ok = false; int64_t workinfoid; tv_t old; + int no_addr = 0, no_agent = 0; LOGDEBUG("%s(): select", __func__); @@ -3916,7 +3933,8 @@ bool shares_fill(PGconn *conn) sel = "declare sh cursor for select " "workinfoid,userid,workername,clientid,enonce1,nonce2,nonce," - "diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff" + "diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff,agent," + "address" HISTORYDATECONTROL " from shares where workinfoid>=$1"; par = 0; @@ -4053,6 +4071,20 @@ bool shares_fill(PGconn *conn) if (!ok) break; + PQ_GET_FLD(res, i, "agent", field, ok); + if (!ok) + break; + if (!(*field)) + no_agent++; + TXT_TO_STR("agent", field, row->agent); + + PQ_GET_FLD(res, i, "address", field, ok); + if (!ok) + break; + if (!(*field)) + no_addr++; + TXT_TO_STR("address", field, row->address); + add_to_ktree(shares_db_root, item); k_add_head(shares_hi_store, item); @@ -4086,6 +4118,15 @@ flail: if (ok) { LOGDEBUG("%s(): built", __func__); LOGWARNING("%s(): fetched %d shares records", __func__, n); + if (no_addr || no_agent) { + if (no_addr == no_agent) { + LOGWARNING(" %d had no address and agent", + no_addr); + } else { + LOGWARNING(" %d had no address %d had no agent", + no_addr, no_agent); + } + } } return ok; @@ -4435,25 +4476,37 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, K_TREE *trf_root) { // shorter name for log messages - static const char *shortname = "SS_to_MS"; + static const char *shortname = "K/SS_to_K/MS"; + static const char *sshortname = "SS_to_MS"; + static const char *kshortname = "KSS_to_KS"; ExecStatusType rescode; PGresult *res; - K_TREE_CTX ss_ctx[1], ms_ctx[1]; + K_TREE_CTX ss_ctx[1], kss_ctx[1], ms_ctx[1], ks_ctx[1]; SHARESUMMARY *sharesummary, looksharesummary; + KEYSHARESUMMARY *keysharesummary, lookkeysharesummary; MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary = NULL; - K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look; - K_ITEM *p_ss_item, *p_ms_item; + KEYSUMMARY *keysummary, lookkeysummary; + K_ITEM *ss_item, *ss_prev, ss_look; + K_ITEM *kss_item, *kss_prev, kss_look; + K_ITEM *ms_item, ms_look, *p_ss_item, *p_ms_item; + K_ITEM *ks_item, ks_look; bool ok = false, conned = false; - int64_t diffacc, shareacc; + int64_t diffacc = 0, shareacc = 0; + int64_t kdiffacc= 0, kshareacc = 0; char *reason = NULL; - int ss_count, ms_count; + int ss_count, kss_count, ms_count, ks_count; char *st = NULL; tv_t add_stt, add_fin, db_stt, db_fin, lck_stt, lck_got, lck_fin; + tv_t kadd_stt, kadd_fin, kdb_stt, kdb_fin; DATE_ZERO(&add_stt); DATE_ZERO(&add_fin); DATE_ZERO(&db_stt); DATE_ZERO(&db_fin); + DATE_ZERO(&kadd_stt); + DATE_ZERO(&kadd_fin); + DATE_ZERO(&kdb_stt); + DATE_ZERO(&kdb_fin); LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/" "End %"PRId64"/Stt %"PRId64"/%s/%s", @@ -4462,12 +4515,16 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, workmarkers->description, workmarkers->status); K_STORE *old_sharesummary_store = k_new_store(sharesummary_free); + K_STORE *old_keysharesummary_store = k_new_store(keysharesummary_free); K_STORE *new_markersummary_store = k_new_store(markersummary_free); + K_STORE *new_keysummary_store = k_new_store(keysummary_free); - /* Use the master size for this local tree since - * it's large and doesn't get created often */ - K_TREE *ms_root = new_ktree_local(shortname, cmp_markersummary, + /* Use the master size for this local trees since + * they're large and doesn't get created often */ + K_TREE *ms_root = new_ktree_local(sshortname, cmp_markersummary, markersummary_free); + K_TREE *ks_root = new_ktree_local(kshortname, cmp_keysummary, + keysummary_free); if (!CURRENT(&(workmarkers->expirydate))) { reason = "unexpired"; @@ -4480,7 +4537,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, } setnow(&add_stt); - // Check there aren't already any matching markersummaries + /* Check there aren't already any matching markersummaries + * and assume keysummaries are the same */ lookmarkersummary.markerid = workmarkers->markerid; lookmarkersummary.userid = 0; lookmarkersummary.workername = EMPTY; @@ -4505,7 +4563,6 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, goto flail; } - diffacc = shareacc = 0; ms_item = NULL; looksharesummary.workinfoid = workmarkers->workinfoidend; @@ -4603,6 +4660,106 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, } setnow(&add_fin); + setnow(&kadd_stt); + ks_item = NULL; + + lookkeysharesummary.workinfoid = workmarkers->workinfoidend; + lookkeysharesummary.keytype[0] = '\0';; + lookkeysharesummary.key = EMPTY; + + INIT_KEYSHARESUMMARY(&kss_look); + kss_look.data = (void *)(&lookkeysharesummary); + /* Since shares come in from ckpool at a high rate, + * we don't want to lock keysharesummary for long + * Those incoming shares will not be touching the keysharesummaries + * we are processing here */ + K_RLOCK(keysharesummary_free); + kss_item = find_before_in_ktree(keysharesummary_root, + &kss_look, kss_ctx); + K_RUNLOCK(keysharesummary_free); + while (kss_item) { + DATA_KEYSHARESUMMARY(keysharesummary, kss_item); + if (keysharesummary->workinfoid < workmarkers->workinfoidstart) + break; + K_RLOCK(keysharesummary_free); + kss_prev = prev_in_ktree(kss_ctx); + K_RUNLOCK(keysharesummary_free); + + // Find/create the keysummary only once per key change + if (!ks_item || strcmp(keysummary->keytype, keysharesummary->keytype) != 0 || + strcmp(keysummary->key, keysharesummary->key) != 0) { + lookkeysummary.markerid = workmarkers->markerid; + lookkeysummary.keytype[0] = keysharesummary->keytype[0]; + lookkeysummary.keytype[1] = keysharesummary->keytype[1]; + lookkeysummary.key = keysharesummary->key; + + ks_look.data = (void *)(&lookkeysummary); + ks_item = find_in_ktree_nolock(ks_root, &ks_look, ks_ctx); + if (!ks_item) { + K_WLOCK(keysummary_free); + ks_item = k_unlink_head(keysummary_free); + K_WUNLOCK(keysummary_free); + k_add_head_nolock(new_keysummary_store, ks_item); + DATA_KEYSUMMARY(keysummary, ks_item); + bzero(keysummary, sizeof(*keysummary)); + keysummary->markerid = workmarkers->markerid; + keysummary->keytype[0] = keysharesummary->keytype[0]; + keysummary->keytype[1] = '\0'; + DUP_POINTER(keysummary_free, + keysummary->key, + keysharesummary->key); + add_to_ktree_nolock(ks_root, ks_item); + + LOGDEBUG("%s() new ks %"PRId64"/%s/%s", + shortname, keysummary->markerid, + keysummary->keytype, + st = safe_text(keysummary->key)); + FREENULL(st); + } else { + DATA_KEYSUMMARY(keysummary, ks_item); + } + } + keysummary->diffacc += keysharesummary->diffacc; + keysummary->diffsta += keysharesummary->diffsta; + keysummary->diffdup += keysharesummary->diffdup; + keysummary->diffhi += keysharesummary->diffhi; + keysummary->diffrej += keysharesummary->diffrej; + keysummary->shareacc += keysharesummary->shareacc; + keysummary->sharesta += keysharesummary->sharesta; + keysummary->sharedup += keysharesummary->sharedup; + keysummary->sharehi += keysharesummary->sharehi; + keysummary->sharerej += keysharesummary->sharerej; + keysummary->sharecount += keysharesummary->sharecount; + keysummary->errorcount += keysharesummary->errorcount; + if (!keysummary->firstshare.tv_sec || + !tv_newer(&(keysummary->firstshare), &(keysharesummary->firstshare))) { + copy_tv(&(keysummary->firstshare), &(keysharesummary->firstshare)); + } + if (tv_newer(&(keysummary->lastshare), &(keysharesummary->lastshare))) + copy_tv(&(keysummary->lastshare), &(keysharesummary->lastshare)); + if (keysharesummary->diffacc > 0) { + if (!keysummary->firstshareacc.tv_sec || + !tv_newer(&(keysummary->firstshareacc), &(keysharesummary->firstshareacc))) { + copy_tv(&(keysummary->firstshareacc), &(keysharesummary->firstshareacc)); + } + if (tv_newer(&(keysummary->lastshareacc), &(keysharesummary->lastshareacc))) { + copy_tv(&(keysummary->lastshareacc), &(keysharesummary->lastshareacc)); + keysummary->lastdiffacc = keysharesummary->lastdiffacc; + } + } + + kdiffacc += keysharesummary->diffacc; + kshareacc += keysharesummary->shareacc; + + K_WLOCK(keysharesummary_free); + k_unlink_item(keysharesummary_store, kss_item); + K_WUNLOCK(keysharesummary_free); + k_add_head_nolock(old_keysharesummary_store, kss_item); + + kss_item = kss_prev; + } + setnow(&kadd_fin); + setnow(&db_stt); if (conn == NULL) { conn = dbconnect(); @@ -4628,6 +4785,19 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, } ms_item = ms_item->next; } + setnow(&db_fin); + + setnow(&kdb_stt); + ks_item = STORE_HEAD_NOLOCK(new_keysummary_store); + while (ks_item) { + if (!(keysummary_add(conn, ks_item, by, code, inet, + cd, trf_root))) { + reason = "db error"; + setnow(&kdb_fin); + goto rollback; + } + ks_item = ks_item->next; + } ok = workmarkers_process(conn, true, true, workmarkers->markerid, @@ -4637,7 +4807,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, workmarkers->description, MARKER_PROCESSED_STR, by, code, inet, cd, trf_root); - setnow(&db_fin); + setnow(&kdb_fin); rollback: if (ok) res = PQexec(conn, "Commit", CKPQ_WRITE); @@ -4652,10 +4822,11 @@ flail: if (reason) { // already displayed the full workmarkers detail at the top LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s add=%.3fs " - "db=%.3fs", + "kadd=%.3fs db=%.3fs kdb=%.3fs", shortname, reason, workmarkers->markerid, workmarkers->description, workmarkers->status, - tvdiff(&add_fin, &add_stt), tvdiff(&db_fin, &db_stt)); + tvdiff(&add_fin, &add_stt), tvdiff(&kadd_fin, &kadd_stt), + tvdiff(&db_fin, &db_stt), tvdiff(&kdb_fin, &kdb_stt)); ok = false; } @@ -4678,13 +4849,34 @@ flail: k_list_transfer_to_head(old_sharesummary_store, sharesummary_store); K_WUNLOCK(sharesummary_free); } + if (new_keysummary_store->count > 0) { + // Throw them away (they don't exist anywhere else) + ks_item = STORE_HEAD_NOLOCK(new_keysummary_store); + while (ks_item) { + free_keysummary_data(ks_item); + ks_item = ks_item->next; + } + K_WLOCK(keysummary_free); + k_list_transfer_to_head(new_keysummary_store, keysummary_free); + K_WUNLOCK(keysummary_free); + } + if (old_keysharesummary_store->count > 0) { + // Put them back in the store where they came from + K_WLOCK(keysharesummary_free); + k_list_transfer_to_head(old_keysharesummary_store, keysharesummary_store); + K_WUNLOCK(keysharesummary_free); + } } else { ms_count = new_markersummary_store->count; + ks_count = new_keysummary_store->count; ss_count = old_sharesummary_store->count; + kss_count = old_keysharesummary_store->count; setnow(&lck_stt); K_WLOCK(sharesummary_free); + K_WLOCK(keysharesummary_free); K_WLOCK(markersummary_free); + K_WLOCK(keysummary_free); K_RLOCK(workmarkers_free); setnow(&lck_got); ms_item = STORE_HEAD_NOLOCK(new_markersummary_store); @@ -4711,6 +4903,14 @@ flail: } k_list_transfer_to_head(new_markersummary_store, markersummary_store); + ks_item = STORE_HEAD_NOLOCK(new_keysummary_store); + while (ks_item) { + // move the new keysummaries into the tree + add_to_ktree(keysummary_root, ks_item); + ks_item = ks_item->next; + } + k_list_transfer_to_head(new_keysummary_store, keysummary_store); + /* For normal shift processing this wont be very quick * so it will be a 'long' LOCK */ ss_item = STORE_HEAD_NOLOCK(old_sharesummary_store); @@ -4733,32 +4933,54 @@ flail: ss_item = ss_item->next; } k_list_transfer_to_head(old_sharesummary_store, sharesummary_free); + + /* For normal shift processing this wont be very quick + * so it will be a 'long' LOCK */ + kss_item = STORE_HEAD_NOLOCK(old_keysharesummary_store); + while (kss_item) { + // remove the old keysharesummaries from the trees + remove_from_ktree(keysharesummary_root, kss_item); + free_keysharesummary_data(kss_item); + kss_item = kss_item->next; + } + k_list_transfer_to_head(old_keysharesummary_store, keysharesummary_free); + K_RUNLOCK(workmarkers_free); + K_WUNLOCK(keysummary_free); K_WUNLOCK(markersummary_free); + K_WUNLOCK(keysharesummary_free); K_WUNLOCK(sharesummary_free); setnow(&lck_fin); - LOGWARNING("%s() Processed: %d ms %d ss %"PRId64" shares " - "%"PRId64" diff for workmarkers %"PRId64"/%s/" - "End %"PRId64"/Stt %"PRId64"/%s/%s add=%.3f " - "db=%.3fs lck=%.3fs+%.3f", - shortname, ms_count, ss_count, shareacc, diffacc, + LOGWARNING("%s() Processed: %d ms %d ks %d ss %d kss " + "%"PRId64" shares %"PRId64" diff " + "k(%"PRId64"/%"PRId64") for workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s add=%.3fs " + "kadd=%.3fs db=%.3fs kdb=%.3fs lck=%.3f+%.3fs", + shortname, ms_count, ks_count, ss_count, kss_count, + shareacc, diffacc, kshareacc, kdiffacc, workmarkers->markerid, workmarkers->poolinstance, workmarkers->workinfoidend, workmarkers->workinfoidstart, workmarkers->description, workmarkers->status, tvdiff(&add_fin, &add_stt), + tvdiff(&kadd_fin, &kadd_stt), tvdiff(&db_fin, &db_stt), + tvdiff(&kdb_fin, &kdb_stt), tvdiff(&lck_got, &lck_stt), tvdiff(&lck_fin, &lck_got)); } free_ktree(ms_root, NULL); + free_ktree(ks_root, NULL); new_markersummary_store = k_free_store(new_markersummary_store); + new_keysummary_store = k_free_store(new_keysummary_store); old_sharesummary_store = k_free_store(old_sharesummary_store); + old_keysharesummary_store = k_free_store(old_keysharesummary_store); return ok; } +// TODO: keysummaries ... bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm) { // shorter name for log messages @@ -4988,6 +5210,62 @@ static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, K_WUNLOCK(sharesummary_free); } +static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row, + bool new) +{ + K_WLOCK(keysharesummary_free); + + if (new) { + zero_keysharesummary(row); + copy_tv(&(row->firstshare), &(s_row->createdate)); + copy_tv(&(row->lastshare), &(s_row->createdate)); + } else { + if (!row->firstshare.tv_sec || + !tv_newer(&(row->firstshare), &(s_row->createdate))) { + copy_tv(&(row->firstshare), &(s_row->createdate)); + } + if (tv_newer(&(row->lastshare), &(s_row->createdate))) + copy_tv(&(row->lastshare), &(s_row->createdate)); + } + + row->sharecount += 1; + switch (s_row->errn) { + case SE_NONE: + row->diffacc += s_row->diff; + row->shareacc++; + // should always be true + if (s_row->diff > 0) { + if (!row->firstshareacc.tv_sec || + !tv_newer(&(row->firstshareacc), &(s_row->createdate))) { + copy_tv(&(row->firstshareacc), &(s_row->createdate)); + } + if (tv_newer(&(row->lastshareacc), &(s_row->createdate))) { + copy_tv(&(row->lastshareacc), &(s_row->createdate)); + row->lastdiffacc = s_row->diff; + } + } + break; + case SE_STALE: + row->diffsta += s_row->diff; + row->sharesta++; + break; + case SE_DUPE: + row->diffdup += s_row->diff; + row->sharedup++; + break; + case SE_HIGH_DIFF: + row->diffhi += s_row->diff; + row->sharehi++; + break; + default: + row->diffrej += s_row->diff; + row->sharerej++; + break; + } + + K_WUNLOCK(keysharesummary_free); +} + /* Keep some simple stats on how often shares are out of order * and how often they produce a WARNING due to OOOLIMIT */ static int64_t ooof0, ooof, oool0, oool; @@ -5005,16 +5283,18 @@ char *ooo_status(char *buf, size_t siz) return buf; } -// No longer stored in the DB but fields are updated as before +/* sharesummaries are no longer stored in the DB but fields are updated as b4 + * This creates/updates both the sharesummaries and the keysharesummaries */ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) { WORKMARKERS *wm; SHARESUMMARY *row, *p_row; - K_ITEM *ss_item, *wm_item, *p_item = NULL; - bool new = false, p_new = false; + KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL; + K_ITEM *ss_item, *kiss_item, *kass_item, *wm_item, *p_item = NULL; + bool new = false, p_new = false, ki_new = false, ka_new = false; int64_t userid, workinfoid; - char *workername; + char *workername, *address = NULL, *agent = NULL; char *st = NULL, *db = NULL; char ooo_buf[256]; double tdf, tdl; @@ -5030,6 +5310,8 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, userid = s_row->userid; workername = s_row->workername; workinfoid = s_row->workinfoid; + address = s_row->address; + agent = s_row->agent; } else { if (!e_row) { quithere(1, "ERR: both s_row and e_row are NULL" @@ -5082,10 +5364,54 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, // N.B. this directly updates the non-key data set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl); + // Ignore shareerrors for keysummaries + if (s_row) { + K_RLOCK(keysharesummary_free); + kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address); + kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent); + K_RUNLOCK(keysharesummary_free); + + if (kiss_item) { + DATA_KEYSHARESUMMARY(ki_row, kiss_item); + } else { + ki_new = true; + K_WLOCK(keysharesummary_free); + kiss_item = k_unlink_head(keysharesummary_free); + K_WUNLOCK(keysharesummary_free); + DATA_KEYSHARESUMMARY(ki_row, kiss_item); + bzero(ki_row, sizeof(*ki_row)); + ki_row->workinfoid = workinfoid; + ki_row->keytype[0] = KEYTYPE_IP; + ki_row->keytype[1] = '\0'; + DUP_POINTER(keysharesummary_free, ki_row->key, address); + } + + // N.B. this directly updates the non-key data + set_keysharesummary_stats(ki_row, s_row, ki_new); + + if (kass_item) { + DATA_KEYSHARESUMMARY(ka_row, kass_item); + } else { + ka_new = true; + K_WLOCK(keysharesummary_free); + kass_item = k_unlink_head(keysharesummary_free); + K_WUNLOCK(keysharesummary_free); + DATA_KEYSHARESUMMARY(ka_row, kass_item); + bzero(ka_row, sizeof(*ka_row)); + ka_row->workinfoid = workinfoid; + ka_row->keytype[0] = KEYTYPE_AGENT; + ka_row->keytype[1] = '\0'; + DUP_POINTER(keysharesummary_free, ka_row->key, agent); + } + + // N.B. this directly updates the non-key data + set_keysharesummary_stats(ka_row, s_row, ka_new); + } + if (!new) { // don't LOG '=' in case shares come from ckpool with the same timestamp if (tdf < 0.0) { - char *tmp1, *tmp2; + char *tmp1 = NULL, *tmp2 = NULL; int level = LOG_DEBUG; // WARNING for shares exceeding the OOOLIMIT but not during startup if (tdf < OOOLIMIT) { @@ -5106,7 +5432,7 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, // don't LOG '=' in case shares come from ckpool with the same timestamp if (tdl < 0.0) { - char *tmp1, *tmp2; + char *tmp1 = NULL, *tmp2 = NULL; int level = LOG_DEBUG; // WARNING for shares exceeding the OOOLIMIT but not during startup if (tdl < OOOLIMIT) { @@ -5167,6 +5493,19 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, K_WUNLOCK(sharesummary_free); } + if (ki_new || ka_new) { + K_WLOCK(keysharesummary_free); + if (ki_new) { + add_to_ktree(keysharesummary_root, kiss_item); + k_add_head(keysharesummary_store, kiss_item); + } + if (ka_new) { + add_to_ktree(keysharesummary_root, kass_item); + k_add_head(keysharesummary_store, kass_item); + } + K_WUNLOCK(keysharesummary_free); + } + return true; } @@ -5187,6 +5526,20 @@ bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, return true; } +// No key fields are modified +bool keysharesummary_age(K_ITEM *kss_item) +{ + KEYSHARESUMMARY *row; + + LOGDEBUG("%s(): update", __func__); + + DATA_KEYSHARESUMMARY(row, kss_item); + row->complete[0] = SUMMARY_COMPLETE; + row->complete[1] = '\0'; + + return true; +} + bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, double diffacc, double diffinv, double shareacc, double shareinv, int64_t elapsed, @@ -8135,6 +8488,88 @@ flail: return ok; } +bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res; + KEYSUMMARY *row; + char *params[20 + SIMPLEDATECOUNT]; + int n, par = 0; + char *ins; + bool ok = false; + char *st = NULL; + + LOGDEBUG("%s(): add", __func__); + + DATA_KEYSUMMARY(row, ks_item); + + SIMPLEDATEPOINTERS(markersummary_free, row, cd, by, code, inet); + SIMPLEDATEPTRTRANSFER(markersummary_free, trf_root, row); + + par = 0; + params[par++] = bigint_to_buf(row->markerid, NULL, 0); + params[par++] = str_to_buf(row->keytype, NULL, 0); + params[par++] = str_to_buf(row->key, NULL, 0); + params[par++] = double_to_buf(row->diffacc, NULL, 0); + params[par++] = double_to_buf(row->diffsta, NULL, 0); + params[par++] = double_to_buf(row->diffdup, NULL, 0); + params[par++] = double_to_buf(row->diffhi, NULL, 0); + params[par++] = double_to_buf(row->diffrej, NULL, 0); + params[par++] = double_to_buf(row->shareacc, NULL, 0); + params[par++] = double_to_buf(row->sharesta, NULL, 0); + params[par++] = double_to_buf(row->sharedup, NULL, 0); + params[par++] = double_to_buf(row->sharehi, NULL, 0); + params[par++] = double_to_buf(row->sharerej, NULL, 0); + params[par++] = bigint_to_buf(row->sharecount, NULL, 0); + params[par++] = bigint_to_buf(row->errorcount, NULL, 0); + params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); + params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = tv_to_buf(&(row->firstshareacc), NULL, 0); + params[par++] = tv_to_buf(&(row->lastshareacc), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); + SIMPLEDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into keysummary " + "(markerid,keytype,key,diffacc,diffsta,diffdup,diffhi," + "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," + "sharecount,errorcount,firstshare,lastshare,firstshareacc," + "lastshareacc,lastdiffacc" + SIMPLEDATECONTROL ") values (" PQPARAM24 ")"; + + LOGDEBUG("%s() adding ks %"PRId64"/%s/%s/%.0f", + __func__, row->markerid, row->keytype, + st = safe_text_nonull(row->key), + row->diffacc); + FREENULL(st); + + if (!conn) { + conn = dbconnect(); + conned = true; + } + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto unparam; + } + + ok = true; +unparam: + PQclear(res); + if (conned) + PQfinish(conn); + for (n = 0; n < par; n++) + free(params[n]); + + // caller must do tree/list/store changes + + return ok; +} + /* Already means there is a transaction already in progress * so don't begin or commit/rollback * Add means create a new one and expire the old one if it exists, diff --git a/src/ktree.c b/src/ktree.c index e97161ff..78b2ef98 100644 --- a/src/ktree.c +++ b/src/ktree.c @@ -9,8 +9,8 @@ #include "ktree.h" -static const int dbg = 0; -#define DBG if (dbg != 0) printf +//static const int dbg = 0; +//#define DBG if (dbg != 0) printf #define FAIL(fmt, ...) do \ { \