Browse Source

ckdb - add miningpayouts db code and a pplns stats generator

master
kanoi 10 years ago
parent
commit
24c287b54f
  1. 473
      src/ckdb.c

473
src/ckdb.c

@ -47,7 +47,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.7" #define DB_VERSION "0.7"
#define CKDB_VERSION DB_VERSION"-0.54" #define CKDB_VERSION DB_VERSION"-0.66"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -447,6 +447,9 @@ enum data_type {
static const tv_t default_expiry = { DEFAULT_EXPIRY, 0L }; static const tv_t default_expiry = { DEFAULT_EXPIRY, 0L };
// No actual need to test tv_usec
#define CURRENT(_tv) (((_tv)->tv_sec == DEFAULT_EXPIRY) ? true : false)
// 31-Dec-9999 23:59:59+00 // 31-Dec-9999 23:59:59+00
#define DATE_S_EOT 253402300799L #define DATE_S_EOT 253402300799L
#define DATE_uS_EOT 0L #define DATE_uS_EOT 0L
@ -776,6 +779,7 @@ enum cmd_values {
CMD_HOMEPAGE, CMD_HOMEPAGE,
CMD_DSP, CMD_DSP,
CMD_STATS, CMD_STATS,
CMD_PPLNS,
CMD_END CMD_END
}; };
@ -1191,7 +1195,6 @@ static K_TREE *blocks_root;
static K_LIST *blocks_free; static K_LIST *blocks_free;
static K_STORE *blocks_store; static K_STORE *blocks_store;
/*
// MININGPAYOUTS // MININGPAYOUTS
typedef struct miningpayouts { typedef struct miningpayouts {
int64_t miningpayoutid; int64_t miningpayoutid;
@ -1210,6 +1213,7 @@ static K_TREE *miningpayouts_root;
static K_LIST *miningpayouts_free; static K_LIST *miningpayouts_free;
static K_STORE *miningpayouts_store; static K_STORE *miningpayouts_store;
/*
// EVENTLOG // EVENTLOG
typedef struct eventlog { typedef struct eventlog {
int64_t eventlogid; int64_t eventlogid;
@ -2724,7 +2728,7 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername,
conn = dbconnect(); conn = dbconnect();
conned = true; conned = true;
} }
row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet); row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet);
if (row->workerid == 0) if (row->workerid == 0)
goto unitem; goto unitem;
@ -5103,6 +5107,219 @@ void blocks_reload()
PQfinish(conn); PQfinish(conn);
} }
/* order by height asc,userid asc,expirydate asc
* i.e. only one payout amount per block per user */
static cmp_t cmp_miningpayouts(K_ITEM *a, K_ITEM *b)
{
cmp_t c = CMP_INT(DATA_MININGPAYOUTS(a)->height,
DATA_MININGPAYOUTS(b)->height);
if (c == 0) {
c = CMP_BIGINT(DATA_MININGPAYOUTS(a)->userid,
DATA_MININGPAYOUTS(b)->userid);
if (c == 0) {
c = CMP_TV(DATA_MININGPAYOUTS(a)->expirydate,
DATA_MININGPAYOUTS(b)->expirydate);
}
}
return c;
}
__maybe_unused static bool miningpayouts_add(PGconn *conn, char *username, char *height,
char *blockhash, char *amount, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
K_ITEM *m_item, *u_item;
bool ok = false;
int n;
MININGPAYOUTS *row;
char *ins;
char *params[5 + HISTORYDATECOUNT];
int par;
LOGDEBUG("%s(): add", __func__);
K_WLOCK(miningpayouts_free);
m_item = k_unlink_head(miningpayouts_free);
K_WUNLOCK(miningpayouts_free);
row = DATA_MININGPAYOUTS(m_item);
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
row->miningpayoutid = nextid(conn, "miningpayoutid", (int64_t)1, cd, by, code, inet);
if (row->miningpayoutid == 0)
goto unitem;
K_RLOCK(users_free);
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item)
goto unitem;
row->userid = DATA_USERS(u_item)->userid;
TXT_TO_INT("height", height, row->height);
STRNCPY(row->blockhash, blockhash);
TXT_TO_BIGINT("amount", amount, row->amount);
HISTORYDATEINIT(row, cd, by, code, inet);
HISTORYDATETRANSFER(trf_root, row);
par = 0;
params[par++] = bigint_to_buf(row->miningpayoutid, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = int_to_buf(row->height, NULL, 0);
params[par++] = str_to_buf(row->blockhash, NULL, 0);
params[par++] = bigint_to_buf(row->amount, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into miningpayouts "
"(miningpayoutid,userid,height,blockhash,amount"
HISTORYDATECONTROL ") values (" PQPARAM10 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
unitem:
if (conned)
PQfinish(conn);
K_WLOCK(miningpayouts_free);
if (!ok)
k_add_head(miningpayouts_free, m_item);
else {
miningpayouts_root = add_to_ktree(miningpayouts_root, m_item, cmp_miningpayouts);
k_add_head(miningpayouts_store, m_item);
}
K_WUNLOCK(miningpayouts_free);
return ok;
}
static bool miningpayouts_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
MININGPAYOUTS *row;
char *params[1];
int par;
char *field;
char *sel;
int fields = 5;
bool ok;
LOGDEBUG("%s(): select", __func__);
sel = "select "
"miningpayoutid,userid,height,blockhash,amount"
HISTORYDATECONTROL
" from miningpayouts where expirydate=$1";
par = 0;
params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
return false;
}
n = PQnfields(res);
if (n != (fields + HISTORYDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n);
PQclear(res);
return false;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(miningpayouts_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(miningpayouts_free);
row = DATA_MININGPAYOUTS(item);
PQ_GET_FLD(res, i, "miningpayoutid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("miningpayoutid", field, row->miningpayoutid);
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "height", field, ok);
if (!ok)
break;
TXT_TO_INT("height", field, row->height);
PQ_GET_FLD(res, i, "blockhash", field, ok);
if (!ok)
break;
TXT_TO_STR("blockhash", field, row->blockhash);
PQ_GET_FLD(res, i, "amount", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("amount", field, row->amount);
HISTORYDATEFLDS(res, i, row, ok);
if (!ok)
break;
miningpayouts_root = add_to_ktree(miningpayouts_root, item, cmp_miningpayouts);
k_add_head(miningpayouts_store, item);
tick();
}
if (!ok)
k_add_head(miningpayouts_free, item);
K_WUNLOCK(miningpayouts_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d miningpayout records", __func__, n);
}
return ok;
}
void miningpayouts_reload()
{
PGconn *conn = dbconnect();
K_WLOCK(miningpayouts_free);
miningpayouts_root = free_ktree(miningpayouts_root, NULL);
k_list_transfer_to_head(miningpayouts_store, miningpayouts_free);
K_WUNLOCK(miningpayouts_free);
miningpayouts_fill(conn);
PQfinish(conn);
}
// order by userid asc,createdate asc,authid asc,expirydate desc // order by userid asc,createdate asc,authid asc,expirydate desc
static cmp_t cmp_auths(K_ITEM *a, K_ITEM *b) static cmp_t cmp_auths(K_ITEM *a, K_ITEM *b)
{ {
@ -6420,6 +6637,11 @@ static void alloc_storage()
blocks_store = k_new_store(blocks_free); blocks_store = k_new_store(blocks_free);
blocks_root = new_ktree(); blocks_root = new_ktree();
miningpayouts_free = k_new_list("MiningPayouts", sizeof(MININGPAYOUTS),
ALLOC_MININGPAYOUTS, LIMIT_MININGPAYOUTS, true);
miningpayouts_store = k_new_store(miningpayouts_free);
miningpayouts_root = new_ktree();
auths_free = k_new_list("Auths", sizeof(AUTHS), auths_free = k_new_list("Auths", sizeof(AUTHS),
ALLOC_AUTHS, LIMIT_AUTHS, true); ALLOC_AUTHS, LIMIT_AUTHS, true);
auths_store = k_new_store(auths_free); auths_store = k_new_store(auths_free);
@ -7844,6 +8066,248 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
return buf; return buf;
} }
// order by userid asc
static cmp_t cmp_mu(K_ITEM *a, K_ITEM *b)
{
return CMP_BIGINT(DATA_MININGPAYOUTS(a)->userid,
DATA_MININGPAYOUTS(b)->userid);
}
static K_TREE *upd_add_mu(K_TREE *mu_root, K_STORE *mu_store, int64_t userid, int64_t diffacc)
{
MININGPAYOUTS miningpayouts;
K_ITEM look, *mu_item;
K_TREE_CTX ctx[1];
miningpayouts.userid = userid;
look.data = (void *)(&miningpayouts);
mu_item = find_in_ktree(mu_root, &look, cmp_mu, ctx);
if (mu_item)
DATA_MININGPAYOUTS(mu_item)->amount += diffacc;
else {
K_WLOCK(mu_store);
mu_item = k_unlink_head(miningpayouts_free);
DATA_MININGPAYOUTS(mu_item)->userid = userid;
DATA_MININGPAYOUTS(mu_item)->amount = diffacc;
mu_root = add_to_ktree(mu_root, mu_item, cmp_mu);
k_add_head(mu_store, mu_item);
K_WUNLOCK(mu_store);
}
return mu_root;
}
/* Find the block_workinfoid of the block requested
then add all it's diffacc shares
then keep stepping back shares until diffacc_total matches or exceeds
the blocks network difficulty (block_ndiff) - this is begin_workinfoid
(also summarising diffacc per user)
then keep stepping back until we complete the current begin_workinfoid
(also summarising diffacc per user)
This will give us the total number of diff1 shares (diffacc_total)
to use for the payment calculations
The pplns_elapsed time of the shares is from the createdate of the
begin_workinfoid that has shares accounted to the total,
up to the createdate of the block
The user average hashrate would be:
diffacc_user * 2^32 / pplns_elapsed
PPLNS fraction of the block would be:
diffacc_user / diffacc_total
*/
static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd, K_TREE *trf_root)
{
char reply[1024], tmp[1024], *buf;
size_t siz = sizeof(reply);
K_ITEM look, *i_height, *b_item, *w_item, *ss_item;
K_ITEM *mu_item, *wb_item, *u_item;
SHARESUMMARY sharesummary;
BLOCKS blocks;
K_TREE *mu_root;
K_STORE *mu_store;
int32_t height;
int64_t workinfoid;
int64_t end_workinfoid;
int64_t begin_workinfoid;
tv_t cd, begin_tv, end_tv;
K_TREE_CTX ctx[1];
double ndiff, total, elapsed;
char ndiffbin[TXT_SML+1];
size_t len, off;
int rows;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_height = require_name(trf_root, "height", 1, NULL, reply, siz);
if (!i_height)
return strdup(reply);
TXT_TO_INT("height", DATA_TRANSFER(i_height)->data, height);
cd.tv_sec = cd.tv_usec = 0L;
blocks.height = height + 1;
blocks.blockhash[0] = '\0';
look.data = (void *)(&blocks);
b_item = find_before_in_ktree(blocks_root, &look, cmp_blocks, ctx);
if (!b_item) {
snprintf(reply, siz, "ERR.no block height %d", height);
return strdup(reply);
}
while (b_item && DATA_BLOCKS(b_item)->height == height) {
if (DATA_BLOCKS(b_item)->confirmed[0] == BLOCKS_CONFIRM)
break;
b_item = prev_in_ktree(ctx);
}
if (!b_item || DATA_BLOCKS(b_item)->height != height) {
snprintf(reply, siz, "ERR.unconfirmed block %d", height);
return strdup(reply);
}
workinfoid = DATA_BLOCKS(b_item)->workinfoid;
copy_tv(&end_tv, &(DATA_BLOCKS(b_item)->createdate));
w_item = find_workinfo(workinfoid);
if (!w_item) {
snprintf(reply, siz, "ERR.missing workinfo %"PRId64, workinfoid);
return strdup(reply);
}
hex2bin(ndiffbin, DATA_WORKINFO(w_item)->bits, 4);
ndiff = diff_from_nbits(ndiffbin);
total = 0;
sharesummary.workinfoid = workinfoid;
sharesummary.userid = MAXID;
sharesummary.workername[0] = '\0';
look.data = (void *)(&sharesummary);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &look,
cmp_sharesummary_workinfoid, ctx);
if (!ss_item) {
snprintf(reply, siz,
"ERR.no shares found with or before "
"workinfo %"PRId64,
workinfoid);
return strdup(reply);
}
mu_store = k_new_store(miningpayouts_free);
mu_root = new_ktree();
end_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid;
// add up all sharesummaries until >= ndiff
while (ss_item && total < ndiff) {
total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc);
begin_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid;
mu_root = upd_add_mu(mu_root, mu_store,
DATA_SHARESUMMARY(ss_item)->userid,
(int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc));
ss_item = prev_in_ktree(ctx);
}
// include all the rest of the sharesummaries with begin_workinfoid
while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == begin_workinfoid) {
total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc);
mu_root = upd_add_mu(mu_root, mu_store,
DATA_SHARESUMMARY(ss_item)->userid,
(int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc));
ss_item = prev_in_ktree(ctx);
}
if (total == 0.0) {
snprintf(reply, siz,
"ERR.total share diff 0 before workinfo %"PRId64,
workinfoid);
goto shazbot;
}
wb_item = find_workinfo(begin_workinfoid);
if (!wb_item) {
snprintf(reply, siz, "ERR.missing begin workinfo record! %"PRId64, workinfoid);
goto shazbot;
}
copy_tv(&begin_tv, &(DATA_WORKINFO(wb_item)->createdate));
/* Elapsed is short by the remainder of the block's workinfoid
* after the block was found, but we include all shares
* that were accepted as part of the block's workinfoid anyway
* All shares accepted in a workinfoid after the blocks workinfoid
* will not be creditied in this block no matter what the height
* of the workinfo - but will be candidates for the next block */
elapsed = tvdiff(&end_tv, &begin_tv);
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok.");
snprintf(tmp, sizeof(tmp), "block=%d%c", height, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "block_workinfoid=%"PRId64"%c", workinfoid, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "end_workinfoid=%"PRId64"%c", end_workinfoid, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "begin_workinfoid=%"PRId64"%c", begin_workinfoid, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diffacc_total=%.0f%c", total, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "block_ndiff=%f%c", ndiff, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "pplns_elapsed=%f%c", elapsed, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows = 0;
mu_item = first_in_ktree(mu_root, ctx);
while (mu_item) {
rows++;
K_RLOCK(users_free);
u_item = find_userid(DATA_MININGPAYOUTS(mu_item)->userid);
K_RUNLOCK(users_free);
if (u_item) {
snprintf(tmp, sizeof(tmp),
"user%d=%s%c",
rows,
DATA_USERS(u_item)->username,
FLDSEP);
} else {
snprintf(tmp, sizeof(tmp),
"user%d=%"PRId64"%c",
rows,
DATA_MININGPAYOUTS(mu_item)->userid,
FLDSEP);
}
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp),
"diffacc_user%d=%"PRId64"%c",
rows,
DATA_MININGPAYOUTS(mu_item)->amount,
FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
mu_item = next_in_ktree(ctx);
}
snprintf(tmp, sizeof(tmp), "rows=%d", rows);
APPEND_REALLOC(buf, off, len, tmp);
mu_root = free_ktree(mu_root, NULL);
K_WLOCK(mu_store);
k_list_transfer_to_head(mu_store, miningpayouts_free);
K_WUNLOCK(mu_store);
mu_store = k_free_store(mu_store);
LOGDEBUG("%s.ok.pplns.%s", id, buf);
return buf;
shazbot:
mu_root = free_ktree(mu_root, NULL);
K_WLOCK(mu_store);
k_list_transfer_to_head(mu_store, miningpayouts_free);
K_WUNLOCK(mu_store);
mu_store = k_free_store(mu_store);
return strdup(reply);
}
static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd,
char *id, __maybe_unused tv_t *now, char *id, __maybe_unused tv_t *now,
__maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *by, __maybe_unused char *code,
@ -8017,6 +8481,7 @@ static struct CMDS {
{ CMD_HOMEPAGE, "homepage", false, false, cmd_homepage, ACCESS_WEB }, { CMD_HOMEPAGE, "homepage", false, false, cmd_homepage, ACCESS_WEB },
{ CMD_DSP, "dsp", false, false, cmd_dsp, ACCESS_SYSTEM }, { CMD_DSP, "dsp", false, false, cmd_dsp, ACCESS_SYSTEM },
{ CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM }, { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM },
{ CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, NULL } { CMD_END, NULL, false, false, NULL, NULL }
}; };
@ -8719,6 +9184,7 @@ static void *socketer(__maybe_unused void *arg)
case CMD_ALLUSERS: case CMD_ALLUSERS:
case CMD_WORKERS: case CMD_WORKERS:
case CMD_PAYMENTS: case CMD_PAYMENTS:
case CMD_PPLNS:
case CMD_DSP: case CMD_DSP:
if (!startup_complete) { if (!startup_complete) {
snprintf(reply, sizeof(reply), snprintf(reply, sizeof(reply),
@ -8886,6 +9352,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
case CMD_HOMEPAGE: case CMD_HOMEPAGE:
case CMD_DSP: case CMD_DSP:
case CMD_STATS: case CMD_STATS:
case CMD_PPLNS:
LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored",
__func__, count, cmd); __func__, count, cmd);
break; break;

Loading…
Cancel
Save