diff --git a/src/ckdb.c b/src/ckdb.c index 854aa2ff..27ba98f6 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -113,8 +113,8 @@ static char *restorefrom; * message was written to disk and found in the CCL before it was * processed in the message queue. * This can be seen if the message displayed in the fatal error IS in - * ckdb's message logfile and means the messages after it in the - * logfile have already been processed. + * ckdb's message logfile and means the messages after it in ckdb's + * message logfile have already been processed. * Again, a ckdb restart will resolve this * In both the above (very rare) cases, if ckdb was to continue running, * it would break the synchronisation and could cause DB problems, so @@ -123,9 +123,9 @@ static char *restorefrom; * immediately and is not affected by ckpool messages until we * TODO: allow bitcoin addresses - this will also need to be handled * while filling the queue during reload, once we allow BTC addresses - * During the reload we can use the userstats createdate as 'now' for - * the userstats summarisation process to allow the summarisation to - * run during the reload + * During the reload, when checking the timeframe for summarisation, we + * use the current last userstats createdate as 'now' to avoid touching a + * timeframe where data could still be waiting to be loaded */ /* Reload data needed @@ -626,6 +626,8 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; // Different input data handling static bool reloading = false; +// DB load is complete +static bool db_load_complete = false; // Data load is complete static bool startup_complete = false; // Tell the summarizer to die @@ -1834,7 +1836,7 @@ static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b); static double cmp_sharesummary(K_ITEM *a, K_ITEM *b); /* All data is loaded, now update workerstatus last_share, last_idle, last_stats - * shares are all part of a sharesummary so no need to search shares + * Since shares are all part of a sharesummary, there's no need to search shares */ static void workerstatus_ready() { @@ -5564,6 +5566,8 @@ static bool setup_data() if (!getdata()) return false; + db_load_complete = true; + if (!reload()) return false; @@ -7153,11 +7157,11 @@ static void summarise_poolstats() static void summarise_userstats() { K_TREE_CTX ctx[1], ctx2[1]; - K_ITEM *first, *new, *next, *tmp; + K_ITEM *first, *last, *new, *next, *tmp; USERSTATS *userstats; double statrange, factor; bool locked, upgrade; - tv_t now, when; + tv_t now, process, when; PGconn *conn = NULL; int count; char error[1024]; @@ -7171,6 +7175,18 @@ static void summarise_userstats() upgrade = false; locked = true; K_ILOCK(userstats_free); + + if (!reloading) + copy_tv(&process, &now); + else { + // During reload, base the check date on the newest statsdate + last = last_in_ktree(userstats_statsdate_root, ctx); + if (!last) + break; + + copy_tv(&process, &DATA_USERSTATS(last)->statsdate); + } + first = first_in_ktree(userstats_statsdate_root, ctx); // Oldest non DB stat // TODO: make the index start with summarylevel? so can find faster @@ -7180,7 +7196,7 @@ static void summarise_userstats() if (!first) break; - statrange = tvdiff(&now, &(DATA_USERSTATS(first)->statsdate)); + statrange = tvdiff(&process, &(DATA_USERSTATS(first)->statsdate)); // Is there data ready for summarising? if (statrange <= USERSTATS_AGE) break; @@ -7192,7 +7208,7 @@ static void summarise_userstats() when.tv_usec = 0; // Is the whole timerange up to before 'when' ready for summarising? - statrange = tvdiff(&now, &when); + statrange = tvdiff(&process, &when); if (statrange < USERSTATS_AGE) break; @@ -7239,6 +7255,14 @@ static void summarise_userstats() next = tmp; } + // Can temporarily release the lock since all our data is now not part of the lock + if (upgrade) + K_WUNLOCK(userstats_free); + else + K_IUNLOCK(userstats_free); + upgrade = false; + locked = false; + if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0) userstats->idle = false; else @@ -7265,33 +7289,37 @@ static void summarise_userstats() if (!conn) conn = dbconnect(); - // TODO: Consider releasing the lock for the DB insert? if (!userstats_add_db(conn, userstats)) { /* This should only happen if a restart finds data that wasn't found during the reload but is in the same timeframe as DB data - i.e. it shouldn't happen, but keep the summary */ + i.e. it shouldn't happen, but keep the summary anyway */ when.tv_sec -= USERSTATS_DB_S; tv_to_buf(&when, tvbuf1, sizeof(tvbuf1)); tv_to_buf(&(userstats->statsdate), tvbuf2, sizeof(tvbuf2)); snprintf(error, sizeof(error), - "Userid %"PRId64" %d userstats records discarded " - "from %s to %s", - userstats->userid, count, tvbuf1, tvbuf2); + "Userid %"PRId64" Worker %s, %d userstats record%s " + "discarded from %s to %s", + userstats->userid, + userstats->workername, + count, (count == 1 ? "" : "s"), + tvbuf1, tvbuf2); } + // The flags are not needed + //upgrade = true; + //locked = true; + K_WLOCK(userstats_free); k_list_transfer_to_tail(userstats_summ, userstats_free); k_add_head(userstats_store, new); userstats_root = add_to_ktree(userstats_root, new, cmp_userstats); userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new, cmp_userstats_statsdate); - if (upgrade) - K_WUNLOCK(userstats_free); - else - K_IUNLOCK(userstats_free); - upgrade = false; - locked = false; + K_WUNLOCK(userstats_free); + //locked = false; + //upgrade = false; + if (error[0]) LOGERR(error); } @@ -7311,7 +7339,7 @@ static void *summariser(__maybe_unused void *arg) { pthread_detach(pthread_self()); - while (!summarizer_die && !startup_complete) + while (!summarizer_die && !db_load_complete) cksleep_ms(42); while (!summarizer_die) { @@ -7327,14 +7355,7 @@ static void *summariser(__maybe_unused void *arg) return NULL; } -// TODO: zzzz -// auth ?!? what about invalid ?!? -// At the end we need to wait for catch up of the ckpool data to our last read item -// BUT this would require replaying all the replies ... -// NEED to add to ckpool a request for the next message before opening the socket -// to get at start of processing, and stop the reload when this is found - -static bool reload_line(uint64_t count, char *buf) +static bool reload_line(char *filename, uint64_t count, char *buf) { char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; @@ -7378,14 +7399,25 @@ static bool reload_line(uint64_t count, char *buf) case CMD_HOMEPAGE: case CMD_DSP: case CMD_STATS: + LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", + __func__, count, cmd); break; - default: + case CMD_SHARELOG: + case CMD_AUTH: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_BLOCK: ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", (char *)__func__, (char *)"127.0.0.1", &cd); if (ans) free(ans); break; + default: + // Force this switch to be updated if new cmds are added + quithere(1, "%s line %"PRIu64" '%s' - not handled by reload", + filename, count, cmd); + break; } } @@ -7417,7 +7449,7 @@ jilted: /* To handle a new database with no data: * touch the filename reported in "Failed to open 'filename'" - * when ckdb aborts */ + * when ckdb aborts at the beginning of the reload */ static bool reload_from(tv_t *start) { char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; @@ -7455,7 +7487,7 @@ static bool reload_from(tv_t *start) count = 0; while (ok && fgets_unlocked(data, MAX_READ, fp)) - ok = reload_line(++count, data); + ok = reload_line(filename, ++count, data); if (ok) { if (ferror(fp)) {