Browse Source

Load all userstats at startup.

master
Con Kolivas 8 years ago
parent
commit
44a0c12f4c
  1. 165
      src/stratifier.c

165
src/stratifier.c

@ -206,7 +206,6 @@ struct user_instance {
double dsps10080;
tv_t last_share;
tv_t last_decay;
tv_t last_update;
bool authorised; /* Has this username ever been authorised? */
time_t auth_time;
@ -234,7 +233,6 @@ struct worker_instance {
double dsps10080;
tv_t last_share;
tv_t last_decay;
tv_t last_update;
time_t start_time;
double best_diff; /* Best share found by this worker */
@ -5100,56 +5098,76 @@ static void decay_user(user_instance_t *user, double diff, tv_t *now_t)
copy_tv(&user->last_decay, now_t);
}
/* Enter holding a reference count */
static void read_userstats(ckpool_t *ckp, user_instance_t *user)
static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user);
/* Load the statistics of and create all known users at startup */
static void read_userstats(ckpool_t *ckp, int tvsec_diff)
{
int tvsec_diff = 0, ret;
char s[512];
json_t *val;
FILE *fp;
tv_t now;
struct dirent *dir;
char dnam[512];
DIR *d;
snprintf(s, 511, "%s/users/%s", ckp->logdir, user->username);
fp = fopen(s, "re");
if (!fp) {
LOGINFO("User %s does not have a logfile to read", user->username);
return;
}
memset(s, 0, 512);
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGINFO("Failed to read user %s logfile", user->username);
return;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGINFO("Failed to json decode user %s logfile: %s", user->username, s);
snprintf(dnam, 511, "%susers", ckp->logdir);
d = opendir(dnam);
if (!d) {
LOGNOTICE("No user directory found");
return;
}
while ((dir = readdir(d)) != NULL) {
char *username = basename(dir->d_name), s[512];
user_instance_t *user;
bool new_user = false;
json_t *val;
FILE *fp;
tv_t now;
int ret;
tv_time(&now);
copy_tv(&user->last_share, &now);
copy_tv(&user->last_decay, &now);
user->dsps1 = dsps_from_key(val, "hashrate1m");
user->dsps5 = dsps_from_key(val, "hashrate5m");
user->dsps60 = dsps_from_key(val, "hashrate1hr");
user->dsps1440 = dsps_from_key(val, "hashrate1d");
user->dsps10080 = dsps_from_key(val, "hashrate7d");
json_get_int64(&user->last_update.tv_sec, val, "lastupdate");
json_get_int64(&user->shares, val, "shares");
json_get_double(&user->best_diff, val, "bestshare");
LOGINFO("Successfully read user %s stats %f %f %f %f %f %f", user->username,
user->dsps1, user->dsps5, user->dsps60, user->dsps1440,
user->dsps10080, user->best_diff);
json_decref(val);
if (user->last_update.tv_sec)
tvsec_diff = now.tv_sec - user->last_update.tv_sec - 60;
if (tvsec_diff > 60) {
LOGINFO("Old user stats indicate not logged for %d seconds, decaying stats",
tvsec_diff);
decay_user(user, 0, &now);
if (!strcmp(username, "/") || !strcmp(username, ".") || !strcmp(username, ".."))
continue;
user = get_create_user(ckp->sdata, username, &new_user);
if (unlikely(!new_user)) {
/* All users should be new at this stage */
LOGWARNING("Duplicate user in read_userstats %s", username);
continue;
}
snprintf(s, 511, "%s/%s", dnam, username);
fp = fopen(s, "re");
if (unlikely(!fp)) {
/* Permission problems should be the only reason this happens */
LOGWARNING("Failed to load user %s logfile to read", username);
continue;
}
memset(s, 0, 512);
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGNOTICE("Failed to read user %s logfile", user->username);
continue;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGNOTICE("Failed to json decode user %s logfile: %s", user->username, s);
continue;
}
tv_time(&now);
copy_tv(&user->last_share, &now);
copy_tv(&user->last_decay, &now);
user->dsps1 = dsps_from_key(val, "hashrate1m");
user->dsps5 = dsps_from_key(val, "hashrate5m");
user->dsps60 = dsps_from_key(val, "hashrate1hr");
user->dsps1440 = dsps_from_key(val, "hashrate1d");
user->dsps10080 = dsps_from_key(val, "hashrate7d");
json_get_int64(&user->shares, val, "shares");
json_get_double(&user->best_diff, val, "bestshare");
LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", user->username,
user->dsps1, user->dsps5, user->dsps60, user->dsps1440,
user->dsps10080, user->best_diff);
json_decref(val);
if (tvsec_diff > 60)
decay_user(user, 0, &now);
}
closedir(d);
}
/* Enter holding a reference count */
@ -5189,13 +5207,10 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
worker->dsps1440 = dsps_from_key(val, "hashrate1d");
worker->dsps10080 = dsps_from_key(val, "hashrate7d");
json_get_double(&worker->best_diff, val, "bestshare");
json_get_int64(&worker->last_update.tv_sec, val, "lastupdate");
json_get_int64(&worker->shares, val, "shares");
LOGINFO("Successfully read worker %s stats %f %f %f %f %f", worker->workername,
worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff);
json_decref(val);
if (worker->last_update.tv_sec)
tvsec_diff = now.tv_sec - worker->last_update.tv_sec - 60;
if (tvsec_diff > 60) {
LOGINFO("Old worker stats indicate not logged for %d seconds, decaying stats",
tvsec_diff);
@ -5218,7 +5233,7 @@ static user_instance_t *__create_user(sdata_t *sdata, const char *username)
/* Find user by username or create one if it doesn't already exist */
static user_instance_t *get_create_user(ckpool_t *ckp, sdata_t *sdata, const char *username, bool *new_user)
static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user)
{
user_instance_t *user;
@ -5230,9 +5245,6 @@ static user_instance_t *get_create_user(ckpool_t *ckp, sdata_t *sdata, const cha
}
ck_wunlock(&sdata->instance_lock);
if (CKP_STANDALONE(ckp) && *new_user)
read_userstats(ckp, user);
return user;
}
@ -5240,7 +5252,7 @@ static user_instance_t *get_user(sdata_t *sdata, const char *username)
{
bool dummy = false;
return get_create_user(sdata->ckp, sdata, username, &dummy);
return get_create_user(sdata, username, &dummy);
}
static worker_instance_t *__create_worker(user_instance_t *user, const char *workername)
@ -5315,7 +5327,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
if (unlikely(len > 127))
username[127] = '\0';
user = get_create_user(ckp, sdata, username, &new_user);
user = get_create_user(sdata, username, &new_user);
worker = get_create_worker(ckp, sdata, user, workername, &new_worker);
/* Create one worker instance for combined data from workers of the
@ -6791,7 +6803,7 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna
if (unlikely(len > 127))
username[127] = '\0';
user = get_create_user(ckp, sdata, username, &new_user);
user = get_create_user(sdata, username, &new_user);
/* Is this a btc address based username? */
if (!ckp->proxy && (new_user || !user->btcaddress) && (len > 26 && len < 35))
@ -8172,8 +8184,6 @@ static void *statsupdate(void *arg)
ghs = worker->dsps10080 * nonces;
suffix_string(ghs, suffix10080, 16, 0);
copy_tv(&worker->last_update, &now);
JSON_CPACK(val, "{ss,ss,ss,ss,ss,si,sI,sf}",
"hashrate1m", suffix1,
"hashrate5m", suffix5,
@ -8214,8 +8224,6 @@ static void *statsupdate(void *arg)
ghs = user->dsps10080 * nonces;
suffix_string(ghs, suffix10080, 16, 0);
copy_tv(&user->last_update, &now);
JSON_CPACK(val, "{ss,ss,ss,ss,ss,si,si,sI,sf}",
"hashrate1m", suffix1,
"hashrate5m", suffix5,
@ -8477,15 +8485,15 @@ static void *ckdb_heartbeat(void *arg)
return NULL;
}
static void read_poolstats(ckpool_t *ckp)
static void read_poolstats(ckpool_t *ckp, int *tvsec_diff)
{
char *s = alloca(4096), *pstats, *dsps, *sps;
sdata_t *sdata = ckp->sdata;
pool_stats_t *stats = &sdata->stats;
int tvsec_diff = 0, ret;
tv_t now, last;
json_t *val;
FILE *fp;
int ret;
snprintf(s, 4095, "%s/pool/pool.status", ckp->logdir);
fp = fopen(s, "re");
@ -8547,22 +8555,22 @@ static void read_poolstats(ckpool_t *ckp)
LOGINFO("Successfully read pool sps: %s", sps);
if (last.tv_sec)
tvsec_diff = now.tv_sec - last.tv_sec - 60;
if (tvsec_diff > 60) {
*tvsec_diff = now.tv_sec - last.tv_sec - 60;
if (*tvsec_diff > 60) {
LOGNOTICE("Old pool stats indicate pool down for %d seconds, decaying stats",
tvsec_diff);
decay_time(&stats->sps1, 0, tvsec_diff, MIN1);
decay_time(&stats->sps5, 0, tvsec_diff, MIN5);
decay_time(&stats->sps15, 0, tvsec_diff, MIN15);
decay_time(&stats->sps60, 0, tvsec_diff, HOUR);
*tvsec_diff);
decay_time(&stats->sps1, 0, *tvsec_diff, MIN1);
decay_time(&stats->sps5, 0, *tvsec_diff, MIN5);
decay_time(&stats->sps15, 0, *tvsec_diff, MIN15);
decay_time(&stats->sps60, 0, *tvsec_diff, HOUR);
decay_time(&stats->dsps1, 0, tvsec_diff, MIN1);
decay_time(&stats->dsps5, 0, tvsec_diff, MIN5);
decay_time(&stats->dsps15, 0, tvsec_diff, MIN15);
decay_time(&stats->dsps60, 0, tvsec_diff, HOUR);
decay_time(&stats->dsps360, 0, tvsec_diff, HOUR6);
decay_time(&stats->dsps1440, 0, tvsec_diff, DAY);
decay_time(&stats->dsps10080, 0, tvsec_diff, WEEK);
decay_time(&stats->dsps1, 0, *tvsec_diff, MIN1);
decay_time(&stats->dsps5, 0, *tvsec_diff, MIN5);
decay_time(&stats->dsps15, 0, *tvsec_diff, MIN15);
decay_time(&stats->dsps60, 0, *tvsec_diff, HOUR);
decay_time(&stats->dsps360, 0, *tvsec_diff, HOUR6);
decay_time(&stats->dsps1440, 0, *tvsec_diff, DAY);
decay_time(&stats->dsps10080, 0, *tvsec_diff, WEEK);
}
}
@ -8577,10 +8585,10 @@ void *stratifier(void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
int threads, tvsec_diff = 0;
ckpool_t *ckp = pi->ckp;
int64_t randomiser;
sdata_t *sdata;
int threads;
rename_proc(pi->processname);
LOGWARNING("%s stratifier starting", ckp->name);
@ -8649,7 +8657,8 @@ void *stratifier(void *arg)
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
read_poolstats(ckp);
read_poolstats(ckp, &tvsec_diff);
read_userstats(ckp, tvsec_diff);
cklock_init(&sdata->txn_lock);
cklock_init(&sdata->workbase_lock);

Loading…
Cancel
Save