Browse Source

ckdb - restart documentation and data store variables

master
kanoi 10 years ago
parent
commit
593d8681fb
  1. 130
      src/ckdb.c

130
src/ckdb.c

@ -70,6 +70,74 @@ static char *db_pass;
// Currently hard coded at 4 characters // Currently hard coded at 4 characters
static char *status_chars = "|/-\\"; static char *status_chars = "|/-\\";
static char *restorefrom;
/* Restart data needed
* -------------------
* After the DB load, load "ckpool's ckdb logfile" (CCL), and all
* later CCLs, that contains the oldest date of all of the following:
* RAM shares: oldest DB sharesummary firstshare where complete='n'
* All shares before this have been summarised to the DB with
* complete='a' (or 'y') and were deleted from RAM
* RAM shareerrors: as above
* DB+RAM sharesummary: created from shares, so as above
* Some shares after this may have been summarised to other
* sharesummary complete='n', but for any such sharesummary
* we reset it back to the first share found and it will
* correct itself during the CCL reload
* Verify that all DB sharesummaries with complete='n' have
* done this
* DB+RAM workinfo: start from newest DB createdate workinfo
* DB+RAM auths: start from newest DB createdate auths
* DB+RAM poolstats: newest createdate poolstats
* TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting
* requirements
* DB+RAM userstats: for each pool/user/worker it would be the start
* of the next time band after the last DB createdate,
* since all previous data was summarised and deleted -
* use the oldest of these for all pools/users/workers
* TODO: multiple pools is not yet handled by ckdb
* TODO: handle a pool restart with a different instance name
* since this would always make the userstats reload point
* just after the instance name was changed - however
* this can be handled for now by simply ignoring the
* poolinstance
* DB+RAM workers: created by auths so auths will resolve it
* DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any)
* will be after the last workinfo
* DB+RAM accountbalance (TODO): resolved by shares/workinfo/blocks
* RAM workerstats: last_auth, last_share, last_stats all handled by
* DB load up to whatever the CCL restart point is, and then
* corrected with the CCL reload
* last_idle will be the last idle userstats in the CCL load or 0
* Code currently doesn't use last_idle, so for now this is OK
*
* TODO: handle the ckpool messages/CCL overlap in the start of the
* ckpool messages that will arrive after the CCL load finishes
*
* idcontrol: only userid reuse is critical and the user is added
* immeditately to the DB before replying to the add message
*
* Tables that are/will be written straight to the DB, so ar OK:
* users, useraccounts, paymentaddresses, payments,
* accountadjustment, optioncontrol, miningpayouts,
* eventlog
*/
typedef struct loadstatus {
tv_t oldest_sharesummary_firstshare_n;
tv_t newest_createdate_workinfo;
tv_t newest_createdate_auth;
tv_t newest_createdate_poolstats;
} LOADSTATUS;
static LOADSTATUS dbstatus;
/* Temporary while doing restart - it (of course) contains the fields
* required to track the newest userstats per user/worker
*/
static K_STORE *userstats_ccl;
// size limit on the command string // size limit on the command string
#define CMD_SIZ 31 #define CMD_SIZ 31
#define ID_SIZ 31 #define ID_SIZ 31
@ -807,13 +875,14 @@ typedef struct sharesummary {
int64_t countlastupdate; // non-DB field int64_t countlastupdate; // non-DB field
bool inserted; // non-DB field bool inserted; // non-DB field
bool saveaged; // non-DB field bool saveaged; // non-DB field
bool reset; // non-DB field
tv_t firstshare; tv_t firstshare;
tv_t lastshare; tv_t lastshare;
char complete[TXT_FLAG+1]; char complete[TXT_FLAG+1];
MODIFYDATECONTROLFIELDS; MODIFYDATECONTROLFIELDS;
} SHARESUMMARY; } SHARESUMMARY;
/* After many shares added, we need to update the DB record /* After this many shares added, we need to update the DB record
The DB record is added with the 1st share */ The DB record is added with the 1st share */
#define SHARESUMMARY_UPDATE_EVERY 10 #define SHARESUMMARY_UPDATE_EVERY 10
@ -1030,7 +1099,7 @@ static K_STORE *userstats_summ;
/* summarisation of the userstats after this many days are done /* summarisation of the userstats after this many days are done
* at the day level and the above stats are deleted from the db * at the day level and the above stats are deleted from the db
* Obvious WARNING - the larger this is, the more stats in the DB * Obvious WARNING - the larger this is, the more stats in the DB
* This is summary level '2' * This is summary level '2' (TODO)
*/ */
#define USERSTATS_DB_D 7 #define USERSTATS_DB_D 7
@ -1042,10 +1111,10 @@ static K_STORE *userstats_summ;
typedef struct workerstatus { typedef struct workerstatus {
int64_t userid; int64_t userid;
char workername[TXT_BIG+1]; char workername[TXT_BIG+1];
tv_t auth; tv_t last_auth;
tv_t share; tv_t last_share;
tv_t stats; tv_t last_stats;
tv_t idle; tv_t last_idle;
} WORKERSTATUS; } WORKERSTATUS;
#define ALLOC_WORKERSTATUS 1000 #define ALLOC_WORKERSTATUS 1000
@ -1661,34 +1730,34 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta
if (auths) { if (auths) {
item = find_create_workerstatus(auths->userid, auths->workername); item = find_create_workerstatus(auths->userid, auths->workername);
row = DATA_WORKERSTATUS(item); row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->auth), &(auths->createdate))) if (tv_newer(&(row->last_auth), &(auths->createdate)))
memcpy(&(row->auth), &(auths->createdate), sizeof(row->auth)); memcpy(&(row->last_auth), &(auths->createdate), sizeof(row->last_auth));
} }
if (shares) { if (shares) {
item = find_create_workerstatus(shares->userid, shares->workername); item = find_create_workerstatus(shares->userid, shares->workername);
row = DATA_WORKERSTATUS(item); row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->share), &(shares->createdate))) if (tv_newer(&(row->last_share), &(shares->createdate)))
memcpy(&(row->share), &(shares->createdate), sizeof(row->share)); memcpy(&(row->last_share), &(shares->createdate), sizeof(row->last_share));
} }
if (userstats) { if (userstats) {
item = find_create_workerstatus(userstats->userid, userstats->workername); item = find_create_workerstatus(userstats->userid, userstats->workername);
row = DATA_WORKERSTATUS(item); row = DATA_WORKERSTATUS(item);
if (userstats->idle) { if (userstats->idle) {
if (tv_newer(&(row->idle), &(userstats->statsdate))) if (tv_newer(&(row->last_idle), &(userstats->statsdate)))
memcpy(&(row->idle), &(userstats->statsdate), sizeof(row->idle)); memcpy(&(row->last_idle), &(userstats->statsdate), sizeof(row->last_idle));
} else { } else {
if (tv_newer(&(row->stats), &(userstats->statsdate))) if (tv_newer(&(row->last_stats), &(userstats->statsdate)))
memcpy(&(row->stats), &(userstats->statsdate), sizeof(row->idle)); memcpy(&(row->last_stats), &(userstats->statsdate), sizeof(row->last_stats));
} }
} }
if (sharesummary) { if (sharesummary) {
item = find_create_workerstatus(sharesummary->userid, sharesummary->workername); item = find_create_workerstatus(sharesummary->userid, sharesummary->workername);
row = DATA_WORKERSTATUS(item); row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->share), &(sharesummary->lastshare))) if (tv_newer(&(row->last_share), &(sharesummary->lastshare)))
memcpy(&(row->share), &(sharesummary->lastshare), sizeof(row->share)); memcpy(&(row->last_share), &(sharesummary->lastshare), sizeof(row->last_share));
} }
} }
@ -4965,6 +5034,7 @@ static bool setup_data()
userstats_store = k_new_store(userstats_list); userstats_store = k_new_store(userstats_list);
userstats_eos_store = k_new_store(userstats_list); userstats_eos_store = k_new_store(userstats_list);
userstats_summ = k_new_store(userstats_list); userstats_summ = k_new_store(userstats_list);
userstats_ccl = k_new_store(userstats_list);
userstats_root = new_ktree(); userstats_root = new_ktree();
userstats_list->dsp_func = dsp_userstats; userstats_list->dsp_func = dsp_userstats;
@ -5435,7 +5505,7 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_
ws_item = find_workerstatus(DATA_USERS(u_item)->userid, ws_item = find_workerstatus(DATA_USERS(u_item)->userid,
DATA_WORKERS(w_item)->workername); DATA_WORKERS(w_item)->workername);
if (ws_item) if (ws_item)
w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->share.tv_sec; w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->last_share.tv_sec;
// find last stored userid record // find last stored userid record
userstats.userid = DATA_USERS(u_item)->userid; userstats.userid = DATA_USERS(u_item)->userid;
@ -6558,7 +6628,7 @@ static void *listener(void *arg)
char *last_msg = NULL, *last_reply = NULL; char *last_msg = NULL, *last_reply = NULL;
char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1];
// Minimise the size in case of garbage // Minimise the size in case of garbage
char duptype[16+1]; char duptype[CMD_SIZ+1];
enum cmd_values cmdnum, last_cmd = 9001; enum cmd_values cmdnum, last_cmd = 9001;
int sockd, which_cmds; int sockd, which_cmds;
pthread_t summzer; pthread_t summzer;
@ -6712,6 +6782,22 @@ static void *listener(void *arg)
return NULL; return NULL;
} }
static void check_restore_dir()
{
struct stat statbuf;
if (!restorefrom)
quit(1, "ERR: '-r dir' required to specify the ckpool hourly ckdb log dir");
if (!(*restorefrom))
quit(1, "ERR: '-r dir' can't be empty");
trail_slash(&restorefrom);
if (stat(restorefrom, &statbuf))
quit(1, "ERR: -r '%s' directory doesn't exist", restorefrom);
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
struct sigaction handler; struct sigaction handler;
@ -6727,7 +6813,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt(argc, argv, "c:kl:n:p:s:u:")) != -1) { while ((c = getopt(argc, argv, "c:kl:n:p:r:s:u:")) != -1) {
switch(c) { switch(c) {
case 'c': case 'c':
ckp.config = optarg; ckp.config = optarg;
@ -6745,6 +6831,9 @@ int main(int argc, char **argv)
LOG_EMERG, LOG_DEBUG, ckp.loglevel); LOG_EMERG, LOG_DEBUG, ckp.loglevel);
} }
break; break;
case 'r':
restorefrom = strdup(optarg);
break;
case 's': case 's':
ckp.socket_dir = strdup(optarg); ckp.socket_dir = strdup(optarg);
break; break;
@ -6764,6 +6853,9 @@ int main(int argc, char **argv)
break; break;
} }
} }
check_restore_dir();
// if (!db_pass) // if (!db_pass)
// zzz // zzz
if (!db_user) if (!db_user)

Loading…
Cancel
Save