Browse Source

ckdb - add block stats confirmation and required sharesummary locking

master
kanoi 10 years ago
parent
commit
32889605e1
  1. 372
      src/ckdb.c

372
src/ckdb.c

@ -47,7 +47,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.9"
#define CKDB_VERSION DB_VERSION"-0.250"
#define CKDB_VERSION DB_VERSION"-0.251"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -63,6 +63,9 @@
#define TRUE_STR "Y"
#define FALSE_STR "N"
#define TRUE_CHR 'Y'
#define FALSE_CHR 'N'
#define coinbase1height(_cb1) _coinbase1height(_cb1, WHERE_FFL_HERE)
#define cmp_height(_cb1a, _cb1b) _cmp_height(_cb1a, _cb1b, WHERE_FFL_HERE)
@ -1298,6 +1301,9 @@ typedef struct blocks {
#define BLOCKS_ORPHAN 'O'
#define BLOCKS_ORPHAN_STR "O"
#define BLOCKS_STATSPENDING FALSE_CHR
#define BLOCKS_STATSCONFIRMED TRUE_CHR
static const char *blocks_new = "New";
static const char *blocks_confirm = "1-Confirm";
static const char *blocks_42 = "42-Confirm";
@ -2299,6 +2305,9 @@ static cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b)
return c;
}
/* TODO: replace a lot of the code for all data types that codes finds,
* each with specific functions for finding, to centralise the finds,
* with passed ctx's */
static K_ITEM *get_workerstatus(int64_t userid, char *workername)
{
WORKERSTATUS workerstatus;
@ -2400,6 +2409,7 @@ static void set_block_share_counters()
/* From the end backwards so we can skip the workinfoid's we don't
* want by jumping back to just before the current worker when the
* workinfoid goes below the limit */
K_RLOCK(sharesummary_free);
ss_item = last_in_ktree(sharesummary_root, ctx);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
@ -2425,6 +2435,7 @@ static void set_block_share_counters()
* since it should always exist
* However, it is simplest to simply create it
* and keep going */
K_RUNLOCK(sharesummary_free);
ws_item = find_workerstatus(sharesummary->userid,
sharesummary->workername,
__FILE__, __func__, __LINE__);
@ -2433,6 +2444,7 @@ static void set_block_share_counters()
sharesummary->workername,
__FILE__, __func__, __LINE__);
}
K_RLOCK(sharesummary_free);
DATA_WORKERSTATUS(workerstatus, ws_item);
}
@ -2448,6 +2460,7 @@ static void set_block_share_counters()
ss_item = prev_in_ktree(ctx);
}
K_RLOCK(sharesummary_free);
}
/* All data is loaded, now update workerstatus fields
@ -2493,8 +2506,10 @@ static void workerstatus_ready()
STRNCPY(looksharesummary.workername, workerstatus->workername);
looksharesummary.workinfoid = MAXID;
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_before_in_ktree(sharesummary_root, &ss_look,
cmp_sharesummary, ss_ctx);
K_RUNLOCK(sharesummary_free);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (tv_newer(&(workerstatus->last_share),
@ -3875,7 +3890,7 @@ static K_ITEM *find_workinfo(int64_t workinfoid)
{
WORKINFO workinfo;
K_TREE_CTX ctx[1];
K_ITEM look;
K_ITEM look, *item;
workinfo.workinfoid = workinfoid;
workinfo.expirydate.tv_sec = default_expiry.tv_sec;
@ -3883,7 +3898,10 @@ static K_ITEM *find_workinfo(int64_t workinfoid)
INIT_WORKINFO(&look);
look.data = (void *)(&workinfo);
return find_in_ktree(workinfo_root, &look, cmp_workinfo, ctx);
K_RLOCK(workinfo_free);
item = find_in_ktree(workinfo_root, &look, cmp_workinfo, ctx);
K_RUNLOCK(workinfo_free);
return item;
}
static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance,
@ -4087,7 +4105,9 @@ static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
ok = true;
ss_tot = ss_already = ss_failed = shares_tot = shares_dumped = 0;
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid == workinfoid) {
ss_tot++;
@ -4168,12 +4188,15 @@ static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
s_item = tmp_item;
}
K_WUNLOCK(shares_free);
K_RLOCK(sharesummary_free);
ss_item = next_in_ktree(ss_ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
if (error[0])
LOGERR("%s(): %s", __func__, error);
}
K_RUNLOCK(sharesummary_free);
if (conned)
PQfinish(conn);
@ -4223,6 +4246,8 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
looksharesummary.workername[0] = '\0';
INIT_SHARESUMMARY(&look);
look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look,
cmp_sharesummary_workinfoid, ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
@ -4242,6 +4267,7 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
ss_item = next_in_ktree(ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
K_RUNLOCK(sharesummary_free);
LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found);
// Don't repeat searching old items to avoid accessing their ram
@ -4277,10 +4303,14 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
to_id = do_id;
wid_count++;
K_RLOCK(sharesummary_free);
while (ss_item && sharesummary->workinfoid == to_id) {
K_RLOCK(sharesummary_free);
ss_item = next_in_ktree(ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
K_RUNLOCK(sharesummary_free);
if (ss_item) {
do_id = sharesummary->workinfoid;
@ -4898,7 +4928,9 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
sharecreatedate = &(e_row->createdate);
}
K_RLOCK(sharesummary_free);
item = find_sharesummary(userid, workername, workinfoid);
K_RUNLOCK(sharesummary_free);
if (item) {
new = false;
DATA_SHARESUMMARY(row, item);
@ -5410,6 +5442,9 @@ static cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b)
return c;
}
/* TODO: and make sure all block searches use these
* or add new ones as required here */
// Must be R or W locked before call - gets current status (default_expiry)
static K_ITEM *find_blocks(int32_t height, char *blockhash)
{
@ -5427,6 +5462,34 @@ static K_ITEM *find_blocks(int32_t height, char *blockhash)
return find_in_ktree(blocks_root, &look, cmp_blocks, ctx);
}
// Must be R or W locked before call
static K_ITEM *find_prev_blocks(int32_t height)
{
BLOCKS lookblocks, *blocks;
K_TREE_CTX ctx[1];
K_ITEM look, *b_item;
/* TODO: For self orphaned (if that ever happens)
* this will find based on blockhash order if it has two,
* not NEW, blocks, which might not find the right one */
lookblocks.height = height;
lookblocks.blockhash[0] = '\0';
lookblocks.expirydate.tv_sec = 0L;
lookblocks.expirydate.tv_usec = 0L;
INIT_BLOCKS(&look);
look.data = (void *)(&lookblocks);
b_item = find_before_in_ktree(blocks_root, &look, cmp_blocks, ctx);
while (b_item) {
DATA_BLOCKS(blocks, b_item);
if (blocks->confirmed[0] != BLOCKS_NEW &&
CURRENT(&(blocks->expirydate)))
return b_item;
b_item = prev_in_ktree(ctx);
}
return NULL;
}
static const char *blocks_confirmed(char *confirmed)
{
switch (confirmed[0]) {
@ -5442,6 +5505,150 @@ static const char *blocks_confirmed(char *confirmed)
return blocks_unknown;
}
static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
double diffacc, double diffinv, double shareacc,
double shareinv, int64_t elapsed,
char *by, char *code, char *inet, tv_t *cd)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res = NULL;
K_TREE_CTX ctx[1];
K_ITEM *b_item, *old_b_item;
BLOCKS *row, *oldblocks;
char hash_dsp[16+1];
char *upd, *ins;
char *params[8 + HISTORYDATECOUNT];
bool ok = false, update_old = false;
int par = 0;
int n;
LOGDEBUG("%s(): confirm", __func__);
dsp_hash(blockhash, hash_dsp, sizeof(hash_dsp));
K_RLOCK(blocks_free);
old_b_item = find_blocks(height, blockhash);
K_RUNLOCK(blocks_free);
if (!old_b_item) {
LOGERR("%s(): Non-existent Block: %s/...%s",
__func__, height, hash_dsp);
return false;
}
DATA_BLOCKS_NULL(oldblocks, old_b_item);
K_WLOCK(blocks_free);
b_item = k_unlink_head(blocks_free);
K_WUNLOCK(blocks_free);
DATA_BLOCKS(row, b_item);
memcpy(row, oldblocks, sizeof(*row));
row->diffacc = diffacc;
row->diffinv = diffinv;
row->shareacc = shareacc;
row->shareinv = shareinv;
row->elapsed = elapsed;
row->statsconfirmed[0] = BLOCKS_STATSCONFIRMED;
row->statsconfirmed[1] = '\0';
HISTORYDATEINIT(row, cd, by, code, inet);
upd = "update blocks set expirydate=$1 where blockhash=$2 and expirydate=$3";
par = 0;
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = str_to_buf(row->blockhash, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
PQclear(res);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
}
update_old = true;
for (n = 0; n < par; n++)
free(params[n]);
par = 0;
params[par++] = str_to_buf(row->blockhash, NULL, 0);
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffinv, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->shareinv, NULL, 0);
params[par++] = bigint_to_buf(row->elapsed, NULL, 0);
params[par++] = str_to_buf(row->statsconfirmed, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHKVAL(par, 8 + HISTORYDATECOUNT, params); // 13 as per ins
ins = "insert into blocks "
"(height,blockhash,workinfoid,userid,workername,"
"clientid,enonce1,nonce2,nonce,reward,confirmed,"
"diffacc,diffinv,shareacc,shareinv,elapsed,"
"statsconfirmed"
HISTORYDATECONTROL ") select "
"height,blockhash,workinfoid,userid,workername,"
"clientid,enonce1,nonce2,nonce,reward,confirmed,"
"$3,$4,$5,$6,$7,$8,"
"$9,$10,$11,$12,$13 from blocks where "
"blockhash=$1 and expirydate=$2";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
}
res = PQexec(conn, "Commit", CKPQ_WRITE);
ok = true;
unparam:
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
if (conned)
PQfinish(conn);
K_WLOCK(blocks_free);
if (!ok)
k_add_head(blocks_free, b_item);
else {
if (update_old) {
blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx);
copy_tv(&(oldblocks->expirydate), cd);
blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks);
}
blocks_root = add_to_ktree(blocks_root, b_item, cmp_blocks);
k_add_head(blocks_store, b_item);
}
K_WUNLOCK(blocks_free);
return ok;
}
static bool blocks_add(PGconn *conn, char *height, char *blockhash,
char *confirmed, char *workinfoid, char *username,
char *workername, char *clientid, char *enonce1,
@ -9340,6 +9547,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
tvs_to_buf(&last_bc, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "lastbc=%s%c", reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
K_RLOCK(workinfo_free);
if (workinfo_current) {
WORKINFO *wic;
int32_t hi;
@ -9352,6 +9560,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
snprintf(tmp, sizeof(tmp), "lastheight=?%c", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
}
K_RUNLOCK(workinfo_free);
} else {
snprintf(tmp, sizeof(tmp), "lastbc=?%clastheight=?%c",
FLDSEP, FLDSEP);
@ -9672,9 +9881,11 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
looksharesummary.workername[0] = '\0';
INIT_SHARESUMMARY(&ss_look);
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look,
cmp_sharesummary_workinfoid, ctx);
if (!ss_item) {
K_RUNLOCK(sharesummary_free);
snprintf(reply, siz,
"ERR.no shares found with or before "
"workinfo %"PRId64,
@ -9697,6 +9908,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
if (allow_aged)
break;
default:
K_RUNLOCK(sharesummary_free);
snprintf(reply, siz,
"ERR.sharesummary1 not ready in "
"workinfo %"PRId64,
@ -9724,6 +9936,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
if (allow_aged)
break;
default:
K_RUNLOCK(sharesummary_free);
snprintf(reply, siz,
"ERR.sharesummary2 not ready in "
"workinfo %"PRId64,
@ -9738,6 +9951,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
ss_item = prev_in_ktree(ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
K_RUNLOCK(sharesummary_free);
if (total == 0.0) {
snprintf(reply, siz,
@ -10304,6 +10518,139 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
return cmds[*which_cmds].cmd_val;
}
static void summarise_blocks()
{
K_ITEM *b_item, *b_prev, *wi_item, ss_look, *ss_item;
K_TREE_CTX ctx[1], ss_ctx[1];
double diffacc, diffinv, shareacc, shareinv;
tv_t now, elapsed_start, elapsed_finish;
int64_t elapsed, wi_start, wi_finish;
BLOCKS *blocks, *prev_blocks;
WORKINFO *prev_workinfo;
SHARESUMMARY looksharesummary, *sharesummary;
int32_t hi;
bool ok;
setnow(&now);
K_RLOCK(blocks_free);
// Find the oldest, stats pending, not new, block
b_item = first_in_ktree(blocks_root, ctx);
while (b_item) {
DATA_BLOCKS(blocks, b_item);
if (CURRENT(&(blocks->expirydate)) &&
blocks->statsconfirmed[0] == BLOCKS_STATSPENDING &&
blocks->confirmed[0] != BLOCKS_NEW)
break;
b_item = next_in_ktree(ctx);
}
K_RUNLOCK(blocks_free);
// None
if (!b_item)
return;
wi_finish = blocks->workinfoid;
hi = 0;
K_RLOCK(workinfo_free);
if (workinfo_current) {
WORKINFO *wic;
DATA_WORKINFO(wic, workinfo_current);
hi = coinbase1height(wic->coinbase1);
}
K_RUNLOCK(workinfo_free);
// Wait at least for the (badly named) '2nd' confirm
if (hi == 0 || blocks->height >= (hi - 1))
return;
diffacc = diffinv = shareacc = shareinv = 0;
elapsed = 0;
K_RLOCK(blocks_free);
b_prev = find_prev_blocks(blocks->height);
K_RUNLOCK(blocks_free);
if (!b_prev) {
wi_start = 0;
elapsed_start.tv_sec = elapsed_start.tv_usec = 0L;
} else {
DATA_BLOCKS(prev_blocks, b_prev);
wi_start = prev_blocks->workinfoid;
wi_item = find_workinfo(wi_start);
if (!wi_item) {
// This will repeat until fixed ...
LOGERR("%s() block %d, but prev %d wid "
"%"PRId64" is missing",
__func__, blocks->height,
prev_blocks->height,
prev_blocks->workinfoid);
return;
}
DATA_WORKINFO(prev_workinfo, wi_item);
copy_tv(&elapsed_start, &(prev_workinfo->createdate));
}
elapsed_finish.tv_sec = elapsed_finish.tv_usec = 0L;
// Add up the sharesummaries, abort if any SUMMARY_NEW
looksharesummary.workinfoid = wi_finish;
looksharesummary.userid = MAXID;
looksharesummary.workername[0] = '\0';
INIT_SHARESUMMARY(&ss_look);
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look,
cmp_sharesummary_workinfoid, ss_ctx);
if (!ss_item) {
K_RUNLOCK(sharesummary_free);
// This will repeat each call here until fixed ...
LOGERR("%s() block %d, prev %d no sharesummaries "
"on or before %"PRId64,
__func__, blocks->height,
prev_blocks->height, wi_finish);
return;
}
DATA_SHARESUMMARY(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid > wi_start) {
if (sharesummary->complete[0] == SUMMARY_NEW) {
// Not aged yet
K_RUNLOCK(sharesummary_free);
return;
}
if (elapsed_start.tv_sec == 0 ||
!tv_newer(&elapsed_start, &(sharesummary->firstshare))) {
copy_tv(&elapsed_start, &(sharesummary->firstshare));
}
if (tv_newer(&elapsed_finish, &(sharesummary->lastshare)))
copy_tv(&elapsed_finish, &(sharesummary->lastshare));
diffacc += sharesummary->diffacc;
diffinv += sharesummary->diffsta + sharesummary->diffdup +
sharesummary->diffhi + sharesummary-> diffrej;
shareacc += sharesummary->shareacc;
shareinv += sharesummary->sharesta + sharesummary->sharedup +
sharesummary->sharehi + sharesummary-> sharerej;
ss_item = prev_in_ktree(ss_ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
K_RUNLOCK(sharesummary_free);
elapsed = (int64_t)(tvdiff(&elapsed_finish, &elapsed_start) + 0.5);
ok = blocks_stats(NULL, blocks->height, blocks->blockhash,
diffacc, diffinv, shareacc, shareinv, elapsed,
by_default, (char *)__func__, inet_default, &now);
if (ok) {
LOGWARNING("%s() block %d, stats confirmed "
"%0.f/%.0f/%.0f/%.0f/%"PRId64,
__func__, blocks->height,
diffacc, diffinv, shareacc, shareinv, elapsed);
} else {
LOGERR("%s() block %d, failed to confirm stats",
__func__, blocks->height);
}
}
static void summarise_poolstats()
{
// TODO
@ -10508,11 +10855,15 @@ static void *summariser(__maybe_unused void *arg)
cksleep_ms(42);
while (!everyone_die) {
sleep(13);
sleep(5);
if (!everyone_die)
summarise_blocks();
sleep(4);
if (!everyone_die)
summarise_poolstats();
sleep(4);
if (!everyone_die)
summarise_userstats();
}
@ -11834,9 +12185,7 @@ static void confirm_summaries()
confirm_reload();
}
#define RELOADFILES "ckdb"
static void check_restore_dir()
static void check_restore_dir(char *name)
{
struct stat statbuf;
@ -11854,14 +12203,15 @@ static void check_restore_dir()
if (stat(restorefrom, &statbuf))
quit(1, "ERR: -r '%s' directory doesn't exist", restorefrom);
restorefrom = realloc(restorefrom, strlen(restorefrom)+sizeof(RELOADFILES));
restorefrom = realloc(restorefrom, strlen(restorefrom)+strlen(name)+1);
if (!restorefrom)
quithere(1, "OOM");
strcat(restorefrom, RELOADFILES);
strcat(restorefrom, name);
}
static struct option long_options[] = {
{ "dbprefix", required_argument, 0, 'b' },
{ "config", required_argument, 0, 'c' },
{ "dbname", required_argument, 0, 'd' },
{ "help", no_argument, 0, 'h' },
@ -11986,8 +12336,6 @@ int main(int argc, char **argv)
else
dbcode = "";
check_restore_dir();
if (!db_name)
db_name = "ckdb";
if (!db_user)
@ -11998,6 +12346,8 @@ int main(int argc, char **argv)
prctl(PR_SET_NAME, buf, 0, 0, 0);
memset(buf, 0, 15);
check_restore_dir(ckp.name);
if (!ckp.config) {
ckp.config = strdup(ckp.name);
realloc_strcat(&ckp.config, ".conf");

Loading…
Cancel
Save