Browse Source

ckdb - userstats summarisation while reloading - crash reload on an unhandled command

master
kanoi 10 years ago
parent
commit
3eda6a4618
  1. 98
      src/ckdb.c

98
src/ckdb.c

@ -113,8 +113,8 @@ static char *restorefrom;
* message was written to disk and found in the CCL before it was
* processed in the message queue.
* This can be seen if the message displayed in the fatal error IS in
* ckdb's message logfile and means the messages after it in the
* logfile have already been processed.
* ckdb's message logfile and means the messages after it in ckdb's
* message logfile have already been processed.
* Again, a ckdb restart will resolve this
* In both the above (very rare) cases, if ckdb was to continue running,
* it would break the synchronisation and could cause DB problems, so
@ -123,9 +123,9 @@ static char *restorefrom;
* immediately and is not affected by ckpool messages until we
* TODO: allow bitcoin addresses - this will also need to be handled
* while filling the queue during reload, once we allow BTC addresses
* During the reload we can use the userstats createdate as 'now' for
* the userstats summarisation process to allow the summarisation to
* run during the reload
* During the reload, when checking the timeframe for summarisation, we
* use the current last userstats createdate as 'now' to avoid touching a
* timeframe where data could still be waiting to be loaded
*/
/* Reload data needed
@ -626,6 +626,8 @@ static const tv_t date_begin = { DATE_BEGIN, 0L };
// Different input data handling
static bool reloading = false;
// DB load is complete
static bool db_load_complete = false;
// Data load is complete
static bool startup_complete = false;
// Tell the summarizer to die
@ -1834,7 +1836,7 @@ static double cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
static double cmp_sharesummary(K_ITEM *a, K_ITEM *b);
/* All data is loaded, now update workerstatus last_share, last_idle, last_stats
* shares are all part of a sharesummary so no need to search shares
* Since shares are all part of a sharesummary, there's no need to search shares
*/
static void workerstatus_ready()
{
@ -5564,6 +5566,8 @@ static bool setup_data()
if (!getdata())
return false;
db_load_complete = true;
if (!reload())
return false;
@ -7153,11 +7157,11 @@ static void summarise_poolstats()
static void summarise_userstats()
{
K_TREE_CTX ctx[1], ctx2[1];
K_ITEM *first, *new, *next, *tmp;
K_ITEM *first, *last, *new, *next, *tmp;
USERSTATS *userstats;
double statrange, factor;
bool locked, upgrade;
tv_t now, when;
tv_t now, process, when;
PGconn *conn = NULL;
int count;
char error[1024];
@ -7171,6 +7175,18 @@ static void summarise_userstats()
upgrade = false;
locked = true;
K_ILOCK(userstats_free);
if (!reloading)
copy_tv(&process, &now);
else {
// During reload, base the check date on the newest statsdate
last = last_in_ktree(userstats_statsdate_root, ctx);
if (!last)
break;
copy_tv(&process, &DATA_USERSTATS(last)->statsdate);
}
first = first_in_ktree(userstats_statsdate_root, ctx);
// Oldest non DB stat
// TODO: make the index start with summarylevel? so can find faster
@ -7180,7 +7196,7 @@ static void summarise_userstats()
if (!first)
break;
statrange = tvdiff(&now, &(DATA_USERSTATS(first)->statsdate));
statrange = tvdiff(&process, &(DATA_USERSTATS(first)->statsdate));
// Is there data ready for summarising?
if (statrange <= USERSTATS_AGE)
break;
@ -7192,7 +7208,7 @@ static void summarise_userstats()
when.tv_usec = 0;
// Is the whole timerange up to before 'when' ready for summarising?
statrange = tvdiff(&now, &when);
statrange = tvdiff(&process, &when);
if (statrange < USERSTATS_AGE)
break;
@ -7239,6 +7255,14 @@ static void summarise_userstats()
next = tmp;
}
// Can temporarily release the lock since all our data is now not part of the lock
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
upgrade = false;
locked = false;
if (userstats->hashrate5m > 0.0 || userstats->hashrate1hr > 0.0)
userstats->idle = false;
else
@ -7265,33 +7289,37 @@ static void summarise_userstats()
if (!conn)
conn = dbconnect();
// TODO: Consider releasing the lock for the DB insert?
if (!userstats_add_db(conn, userstats)) {
/* This should only happen if a restart finds data
that wasn't found during the reload but is in
the same timeframe as DB data
i.e. it shouldn't happen, but keep the summary */
i.e. it shouldn't happen, but keep the summary anyway */
when.tv_sec -= USERSTATS_DB_S;
tv_to_buf(&when, tvbuf1, sizeof(tvbuf1));
tv_to_buf(&(userstats->statsdate), tvbuf2, sizeof(tvbuf2));
snprintf(error, sizeof(error),
"Userid %"PRId64" %d userstats records discarded "
"from %s to %s",
userstats->userid, count, tvbuf1, tvbuf2);
"Userid %"PRId64" Worker %s, %d userstats record%s "
"discarded from %s to %s",
userstats->userid,
userstats->workername,
count, (count == 1 ? "" : "s"),
tvbuf1, tvbuf2);
}
// The flags are not needed
//upgrade = true;
//locked = true;
K_WLOCK(userstats_free);
k_list_transfer_to_tail(userstats_summ, userstats_free);
k_add_head(userstats_store, new);
userstats_root = add_to_ktree(userstats_root, new, cmp_userstats);
userstats_statsdate_root = add_to_ktree(userstats_statsdate_root, new,
cmp_userstats_statsdate);
if (upgrade)
K_WUNLOCK(userstats_free);
else
K_IUNLOCK(userstats_free);
upgrade = false;
locked = false;
//locked = false;
//upgrade = false;
if (error[0])
LOGERR(error);
}
@ -7311,7 +7339,7 @@ static void *summariser(__maybe_unused void *arg)
{
pthread_detach(pthread_self());
while (!summarizer_die && !startup_complete)
while (!summarizer_die && !db_load_complete)
cksleep_ms(42);
while (!summarizer_die) {
@ -7327,14 +7355,7 @@ static void *summariser(__maybe_unused void *arg)
return NULL;
}
// TODO: zzzz
// auth ?!? what about invalid ?!?
// At the end we need to wait for catch up of the ckpool data to our last read item
// BUT this would require replaying all the replies ...
// NEED to add to ckpool a request for the next message before opening the socket
// to get at start of processing, and stop the reload when this is found
static bool reload_line(uint64_t count, char *buf)
static bool reload_line(char *filename, uint64_t count, char *buf)
{
char cmd[CMD_SIZ+1], id[ID_SIZ+1];
enum cmd_values cmdnum;
@ -7378,14 +7399,25 @@ static bool reload_line(uint64_t count, char *buf)
case CMD_HOMEPAGE:
case CMD_DSP:
case CMD_STATS:
break;
default:
LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored",
__func__, count, cmd);
break;
case CMD_SHARELOG:
case CMD_AUTH:
case CMD_POOLSTAT:
case CMD_USERSTAT:
case CMD_BLOCK:
ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code",
(char *)__func__,
(char *)"127.0.0.1", &cd);
if (ans)
free(ans);
break;
default:
// Force this switch to be updated if new cmds are added
quithere(1, "%s line %"PRIu64" '%s' - not handled by reload",
filename, count, cmd);
break;
}
}
@ -7417,7 +7449,7 @@ jilted:
/* To handle a new database with no data:
* touch the filename reported in "Failed to open 'filename'"
* when ckdb aborts */
* when ckdb aborts at the beginning of the reload */
static bool reload_from(tv_t *start)
{
char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1];
@ -7455,7 +7487,7 @@ static bool reload_from(tv_t *start)
count = 0;
while (ok && fgets_unlocked(data, MAX_READ, fp))
ok = reload_line(++count, data);
ok = reload_line(filename, ++count, data);
if (ok) {
if (ferror(fp)) {

Loading…
Cancel
Save