Browse Source

ckdb - store/display last share diff and expedite shutdown - DB v0.7

master
kanoi 10 years ago
parent
commit
2bbb7a6f75
  1. 27
      pool/base.php
  2. 14
      pool/page_workers.php
  3. 3
      sql/ckdb.sql
  4. 28
      sql/v0.6-v0.7.sql
  5. 129
      src/ckdb.c

27
pool/base.php

@ -24,6 +24,33 @@ function btcfmt($amt)
return number_format($amt, 8);
}
#
global $sipre;
$sipre = array('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y');
function siprefmt($amt)
{
global $sipre;
$pref = floor(log10($amt)/3);
if ($pref < 0)
$pref = 0;
if ($pref >= count($sipre))
$pref = count($sipre)-1;
$amt = round(100.0 * $amt / pow(10, $pref * 3)) / 100;
if ($amt > 999.99 && $pref < (count($sipre)-1))
{
$amt /= 1000;
$pref++;
}
return number_format($amt, 2).$sipre[$pref];
}
#
function difffmt($amt)
{
return siprefmt($amt);
}
#
function emailStr($str)
{
$all = '/[^A-Za-z0-9_+\.@-]/'; // no space = trim

14
pool/page_workers.php

@ -10,9 +10,10 @@ function doworker($data, $user)
$pg .= "<table callpadding=0 cellspacing=0 border=0>\n";
$pg .= "<tr class=title>";
$pg .= "<td class=dl>Worker Name</td>";
$pg .= "<td class=dr>Difficulty</td>";
$pg .= "<td class=dc>Idle Notifications</td>";
$pg .= "<td class=dr>Idle Notification Time</td>";
// $pg .= "<td class=dr>Difficulty</td>";
// $pg .= "<td class=dc>Idle Notifications</td>";
// $pg .= "<td class=dr>Idle Notification Time</td>";
$pg .= "<td class=dr>Work Diff</td>";
$pg .= "<td class=dr>Last Share</td>";
$pg .= "<td class=dr>Hash Rate</td>";
$pg .= "</tr>\n";
@ -28,6 +29,7 @@ function doworker($data, $user)
$pg .= "<tr class=$row>";
$pg .= '<td class=dl>'.$ans['workername'.$i].'</td>';
/*
$pg .= '<td class=dr>'.$ans['difficultydefault'.$i].'</td>';
$nots = $ans['idlenotificationenabled'.$i];
switch ($nots)
@ -41,6 +43,12 @@ function doworker($data, $user)
}
$pg .= '<td class=dc>'.$nots.'</td>';
$pg .= '<td class=dr>'.$ans['idlenotificationtime'.$i].'</td>';
*/
if ($ans['w_lastdiff'.$i] > 0)
$ld = difffmt($ans['w_lastdiff'.$i]);
else
$ld = '&nbsp;';
$pg .= "<td class=dr>$ld</td>";
$lst = $ans['STAMP'] - $ans['w_lastshare'.$i];
if ($lst < 60)
$lstdes = $lst.'s';

3
sql/ckdb.sql

@ -216,6 +216,7 @@ CREATE TABLE sharesummary ( -- per workinfo for each user+worker
errorcount bigint NOT NULL,
firstshare timestamp with time zone NOT NULL,
lastshare timestamp with time zone NOT NULL,
lastdiffacc float NOT NULL,
complete char NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) NOT NULL,
@ -390,4 +391,4 @@ CREATE TABLE version (
PRIMARY KEY (vlock)
);
insert into version (vlock,version) values (1,'0.6');
insert into version (vlock,version) values (1,'0.7');

28
sql/v0.6-v0.7.sql

@ -0,0 +1,28 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.7' where vlock=1 and version='0.6';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.6" - found "%"', ver;
END $$;
ALTER TABLE ONLY sharesummary
ADD COLUMN lastdiffacc float DEFAULT 0;
ALTER TABLE ONLY sharesummary
ALTER COLUMN lastdiffacc DROP DEFAULT;
END transaction;

129
src/ckdb.c

@ -45,7 +45,7 @@
*/
#define DB_VLOCK "1"
#define DB_VERSION "0.6"
#define DB_VERSION "0.7"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -599,16 +599,15 @@ static const tv_t date_begin = { DATE_BEGIN, 0L };
#define PQPARAM6 "$1,$2,$3,$4,$5,$6"
#define PQPARAM7 "$1,$2,$3,$4,$5,$6,$7"
#define PQPARAM8 "$1,$2,$3,$4,$5,$6,$7,$8"
#define PQPARAM9 "$1,$2,$3,$4,$5,$6,$7,$8,$9"
#define PQPARAM10 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10"
#define PQPARAM11 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11"
#define PQPARAM12 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12"
#define PQPARAM13 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13"
#define PQPARAM14 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14"
#define PQPARAM15 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15"
#define PQPARAM16 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16"
#define PQPARAM17 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17"
#define PQPARAM26 "$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26"
#define PQPARAM9 PQPARAM8 ",$9"
#define PQPARAM10 PQPARAM8 ",$9,$10"
#define PQPARAM11 PQPARAM8 ",$9,$10,$11"
#define PQPARAM12 PQPARAM8 ",$9,$10,$11,$12"
#define PQPARAM13 PQPARAM8 ",$9,$10,$11,$12,$13"
#define PQPARAM14 PQPARAM8 ",$9,$10,$11,$12,$13,$14"
#define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15"
#define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16"
#define PQPARAM27 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27"
#define PARCHK(_par, _params) do { \
if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \
@ -978,7 +977,7 @@ static K_LIST *shares_free;
static K_STORE *shares_store;
// SHAREERRORS shareerrors.id.json={...}
typedef struct shareerrorss {
typedef struct shareerrors {
int64_t workinfoid;
int64_t userid;
char workername[TXT_BIG+1];
@ -1020,6 +1019,7 @@ typedef struct sharesummary {
bool reset; // non-DB field
tv_t firstshare;
tv_t lastshare;
double lastdiffacc;
char complete[TXT_FLAG+1];
MODIFYDATECONTROLFIELDS;
} SHARESUMMARY;
@ -1269,6 +1269,7 @@ typedef struct workerstatus {
char workername[TXT_BIG+1];
tv_t last_auth;
tv_t last_share;
double last_diff;
tv_t last_stats;
tv_t last_idle;
} WORKERSTATUS;
@ -1942,7 +1943,7 @@ static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool
static cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b);
static cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b);
/* All data is loaded, now update workerstatus last_share, last_idle, last_stats
/* All data is loaded, now update workerstatus fields
* Since shares are all part of a sharesummary, there's no need to search shares
*/
static void workerstatus_ready()
@ -1987,6 +1988,8 @@ static void workerstatus_ready()
&(DATA_SHARESUMMARY(ss_item)->lastshare))) {
copy_tv(&(DATA_WORKERSTATUS(ws_item)->last_share),
&(DATA_SHARESUMMARY(ss_item)->lastshare));
DATA_WORKERSTATUS(ws_item)->last_diff =
DATA_SHARESUMMARY(ss_item)->lastdiffacc;
}
}
@ -2010,8 +2013,10 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta
if (startup_complete && shares) {
item = find_create_workerstatus(shares->userid, shares->workername);
row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->last_share), &(shares->createdate)))
if (tv_newer(&(row->last_share), &(shares->createdate))) {
copy_tv(&(row->last_share), &(shares->createdate));
row->last_diff = shares->diff;
}
}
if (startup_complete && userstats) {
@ -2029,8 +2034,10 @@ static void workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *usersta
if (startup_complete && sharesummary) {
item = find_create_workerstatus(sharesummary->userid, sharesummary->workername);
row = DATA_WORKERSTATUS(item);
if (tv_newer(&(row->last_share), &(sharesummary->lastshare)))
if (tv_newer(&(row->last_share), &(sharesummary->lastshare))) {
copy_tv(&(row->last_share), &(sharesummary->lastshare));
row->last_diff = sharesummary->lastdiffacc;
}
}
}
@ -3495,7 +3502,7 @@ static cmp_t cmp_shares(K_ITEM *a, K_ITEM *b)
return c;
}
static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd)
static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff)
{
row->diffacc = row->diffsta = row->diffdup = row->diffhi =
row->diffrej = row->shareacc = row->sharesta = row->sharedup =
@ -3506,6 +3513,7 @@ static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd)
row->firstshare.tv_usec = cd->tv_usec;
row->lastshare.tv_sec = row->firstshare.tv_sec;
row->lastshare.tv_usec = row->firstshare.tv_usec;
row->lastdiffacc = diff;
row->complete[0] = SUMMARY_NEW;
row->complete[1] = '\0';
}
@ -3572,7 +3580,7 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor
}
if (!DATA_SHARESUMMARY(ss_item)->reset) {
zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd);
zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd, shares->diff);
DATA_SHARESUMMARY(ss_item)->reset = true;
}
}
@ -3682,7 +3690,7 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
}
if (!DATA_SHARESUMMARY(ss_item)->reset) {
zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd);
zero_sharesummary(DATA_SHARESUMMARY(ss_item), cd, 0.0);
DATA_SHARESUMMARY(ss_item)->reset = true;
}
}
@ -3786,12 +3794,13 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
K_ITEM *item;
char *ins, *upd;
bool ok = false, new;
char *params[18 + MODIFYDATECOUNT];
char *params[19 + MODIFYDATECOUNT];
int n, par;
int64_t userid, workinfoid;
char *workername;
tv_t *sharecreatedate;
bool must_update = false, conned = false;
double diff = 0;
LOGDEBUG("%s(): update", __func__);
@ -3817,6 +3826,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
userid = s_row->userid;
workername = s_row->workername;
workinfoid = s_row->workinfoid;
diff = s_row->diff;
sharecreatedate = &(s_row->createdate);
} else {
if (!e_row) {
@ -3843,7 +3853,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
row->userid = userid;
STRNCPY(row->workername, workername);
row->workinfoid = workinfoid;
zero_sharesummary(row, sharecreatedate);
zero_sharesummary(row, sharecreatedate, diff);
row->inserted = false;
row->saveaged = false;
}
@ -3890,12 +3900,14 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
free(tmp1);
row->firstshare.tv_sec = sharecreatedate->tv_sec;
row->firstshare.tv_usec = sharecreatedate->tv_usec;
// Don't change lastdiffacc
}
td = tvdiff(sharecreatedate, &(row->lastshare));
// don't LOGERR '=' in case shares come from ckpool with the same timestamp
if (td >= 0.0) {
row->lastshare.tv_sec = sharecreatedate->tv_sec;
row->lastshare.tv_usec = sharecreatedate->tv_usec;
row->lastdiffacc = diff;
} else {
char *tmp1, *tmp2;
LOGERR("%s(): %s createdate (%s) is < summary lastshare (%s)",
@ -3939,6 +3951,7 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYDATEPARAMS(params, par, row);
PARCHK(par, params);
@ -3946,8 +3959,9 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
ins = "insert into sharesummary "
"(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,complete"
MODIFYDATECONTROL ") values (" PQPARAM26 ")";
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL ") values (" PQPARAM27 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0);
rescode = PQresultStatus(res);
@ -3989,15 +4003,17 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 20, params);
PARCHKVAL(par, 21, params);
upd = "update sharesummary "
"set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8,"
"shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12,"
"sharerej=$13,firstshare=$14,lastshare=$15,complete=$16"
",modifydate=$17,modifyby=$18,modifycode=$19,modifyinet=$20 "
"sharerej=$13,firstshare=$14,lastshare=$15,"
"lastdiffacc=$16,complete=$17"
",modifydate=$18,modifyby=$19,modifycode=$20,modifyinet=$21 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0);
@ -4071,7 +4087,7 @@ static bool sharesummary_fill(PGconn *conn)
SHARESUMMARY *row;
char *field;
char *sel;
int fields = 18;
int fields = 19;
bool ok;
LOGDEBUG("%s(): select", __func__);
@ -4080,7 +4096,8 @@ static bool sharesummary_fill(PGconn *conn)
sel = "select "
"userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,complete"
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL
" from sharesummary";
res = PQexec(conn, sel);
@ -4196,6 +4213,11 @@ static bool sharesummary_fill(PGconn *conn)
break;
TXT_TO_TV("lastshare", field, row->lastshare);
PQ_GET_FLD(res, i, "lastdiffacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc);
PQ_GET_FLD(res, i, "complete", field, ok);
if (!ok)
break;
@ -5760,19 +5782,19 @@ static bool getdata2()
PGconn *conn = dbconnect();
bool ok = true;
if (!(ok = blocks_fill(conn)))
if (!(ok = blocks_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = payments_fill(conn)))
if (!(ok = payments_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = workinfo_fill(conn)))
if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = shares_fill()))
if (!(ok = shares_fill()) || everyone_die)
goto sukamudai;
if (!(ok = shareerrors_fill()))
if (!(ok = shareerrors_fill()) || everyone_die)
goto sukamudai;
if (!(ok = sharesummary_fill(conn)))
if (!(ok = sharesummary_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = poolstats_fill(conn)))
if (!(ok = poolstats_fill(conn)) || everyone_die)
goto sukamudai;
ok = userstats_fill(conn);
@ -6037,18 +6059,18 @@ static bool setup_data()
workerstatus_store = k_new_store(workerstatus_free);
workerstatus_root = new_ktree();
if (!getdata1())
if (!getdata1() || everyone_die)
return false;
db_auths_complete = true;
cksem_post(&socketer_sem);
if (!getdata2())
if (!getdata2() || everyone_die)
return false;
db_load_complete = true;
if (!reload())
if (!reload() || everyone_die)
return false;
workerstatus_ready();
@ -6589,15 +6611,19 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
double w_hashrate5m, w_hashrate1hr;
int64_t w_elapsed;
tv_t w_lastshare;
double w_lastdiff;
w_hashrate5m = w_hashrate1hr = 0.0;
w_elapsed = -1;
w_lastshare.tv_sec = 0;
w_lastdiff = 0;
ws_item = find_workerstatus(DATA_USERS(u_item)->userid,
DATA_WORKERS(w_item)->workername);
if (ws_item)
if (ws_item) {
w_lastshare.tv_sec = DATA_WORKERSTATUS(ws_item)->last_share.tv_sec;
w_lastdiff = DATA_WORKERSTATUS(ws_item)->last_diff;
}
// find last stored userid record
userstats.userid = DATA_USERS(u_item)->userid;
@ -6646,6 +6672,10 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
snprintf(tmp, sizeof(tmp), "w_lastshare%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf((int)(w_lastdiff), reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "w_lastdiff%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
K_RUNLOCK(userstats_free);
}
@ -8360,7 +8390,7 @@ static bool reload_from(tv_t *start)
total = 0;
processing = 0;
while (!finished) {
while (!everyone_die && !finished) {
LOGWARNING("%s(): processing %s", __func__, filename);
processing++;
count = 0;
@ -8438,6 +8468,9 @@ static bool reload_from(tv_t *start)
processing, processing == 1 ? "" : "s",
total, total == 1 ? "" : "s");
if (everyone_die)
return true;
if (!matched) {
ck_wlock(&fpm_lock);
if (first_pool_message) {
@ -8512,18 +8545,22 @@ static void *listener(void *arg)
rename_proc(pi->sockname);
if (!setup_data()) {
everyone_die = true;
LOGEMERG("ABORTING");
if (!everyone_die) {
LOGEMERG("ABORTING");
everyone_die = true;
}
return NULL;
}
K_RLOCK(workqueue_store);
qc = workqueue_store->count;
K_RUNLOCK(workqueue_store);
if (!everyone_die) {
K_RLOCK(workqueue_store);
qc = workqueue_store->count;
K_RUNLOCK(workqueue_store);
LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc);
LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc);
startup_complete = true;
startup_complete = true;
}
// Process queued work
while (!everyone_die) {

Loading…
Cancel
Save