Browse Source

Merge branch 'master' into multiproxy

master
Con Kolivas 10 years ago
parent
commit
29cb57fb61
  1. 2
      configure.ac
  2. 10
      pool/base.php
  3. 12
      pool/db.php
  4. 66
      pool/email.php
  5. 49
      pool/page_settings.php
  6. 3
      pool/page_shifts.php
  7. 87
      pool/page_userinfo.php
  8. 3
      pool/prime.php
  9. 156
      src/ckdb.c
  10. 91
      src/ckdb.h
  11. 269
      src/ckdb_cmd.c
  12. 297
      src/ckdb_data.c
  13. 812
      src/ckdb_dbio.c
  14. 14
      src/libckpool.c
  15. 1
      src/libckpool.h
  16. 41
      src/stratifier.c

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.8.7, kernel@kolivas.org)
AC_INIT(ckpool, 0.8.8, kernel@kolivas.org)
AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4])

10
pool/base.php

@ -100,18 +100,18 @@ global $sipre;
# max of uint256 is ~1.158x10^77, which is well above 'Y' (10^24)
$sipre = array('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y');
#
function siprefmt($amt)
function siprefmt($amt, $dot = 2)
{
global $sipre;
$dot = 2;
$rnd = pow(10, $dot);
$pref = floor(log10($amt)/3);
if ($pref < 0)
$pref = 0;
if ($pref >= count($sipre))
$pref = count($sipre)-1;
$amt = round(100.0 * $amt / pow(10, $pref * 3)) / 100;
$amt = round($rnd * $amt / pow(10, $pref * 3)) / $rnd;
if ($amt > 999.99 && $pref < (count($sipre)-1))
{
$amt /= 1000;
@ -141,7 +141,7 @@ function dsprate($hr)
#
function difffmt($amt)
{
return siprefmt($amt);
return siprefmt($amt, 3);
}
#
function emailStr($str)
@ -354,7 +354,7 @@ function logout()
#
function requestRegister()
{
$reg = getparam('Register', false);
$reg = getparam('Register', true);
$reg2 = getparam('Reset', false);
if ($reg !== NULL || $reg2 !== NULL)
{

12
pool/db.php

@ -386,6 +386,18 @@ function getBlocks($user)
return repDecode($rep);
}
#
function getUserInfo($user)
{
if ($user == false)
showIndex();
$flds = array('username' => $user);
$msg = msgEncode('userinfo', 'usr', $flds, $user);
$rep = sendsockreply('getUserInfo', $msg);
if (!$rep)
dbdown();
return repDecode($rep);
}
#
# e.g. $atts = array('ua_Reset.str' => 'FortyTwo',
# 'ua_Reset.date' => 'now+3600')
# 'ua_Tanuki.str' => 'Meme',

66
pool/email.php

@ -106,6 +106,72 @@ function passReset($to, $code, $whoip, $emailinfo)
return sendnoheader($to, "Password Reset", $message, $emailinfo);
}
#
function payoutAddressChanged($to, $whoip, $emailinfo)
{
global $eol;
if (!isset($emailinfo['KWebURL']))
return false;
$web = $emailinfo['KWebURL'];
$ret = emailEnd('payout address change', $whoip, $emailinfo);
if ($ret === false)
return false;
$message = "Your payout address has been changed.$eol$eol";
$message .= $ret;
return sendnoheader($to, "Payout Address Change", $message, $emailinfo);
}
#
function emailAddressChanged($to, $whoip, $emailinfo, $old)
{
global $eol;
if (!isset($emailinfo['KWebURL']))
return false;
$web = $emailinfo['KWebURL'];
$ret = emailEnd('email address change', $whoip, $emailinfo);
if ($ret === false)
return false;
$message = "Your email address has been changed to:$eol <$to>$eol$eol";
if ($old != null && $old != '')
{
$message .= "You will no longer receive notifications at the address:$eol <$old>$eol$eol";
$send = "$to,$old";
}
else
$send = $to;
$message .= $ret;
return sendnoheader($send, "EMail Address Change", $message, $emailinfo);
}
#
function passChanged($to, $whoip, $emailinfo)
{
global $eol;
if (!isset($emailinfo['KWebURL']))
return false;
$web = $emailinfo['KWebURL'];
$ret = emailEnd('password change', $whoip, $emailinfo);
if ($ret === false)
return false;
$message = "Your password was changed.$eol$eol";
$message .= "If you didn't change it, then you need to urgently use$eol";
$message .= "the password reset at the pool to change it again.$eol$eol";
$message .= $ret;
return sendnoheader($to, "Password Change", $message, $emailinfo);
}
#
# getOpts required for email
# If they aren't all setup in the DB then email functions will return false
function emailOptList()

49
pool/page_settings.php

@ -1,5 +1,7 @@
<?php
#
include_once('email.php');
#
function settings($data, $user, $email, $addr, $err)
{
$pg = '<h1>Account Settings</h1>';
@ -10,6 +12,8 @@ function settings($data, $user, $email, $addr, $err)
$pg .= '<table cellpadding=20 cellspacing=0 border=1>';
$pg .= '<tr class=dc><td><center>';
$_SESSION['old_set_email'] = $email;
$pg .= makeForm('settings');
$pg .= '<table cellpadding=5 cellspacing=0 border=0>';
$pg .= '<tr class=dc><td class=dr colspan=2>';
@ -101,6 +105,7 @@ function dosettings($data, $user)
$email = getparam('email', false);
$pass = getparam('pass', false);
$ans = userSettings($user, $email, null, $pass);
$err = 'EMail changed';
$check = true;
break;
case 'Address':
@ -110,6 +115,7 @@ function dosettings($data, $user)
$addrarr = array(array('addr' => $addr));
$pass = getparam('pass', false);
$ans = userSettings($user, null, $addrarr, $pass);
$err = 'Payout address changed';
$check = true;
}
break;
@ -132,13 +138,18 @@ function dosettings($data, $user)
}
break;
}
$doemail = false;
if ($check === true)
{
if ($ans['STATUS'] != 'ok')
{
$err = $ans['STATUS'];
if ($ans['ERROR'] != '')
$err .= ': '.$ans['ERROR'];
}
else
$doemail = true;
}
$ans = userSettings($user);
if ($ans['STATUS'] != 'ok')
dbdown(); // Should be no other reason?
@ -151,6 +162,44 @@ function dosettings($data, $user)
$addr = $ans['addr:0'];
else
$addr = '';
if ($doemail)
{
if ($email == '')
{
if ($err != '')
$err .= '<br>';
$err .= 'An error occurred, check your details below';
goto iroiroattanoyo;
}
$emailinfo = getOpts($user, emailOptList());
if ($emailinfo['STATUS'] != 'ok')
{
if ($err != '')
$err .= '<br>';
$err .= 'An error occurred, check your details below';
goto iroiroattanoyo;
}
switch ($chg)
{
case 'EMail':
if (isset($_SESSION['old_set_email']))
$old = $_SESSION['old_set_email'];
else
$old = null;
emailAddressChanged($email, zeip(), $emailinfo, $old);
break;
case 'Address':
payoutAddressChanged($email, zeip(), $emailinfo);
break;
case 'Password':
passChanged($email, zeip(), $emailinfo);
break;
}
}
iroiroattanoyo:
$pg = settings($data, $user, $email, $addr, $err);
return $pg;
}

3
pool/page_shifts.php

@ -10,6 +10,7 @@ function doshifts($data, $user)
$pg .= "<td class=dl>Start</td>";
$pg .= "<td class=dr>Length</td>";
$pg .= "<td class=dr>Your Diff</td>";
$pg .= "<td class=dr>Inv Diff</td>";
$pg .= "<td class=dr>Avg Hs</td>";
$pg .= "<td class=dr>Shares</td>";
$pg .= "<td class=dr>Avg Share</td>";
@ -54,6 +55,8 @@ function doshifts($data, $user)
$pg .= '<td class=dr>'.howmanyhrs($elapsed).'</td>';
$diffacc = $ans[$pre.'diffacc:'.$i];
$pg .= '<td class=dr>'.difffmt($diffacc).'</td>';
$diffinv = $ans[$pre.'diffinv:'.$i];
$pg .= '<td class=dr>'.difffmt($diffinv).'</td>';
$hr = $diffacc * pow(2,32) / $elapsed;
$pg .= '<td class=dr>'.dsprate($hr).'</td>';
$shareacc = $ans[$pre.'shareacc:'.$i];

87
pool/page_userinfo.php

@ -0,0 +1,87 @@
<?php
#
function blocksorder($a, $b)
{
if ($b['blocks'] != $a['blocks'])
return $b['blocks'] - $a['blocks'];
else
{
if ($b['diffacc'] != $a['diffacc'])
return $a['diffacc'] - $b['diffacc'];
else
return strcasecmp($a['username'], $b['username']);
}
}
#
function douserinfo($data, $user)
{
$sall = ($user == 'Kano');
$ans = getUserInfo($user);
$pg = '<h1>Block Acclaim</h1>'.$pg;
$pg .= "<table callpadding=0 cellspacing=0 border=0>\n";
$pg .= "<tr class=title>";
$pg .= "<td class=dl>User</td>";
$pg .= "<td class=dr>Blocks</td>";
if ($sall)
{
$pg .= "<td class=dr>Diff</td>";
$pg .= "<td class=dr>Avg</td>";
}
$pg .= "</tr>\n";
if ($ans['STATUS'] == 'ok')
{
$all = array();
$count = $ans['rows'];
for ($i = 0; $i < $count; $i++)
{
if ($sall)
$diffacc = $ans['diffacc:'.$i];
else
$diffacc = 0;
$all[] = array('blocks' => $ans['blocks:'.$i],
'username' => $ans['username:'.$i],
'diffacc' => $diffacc);
}
usort($all, 'blocksorder');
for ($i = 0; $i < $count; $i++)
{
$bl = $all[$i]['blocks'];
if ($sall == false && $bl < 1)
break;
if (($i % 2) == 0)
$row = 'even';
else
$row = 'odd';
$pg .= "<tr class=$row>";
$un = htmlspecialchars($all[$i]['username']);
$pg .= "<td class=dl>$un</td>";
$pg .= "<td class=dr>$bl</td>";
if ($sall)
{
$diffacc = $all[$i]['diffacc'];
$pg .= '<td class=dr>'.difffmt($diffacc).'</td>';
if ($bl == 0)
$bl = 1;
$pg .= '<td class=dr>'.difffmt($diffacc/$bl).'</td>';
}
$pg .= "</tr>\n";
}
}
$pg .= "</table>\n";
return $pg;
}
#
function show_userinfo($info, $page, $menu, $name, $user)
{
gopage($info, NULL, 'douserinfo', $page, $menu, $name, $user);
}
#
?>

3
pool/prime.php

@ -84,7 +84,8 @@ function check()
'Pool' => array(
'Stats' => 'stats',
'Blocks' => 'blocks',
'Graph' => 'psperf'
'Graph' => 'psperf',
'Acclaim' => 'userinfo'
),
'Admin' => NULL,
'gap' => array( # options not shown

156
src/ckdb.c

@ -268,7 +268,7 @@ bool dbload_only_sharesummary = false;
* markersummaries and pplns payouts may not be correct */
bool sharesummary_marks_limit = false;
// DB users,workers load is complete
// DB optioncontrol,users,workers,useratts load is complete
bool db_users_complete = false;
// DB load is complete
bool db_load_complete = false;
@ -510,6 +510,11 @@ const char *marktype_other_finish_fmt = "fin: %s";
const char *marktype_shift_begin_skip = "Shift stt: ";
const char *marktype_shift_end_skip = "Shift fin: ";
// USERINFO from various incoming data
K_TREE *userinfo_root;
K_LIST *userinfo_free;
K_STORE *userinfo_store;
static char logname[512];
static char *dbcode;
@ -753,7 +758,9 @@ static bool getdata1()
goto matane;
if (!(ok = users_fill(conn)))
goto matane;
ok = workers_fill(conn);
if (!(ok = workers_fill(conn)))
goto matane;
ok = useratts_fill(conn);
matane:
@ -791,21 +798,15 @@ static bool getdata3()
}
if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai;
/* marks must be loaded before sharesummary
* since sharesummary looks at the marks data */
if (!(ok = marks_fill(conn)) || everyone_die)
goto sukamudai;
/* must be after workinfo */
if (!(ok = workmarkers_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai;
if (!(ok = sharesummary_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary) {
if (!(ok = useratts_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary && !everyone_die)
ok = poolstats_fill(conn);
}
sukamudai:
@ -823,32 +824,25 @@ static bool reload()
char *reason;
FILE *fp;
tv_to_buf(&(dbstatus.oldest_sharesummary_firstshare_n), buf, sizeof(buf));
LOGWARNING("%s(): %s oldest DB incomplete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_sharesummary_firstshare_ay), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB complete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_workmarker_workinfo),
buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workmarker wid %"PRId64,
__func__, buf,
dbstatus.newest_workmarker_workinfoid);
tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workinfo", __func__, buf);
LOGWARNING("%s(): %s newest DB workinfo wid %"PRId64,
__func__, buf, dbstatus.newest_workinfoid);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
LOGWARNING("%s(): %s newest DB poolstats (ignored)", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB blocks (ignored)", __func__, buf);
if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec)
copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.oldest_sharesummary_firstshare_n));
else
copy_tv(&(dbstatus.sharesummary_firstshare), &(dbstatus.newest_sharesummary_firstshare_ay));
copy_tv(&start, &(dbstatus.sharesummary_firstshare));
reason = "sharesummary";
copy_tv(&start, &(dbstatus.newest_createdate_workmarker_workinfo));
reason = "workmarkers";
if (!tv_newer(&start, &(dbstatus.newest_createdate_workinfo))) {
copy_tv(&start, &(dbstatus.newest_createdate_workinfo));
reason = "workinfo";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) {
copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
reason = "poolstats";
}
tv_to_buf(&start, buf, sizeof(buf));
LOGWARNING("%s() restart timestamp %s for %s", __func__, buf, reason);
@ -896,15 +890,18 @@ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid)
fclose(fp);
if (ret == 1 && !(kill(oldpid, 0))) {
if (!ckp->killold) {
LOGEMERG("Process %s pid %d still exists, start ckpool with -k if you wish to kill it",
LOGEMERG("Process %s pid %d still exists, start"
" ckpool with -k if you wish to kill it",
path, oldpid);
return false;
}
if (kill(oldpid, 9)) {
LOGEMERG("Unable to kill old process %s pid %d", path, oldpid);
LOGEMERG("Unable to kill old process %s pid %d",
path, oldpid);
return false;
}
LOGWARNING("Killing off old process %s pid %d", path, oldpid);
LOGWARNING("Killing off old process %s pid %d",
path, oldpid);
}
}
fp = fopen(path, "we");
@ -1145,18 +1142,26 @@ static void alloc_storage()
ALLOC_MARKS, LIMIT_MARKS, true);
marks_store = k_new_store(marks_free);
marks_root = new_ktree();
userinfo_free = k_new_list("UserInfo", sizeof(USERINFO),
ALLOC_USERINFO, LIMIT_USERINFO, true);
userinfo_store = k_new_store(userinfo_free);
userinfo_root = new_ktree();
}
#define SEQSETWARN(_set, _seqset, _msgtxt, _endtxt) do { \
#define SEQSETMSG(_set, _seqset, _msgtxt, _endtxt) do { \
char _t_buf[DATE_BUFSIZ]; \
bool _warn = ((_seqset)->seqdata[SEQ_SHARES].missing > 0) || \
((_seqset)->seqdata[SEQ_SHARES].lost > 0); \
btu64_to_buf(&((_seqset)->seqstt), _t_buf, sizeof(_t_buf)); \
LOGWARNING("SEQ %s: %d/"SEQSTT":%"PRIu64"=%s "SEQPID":%"PRIu64 \
LOGWARNING("%s %s: %d/"SEQSTT":%"PRIu64"=%s "SEQPID":%"PRIu64 \
" M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \
"/R%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 \
"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/R%"PRIu64 \
"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64 \
"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/R%"PRIu64 \
"/OK%"PRIu64"%s", \
_warn ? "SEQ" : "Seq", \
_msgtxt, _set, (_seqset)->seqstt, _t_buf, \
(_seqset)->seqpid, (_seqset)->missing, (_seqset)->trans, \
(_seqset)->lost, (_seqset)->stale, (_seqset)->high, \
@ -1248,16 +1253,15 @@ void sequence_report(bool lock)
seqset->seqdata[SEQ_SHARES].missing ||
seqset->seqdata[SEQ_SHARES].trans ||
seqset->seqdata[SEQ_SHARES].lost)) {
miss = (seqset->seqdata[SEQ_SHARES].missing ||
seqset->seqdata[SEQ_SHARES].trans ||
seqset->seqdata[SEQ_SHARES].lost);
miss = (seqset->seqdata[SEQ_SHARES].missing > 0) ||
(seqset->seqdata[SEQ_SHARES].lost > 0);
if (lock) {
memcpy(&seqset_copy, seqset, sizeof(seqset_copy));
ck_wunlock(&seq_lock);
seqset = &seqset_copy;
}
SEQSETWARN(set, seqset,
miss ? "SHARES MISSING" : "status" , EMPTY);
SEQSETMSG(set, seqset,
miss ? "SHARES MISSING" : "status" , EMPTY);
if (lock)
ck_wlock(&seq_lock);
}
@ -1280,6 +1284,8 @@ static void dealloc_storage()
FREE_LISTS(logqueue);
FREE_ALL(userinfo);
FREE_TREE(marks);
FREE_STORE_DATA(marks);
FREE_LIST_DATA(marks);
@ -1746,8 +1752,9 @@ static void seq_reloadmax()
* messages or incorrect messages on the console when errors occur
* It wont lose msglines from the reload or the queue, since if there is any
* problem with any msgline, it will be processed rather than skipped
* Only valid duplicates, with all 3 sequence numbers (cmd, stt, pid) matching
* a previous msgline, are flagged DUP to be skipped by the sequence code */
* Only valid duplicates, with all 4 sequence numbers (all, cmd, stt, pid)
* matching a previous msgline, are flagged DUP to be skipped by the
* sequence code */
static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
uint64_t n_seqstt, uint64_t n_seqpid,
char *nam, tv_t *now, tv_t *cd, char *code,
@ -1768,6 +1775,11 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
int set = -1, expset = -1, highlimit, i;
K_STORE *lost = NULL;
LOGDEBUG("%s() SQ %c:%d/%s/%"PRIu64"/%"PRIu64"/%"PRIu64"/%s '%.80s...",
__func__, SECHR(seqentryflags), seq, nam, n_seqcmd, n_seqstt,
n_seqpid, code, st = safe_text(msg));
FREENULL(st);
firstseq = newseq = expseq = gothigh = okhi = gotstale =
gotstalestart = dup = wastrans = gotrecover = false;
ck_wlock(&seq_lock);
@ -1954,7 +1966,7 @@ gotseqset:
* there was some problem with the reload data
* When we switch from the reload data to the queue
* data, it is also flagged ok since it may also be
* due to lost data at the end or missing reload files
* due to lost data at the end, or missing reload files
* In both these cases the message will be 'OKHI'
* instead of 'HIGH'
* If however this is caused by a corrupt seq number
@ -2186,12 +2198,13 @@ setitemdata:
} else {
if (newseq) {
if (set == 0)
SEQSETWARN(0, &seqset_pre, "previous", EMPTY);
SEQSETMSG(0, &seqset_pre, "previous", EMPTY);
else
SEQSETWARN(0, &seqset_pre, "current", EMPTY);
SEQSETMSG(0, &seqset_pre, "current", EMPTY);
}
if (expseq) {
SEQSETMSG(expset, &seqset_exp, "discarded old", " for:");
}
if (expseq)
SEQSETWARN(expset, &seqset_exp, "discarded old", " for:");
if (newseq || expseq) {
btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf));
LOGWARNING("Seq created new: set:%d %s:%"PRIu64" "
@ -2203,15 +2216,21 @@ setitemdata:
if (dup) {
int level = LOG_WARNING;
/* If one is SE_RELOAD and the other is SE_EARLYSOCK then it's
* not unexpected so only LOG_DEBUG */
/* If one is SE_RELOAD and the other is SE_EARLYSOCK or
* SE_SOCKET then it's not unexpected, so only use LOG_DEBUG
* Technically SE_SOCKET is unexpected, except that at the end
* of the reload sync there may still be pool messages that
* haven't got into the queue yet - it wouldn't be expected
* for there to be many since it would be ckdb emptying the
* queue faster than it is filling due to the reload delay -
* but either way they don't need to be reported */
if (((seqentry_copy.flags | seqentryflags) & SE_RELOAD) &&
((seqentry_copy.flags | seqentryflags) & SE_EARLYSOCK))
((seqentry_copy.flags | seqentryflags) & (SE_EARLYSOCK | SE_SOCKET)))
level = LOG_DEBUG;
btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf));
bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2));
LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set:%d/%"PRIu64
"=%s/%"PRIu64" %s/%s v%"PRIu64"/^%"PRIu64
"=%s/%"PRIu64" %s/%s -vs- v%"PRIu64"/^%"PRIu64
"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64
"/H%"PRIu64"/OK%"PRIu64" cmd=%.42s...",
(level == LOG_DEBUG) ? "*" : EMPTY,
@ -3049,7 +3068,7 @@ static void *summariser(__maybe_unused void *arg)
rename_proc("db_summariser");
while (!everyone_die && !startup_complete)
while (!everyone_die && !reload_queue_complete)
cksleep_ms(42);
summariser_using_data = true;
@ -3106,7 +3125,7 @@ static char *shift_words[] =
"quinn",
"rika",
"sena",
"tsubasa",
"tenshi",
"ur",
"valentina",
"winry",
@ -3541,7 +3560,7 @@ static void *marker(__maybe_unused void *arg)
rename_proc("db_marker");
while (!everyone_die && !startup_complete)
while (!everyone_die && !reload_queue_complete)
cksleep_ms(42);
if (sharesummary_marks_limit) {
@ -3552,16 +3571,6 @@ static void *marker(__maybe_unused void *arg)
marker_using_data = true;
/* TODO: trigger this every workinfo change?
* note that history catch up would also mean the tigger would
* catch up at most 100 missing marks per shift
* however, also, a workinfo change means a sharesummary DB update,
* so would be best to (usually) wait until that is done
* OR: avoid writing the sharesummaries to the DB at all
* and only write the markersummaries? - since 100 workinfoid shifts
* will usually mean that markersummaries are less than every hour
* (and a reload processes more than an hour) */
while (!everyone_die) {
for (i = 0; i < 5; i++) {
if (!everyone_die)
@ -3938,6 +3947,7 @@ static void *socketer(__maybe_unused void *arg)
case CMD_STATS:
case CMD_USERSTATUS:
case CMD_SHSTA:
case CMD_USERINFO:
ans = ckdb_cmds[msgline->which_cmds].func(NULL,
msgline->cmd,
msgline->id,
@ -4135,6 +4145,21 @@ static void *socketer(__maybe_unused void *arg)
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
k_add_tail(workqueue_store, wq_item);
/* Stop the reload queue from growing too big
* Use a size that should be big enough */
if (reloading && workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(workqueue_store);
K_WUNLOCK(workqueue_free);
WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item);
K_ITEM *ml_item = wq->msgline_item;
free_msgline_data(ml_item, true, false);
K_WLOCK(msgline_free);
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq2_item);
}
K_WUNLOCK(workqueue_free);
ml_item = NULL;
mutex_lock(&wq_waitlock);
@ -4260,6 +4285,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
case CMD_MARKS:
case CMD_PSHIFT:
case CMD_SHSTA:
case CMD_USERINFO:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
__func__, count,
@ -4410,6 +4436,7 @@ static bool reload_from(tv_t *start)
reloading = true;
copy_tv(&reload_timestamp, start);
// Go back further - one reload file
reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S;
tv_to_buf(start, buf, sizeof(buf));
@ -4732,6 +4759,7 @@ static void *listener(void *arg)
seqdata++;
}
}
ss_item = ss_item->next;
}
}
seqdata_reload_lost = false;
@ -4761,6 +4789,7 @@ static void *listener(void *arg)
return NULL;
}
#if 0
/* TODO: This will be way faster traversing both trees simultaneously
* rather than traversing one and searching the other, then repeating
* in reverse. Will change it later */
@ -4864,6 +4893,7 @@ static void compare_summaries(K_TREE *leftsum, char *leftname,
diff_first, diff_last, cd_buf1, cd_buf2);
}
}
#endif
/* TODO: have a seperate option to find/store missing workinfo/shares/etc
* from the reload files, in a supplied UTC time range
@ -4876,6 +4906,9 @@ static void compare_summaries(K_TREE *leftsum, char *leftname,
* and the payment is now wrong */
static void confirm_reload()
{
#if 0
TODO: redo this using workmarkers
K_TREE *sharesummary_workinfoid_save;
__maybe_unused K_TREE *sharesummary_save;
__maybe_unused K_TREE *workinfo_save;
@ -5190,6 +5223,7 @@ static void confirm_reload()
compare_summaries(sharesummary_workinfoid_root, "ReLoad",
sharesummary_workinfoid_save, "DB",
true, false);
#endif
}
// TODO: handle workmarkers/markersummaries

91
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.090"
#define CKDB_VERSION DB_VERSION"-1.112"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -105,6 +105,9 @@ extern int switch_state;
#define BLANK " "
extern char *EMPTY;
// To ensure there's space for the ticker
#define TICK_PREFIX " "
// Field patterns
extern const char *userpatt;
extern const char *mailpatt;
@ -125,20 +128,12 @@ extern const char *addrpatt;
#define MAX_PAYADDR '~'
typedef struct loadstatus {
tv_t oldest_sharesummary_firstshare_n;
tv_t newest_sharesummary_firstshare_a;
tv_t newest_sharesummary_firstshare_ay;
tv_t sharesummary_firstshare; // whichever of above 2 used
tv_t oldest_sharesummary_firstshare_a;
tv_t newest_sharesummary_firstshare_y;
int64_t newest_workmarker_workinfoid;
int64_t newest_workinfoid;
tv_t newest_createdate_workmarker_workinfo;
tv_t newest_createdate_workinfo;
tv_t newest_createdate_poolstats;
tv_t newest_starttimeband_userstats;
tv_t newest_createdate_blocks;
int64_t oldest_workinfoid_n; // of oldest firstshare sharesummary n
int64_t oldest_workinfoid_a; // of oldest firstshare sharesummary a
int64_t newest_workinfoid_a; // of newest firstshare sharesummary a
int64_t newest_workinfoid_y; // of newest firstshare sharesummary y
} LOADSTATUS;
extern LOADSTATUS dbstatus;
@ -403,6 +398,7 @@ enum cmd_values {
CMD_MARKS,
CMD_PSHIFT,
CMD_SHSTA,
CMD_USERINFO,
CMD_END
};
@ -1365,10 +1361,6 @@ typedef struct sharesummary {
double sharerej;
int64_t sharecount;
int64_t errorcount;
int64_t countlastupdate; // non-DB field
bool inserted; // non-DB field
bool saveaged; // non-DB field
bool reset; // non-DB field
tv_t firstshare;
tv_t lastshare;
double lastdiffacc;
@ -1929,6 +1921,35 @@ extern const char *marktype_shift_end_skip;
#define MARK_USED_STR "u"
#define MUSED(_status) (tolower((_status)[0]) == MARK_USED)
// USERINFO from various incoming data
typedef struct userinfo {
int64_t userid;
char username[TXT_BIG+1];
int blocks;
int orphans; // How many blocks are orphans
tv_t last_block;
// For all time
double diffacc;
double diffsta;
double diffdup;
double diffhi;
double diffrej;
double shareacc;
double sharesta;
double sharedup;
double sharehi;
double sharerej;
} USERINFO;
#define ALLOC_USERINFO 1000
#define LIMIT_USERINFO 0
#define INIT_USERINFO(_item) INIT_GENERIC(_item, userinfo)
#define DATA_USERINFO(_var, _item) DATA_GENERIC(_var, _item, userinfo, true)
extern K_TREE *userinfo_root;
extern K_LIST *userinfo_free;
extern K_STORE *userinfo_store;
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now);
extern void tick();
@ -1953,6 +1974,8 @@ extern void sequence_report(bool lock);
#define PPLNSDIFFTIMES "pplns_diff_times"
#define PPLNSDIFFADD "pplns_diff_add"
#define REWARDOVERRIDE "MinerReward"
// Data free functions (first)
extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull);
extern void free_workinfo_data(K_ITEM *item);
@ -2101,10 +2124,10 @@ extern cmp_t _cmp_height(char *coinbase1a, char *coinbase1b, WHERE_FFL_ARGS);
extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd, tv_t *ss_first,
tv_t *ss_last, int64_t *ss_count, int64_t *s_count,
int64_t *s_diff);
extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b);
extern void dsp_sharesummary(K_ITEM *item, FILE *stream);
@ -2122,8 +2145,8 @@ extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff);
extern K_ITEM *_find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid, bool pool);
extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername);
extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd);
extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd);
#define dbhash2btchash(_hash, _buf, _siz) \
_dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
@ -2192,6 +2215,17 @@ extern bool _marks_description(char *description, size_t siz, char *marktype,
int32_t height, char *shift, char *other,
WHERE_FFL_ARGS);
extern char *shiftcode(tv_t *createdate);
extern cmp_t cmp_userinfo(K_ITEM *a, K_ITEM *b);
#define get_userinfo(_userid) _get_userinfo(_userid, true)
extern K_ITEM *_get_userinfo(int64_t userid, bool lock);
#define find_userinfo(_userid) _find_create_userinfo(_userid, true, WHERE_FFL_HERE)
#define _find_userinfo(_userid, _lock) _find_create_userinfo(_userid, _lock, WHERE_FFL_HERE)
extern K_ITEM *_find_create_userinfo(int64_t userid, bool lock, WHERE_FFL_ARGS);
#define userinfo_update(_s, _ss, _ms) _userinfo_update(_s, _ss, _ms, true, true)
extern void _userinfo_update(SHARES *shares, SHARESUMMARY *sharesummary,
MARKERSUMMARY *markersummary, bool ss_sub, bool lock);
#define userinfo_block(_blocks, _isnew) _userinfo_block(_blocks, _isnew, true)
extern void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock);
// ***
// *** PostgreSQL functions ckdb_dbio.c
@ -2204,6 +2238,8 @@ extern char *shiftcode(tv_t *createdate);
(_res) == PGRES_TUPLES_OK || \
(_res) == PGRES_EMPTY_QUERY)
#define SQL_UNIQUE_VIOLATION "23505"
#define CKPQ_READ true
#define CKPQ_WRITE false
@ -2307,11 +2343,12 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar
char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
extern char *ooo_status(char *buf, size_t siz);
#define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \
_sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \
#define sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \
_sharesummary_update(_s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \
WHERE_FFL_HERE)
extern bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS);
extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS);
extern bool sharesummary_fill(PGconn *conn);
extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
double diffacc, double diffinv, double shareacc,
@ -2336,6 +2373,8 @@ extern void payouts_add_ram(bool ok, K_ITEM *p_item, K_ITEM *old_p_item,
extern bool payouts_add(PGconn *conn, bool add, K_ITEM *p_item,
K_ITEM **old_p_item, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root, bool already);
extern K_ITEM *payouts_full_expire(PGconn *conn, int64_t payoutid, tv_t *now,
bool lock);
extern bool payouts_fill(PGconn *conn);
extern bool auths_add(PGconn *conn, char *poolinstance, char *username,
char *workername, char *clientid, char *enonce1,

269
src/ckdb_cmd.c

@ -683,7 +683,8 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by,
by, code, inet, cd, igndup, trf_root);
if (!ok) {
LOGERR("%s() %s.failed.DBE", __func__, id);
if (!igndup)
LOGERR("%s() %s.failed.DBE", __func__, id);
return strdup("failed.DBE");
}
LOGDEBUG("%s.ok.", id);
@ -698,13 +699,11 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id,
{
bool igndup = false;
// confirm_summaries() doesn't call this
if (reloading) {
if (tv_equal(cd, &(dbstatus.newest_createdate_blocks)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_blocks)))
return NULL;
}
/* confirm_summaries() doesn't call this
* We don't care about dups during reload since poolstats_fill()
* doesn't load all the data */
if (reloading)
igndup = true;
return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root);
}
@ -1965,20 +1964,18 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id,
K_ITEM *i_ntime, *i_reward;
bool igndup = false;
if (reloading && !confirm_sharesummary) {
if (tv_equal(cd, &(dbstatus.newest_createdate_workinfo)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_workinfo)))
return NULL;
}
i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz);
if (!i_workinfoid)
return strdup(reply);
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
if (workinfoid <= dbstatus.newest_workinfoid)
igndup = true;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
goto wiconf;
@ -2056,12 +2053,6 @@ wiconf:
K_ITEM *i_secondaryuserid;
bool ok;
// This just excludes the shares we certainly don't need
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz);
if (!i_nonce)
return strdup(reply);
@ -2070,9 +2061,28 @@ wiconf:
if (!i_workinfoid)
return strdup(reply);
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
/* ISDR (Ignored shares during reload)
* This will discard any shares older than the newest
* workinfoidend of any workmarker - including ready
* but not processed workmarkers
* This means that if a workmarker needs re-processing
* and all of it's shares need to be redone, that will
* require a seperate procedure to the reload
* This would be the (as yet non-existant)
* confirm_markersummary which will replace the
* now unusable confirm_sharesummary code
* However, if the workmarker simply just needs to be
* flagged as processed, this avoids the problem of
* duplicating shares before flagging it
*/
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
goto sconf;
@ -2151,12 +2161,6 @@ sconf:
K_ITEM *i_error, *i_secondaryuserid;
bool ok;
// This just excludes the shareerrors we certainly don't need
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_username = require_name(trf_root, "username", 1, NULL, reply, siz);
if (!i_username)
return strdup(reply);
@ -2165,6 +2169,13 @@ sconf:
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
// See comment 'ISDR' above for shares
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
@ -2216,15 +2227,18 @@ seconf:
tv_t ss_first, ss_last;
bool ok;
if (reloading && !confirm_sharesummary) {
if (tv_newer(cd, &(dbstatus.sharesummary_firstshare)))
return NULL;
}
i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
// This excludes any already summarised
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
@ -2237,27 +2251,19 @@ seconf:
if (!i_poolinstance)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
ok = workinfo_age(conn, workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd,
&ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
ok = workinfo_age(workinfoid, transfer_data(i_poolinstance),
by, code, inet, cd, &ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
if (!ok) {
LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
return strdup("failed.DATA");
} else {
/* Don't slow down the reload - do them later
* N.B. this means if you abort/terminate the reload,
* next restart will again go back to the oldest
* unaged sharesummary due to a pool terminate */
/* Don't slow down the reload - do them later */
if (!reloading) {
// Aging is a queued item thus the reply is ignored
auto_age_older(conn, workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd);
auto_age_older(workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd);
}
}
LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);
@ -4514,34 +4520,27 @@ static char *cmd_payouts(PGconn *conn, char *cmd, char *id, tv_t *now,
"%"PRId32"/%s",
payoutid, old_payouts2->status, payouts2->status,
payouts2->height, payouts2->blockhash);
/*
} else if (strcasecmp(action, "expire") == 0) {
/ TODO: Expire the payout - effectively deletes it
/* Expire the payout - effectively deletes it
* Require payoutid
* If any payments are paid then don't allow it /
* TODO: If any payments are paid then don't allow it */
i_payoutid = require_name(trf_root, "payoutid", 1,
(char *)intpatt, reply, siz);
if (!i_payoutid)
return strdup(reply);
TXT_TO_BIGINT("payoutid", transfer_data(i_payoutid), payoutid);
K_WLOCK(payouts_free);
p_item = find_payoutid(payoutid);
p_item = payouts_full_expire(conn, payoutid, now, true);
if (!p_item) {
K_WUNLOCK(payouts_free);
snprintf(reply, siz,
"no payout with id %"PRId64, payoutid);
snprintf(reply, siz, "failed payout %"PRId64, payoutid);
return strdup(reply);
}
p2_item = k_unlink_head(payouts_free);
K_WUNLOCK(payouts_free);
DATA_PAYOUTS(payouts2, p2_item);
bzero(payouts2, sizeof(*payouts2));
payouts2->payoutid = payouts->payoutid;
...
*/
DATA_PAYOUTS(payouts, p_item);
snprintf(msg, sizeof(msg),
"payout %"PRId64" block %"PRId32" reward %"PRId64
" status '%s'",
payouts->payoutid, payouts->height,
payouts->minerreward, payouts->status);
} else if (strcasecmp(action, "process") == 0) {
/* Generate a payout
* Require height, blockhash and addrdate
@ -4809,7 +4808,7 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t marker_end = { 0L, 0L };
int rows, want, i, where_all;
int64_t maxrows;
double wm_count;
double wm_count, d;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -4893,8 +4892,14 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
while (ms_item && ms->markerid == wm->markerid &&
ms->userid == users->userid) {
ms_add.diffacc += ms->diffacc;
ms_add.diffsta += ms->diffsta;
ms_add.diffdup += ms->diffdup;
ms_add.diffhi += ms->diffhi;
ms_add.diffrej += ms->diffrej;
ms_add.shareacc += ms->shareacc;
ms_add.sharesta += ms->sharesta;
ms_add.sharedup += ms->sharedup;
ms_add.sharehi += ms->sharehi;
ms_add.sharerej += ms->sharerej;
want = worker_offset(selects, ms->workername);
@ -4905,8 +4910,10 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
want, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms->diffrej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffrej:%d=%s%c",
d = ms->diffsta + ms->diffdup +
ms->diffhi + ms->diffrej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffinv:%d=%s%c",
want, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@ -4915,8 +4922,10 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
want, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms->sharerej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_sharerej:%d=%s%c",
d = ms->sharesta + ms->sharedup +
ms->sharehi + ms->sharerej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_shareinv:%d=%s%c",
want, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
}
@ -5000,8 +5009,10 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms_add.diffrej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffrej:%d=%s%c",
d = ms_add.diffsta + ms_add.diffdup +
ms_add.diffhi + ms_add.diffrej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffinv:%d=%s%c",
where_all, rows,
reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@ -5012,8 +5023,10 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms_add.sharerej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_sharerej:%d=%s%c",
d = ms_add.sharesta + ms_add.sharedup +
ms_add.sharehi + ms_add.sharerej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_shareinv:%d=%s%c",
where_all, rows,
reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@ -5038,7 +5051,7 @@ static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp),
"%d_flds=%s%c", i,
"diffacc,diffrej,shareacc,sharerej", FLDSEP);
"diffacc,diffinv,shareacc,shareinv", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
}
}
@ -5194,6 +5207,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(poolstats, 1, 1);
USEINFO(userstats, 2, 1);
USEINFO(workerstatus, 1, 1);
USEINFO(userinfo, 1, 1);
USEINFO(msgline, 1, 0);
USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0);
@ -5666,7 +5680,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t marker_end = { 0L, 0L };
int rows;
int64_t maxrows;
double wm_count;
double wm_count, d;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -5728,8 +5742,10 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
0, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms->diffrej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffrej:%d=%s%c",
d = ms->diffsta + ms->diffdup + ms->diffhi +
ms->diffrej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_diffinv:%d=%s%c",
0, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@ -5738,8 +5754,10 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
0, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(ms->sharerej, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_sharerej:%d=%s%c",
d = ms->sharesta + ms->sharedup + ms->sharehi +
ms->sharerej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "%d_shareinv:%d=%s%c",
0, rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
}
@ -5824,7 +5842,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
snprintf(tmp, sizeof(tmp), "%d_pool=%s%c", 0, "all", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "%d_flds=%s%c",
0, "diffacc,diffrej,shareacc,sharerej", FLDSEP);
0, "diffacc,diffinv,shareacc,shareinv", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "prefix_all=%d_%c", 0, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@ -5863,6 +5881,88 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
return strdup(buf);
}
static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
__maybe_unused K_TREE *trf_root)
{
K_ITEM *ui_item;
USERINFO *userinfo;
char reply[1024] = "";
char tmp[1024];
size_t len, off;
double d;
char *buf;
int rows;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok.");
rows = 0;
K_RLOCK(userinfo_free);
ui_item = userinfo_store->head;
while (ui_item) {
DATA_USERINFO(userinfo, ui_item);
str_to_buf(userinfo->username, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "username:%d=%s%c",
rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "blocks:%d=%d%c", rows,
userinfo->blocks - userinfo->orphans, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "orphans:%d=%d%c", rows,
userinfo->orphans, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(userinfo->diffacc, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "diffacc:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
d = userinfo->diffsta + userinfo->diffdup + userinfo->diffhi +
userinfo->diffrej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "diffinv:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(userinfo->shareacc, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "shareacc:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
d = userinfo->sharesta + userinfo->sharedup + userinfo->sharehi +
userinfo->sharerej;
double_to_buf(d, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "shareinv:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "lastblock=%ld%c",
userinfo->last_block.tv_sec, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows++;
ui_item = ui_item->next;
}
K_RUNLOCK(userinfo_free);
snprintf(tmp, sizeof(tmp),
"rows=%d%cflds=%s%c",
rows, FLDSEP,
"username,blocks,orphans,diffacc,diffinv,shareacc,shareinv,"
"lastblock", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "arn=%s%carp=", "UserInfo", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
LOGDEBUG("%s.ok.%d_rows", id, rows);
return buf;
}
// TODO: limit access by having seperate sockets for each
#define ACCESS_POOL "p"
#define ACCESS_SYSTEM "s"
@ -5975,5 +6075,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_MARKS, "marks", false, false, cmd_marks, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_PSHIFT, "pshift", false, false, cmd_pshift, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB },
{ CMD_SHSTA, "shsta", true, false, cmd_shsta, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_USERINFO, "userinfo", false, false, cmd_userinfo, SEQ_NONE, ACCESS_WEB },
{ CMD_END, NULL, false, false, NULL, SEQ_NONE, NULL }
};

297
src/ckdb_data.c

@ -936,8 +936,10 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
K_WLOCK(workerstatus_free);
if (tv_newer(&(row->last_auth), &(auths->createdate)))
copy_tv(&(row->last_auth), &(auths->createdate));
K_WUNLOCK(workerstatus_free);
}
}
@ -953,6 +955,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
K_WLOCK(workerstatus_free);
if (tv_newer(&(row->last_share), &(shares->createdate))) {
copy_tv(&(row->last_share), &(shares->createdate));
row->last_diff = shares->diff;
@ -987,6 +990,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
row->sharerej++;
break;
}
K_WUNLOCK(workerstatus_free);
}
}
@ -995,6 +999,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
file, func, line);
if (item) {
DATA_WORKERSTATUS(row, item);
K_WLOCK(workerstatus_free);
if (userstats->idle) {
if (tv_newer(&(row->last_idle), &(userstats->statsdate)))
copy_tv(&(row->last_idle), &(userstats->statsdate));
@ -1002,6 +1007,7 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
if (tv_newer(&(row->last_stats), &(userstats->statsdate)))
copy_tv(&(row->last_stats), &(userstats->statsdate));
}
K_WUNLOCK(workerstatus_free);
}
}
}
@ -1557,6 +1563,28 @@ cmp_t cmp_optioncontrol(K_ITEM *a, K_ITEM *b)
return c;
}
#define reward_override_name(_height, _buf, _siz) \
_reward_override_name(_height, _buf, _siz, WHERE_FFL_HERE)
static bool _reward_override_name(int32_t height, char *buf, size_t siz,
WHERE_FFL_ARGS)
{
char tmp[128];
size_t len;
snprintf(tmp, sizeof(tmp), REWARDOVERRIDE"_%"PRId32, height);
// Code bug - detect and notify truncation coz that would be bad :P
len = strlen(tmp) + 1;
if (len > siz) {
LOGEMERG("%s(): Invalid size %d passed - required %d" WHERE_FFL,
__func__, (int)siz, (int)len, WHERE_FFL_PASS);
return false;
}
strcpy(buf, tmp);
return true;
}
// Must be R or W locked before call
K_ITEM *find_optioncontrol(char *optionname, tv_t *now, int32_t height)
{
@ -1758,20 +1786,21 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
return item;
}
bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff)
// Duplicates during a reload are set to not show messages
bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item;
K_ITEM *wm_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
int64_t diff_tot;
SHARESUMMARY looksharesummary, *sharesummary;
WORKINFO *workinfo;
SHARES lookshares, *shares;
bool ok = false, conned = false, skipupdate;
bool ok = false, skipupdate;
char error[1024];
LOGDEBUG("%s(): age", __func__);
@ -1822,7 +1851,8 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
looksharesummary.workername = EMPTY;
ok = true;
ss_tot = ss_already = ss_failed = shares_tot = shares_dumped = 0;
ss_tot = ss_already = ss_failed = shares_tot = shares_dumped =
diff_tot = 0;
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx);
@ -1849,14 +1879,9 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
}
if (!skipupdate) {
if (conn == NULL && !confirm_sharesummary) {
conn = dbconnect();
conned = true;
}
if (!sharesummary_update(conn, NULL, NULL, ss_item, by, code, inet, cd)) {
if (!sharesummary_update(NULL, NULL, ss_item, by, code, inet, cd)) {
ss_failed++;
LOGERR("%s(): Failed to age share summary %"PRId64"/%s/%"PRId64,
LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64,
__func__, sharesummary->userid,
sharesummary->workername,
sharesummary->workinfoid);
@ -1891,6 +1916,8 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
break;
shares_tot++;
if (shares->errn == SE_NONE)
diff_tot += shares->diff;
tmp_item = next_in_ktree(s_ctx);
shares_root = remove_from_ktree(shares_root, s_item, cmp_shares);
k_unlink_item(shares_store, s_item);
@ -1898,10 +1925,13 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
shares_dumped++;
if (reloading && skipupdate && !error[0]) {
snprintf(error, sizeof(error),
"reload found aged shares: %"PRId64"/%"PRId64"/%s",
"reload found aged share: %"PRId64
"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername);
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
k_add_head(shares_free, s_item);
s_item = tmp_item;
@ -1916,19 +1946,17 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
LOGERR("%s(): %s", __func__, error);
}
if (conned)
PQfinish(conn);
if (ss_already || ss_failed || shares_dumped) {
/* If all were already aged, and no shares
* then we don't want a message */
if (!(ss_already == ss_tot && shares_tot == 0)) {
LOGERR("%s(): Summary aging of %"PRId64"/%s sstotal=%"PRId64
" already=%"PRId64" failed=%"PRId64
", sharestotal=%"PRId64" dumped=%"PRId64,
LOGERR("%s(): Summary aging of %"PRId64
"/%s sstotal=%"PRId64" already=%"PRId64
" failed=%"PRId64", sharestotal=%"PRId64
" dumped=%"PRId64", diff=%"PRId64,
__func__, workinfoid, poolinstance, ss_tot,
ss_already, ss_failed, shares_tot,
shares_dumped);
shares_dumped, diff_tot);
}
}
bye:
@ -2032,8 +2060,7 @@ void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff)
row->diffacc = row->diffsta = row->diffdup = row->diffhi =
row->diffrej = row->shareacc = row->sharesta = row->sharedup =
row->sharehi = row->sharerej = 0.0;
row->sharecount = row->errorcount = row->countlastupdate = 0;
row->reset = false;
row->sharecount = row->errorcount = 0;
row->firstshare.tv_sec = cd->tv_sec;
row->firstshare.tv_usec = cd->tv_usec;
row->lastshare.tv_sec = row->firstshare.tv_sec;
@ -2088,8 +2115,8 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
/* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */
void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd)
void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd)
{
static int64_t last_attempted_id = -1;
static int64_t prev_found = 0;
@ -2156,10 +2183,9 @@ void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
do_id = age_id;
to_id = 0;
do {
ok = workinfo_age(conn, do_id, poolinstance,
by, code, inet, cd,
&ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
ok = workinfo_age(do_id, poolinstance, by, code, inet,
cd, &ss_first, &ss_last, &ss_count,
&s_count, &s_diff);
ss_count_tot += ss_count;
s_count_tot += s_count;
@ -2992,7 +3018,7 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
K_TREE *mu_root = NULL;
int usercount;
double ndiff, total_diff, diff_want, elapsed;
char ndiffbin[TXT_SML+1];
char ndiffbin[TXT_SML+1], rewardbuf[32];
double diff_times, diff_add;
char cd_buf[CDATE_BUFSIZ];
tv_t end_tv = { 0L, 0L };
@ -3361,6 +3387,39 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
d64 = blocks->reward * 9 / 1000;
g64 = blocks->reward - d64;
payouts->minerreward = g64;
/* We can hard code a miner reward for a block in optioncontrol
* if it ever needs adjusting - so just expire the payout and
* re-process the reward ... before it's paid */
bool oname;
oname = reward_override_name(blocks->height, rewardbuf,
sizeof(rewardbuf));
if (oname) {
OPTIONCONTROL *oc;
K_ITEM *oc_item;
// optioncontrol must be default limits or below these limits
oc_item = find_optioncontrol(rewardbuf, &now, blocks->height+1);
if (oc_item) {
int64_t override, delta;
char *moar = "more";
double per;
DATA_OPTIONCONTROL(oc, oc_item);
override = (int64_t)atol(oc->optionvalue);
delta = override - g64;
if (delta < 0) {
moar = "less";
delta = -delta;
}
per = 100.0 * (double)delta / (double)g64;
LOGWARNING("%s(): *** block %"PRId32" payout reward"
" overridden, was %"PRId64" now %"PRId64
" = %"PRId64" (%.4f%%) %s",
__func__, blocks->height,
g64, override, delta, per, moar);
payouts->minerreward = override;
}
}
payouts->workinfoidstart = begin_workinfoid;
payouts->workinfoidend = end_workinfoid;
payouts->elapsed = elapsed;
@ -3871,6 +3930,7 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL;
tv_t now;
bool ok;
K_RLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
@ -3912,8 +3972,15 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
else
setnow(&now);
return sharesummaries_to_markersummaries(NULL, workmarkers, by, code,
inet, &now, trf_root);
/* So we can't change any sharesummaries/markersummaries while a
* payout is being generated
* N.B. this is a long lock since it stores the markersummaries */
ck_wlock(&process_pplns_lock);
ok = sharesummaries_to_markersummaries(NULL, workmarkers, by, code,
inet, &now, trf_root);
ck_wunlock(&process_pplns_lock);
return ok;
}
void dsp_workmarkers(K_ITEM *item, FILE *stream)
@ -4435,3 +4502,167 @@ char *shiftcode(tv_t *createdate)
LOGDEBUG("%s() code_buf='%s'", __func__, code_buf);
return(code_buf);
}
// order by userid asc
cmp_t cmp_userinfo(K_ITEM *a, K_ITEM *b)
{
USERINFO *ua, *ub;
DATA_USERINFO(ua, a);
DATA_USERINFO(ub, b);
return CMP_BIGINT(ua->userid, ub->userid);
}
K_ITEM *_get_userinfo(int64_t userid, bool lock)
{
USERINFO userinfo;
K_TREE_CTX ctx[1];
K_ITEM look, *find;
userinfo.userid = userid;
INIT_USERINFO(&look);
look.data = (void *)(&userinfo);
if (lock)
K_RLOCK(userinfo_free);
find = find_in_ktree(userinfo_root, &look, cmp_userinfo, ctx);
if (lock)
K_RUNLOCK(userinfo_free);
return find;
}
K_ITEM *_find_create_userinfo(int64_t userid, bool lock, WHERE_FFL_ARGS)
{
K_ITEM *ui_item, *u_item;
USERS *users = NULL;
USERINFO *row;
ui_item = _get_userinfo(userid, lock);
if (!ui_item) {
if (lock)
K_RLOCK(users_free);
u_item = find_userid(userid);
if (lock)
K_RUNLOCK(users_free);
DATA_USERS_NULL(users, u_item);
if (lock)
K_WLOCK(userinfo_free);
ui_item = k_unlink_head(userinfo_free);
DATA_USERINFO(row, ui_item);
bzero(row, sizeof(*row));
row->userid = userid;
if (u_item)
STRNCPY(row->username, users->username);
else
bigint_to_buf(userid, row->username, sizeof(row->username));
userinfo_root = add_to_ktree(userinfo_root, ui_item, cmp_userinfo);
k_add_head(userinfo_store, ui_item);
if (lock)
K_WUNLOCK(userinfo_free);
}
return ui_item;
}
void _userinfo_update(SHARES *shares, SHARESUMMARY *sharesummary,
MARKERSUMMARY *markersummary, bool ss_sub, bool lock)
{
USERINFO *row;
K_ITEM *item;
if (shares) {
item = _find_userinfo(shares->userid, lock);
DATA_USERINFO(row, item);
if (lock)
K_WLOCK(userinfo_free);
switch (shares->errn) {
case SE_NONE:
row->diffacc += shares->diff;
row->shareacc++;
break;
case SE_STALE:
row->diffsta += shares->diff;
row->sharesta++;
break;
case SE_DUPE:
row->diffdup += shares->diff;
row->sharedup++;
break;
case SE_HIGH_DIFF:
row->diffhi += shares->diff;
row->sharehi++;
break;
default:
row->diffrej += shares->diff;
row->sharerej++;
break;
}
if (lock)
K_WUNLOCK(userinfo_free);
}
// Only during db load so no locking required
if (sharesummary) {
item = _find_userinfo(sharesummary->userid, false);
DATA_USERINFO(row, item);
if (ss_sub) {
row->diffacc -= sharesummary->diffacc;
row->diffsta -= sharesummary->diffsta;
row->diffdup -= sharesummary->diffdup;
row->diffhi -= sharesummary->diffhi;
row->diffrej -= sharesummary->diffrej;
row->shareacc -= sharesummary->shareacc;
row->sharesta -= sharesummary->sharesta;
row->sharedup -= sharesummary->sharedup;
row->sharehi -= sharesummary->sharehi;
row->sharerej -= sharesummary->sharerej;
} else {
row->diffacc += sharesummary->diffacc;
row->diffsta += sharesummary->diffsta;
row->diffdup += sharesummary->diffdup;
row->diffhi += sharesummary->diffhi;
row->diffrej += sharesummary->diffrej;
row->shareacc += sharesummary->shareacc;
row->sharesta += sharesummary->sharesta;
row->sharedup += sharesummary->sharedup;
row->sharehi += sharesummary->sharehi;
row->sharerej += sharesummary->sharerej;
}
}
// Only during db load so no locking required
if (markersummary) {
item = _find_userinfo(markersummary->userid, false);
DATA_USERINFO(row, item);
row->diffacc += markersummary->diffacc;
row->diffsta += markersummary->diffsta;
row->diffdup += markersummary->diffdup;
row->diffhi += markersummary->diffhi;
row->diffrej += markersummary->diffrej;
row->shareacc += markersummary->shareacc;
row->sharesta += markersummary->sharesta;
row->sharedup += markersummary->sharedup;
row->sharehi += markersummary->sharehi;
row->sharerej += markersummary->sharerej;
}
}
// N.B. good blocks = blocks - orphans
void _userinfo_block(BLOCKS *blocks, bool isnew, bool lock)
{
USERINFO *row;
K_ITEM *item;
item = find_userinfo(blocks->userid);
DATA_USERINFO(row, item);
if (lock)
K_WLOCK(userinfo_free);
if (isnew) {
row->blocks++;
copy_tv(&(row->last_block), &(blocks->createdate));
} else
row->orphans++;
if (lock)
K_WUNLOCK(userinfo_free);
}

812
src/ckdb_dbio.c

@ -9,6 +9,17 @@
#include "ckdb.h"
// Doesn't work with negative numbers ...
void pcom(int n)
{
if (n < 1000)
printf("%d", n);
else {
pcom(n/1000);
printf(",%03d", n % 1000);
}
}
char *pqerrmsg(PGconn *conn)
{
char *ptr, *buf = strdup(PQerrorMessage(conn));
@ -2291,6 +2302,7 @@ nostart:
DATA_OPTIONCONTROL(optioncontrol, old_item);
optioncontrol_root = remove_from_ktree(optioncontrol_root, old_item,
cmp_optioncontrol);
k_unlink_item(optioncontrol_store, old_item);
FREENULL(optioncontrol->optionvalue);
k_add_head(optioncontrol_free, old_item);
}
@ -2624,6 +2636,9 @@ bool workinfo_fill(PGconn *conn)
LOGDEBUG("%s(): select", __func__);
printf(TICK_PREFIX"wi 0\r");
fflush(stdout);
// TODO: select the data based on sharesummary since old data isn't needed
// however, the ageing rules for workinfo will decide that also
// keep the last block + current? Rules will depend on payout scheme also
@ -2760,8 +2775,17 @@ bool workinfo_fill(PGconn *conn)
workinfo_height_root = add_to_ktree(workinfo_height_root, item, cmp_workinfo_height);
k_add_head(workinfo_store, item);
if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate)))
if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) {
copy_tv(&(dbstatus.newest_createdate_workinfo), &(row->createdate));
dbstatus.newest_workinfoid = row->workinfoid;
}
if (i == 0 || ((i+1) % 100000) == 0) {
printf(TICK_PREFIX"wi ");
pcom(i+1);
putchar('\r');
fflush(stdout);
}
tick();
}
@ -2834,20 +2858,15 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root)
// Reloading a share already summarised
return true;
}
if (!sharesummary->reset) {
zero_sharesummary(sharesummary,
&(shares->createdate),
shares->diff);
sharesummary->reset = true;
}
}
}
if (!confirm_sharesummary)
if (!confirm_sharesummary) {
workerstatus_update(NULL, shares, NULL);
userinfo_update(shares, NULL, NULL);
}
sharesummary_update(conn, shares, NULL, NULL, shares->createby,
sharesummary_update(shares, NULL, NULL, shares->createby,
shares->createcode, shares->createinet,
&(shares->createdate));
@ -3142,17 +3161,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
FREENULL(st);
return false;
}
if (!sharesummary->reset) {
zero_sharesummary(sharesummary,
&(shareerrors->createdate),
0.0);
sharesummary->reset = true;
}
}
}
sharesummary_update(conn, NULL, shareerrors, NULL,
sharesummary_update(NULL, shareerrors, NULL,
shareerrors->createby,
shareerrors->createcode,
shareerrors->createinet,
@ -3441,12 +3453,11 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_ITEM *p_ss_item, *p_ms_item;
bool ok = false, conned = false;
int64_t diffacc, shareacc;
char *reason = NULL, *tuples = NULL;
char *reason = NULL;
char *params[2];
int n, par = 0, deleted = -7;
int n, par = 0;
int ss_count, ms_count;
char *st = NULL;
char *del;
LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s",
@ -3595,37 +3606,6 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
ms_item = ms_item->next;
}
if (old_sharesummary_store->count > 0) {
par = 0;
params[par++] = bigint_to_buf(workmarkers->workinfoidstart, NULL, 0);
params[par++] = bigint_to_buf(workmarkers->workinfoidend, NULL, 0);
PARCHK(par, params);
del = "delete from sharesummary "
"where workinfoid >= $1 and workinfoid <= $2";
res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples)
deleted = atoi(tuples);
}
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Delete", rescode, conn);
reason = "delete failure";
goto rollback;
}
if (deleted != old_sharesummary_store->count) {
LOGERR("%s() processed sharesummaries=%d but deleted=%d",
shortname, old_sharesummary_store->count, deleted);
reason = "delete mismatch";
goto rollback;
}
}
ok = workmarkers_process(conn, true, true,
workmarkers->markerid,
workmarkers->poolinstance,
@ -3760,6 +3740,8 @@ flail:
return ok;
}
// no longer used
#if 0
static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row)
{
p_row->diffacc += row->diffacc;
@ -3783,6 +3765,7 @@ static void sharesummary_to_pool(SHARESUMMARY *p_row, SHARESUMMARY *row)
p_row->lastdiffacc = row->lastdiffacc;
}
}
#endif
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new,
@ -3858,22 +3841,18 @@ char *ooo_status(char *buf, size_t siz)
return buf;
}
bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS)
// No longer stored in the DB but fields are updated as before
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS)
{
ExecStatusType rescode;
PGresult *res = NULL;
WORKMARKERS *wm;
SHARESUMMARY *row, *p_row;
K_ITEM *item, *wm_item, *p_item = NULL;
char *ins, *upd;
bool ok = false, new = false, p_new = false;
char *params[19 + MODIFYDATECOUNT];
int n, par = 0;
bool new = false, p_new = false;
int64_t userid, workinfoid;
char *workername;
tv_t *createdate;
bool must_update = false, conned = false;
char *st = NULL, *db = NULL;
char ooo_buf[256];
double tdf, tdl;
@ -3889,7 +3868,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
}
item = ss_item;
DATA_SHARESUMMARY(row, item);
must_update = true;
row->complete[0] = SUMMARY_COMPLETE;
row->complete[1] = '\0';
} else {
@ -3950,8 +3928,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
row->workername = strdup(workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
row->workinfoid = workinfoid;
row->inserted = false;
row->saveaged = false;
}
// N.B. this directly updates the non-key data
@ -4010,7 +3986,6 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
}
}
// p_items are ram only
if (p_item) {
DATA_SHARESUMMARY(p_row, p_item);
} else {
@ -4028,161 +4003,9 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl);
}
// During startup, don't save 'new' sharesummaries, to reduce DB I/O
// ... and also during normal processing
if (row->complete[0] == SUMMARY_NEW)
goto startupskip;
if (conn == NULL && !confirm_sharesummary) {
conn = dbconnect();
conned = true;
}
if (new || !(row->inserted)) {
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into sharesummary "
"(userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL ") values (" PQPARAM27 ")";
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
row->inserted = true;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
} else {
bool stats_update = false;
MODIFYUPDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
if ((row->countlastupdate + SHARESUMMARY_UPDATE_EVERY) <
(row->sharecount + row->errorcount))
stats_update = true;
if (must_update && row->countlastupdate < (row->sharecount + row->errorcount))
stats_update = true;
if (stats_update) {
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 23, params);
upd = "update sharesummary "
"set diffacc=$4,diffsta=$5,diffdup=$6,diffhi=$7,diffrej=$8,"
"shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12,"
"sharerej=$13,firstshare=$14,lastshare=$15,"
"sharecount=$16,errorcount=$17,lastdiffacc=$18,complete=$19"
","MDDB"=$20,"MBYDB"=$21,"MCODEDB"=$22,"MINETDB"=$23 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
} else {
if (!must_update) {
ok = true;
goto late;
} else {
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = str_to_buf(row->complete, NULL, 0);
MODIFYUPDATEPARAMS(params, par, row);
PARCHKVAL(par, 8, params);
upd = "update sharesummary "
"set complete=$4,"MDDB"=$5,"MBYDB"=$6,"MCODEDB"=$7,"MINETDB"=$8 "
"where userid=$1 and workername=$2 and workinfoid=$3";
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("MustUpdate", rescode, conn);
goto unparam;
}
}
row->countlastupdate = row->sharecount + row->errorcount;
if (row->complete[0] == SUMMARY_COMPLETE)
row->saveaged = true;
}
}
}
startupskip:
ok = true;
unparam:
if (par) {
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
}
late:
if (conned)
PQfinish(conn);
// We keep the new item no matter what 'ok' is, since it will be inserted later
// Store either new item
if (new || p_new) {
K_WLOCK(sharesummary_free);
if (new) {
@ -4201,265 +4024,7 @@ late:
K_WUNLOCK(sharesummary_free);
}
return ok;
}
bool sharesummary_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ctx[1];
K_ITEM *item, *m_item, *p_item;
int n, i, par = 0, p_n;
SHARESUMMARY *row, *p_row;
MARKS *marks;
char *params[2];
char *field;
char *sel;
int fields = 19;
bool ok;
LOGDEBUG("%s(): select", __func__);
/* Load needs to go back to the last marks workinfoid(+1)
* If it is later than that, we can't create markersummaries
* since some of the required data is missing -
* thus we also can't make the shift markersummaries */
m_item = last_in_ktree(marks_root, ctx);
if (!m_item) {
if (dbload_workinfoid_start != -1) {
sharesummary_marks_limit = true;
LOGWARNING("WARNING: dbload -w start used "
"but there are no marks ...");
}
} else {
DATA_MARKS(marks, m_item);
if (dbload_workinfoid_start > marks->workinfoid) {
sharesummary_marks_limit = true;
LOGWARNING("WARNING: dbload -w start %"PRId64
" is after the last mark %"PRId64" ...",
dbload_workinfoid_start,
marks->workinfoid);
}
}
if (sharesummary_marks_limit) {
LOGWARNING("WARNING: ... markersummaries cannot be created "
"and pplns calculations may be wrong");
}
sel = "select "
"userid,workername,workinfoid,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL
" from sharesummary where workinfoid>=$1 and workinfoid<=$2";
par = 0;
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
return false;
}
n = PQnfields(res);
if (n != (fields + MODIFYDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + MODIFYDATECOUNT, n);
PQclear(res);
return false;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
//K_WLOCK(sharesummary_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
break;
}
row->inserted = true;
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_PTR("workername", field, row->workername);
LIST_MEM_ADD(sharesummary_free, row->workername);
PQ_GET_FLD(res, i, "workinfoid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("workinfoid", field, row->workinfoid);
PQ_GET_FLD(res, i, "diffacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffacc", field, row->diffacc);
PQ_GET_FLD(res, i, "diffsta", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffsta", field, row->diffsta);
PQ_GET_FLD(res, i, "diffdup", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffdup", field, row->diffdup);
PQ_GET_FLD(res, i, "diffhi", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffhi", field, row->diffhi);
PQ_GET_FLD(res, i, "diffrej", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diffrej", field, row->diffrej);
PQ_GET_FLD(res, i, "shareacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("shareacc", field, row->shareacc);
PQ_GET_FLD(res, i, "sharesta", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharesta", field, row->sharesta);
PQ_GET_FLD(res, i, "sharedup", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharedup", field, row->sharedup);
PQ_GET_FLD(res, i, "sharehi", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharehi", field, row->sharehi);
PQ_GET_FLD(res, i, "sharerej", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sharerej", field, row->sharerej);
PQ_GET_FLD(res, i, "sharecount", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("sharecount", field, row->sharecount);
PQ_GET_FLD(res, i, "errorcount", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("errorcount", field, row->errorcount);
row->countlastupdate = row->sharecount + row->errorcount;
PQ_GET_FLD(res, i, "firstshare", field, ok);
if (!ok)
break;
TXT_TO_TV("firstshare", field, row->firstshare);
PQ_GET_FLD(res, i, "lastshare", field, ok);
if (!ok)
break;
TXT_TO_TV("lastshare", field, row->lastshare);
PQ_GET_FLD(res, i, "lastdiffacc", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc);
PQ_GET_FLD(res, i, "complete", field, ok);
if (!ok)
break;
TXT_TO_STR("complete", field, row->complete);
MODIFYDATEFLDPOINTERS(sharesummary_free, res, i, row, ok);
if (!ok)
break;
sharesummary_root = add_to_ktree(sharesummary_root, item, cmp_sharesummary);
sharesummary_workinfoid_root = add_to_ktree(sharesummary_workinfoid_root, item, cmp_sharesummary_workinfoid);
k_add_head(sharesummary_store, item);
// A share summary is shares in a single workinfo, at all 3 levels n,a,y
if (tolower(row->complete[0]) == SUMMARY_NEW) {
if (dbstatus.oldest_sharesummary_firstshare_n.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare))) {
copy_tv(&(dbstatus.oldest_sharesummary_firstshare_n), &(row->firstshare));
dbstatus.oldest_workinfoid_n = row->workinfoid;
}
} else {
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare)))
copy_tv(&(dbstatus.newest_sharesummary_firstshare_ay), &(row->firstshare));
if (tolower(row->complete[0]) == SUMMARY_COMPLETE) {
if (dbstatus.oldest_sharesummary_firstshare_a.tv_sec == 0 ||
!tv_newer(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare))) {
copy_tv(&(dbstatus.oldest_sharesummary_firstshare_a), &(row->firstshare));
dbstatus.oldest_workinfoid_a = row->workinfoid;
}
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare))) {
copy_tv(&(dbstatus.newest_sharesummary_firstshare_a), &(row->firstshare));
dbstatus.newest_workinfoid_a = row->workinfoid;
}
} else /* SUMMARY_CONFIRM */ {
if (tv_newer(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare))) {
copy_tv(&(dbstatus.newest_sharesummary_firstshare_y), &(row->firstshare));
dbstatus.newest_workinfoid_y = row->workinfoid;
}
}
}
p_item = find_sharesummary_p(row->workinfoid);
if (!p_item) {
p_item = k_unlink_head(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
POOL_SS(p_row);
LIST_MEM_ADD(sharesummary_free, p_row->workername);
p_row->workinfoid = row->workinfoid;
sharesummary_pool_root = add_to_ktree(sharesummary_pool_root,
p_item,
cmp_sharesummary);
k_add_head(sharesummary_pool_store, p_item);
} else {
DATA_SHARESUMMARY(p_row, p_item);
}
sharesummary_to_pool(p_row, row);
tick();
}
if (!ok) {
FREENULL(row->workername);
k_add_head(sharesummary_free, item);
}
p_n = sharesummary_pool_store->count;
//K_WUNLOCK(sharesummary_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d sharesummary records", __func__, n);
LOGWARNING("%s(): created %d sharesummary pool records", __func__, p_n);
}
return ok;
return true;
}
bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
@ -4743,6 +4308,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash,
}
// We didn't use a Begin
ok = true;
userinfo_block(row, true);
goto unparam;
break;
case BLOCKS_ORPHAN:
@ -4877,6 +4443,8 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash,
}
update_old = true;
if (confirmed[0] == BLOCKS_ORPHAN)
userinfo_block(row, false);
break;
default:
LOGERR("%s(): %s.failed.invalid confirm='%s'",
@ -5131,6 +4699,12 @@ bool blocks_fill(PGconn *conn)
pool.workinfoid = row->workinfoid;
pool.height = row->height;
}
if (CURRENT(&(row->expirydate))) {
_userinfo_block(row, true, false);
if (row->confirmed[0] == BLOCKS_ORPHAN)
_userinfo_block(row, false, false);
}
}
if (!ok)
k_add_head(blocks_free, item);
@ -5512,6 +5086,245 @@ unparam:
return ok;
}
/* Expire the entire payout, miningpayouts and payments
* If it returns false, nothing was changed
* and a console message will say why */
K_ITEM *payouts_full_expire(PGconn *conn, int64_t payoutid, tv_t *now, bool lock)
{
bool locked = false, conned = false, begun = false, ok = false;
K_TREE_CTX mp_ctx[1], pm_ctx[1];
K_ITEM *po_item = NULL, *mp_item, *pm_item, *next_item;
PAYMENTS *payments = NULL;
MININGPAYOUTS *mp = NULL;
PAYOUTS *payouts = NULL;
ExecStatusType rescode;
PGresult *res;
char *params[8];
int n, par = 0;
char *upd, *tuples = NULL;
int po_upd = -7, mp_upd = -7, pm_upd = -7;
// If not already done before calling
if (lock)
ck_wlock(&process_pplns_lock);
// This will be rare so a full lock is best
K_WLOCK(payouts_free);
K_WLOCK(miningpayouts_free);
K_WLOCK(payments_free);
locked = true;
po_item = find_payoutid(payoutid);
if (!po_item) {
LOGERR("%s(): unknown payoutid %"PRId64, __func__, payoutid);
goto matane;
}
conned = CKPQConn(&conn);
begun = CKPQBegin(conn);
if (!begun)
goto matane;
upd = "update payouts set "EDDB"=$1 where payoutid=$2 and "EDDB"=$3";
par = 0;
params[par++] = tv_to_buf(now, NULL, 0);
params[par++] = bigint_to_buf(payoutid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples) {
po_upd = atoi(tuples);
if (po_upd != 1) {
LOGERR("%s() updated payouts should be 1"
" but updated=%d",
__func__, po_upd);
goto matane;
}
}
}
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update payouts", rescode, conn);
goto matane;
}
for (n = 0; n < par; n++)
free(params[n]);
upd = "update miningpayouts set "EDDB"=$1 where payoutid=$2 and "EDDB"=$3";
par = 0;
params[par++] = tv_to_buf(now, NULL, 0);
params[par++] = bigint_to_buf(payoutid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples)
mp_upd = atoi(tuples);
}
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update miningpayouts", rescode, conn);
goto matane;
}
for (n = 0; n < par; n++)
free(params[n]);
upd = "update payments set "EDDB"=$1 where payoutid=$2 and "EDDB"=$3";
par = 0;
params[par++] = tv_to_buf(now, NULL, 0);
params[par++] = bigint_to_buf(payoutid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples)
pm_upd = atoi(tuples);
}
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update payments", rescode, conn);
goto matane;
}
for (n = 0; n < par; n++)
free(params[n]);
par = 0;
// Check miningpayouts failure condition
mp_item = first_miningpayouts(payoutid, mp_ctx);
if (!mp_item) {
if (mp_upd != 0) {
LOGERR("%s() updated miningpayouts should be 0 but"
" updated=%d",
__func__, mp_upd);
goto matane;
}
} else {
int count = 0;
DATA_MININGPAYOUTS(mp, mp_item);
while (mp_item && mp->payoutid == payoutid) {
if (CURRENT(&(mp->expirydate)))
count++;
mp_item = next_in_ktree(mp_ctx);
DATA_MININGPAYOUTS_NULL(mp, mp_item);
}
if (count != mp_upd) {
LOGERR("%s() updated miningpayouts should be %d but"
" updated=%d",
__func__, count, mp_upd);
goto matane;
}
}
/* Check payments failure condition
*
* This does a full table search since there is no index
* This should be so rare that adding an index/tree for it
* would be a waste */
pm_item = first_in_ktree(payments_root, pm_ctx);
if (!pm_item) {
if (pm_upd != 0) {
LOGERR("%s() updated payments should be 0 but"
" updated=%d",
__func__, pm_upd);
goto matane;
}
} else {
int count = 0;
DATA_PAYMENTS(payments, pm_item);
while (pm_item) {
if (payments->payoutid == payoutid &&
CURRENT(&(payments->expirydate))) {
count++;
}
pm_item = next_in_ktree(pm_ctx);
DATA_PAYMENTS_NULL(payments, pm_item);
}
if (count != pm_upd) {
LOGERR("%s() updated payments should be %d but"
" updated=%d",
__func__, count, pm_upd);
goto matane;
}
}
// No more possible errors, so update the ram tables
DATA_PAYOUTS(payouts, po_item);
payouts_root = remove_from_ktree(payouts_root, po_item, cmp_payouts);
payouts_id_root = remove_from_ktree(payouts_id_root, po_item, cmp_payouts_id);
copy_tv(&(payouts->expirydate), now);
payouts_root = add_to_ktree(payouts_root, po_item, cmp_payouts);
payouts_id_root = add_to_ktree(payouts_id_root, po_item, cmp_payouts_id);
mp_item = first_miningpayouts(payoutid, mp_ctx);
DATA_MININGPAYOUTS_NULL(mp, mp_item);
while (mp_item && mp->payoutid == payoutid) {
if (CURRENT(&(mp->expirydate))) {
next_item = next_in_ktree(mp_ctx);
miningpayouts_root = remove_from_ktree(miningpayouts_root, mp_item, cmp_miningpayouts);
copy_tv(&(mp->expirydate), now);
miningpayouts_root = add_to_ktree(miningpayouts_root, mp_item, cmp_miningpayouts);
mp_item = next_item;
} else
mp_item = next_in_ktree(mp_ctx);
DATA_MININGPAYOUTS_NULL(mp, mp_item);
}
pm_item = first_in_ktree(payments_root, pm_ctx);
DATA_PAYMENTS_NULL(payments, pm_item);
while (pm_item) {
if (payments->payoutid == payoutid &&
CURRENT(&(payments->expirydate))) {
next_item = next_in_ktree(pm_ctx);
payments_root = remove_from_ktree(payments_root, pm_item, cmp_payments);
copy_tv(&(payments->expirydate), now);
payments_root = add_to_ktree(payments_root, pm_item, cmp_payments);
pm_item = next_item;
} else
pm_item = next_in_ktree(pm_ctx);
DATA_PAYMENTS_NULL(payments, pm_item);
}
ok = true;
matane:
if (begun)
CKPQEnd(conn, ok);
if (locked) {
K_WUNLOCK(payments_free);
K_WUNLOCK(miningpayouts_free);
K_WUNLOCK(payouts_free);
}
CKPQDisco(&conn, conned);
if (lock)
ck_wunlock(&process_pplns_lock);
for (n = 0; n < par; n++)
free(params[n]);
if (ok)
return po_item;
else
return NULL;
}
bool payouts_fill(PGconn *conn)
{
ExecStatusType rescode;
@ -5836,7 +5649,15 @@ bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
bool show_msg = true;
char *code;
if (igndup) {
code = PQresultErrorField(res, PG_DIAG_SQLSTATE);
if (code && strcmp(code, SQL_UNIQUE_VIOLATION) == 0)
show_msg = false;
}
if (show_msg)
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
@ -6304,6 +6125,9 @@ bool markersummary_fill(PGconn *conn)
LOGDEBUG("%s(): select", __func__);
printf(TICK_PREFIX"ms 0\r");
fflush(stdout);
// TODO: limit how far back
sel = "select "
"markerid,userid,workername,diffacc,diffsta,diffdup,diffhi,"
@ -6461,6 +6285,15 @@ bool markersummary_fill(PGconn *conn)
markersummary_to_pool(p_row, row);
_userinfo_update(NULL, NULL, row, false, false);
if (i == 0 || ((i+1) % 100000) == 0) {
printf(TICK_PREFIX"ms ");
pcom(i+1);
putchar('\r');
fflush(stdout);
}
tick();
}
if (!ok) {
@ -6469,6 +6302,7 @@ bool markersummary_fill(PGconn *conn)
}
p_n = markersummary_pool_store->count;
//K_WUNLOCK(markersummary_free);
PQclear(res);
@ -6728,7 +6562,8 @@ bool workmarkers_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
K_ITEM *item, *wi_item;
WORKINFO *workinfo;
int n, i;
WORKMARKERS *row;
char *field;
@ -6815,6 +6650,23 @@ bool workmarkers_fill(PGconn *conn)
item, cmp_workmarkers_workinfoid);
k_add_head(workmarkers_store, item);
if (dbstatus.newest_workmarker_workinfoid < row->workinfoidend) {
dbstatus.newest_workmarker_workinfoid = row->workinfoidend;
wi_item = find_workinfo(row->workinfoidend, NULL);
if (!wi_item) {
LOGEMERG("%s(): FAILURE workmarkerid %"PRId64
" wid end %"PRId64" doesn't exist! "
"You should abort ckdb and fix it, "
" since the reload may skip some data",
__func__, row->markerid,
row->workinfoidend);
} else {
DATA_WORKINFO(workinfo, wi_item);
copy_tv(&(dbstatus.newest_createdate_workmarker_workinfo),
&(workinfo->createdate));
}
}
tick();
}
if (!ok)

14
src/libckpool.c

@ -1619,9 +1619,8 @@ char *http_base64(const char *src)
void address_to_pubkeytxn(char *pkh, const char *addr)
{
char b58bin[25];
char b58bin[25] = {};
memset(b58bin, 0, 25);
b58tobin(b58bin, addr);
pkh[0] = 0x76;
pkh[1] = 0xa9;
@ -1631,6 +1630,17 @@ void address_to_pubkeytxn(char *pkh, const char *addr)
pkh[24] = 0xac;
}
void address_to_scripttxn(char *pkh, const char *addr)
{
char b58bin[25] = {};
b58tobin(b58bin, addr);
pkh[0] = 0xa9;
pkh[1] = 0x14;
memcpy(&pkh[2], &b58bin[1], 20);
pkh[22] = 0x87;
}
/* For encoding nHeight into coinbase, return how many bytes were used */
int ser_number(uchar *s, int32_t val)
{

1
src/libckpool.h

@ -537,6 +537,7 @@ int safecmp(const char *a, const char *b);
bool cmdmatch(const char *buf, const char *cmd);
void address_to_pubkeytxn(char *pkh, const char *addr);
void address_to_scripttxn(char *pkh, const char *addr);
int ser_number(uchar *s, int32_t val);
int get_sernumber(uchar *s);
bool fulltest(const uchar *hash, const uchar *target);

41
src/stratifier.c

@ -228,9 +228,9 @@ struct stratum_instance {
* instance_lock */
int ref;
char enonce1[32];
char enonce1[36]; /* Fit up to 16 byte binary enonce1 */
uchar enonce1bin[16];
char enonce1var[12];
char enonce1var[20]; /* Fit up to 8 byte binary enonce1var */
uint64_t enonce1_64;
int session_id;
@ -390,7 +390,9 @@ struct stratifier_data {
ckpool_t *ckp;
char pubkeytxnbin[25];
int pubkeytxnlen;
char donkeytxnbin[25];
int donkeytxnlen;
pool_stats_t stats;
/* Protects changes to pool stats */
@ -599,18 +601,18 @@ static void generate_coinbase(const ckpool_t *ckp, workbase_t *wb)
*u64 = htole64(g64);
wb->coinb2len += 8;
wb->coinb2bin[wb->coinb2len++] = 25;
memcpy(wb->coinb2bin + wb->coinb2len, sdata->pubkeytxnbin, 25);
wb->coinb2len += 25;
wb->coinb2bin[wb->coinb2len++] = sdata->pubkeytxnlen;
memcpy(wb->coinb2bin + wb->coinb2len, sdata->pubkeytxnbin, sdata->pubkeytxnlen);
wb->coinb2len += sdata->pubkeytxnlen;
if (ckp->donvalid) {
u64 = (uint64_t *)&wb->coinb2bin[wb->coinb2len];
*u64 = htole64(d64);
wb->coinb2len += 8;
wb->coinb2bin[wb->coinb2len++] = 25;
memcpy(wb->coinb2bin + wb->coinb2len, sdata->donkeytxnbin, 25);
wb->coinb2len += 25;
wb->coinb2bin[wb->coinb2len++] = sdata->donkeytxnlen;
memcpy(wb->coinb2bin + wb->coinb2len, sdata->donkeytxnbin, sdata->donkeytxnlen);
wb->coinb2len += sdata->donkeytxnlen;
}
wb->coinb2len += 4; // Blank lock
@ -5903,6 +5905,13 @@ static void read_poolstats(ckpool_t *ckp)
}
}
/* Braindead check to see if this btcaddress is an M of N script address which
* is currently unsupported as a generation address. */
static bool script_address(const char *btcaddress)
{
return btcaddress[0] == '3';
}
int stratifier(proc_instance_t *pi)
{
pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat;
@ -5938,11 +5947,23 @@ int stratifier(proc_instance_t *pi)
/* Store this for use elsewhere */
hex2bin(scriptsig_header_bin, scriptsig_header, 41);
address_to_pubkeytxn(sdata->pubkeytxnbin, ckp->btcaddress);
if (script_address(ckp->btcaddress)) {
address_to_scripttxn(sdata->pubkeytxnbin, ckp->btcaddress);
sdata->pubkeytxnlen = 23;
} else {
address_to_pubkeytxn(sdata->pubkeytxnbin, ckp->btcaddress);
sdata->pubkeytxnlen = 25;
}
if (test_address(ckp, ckp->donaddress)) {
ckp->donvalid = true;
address_to_pubkeytxn(sdata->donkeytxnbin, ckp->donaddress);
if (script_address(ckp->donaddress)) {
sdata->donkeytxnlen = 23;
address_to_scripttxn(sdata->donkeytxnbin, ckp->donaddress);
} else {
sdata->donkeytxnlen = 25;
address_to_pubkeytxn(sdata->donkeytxnbin, ckp->donaddress);
}
}
}

Loading…
Cancel
Save