From 1777d00d30c00883e6bb04d4afd98d985e75e94c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 17 Apr 2017 13:57:22 +1000 Subject: [PATCH] Load all existing worker stats on startup. --- src/stratifier.c | 158 ++++++++++++++++++++++++++++------------------- 1 file changed, 93 insertions(+), 65 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index d7adf939..de1aabdd 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5099,12 +5099,20 @@ static void decay_user(user_instance_t *user, double diff, tv_t *now_t) } static user_instance_t *get_create_user(sdata_t *sdata, const char *username, bool *new_user); +static worker_instance_t *get_create_worker(sdata_t *sdata, user_instance_t *user, + const char *workername, bool *new_worker); /* Load the statistics of and create all known users at startup */ -static void read_userstats(ckpool_t *ckp, int tvsec_diff) +static void read_userstats(ckpool_t *ckp, sdata_t *sdata, int tvsec_diff) { + char dnam[512], s[512], *username; + user_instance_t *user; struct dirent *dir; - char dnam[512]; + bool new_user; + int ret, len; + json_t *val; + FILE *fp; + tv_t now; DIR *d; snprintf(dnam, 511, "%susers", ckp->logdir); @@ -5113,18 +5121,16 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff) LOGNOTICE("No user directory found"); return; } - while ((dir = readdir(d)) != NULL) { - char *username = basename(dir->d_name), s[512]; - user_instance_t *user; - bool new_user = false; - json_t *val; - FILE *fp; - tv_t now; - int ret; + tv_time(&now); + + while ((dir = readdir(d)) != NULL) { + username = basename(dir->d_name); if (!strcmp(username, "/") || !strcmp(username, ".") || !strcmp(username, "..")) continue; - user = get_create_user(ckp->sdata, username, &new_user); + + new_user = false; + user = get_create_user(sdata, username, &new_user); if (unlikely(!new_user)) { /* All users should be new at this stage */ LOGWARNING("Duplicate user in read_userstats %s", username); @@ -5141,16 +5147,15 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff) ret = fread(s, 1, 511, fp); fclose(fp); if (ret < 1) { - LOGNOTICE("Failed to read user %s logfile", user->username); + LOGNOTICE("Failed to read user %s logfile", username); continue; } val = json_loads(s, 0, NULL); if (!val) { - LOGNOTICE("Failed to json decode user %s logfile: %s", user->username, s); + LOGNOTICE("Failed to json decode user %s logfile: %s", username, s); continue; } - tv_time(&now); copy_tv(&user->last_share, &now); copy_tv(&user->last_decay, &now); user->dsps1 = dsps_from_key(val, "hashrate1m"); @@ -5160,62 +5165,88 @@ static void read_userstats(ckpool_t *ckp, int tvsec_diff) user->dsps10080 = dsps_from_key(val, "hashrate7d"); json_get_int64(&user->shares, val, "shares"); json_get_double(&user->best_diff, val, "bestshare"); - LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", user->username, + LOGDEBUG("Successfully read user %s stats %f %f %f %f %f %f", username, user->dsps1, user->dsps5, user->dsps60, user->dsps1440, user->dsps10080, user->best_diff); json_decref(val); if (tvsec_diff > 60) decay_user(user, 0, &now); + } closedir(d); -} - -/* Enter holding a reference count */ -static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) -{ - int tvsec_diff = 0, ret; - char s[512]; - json_t *val; - FILE *fp; - tv_t now; - snprintf(s, 511, "%s/workers/%s", ckp->logdir, worker->workername); - fp = fopen(s, "re"); - if (!fp) { - LOGINFO("Worker %s does not have a logfile to read", worker->workername); - return; - } - memset(s, 0, 512); - ret = fread(s, 1, 511, fp); - fclose(fp); - if (ret < 1) { - LOGINFO("Failed to read worker %s logfile", worker->workername); - return; - } - val = json_loads(s, 0, NULL); - if (!val) { - LOGINFO("Failed to json decode worker %s logfile: %s", worker->workername, s); + /* Now get all the worker stats */ + snprintf(dnam, 511, "%sworkers", ckp->logdir); + d = opendir(dnam); + if (unlikely(!d)) { + LOGNOTICE("No worker directory found"); return; } - tv_time(&now); - copy_tv(&worker->last_share, &now); - copy_tv(&worker->last_decay, &now); - worker->dsps1 = dsps_from_key(val, "hashrate1m"); - worker->dsps5 = dsps_from_key(val, "hashrate5m"); - worker->dsps60 = dsps_from_key(val, "hashrate1hr"); - worker->dsps1440 = dsps_from_key(val, "hashrate1d"); - worker->dsps10080 = dsps_from_key(val, "hashrate7d"); - json_get_double(&worker->best_diff, val, "bestshare"); - json_get_int64(&worker->shares, val, "shares"); - LOGINFO("Successfully read worker %s stats %f %f %f %f %f", worker->workername, - worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff); - json_decref(val); - if (tvsec_diff > 60) { - LOGINFO("Old worker stats indicate not logged for %d seconds, decaying stats", - tvsec_diff); - decay_worker(worker, 0, &now); + while ((dir = readdir(d)) != NULL) { + char *workername = basename(dir->d_name), *base_username; + worker_instance_t *worker; + bool new_worker = false; + + if (!strcmp(workername, "/") || !strcmp(workername, ".") || !strcmp(workername, "..")) + continue; + + base_username = strdupa(workername); + if (!username || !strlen(username)) + username = base_username; + len = strlen(username); + if (unlikely(len > 127)) + username[127] = '\0'; + + new_user = false; + user = get_create_user(sdata, username, &new_user); + if (unlikely(new_user)) { + /* This shouldn't happen */ + LOGWARNING("Created new user from worker %s in read_userstats", + workername); + } + worker = get_create_worker(sdata, user, workername, &new_worker); + if (unlikely(!new_worker)) { + LOGWARNING("Duplicate worker in read_userstats %s", workername); + continue; + } + snprintf(s, 511, "%s/%s", dnam, workername); + fp = fopen(s, "re"); + if (unlikely(!fp)) { + LOGWARNING("Failed to load worker %s logfile to read", workername); + continue; + } + memset(s, 0, 512); + ret = fread(s, 1, 511, fp); + fclose(fp); + if (ret < 1) { + LOGNOTICE("Failed to read worker %s logfile", workername); + continue; + } + val = json_loads(s, 0, NULL); + if (!val) { + LOGNOTICE("Failed to json decode worker %s logfile: %s", + workername, s); + continue; + } + + copy_tv(&worker->last_share, &now); + copy_tv(&worker->last_decay, &now); + worker->dsps1 = dsps_from_key(val, "hashrate1m"); + worker->dsps5 = dsps_from_key(val, "hashrate5m"); + worker->dsps60 = dsps_from_key(val, "hashrate1hr"); + worker->dsps1440 = dsps_from_key(val, "hashrate1d"); + worker->dsps10080 = dsps_from_key(val, "hashrate7d"); + json_get_double(&worker->best_diff, val, "bestshare"); + json_get_int64(&worker->shares, val, "shares"); + LOGDEBUG("Successfully read worker %s stats %f %f %f %f %f", worker->workername, + worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff); + json_decref(val); + if (tvsec_diff > 60) + decay_worker(worker, 0, &now); } + closedir(d); + } #define DEFAULT_AUTH_BACKOFF (3) /* Set initial backoff to 3 seconds */ @@ -5281,7 +5312,7 @@ static worker_instance_t *__get_worker(user_instance_t *user, const char *worker /* Find worker amongst a user's workers by workername or create one if it * doesn't yet exist. */ -static worker_instance_t *get_create_worker(ckpool_t *ckp, sdata_t *sdata, user_instance_t *user, +static worker_instance_t *get_create_worker(sdata_t *sdata, user_instance_t *user, const char *workername, bool *new_worker) { worker_instance_t *worker; @@ -5294,9 +5325,6 @@ static worker_instance_t *get_create_worker(ckpool_t *ckp, sdata_t *sdata, user_ } ck_wunlock(&sdata->instance_lock); - if (CKP_STANDALONE(ckp) && *new_worker) - read_workerstats(ckp, worker); - return worker; } @@ -5304,7 +5332,7 @@ static worker_instance_t *get_worker(sdata_t *sdata, user_instance_t *user, cons { bool dummy = false; - return get_create_worker(sdata->ckp, sdata, user, workername, &dummy); + return get_create_worker(sdata, user, workername, &dummy); } /* This simply strips off the first part of the workername and matches it to a @@ -5328,7 +5356,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, username[127] = '\0'; user = get_create_user(sdata, username, &new_user); - worker = get_create_worker(ckp, sdata, user, workername, &new_worker); + worker = get_create_worker(sdata, user, workername, &new_worker); /* Create one worker instance for combined data from workers of the * same name */ @@ -8658,7 +8686,7 @@ void *stratifier(void *arg) sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp, &tvsec_diff); - read_userstats(ckp, tvsec_diff); + read_userstats(ckp, sdata, tvsec_diff); cklock_init(&sdata->txn_lock); cklock_init(&sdata->workbase_lock);