From 2e7171cf8429ee8bad5dd42bb01770800330f2ea Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 5 Aug 2014 17:56:44 +1000 Subject: [PATCH] ckdb - reload plus changes to handle createdate --- src/ckdb.c | 1009 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 730 insertions(+), 279 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index e227d5dc..1737965e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -105,7 +105,7 @@ static char *restorefrom; * poolinstance * DB+RAM workers: created by auths so auths will resolve it * DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any) - * will be after the last workinfo + * will be after the last DB workinfo * DB+RAM accountbalance (TODO): resolved by shares/workinfo/blocks * RAM workerstats: last_auth, last_share, last_stats all handled by * DB load up to whatever the CCL restart point is, and then @@ -123,6 +123,31 @@ static char *restorefrom; * users, useraccounts, paymentaddresses, payments, * accountadjustment, optioncontrol, miningpayouts, * eventlog + * + * The code deals with the issue of 'now' when reloading by: + * createdate is considered 'now' for all data during a reload and is + * a mandatory field always checked for in ckpool data + * Implementing this is simple by assuming 'now' is always createdate + * i.e. during reload but also during normal running to avoid timing + * issues with ckpool data + * Other data supplies the current time to the functions that require + * 'now' since that data is not part of a reload + * + * During reload, any data before the calculated reload stamp for + * that data is discarded + * Any data that matches the reload stamp is processed with an + * ignore duplicates flag for all except as below. + * Any data after the stamp, is processed normally for all except: + * 1) userstats: any record that falls in a DB userstats that would + * summarise that record is discarded, + * otherwise the userstats is processed normally + * 2) shares/shareerrors: any record that matches an incomplete DB + * sharesummary that hasn't been reset, will reset the sharesummary + * so that the sharesummary will be recalculated + * The record is processed normally in both situations + * 3) ageworkinfo records are also handled by the shares date + * while processing, any records already aged are not updated + * and a warning is displayed if there were any matching shares */ typedef struct loadstatus { @@ -130,14 +155,16 @@ typedef struct loadstatus { tv_t newest_createdate_workinfo; tv_t newest_createdate_auths; tv_t newest_createdate_poolstats; + tv_t userstats; + tv_t newest_createdate_blocks; } LOADSTATUS; static LOADSTATUS dbstatus; /* Temporary while doing restart - it (of course) contains the fields * required to track the newest userstats per user/worker */ -static K_TREE *userstats_ccl_root; -static K_STORE *userstats_ccl; +static K_TREE *userstats_db_root; +static K_STORE *userstats_db; // size limit on the command string #define CMD_SIZ 31 @@ -313,9 +340,9 @@ static const tv_t default_expiry = { DEFAULT_EXPIRY, 0L }; #define DATE_uS_EOT 0L static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; -#define HISTORYDATEINIT(_row, _now, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_now)->tv_sec; \ - _row->createdate.tv_usec = (_now)->tv_usec; \ +#define HISTORYDATEINIT(_row, _cd, _by, _code, _inet) do { \ + _row->createdate.tv_sec = (_cd)->tv_sec; \ + _row->createdate.tv_usec = (_cd)->tv_usec; \ STRNCPY(_row->createby, _by); \ STRNCPY(_row->createcode, _code); \ STRNCPY(_row->createinet, _inet); \ @@ -326,19 +353,6 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; // Override _row defaults if transfer fields are present #define HISTORYDATETRANSFER(_row) do { \ K_ITEM *item; \ - item = optional_name("createdate", 10, NULL); \ - if (item) { \ - long sec, usec; \ - int n; \ - n = sscanf(DATA_TRANSFER(item)->data, "%ld,%ld", &sec, &usec); \ - if (n > 0) { \ - _row->createdate.tv_sec = (time_t)sec; \ - if (n > 1) \ - _row->createdate.tv_usec = usec / 1000; \ - else \ - _row->createdate.tv_usec = 0L; \ - } \ - } \ item = optional_name("createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ @@ -421,9 +435,9 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; _params[_mod_pos++] = str_to_buf(_row->modifyinet, NULL, 0); \ } while (0) -#define MODIFYDATEINIT(_row, _now, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_now)->tv_sec; \ - _row->createdate.tv_usec = (_now)->tv_usec; \ +#define MODIFYDATEINIT(_row, _cd, _by, _code, _inet) do { \ + _row->createdate.tv_sec = (_cd)->tv_sec; \ + _row->createdate.tv_usec = (_cd)->tv_usec; \ STRNCPY(_row->createby, _by); \ STRNCPY(_row->createcode, _code); \ STRNCPY(_row->createinet, _inet); \ @@ -434,9 +448,9 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; _row->modifyinet[0] = '\0'; \ } while (0) -#define MODIFYUPDATE(_row, _now, _by, _code, _inet) do { \ - _row->modifydate.tv_sec = (_now)->tv_sec; \ - _row->modifydate.tv_usec = (_now)->tv_usec; \ +#define MODIFYUPDATE(_row, _cd, _by, _code, _inet) do { \ + _row->modifydate.tv_sec = (_cd)->tv_sec; \ + _row->modifydate.tv_usec = (_cd)->tv_usec; \ STRNCPY(_row->modifyby, _by); \ STRNCPY(_row->modifycode, _code); \ STRNCPY(_row->modifyinet, _inet); \ @@ -479,17 +493,17 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; _params[_his_pos++] = str_to_buf(_row->createinet, NULL, 0); \ } while (0) -#define SIMPLEDATEINIT(_row, _now, _by, _code, _inet) do { \ - _row->createdate.tv_sec = (_now)->tv_sec; \ - _row->createdate.tv_usec = (_now)->tv_usec; \ +#define SIMPLEDATEINIT(_row, _cd, _by, _code, _inet) do { \ + _row->createdate.tv_sec = (_cd)->tv_sec; \ + _row->createdate.tv_usec = (_cd)->tv_usec; \ STRNCPY(_row->createby, _by); \ STRNCPY(_row->createcode, _code); \ STRNCPY(_row->createinet, _inet); \ } while (0) -#define SIMPLEDATEDEFAULT(_row, _now ) do { \ - _row->createdate.tv_sec = (_now)->tv_sec; \ - _row->createdate.tv_usec = (_now)->tv_usec; \ +#define SIMPLEDATEDEFAULT(_row, _cd) do { \ + _row->createdate.tv_sec = (_cd)->tv_sec; \ + _row->createdate.tv_usec = (_cd)->tv_usec; \ STRNCPY(_row->createby, (char *)"code"); \ STRNCPY(_row->createcode, (char *)__func__); \ STRNCPY(_row->createinet, (char *)"127.0.0.1"); \ @@ -498,19 +512,6 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; // Override _row defaults if transfer fields are present #define SIMPLEDATETRANSFER(_row) do { \ K_ITEM *item; \ - item = optional_name("createdate", 10, NULL); \ - if (item) { \ - long sec, usec; \ - int n; \ - n = sscanf(DATA_TRANSFER(item)->data, "%ld,%ld", &sec, &usec); \ - if (n > 0) { \ - _row->createdate.tv_sec = (time_t)sec; \ - if (n > 1) \ - _row->createdate.tv_usec = usec / 1000; \ - else \ - _row->createdate.tv_usec = 0L; \ - } \ - } \ item = optional_name("createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ @@ -560,6 +561,8 @@ static const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; } \ } while (0) +// Different input data handling +static bool reloading = false; // Tell the summarizer data load is complete static bool summarizer_go = false; // Tell the summarizer to die @@ -808,8 +811,7 @@ static K_STORE *workinfo_store; // one in the current block static K_ITEM *workinfo_current; // first workinfo of current block -// TODO: have it's own memory? -static tv_t *last_bc; +static tv_t last_bc; // SHARES shares.id.json={...} typedef struct shares { @@ -1111,6 +1113,8 @@ static K_STORE *userstats_summ; #define tv_newer(_old, _new) (((_old)->tv_sec == (_new)->tv_sec) ? \ ((_old)->tv_usec < (_new)->tv_usec) : \ ((_old)->tv_sec < (_new)->tv_sec)) +#define tv_equal(_a, _b) (((_a)->tv_sec == (_b)->tv_sec) && \ + ((_a)->tv_usec == (_b)->tv_usec)) // WORKERSTATUS from various incoming data typedef struct workerstatus { @@ -1320,7 +1324,7 @@ static K_ITEM *_require_name(char *name, int len, char *patt, char *reply, return item; } -static void txt_to_data(enum data_type typ, char *nam, char *fld, void *data, size_t siz) +static void _txt_to_data(enum data_type typ, char *nam, char *fld, void *data, size_t siz, WHERE_FFL_ARGS) { char *tmp; @@ -1328,29 +1332,32 @@ static void txt_to_data(enum data_type typ, char *nam, char *fld, void *data, si case TYPE_STR: // A database field being bigger than local storage is a fatal error if (siz < (strlen(fld)+1)) { - quithere(1, "Field %s structure size %d is smaller than db %d", - nam, (int)siz, (int)strlen(fld)+1); + quithere(1, "Field %s structure size %d is smaller than db %d" WHERE_FFL, + nam, (int)siz, (int)strlen(fld)+1, WHERE_FFL_PASS); } strcpy((char *)data, fld); break; case TYPE_BIGINT: if (siz != sizeof(int64_t)) { - quithere(1, "Field %s bigint incorrect structure size %d - should be %d", - nam, (int)siz, (int)sizeof(int64_t)); + quithere(1, "Field %s bigint incorrect structure size %d - should be %d" + WHERE_FFL, + nam, (int)siz, (int)sizeof(int64_t), WHERE_FFL_PASS); } *((long long *)data) = atoll(fld); break; case TYPE_INT: if (siz != sizeof(int32_t)) { - quithere(1, "Field %s int incorrect structure size %d - should be %d", - nam, (int)siz, (int)sizeof(int32_t)); + quithere(1, "Field %s int incorrect structure size %d - should be %d" + WHERE_FFL, + nam, (int)siz, (int)sizeof(int32_t), WHERE_FFL_PASS); } *((int32_t *)data) = atoi(fld); break; case TYPE_TV: if (siz != sizeof(tv_t)) { - quithere(1, "Field %s timeval incorrect structure size %d - should be %d", - nam, (int)siz, (int)sizeof(tv_t)); + quithere(1, "Field %s timeval incorrect structure size %d - should be %d" + WHERE_FFL, + nam, (int)siz, (int)sizeof(tv_t), WHERE_FFL_PASS); } unsigned int yyyy, mm, dd, HH, MM, SS, uS = 0, tz; struct tm tm; @@ -1363,8 +1370,8 @@ static void txt_to_data(enum data_type typ, char *nam, char *fld, void *data, si n = sscanf(fld, "%u-%u-%u %u:%u:%u.%u+%u", &yyyy, &mm, &dd, &HH, &MM, &SS, &uS, &tz); if (n != 8) { - quithere(1, "Field %s timeval unhandled date '%s' (%d)", - nam, fld, n); + quithere(1, "Field %s timeval unhandled date '%s' (%d)" WHERE_FFL, + nam, fld, n, WHERE_FFL_PASS); } } tm.tm_sec = (int)SS; @@ -1386,19 +1393,20 @@ static void txt_to_data(enum data_type typ, char *nam, char *fld, void *data, si break; case TYPE_CTV: if (siz != sizeof(tv_t)) { - quithere(1, "Field %s timeval incorrect structure size %d - should be %d", - nam, (int)siz, (int)sizeof(tv_t)); + quithere(1, "Field %s timeval incorrect structure size %d - should be %d" + WHERE_FFL, + nam, (int)siz, (int)sizeof(tv_t), WHERE_FFL_PASS); } - long sec, usec; + long sec, nsec; int c; + // Caller test for tv_sec=0 for failure ((tv_t *)data)->tv_sec = 0L; ((tv_t *)data)->tv_usec = 0L; - c = sscanf(fld, "%ld,%ld", &sec, &usec); - // For converting msg fields - so not fatal if it's no good + c = sscanf(fld, "%ld,%ld", &sec, &nsec); if (c > 0) { ((tv_t *)data)->tv_sec = (time_t)sec; if (c > 1) - ((tv_t *)data)->tv_usec = usec; + ((tv_t *)data)->tv_usec = (nsec + 500) / 1000; if (((tv_t *)data)->tv_sec >= COMPARE_EXPIRY) { ((tv_t *)data)->tv_sec = default_expiry.tv_sec; ((tv_t *)data)->tv_usec = default_expiry.tv_usec; @@ -1407,61 +1415,73 @@ static void txt_to_data(enum data_type typ, char *nam, char *fld, void *data, si break; case TYPE_BLOB: tmp = strdup(fld); - if (!tmp) - quithere(1, "Field %s (%d) OOM", nam, (int)strlen(fld)); + if (!tmp) { + quithere(1, "Field %s (%d) OOM" WHERE_FFL, + nam, (int)strlen(fld), WHERE_FFL_PASS); + } *((char **)data) = tmp; break; case TYPE_DOUBLE: if (siz != sizeof(double)) { - quithere(1, "Field %s int incorrect structure size %d - should be %d", - nam, (int)siz, (int)sizeof(double)); + quithere(1, "Field %s int incorrect structure size %d - should be %d" + WHERE_FFL, + nam, (int)siz, (int)sizeof(double), WHERE_FFL_PASS); } *((double *)data) = atof(fld); break; default: - quithere(1, "Unknown field %s (%d) to convert", nam, (int)typ); + quithere(1, "Unknown field %s (%d) to convert" WHERE_FFL, + nam, (int)typ, WHERE_FFL_PASS); break; } } +#define txt_to_str(_nam, _fld, _data, _siz) _txt_to_str(_nam, _fld, _data, _siz, WHERE_FFL_HERE) +#define txt_to_bigint(_nam, _fld, _data, _siz) _txt_to_bigint(_nam, _fld, _data, _siz, WHERE_FFL_HERE) +#define txt_to_int(_nam, _fld, _data, _siz) _txt_to_int(_nam, _fld, _data, _siz, WHERE_FFL_HERE) +#define txt_to_tv(_nam, _fld, _data, _siz) _txt_to_tv(_nam, _fld, _data, _siz, WHERE_FFL_HERE) +#define txt_to_ctv(_nam, _fld, _data, _siz) _txt_to_ctv(_nam, _fld, _data, _siz, WHERE_FFL_HERE) +#define txt_to_blob(_nam, _fld, _data) _txt_to_blob(_nam, _fld, _data, WHERE_FFL_HERE) +#define txt_to_double(_nam, _fld, _data, _siz) _txt_to_double(_nam, _fld, _data, _siz, WHERE_FFL_HERE) + // N.B. STRNCPY* macros truncate, whereas this aborts ckdb if src > trg -static void txt_to_str(char *nam, char *fld, char data[], size_t siz) +static void _txt_to_str(char *nam, char *fld, char data[], size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_STR, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_STR, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static void txt_to_bigint(char *nam, char *fld, int64_t *data, size_t siz) +static void _txt_to_bigint(char *nam, char *fld, int64_t *data, size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_BIGINT, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_BIGINT, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static void txt_to_int(char *nam, char *fld, int32_t *data, size_t siz) +static void _txt_to_int(char *nam, char *fld, int32_t *data, size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_INT, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_INT, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static void txt_to_tv(char *nam, char *fld, tv_t *data, size_t siz) +static void _txt_to_tv(char *nam, char *fld, tv_t *data, size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_TV, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_TV, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } // Convert msg S,nS to tv_t -static void txt_to_ctv(char *nam, char *fld, tv_t *data, size_t siz) +static void _txt_to_ctv(char *nam, char *fld, tv_t *data, size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_CTV, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_CTV, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static void txt_to_blob(char *nam, char *fld, char *data) +static void _txt_to_blob(char *nam, char *fld, char *data, WHERE_FFL_ARGS) { - txt_to_data(TYPE_BLOB, nam, fld, (void *)(&data), 0); + _txt_to_data(TYPE_BLOB, nam, fld, (void *)(&data), 0, WHERE_FFL_PASS); } -static void txt_to_double(char *nam, char *fld, double *data, size_t siz) +static void _txt_to_double(char *nam, char *fld, double *data, size_t siz, WHERE_FFL_ARGS) { - txt_to_data(TYPE_DOUBLE, nam, fld, (void *)data, siz); + _txt_to_data(TYPE_DOUBLE, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static char *data_to_buf(enum data_type typ, void *data, char *buf, size_t siz) +static char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_FFL_ARGS) { struct tm tm; char *buf2; @@ -1489,13 +1509,14 @@ static char *data_to_buf(enum data_type typ, void *data, char *buf, size_t siz) siz = DOUBLE_BUFSIZ; break; default: - quithere(1, "Unknown field (%d) to convert", (int)typ); + quithere(1, "Unknown field (%d) to convert" WHERE_FFL, + (int)typ, WHERE_FFL_PASS); break; } buf = malloc(siz); if (!buf) - quithere(1, "OOM (%d)", (int)siz); + quithere(1, "OOM (%d)" WHERE_FFL, (int)siz, WHERE_FFL_PASS); } switch (typ) { @@ -1511,8 +1532,10 @@ static char *data_to_buf(enum data_type typ, void *data, char *buf, size_t siz) break; case TYPE_TV: buf2 = malloc(siz); - if (!buf2) - quithere(1, "OOM (%d)", (int)siz); + if (!buf2) { + quithere(1, "OOM (%d)" WHERE_FFL, + (int)siz, WHERE_FFL_PASS); + } localtime_r(&(((struct timeval *)data)->tv_sec), &tm); strftime(buf2, siz, "%Y-%m-%d %H:%M:%S", &tm); snprintf(buf, siz, "%s.%06ld", buf2, @@ -1535,48 +1558,57 @@ static char *data_to_buf(enum data_type typ, void *data, char *buf, size_t siz) return buf; } -static char *str_to_buf(char data[], char *buf, size_t siz) +#define str_to_buf(_data, _buf, _siz) _str_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define bigint_to_buf(_data, _buf, _siz) _bigint_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define int_to_buf(_data, _buf, _siz) _int_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define tv_to_buf(_data, _buf, _siz) _tv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define ctv_to_buf(_data, _buf, _siz) _ctv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define tvs_to_buf(_data, _buf, _siz) _tvs_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +//#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) + +static char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_STR, (void *)data, buf, siz); + return _data_to_buf(TYPE_STR, (void *)data, buf, siz, WHERE_FFL_PASS); } -static char *bigint_to_buf(int64_t data, char *buf, size_t siz) +static char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_BIGINT, (void *)(&data), buf, siz); + return _data_to_buf(TYPE_BIGINT, (void *)(&data), buf, siz, WHERE_FFL_PASS); } -static char *int_to_buf(int32_t data, char *buf, size_t siz) +static char *_int_to_buf(int32_t data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_INT, (void *)(&data), buf, siz); + return _data_to_buf(TYPE_INT, (void *)(&data), buf, siz, WHERE_FFL_PASS); } -static char *tv_to_buf(tv_t *data, char *buf, size_t siz) +static char *_tv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_TV, (void *)data, buf, siz); + return _data_to_buf(TYPE_TV, (void *)data, buf, siz, WHERE_FFL_PASS); } // Convert tv to S,uS -static char *ctv_to_buf(tv_t *data, char *buf, size_t siz) +static char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_CTV, (void *)data, buf, siz); + return _data_to_buf(TYPE_CTV, (void *)data, buf, siz, WHERE_FFL_PASS); } // Convert tv to seconds (ignore uS) -static char *tvs_to_buf(tv_t *data, char *buf, size_t siz) +static char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_TVS, (void *)data, buf, siz); + return _data_to_buf(TYPE_TVS, (void *)data, buf, siz, WHERE_FFL_PASS); } /* unused yet -static char *blob_to_buf(char *data, char *buf, size_t siz) +static char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_BLOB, (void *)data, buf, siz); + return _data_to_buf(TYPE_BLOB, (void *)data, buf, siz, WHERE_FFL_PASS); } */ -static char *double_to_buf(double data, char *buf, size_t siz) +static char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS) { - return data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz); + return _data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz, WHERE_FFL_PASS); } static PGconn *dbconnect() @@ -1594,7 +1626,7 @@ static PGconn *dbconnect() } static int64_t nextid(PGconn *conn, char *idname, int64_t increment, - tv_t *now, char *by, char *code, char *inet) + tv_t *cd, char *by, char *code, char *inet) { ExecStatusType rescode; PGresult *res; @@ -1649,7 +1681,7 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, par = 0; params[par++] = bigint_to_buf(lastid, NULL, 0); - params[par++] = tv_to_buf(now, NULL, 0); + params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = str_to_buf(by, NULL, 0); params[par++] = str_to_buf(code, NULL, 0); params[par++] = str_to_buf(inet, NULL, 0); @@ -1720,6 +1752,11 @@ static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool #define find_create_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, true) #define find_workerstatus(_u, _w) _find_create_workerstatus(_u, _w, false) +/* TODO: Change this to calculate after the DB load rather than during + for: shares, userstats and sharesummary + auths will fully populate the workerstats tree to use for the + reverse search back into the shares, userstats and sharesummary + for the latest records */ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats, SHARESUMMARY *sharesummary) { @@ -2043,8 +2080,8 @@ static K_ITEM *find_workers(int64_t userid, char *workername) static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, char *difficultydefault, char *idlenotificationenabled, - char *idlenotificationtime, tv_t *now, char *by, - char *code, char *inet) + char *idlenotificationtime, char *by, + char *code, char *inet, tv_t *cd) { ExecStatusType rescode; PGresult *res; @@ -2065,7 +2102,7 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, row = DATA_WORKERS(item); - row->workerid = nextid(conn, "workerid", (int64_t)1, now, by, code, inet); + row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet); if (row->workerid == 0) goto unitem; @@ -2101,7 +2138,7 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, } else row->idlenotificationtime = IDLENOTIFICATIONTIME_DEF; - HISTORYDATEINIT(row, now, by, code, inet); + HISTORYDATEINIT(row, cd, by, code, inet); par = 0; params[par++] = bigint_to_buf(row->workerid, NULL, 0); @@ -2145,7 +2182,7 @@ unitem: static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, char *idlenotificationenabled, char *idlenotificationtime, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd) { ExecStatusType rescode; PGresult *res; @@ -2189,7 +2226,7 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, } else nottime = row->idlenotificationtime; - HISTORYDATEINIT(row, now, by, code, inet); + HISTORYDATEINIT(row, cd, by, code, inet); if (diffdef == row->difficultydefault && idlenot == row->idlenotificationenabled[0] && @@ -2199,7 +2236,7 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, } else { upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3"; par = 0; - params[par++] = tv_to_buf(now, NULL, 0); + params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(row->workerid, NULL, 0); params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); @@ -2267,8 +2304,8 @@ early: static K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *workername, char *diffdef, char *idlenotificationenabled, - char *idlenotificationtime, tv_t *now, char *by, - char *code, char *inet) + char *idlenotificationtime, char *by, + char *code, char *inet, tv_t *cd) { K_ITEM *item; @@ -2276,19 +2313,19 @@ static K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *worke if (item) { if (update) { workers_update(conn, item, diffdef, idlenotificationenabled, - idlenotificationtime, now, by, code, inet); + idlenotificationtime, by, code, inet, cd); } } else { // TODO: limit how many? item = workers_add(conn, userid, workername, diffdef, idlenotificationenabled, idlenotificationtime, - now, by, code, inet); + by, code, inet, cd); } return item; } static K_ITEM *new_default_worker(PGconn *conn, bool update, int64_t userid, char *workername, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd) { bool conned = false; K_ITEM *item; @@ -2300,7 +2337,7 @@ static K_ITEM *new_default_worker(PGconn *conn, bool update, int64_t userid, cha item = new_worker(conn, update, userid, workername, DIFFICULTYDEFAULT_DEF_STR, IDLENOTIFICATIONENABLED_DEF, IDLENOTIFICATIONTIME_DEF_STR, - now, by, code, inet); + by, code, inet, cd); if (conned) PQfinish(conn); @@ -2652,10 +2689,11 @@ static K_ITEM *find_workinfo(int64_t workinfoid) static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance, char *transactiontree, char *merklehash, char *prevhash, char *coinbase1, char *coinbase2, char *version, - char *bits, char *ntime, char *reward, - tv_t *now, char *by, char *code, char *inet) + char *bits, char *ntime, char *reward, char *by, + char *code, char *inet, tv_t *cd, bool igndup) { ExecStatusType rescode; + K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; int n; @@ -2685,9 +2723,17 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc STRNCPY(row->ntime, ntime); TXT_TO_BIGINT("reward", reward, row->reward); - HISTORYDATEINIT(row, now, by, code, inet); + HISTORYDATEINIT(row, cd, by, code, inet); HISTORYDATETRANSFER(row); + if (igndup && find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { + workinfoid = row->workinfoid; + K_WLOCK(workinfo_free); + k_add_head(workinfo_free, item); + K_WUNLOCK(workinfo_free); + return workinfoid; + } + par = 0; params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); params[par++] = str_to_buf(row->poolinstance, NULL, 0); @@ -2735,11 +2781,11 @@ unparam: workinfo_root = add_to_ktree(workinfo_root, item, cmp_workinfo); k_add_head(workinfo_store, item); - // Remember the bc 'now' when the height changes + // Remember the bc = 'cd' when the height changes if (workinfo_current) { if (cmp_height(DATA_WORKINFO(workinfo_current)->coinbase1, DATA_WORKINFO(item)->coinbase1) != 0) - last_bc = &(DATA_WORKINFO(item)->createdate); + copy_tv(&last_bc, cd); } workinfo_current = item; @@ -2749,20 +2795,25 @@ unparam: return workinfoid; } -static double cmp_shares(K_ITEM *a, K_ITEM *b); +#define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ + _sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ + WHERE_FFL_HERE) + +static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, + char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS); static double cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b); -static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, - tv_t *now, char *by, char *code, char *inet); +static double cmp_shares(K_ITEM *a, K_ITEM *b); static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd) { K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item; K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1]; int64_t workinfoid; SHARESUMMARY sharesummary; SHARES shares; - bool ok = false, conned = false; + bool ok = false, conned = false, skipupdate; + char error[1024]; LOGDEBUG("%s(): complete", __func__); @@ -2784,17 +2835,26 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, ss_look.data = (void *)(&sharesummary); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx); while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == workinfoid) { - if (conn == NULL) { - conn = dbconnect(); - conned = true; + error[0] = '\0'; + skipupdate = false; + if (reloading) { + if (DATA_SHARESUMMARY(ss_item)->complete[0] == SUMMARY_AGED) + skipupdate = true; } - if (!sharesummary_update(conn, NULL, NULL, ss_item, now, by, code, inet)) { - LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64, - __func__, DATA_SHARESUMMARY(ss_item)->userid, - DATA_SHARESUMMARY(ss_item)->workername, - DATA_SHARESUMMARY(ss_item)->workinfoid); - ok = false; + if (!skipupdate) { + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + if (!sharesummary_update(conn, NULL, NULL, ss_item, by, code, inet, cd)) { + LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64, + __func__, DATA_SHARESUMMARY(ss_item)->userid, + DATA_SHARESUMMARY(ss_item)->workername, + DATA_SHARESUMMARY(ss_item)->workinfoid); + ok = false; + } } // Discard the shares either way @@ -2816,11 +2876,21 @@ static bool workinfo_age(PGconn *conn, char *workinfoidstr, char *poolinstance, tmp_item = next_in_ktree(s_ctx); shares_root = remove_from_ktree(shares_root, s_item, cmp_shares, tmp_ctx); k_unlink_item(shares_store, s_item); + if (reloading && skipupdate && !error[0]) { + snprintf(error, sizeof(error), + "reload found aged shares: %"PRId64"/%"PRId64"/%s", + DATA_SHARES(s_item)->workinfoid, + DATA_SHARES(s_item)->userid, + DATA_SHARES(s_item)->workername); + } k_add_head(shares_free, s_item); s_item = tmp_item; } K_WUNLOCK(shares_free); ss_item = next_in_ktree(ss_ctx); + + if (error[0]) + LOGERR("%s(): %s", __func__, error); } if (conned) @@ -3010,12 +3080,29 @@ static double cmp_shares(K_ITEM *a, K_ITEM *b) return c; } +static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd) +{ + row->diffacc = row->diffsta = row->diffdup = row->diffhi = + row->diffrej = row->shareacc = row->sharesta = row->sharedup = + row->sharehi = row->sharerej = 0.0; + row->sharecount = row->errorcount = row->countlastupdate = 0; + row->reset = false; + row->firstshare.tv_sec = cd->tv_sec; + row->firstshare.tv_usec = cd->tv_usec; + row->lastshare.tv_sec = row->firstshare.tv_sec; + row->lastshare.tv_usec = row->firstshare.tv_usec; + row->complete[0] = SUMMARY_NEW; + row->complete[1] = '\0'; +} + +static K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid); + // Memory (and log file) only static bool shares_add(char *workinfoid, char *username, char *workername, char *clientid, char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff, - char *secondaryuserid, tv_t *now, char *by, char *code, char *inet) + char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd) { - K_ITEM *s_item, *u_item, *wi_item, *w_item; + K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHARES *shares; bool ok = false; @@ -3044,7 +3131,7 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char TXT_TO_DOUBLE("sdiff", sdiff, shares->sdiff); STRNCPY(shares->secondaryuserid, secondaryuserid); - HISTORYDATEINIT(shares, now, by, code, inet); + HISTORYDATEINIT(shares, cd, by, code, inet); HISTORYDATETRANSFER(shares); wi_item = find_workinfo(shares->workinfoid); @@ -3052,13 +3139,30 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char goto unitem; w_item = new_default_worker(NULL, false, shares->userid, shares->workername, - now, by, code, inet); + by, code, inet, cd); if (!w_item) goto unitem; + if (reloading) { + ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid); + if (ss_item) { + if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW) { + K_WLOCK(shares_free); + k_add_head(shares_free, s_item); + K_WUNLOCK(shares_free); + return true; + } + + if (!DATA_SHARESUMMARY(ss_item)->reset) { + zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd); + DATA_SHARESUMMARY(ss_item)->reset = true; + } + } + } + workerstatus_update(NULL, shares, NULL, NULL); - sharesummary_update(NULL, shares, NULL, NULL, now, by, code, inet); + sharesummary_update(NULL, shares, NULL, NULL, by, code, inet, cd); ok = true; unitem: @@ -3103,11 +3207,12 @@ static double cmp_shareerrors(K_ITEM *a, K_ITEM *b) } // Memory (and log file) only +// TODO: handle shareerrors that appear after a workinfoid is aged or doesn't exist? static bool shareerrors_add(char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *error, char *secondaryuserid, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd) { - K_ITEM *s_item, *u_item, *wi_item, *w_item; + K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHAREERRORS *shareerrors; bool ok = false; @@ -3133,7 +3238,7 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername, STRNCPY(shareerrors->error, error); STRNCPY(shareerrors->secondaryuserid, secondaryuserid); - HISTORYDATEINIT(shareerrors, now, by, code, inet); + HISTORYDATEINIT(shareerrors, cd, by, code, inet); HISTORYDATETRANSFER(shareerrors); wi_item = find_workinfo(shareerrors->workinfoid); @@ -3141,11 +3246,28 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername, goto unitem; w_item = new_default_worker(NULL, false, shareerrors->userid, shareerrors->workername, - now, by, code, inet); + by, code, inet, cd); if (!w_item) goto unitem; - sharesummary_update(NULL, NULL, shareerrors, NULL, now, by, code, inet); + if (reloading) { + ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid); + if (ss_item) { + if (DATA_SHARESUMMARY(ss_item)->complete[0] != SUMMARY_NEW) { + K_WLOCK(shareerrors_free); + k_add_head(shareerrors_free, s_item); + K_WUNLOCK(shareerrors_free); + return true; + } + + if (!DATA_SHARESUMMARY(ss_item)->reset) { + zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd); + DATA_SHARESUMMARY(ss_item)->reset = true; + } + } + } + + sharesummary_update(NULL, NULL, shareerrors, NULL, by, code, inet, cd); ok = true; unitem: @@ -3234,8 +3356,8 @@ static K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t worki return find_in_ktree(sharesummary_root, &look, cmp_sharesummary, ctx); } -static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, - tv_t *now, char *by, char *code, char *inet) +static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, + char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) { ExecStatusType rescode; PGresult *res; @@ -3254,8 +3376,9 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, if (ss_item) { if (s_row || e_row) { - quithere(1, "ERR: %s() only one of s_row, e_row and ss_item allowed", - __func__); + quithere(1, "ERR: only one of s_row, e_row and " + "ss_item allowed" WHERE_FFL, + WHERE_FFL_PASS); } new = false; item = ss_item; @@ -3266,8 +3389,9 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, } else { if (s_row) { if (e_row) { - quithere(1, "ERR: %s() only one of s_row, e_row (and ss_item) allowed", - __func__); + quithere(1, "ERR: only one of s_row, e_row " + "(and ss_item) allowed" WHERE_FFL, + WHERE_FFL_PASS); } userid = s_row->userid; workername = s_row->workername; @@ -3275,8 +3399,9 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, sharecreatedate = &(s_row->createdate); } else { if (!e_row) { - quithere(1, "ERR: %s() all s_row, e_row and ss_item are NULL", - __func__); + quithere(1, "ERR: all s_row, e_row and " + "ss_item are NULL" WHERE_FFL, + WHERE_FFL_PASS); } userid = e_row->userid; workername = e_row->workername; @@ -3297,18 +3422,9 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, row->userid = userid; STRNCPY(row->workername, workername); row->workinfoid = workinfoid; - row->diffacc = row->diffsta = row->diffdup = row->diffhi = - row->diffrej = row->shareacc = row->sharesta = row->sharedup = - row->sharehi = row->sharerej = 0.0; - row->sharecount = row->errorcount = row->countlastupdate = 0; + zero_sharesummary(row, sharecreatedate); row->inserted = false; row->saveaged = false; - row->firstshare.tv_sec = sharecreatedate->tv_sec; - row->firstshare.tv_usec = sharecreatedate->tv_usec; - row->lastshare.tv_sec = row->firstshare.tv_sec; - row->lastshare.tv_usec = row->firstshare.tv_usec; - row->complete[0] = SUMMARY_NEW; - row->complete[1] = '\0'; } if (e_row) @@ -3382,7 +3498,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, } if (new || !(row->inserted)) { - MODIFYDATEINIT(row, now, by, code, inet); + MODIFYDATEINIT(row, cd, by, code, inet); par = 0; params[par++] = bigint_to_buf(row->userid, NULL, 0); @@ -3426,7 +3542,7 @@ static bool sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, } else { bool stats_update = false; - MODIFYUPDATE(row, now, by, code, inet); + MODIFYUPDATE(row, cd, by, code, inet); if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) < (row->sharecount + row->errorcount)) @@ -3745,10 +3861,12 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, char *workinfoid, char *username, char *workername, char *clientid, char *enonce1, char *nonce2, char *nonce, char *reward, char *confirmed, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd, + bool igndup) { ExecStatusType rescode; PGresult *res = NULL; + K_TREE_CTX ctx[1]; K_ITEM *item, *u_item; BLOCKS *row; char *upd, *ins; @@ -3769,7 +3887,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, STRNCPY(row->blockhash, blockhash); STRNCPY(row->confirmed, confirmed); - HISTORYDATEINIT(row, now, by, code, inet); + HISTORYDATEINIT(row, cd, by, code, inet); switch (confirmed[0]) { case BLOCKS_NEW: @@ -3787,6 +3905,13 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, HISTORYDATETRANSFER(row); + if (igndup && find_in_ktree(blocks_root, item, cmp_blocks, ctx)) { + K_WLOCK(blocks_free); + k_add_head(blocks_free, item); + K_WUNLOCK(blocks_free); + return true; + } + par = 0; params[par++] = int_to_buf(row->height, NULL, 0); params[par++] = str_to_buf(row->blockhash, NULL, 0); @@ -3815,9 +3940,10 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, } break; case BLOCKS_CONFIRM: + // TODO: ignore a duplicate if igndup upd = "update blocks set expirydate=$1 where blockhash=$2 and expirydate=$3"; par = 0; - params[par++] = tv_to_buf(now, NULL, 0); + params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = str_to_buf(row->blockhash, NULL, 0); params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); @@ -3844,7 +3970,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, par = 0; params[par++] = str_to_buf(row->blockhash, NULL, 0); - params[par++] = tv_to_buf(now, NULL, 0); + params[par++] = tv_to_buf(cd, NULL, 0); HISTORYDATEPARAMS(params, par, row); PARCHKVAL(par, 2 + HISTORYDATECOUNT, params); // 7 as per ins @@ -3997,6 +4123,9 @@ static bool blocks_fill(PGconn *conn) blocks_root = add_to_ktree(blocks_root, item, cmp_blocks); k_add_head(blocks_store, item); + + if (tv_newer(&(dbstatus.newest_createdate_blocks), &(row->createdate))) + copy_tv(&(dbstatus.newest_createdate_blocks), &(row->createdate)); } if (!ok) k_add_head(blocks_free, item); @@ -4047,12 +4176,13 @@ static double cmp_auths(K_ITEM *a, K_ITEM *b) } static char *auths_add(PGconn *conn, char *poolinstance, char *username, - char *workername, char *clientid, - char *enonce1, char *useragent, - tv_t *now, char *by, char *code, char *inet) + char *workername, char *clientid, char *enonce1, + char *useragent, char *by, char *code, char *inet, + tv_t *cd, bool igndup) { ExecStatusType rescode; PGresult *res; + K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item; int n; AUTHS *row; @@ -4075,21 +4205,29 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, STRNCPY(row->poolinstance, poolinstance); row->userid = DATA_USERS(u_item)->userid; + // since update=false, a dup will be ok and do nothing when igndup=true new_worker(conn, false, row->userid, workername, DIFFICULTYDEFAULT_DEF_STR, - IDLENOTIFICATIONENABLED_DEF, IDLENOTIFICATIONTIME_DEF_STR, now, - by, code, inet); + IDLENOTIFICATIONENABLED_DEF, IDLENOTIFICATIONTIME_DEF_STR, + by, code, inet, cd); STRNCPY(row->workername, workername); TXT_TO_INT("clientid", clientid, row->clientid); STRNCPY(row->enonce1, enonce1); STRNCPY(row->useragent, useragent); - HISTORYDATEINIT(row, now, by, code, inet); + HISTORYDATEINIT(row, cd, by, code, inet); HISTORYDATETRANSFER(row); + if (igndup && find_in_ktree(auths_root, a_item, cmp_auths, ctx)) { + K_WLOCK(auths_free); + k_add_head(auths_free, a_item); + K_WUNLOCK(auths_free); + return DATA_USERS(u_item)->secondaryuserid; + } + // Update even if DB fails workerstatus_update(row, NULL, NULL, NULL); - row->authid = nextid(conn, "authid", (int64_t)1, now, by, code, inet); + row->authid = nextid(conn, "authid", (int64_t)1, cd, by, code, inet); if (row->authid == 0) goto unitem; @@ -4267,10 +4405,12 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, char *elapsed, char *users, char *workers, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, - tv_t *now, char *by, char *code, char *inet) + char *by, char *code, char *inet, tv_t *cd, + bool igndup) { ExecStatusType rescode; PGresult *res; + K_TREE_CTX ctx[1]; K_ITEM *p_item; int n; POOLSTATS *row; @@ -4296,9 +4436,16 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, TXT_TO_DOUBLE("hashrate1hr", hashrate1hr, row->hashrate1hr); TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr); - SIMPLEDATEINIT(row, now, by, code, inet); + SIMPLEDATEINIT(row, cd, by, code, inet); SIMPLEDATETRANSFER(row); + if (igndup && find_in_ktree(poolstats_root, p_item, cmp_poolstats, ctx)) { + K_WLOCK(poolstats_free); + k_add_head(poolstats_free, p_item); + K_WUNLOCK(poolstats_free); + return true; + } + par = 0; if (store) { params[par++] = str_to_buf(row->poolinstance, NULL, 0); @@ -4593,11 +4740,12 @@ unparam: static bool userstats_add(char *poolinstance, char *elapsed, char *username, char *workername, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, bool idle, - bool eos, tv_t *now, char *by, char *code, char *inet) + bool eos, char *by, char *code, char *inet, tv_t *cd) { - K_ITEM *us_item, *u_item, *us_match, *us_next; + K_ITEM *us_item, *u_item, *us_match, *us_next, *db_match; + K_TREE_CTX ctx[1]; USERSTATS *row; - tv_t createdate; + tv_t eosdate; LOGDEBUG("%s(): add", __func__); @@ -4622,14 +4770,34 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, row->summarylevel[0] = SUMMARY_NONE; row->summarylevel[1] = '\0'; row->summarycount = 1; - SIMPLEDATEINIT(row, now, by, code, inet); + SIMPLEDATEINIT(row, cd, by, code, inet); SIMPLEDATETRANSFER(row); copy_tv(&(row->statsdate), &(row->createdate)); if (eos) { // Save it for end processing - createdate.tv_sec = row->createdate.tv_sec; - createdate.tv_usec = row->createdate.tv_usec; + eosdate.tv_sec = row->createdate.tv_sec; + eosdate.tv_usec = row->createdate.tv_usec; + } + + if (reloading) { + /* If the db load said the statsdate for this userid+workername + * is already summarised then we discard it */ + db_match = find_in_ktree(userstats_db_root, us_item, + cmp_userstats_workername, ctx); + if (db_match && + !tv_newer(&(DATA_USERSTATS(db_match)->statsdate), cd)) { + K_WLOCK(userstats_free); + k_add_head(userstats_free, us_item); + K_WUNLOCK(userstats_free); + + /* If this was an eos record and eos_store has data, + * it means we need to process the eos_store */ + if (eos && userstats_eos_store->count > 0) + goto advancetogo; + + return true; + } } workerstatus_update(NULL, NULL, row, NULL); @@ -4660,13 +4828,14 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, } if (eos) { +advancetogo: K_WLOCK(userstats_free); us_next = userstats_eos_store->head; while (us_next) { - if (tvdiff(&DATA_USERSTATS(us_next)->createdate, &createdate) != 0.0) { + if (tvdiff(&DATA_USERSTATS(us_next)->createdate, &eosdate) != 0.0) { char date_buf[DATE_BUFSIZ]; LOGERR("userstats != eos '%s' discarded: %s/%"PRId64"/%s", - tv_to_buf(&createdate, date_buf, DATE_BUFSIZ), + tv_to_buf(&eosdate, date_buf, DATE_BUFSIZ), DATA_USERSTATS(us_next)->poolinstance, DATA_USERSTATS(us_next)->userid, DATA_USERSTATS(us_next)->workername); @@ -4722,7 +4891,7 @@ static void userstats_update_ccl(USERSTATS *row) return; } look.data = (void *)(&userstats); - item = find_in_ktree(userstats_ccl_root, &look, cmp_userstats_workername, ctx); + item = find_in_ktree(userstats_db_root, &look, cmp_userstats_workername, ctx); if (item) { tmp = DATA_USERSTATS(item); if (tv_newer(&(tmp->statsdate), &(userstats.statsdate))) @@ -4734,9 +4903,9 @@ static void userstats_update_ccl(USERSTATS *row) tmp->userid = userstats.userid; STRNCPY(tmp->workername, userstats.workername); copy_tv(&(tmp->statsdate), &(userstats.statsdate)); - userstats_ccl_root = add_to_ktree(userstats_ccl_root, item, + userstats_db_root = add_to_ktree(userstats_db_root, item, cmp_userstats_workername); - k_add_head(userstats_ccl, item); + k_add_head(userstats_db, item); } } @@ -4993,6 +5162,56 @@ matane: return ok; } +static bool reload_from(tv_t *start); + +static bool reload() +{ + char buf[DATE_BUFSIZ+1]; + K_ITEM *ccl; + tv_t start; + bool ok; + + tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); + LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB workinfo", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_auths), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB auths", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); + + ccl = userstats_db->head; + // oldest in ccl + while (ccl) { + if (dbstatus.userstats.tv_sec == 0 || + !tv_newer(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate))) + copy_tv(&(dbstatus.userstats), &(DATA_USERSTATS(ccl)->statsdate)); + ccl = ccl->next; + } + + tv_to_buf(&(dbstatus.userstats), buf, sizeof(buf)); + LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf); + tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf)); + LOGWARNING("%s(): %s newest DB blocks", __func__, buf); + + copy_tv(&start, &(dbstatus.oldest_sharesummary_firstshare_n)); + if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) + copy_tv(&start, &(dbstatus.newest_createdate_workinfo)); + if (!tv_newer(&start, &(dbstatus.newest_createdate_auths))) + copy_tv(&start, &(dbstatus.newest_createdate_auths)); + if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) + copy_tv(&start, &(dbstatus.newest_createdate_poolstats)); + if (!tv_newer(&start, &(dbstatus.userstats))) + copy_tv(&start, &(dbstatus.userstats)); + + ok = reload_from(&start); + + free_ktree(userstats_db_root, NULL); + k_list_transfer_to_head(userstats_db, userstats_free); + + return ok; +} + /* TODO: static PGconn *dbquit(PGconn *conn) { @@ -5086,10 +5305,8 @@ static void clean_up(ckpool_t *ckp) static bool setup_data() { K_TREE_CTX ctx[1]; - K_ITEM look, *found, *ccl; + K_ITEM look, *found; WORKINFO wi; - char buf[DATE_BUFSIZ+1]; - tv_t statsdate; transfer_free = k_new_list("Transfer", sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); @@ -5160,11 +5377,11 @@ static bool setup_data() userstats_store = k_new_store(userstats_free); userstats_eos_store = k_new_store(userstats_free); userstats_summ = k_new_store(userstats_free); - userstats_ccl = k_new_store(userstats_free); + userstats_db = k_new_store(userstats_free); userstats_root = new_ktree(); userstats_statsdate_root = new_ktree(); userstats_free->dsp_func = dsp_userstats; - userstats_ccl_root = new_ktree(); + userstats_db_root = new_ktree(); workerstatus_free = k_new_list("WorkerStatus", sizeof(WORKERSTATUS), ALLOC_WORKERSTATUS, LIMIT_WORKERSTATUS, true); @@ -5174,28 +5391,8 @@ static bool setup_data() if (!getdata()) return false; - tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf)); - LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf); - tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB workinfo", __func__, buf); - tv_to_buf(&(dbstatus.newest_createdate_auths), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB auths", __func__, buf); - tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf)); - LOGWARNING("%s(): %s newest DB poolstats", __func__, buf); - - bzero(&statsdate, sizeof(statsdate)); - ccl = userstats_ccl->head; - // oldest in ccl - while (ccl) { - if (statsdate.tv_sec == 0 || - !tv_newer(&statsdate, &(DATA_USERSTATS(ccl)->statsdate))) - copy_tv(&statsdate, &(DATA_USERSTATS(ccl)->statsdate)); - ccl = ccl->next; - } - tv_to_buf(&statsdate, buf, sizeof(buf)); - LOGWARNING("%s(): %s oldest new DB userstats", __func__, buf); - free_ktree(userstats_ccl_root, NULL); - k_list_transfer_to_head(userstats_ccl, userstats_free); + if (!reload()) + return false; workinfo_current = last_in_ktree(workinfo_height_root, ctx); if (workinfo_current) { @@ -5206,7 +5403,7 @@ static bool setup_data() // Find the first workinfo for this height found = find_after_in_ktree(workinfo_height_root, &look, cmp_workinfo_height, ctx); if (found) - last_bc = &(DATA_WORKINFO(found)->createdate); + copy_tv(&last_bc, &(DATA_WORKINFO(found)->createdate)); // No longer needed workinfo_height_root = free_ktree(workinfo_height_root, NULL); } @@ -5214,7 +5411,8 @@ static bool setup_data() return true; } -static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, + __maybe_unused tv_t *notcd) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5254,7 +5452,8 @@ static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, c } static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet) + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd) { K_ITEM *i_username, *i_passwordhash, *u_item; char reply[1024] = ""; @@ -5290,7 +5489,8 @@ static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ return strdup("ok."); } -static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, + char *inet, tv_t *cd, bool igndup) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5302,8 +5502,7 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char * K_ITEM *i_poolinstance, *i_elapsed, *i_users, *i_workers; K_ITEM *i_hashrate, *i_hashrate5m, *i_hashrate1hr, *i_hashrate24hr; - K_ITEM *i_createdate, look, *ps; - tv_t createdate; + K_ITEM look, *ps; POOLSTATS row; bool ok = false; @@ -5349,11 +5548,7 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char * if (!ps) store = true; else { - i_createdate = require_name("createdate", 1, NULL, reply, siz); - if (!i_createdate) - return strdup(reply); - TXT_TO_CTV("createdate", DATA_TRANSFER(i_createdate)->data, createdate); - if (tvdiff(&createdate, &(DATA_POOLSTATS(ps)->createdate)) > STATS_PER) + if (tvdiff(cd, &(DATA_POOLSTATS(ps)->createdate)) > STATS_PER) store = true; else store = false; @@ -5368,7 +5563,7 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char * DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - now, by, code, inet); + by, code, inet, cd, igndup); PQfinish(conn); if (!ok) { @@ -5380,7 +5575,23 @@ static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char * return strdup(reply); } -static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_poolstats(char *cmd, char *id, __maybe_unused tv_t *notnow, + char *by, char *code, char *inet, tv_t *cd) +{ + bool igndup = false; + + if (reloading) { + if (tv_equal(cd, &(dbstatus.newest_createdate_blocks))) + igndup = true; + else if (tv_newer(cd, &(dbstatus.newest_createdate_blocks))) + return NULL; + } + + return cmd_poolstats_do(cmd, id, by, code, inet, cd, igndup); +} + +static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, + char *by, char *code, char *inet, tv_t *cd) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5446,7 +5657,7 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char * DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - idle, eos, now, by, code, inet); + idle, eos, by, code, inet, cd); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -5457,7 +5668,8 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char * return strdup(reply); } -static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, + __maybe_unused tv_t *cd) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5532,7 +5744,8 @@ foil: } static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet) + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd) { K_ITEM *i_username, look, *u_item, *p_item; K_TREE_CTX ctx[1]; @@ -5586,7 +5799,8 @@ static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe } static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet) + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd) { K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item, *ws_item; K_TREE_CTX w_ctx[1], us_ctx[1]; @@ -5722,7 +5936,8 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ } static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet) + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd) { K_TREE *userstats_workername_root = new_ktree(); K_ITEM *us_item, *usw_item, *tmp_item, *u_item; @@ -5825,7 +6040,8 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe return buf; } -static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, + char *by, char *code, char *inet, tv_t *cd) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5840,6 +6056,14 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, K_ITEM *i_prevhash, *i_coinbase1, *i_coinbase2, *i_version, *i_bits; K_ITEM *i_ntime, *i_reward; int64_t workinfoid; + bool igndup = false; + + if (reloading) { + if (tv_equal(cd, &(dbstatus.newest_createdate_workinfo))) + igndup = true; + else if (tv_newer(cd, &(dbstatus.newest_createdate_workinfo))) + return NULL; + } i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) @@ -5897,7 +6121,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, DATA_TRANSFER(i_bits)->data, DATA_TRANSFER(i_ntime)->data, DATA_TRANSFER(i_reward)->data, - now, by, code, inet); + by, code, inet, cd, igndup); PQfinish(conn); if (workinfoid == -1) { @@ -5912,6 +6136,12 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, K_ITEM *i_nonce2, *i_nonce, *i_diff, *i_sdiff, *i_secondaryuserid; bool ok; + // This just excludes the shares we certainly don't need + if (reloading) { + if (tv_newer(cd, &(dbstatus.oldest_sharesummary_firstshare_n))) + return NULL; + } + i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); @@ -5962,7 +6192,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, DATA_TRANSFER(i_diff)->data, DATA_TRANSFER(i_sdiff)->data, DATA_TRANSFER(i_secondaryuserid)->data, - now, by, code, inet); + by, code, inet, cd); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -5976,6 +6206,12 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, K_ITEM *i_error, *i_secondaryuserid; bool ok; + // This just excludes the shareerrors we certainly don't need + if (reloading) { + if (tv_newer(cd, &(dbstatus.oldest_sharesummary_firstshare_n))) + return NULL; + } + i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); @@ -6011,7 +6247,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, DATA_TRANSFER(i_errn)->data, DATA_TRANSFER(i_error)->data, DATA_TRANSFER(i_secondaryuserid)->data, - now, by, code, inet); + by, code, inet, cd); if (!ok) { LOGERR("%s.failed.DATA", id); return strdup("failed.DATA"); @@ -6023,6 +6259,11 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, K_ITEM *i_workinfoid, *i_poolinstance; bool ok; + if (reloading) { + if (tv_newer(cd, &(dbstatus.oldest_sharesummary_firstshare_n))) + return NULL; + } + i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); @@ -6033,7 +6274,7 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, ok = workinfo_age(NULL, DATA_TRANSFER(i_workinfoid)->data, DATA_TRANSFER(i_poolinstance)->data, - now, by, code, inet); + by, code, inet, cd); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -6053,7 +6294,8 @@ static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, } // TODO: the confirm update: identify block changes from workinfo height? -static char *cmd_blocks(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet, + tv_t *cd, bool igndup) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6125,7 +6367,7 @@ static char *cmd_blocks(char *cmd, char *id, tv_t *now, char *by, char *code, ch DATA_TRANSFER(i_nonce2)->data, DATA_TRANSFER(i_nonce)->data, DATA_TRANSFER(i_reward)->data, - now, by, code, inet); + by, code, inet, cd, igndup); break; case BLOCKS_CONFIRM: msg = "confirmed"; @@ -6135,7 +6377,7 @@ static char *cmd_blocks(char *cmd, char *id, tv_t *now, char *by, char *code, ch DATA_TRANSFER(i_confirmed)->data, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, - now, by, code, inet); + by, code, inet, cd, igndup); break; default: LOGERR("%s.failed.invalid conf='%s'", @@ -6155,7 +6397,23 @@ static char *cmd_blocks(char *cmd, char *id, tv_t *now, char *by, char *code, ch return strdup(reply); } -static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet) +static char *cmd_blocks(char *cmd, char *id, __maybe_unused tv_t *notnow, + char *by, char *code, char *inet, tv_t *cd) +{ + bool igndup = false; + + if (reloading) { + if (tv_equal(cd, &(dbstatus.newest_createdate_blocks))) + igndup = true; + else if (tv_newer(cd, &(dbstatus.newest_createdate_blocks))) + return NULL; + } + + return cmd_blocks_do(cmd, id, by, code, inet, cd, igndup); +} + +static char *cmd_auth_do(char *cmd, char *id, __maybe_unused tv_t *now, char *by, + char *code, char *inet, tv_t *cd, bool igndup) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6197,7 +6455,7 @@ static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, char *code, char DATA_TRANSFER(i_clientid)->data, DATA_TRANSFER(i_enonce1)->data, DATA_TRANSFER(i_useragent)->data, - now, by, code, inet); + by, code, inet, cd, igndup); PQfinish(conn); if (!secuserid) { @@ -6210,8 +6468,24 @@ static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, char *code, char return strdup(reply); } +static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, + char *code, char *inet, tv_t *cd) +{ + bool igndup = false; + + if (reloading) { + if (tv_equal(cd, &(dbstatus.newest_createdate_auths))) + igndup = true; + else if (tv_newer(cd, &(dbstatus.newest_createdate_auths))) + return NULL; + } + + return cmd_auth_do(cmd, id, now, by, code, inet, cd, igndup); +} + static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet) + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd) { K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look; double u_hashrate5m, u_hashrate1hr; @@ -6228,8 +6502,8 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe APPEND_REALLOC_INIT(buf, off, len); APPEND_REALLOC(buf, off, len, "ok."); - if (last_bc) { - tvs_to_buf(last_bc, reply, sizeof(reply)); + if (last_bc.tv_sec) { + tvs_to_buf(&last_bc, reply, sizeof(reply)); snprintf(tmp, sizeof(tmp), "lastbc=%s%c", reply, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); } else { @@ -6347,7 +6621,7 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe static char *cmd_dsp(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, - __maybe_unused char *inet) + __maybe_unused char *inet, __maybe_unused tv_t *notcd) { __maybe_unused K_ITEM *i_file; __maybe_unused char reply[1024] = ""; @@ -6376,7 +6650,7 @@ static char *cmd_dsp(char *cmd, char *id, __maybe_unused tv_t *now, static char *cmd_stats(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, - __maybe_unused char *inet) + __maybe_unused char *inet, __maybe_unused tv_t *notcd) { char tmp[1024], *buf; size_t len, off; @@ -6463,33 +6737,35 @@ static struct CMDS { enum cmd_values cmd_val; char *cmd_str; bool noid; // doesn't require an id - char *(*func)(char *, char *, tv_t *, char *, char *, char *); + bool createdate; // requires a createdate + char *(*func)(char *, char *, tv_t *, char *, char *, char *, tv_t *); char *access; } cmds[] = { - { CMD_SHUTDOWN, "shutdown", true, NULL, ACCESS_SYSTEM }, - { CMD_PING, "ping", true, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, - { CMD_SHARELOG, STR_WORKINFO, false, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_SHARES, false, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_SHAREERRORS, false, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_AGEWORKINFO, false, cmd_sharelog, ACCESS_POOL }, - { CMD_AUTH, "authorise", false, cmd_auth, ACCESS_POOL }, - { CMD_ADDUSER, "adduser", false, cmd_adduser, ACCESS_WEB }, - { CMD_CHKPASS, "chkpass", false, cmd_chkpass, ACCESS_WEB }, - { CMD_POOLSTAT, "poolstats", false, cmd_poolstats, ACCESS_POOL }, - { CMD_USERSTAT, "userstats", false, cmd_userstats, ACCESS_POOL }, - { CMD_BLOCK, "block", false, cmd_blocks, ACCESS_POOL }, - { CMD_NEWID, "newid", false, cmd_newid, ACCESS_SYSTEM }, - { CMD_PAYMENTS, "payments", false, cmd_payments, ACCESS_WEB }, - { CMD_WORKERS, "workers", false, cmd_workers, ACCESS_WEB }, - { CMD_ALLUSERS, "allusers", false, cmd_allusers, ACCESS_WEB }, - { CMD_HOMEPAGE, "homepage", false, cmd_homepage, ACCESS_WEB }, - { CMD_DSP, "dsp", false, cmd_dsp, ACCESS_SYSTEM }, - { CMD_STATS, "stats", true, cmd_stats, ACCESS_SYSTEM }, - { CMD_END, NULL, false, NULL, NULL } + { CMD_SHUTDOWN, "shutdown", true, false, NULL, ACCESS_SYSTEM }, + { CMD_PING, "ping", true, false, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, + { CMD_SHARELOG, STR_WORKINFO, false, true, cmd_sharelog, ACCESS_POOL }, + { CMD_SHARELOG, STR_SHARES, false, true, cmd_sharelog, ACCESS_POOL }, + { CMD_SHARELOG, STR_SHAREERRORS, false, true, cmd_sharelog, ACCESS_POOL }, + { CMD_SHARELOG, STR_AGEWORKINFO, false, true, cmd_sharelog, ACCESS_POOL }, + { CMD_AUTH, "authorise", false, true, cmd_auth, ACCESS_POOL }, + { CMD_ADDUSER, "adduser", false, false, cmd_adduser, ACCESS_WEB }, + { CMD_CHKPASS, "chkpass", false, false, cmd_chkpass, ACCESS_WEB }, + { CMD_POOLSTAT, "poolstats", false, true, cmd_poolstats, ACCESS_POOL }, + { CMD_USERSTAT, "userstats", false, true, cmd_userstats, ACCESS_POOL }, + { CMD_BLOCK, "block", false, true, cmd_blocks, ACCESS_POOL }, + { CMD_NEWID, "newid", false, false, cmd_newid, ACCESS_SYSTEM }, + { CMD_PAYMENTS, "payments", false, false, cmd_payments, ACCESS_WEB }, + { CMD_WORKERS, "workers", false, false, cmd_workers, ACCESS_WEB }, + { CMD_ALLUSERS, "allusers", false, false, cmd_allusers, ACCESS_WEB }, + { CMD_HOMEPAGE, "homepage", false, false, cmd_homepage, ACCESS_WEB }, + { CMD_DSP, "dsp", false, false, cmd_dsp, ACCESS_SYSTEM }, + { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM }, + { CMD_END, NULL, false, false, NULL, NULL } }; -static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id) +static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id, tv_t *cd) { + char reply[1024] = ""; K_TREE_CTX ctx[1]; K_ITEM *item; char *cmdptr, *idptr, *data, *next, *eq; @@ -6497,6 +6773,8 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id *which_cmds = CMD_UNSET; *cmd = *id = '\0'; + cd->tv_sec = 0; + cd->tv_usec = 0; cmdptr = strdup(buf); idptr = strchr(cmdptr, '.'); @@ -6530,7 +6808,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id } STRNCPYSIZ(id, cmdptr, ID_SIZ); - LOGERR("Listener received invalid message: '%s'", buf); + LOGERR("Listener received invalid (noid) message: '%s'", buf); free(cmdptr); return CMD_REPLY; } @@ -6673,6 +6951,19 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id } K_WUNLOCK(transfer_free); } + if (cmds[*which_cmds].createdate) { + item = require_name("createdate", 10, NULL, reply, sizeof(reply)); + if (!item) + return CMD_REPLY; + + txt_to_ctv("createdate", DATA_TRANSFER(item)->data, cd, sizeof(*cd)); + if (cd->tv_sec == 0) { + LOGERR("%s(): failed, %s has invalid createdate '%s'", + __func__, cmdptr, DATA_TRANSFER(item)->data); + free(cmdptr); + return CMD_REPLY; + } + } free(cmdptr); return cmds[*which_cmds].cmd_val; } @@ -6683,7 +6974,7 @@ static void summarise_poolstats() } // TODO: daily -// TODO: consider limiting how much/long this processes each time +// TODO: consider limiting how much/how long this processes each time static void summarise_userstats() { K_TREE_CTX ctx[1], ctx2[1]; @@ -6791,6 +7082,7 @@ static void summarise_userstats() userstats->statsdate.tv_sec -= 1; userstats->statsdate.tv_usec = 999999; + // This is simply when it was written, so 'now' is fine SIMPLEDATEDEFAULT(userstats, &now); if (!conn) @@ -6857,6 +7149,166 @@ static void *summariser(__maybe_unused void *arg) return NULL; } +// TODO: zzzz +// auth ?!? what about invalid ?!? +// At the end we need to wait for catch up of the ckpool data to our last read item +// BUT this would require replaying all the replies ... +// NEED to add to ckpool a request for the next message before opening the socket +// to get at start of processing, and stop the reload when this is found + +static bool reload_line(uint64_t count, char *buf) +{ + char cmd[CMD_SIZ+1], id[ID_SIZ+1]; + enum cmd_values cmdnum; + char *end, *ans; + int which_cmds; + K_ITEM *item; + tv_t now, cd; + bool ok = false; + + // Once we've read the message + setnow(&now); + if (buf) { + end = buf + strlen(buf) - 1; + // strip trailing \n and \r + while (end >= buf && (*end == '\n' || *end == '\r')) + *(end--) = '\0'; + } + if (!buf || !*buf) { + if (!buf) + LOGERR("%s() NULL message line %"PRIu64, __func__, count); + else + LOGERR("%s() Empty message line %"PRIu64, __func__, count); + + goto jilted; + } else { + LOGFILE(buf); + cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); + switch (cmdnum) { + // Ignore + case CMD_REPLY: + // Shouldn't be there, but ignore also + case CMD_SHUTDOWN: + case CMD_PING: + // Non pool commands, shouldn't be there, ignore + case CMD_ADDUSER: + case CMD_CHKPASS: + case CMD_NEWID: + case CMD_PAYMENTS: + case CMD_WORKERS: + case CMD_ALLUSERS: + case CMD_HOMEPAGE: + case CMD_DSP: + case CMD_STATS: + break; + default: + ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", + (char *)__func__, + (char *)"127.0.0.1", &cd); + if (ans) + free(ans); + break; + } + } + + char ch = status_chars[count & 0x3]; + putchar(ch); + putchar('\r'); + fflush(stdout); + + K_WLOCK(transfer_free); + transfer_root = free_ktree(transfer_root, NULL); + item = transfer_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + k_list_transfer_to_head(transfer_store, transfer_free); + K_WUNLOCK(transfer_free); + + ok = true; +jilted: + return ok; +} + +// Log files are every ... +#define ROLL_S 3600 +// 10Mb for now +#define MAX_READ (10 * 1024 * 1024) + +// TODO: be able to specify a start time on the command line +// TODO: handle missing files? +// TODO: handle a new database with no data or some missing data +static bool reload_from(tv_t *start) +{ + char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; + bool ok = true; + char *filename; + char data[MAX_READ]; + uint64_t count, total; + tv_t now; + FILE *fp; + + reloading = true; + + tv_to_buf(start, buf, sizeof(buf)); + LOGWARNING("%s(): from %s", __func__, buf); + + filename = rotating_filename(restorefrom, start->tv_sec); + fp = fopen(filename, "r"); + if (!fp) + quithere(1, "Failed to open '%s'", filename); + + setnow(&now); + tvs_to_buf(&now, run, sizeof(run)); + snprintf(data, sizeof(data), "reload.%s.0", run); + LOGFILE(data); + + total = 0; + while (ok) { + LOGWARNING("%s(): processing %s", __func__, filename); + count = 0; + + while (ok && fgets_unlocked(data, MAX_READ, fp)) + ok = reload_line(++count, data); + + if (ok) { + if (ferror(fp)) { + int err = errno; + quithere(1, "Read failed on %s (%d) '%s'", + filename, err, strerror(err)); + } + + LOGWARNING("%s(): read %"PRIu64" lines from %s", + count, filename); + + total += count; + } + + fclose(fp); + + if (ok) { + free(filename); + start->tv_sec += ROLL_S; + filename = rotating_filename(restorefrom, start->tv_sec); + fp = fopen(filename, "r"); + if (!fp) { + LOGWARNING("%s(): completed total %"PRIu64" lines", __func__, total); + break; + } + } + } + free(filename); + + snprintf(data, sizeof(data), "reload.%s.%"PRIu64, run, total); + LOGFILE(data); + + reloading = false; + + return ok; +} + // TODO: equivalent of api_allow static void *listener(void *arg) { @@ -6873,7 +7325,7 @@ static void *listener(void *arg) uint64_t counter = 0; K_ITEM *item; size_t siz; - tv_t now; + tv_t now, cd; bool dup; create_pthread(&summzer, summariser, NULL); @@ -6942,7 +7394,7 @@ static void *listener(void *arg) } else { dup = false; LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id); + cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); last_cmd = cmdnum; } switch (cmdnum) { @@ -6972,10 +7424,9 @@ static void *listener(void *arg) if (dup) send_unix_msg(sockd, last_reply); else { - // TODO: optionally get by/code/inet from transfer here instead? ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", (char *)__func__, - (char *)"127.0.0.1"); + (char *)"127.0.0.1", &cd); siz = strlen(ans) + strlen(id) + 32; rep = malloc(siz); snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans);