From 2bbb7a6f754123e7463cf404d00ea28fae197375 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 20 Aug 2014 02:47:21 +1000 Subject: [PATCH] ckdb - store/display last share diff and expedite shutdown - DB v0.7 --- pool/base.php | 27 +++++++++ pool/page_workers.php | 14 ++++- sql/ckdb.sql | 3 +- sql/v0.6-v0.7.sql | 28 +++++++++ src/ckdb.c | 129 +++++++++++++++++++++++++++--------------- 5 files changed, 151 insertions(+), 50 deletions(-) create mode 100644 sql/v0.6-v0.7.sql diff --git a/pool/base.php b/pool/base.php index 97211a63..c00c06fd 100644 --- a/pool/base.php +++ b/pool/base.php @@ -24,6 +24,33 @@ function btcfmt($amt) return number_format($amt, 8); } # +global $sipre; +$sipre = array('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'); +function siprefmt($amt) +{ + global $sipre; + + $pref = floor(log10($amt)/3); + if ($pref < 0) + $pref = 0; + if ($pref >= count($sipre)) + $pref = count($sipre)-1; + + $amt = round(100.0 * $amt / pow(10, $pref * 3)) / 100; + if ($amt > 999.99 && $pref < (count($sipre)-1)) + { + $amt /= 1000; + $pref++; + } + + return number_format($amt, 2).$sipre[$pref]; +} +# +function difffmt($amt) +{ + return siprefmt($amt); +} +# function emailStr($str) { $all = '/[^A-Za-z0-9_+\.@-]/'; // no space = trim diff --git a/pool/page_workers.php b/pool/page_workers.php index 83c5da01..8e305dfa 100644 --- a/pool/page_workers.php +++ b/pool/page_workers.php @@ -10,9 +10,10 @@ function doworker($data, $user) $pg .= "\n"; $pg .= ""; $pg .= ""; - $pg .= ""; - $pg .= ""; - $pg .= ""; +// $pg .= ""; +// $pg .= ""; +// $pg .= ""; + $pg .= ""; $pg .= ""; $pg .= ""; $pg .= "\n"; @@ -28,6 +29,7 @@ function doworker($data, $user) $pg .= ""; $pg .= ''; +/* $pg .= ''; $nots = $ans['idlenotificationenabled'.$i]; switch ($nots) @@ -41,6 +43,12 @@ function doworker($data, $user) } $pg .= ''; $pg .= ''; +*/ + if ($ans['w_lastdiff'.$i] > 0) + $ld = difffmt($ans['w_lastdiff'.$i]); + else + $ld = ' '; + $pg .= ""; $lst = $ans['STAMP'] - $ans['w_lastshare'.$i]; if ($lst < 60) $lstdes = $lst.'s'; diff --git a/sql/ckdb.sql b/sql/ckdb.sql index eebc9df9..374d753c 100644 --- a/sql/ckdb.sql +++ b/sql/ckdb.sql @@ -216,6 +216,7 @@ CREATE TABLE sharesummary ( -- per workinfo for each user+worker errorcount bigint NOT NULL, firstshare timestamp with time zone NOT NULL, lastshare timestamp with time zone NOT NULL, + lastdiffacc float NOT NULL, complete char NOT NULL, createdate timestamp with time zone NOT NULL, createby character varying(64) NOT NULL, @@ -390,4 +391,4 @@ CREATE TABLE version ( PRIMARY KEY (vlock) ); -insert into version (vlock,version) values (1,'0.6'); +insert into version (vlock,version) values (1,'0.7'); diff --git a/sql/v0.6-v0.7.sql b/sql/v0.6-v0.7.sql new file mode 100644 index 00000000..ec06cfd1 --- /dev/null +++ b/sql/v0.6-v0.7.sql @@ -0,0 +1,28 @@ +SET SESSION AUTHORIZATION 'postgres'; + +BEGIN transaction; + +DO $$ +DECLARE ver TEXT; +BEGIN + + UPDATE version set version='0.7' where vlock=1 and version='0.6'; + + IF found THEN + RETURN; + END IF; + + SELECT version into ver from version + WHERE vlock=1; + + RAISE EXCEPTION 'Wrong DB version - expect "0.6" - found "%"', ver; + +END $$; + +ALTER TABLE ONLY sharesummary + ADD COLUMN lastdiffacc float DEFAULT 0; + +ALTER TABLE ONLY sharesummary + ALTER COLUMN lastdiffacc DROP DEFAULT; + +END transaction; diff --git a/src/ckdb.c b/src/ckdb.c index 176ebf07..96b03262 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -45,7 +45,7 @@ */ #define DB_VLOCK "1" -#define DB_VERSION "0.6" +#define DB_VERSION "0.7" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -599,16 +599,15 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; #define PQPARAM6 "$1,$2,$3,$4,$5,$6" #define PQPARAM7 "$1,$2,$3,$4,$5,$6,$7" #define PQPARAM8 "$1,$2,$3,$4,$5,$6,$7,$8" -#define PQPARAM9 "$1,$2,$3,$4,$5,$6,$7,$8,$9" -#define PQPARAM10 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10" -#define PQPARAM11 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11" -#define PQPARAM12 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12" -#define PQPARAM13 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13" -#define PQPARAM14 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14" -#define PQPARAM15 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15" -#define PQPARAM16 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16" -#define PQPARAM17 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17" -#define PQPARAM26 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26" +#define PQPARAM9 PQPARAM8 ",$9" +#define PQPARAM10 PQPARAM8 ",$9,$10" +#define PQPARAM11 PQPARAM8 ",$9,$10,$11" +#define PQPARAM12 PQPARAM8 ",$9,$10,$11,$12" +#define PQPARAM13 PQPARAM8 ",$9,$10,$11,$12,$13" +#define PQPARAM14 PQPARAM8 ",$9,$10,$11,$12,$13,$14" +#define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15" +#define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16" +#define PQPARAM27 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27" #define PARCHK(_par, _params) do { \ if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \ @@ -978,7 +977,7 @@ static K_LIST *shares_free; static K_STORE *shares_store; // SHAREERRORS shareerrors.id.json={...} -typedef struct shareerrorss { +typedef struct shareerrors { int64_t workinfoid; int64_t userid; char workername[TXT_BIG+1]; @@ -1020,6 +1019,7 @@ typedef struct sharesummary { bool reset; // non-DB field tv_t firstshare; tv_t lastshare; + double lastdiffacc; char complete[TXT_FLAG+1]; MODIFYDATECONTROLFIELDS; } SHARESUMMARY; @@ -1269,6 +1269,7 @@ typedef struct workerstatus { char workername[TXT_BIG+1]; tv_t last_auth; tv_t last_share; + double last_diff; tv_t last_stats; tv_t last_idle; } WORKERSTATUS; @@ -1942,7 +1943,7 @@ static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool static cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b); static cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b); -/* All data is loaded, now update workerstatus last_share, last_idle, last_stats +/* All data is loaded, now update workerstatus fields * Since shares are all part of a sharesummary, there's no need to search shares */ static void workerstatus_ready() @@ -1987,6 +1988,8 @@ static void workerstatus_ready() &(DATA_SHARESUMMARY(ss_item)->lastshare))) { copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_share), &(DATA_SHARESUMMARY(ss_item)->lastshare)); + DATA_WORKERSTATUS(ws_item)->last_diff = + DATA_SHARESUMMARY(ss_item)->lastdiffacc; } } @@ -2010,8 +2013,10 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta if (startup_complete && shares) { item = find_create_workerstatus(shares->userid, shares->workername); row = DATA_WORKERSTATUS(item); - if (tv_newer(&(row->last_share), &(shares->createdate))) + if (tv_newer(&(row->last_share), &(shares->createdate))) { copy_tv(&(row->last_share), &(shares->createdate)); + row->last_diff = shares->diff; + } } if (startup_complete && userstats) { @@ -2029,8 +2034,10 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta if (startup_complete && sharesummary) { item = find_create_workerstatus(sharesummary->userid, sharesummary->workername); row = DATA_WORKERSTATUS(item); - if (tv_newer(&(row->last_share), &(sharesummary->lastshare))) + if (tv_newer(&(row->last_share), &(sharesummary->lastshare))) { copy_tv(&(row->last_share), &(sharesummary->lastshare)); + row->last_diff = sharesummary->lastdiffacc; + } } } @@ -3495,7 +3502,7 @@ static cmp_t cmp_shares(K_ITEM *a, K_ITEM *b) return c; } -static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd) +static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff) { row->diffacc = row->diffsta = row->diffdup = row->diffhi = row->diffrej = row->shareacc = row->sharesta = row->sharedup = @@ -3506,6 +3513,7 @@ static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd) row->firstshare.tv_usec = cd->tv_usec; row->lastshare.tv_sec = row->firstshare.tv_sec; row->lastshare.tv_usec = row->firstshare.tv_usec; + row->lastdiffacc = diff; row->complete[0] = SUMMARY_NEW; row->complete[1] = '\0'; } @@ -3572,7 +3580,7 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor } if (!DATA_SHARESUMMARY(ss_item)->reset) { - zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd); + zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd, shares->diff); DATA_SHARESUMMARY(ss_item)->reset = true; } } @@ -3682,7 +3690,7 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, } if (!DATA_SHARESUMMARY(ss_item)->reset) { - zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd); + zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd, 0.0); DATA_SHARESUMMARY(ss_item)->reset = true; } } @@ -3786,12 +3794,13 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row K_ITEM *item; char *ins, *upd; bool ok = false, new; - char *params[18 + MODIFYDATECOUNT]; + char *params[19 + MODIFYDATECOUNT]; int n, par; int64_t userid, workinfoid; char *workername; tv_t *sharecreatedate; bool must_update = false, conned = false; + double diff = 0; LOGDEBUG("%s(): update", __func__); @@ -3817,6 +3826,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row userid = s_row->userid; workername = s_row->workername; workinfoid = s_row->workinfoid; + diff = s_row->diff; sharecreatedate = &(s_row->createdate); } else { if (!e_row) { @@ -3843,7 +3853,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row row->userid = userid; STRNCPY(row->workername, workername); row->workinfoid = workinfoid; - zero_sharesummary(row, sharecreatedate); + zero_sharesummary(row, sharecreatedate, diff); row->inserted = false; row->saveaged = false; } @@ -3890,12 +3900,14 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row free(tmp1); row->firstshare.tv_sec = sharecreatedate->tv_sec; row->firstshare.tv_usec = sharecreatedate->tv_usec; + // Don't change lastdiffacc } td = tvdiff(sharecreatedate, &(row->lastshare)); // don't LOGERR '=' in case shares come from ckpool with the same timestamp if (td >= 0.0) { row->lastshare.tv_sec = sharecreatedate->tv_sec; row->lastshare.tv_usec = sharecreatedate->tv_usec; + row->lastdiffacc = diff; } else { char *tmp1, *tmp2; LOGERR("%s(): %s createdate (%s) is < summary lastshare (%s)", @@ -3939,6 +3951,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row params[par++] = bigint_to_buf(row->errorcount, NULL, 0); params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); params[par++] = str_to_buf(row->complete, NULL, 0); MODIFYDATEPARAMS(params, par, row); PARCHK(par, params); @@ -3946,8 +3959,9 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row ins = "insert into sharesummary " "(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," - "sharecount,errorcount,firstshare,lastshare,complete" - MODIFYDATECONTROL ") values (" PQPARAM26 ")"; + "sharecount,errorcount,firstshare,lastshare," + "lastdiffacc,complete" + MODIFYDATECONTROL ") values (" PQPARAM27 ")"; res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); @@ -3989,15 +4003,17 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row params[par++] = double_to_buf(row->sharerej, NULL, 0); params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); params[par++] = str_to_buf(row->complete, NULL, 0); MODIFYUPDATEPARAMS(params, par, row); - PARCHKVAL(par, 20, params); + PARCHKVAL(par, 21, params); upd = "update sharesummary " "set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8," "shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12," - "sharerej=$13,firstshare=$14,lastshare=$15,complete=$16" - ",modifydate=$17,modifyby=$18,modifycode=$19,modifyinet=$20 " + "sharerej=$13,firstshare=$14,lastshare=$15," + "lastdiffacc=$16,complete=$17" + ",modifydate=$18,modifyby=$19,modifycode=$20,modifyinet=$21 " "where userid=$1 and workername=$2 and workinfoid=$3"; res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0); @@ -4071,7 +4087,7 @@ static bool sharesummary_fill(PGconn *conn) SHARESUMMARY *row; char *field; char *sel; - int fields = 18; + int fields = 19; bool ok; LOGDEBUG("%s(): select", __func__); @@ -4080,7 +4096,8 @@ static bool sharesummary_fill(PGconn *conn) sel = "select " "userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," - "sharecount,errorcount,firstshare,lastshare,complete" + "sharecount,errorcount,firstshare,lastshare," + "lastdiffacc,complete" MODIFYDATECONTROL " from sharesummary"; res = PQexec(conn, sel); @@ -4196,6 +4213,11 @@ static bool sharesummary_fill(PGconn *conn) break; TXT_TO_TV("lastshare", field, row->lastshare); + PQ_GET_FLD(res, i, "lastdiffacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc); + PQ_GET_FLD(res, i, "complete", field, ok); if (!ok) break; @@ -5760,19 +5782,19 @@ static bool getdata2() PGconn *conn = dbconnect(); bool ok = true; - if (!(ok = blocks_fill(conn))) + if (!(ok = blocks_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = payments_fill(conn))) + if (!(ok = payments_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = workinfo_fill(conn))) + if (!(ok = workinfo_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = shares_fill())) + if (!(ok = shares_fill()) || everyone_die) goto sukamudai; - if (!(ok = shareerrors_fill())) + if (!(ok = shareerrors_fill()) || everyone_die) goto sukamudai; - if (!(ok = sharesummary_fill(conn))) + if (!(ok = sharesummary_fill(conn)) || everyone_die) goto sukamudai; - if (!(ok = poolstats_fill(conn))) + if (!(ok = poolstats_fill(conn)) || everyone_die) goto sukamudai; ok = userstats_fill(conn); @@ -6037,18 +6059,18 @@ static bool setup_data() workerstatus_store = k_new_store(workerstatus_free); workerstatus_root = new_ktree(); - if (!getdata1()) + if (!getdata1() || everyone_die) return false; db_auths_complete = true; cksem_post(&socketer_sem); - if (!getdata2()) + if (!getdata2() || everyone_die) return false; db_load_complete = true; - if (!reload()) + if (!reload() || everyone_die) return false; workerstatus_ready(); @@ -6589,15 +6611,19 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, double w_hashrate5m, w_hashrate1hr; int64_t w_elapsed; tv_t w_lastshare; + double w_lastdiff; w_hashrate5m = w_hashrate1hr = 0.0; w_elapsed = -1; w_lastshare.tv_sec = 0; + w_lastdiff = 0; ws_item = find_workerstatus(DATA_USERS(u_item)->userid, DATA_WORKERS(w_item)->workername); - if (ws_item) + if (ws_item) { w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->last_share.tv_sec; + w_lastdiff = DATA_WORKERSTATUS(ws_item)->last_diff; + } // find last stored userid record userstats.userid = DATA_USERS(u_item)->userid; @@ -6646,6 +6672,10 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, snprintf(tmp, sizeof(tmp), "w_lastshare%d=%s%c", rows, reply, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); + double_to_buf((int)(w_lastdiff), reply, sizeof(reply)); + snprintf(tmp, sizeof(tmp), "w_lastdiff%d=%s%c", rows, reply, FLDSEP); + APPEND_REALLOC(buf, off, len, tmp); + userstats_workername_root = free_ktree(userstats_workername_root, NULL); K_RUNLOCK(userstats_free); } @@ -8360,7 +8390,7 @@ static bool reload_from(tv_t *start) total = 0; processing = 0; - while (!finished) { + while (!everyone_die && !finished) { LOGWARNING("%s(): processing %s", __func__, filename); processing++; count = 0; @@ -8438,6 +8468,9 @@ static bool reload_from(tv_t *start) processing, processing == 1 ? "" : "s", total, total == 1 ? "" : "s"); + if (everyone_die) + return true; + if (!matched) { ck_wlock(&fpm_lock); if (first_pool_message) { @@ -8512,18 +8545,22 @@ static void *listener(void *arg) rename_proc(pi->sockname); if (!setup_data()) { - everyone_die = true; - LOGEMERG("ABORTING"); + if (!everyone_die) { + LOGEMERG("ABORTING"); + everyone_die = true; + } return NULL; } - K_RLOCK(workqueue_store); - qc = workqueue_store->count; - K_RUNLOCK(workqueue_store); + if (!everyone_die) { + K_RLOCK(workqueue_store); + qc = workqueue_store->count; + K_RUNLOCK(workqueue_store); - LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc); + LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc); - startup_complete = true; + startup_complete = true; + } // Process queued work while (!everyone_die) {
Worker NameDifficultyIdle NotificationsIdle Notification TimeDifficultyIdle NotificationsIdle Notification TimeWork DiffLast ShareHash Rate
'.$ans['workername'.$i].''.$ans['difficultydefault'.$i].''.$nots.''.$ans['idlenotificationtime'.$i].'$ld