From 44a0c12f4c67978384bbd18e9af6f94844c76598 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 17 Apr 2017 13:00:46 +1000 Subject: [PATCH] Load all userstats at startup. --- src/stratifier.c | 165 +++++++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 78 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index fe3c6ac8..d7adf939 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -206,7 +206,6 @@ struct user_instance { double dsps10080; tv_t last_share; tv_t last_decay; - tv_t last_update; bool authorised; /* Has this username ever been authorised? */ time_t auth_time; @@ -234,7 +233,6 @@ struct worker_instance { double dsps10080; tv_t last_share; tv_t last_decay; - tv_t last_update; time_t start_time; double best_diff; /* Best share found by this worker */ @@ -5100,56 +5098,76 @@ static void decay_user(user_instance_t *user, double diff, tv_t *now_t) copy_tv(&user->last_decay, now_t); } -/* Enter holding a reference count */ -static void read_userstats(ckpool_t *ckp, user_instance_t *user) +static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user); + +/* Load the statistics of and create all known users at startup */ +static void read_userstats(ckpool_t *ckp, int tvsec_diff) { - int tvsec_diff = 0, ret; - char s[512]; - json_t *val; - FILE *fp; - tv_t now; + struct dirent *dir; + char dnam[512]; + DIR *d; - snprintf(s, 511, "%s/users/%s", ckp->logdir, user->username); - fp = fopen(s, "re"); - if (!fp) { - LOGINFO("User %s does not have a logfile to read", user->username); - return; - } - memset(s, 0, 512); - ret = fread(s, 1, 511, fp); - fclose(fp); - if (ret < 1) { - LOGINFO("Failed to read user %s logfile", user->username); - return; - } - val = json_loads(s, 0, NULL); - if (!val) { - LOGINFO("Failed to json decode user %s logfile: %s", user->username, s); + snprintf(dnam, 511, "%susers", ckp->logdir); + d = opendir(dnam); + if (!d) { + LOGNOTICE("No user directory found"); return; } + while ((dir = readdir(d)) != NULL) { + char *username = basename(dir->d_name), s[512]; + user_instance_t *user; + bool new_user = false; + json_t *val; + FILE *fp; + tv_t now; + int ret; - tv_time(&now); - copy_tv(&user->last_share, &now); - copy_tv(&user->last_decay, &now); - user->dsps1 = dsps_from_key(val, "hashrate1m"); - user->dsps5 = dsps_from_key(val, "hashrate5m"); - user->dsps60 = dsps_from_key(val, "hashrate1hr"); - user->dsps1440 = dsps_from_key(val, "hashrate1d"); - user->dsps10080 = dsps_from_key(val, "hashrate7d"); - json_get_int64(&user->last_update.tv_sec, val, "lastupdate"); - json_get_int64(&user->shares, val, "shares"); - json_get_double(&user->best_diff, val, "bestshare"); - LOGINFO("Successfully read user %s stats %f %f %f %f %f %f", user->username, - user->dsps1, user->dsps5, user->dsps60, user->dsps1440, - user->dsps10080, user->best_diff); - json_decref(val); - if (user->last_update.tv_sec) - tvsec_diff = now.tv_sec - user->last_update.tv_sec - 60; - if (tvsec_diff > 60) { - LOGINFO("Old user stats indicate not logged for %d seconds, decaying stats", - tvsec_diff); - decay_user(user, 0, &now); + if (!strcmp(username, "/") || !strcmp(username, ".") || !strcmp(username, "..")) + continue; + user = get_create_user(ckp->sdata, username, &new_user); + if (unlikely(!new_user)) { + /* All users should be new at this stage */ + LOGWARNING("Duplicate user in read_userstats %s", username); + continue; + } + snprintf(s, 511, "%s/%s", dnam, username); + fp = fopen(s, "re"); + if (unlikely(!fp)) { + /* Permission problems should be the only reason this happens */ + LOGWARNING("Failed to load user %s logfile to read", username); + continue; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGNOTICE("Failed to read user %s logfile", user->username); + continue; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGNOTICE("Failed to json decode user %s logfile: %s", user->username, s); + continue; + } + + tv_time(&now); + copy_tv(&user->last_share, &now); + copy_tv(&user->last_decay, &now); + user->dsps1 = dsps_from_key(val, "hashrate1m"); + user->dsps5 = dsps_from_key(val, "hashrate5m"); + user->dsps60 = dsps_from_key(val, "hashrate1hr"); + user->dsps1440 = dsps_from_key(val, "hashrate1d"); + user->dsps10080 = dsps_from_key(val, "hashrate7d"); + json_get_int64(&user->shares, val, "shares"); + json_get_double(&user->best_diff, val, "bestshare"); + LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", user->username, + user->dsps1, user->dsps5, user->dsps60, user->dsps1440, + user->dsps10080, user->best_diff); + json_decref(val); + if (tvsec_diff > 60) + decay_user(user, 0, &now); } + closedir(d); } /* Enter holding a reference count */ @@ -5189,13 +5207,10 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) worker->dsps1440 = dsps_from_key(val, "hashrate1d"); worker->dsps10080 = dsps_from_key(val, "hashrate7d"); json_get_double(&worker->best_diff, val, "bestshare"); - json_get_int64(&worker->last_update.tv_sec, val, "lastupdate"); json_get_int64(&worker->shares, val, "shares"); LOGINFO("Successfully read worker %s stats %f %f %f %f %f", worker->workername, worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff); json_decref(val); - if (worker->last_update.tv_sec) - tvsec_diff = now.tv_sec - worker->last_update.tv_sec - 60; if (tvsec_diff > 60) { LOGINFO("Old worker stats indicate not logged for %d seconds, decaying stats", tvsec_diff); @@ -5218,7 +5233,7 @@ static user_instance_t *__create_user(sdata_t *sdata, const char *username) /* Find user by username or create one if it doesn't already exist */ -static user_instance_t *get_create_user(ckpool_t *ckp, sdata_t *sdata, const char *username, bool *new_user) +static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user) { user_instance_t *user; @@ -5230,9 +5245,6 @@ static user_instance_t *get_create_user(ckpool_t *ckp, sdata_t *sdata, const cha } ck_wunlock(&sdata->instance_lock); - if (CKP_STANDALONE(ckp) && *new_user) - read_userstats(ckp, user); - return user; } @@ -5240,7 +5252,7 @@ static user_instance_t *get_user(sdata_t *sdata, const char *username) { bool dummy = false; - return get_create_user(sdata->ckp, sdata, username, &dummy); + return get_create_user(sdata, username, &dummy); } static worker_instance_t *__create_worker(user_instance_t *user, const char *workername) @@ -5315,7 +5327,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, if (unlikely(len > 127)) username[127] = '\0'; - user = get_create_user(ckp, sdata, username, &new_user); + user = get_create_user(sdata, username, &new_user); worker = get_create_worker(ckp, sdata, user, workername, &new_worker); /* Create one worker instance for combined data from workers of the @@ -6791,7 +6803,7 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna if (unlikely(len > 127)) username[127] = '\0'; - user = get_create_user(ckp, sdata, username, &new_user); + user = get_create_user(sdata, username, &new_user); /* Is this a btc address based username? */ if (!ckp->proxy && (new_user || !user->btcaddress) && (len > 26 && len < 35)) @@ -8172,8 +8184,6 @@ static void *statsupdate(void *arg) ghs = worker->dsps10080 * nonces; suffix_string(ghs, suffix10080, 16, 0); - copy_tv(&worker->last_update, &now); - JSON_CPACK(val, "{ss,ss,ss,ss,ss,si,sI,sf}", "hashrate1m", suffix1, "hashrate5m", suffix5, @@ -8214,8 +8224,6 @@ static void *statsupdate(void *arg) ghs = user->dsps10080 * nonces; suffix_string(ghs, suffix10080, 16, 0); - copy_tv(&user->last_update, &now); - JSON_CPACK(val, "{ss,ss,ss,ss,ss,si,si,sI,sf}", "hashrate1m", suffix1, "hashrate5m", suffix5, @@ -8477,15 +8485,15 @@ static void *ckdb_heartbeat(void *arg) return NULL; } -static void read_poolstats(ckpool_t *ckp) +static void read_poolstats(ckpool_t *ckp, int *tvsec_diff) { char *s = alloca(4096), *pstats, *dsps, *sps; sdata_t *sdata = ckp->sdata; pool_stats_t *stats = &sdata->stats; - int tvsec_diff = 0, ret; tv_t now, last; json_t *val; FILE *fp; + int ret; snprintf(s, 4095, "%s/pool/pool.status", ckp->logdir); fp = fopen(s, "re"); @@ -8547,22 +8555,22 @@ static void read_poolstats(ckpool_t *ckp) LOGINFO("Successfully read pool sps: %s", sps); if (last.tv_sec) - tvsec_diff = now.tv_sec - last.tv_sec - 60; - if (tvsec_diff > 60) { + *tvsec_diff = now.tv_sec - last.tv_sec - 60; + if (*tvsec_diff > 60) { LOGNOTICE("Old pool stats indicate pool down for %d seconds, decaying stats", - tvsec_diff); - decay_time(&stats->sps1, 0, tvsec_diff, MIN1); - decay_time(&stats->sps5, 0, tvsec_diff, MIN5); - decay_time(&stats->sps15, 0, tvsec_diff, MIN15); - decay_time(&stats->sps60, 0, tvsec_diff, HOUR); + *tvsec_diff); + decay_time(&stats->sps1, 0, *tvsec_diff, MIN1); + decay_time(&stats->sps5, 0, *tvsec_diff, MIN5); + decay_time(&stats->sps15, 0, *tvsec_diff, MIN15); + decay_time(&stats->sps60, 0, *tvsec_diff, HOUR); - decay_time(&stats->dsps1, 0, tvsec_diff, MIN1); - decay_time(&stats->dsps5, 0, tvsec_diff, MIN5); - decay_time(&stats->dsps15, 0, tvsec_diff, MIN15); - decay_time(&stats->dsps60, 0, tvsec_diff, HOUR); - decay_time(&stats->dsps360, 0, tvsec_diff, HOUR6); - decay_time(&stats->dsps1440, 0, tvsec_diff, DAY); - decay_time(&stats->dsps10080, 0, tvsec_diff, WEEK); + decay_time(&stats->dsps1, 0, *tvsec_diff, MIN1); + decay_time(&stats->dsps5, 0, *tvsec_diff, MIN5); + decay_time(&stats->dsps15, 0, *tvsec_diff, MIN15); + decay_time(&stats->dsps60, 0, *tvsec_diff, HOUR); + decay_time(&stats->dsps360, 0, *tvsec_diff, HOUR6); + decay_time(&stats->dsps1440, 0, *tvsec_diff, DAY); + decay_time(&stats->dsps10080, 0, *tvsec_diff, WEEK); } } @@ -8577,10 +8585,10 @@ void *stratifier(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; + int threads, tvsec_diff = 0; ckpool_t *ckp = pi->ckp; int64_t randomiser; sdata_t *sdata; - int threads; rename_proc(pi->processname); LOGWARNING("%s stratifier starting", ckp->name); @@ -8649,7 +8657,8 @@ void *stratifier(void *arg) sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); - read_poolstats(ckp); + read_poolstats(ckp, &tvsec_diff); + read_userstats(ckp, tvsec_diff); cklock_init(&sdata->txn_lock); cklock_init(&sdata->workbase_lock);