Browse Source

Load all existing worker stats on startup.

master
Con Kolivas 8 years ago
parent
commit
1777d00d30
  1. 116
      src/stratifier.c

116
src/stratifier.c

@ -5099,12 +5099,20 @@ static void decay_user(user_instance_t *user, double diff, tv_t *now_t)
}
static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user);
static worker_instance_t *get_create_worker(sdata_t *sdata, user_instance_t *user,
const char *workername, bool *new_worker);
/* Load the statistics of and create all known users at startup */
static void read_userstats(ckpool_t *ckp, int tvsec_diff)
static void read_userstats(ckpool_t *ckp, sdata_t *sdata, int tvsec_diff)
{
char dnam[512], s[512], *username;
user_instance_t *user;
struct dirent *dir;
char dnam[512];
bool new_user;
int ret, len;
json_t *val;
FILE *fp;
tv_t now;
DIR *d;
snprintf(dnam, 511, "%susers", ckp->logdir);
@ -5113,18 +5121,16 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff)
LOGNOTICE("No user directory found");
return;
}
while ((dir = readdir(d)) != NULL) {
char *username = basename(dir->d_name), s[512];
user_instance_t *user;
bool new_user = false;
json_t *val;
FILE *fp;
tv_t now;
int ret;
tv_time(&now);
while ((dir = readdir(d)) != NULL) {
username = basename(dir->d_name);
if (!strcmp(username, "/") || !strcmp(username, ".") || !strcmp(username, ".."))
continue;
user = get_create_user(ckp->sdata, username, &new_user);
new_user = false;
user = get_create_user(sdata, username, &new_user);
if (unlikely(!new_user)) {
/* All users should be new at this stage */
LOGWARNING("Duplicate user in read_userstats %s", username);
@ -5141,16 +5147,15 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff)
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGNOTICE("Failed to read user %s logfile", user->username);
LOGNOTICE("Failed to read user %s logfile", username);
continue;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGNOTICE("Failed to json decode user %s logfile: %s", user->username, s);
LOGNOTICE("Failed to json decode user %s logfile: %s", username, s);
continue;
}
tv_time(&now);
copy_tv(&user->last_share, &now);
copy_tv(&user->last_decay, &now);
user->dsps1 = dsps_from_key(val, "hashrate1m");
@ -5160,45 +5165,71 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff)
user->dsps10080 = dsps_from_key(val, "hashrate7d");
json_get_int64(&user->shares, val, "shares");
json_get_double(&user->best_diff, val, "bestshare");
LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", user->username,
LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", username,
user->dsps1, user->dsps5, user->dsps60, user->dsps1440,
user->dsps10080, user->best_diff);
json_decref(val);
if (tvsec_diff > 60)
decay_user(user, 0, &now);
}
closedir(d);
}
/* Enter holding a reference count */
static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
{
int tvsec_diff = 0, ret;
char s[512];
json_t *val;
FILE *fp;
tv_t now;
/* Now get all the worker stats */
snprintf(dnam, 511, "%sworkers", ckp->logdir);
d = opendir(dnam);
if (unlikely(!d)) {
LOGNOTICE("No worker directory found");
return;
}
snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername);
while ((dir = readdir(d)) != NULL) {
char *workername = basename(dir->d_name), *base_username;
worker_instance_t *worker;
bool new_worker = false;
if (!strcmp(workername, "/") || !strcmp(workername, ".") || !strcmp(workername, ".."))
continue;
base_username = strdupa(workername);
if (!username || !strlen(username))
username = base_username;
len = strlen(username);
if (unlikely(len > 127))
username[127] = '\0';
new_user = false;
user = get_create_user(sdata, username, &new_user);
if (unlikely(new_user)) {
/* This shouldn't happen */
LOGWARNING("Created new user from worker %s in read_userstats",
workername);
}
worker = get_create_worker(sdata, user, workername, &new_worker);
if (unlikely(!new_worker)) {
LOGWARNING("Duplicate worker in read_userstats %s", workername);
continue;
}
snprintf(s, 511, "%s/%s", dnam, workername);
fp = fopen(s, "re");
if (!fp) {
LOGINFO("Worker %s does not have a logfile to read", worker->workername);
return;
if (unlikely(!fp)) {
LOGWARNING("Failed to load worker %s logfile to read", workername);
continue;
}
memset(s, 0, 512);
ret = fread(s, 1, 511, fp);
fclose(fp);
if (ret < 1) {
LOGINFO("Failed to read worker %s logfile", worker->workername);
return;
LOGNOTICE("Failed to read worker %s logfile", workername);
continue;
}
val = json_loads(s, 0, NULL);
if (!val) {
LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s);
return;
LOGNOTICE("Failed to json decode worker %s logfile: %s",
workername, s);
continue;
}
tv_time(&now);
copy_tv(&worker->last_share, &now);
copy_tv(&worker->last_decay, &now);
worker->dsps1 = dsps_from_key(val, "hashrate1m");
@ -5208,14 +5239,14 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
worker->dsps10080 = dsps_from_key(val, "hashrate7d");
json_get_double(&worker->best_diff, val, "bestshare");
json_get_int64(&worker->shares, val, "shares");
LOGINFO("Successfully read worker %s stats %f %f %f %f %f", worker->workername,
LOGDEBUG("Successfully read worker %s stats %f %f %f %f %f", worker->workername,
worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff);
json_decref(val);
if (tvsec_diff > 60) {
LOGINFO("Old worker stats indicate not logged for %d seconds, decaying stats",
tvsec_diff);
if (tvsec_diff > 60)
decay_worker(worker, 0, &now);
}
closedir(d);
}
#define DEFAULT_AUTH_BACKOFF (3) /* Set initial backoff to 3 seconds */
@ -5281,7 +5312,7 @@ static worker_instance_t *__get_worker(user_instance_t *user, const char *worker
/* Find worker amongst a user's workers by workername or create one if it
* doesn't yet exist. */
static worker_instance_t *get_create_worker(ckpool_t *ckp, sdata_t *sdata, user_instance_t *user,
static worker_instance_t *get_create_worker(sdata_t *sdata, user_instance_t *user,
const char *workername, bool *new_worker)
{
worker_instance_t *worker;
@ -5294,9 +5325,6 @@ static worker_instance_t *get_create_worker(ckpool_t *ckp, sdata_t *sdata, user_
}
ck_wunlock(&sdata->instance_lock);
if (CKP_STANDALONE(ckp) && *new_worker)
read_workerstats(ckp, worker);
return worker;
}
@ -5304,7 +5332,7 @@ static worker_instance_t *get_worker(sdata_t *sdata, user_instance_t *user, cons
{
bool dummy = false;
return get_create_worker(sdata->ckp, sdata, user, workername, &dummy);
return get_create_worker(sdata, user, workername, &dummy);
}
/* This simply strips off the first part of the workername and matches it to a
@ -5328,7 +5356,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
username[127] = '\0';
user = get_create_user(sdata, username, &new_user);
worker = get_create_worker(ckp, sdata, user, workername, &new_worker);
worker = get_create_worker(sdata, user, workername, &new_worker);
/* Create one worker instance for combined data from workers of the
* same name */
@ -8658,7 +8686,7 @@ void *stratifier(void *arg)
sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
read_poolstats(ckp, &tvsec_diff);
read_userstats(ckp, tvsec_diff);
read_userstats(ckp, sdata, tvsec_diff);
cklock_init(&sdata->txn_lock);
cklock_init(&sdata->workbase_lock);

Loading…
Cancel
Save