Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 8 years ago
parent
commit
fed222c6c6
  1. 11
      pool/base.php
  2. 191
      pool/page_blocks.php
  3. 20
      pool/page_ckp.php
  4. 4
      pool/page_shifts.php
  5. 519
      src/ckdb.c
  6. 176
      src/ckdb.h
  7. 27
      src/ckdb.php
  8. 427
      src/ckdb_cmd.c
  9. 123
      src/ckdb_data.c
  10. 2170
      src/ckdb_dbio.c
  11. 199
      src/klist.c
  12. 25
      src/klist.h
  13. 6
      src/ktree.c

11
pool/base.php

@ -479,8 +479,19 @@ session_start();
#
include_once('db.php');
#
global $disable_login;
$disable_login = false;
if (file_exists('../pool/disable_login.php'))
include_once('../pool/disable_login.php');
#
function validUserPass($user, $pass, $twofa)
{
global $disable_login;
if (function_exists('checklogin'))
checklogin($user);
if ($disable_login == true)
exit(0);
#
$rep = checkPass($user, $pass, $twofa);
if ($rep != null)
$ans = repDecode($rep);

191
pool/page_blocks.php

@ -69,27 +69,47 @@ function pctcolour($pct)
return array($fg, $bg);
}
#
function doblocks($data, $user)
function mthcolour($luck)
{
$blink = '<a href=https://www.blocktrail.com/BTC/block/';
$pg = '';
if ($user === null)
$ans = getBlocks('Anon');
if ($luck == 1.0)
{
$fg = 'white';
$bg = 'black';
}
else if ($luck > 1.0)
{
// 1.0 .. 1.1 (> 1.1 = max)
$grn = ($luck - 1.0) * 2550.0;
if ($grn > 255)
$grn = 255;
if ($grn < 0)
$grn = 0;
if ($grn > 190)
$fg = 'blue';
else
$ans = getBlocks($user);
if (nuem(getparam('csv', true)))
$wantcsv = false;
$fg = 'white';
$bg = sprintf("#00%02x00", $grn);
}
else
$wantcsv = true;
if ($wantcsv === false)
{
if ($ans['STATUS'] == 'ok' and isset($ans['s_rows']) and $ans['s_rows'] > 0)
// 0.9 .. 1.0 (< 0.9 = max)
$red = (1.0 - $luck) * 2550.0;
if ($red > 255)
$red = 255;
if ($red < 0)
$red = 0;
$fg = 'white';
$bg = sprintf("#%02x0000", $red);
}
return array($fg, $bg);
}
#
function statstable($poolfee, $ans, $data)
{
$pg .= '<h1>Block Statistics</h1>';
if ($ans['STATUS'] != 'ok' or !isset($ans['s_rows']) or $ans['s_rows'] < 1)
return '';
$pg = '<h1>Block Statistics</h1>';
$pg .= "<table cellpadding=0 cellspacing=0 border=0>\n";
$pg .= "<thead><tr class=title>";
$pg .= "<td class=dl>Description</td>";
@ -131,7 +151,6 @@ function doblocks($data, $user)
$luck = number_format(100 * $ans['s_luck:'.$i], 2);
$txm = number_format(100 * $ans['s_txmean:'.$i], 1);
$poolfee = 0.9; # pool fee as a % out of 100
$o = number_format((100 - $poolfee) * $ans['s_txmean:'.$i] / $ans['s_diffmean:'.$i], 2);
$pg .= "<tr class=$row>";
@ -146,7 +165,145 @@ function doblocks($data, $user)
$pg .= "</tr>\n";
}
$pg .= "</tbody></table>\n";
return $pg;
}
#
function monthtable($poolfee, $ans, $limit)
{
if ($ans['STATUS'] != 'ok' or !isset($ans['rows']) or $ans['rows'] < 1)
return '';
$nowmon = intval(gmdate('n', $ans['STAMP']));
$nowyyyy = intval(gmdate('Y', $ans['STAMP']));
$pg = '<h1>Monthly Statistics</h1>';
$pg .= "<table cellpadding=0 cellspacing=0 border=0>\n";
$pg .= "<thead><tr class=title>";
$pg .= "<td class=dl>UTC Month</td>";
$pg .= "<td class=dr>Pool Avg</td>";
$pg .= "<td class=dr>Blocks</td>";
$pg .= "<td class=dr>Expected</td>";
$pg .= "<td class=dr>Mean Diff%</td>";
$pg .= "<td class=dr>MeanTx%</td>";
$pg .= "<td class=dr>Luck%</td>";
$pg .= "<td class=dr>PPS%</td>";
$pg .= "</tr></thead>\n";
$pg .= '<tbody>';
$count = $ans['rows'];
$rout = $bcount = $bcd = $bmon = $byyyy = $bdiffacc = $bdiffratio = $btxn = 0;
$skipped = false;
for ($i = 0; $i < $count; $i++)
{
$conf = $ans['confirmed:'.$i];
// Skip leading orphans
if (!$skipped && ($conf == 'O' || $conf == 'R'))
continue;
$skipped = true;
// If anything is missing, skip this table
$diffratio = $ans['diffratio:'.$i];
if ($diffratio == '?')
break;
$cd = $ans['firstcreatedate:'.$i];
$mon = intval(gmdate('n', $cd));
$yyyy = intval(gmdate('Y', $cd));
// all orphans after a block must be included with that block
if (($conf != 'O' && $conf != 'R')
&& ($mon != $bmon || $yyyy != $byyyy))
{
if ($bcount != 0)
{
if (($rout % 2) == 0)
$row = 'even';
else
$row = 'odd';
if ($bmon == $nowmon && $byyyy == $nowyyyy)
$dots = '&hellip;';
else
$dots = '';
$elap = $bcd - $cd;
$phr = ($bdiffacc / $elap) * pow(2, 32);
$phrdsp = siprefmt($phr);
$name = gmdate('Y M', $bcd);
$exc = number_format($bdiffratio, 2);
if ($bdiffratio > $bcount)
$bcol = 'darkred';
else
$bcol = 'darkgreen';
$md = number_format(100 * $bdiffratio / $bcount, 2);
$mr = number_format(100 * $btxn / $bcount, 2);
$ml = $bcount / $bdiffratio;
$mldsp = number_format(100 * $ml, 2);
$oa = (100 - $poolfee) * ($bcount / $bdiffratio) * ($btxn / $bcount);
$odsp = number_format($oa, 2);
list($fg, $bg) = mthcolour($ml);
$pg .= "<tr class=$row>";
$pg .= "<td class=dl>$name$dots</td>";
$pg .= "<td class=dr>${phrdsp}Hs</td>";
$pg .= "<td class=dr bgcolor=$bg><font color=$fg>$bcount</font></td>";
$pg .= "<td class=dr>$exc</td>";
$pg .= "<td class=dr>$md%</td>";
$pg .= "<td class=dr>$mr%</td>";
$pg .= "<td class=dr>$mldsp%</td>";
$pg .= "<td class=dr>$odsp%</td>";
$pg .= "</tr>\n";
$rout++;
}
if ($rout > $limit)
break;
$bcd = $cd;
$bmon = $mon;
$byyyy = $yyyy;
$bcount = $bdiffacc = $bdiffratio = $btxn = 0;
}
$bdiffratio += floatval($ans['diffratio:'.$i]);
$bdiffacc += floatval($ans['diffacc:'.$i]);
if ($conf != 'O' and $conf != 'R')
{
$height = $ans['height:'.$i];
$reward = floatval($ans['reward:'.$i]);
$re = 5000000000.0 * pow(0.5, floor($height / 210000.0));
$btxn += $reward / $re;
$bcount++;
}
}
$pg .= '</tbody></table>';
return $pg;
}
#
function doblocks($data, $user)
{
$blink = '<a href=https://www.blocktrail.com/BTC/block/';
$poolfee = 0.9; # pool fee as a % out of 100
$pg = '';
if ($user === null)
$ans = getBlocks('Anon');
else
$ans = getBlocks($user);
if (nuem(getparam('csv', true)))
$wantcsv = false;
else
$wantcsv = true;
if ($wantcsv === false)
{
$pg .= statstable($poolfee, $ans, $data);
$pg .= monthtable($poolfee, $ans, 7);
if ($ans['STATUS'] == 'ok')
{

20
pool/page_ckp.php

@ -16,7 +16,7 @@ function stnum($num)
#
function dockp($data, $user)
{
$pg = '<h1>CKPool</h1>';
$pg = '<h1>CKDB</h1>';
$msg = msgEncode('stats', 'stats', array(), $user);
$rep = sendsockreply('stats', $msg);
@ -32,11 +32,12 @@ function dockp($data, $user)
$pg .= '<thead><tr class=title>';
$pg .= "<td class=dl><span class=nb>Name:<$r id=srtname data-sf=s0></span></td>";
$pg .= '<td class=dr>Initial</td>';
$pg .= '<td class=dr>Allocated</td>';
$pg .= "<td class=dr><span class=nb><$r id=srtname data-sf=r3>:In&nbsp;Store</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtname data-sf=r4>:RAM</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtname data-sf=r5>:RAM2</span></td>";
$pg .= '<td class=dr>Cull</td>';
$pg .= "<td class=dr><span class=nb><$r id=srtalloc data-sf=r2>:Alloc</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtstore data-sf=r3>:In&nbsp;Store</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtram data-sf=r4>:RAM</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtram2 data-sf=r5>:RAM2</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtcull data-sf=r6>:Cull</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtlim data-sf=r7>:Limit</span></td>";
$pg .= "</tr></thead>\n";
if ($ans['STATUS'] == 'ok')
{
@ -52,16 +53,19 @@ function dockp($data, $user)
$pg .= "<tr class=$row>";
$pg .= "<td class=dl data-srt='".$ans['name:'.$i]."'>".$ans['name:'.$i].'</td>';
$pg .= '<td class=dr>'.stnum($ans['initial:'.$i]).'</td>';
$pg .= '<td class=dr>'.stnum($ans['allocated:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['allocated:'.$i]."'>".stnum($ans['allocated:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['instore:'.$i]."'>".stnum($ans['instore:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['ram:'.$i]."'>".stnum($ans['ram:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['ram2:'.$i]."'>".stnum($ans['ram2:'.$i]).'</td>';
$pg .= '<td class=dr>'.stnum($ans['cull:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['cull:'.$i]."'>".stnum($ans['cull:'.$i]).'</td>';
$pg .= "<td class=dr data-srt='".$ans['cull_limit:'.$i]."'>".stnum($ans['cull_limit:'.$i]).'</td>';
$pg .= "</tr>\n";
}
$pg .= '</tbody>';
}
$pg .= "</table>\n";
$pg .= "<script type='text/javascript'>\n";
$pg .= "sotc('ckpsrt','srtram');</script>\n";
return $pg;
}

4
pool/page_shifts.php

@ -81,7 +81,7 @@ function doshifts($data, $user)
$pg .= '<td class=dr>'.$ans['rewards:'.$i].'</td>';
$ppsr = (float)$ans['ppsrewarded:'.$i];
if ($ppsr > 0)
$ppsd = sprintf('%.5f', $ppsr);
$ppsd = sprintf('%.5f', $ppsr*1000.0);
else
$ppsd = '0';
$pg .= "<td class=dr>$ppsd</td>";
@ -96,7 +96,7 @@ function doshifts($data, $user)
$pg .= '</tbody>';
}
$pg .= "</table>\n";
$pg .= "<span class=st1>*</span> The Rewarded value unit is satoshis per 1diff share<br>";
$pg .= "<span class=st1>*</span> The Rewarded value unit is satoshis per 1000diff share<br>";
return $pg;
}

519
src/ckdb.c

@ -14,6 +14,48 @@
* Consider adding row level locking (a per kitem usage count) if needed
*/
/* Thread layout
* -------------
* Any thread that manages a thread count will have 00 in it's name
* and name each subsequent thread it creates, with the same name
* but with 01, 02 etc, and then wait on them all before exiting
* The 2 digit 00 relates to THREAD_LIMIT which is 99
* there's a limit of THREAD_LIMIT active threads per thread manager
* WARNING - however the total number of threads created is limited by the
* LOCK_CHECK code which allows only MAX_THREADS total ckdb thread to
* be created irrelevant of any being deleted
*
* The threads that can be managed have a command option to set them when
* starting ckdb and can also be changed via the cmd_threads socket command
*
* The main() 'ckdb' thread starts:
* iomsgs() for filelog '_fiomsgs' and console '_ciomsgs'
* listener() '_p00qproc'
* which manages it's thread count in pqproc()
*
* listener() starts:
* breakdown() for reload '_r00breaker' and cmd '_c00breaker'
* each of which manage their thead counts
* logger() '_logger'
* socksetup() '_socksetup'
* summariser() '_summarise'
* marker() '_marker'
* then calls setup_data() which calls reload()
* then calls pqproc()
* which manages the thread count
*
* socksetup() starts:
* replier() for pool '_preplier' cmd '_creplier' and btc '_breplier'
* listener_all() for cmd '_c00listen' and btc '_b00listen'
* each of which manage their thead counts
* process_socket() '_procsock'
* sockrun() for ckpool '_psockrun' web '_wsockrun' and cmd '_csockrun'
*
* reload() starts:
* process_reload() '_p00rload'
* which manages it's thread count
*/
/* Startup
* -------
* During startup we load the DB and track where it is up to with
@ -29,13 +71,13 @@
* completes and just process authorise messages immediately while the
* reload runs
* However, we start the ckpool message queue after loading
* the optioncontrol, users, workers and useratts DB tables, before loading
* the much larger DB tables, so that ckdb is effectively ready for messages
* almost immediately
* the optioncontrol, idcontrol, users, workers and useratts DB tables,
* before loading the much larger DB tables, so that ckdb is effectively
* ready for messages almost immediately
* The first ckpool message allows us to know where ckpool is up to
* in the CCLs - see reload_from() for how this is handled
* The users table, required for the authorise messages, is always updated
* immediately
* in the disk DB immediately
*/
/* Reload data needed
@ -74,7 +116,7 @@
* RAM accountbalance: TODO: created as data is loaded
*
* idcontrol: only userid reuse is critical and the user is added
* immeditately to the DB before replying to the add message
* immeditately to the disk DB before replying to the add message
*
* Tables that are/will be written straight to the DB, so are OK:
* users, useraccounts, paymentaddresses, payments,
@ -157,6 +199,11 @@ static int cmd_breakdown_threads = -1;
int reload_breakdown_threads_delta = 0;
int cmd_breakdown_threads_delta = 0;
int cmd_listener_threads = 2;
int btc_listener_threads = 2;
int cmd_listener_threads_delta = 0;
int btc_listener_threads_delta = 0;
// Lock used to determine when the last breakdown thread exits
static cklock_t breakdown_lock;
@ -164,7 +211,7 @@ static int replier_count = 0;
static cklock_t replier_lock;
char *EMPTY = "";
const char *nullstr = "(null)";
const char *nullstr = NULLSTR;
const char *true_str = "true";
const char *false_str = "false";
@ -328,7 +375,7 @@ bool dbload_only_sharesummary = false;
* markersummaries and pplns payouts may not be correct */
bool sharesummary_marks_limit = false;
// DB optioncontrol,users,workers,useratts load is complete
// DB optioncontrol,idcontrol,users,workers,useratts load is complete
bool db_users_complete = false;
// DB load is complete
bool db_load_complete = false;
@ -341,7 +388,7 @@ bool reloaded_N_files = false;
// Data load is complete
bool startup_complete = false;
// Set to true when pool0 completes, pool0 = socket data during reload
static bool reload_queue_complete = false;
bool reload_queue_complete = false;
// Tell everyone to die
bool everyone_die = false;
// Set to true every time a store is created
@ -381,10 +428,10 @@ static uint64_t sock_acc[MAXSOCK], sock_recv[MAXSOCK];
// breaker() summarised
static tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
static uint64_t break_reload_processed, break_cmd_processed;
// clistener()
// listener_all()
static cklock_t listener_all_lock;
static double clis_us;
static uint64_t clis_processed;
// blistener()
static double blis_us;
static uint64_t blis_processed;
@ -406,6 +453,24 @@ char *by_default = "code";
char *inet_default = "127.0.0.1";
char *id_default = "42";
// Emulate a list for lock checking
K_LIST *pgdb_free;
// Count of db connections
int pgdb_count;
__thread char *connect_file = NULLSTR;
__thread char *connect_func = NULLSTR;
__thread int connect_line = 0;
__thread bool connect_dis = true;
// Pause all DB IO (permanently)
cklock_t pgdb_pause_lock;
__thread int pause_read_count = 0;
__thread char *pause_read_file = NULLSTR;
__thread char *pause_read_func = NULLSTR;
__thread int pause_read_line = 0;
__thread bool pause_read_unlock = false;
bool pgdb_paused = false;
bool pgdb_pause_disabled = false;
// NULL or poolinstance must match
const char *sys_poolinstance = NULL;
// lock for accessing all mismatch variables
@ -504,6 +569,8 @@ int reload_processing;
int cmd_processing;
int sockd_count;
int max_sockd_count;
ts_t breaker_sleep_stt;
int breaker_sleep_ms;
// Trigger breaker() processing
mutex_t bq_reload_waitlock;
@ -577,6 +644,7 @@ K_STORE *heartbeatqueue_store;
// TRANSFER
K_LIST *transfer_free;
int cull_transfer = CULL_TRANSFER;
// SEQSET
K_LIST *seqset_free;
@ -640,8 +708,7 @@ K_LIST *accountadjustment_free;
K_STORE *accountadjustment_store;
// IDCONTROL
// These are only used for db access - not stored in memory
//K_TREE *idcontrol_root;
K_TREE *idcontrol_root;
K_LIST *idcontrol_free;
K_STORE *idcontrol_store;
@ -1663,19 +1730,22 @@ PGconn *dbconnect()
}
/* Load tables required to support auths,adduser,chkpass and newid
* N.B. idcontrol is DB internal so is always ready
* OptionControl is loaded first in case it is needed by other loads
* (though not yet)
*/
static bool getdata1()
{
PGconn *conn = dbconnect();
PGconn *conn = NULL;
bool ok = true;
CKPQConn(&conn);
if (!(ok = check_db_version(conn)))
goto matane;
if (!(ok = optioncontrol_fill(conn)))
goto matane;
if (!(ok = idcontrol_fill(conn)))
goto matane;
if (!(ok = users_fill(conn)))
goto matane;
if (!(ok = workers_fill(conn)))
@ -1684,7 +1754,7 @@ static bool getdata1()
matane:
PQfinish(conn);
CKPQFinish(&conn);
return ok;
}
@ -1693,19 +1763,25 @@ matane:
*/
static bool getdata2()
{
PGconn *conn = dbconnect();
bool ok = blocks_fill(conn);
PGconn *conn = NULL;
bool ok;
PQfinish(conn);
CKPQConn(&conn);
ok = blocks_fill(conn);
CKPQFinish(&conn);
return ok;
}
static bool getdata3()
{
PGconn *conn = dbconnect();
PGconn *conn = NULL;
bool ok = true;
CKPQConn(&conn);
if (!key_update && !confirm_sharesummary) {
if (!(ok = paymentaddresses_fill(conn)) || everyone_die)
goto sukamudai;
@ -1715,12 +1791,12 @@ static bool getdata3()
if (!(ok = miningpayouts_fill(conn)) || everyone_die)
goto sukamudai;
}
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai;
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
if (!(ok = marks_fill(conn)) || everyone_die)
goto sukamudai;
/* must be after workinfo */
@ -1731,14 +1807,14 @@ static bool getdata3()
if (!(ok = payouts_fill(conn)) || everyone_die)
goto sukamudai;
}
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
if (!key_update) {
if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai;
}
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
if (!key_update) {
if (!(ok = shares_fill(conn)) || everyone_die)
goto sukamudai;
@ -1748,7 +1824,7 @@ static bool getdata3()
sukamudai:
PQfinish(conn);
CKPQFinish(&conn);
return ok;
}
@ -1915,8 +1991,9 @@ static void alloc_storage()
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
breakqueue_free = k_new_list_cull("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE,
true, CULL_BREAKQUEUE);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
cmd_breakqueue_store = k_new_store(breakqueue_free);
@ -1970,15 +2047,19 @@ static void alloc_storage()
}
}
seqtrans_free = k_new_list("SeqTrans", sizeof(SEQTRANS),
ALLOC_SEQTRANS, LIMIT_SEQTRANS, true);
seqtrans_free = k_new_list_cull("SeqTrans", sizeof(SEQTRANS),
ALLOC_SEQTRANS, LIMIT_SEQTRANS, true,
CULL_SEQTRANS);
msgline_free = k_new_list("MsgLine", sizeof(MSGLINE),
ALLOC_MSGLINE, LIMIT_MSGLINE, true);
msgline_free = k_new_list_cull("MsgLine", sizeof(MSGLINE),
ALLOC_MSGLINE, LIMIT_MSGLINE, true,
CULL_MSGLINE);
msgline_store = k_new_store(msgline_free);
msgline_free->dsp_func = dsp_msgline;
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
workqueue_free = k_new_list_cull("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE,
true, CULL_WORKQUEUE);
pool0_workqueue_store = k_new_store(workqueue_free);
pool_workqueue_store = k_new_store(workqueue_free);
cmd_workqueue_store = k_new_store(workqueue_free);
@ -1997,8 +2078,9 @@ static void alloc_storage()
LIMIT_HEARTBEATQUEUE, true);
heartbeatqueue_store = k_new_store(heartbeatqueue_free);
transfer_free = k_new_list(Transfer, sizeof(TRANSFER),
ALLOC_TRANSFER, LIMIT_TRANSFER, true);
transfer_free = k_new_list_cull(Transfer, sizeof(TRANSFER),
ALLOC_TRANSFER, LIMIT_TRANSFER, true,
cull_transfer);
transfer_free->dsp_func = dsp_transfer;
users_free = k_new_list("Users", sizeof(USERS),
@ -2051,6 +2133,8 @@ static void alloc_storage()
idcontrol_free = k_new_list("IDControl", sizeof(IDCONTROL),
ALLOC_IDCONTROL, LIMIT_IDCONTROL, true);
idcontrol_store = k_new_store(idcontrol_free);
idcontrol_root = new_ktree(NULL, cmp_idcontrol, idcontrol_free);
idcontrol_free->dsp_func = dsp_idcontrol;
esm_free = k_new_list("ESM", sizeof(ESM), ALLOC_ESM, LIMIT_ESM, true);
esm_store = k_new_store(esm_free);
@ -2215,6 +2299,9 @@ static void alloc_storage()
userinfo_store = k_new_store(userinfo_free);
userinfo_root = new_ktree(NULL, cmp_userinfo, userinfo_free);
// Emulate a list for lock checking
pgdb_free = k_lock_only_list("PGDB");
#if LOCK_CHECK
DLPRIO(seqset, 91);
@ -2269,12 +2356,15 @@ static void alloc_storage()
DLPRIO(paymentaddresses, 5);
// Must be above instransient
DLPRIO(idcontrol, 3);
// Don't currently nest any locks in these:
DLPRIO(esm, PRIO_TERMINAL);
DLPRIO(workers, PRIO_TERMINAL);
DLPRIO(idcontrol, PRIO_TERMINAL);
DLPRIO(ips, PRIO_TERMINAL);
DLPRIO(replies, PRIO_TERMINAL);
DLPRIO(pgdb, PRIO_TERMINAL);
DLPCHECK();
@ -2612,7 +2702,7 @@ static void dealloc_storage()
esm_report();
FREE_ALL(esm);
FREE_LISTS(idcontrol);
FREE_ALL(idcontrol);
FREE_ALL(accountbalance);
FREE_ALL(payments);
@ -2848,13 +2938,16 @@ static bool setup_data()
#define DATASETTRANS(_seqdata, _u) \
ENTRYSETTRANS(&((_seqdata)->entry[(_u) & ((_seqdata)->size - 1)]))
// Check for transient missing every 2s
// Check for transient missing every X seconds
#define TRANCHECKLIMIT 2.0
static tv_t last_trancheck;
// Don't let these messages be slowed down by a trans_process()
#define TRANCHKSEQOK(_seq) ((_seq) != SEQ_SHARES && (_seq) != SEQ_AUTH && \
(_seq) != SEQ_ADDRAUTH && (_seq) != SEQ_BLOCK)
// How many seconds to allow the build up of trans range messages
#define TRANSAGELIMIT 10.0
/* time (now) is used, not cd, since cd is only relevant to reloading
* and we don't run trans_process() during reloading
* We also only know now, not cd, for a missing item
@ -3049,9 +3142,6 @@ static void trans_seq(tv_t *now)
if (store->count) {
K_WLOCK(seqtrans_free);
k_list_transfer_to_head(store, seqtrans_free);
if (seqtrans_free->count == seqtrans_free->total &&
seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS)
k_cull_list(seqtrans_free);
K_WUNLOCK(seqtrans_free);
}
}
@ -3678,10 +3768,10 @@ setitemdata:
found[seq].forced_msg = true;
}
}
// Check if there are any ranges >= 2s old (or forced)
// Check if there are any ranges >= the limit (or forced)
for (i = 0; i < SEQ_MAX; i++) {
if (found[i].forced_msg || (found[i].last.tv_sec != 0 &&
tvdiff(&found_now, &(found[i].last)) >= 2.0)) {
tvdiff(&found_now, &(found[i].last)) >= TRANSAGELIMIT)) {
memcpy(&(found_msgs[i]), &(found[i]),
sizeof(SEQFOUND));
// will be displayed, so erase it
@ -3715,7 +3805,7 @@ setitemdata:
setnow(&found_now);
for (i = 0; i < SEQ_MAX; i++) {
if (found[i].last.tv_sec != 0 &&
tvdiff(&found_now, &(found[i].last)) >= 2.0) {
tvdiff(&found_now, &(found[i].last)) >= TRANSAGELIMIT) {
memcpy(&(found_msgs[i]),
&(found[i]),
sizeof(SEQFOUND));
@ -3830,9 +3920,6 @@ setitemdata:
}
K_WLOCK(seqtrans_free);
k_list_transfer_to_head(lost, seqtrans_free);
if (seqtrans_free->count == seqtrans_free->total &&
seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS)
k_cull_list(seqtrans_free);
K_WUNLOCK(seqtrans_free);
}
@ -4456,6 +4543,9 @@ static void *breaker(void *arg)
ts_t when, when_add;
int i, typ, mythread, done, tot, ret;
int breaker_delta = 0;
ts_t last_sleep = { 0L, 0L };
int last_sleep_ms = 0;
bool do_sleep = false;
setup = (struct breaker_setup *)(arg);
mythread = setup->thread;
@ -4531,7 +4621,11 @@ static void *breaker(void *arg)
K_WLOCK(breakqueue_free);
bq_item = NULL;
was_null = false;
if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) {
if (breaker_sleep_stt.tv_sec > last_sleep.tv_sec) {
copy_ts(&last_sleep, &breaker_sleep_stt);
last_sleep_ms = breaker_sleep_ms;
do_sleep = true;
} else if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) {
breaker_delta = reload_breakdown_threads_delta;
reload_breakdown_threads_delta = 0;
} else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) {
@ -4561,6 +4655,12 @@ static void *breaker(void *arg)
}
K_WUNLOCK(breakqueue_free);
if (do_sleep) {
do_sleep = false;
cksleep_ms_r(&last_sleep, last_sleep_ms);
continue;
}
// TODO: deal with thread creation/shutdown failure
if (breaker_delta != 0) {
if (breaker_delta > 0) {
@ -4724,10 +4824,6 @@ static void *breaker(void *arg)
pthread_cond_signal(&process_socket_waitcond);
mutex_unlock(&process_socket_waitlock);
}
if (breakqueue_free->count == breakqueue_free->total &&
breakqueue_free->total >= ALLOC_BREAKQUEUE * CULL_BREAKQUEUE)
k_cull_list(breakqueue_free);
K_WUNLOCK(breakqueue_free);
}
@ -6035,7 +6131,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t
K_WUNLOCK(breakqueue_free);
FREENULL(ans);
free_msgline_data(ml_item, true, true);
free_msgline_data(ml_item, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, ml_item);
@ -6043,130 +6139,194 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq_item);
if (workqueue_free->count == workqueue_free->total &&
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE)
k_cull_list(workqueue_free);
K_WUNLOCK(workqueue_free);
tick();
}
static void *clistener(__maybe_unused void *arg)
struct listener_setup {
int bc;
int thread;
};
#define BC_B 0
#define BC_C 1
static pthread_t listener_pt[2][THREAD_LIMIT];
static struct listener_setup listener_setup[2][THREAD_LIMIT];
static void *listener_all(void *arg)
{
static bool running[2][THREAD_LIMIT];
struct listener_setup *setup;
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
char buf[128];
time_t now;
ts_t when, when_add;
int ret;
int i, typ, mythread, done, tot, ret;
int listener_delta = 0;
pthread_detach(pthread_self());
setup = (struct listener_setup *)(arg);
typ = setup->bc;
mythread = setup->thread;
snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__);
snprintf(buf, sizeof(buf), "db%s_%c%02d%s",
dbcode, (typ == BC_B) ? 'b' : 'c', mythread, "listen");
LOCK_INIT(buf);
rename_proc(buf);
LOGNOTICE("%s() processing", __func__);
if (mythread == 0) {
pthread_detach(pthread_self());
for (i = 1; i < THREAD_LIMIT; i++) {
listener_setup[typ][i].thread = i;
listener_setup[typ][i].bc = typ;
running[typ][i] = false;
}
running[typ][0] = true;
if (typ == BC_B)
listener_delta = btc_listener_threads - 1;
else
listener_delta = cmd_listener_threads - 1;
LOGNOTICE("%s() %s initialised - delta %d",
__func__, (typ == BC_B) ? "btc" : "cmd", listener_delta);
}
LOGNOTICE("%s() %s processing", __func__, buf);
when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
if (typ == BC_B)
blistener_using_data = true;
else
clistener_using_data = true;
conn = dbconnect();
CKPQConn(&conn);
now = time(NULL);
while (!everyone_die) {
if (mythread && !running[typ][mythread])
break;
wq_item = NULL;
K_WLOCK(workqueue_free);
if (mythread == 0 && typ == BC_B &&
btc_listener_threads_delta != 0) {
listener_delta = btc_listener_threads_delta;
btc_listener_threads_delta = 0;
} else if (mythread == 0 && typ == BC_C &&
cmd_listener_threads_delta != 0) {
listener_delta = cmd_listener_threads_delta;
cmd_listener_threads_delta = 0;
} else {
if (typ == BC_B)
wq_item = k_unlink_head(btc_workqueue_store);
else
wq_item = k_unlink_head(cmd_workqueue_store);
}
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
// TODO: deal with thread creation/shutdown failure
if (listener_delta != 0) {
if (listener_delta > 0) {
// Add threads
tot = 1;
done = 0;
for (i = 1; i < THREAD_LIMIT; i++) {
if (!running[typ][i]) {
if (listener_delta > 0) {
listener_delta--;
running[typ][i] = true;
create_pthread(&(listener_pt[typ][i]),
listener_all,
&(listener_setup[typ][i]));
done++;
tot++;
}
if (wq_item) {
setnow(&now1);
process_sockd(conn, wq_item, REPLIER_CMD);
setnow(&now2);
clis_us += us_tvdiff(&now2, &now1);
clis_processed++;
} else
tot++;
}
LOGWARNING("%s() created %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
(typ == BC_B) ? "btc" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
} else {
setnowts(&when);
timeraddspec(&when, &when_add);
mutex_lock(&wq_cmd_waitlock);
ret = cond_timedwait(&wq_cmd_waitcond,
&wq_cmd_waitlock, &when);
if (ret == 0)
wq_cmd_wakes++;
else if (errno == ETIMEDOUT)
wq_cmd_timeouts++;
mutex_unlock(&wq_cmd_waitlock);
// Notify and wait for each to exit
tot = 1;
done = 0;
for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (running[typ][i]) {
if (listener_delta < 0) {
listener_delta++;
LOGNOTICE("%s() %s stopping %d",
__func__,
(typ == BC_B) ? "btc" : "cmd",
i);
running[typ][i] = false;
join_pthread(listener_pt[typ][i]);
done++;
} else
tot++;
}
}
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed);
clistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
LOGWARNING("%s() stopped %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
(typ == BC_B) ? "btc" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
}
listener_delta = 0;
continue;
}
static void *blistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
char buf[128];
time_t now;
ts_t when, when_add;
int ret;
pthread_detach(pthread_self());
snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__);
LOCK_INIT(buf);
rename_proc(buf);
LOGNOTICE("%s() processing", __func__);
when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
blistener_using_data = true;
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(btc_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
now = time(NULL);
}
if (wq_item) {
setnow(&now1);
process_sockd(conn, wq_item, REPLIER_BTC);
process_sockd(conn, wq_item,
(typ == BC_B) ? REPLIER_BTC : REPLIER_CMD);
setnow(&now2);
ck_wlock(&listener_all_lock);
if (typ == BC_B) {
blis_us += us_tvdiff(&now2, &now1);
blis_processed++;
} else {
clis_us += us_tvdiff(&now2, &now1);
clis_processed++;
}
ck_wunlock(&listener_all_lock);
} else {
setnowts(&when);
timeraddspec(&when, &when_add);
if (typ == BC_B) {
mutex_lock(&wq_btc_waitlock);
ret = cond_timedwait(&wq_btc_waitcond,
&wq_btc_waitlock, &when);
@ -6175,15 +6335,40 @@ static void *blistener(__maybe_unused void *arg)
else if (errno == ETIMEDOUT)
wq_btc_timeouts++;
mutex_unlock(&wq_btc_waitlock);
} else {
mutex_lock(&wq_cmd_waitlock);
ret = cond_timedwait(&wq_cmd_waitcond,
&wq_cmd_waitlock, &when);
if (ret == 0)
wq_cmd_wakes++;
else if (errno == ETIMEDOUT)
wq_cmd_timeouts++;
mutex_unlock(&wq_cmd_waitlock);
}
}
}
CKPQFinish(&conn);
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed);
if (mythread != 0)
LOGNOTICE("%s() %s exiting", __func__, buf);
else {
for (i = 1; i < THREAD_LIMIT; i++) {
if (running[typ][i]) {
running[typ][i] = false;
LOGNOTICE("%s() %s waiting for %d",
__func__, buf, i);
join_pthread(listener_pt[typ][i]);
}
}
if (typ == BC_B) {
LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, blis_processed);
blistener_using_data = false;
if (conn)
PQfinish(conn);
} else {
LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, clis_processed);
clistener_using_data = false;
}
}
return NULL;
}
@ -6251,6 +6436,7 @@ static void *process_socket(__maybe_unused void *arg)
case CMD_CHKPASS:
case CMD_GETATTS:
case CMD_THREADS:
case CMD_PAUSE:
case CMD_HOMEPAGE:
case CMD_QUERY:
break;
@ -6452,6 +6638,7 @@ static void *process_socket(__maybe_unused void *arg)
case CMD_EVENTS:
case CMD_HIGH:
case CMD_THREADS:
case CMD_PAUSE:
case CMD_QUERY:
msgline->sockd = bq->sockd;
bq->sockd = -1;
@ -6613,7 +6800,7 @@ static void *process_socket(__maybe_unused void *arg)
K_ITEM *ml_item = wq->msgline_item;
MSGLINE *ml;
DATA_MSGLINE(ml, ml_item);
free_msgline_data(ml_item, true, false);
free_msgline_data(ml_item, true);
K_WLOCK(msgline_free);
msgline_free->ram -= ml->msgsiz;
k_add_head(msgline_free, ml_item);
@ -6656,7 +6843,7 @@ skippy:
if (bq->ml_item) {
MSGLINE *ml;
DATA_MSGLINE(ml, bq->ml_item);
free_msgline_data(bq->ml_item, true, true);
free_msgline_data(bq->ml_item, true);
K_WLOCK(msgline_free);
msgline_free->ram -= ml->msgsiz;
k_add_head(msgline_free, bq->ml_item);
@ -6841,7 +7028,7 @@ static void *socksetup(__maybe_unused void *arg)
{
pthread_t prep_pt, crep_pt, brep_pt;
enum reply_type p_typ, c_typ, b_typ;
pthread_t clis_pt, blis_pt, proc_pt;
pthread_t proc_pt;
pthread_t psock_pt, wsock_pt, csock_pt;
char nbuf[64];
@ -6868,9 +7055,15 @@ static void *socksetup(__maybe_unused void *arg)
LOGWARNING("%s() Start processing...", __func__);
socksetup_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
listener_setup[BC_B][0].bc = BC_B;
listener_setup[BC_B][0].thread = 0;
create_pthread(&listener_pt[BC_B][0], listener_all,
&(listener_setup[BC_B][0]));
create_pthread(&blis_pt, blistener, NULL);
listener_setup[BC_C][0].bc = BC_C;
listener_setup[BC_C][0].thread = 0;
create_pthread(&listener_pt[BC_C][0], listener_all,
&(listener_setup[BC_C][0]));
create_pthread(&proc_pt, process_socket, NULL);
@ -6960,6 +7153,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
case CMD_EVENTS:
case CMD_HIGH:
case CMD_THREADS:
case CMD_PAUSE:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
__func__, bq->count,
@ -7010,7 +7204,7 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
if (bq->ml_item) {
DATA_MSGLINE(msgline, bq->ml_item);
free_msgline_data(bq->ml_item, true, true);
free_msgline_data(bq->ml_item, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, bq->ml_item);
@ -7062,7 +7256,7 @@ static void *process_reload(__maybe_unused void *arg)
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000;
conn = dbconnect();
CKPQConn(&conn);
now = time(NULL);
while (!everyone_die) {
@ -7166,8 +7360,8 @@ static void *process_reload(__maybe_unused void *arg)
// Don't keep a connection for more than ~10s ... of processing
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
now = time(NULL);
}
@ -7182,9 +7376,7 @@ static void *process_reload(__maybe_unused void *arg)
tick();
}
if (conn)
PQfinish(conn);
CKPQFinish(&conn);
if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) {
@ -7649,7 +7841,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item)
break;
}
free_msgline_data(ml_item, true, true);
free_msgline_data(ml_item, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, ml_item);
@ -7657,9 +7849,6 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item)
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq_item);
if (workqueue_free->count == workqueue_free->total &&
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE)
k_cull_list(workqueue_free);
K_WUNLOCK(workqueue_free);
}
@ -7668,9 +7857,6 @@ static void free_lost(SEQDATA *seqdata)
if (seqdata->reload_lost) {
K_WLOCK(seqtrans_free);
k_list_transfer_to_head(seqdata->reload_lost, seqtrans_free);
if (seqtrans_free->count == seqtrans_free->total &&
seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS)
k_cull_list(seqtrans_free);
K_WUNLOCK(seqtrans_free);
seqdata->reload_lost = NULL;
}
@ -7725,7 +7911,7 @@ static void *pqproc(void *arg)
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
now = time(NULL);
conn = dbconnect();
CKPQConn(&conn);
wqgot = 0;
// Override checking until pool0 is complete
@ -7838,8 +8024,8 @@ static void *pqproc(void *arg)
/* Don't keep a connection for more than ~10s or ~10000 items
* but always have a connection open */
if ((time(NULL) - now) > 10 || wqgot > 10000) {
PQfinish(conn);
conn = dbconnect();
CKPQFinish(&conn);
CKPQConn(&conn);
now = time(NULL);
wqgot = 0;
}
@ -7909,9 +8095,7 @@ static void *pqproc(void *arg)
mutex_unlock(&wq_pool_waitlock);
}
}
if (conn)
PQfinish(conn);
CKPQFinish(&conn);
if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) {
@ -8995,6 +9179,7 @@ static struct option long_options[] = {
// override calculated value
{ "breakdown-threads", required_argument, 0, 'B' },
{ "config", required_argument, 0, 'c' },
{ "cmd-listener-threads", required_argument, 0, 'C' },
{ "dbname", required_argument, 0, 'd' },
{ "minsdiff", required_argument, 0, 'D' },
{ "free", required_argument, 0, 'f' },
@ -9077,7 +9262,7 @@ int main(int argc, char **argv)
memset(&ckpcmd, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:Ab:B:c:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "a:Ab:B:c:C:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) {
switch(c) {
case '?':
case ':':
@ -9120,6 +9305,18 @@ int main(int argc, char **argv)
case 'c':
ckp.config = strdup(optarg);
break;
case 'C':
{
int cl = atoi(optarg);
if (cl < 1 || cl > THREAD_LIMIT) {
quit(1, "Invalid listener "
"thread count %d "
"- must be >0 and <=%d",
cl, THREAD_LIMIT);
}
cmd_listener_threads = cl;
}
break;
case 'd':
db_name = strdup(optarg);
kill = optarg;
@ -9467,8 +9664,10 @@ int main(int argc, char **argv)
cklock_init(&breakdown_lock);
cklock_init(&replier_lock);
cklock_init(&listener_all_lock);
cklock_init(&last_lock);
cklock_init(&btc_lock);
cklock_init(&pgdb_pause_lock);
cklock_init(&poolinstance_lock);
cklock_init(&seq_found_lock);

176
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.508"
#define CKDB_VERSION DB_VERSION"-2.715"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -70,6 +70,10 @@
#define STRINT(x) STRINT2(x)
#define STRINT2(x) #x
// Same as above but name it what we are using it for
#define STRMACRO(x) STRMAC2(x)
#define STRMAC2(x) #x
// So they can fit into a 1 byte flag field
#define TRUE_STR "Y"
#define TRUE2_STR "T"
@ -129,8 +133,14 @@ extern int proc_queue_threads_delta;
extern int reload_breakdown_threads_delta;
extern int cmd_breakdown_threads_delta;
extern int cmd_listener_threads;
extern int btc_listener_threads;
extern int cmd_listener_threads_delta;
extern int btc_listener_threads_delta;
#define BLANK " "
extern char *EMPTY;
#define NULLSTR "(null)"
extern const char *nullstr;
extern const char *true_str;
@ -346,7 +356,7 @@ extern bool dbload_only_sharesummary;
* markersummaries and pplns payouts may not be correct */
extern bool sharesummary_marks_limit;
// DB users,workers load is complete
// DB optioncontrol,idcontrol,users,workers,useratts load is complete
extern bool db_users_complete;
// DB load is complete
extern bool db_load_complete;
@ -358,6 +368,8 @@ extern bool reloading;
extern bool reloaded_N_files;
// Data load is complete
extern bool startup_complete;
// Set to true when pool0 completes, pool0 = socket data during reload
extern bool reload_queue_complete;
// Tell everyone to die
extern bool everyone_die;
@ -402,24 +414,81 @@ extern int btc_timeout;
// Lock access to the above variables so they can be changed
extern cklock_t btc_lock;
#define EDDB "expirydate"
#define CDDB "createdate"
#define CDTRF CDDB
#define BYDB "createby"
#define BYTRF BYDB
#define CODEDB "createcode"
#define CODETRF CODEDB
#define INETDB "createinet"
#define INETTRF INETDB
#define MDDB "modifydate"
#define MBYDB "modifyby"
#define MCODEDB "modifycode"
#define MINETDB "modifyinet"
#define _EDDB expirydate
#define _CDDB createdate
#define _CDTRF _CDDB
#define _BYDB createby
#define _BYTRF _BYDB
#define _CODEDB createcode
#define _CODETRF _CODEDB
#define _INETDB createinet
#define _INETTRF _INETDB
#define _MDDB modifydate
#define _MBYDB modifyby
#define _MCODEDB modifycode
#define _MINETDB modifyinet
#define EDDB STRMACRO(_EDDB)
#define CDDB STRMACRO(_CDDB)
#define CDTRF STRMACRO(_CDTRF)
#define BYDB STRMACRO(_BYDB)
#define BYTRF STRMACRO(_BYTRF)
#define CODEDB STRMACRO(_CODEDB)
#define CODETRF STRMACRO(_CODETRF)
#define INETDB STRMACRO(_INETDB)
#define INETTRF STRMACRO(_INETTRF)
#define MDDB STRMACRO(_MDDB)
#define MBYDB STRMACRO(_MBYDB)
#define MCODEDB STRMACRO(_MCODEDB)
#define MINETDB STRMACRO(_MINETDB)
extern char *by_default;
extern char *inet_default;
extern char *id_default;
// Emulate a list for lock checking
extern K_LIST *pgdb_free;
// Count of db connections
extern int pgdb_count;
extern __thread char *connect_file;
extern __thread char *connect_func;
extern __thread int connect_line;
extern __thread bool connect_dis;
/* (WHEN FINISHED) Pause all DB IO (permanently) pause.1.name=pgTABconfirm=Y
* Without confirm=Y it will return the pause state
* All DB IO commands must take out a read of this lock before
* starting anything that shouldn't be done partially
* This also means that the whole of CKDB can lock up for a short
* time if e.g. shift processing has taken the read lock shortly before
* the write lock is taken for the pause request to set the flag
* Once pgdb_paused is true, all DB IO will not access the database
* and all web access will be in read only mode i.e. no user changes
* All connections to the DB will close and thus DB changes and outages
* can then occur without affecting CKDB at all
* check the connection count with query.1.request=pg
* Shift generation is unchanged, Payout generation is permanently disabled
* To restart CKDB you must terminate it then rerun it, then CKDB will
* reload/redo all ckpool data it didn't store in the database
* The aim is to look the same as a normal running CKDB except doing no
* DB I/O - that will be deferred until CKDB is later restarted
* You can't pause CKDB until the dbload has completed, but you can
* during the CCL reload
* The function to take out the pause lock increments the thread's
* pause_read_count so each function that expects the lock to be held can
* easily test that and incorrectly calling it multiple times is tracked
* If the read lock code is called incorrectly, i.e. a code bug,
* pgdb_pause_disabled will be set to true and the pause command can no
* longer be activated, however if it was already activated, this wont
* affect anything */
extern cklock_t pgdb_pause_lock;
extern __thread int pause_read_count;
extern __thread char *pause_read_file;
extern __thread char *pause_read_func;
extern __thread int pause_read_line;
extern __thread bool pause_read_unlock;
extern bool pgdb_paused;
extern bool pgdb_pause_disabled;
// Number of seconds per poolinstance message for run
#define POOLINSTANCE_MSG_EVERY 30
@ -724,6 +793,7 @@ enum cmd_values {
CMD_EVENTS,
CMD_HIGH,
CMD_THREADS,
CMD_PAUSE,
CMD_END
};
@ -1365,7 +1435,7 @@ typedef struct msgline {
#define ALLOC_MSGLINE 8192
#define LIMIT_MSGLINE 0
#define CULL_MSGLINE 8
#define CULL_MSGLINE (8 * ALLOC_MSGLINE)
#define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline)
#define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true)
#define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false)
@ -1391,7 +1461,7 @@ typedef struct breakqueue {
#define ALLOC_BREAKQUEUE 16384
#define LIMIT_BREAKQUEUE 0
#define CULL_BREAKQUEUE 4
#define CULL_BREAKQUEUE (4 * ALLOC_BREAKQUEUE)
#define INIT_BREAKQUEUE(_item) INIT_GENERIC(_item, breakqueue)
#define DATA_BREAKQUEUE(_var, _item) DATA_GENERIC(_var, _item, breakqueue, true)
@ -1427,6 +1497,8 @@ extern int reload_processing;
extern int cmd_processing;
extern int sockd_count;
extern int max_sockd_count;
extern ts_t breaker_sleep_stt;
extern int breaker_sleep_ms;
// Trigger breaker() processing
extern mutex_t bq_reload_waitlock;
@ -1458,7 +1530,7 @@ typedef struct workqueue {
#define ALLOC_WORKQUEUE 1024
#define LIMIT_WORKQUEUE 0
#define CULL_WORKQUEUE 32
#define CULL_WORKQUEUE (32 * ALLOC_WORKQUEUE)
#define INIT_WORKQUEUE(_item) INIT_GENERIC(_item, workqueue)
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
@ -1558,12 +1630,18 @@ typedef struct transfer {
// Suggest malloc use MMAP = largest under 2MB
#define ALLOC_TRANSFER ((int)(2*1024*1024/sizeof(TRANSFER)))
#define LIMIT_TRANSFER 0
#define CULL_TRANSFER 16
// ALLOC_TRANSFER is ~14k, allocated often is 3
#define CULL_TRANSFER (4 * ALLOC_TRANSFER)
#define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer)
#define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true)
extern K_LIST *transfer_free;
/* Allow defining and adjusting it on a running system
* cull_limit is set to the optionvalue * ALLOC_TRANSFER */
#define CULL_TRANSFER_NAME "CullTransfer"
extern int cull_transfer;
#define transfer_data(_item) _transfer_data(_item, WHERE_FFL_HERE)
extern const char Transfer[];
@ -1782,7 +1860,7 @@ extern K_LIST *seqtrans_free;
#define ALLOC_SEQTRANS 1024
#define LIMIT_SEQTRANS 0
#define CULL_SEQTRANS 64
#define CULL_SEQTRANS (16 * ALLOC_SEQTRANS)
#define INIT_SEQTRANS(_item) INIT_GENERIC(_item, seqtrans)
#define DATA_SEQTRANS(_var, _item) DATA_GENERIC(_var, _item, seqtrans, true)
#define DATA_SEQTRANS_NULL(_var, _item) DATA_GENERIC(_var, _item, seqtrans, false)
@ -1839,6 +1917,9 @@ typedef struct users {
// Address account, not a username account
#define USER_ADDRESS 0x1
// Username created due to a share that had an unknown username
#define USER_MISSING 0x2
// 16 x base 32 (5 bits) = 10 bytes (8 bits)
#define TOTPAUTH_KEYSIZE 10
#define TOTPAUTH_DSP_KEYSIZE 16
@ -2030,7 +2111,7 @@ extern K_STORE *accountadjustment_store;
typedef struct idcontrol {
char idname[TXT_SML+1];
int64_t lastid;
MODIFYDATECONTROLFIELDS;
MODIFYDATECONTROLIN;
} IDCONTROL;
#define ALLOC_IDCONTROL 16
@ -2038,8 +2119,7 @@ typedef struct idcontrol {
#define INIT_IDCONTROL(_item) INIT_GENERIC(_item, idcontrol)
#define DATA_IDCONTROL(_var, _item) DATA_GENERIC(_var, _item, idcontrol, true)
// These are only used for db access - not stored in memory
//extern K_TREE *idcontrol_root;
extern K_TREE *idcontrol_root;
extern K_LIST *idcontrol_free;
extern K_STORE *idcontrol_store;
@ -2846,6 +2926,11 @@ extern K_STORE *userstats_eos_store;
// newer OR equal
#define tv_newer_eq(_old, _new) (!(tv_newer(_new, _old)))
#define copy_ts(_dest, _src) do { \
(_dest)->tv_sec = (_src)->tv_sec; \
(_dest)->tv_nsec = (_src)->tv_nsec; \
} while(0)
// WORKERSTATUS from various incoming data
typedef struct workerstatus {
int64_t userid;
@ -3219,7 +3304,7 @@ extern void sequence_report(bool lock);
#define FREE_ITEM(item) do { } while(0)
// TODO: make a macro for all other to use above macro
extern void free_transfer_data(TRANSFER *transfer);
extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull);
extern void free_msgline_data(K_ITEM *item, bool t_lock);
extern void free_users_data(K_ITEM *item);
extern void free_workinfo_data(K_ITEM *item);
#define free_sharesummary_data(_i) FREE_ITEM(_i)
@ -3325,6 +3410,7 @@ extern INTRANSIENT *_get_intransient(const char *fldnam, char *value,
#define intransient_str(_fld, _val) \
_intransient_str(_fld, _val, WHERE_FFL_HERE)
extern char *_intransient_str(char *fldnam, char *value, WHERE_FFL_ARGS);
extern void dsp_msgline(K_ITEM *item, FILE *stream);
extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS);
extern void dsp_transfer(K_ITEM *item, FILE *stream);
extern cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b);
@ -3427,6 +3513,9 @@ extern K_ITEM *find_first_payments(int64_t userid, K_TREE_CTX *ctx);
extern K_ITEM *find_first_paypayid(int64_t userid, int64_t payoutid, K_TREE_CTX *ctx);
extern cmp_t cmp_accountbalance(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_accountbalance(int64_t userid);
extern void dsp_idcontrol(K_ITEM *item, FILE *stream);
extern cmp_t cmp_idcontrol(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_idcontrol(char *idname);
extern cmp_t cmp_optioncontrol(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_optioncontrol(char *optionname, const tv_t *now, int32_t height);
#define sys_setting(_name, _def, _now) user_sys_setting(0, _name, _def, _now)
@ -3603,12 +3692,12 @@ extern void userinfo_block(BLOCKS *blocks, enum info_type isnew, int delta);
#define CKPQ_READ true
#define CKPQ_WRITE false
#define CKPQexec(_conn, _qry, _isread) _CKPQexec(_conn, _qry, _isread, WHERE_FFL_HERE)
extern PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS);
#define CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \
_CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \
#define CKPQExec(_conn, _qry, _isread) _CKPQExec(_conn, _qry, _isread, WHERE_FFL_HERE)
extern PGresult *_CKPQExec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS);
#define CKPQExecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \
_CKPQExecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \
_isread, WHERE_FFL_HERE)
extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
extern PGresult *_CKPQExecParams(PGconn *conn, const char *qry,
int nParams,
const Oid *paramTypes,
const char *const * paramValues,
@ -3616,10 +3705,10 @@ extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
const int *paramFormats,
int resultFormat,
bool isread, WHERE_FFL_ARGS);
// Force use CKPQ... for PQ functions in use
#define PQexec CKPQexec
#define PQexecParams CKPQexecParams
extern ExecStatusType _CKPQResultStatus(PGresult *res, WHERE_FFL_ARGS);
#define CKPQResultStatus(_res) _CKPQResultStatus(_res, WHERE_FFL_HERE)
extern void _CKPQClear(PGresult *res, WHERE_FFL_ARGS);
#define CKPQClear(_res) _CKPQClear(_res, WHERE_FFL_HERE)
#define PGLOG(__LOG, __str, __rescode, __conn) do { \
char *__buf = pqerrmsg(__conn); \
@ -3630,14 +3719,23 @@ extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
#define PGLOGERR(_str, _rescode, _conn) PGLOG(LOGERR, _str, _rescode, _conn)
#define PGLOGEMERG(_str, _rescode, _conn) PGLOG(LOGEMERG, _str, _rescode, _conn)
#define PGLOGNOTICE(_str, _rescode, _conn) PGLOG(LOGNOTICE, _str, _rescode, _conn)
extern void _pause_read_lock(WHERE_FFL_ARGS);
#define pause_read_lock() _pause_read_lock(WHERE_FFL_HERE)
extern void _pause_read_unlock(WHERE_FFL_ARGS);
#define pause_read_unlock() _pause_read_unlock(WHERE_FFL_HERE)
extern char *pqerrmsg(PGconn *conn);
extern bool CKPQConn(PGconn **conn);
extern void CKPQDisco(PGconn **conn, bool conned);
extern bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS);
#define CKPQConn(_conn) _CKPQConn(_conn, WHERE_FFL_HERE)
extern bool _CKPQDisco(PGconn **conn, bool conned, WHERE_FFL_ARGS);
#define CKPQDisco(_conn, _conned) _CKPQDisco(_conn, _conned, WHERE_FFL_HERE)
#define CKPQFinish(_conn) CKPQDisco(_conn, true)
extern bool _CKPQBegin(PGconn *conn, WHERE_FFL_ARGS);
#define CKPQBegin(_conn) _CKPQBegin(conn, WHERE_FFL_HERE)
extern void _CKPQEnd(PGconn *conn, bool commit, WHERE_FFL_ARGS);
#define CKPQEnd(_conn, _commit) _CKPQEnd(_conn, _commit, WHERE_FFL_HERE)
#define CKPQCommit(_conn) _CKPQEnd(_conn, true, WHERE_FFL_HERE)
extern int64_t nextid(PGconn *conn, char *idname, int64_t increment,
tv_t *cd, char *by, char *code, char *inet);
@ -3647,11 +3745,14 @@ extern bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash,
int *event);
extern K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username,
char *emailaddress, char *passwordhash,
int64_t userbits, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
char *secondaryuserid, int64_t userbits, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root);
extern bool users_replace(PGconn *conn, K_ITEM *u_item, K_ITEM *old_u_item,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root);
extern K_ITEM *create_missing_user(PGconn *conn, char *username,
char *secondaryuserid, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root);
extern bool users_fill(PGconn *conn);
extern bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd,
bool begun);
@ -3685,6 +3786,7 @@ extern bool payments_add(PGconn *conn, bool add, K_ITEM *p_item,
extern bool payments_fill(PGconn *conn);
extern bool idcontrol_add(PGconn *conn, char *idname, char *idvalue, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root);
extern bool idcontrol_fill(PGconn *conn);
extern K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool begun);
extern K_ITEM *optioncontrol_add(PGconn *conn, char *optionname, char *optionvalue,
char *activationdate, char *activationheight,

27
src/ckdb.php

@ -19,10 +19,12 @@ function getsock2($fun, $tmo)
return _getsock($fun, "$socket_dir$socket_name/$socket_file", $tmo);
}
#
function msg($line, $tmo = false)
function msg($line, $tabs, $tmo = false)
{
global $fld_sep, $val_sep;
if ($tabs)
$line = str_replace("TAB", "\t", $line);
$fun = 'stdin';
$ret = false;
$socket = getsock2($fun, $tmo);
@ -40,7 +42,8 @@ function msg($line, $tmo = false)
function usAge($a0)
{
global $socket_name_def, $socket_dir_def, $socket_file_def;
echo "usAge: php $a0 [name [dir [socket]]]\n";
echo "usAge: php $a0 [-t] [name [dir [socket]]]\n";
echo " -t = don't convert 'TAB' to a tab character\n";
echo " default name = $socket_name_def\n";
echo " default dir = $socket_dir_def\n";
echo " default socket = $socket_file_def\n";
@ -48,18 +51,26 @@ function usAge($a0)
exit(1);
}
#
$tabs = true;
#
if (count($argv) > 1)
{
if ($argv[1] == '-?' || $argv[1] == '-h' || $argv[1] == '-help'
|| $argv[1] == '--help')
usAge($argv[0]);
$socket_name = $argv[1];
if (count($argv) > 2)
$a = 1;
if ($argv[$a] == '-t')
{
$tabs = false;
$a++;
}
$socket_name = $argv[$a++];
if (count($argv) > $a)
{
$socket_dir = $argv[2];
if (count($argv) > 3)
$socket_file = $argv[3];
$socket_dir = $argv[$a++];
if (count($argv) > $a)
$socket_file = $argv[$a];
}
}
#
@ -68,7 +79,7 @@ while ($line = fgets(STDIN))
$line = trim($line);
if (strlen($line) > 0)
{
$rep = msg($line);
$rep = msg($line, $tabs);
if ($rep === false)
echo "Failed\n";
else

427
src/ckdb_cmd.c

@ -72,8 +72,9 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
if (event == EVENT_OK) {
u_item = users_add(conn, in_username,
transfer_data(i_emailaddress),
transfer_data(i_passwordhash), 0,
by, code, inet, now, trf_root);
transfer_data(i_passwordhash),
NULL, 0, by, code, inet, now,
trf_root);
}
}
@ -3116,7 +3117,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
DATA_OPTIONCONTROL(optioncontrol, oc_item);
u_item = users_add(conn, in_username, EMPTY,
optioncontrol->optionvalue,
0, by, code, inet, cd,
NULL, 0, by, code, inet, cd,
trf_root);
} else
ok = false;
@ -3347,7 +3348,7 @@ static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id,
goto pulse;
}
hq_store = k_new_store(heartbeatqueue_free);
hq_store = k_new_store_locked(heartbeatqueue_free);
k_list_transfer_to_head(heartbeatqueue_store, hq_store);
K_WUNLOCK(heartbeatqueue_free);
@ -3871,8 +3872,6 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root,
__maybe_unused bool reload_data)
{
ExecStatusType rescode;
PGresult *res;
bool conned = false;
K_ITEM *t_item, *u_item, *ua_item = NULL;
INTRANSIENT *in_username;
@ -3920,21 +3919,14 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
*(dot++) = '\0';
// If we already had a different one, save it to the DB
if (ua_item && strcmp(useratts->attname, attname) != 0) {
if (conn == NULL) {
conn = dbconnect();
if (CKPQConn(&conn))
conned = true;
}
if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
begun = CKPQBegin(conn);
if (!begun) {
reason = "DBERR";
goto bats;
}
begun = true;
}
if (useratts_item_add(conn, ua_item, now, begun)) {
ua_item = NULL;
@ -3982,21 +3974,14 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
t_item = next_in_ktree(ctx);
}
if (ua_item) {
if (conn == NULL) {
conn = dbconnect();
if (CKPQConn(&conn))
conned = true;
}
if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
begun = CKPQBegin(conn);
if (!begun) {
reason = "DBERR";
goto bats;
}
begun = true;
}
if (!useratts_item_add(conn, ua_item, now, begun)) {
reason = "DBERR";
@ -4006,15 +3991,11 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
}
}
rollback:
if (!reason)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
CKPQEnd(conn, (reason == NULL));
bats:
if (conned)
PQfinish(conn);
CKPQDisco(&conn, conned);
if (reason) {
if (ua_item) {
K_WLOCK(useratts_free);
@ -4206,8 +4187,6 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root,
__maybe_unused bool reload_data)
{
ExecStatusType rescode;
PGresult *res;
bool conned = false;
K_ITEM *t_item, *oc_item = NULL, *ok = NULL;
K_TREE_CTX ctx[1];
@ -4241,21 +4220,14 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value";
goto rollback;
}
if (conn == NULL) {
conn = dbconnect();
if (CKPQConn(&conn))
conned = true;
}
if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
begun = CKPQBegin(conn);
if (!begun) {
reason = "DBERR";
goto rollback;
}
begun = true;
}
ok = optioncontrol_item_add(conn, oc_item, now, begun);
oc_item = NULL;
@ -4298,21 +4270,14 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value";
goto rollback;
}
if (conn == NULL) {
conn = dbconnect();
if (CKPQConn(&conn))
conned = true;
}
if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
begun = CKPQBegin(conn);
if (!begun) {
reason = "DBERR";
goto rollback;
}
begun = true;
}
ok = optioncontrol_item_add(conn, oc_item, now, begun);
oc_item = NULL;
@ -4324,17 +4289,10 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
}
}
rollback:
if (begun) {
if (reason)
res = PQexec(conn, "Rollback", CKPQ_WRITE);
else
res = PQexec(conn, "Commit", CKPQ_WRITE);
PQclear(res);
}
if (begun)
CKPQEnd(conn, (reason == NULL));
if (conned)
PQfinish(conn);
CKPQDisco(&conn, conned);
if (reason) {
snprintf(reply, siz, "ERR.%s", reason);
LOGERR("%s.%s.%s", cmd, id, reply);
@ -5903,45 +5861,116 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd,
__maybe_unused K_TREE *trf_root,
__maybe_unused bool reload_data)
{
__maybe_unused K_ITEM *i_file;
__maybe_unused char reply[1024] = "";
__maybe_unused size_t siz = sizeof(reply);
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
// WARNING: This is a gaping security hole - only use in development
#if 1
LOGDEBUG("%s.disabled.dsp", id);
return strdup("disabled.dsp");
/*
#else
// WARNING: This is a gaping security hole - only use in development
K_ITEM *i_file, *i_name, *i_type;
char reply[1024] = "", *fil, *name, *typ;
size_t siz = sizeof(reply);
K_STORE *store = NULL;
K_TREE *tree = NULL;
bool unknown_typ = true, unknown_name = true, msg = false;
i_file = require_name(trf_root, "file", 1, NULL, reply, siz);
if (!i_file)
return strdup(reply);
dsp_ktree(blocks_free, blocks_root, transfer_data(i_file), NULL);
i_name = require_name(trf_root, "name", 1, NULL, reply, siz);
if (!i_name)
return strdup(reply);
dsp_ktree(transfer_free, trf_root, transfer_data(i_file), NULL);
i_type = optional_name(trf_root, "type", 1, NULL, reply, siz);
if (*reply)
return strdup(reply);
dsp_ktree(paymentaddresses_free, paymentaddresses_root,
transfer_data(i_file), NULL);
fil = transfer_data(i_file);
name = transfer_data(i_name);
if (i_type)
typ = transfer_data(i_type);
else
typ = "tree";
dsp_ktree(paymentaddresses_create_free, paymentaddresses_root,
transfer_data(i_file), NULL);
if (strcasecmp(typ, "tree") == 0) {
unknown_typ = false;
dsp_ktree(sharesummary_free, sharesummary_root,
transfer_data(i_file), NULL);
if (strcasecmp(name, "blocks") == 0)
tree = blocks_root;
dsp_ktree(userstats_free, userstats_root,
transfer_data(i_file), NULL);
if (strcasecmp(name, "transfer") == 0)
tree = trf_root;
dsp_ktree(markersummary_free, markersummary_root,
transfer_data(i_file), NULL);
if (strcasecmp(name, "paymentaddresses") == 0)
tree = paymentaddresses_root;
dsp_ktree(workmarkers_free, workmarkers_root,
transfer_data(i_file), NULL);
if (strcasecmp(name, "paymentaddresses_create") == 0)
tree = paymentaddresses_create_root;
LOGDEBUG("%s.ok.dsp.file='%s'", id, transfer_data(i_file));
return strdup("ok.dsp");
*/
if (strcasecmp(name, "sharesummary") == 0)
tree = sharesummary_root;
if (strcasecmp(name, "userstats") == 0)
tree = userstats_root;
if (strcasecmp(name, "markersummary") == 0)
tree = markersummary_root;
if (strcasecmp(name, "workmarkers") == 0)
tree = workmarkers_root;
if (strcasecmp(name, "idcontrol") == 0)
tree = idcontrol_root;
if (tree) {
unknown_name = false;
if (tree->master->dsp_func)
dsp_ktree(tree, fil, NULL);
else {
snprintf(reply, siz,
"%s %s has no dsp_func",
typ, name);
msg = true;
}
}
} else if (strcasecmp(typ, "store") == 0) {
unknown_typ = false;
if (strcasecmp(name, "blocks") == 0)
store = blocks_store;
if (strcasecmp(name, "markersummary") == 0)
store = markersummary_store;
if (strcasecmp(name, "msgline") == 0)
store = msgline_store;
if (store) {
unknown_name = false;
if (store->master->dsp_func)
dsp_kstore(store, fil, NULL);
else {
snprintf(reply, siz,
"%s %s has no dsp_func",
typ, name);
msg = true;
}
}
}
if (unknown_typ) {
snprintf(reply, siz, "unknown typ '%s'", typ);
} else if (unknown_name) {
snprintf(reply, siz, "unknown name '%s' for '%s'", name, typ);
} else {
if (!msg)
snprintf(reply, siz, "ok.dsp.file='%s'", fil);
}
LOGDEBUG("%s.%s'", id, reply);
return strdup(reply);
#endif
}
static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
@ -5954,7 +5983,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
char tmp[1024], *buf;
const char *name;
size_t len, off;
uint64_t ram, ram2, tot = 0;
int64_t ram, ram2, tot = 0;
K_LIST *klist;
K_LISTS *klists;
int rows = 0;
@ -5999,8 +6028,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
snprintf(tmp, sizeof(tmp),
"name:%d=%s%s%s%cinitial:%d=%d%callocated:%d=%d%c"
"instore:%d=%d%cram:%d=%"PRIu64"%c"
"ram2:%d=%"PRIu64"%ccull:%d=%d%c",
"instore:%d=%d%cram:%d=%"PRId64"%c"
"ram2:%d=%"PRId64"%ccull:%d=%d%ccull_limit:%d=%d%c",
rows, name, istree ? " (tree)" : "",
klist->is_lock_only ? " (lock)" : "", FLDSEP,
rows, klist->allocate, FLDSEP,
@ -6008,7 +6037,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
rows, klist->total - klist->count, FLDSEP,
rows, ram, FLDSEP,
rows, ram2, FLDSEP,
rows, klist->cull_count, FLDSEP);
rows, klist->cull_count, FLDSEP,
rows, klist->cull_limit, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
tot += ram + ram2;
@ -6018,13 +6048,13 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
}
ck_wunlock(&lock_check_lock);
snprintf(tmp, sizeof(tmp), "totalram=%"PRIu64"%c", tot, FLDSEP);
snprintf(tmp, sizeof(tmp), "totalram=%"PRId64"%c", tot, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp),
"rows=%d%cflds=%s%c",
rows, FLDSEP,
"name,initial,allocated,instore,ram,cull", FLDSEP);
"name,initial,allocated,instore,ram,cull,cull_limit", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "arn=%s%carp=%s", "Stats", FLDSEP, "");
@ -7030,7 +7060,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_height)
return strdup(reply);
goto badreply;
TXT_TO_INT("height", transfer_data(i_height), height);
i_expired = optional_name(trf_root, "expired",
@ -7116,7 +7146,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_wid)
return strdup(reply);
goto badreply;
TXT_TO_BIGINT("wid", transfer_data(i_wid), wid);
i_expired = optional_name(trf_root, "expired",
@ -7252,7 +7282,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_height)
return strdup(reply);
goto badreply;
TXT_TO_INT("height", transfer_data(i_height), height);
int_to_buf(height, reply, sizeof(reply));
@ -7319,7 +7349,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_height)
return strdup(reply);
goto badreply;
TXT_TO_INT("height", transfer_data(i_height), height);
int_to_buf(height, reply, sizeof(reply));
@ -7450,7 +7480,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_height)
return strdup(reply);
goto badreply;
TXT_TO_INT("height", transfer_data(i_height), height);
int_to_buf(height, reply, sizeof(reply));
@ -7571,7 +7601,7 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
1, (char *)intpatt,
reply, siz);
if (!i_wid)
return strdup(reply);
goto badreply;
TXT_TO_BIGINT("wid", transfer_data(i_wid), selwid);
INIT_SHARES(&s_look);
@ -7703,21 +7733,117 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp);
ok = true;
} else if (strcasecmp(request, "pg") == 0) {
K_RLOCK(pgdb_free);
snprintf(tmp, sizeof(tmp), "connections=%d%c",
pgdb_count, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
K_RUNLOCK(pgdb_free);
rows++;
ok = true;
#if 0
} else if (strcasecmp(request, "transfer") == 0) {
/* Code for debugging the transfer stores
* limit is set to avoid a very large reply,
* since transfer can be millions of items during a reload */
TRANSFER *trf = NULL;
K_STORE *trf_store;
K_ITEM *trf_item, *i_limit;
int stores = 0, limit = 20, tot_stores = 0;
bool exceeded = false;
i_limit = optional_name(trf_root, "limit",
1, (char *)intpatt,
reply, siz);
if (*reply) {
LOGERR("%s() %s.%s", __func__, id, reply);
goto badreply;
}
if (i_limit)
limit = atoi(transfer_data(i_limit));
snprintf(tmp, sizeof(tmp), "limit=%d%c", limit, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
K_RLOCK(transfer_free);
trf_store = transfer_free->next_store;
while (!exceeded && trf_store) {
trf_item = trf_store->head;
while (trf_item) {
if (rows >= limit) {
exceeded = true;
break;
}
DATA_TRANSFER(trf, trf_item);
snprintf(tmp, sizeof(tmp), "store:%d=%d%c",
rows, stores, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "storename:%d=%s%c",
rows, trf_store->name, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "name:%d=%s%c",
rows, trf->name, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "mvalue:%d=%s%c",
rows, trf->mvalue, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp),
"malloc:%d=%"PRIu64"%c",
rows, trf->msiz, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "intrans:%d=%c%c",
rows, trf->intransient ? 'Y' : 'N',
FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
trf_item = trf_item->next;
rows++;
}
trf_store = trf_store->next_store;
stores++;
}
tot_stores = stores;
if (exceeded) {
while (trf_store) {
trf_store = trf_store->next_store;
tot_stores++;
}
}
K_RUNLOCK(transfer_free);
snprintf(tmp, sizeof(tmp), "rowstores=%d%c",
stores, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "totstores=%d%c",
tot_stores, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "limitexceeded=%c%c",
exceeded ? 'Y' : 'N', FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "flds=%s%c",
"store,storename,name,mvalue,malloc,intrans", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "arn=%s%carp=%s%c",
transfer_free->name, FLDSEP, "", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
ok = true;
#endif
} else {
free(buf);
snprintf(reply, siz, "unknown request '%s'", request);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
goto badreply;
}
if (!ok) {
free(buf);
snprintf(reply, siz, "failed.%s%s%s",
request,
msg[0] ? " " : "",
msg[0] ? msg : "");
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
goto badreply;
}
snprintf(tmp, sizeof(tmp), "rows=%d", rows);
@ -7726,6 +7852,10 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
msg[0] ? " " : "",
msg[0] ? msg : "");
return buf;
badreply:
free(buf);
return strdup(reply);
}
// Query and disable internal lock detection code
@ -8353,10 +8483,8 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
if (strcasecmp(action, "store") == 0) {
/* Store the shares_hi_root list in the db now,
* rather than wait for a shift process to do it */
if (!conn) {
conn = dbconnect();
if (CKPQConn(&conn))
conned = true;
}
count = 0;
do {
did = false;
@ -8371,8 +8499,7 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
count++;
}
} while (did);
if (conned)
PQfinish(conn);
CKPQDisco(&conn, conned);
if (count) {
LOGWARNING("%s() Stored: %d high shares",
__func__, count);
@ -8463,6 +8590,22 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "cl") == 0 ||
strcasecmp(name, "cmd_listener") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
cmd_listener_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "bl") == 0 ||
strcasecmp(name, "btc_listener") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
btc_listener_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else {
snprintf(reply, siz, "unknown name '%s'", name);
LOGERR("%s() %s.%s", __func__, id, reply);
@ -8472,6 +8615,71 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
return buf;
}
static char *cmd_pause(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd, K_TREE *trf_root,
__maybe_unused bool reload_data)
{
K_ITEM *i_name;
char reply[1024] = "";
size_t siz = sizeof(reply);
char *name;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_name = require_name(trf_root, "name", 1, NULL, reply, siz);
if (!i_name)
return strdup(reply);
name = transfer_data(i_name);
/* Pause the breaker threads to help culling to take place for some
* tables that can be culled but 'never' empty due to threads always
* creating new data before the old data has finished being processed
* N.B. this should only be needed on a sizeable pool, once after
* the reload completes ... and even 499ms would be a long time to
* pause in the case of a sizeable pool ... DANGER, WILL ROBINSON! */
if (strcasecmp(name, "breaker") == 0) {
K_ITEM *i_ms;
int ms = 100;
i_ms = optional_name(trf_root, "ms", 1, NULL, reply, siz);
if (*reply)
return strdup(reply);
if (i_ms) {
ms = atoi(transfer_data(i_ms));
// 4999 is too long, don't do it!
if (ms < 10 || ms > 4999) {
snprintf(reply, siz,
"%s ms %d outside range 10-4999",
name, ms);
goto out;
}
}
if (!reload_queue_complete && !key_update) {
snprintf(reply, siz,
"no point pausing %s before reload completes",
name);
goto out;
}
/* Use an absolute start time to try to get all threads asleep
* at the same time */
K_WLOCK(breakqueue_free);
cksleep_prepare_r(&breaker_sleep_stt);
breaker_sleep_ms = ms;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.%s %s%dms pause sent", name,
ms > 499 ? "ALERT!!! " : EMPTY, ms);
} else
snprintf(reply, siz, "unknown name '%s'", name);
out:
LOGWARNING("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
/* The socket command format is as follows:
* Basic structure:
* cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=...
@ -8585,5 +8793,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM | ACCESS_WEB },
{ CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_PAUSE, "pause", false, false, cmd_pause, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 }
};

123
src/ckdb_data.c

@ -18,7 +18,7 @@ void free_transfer_data(TRANSFER *transfer)
FREENULL(transfer->mvalue);
}
void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull)
void free_msgline_data(K_ITEM *item, bool t_lock)
{
K_ITEM *t_item = NULL;
TRANSFER *transfer;
@ -40,11 +40,6 @@ void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull)
K_WLOCK(transfer_free);
transfer_free->ram -= ram2;
k_list_transfer_to_head(msgline->trf_store, transfer_free);
if (t_cull) {
if (transfer_free->count == transfer_free->total &&
transfer_free->total >= ALLOC_TRANSFER * CULL_TRANSFER)
k_cull_list(transfer_free);
}
if (t_lock)
K_WUNLOCK(transfer_free);
msgline->trf_store = k_free_store(msgline->trf_store);
@ -510,9 +505,27 @@ void _txt_to_double(char *nam, char *fld, double *data, size_t siz, WHERE_FFL_AR
char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
static bool had_null = false;
struct tm tm;
double d;
// Return an empty string but only log a console message the first time
if (!data) {
// locking doesn't matter - if we get extra messages
if (!had_null) {
had_null = true;
LOGEMERG("%s() BUG - called with null data - check"
" log file" WHERE_FFL,
__func__, WHERE_FFL_PASS);
}
LOGNOTICE("%s() BUG - called with null data typ=%d" WHERE_FFL,
__func__, (int)typ, WHERE_FFL_PASS);
if (!buf)
buf = malloc(1);
*buf = '\0';
return buf;
}
if (!buf) {
switch (typ) {
case TYPE_STR:
@ -816,6 +829,37 @@ char *_intransient_str(char *fldnam, char *value, WHERE_FFL_ARGS)
return in->str;
}
void dsp_msgline(K_ITEM *item, FILE *stream)
{
K_ITEM *t_item;
MSGLINE *m;
int c;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_MSGLINE(m, item);
if (m->trf_store)
c = m->trf_store->count;
else
c = 0;
fprintf(stream, " which=%d id='%s' cmd='%s' msg='%.42s' "
"trf_store=%c count=%d\n",
m->which_cmds, m->id, m->cmd, m->msg,
m->trf_store ? 'Y' : 'N', c);
if (m->trf_store) {
t_item = m->trf_store->head;
while (t_item) {
fputc(' ', stream);
dsp_transfer(t_item, stream);
t_item = t_item->next;
}
}
}
}
// For mutiple variable function calls that need the data
char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS)
{
@ -856,8 +900,10 @@ void dsp_transfer(K_ITEM *item, FILE *stream)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_TRANSFER(t, item);
fprintf(stream, " name='%s' mvalue='%s' malloc=%"PRIu64"\n",
t->name, t->mvalue, t->msiz);
fprintf(stream, " name='%s' mvalue='%s' malloc=%"PRIu64
" intransient=%c\n",
t->name, t->mvalue, t->msiz,
t->intransient ? 'Y' : 'N');
}
}
@ -2141,6 +2187,52 @@ K_ITEM *find_accountbalance(int64_t userid)
return item;
}
void dsp_idcontrol(K_ITEM *item, FILE *stream)
{
char createdate_buf[DATE_BUFSIZ], modifydate_buf[DATE_BUFSIZ];
IDCONTROL *i;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_IDCONTROL(i, item);
tv_to_buf(&(i->createdate), createdate_buf, sizeof(createdate_buf));
tv_to_buf(&(i->modifydate), modifydate_buf, sizeof(modifydate_buf));
fprintf(stream, " idname='%s' lastid=%"PRId64" cdate='%s'"
" cby='%s' ccode='%s' cinet='%s' mdate='%s'"
" mby='%s' mcode='%s' minet='%s'\n",
i->idname, i->lastid, createdate_buf,
i->in_createby, i->in_createcode,
i->in_createinet, modifydate_buf,
i->in_modifyby, i->in_modifycode,
i->in_modifyinet);
}
}
// order by idname asc
cmp_t cmp_idcontrol(K_ITEM *a, K_ITEM *b)
{
IDCONTROL *ida, *idb;
DATA_IDCONTROL(ida, a);
DATA_IDCONTROL(idb, b);
return CMP_STR(ida->idname, idb->idname);
}
// idcontrol must be R or W locked
K_ITEM *find_idcontrol(char *idname)
{
IDCONTROL idcontrol;
K_TREE_CTX ctx[1];
K_ITEM look, *item;
STRNCPY(idcontrol.idname, idname);
INIT_IDCONTROL(&look);
look.data = (void *)(&idcontrol);
item = find_in_ktree(idcontrol_root, &look, ctx);
return item;
}
// order by optionname asc,activationdate asc,activationheight asc,expirydate desc
cmp_t cmp_optioncontrol(K_ITEM *a, K_ITEM *b)
{
@ -4881,7 +4973,8 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
FLDSEP, cd_buf);
DUP_POINTER(payouts_free, payouts->stats, &buf[0]);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
begun = CKPQBegin(conn);
if (!begun)
goto shazbot;
@ -4984,6 +5077,9 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
(double)(pa->payratio) /
(double)paytotal;
used += d64;
payments->in_originaltxn =
payments->in_committxn =
payments->in_commitblockhash = EMPTY;
k_add_tail_nolock(pay_store, pay_item);
ok = payments_add(conn, true, pay_item,
&(payments->old_item),
@ -5013,6 +5109,9 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
payments->amount = amount;
payments->diffacc = miningpayouts->diffacc;
used = amount;
payments->in_originaltxn =
payments->in_committxn =
payments->in_commitblockhash = EMPTY;
k_add_tail_nolock(pay_store, pay_item);
ok = payments_add(conn, true, pay_item,
&(payments->old_item),
@ -6345,7 +6444,7 @@ K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root)
{
PGconn *conn;
PGconn *conn = NULL;
K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL, *s_item = NULL;
@ -6375,7 +6474,7 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
return false;
}
conn = dbconnect();
CKPQConn(&conn);
/* Store all shares in the DB before processing the workmarker
* This way we know that the high shares in the DB will match the start
@ -6434,7 +6533,7 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tvdiff(&proc_lock_fin, &proc_lock_got));
flailed:
PQfinish(conn);
CKPQDisco(&conn, true);
if (count > 0) {
LOGWARNING("%s() Stored: %d high shares %.3fs",

2170
src/ckdb_dbio.c

File diff suppressed because it is too large Load Diff

199
src/klist.c

@ -60,6 +60,63 @@ K_LISTS *all_klists;
#define CHKITEM(__item, __list) _CHKITEM(__item, __list, "item")
void _dsp_kstore(K_STORE *store, char *filename, char *msg, KLIST_FFL_ARGS)
{
K_ITEM *item;
FILE *stream;
struct tm tm;
time_t now_t;
char stamp[128];
if (!(store->master->dsp_func)) {
quithere(1, "List %s has no dsp_func" KLIST_FFL,
store->master->name, KLIST_FFL_PASS);
}
now_t = time(NULL);
localtime_r(&now_t, &tm);
snprintf(stamp, sizeof(stamp),
"[%d-%02d-%02d %02d:%02d:%02d]",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
stream = fopen(filename, "ae");
if (!stream)
{
fprintf(stderr, "%s %s() failed to open '%s' (%d) %s",
stamp, __func__, filename, errno, strerror(errno));
return;
}
if (msg)
fprintf(stream, "%s %s\n", stamp, msg);
else
fprintf(stream, "%s Dump of store '%s':\n", stamp, store->master->name);
if (store->count > 0)
{
K_RLOCK(store->master);
item = store->head;
while (item)
{
store->master->dsp_func(item, stream);
item = item->next;
}
K_RUNLOCK(store->master);
fprintf(stream, "End\n\n");
}
else
fprintf(stream, "Empty kstore\n\n");
fclose(stream);
}
static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS)
{
K_ITEM *item;
@ -134,7 +191,7 @@ static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS)
list->count_up = allocate;
}
K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS)
K_STORE *_k_new_store(K_LIST *list, bool gotlock, KLIST_FFL_ARGS)
{
K_STORE *store;
@ -149,14 +206,31 @@ K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS)
store->lock = NULL;
store->name = list->name;
store->do_tail = list->do_tail;
store->prev_store = NULL;
// Only tracked for lists with a lock
if (store->master->lock == NULL) {
store->next_store = NULL;
store->master->stores++;
} else {
if (!gotlock)
K_WLOCK(list);
// In the master list, next is the head
if (list->next_store)
list->next_store->prev_store = store;
store->next_store = list->next_store;
list->next_store = store;
list->stores++;
if (!gotlock)
K_WUNLOCK(list);
}
return store;
}
K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit,
bool do_tail, bool lock_only, bool without_lock,
bool local_list, const char *name2, KLIST_FFL_ARGS)
bool local_list, const char *name2, int cull_limit,
KLIST_FFL_ARGS)
{
K_LIST *list;
@ -166,6 +240,11 @@ K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit,
if (limit < 0)
quithere(1, "Invalid new list %s with limit %d must be >= 0", name, limit);
/* after culling, the first block of items are again allocated,
* so there's no point culling a single block of items */
if (cull_limit > 0 && cull_limit <= allocate)
quithere(1, "Invalid new list %s with cull_limit %d must be > allocate (%d)", name, cull_limit, allocate);
list = calloc(1, sizeof(*list));
if (!list)
quithere(1, "Failed to calloc list %s", name);
@ -191,6 +270,8 @@ K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit,
list->allocate = allocate;
list->limit = limit;
list->do_tail = do_tail;
list->cull_limit = cull_limit;
list->next_store = list->prev_store = NULL;
if (!(list->is_lock_only))
k_alloc_items(list, KLIST_FFL_PASS);
@ -303,6 +384,58 @@ K_ITEM *_k_unlink_tail(K_LIST *list, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS)
return item;
}
#define CHKCULL(_list) \
do { \
if (!((_list)->is_store) && !((_list)->is_lock_only) && \
(_list)->cull_limit > 0 && \
(_list)->count == (_list)->total && \
(_list)->total >= (_list)->cull_limit) { \
k_cull_list(_list, file, func, line); \
} \
} while(0);
static void k_cull_list(K_LIST *list, KLIST_FFL_ARGS)
{
int i;
CHKLIST(list);
_LIST_WRITE(list, true, file, func, line);
if (list->is_store) {
quithere(1, "List %s can't %s() a store" KLIST_FFL,
list->name, __func__, KLIST_FFL_PASS);
}
if (list->is_lock_only) {
quithere(1, "List %s can't %s() a lock_only" KLIST_FFL,
list->name, __func__, KLIST_FFL_PASS);
}
if (list->count != list->total) {
quithere(1, "List %s can't %s() a list in use" KLIST_FFL,
list->name, __func__, KLIST_FFL_PASS);
}
for (i = 0; i < list->item_mem_count; i++)
free(list->item_memory[i]);
free(list->item_memory);
list->item_memory = NULL;
list->item_mem_count = 0;
for (i = 0; i < list->data_mem_count; i++)
free(list->data_memory[i]);
free(list->data_memory);
list->data_memory = NULL;
list->data_mem_count = 0;
list->total = list->count = list->count_up = 0;
list->head = list->tail = NULL;
list->cull_count++;
k_alloc_items(list, KLIST_FFL_PASS);
}
void _k_add_head(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS)
{
CHKLS(list);
@ -333,6 +466,8 @@ void _k_add_head(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_
list->count++;
list->count_up++;
CHKCULL(list);
}
/* slows it down (of course) - only for debugging
@ -380,6 +515,8 @@ void _k_add_tail(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_
list->count++;
list->count_up++;
CHKCULL(list);
}
// Insert item into the list next after 'after'
@ -418,6 +555,8 @@ void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, LOCK_MAYBE bool
list->count++;
list->count_up++;
// no point checking cull since this wouldn't be an _free list
}
void _k_unlink_item(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS)
@ -484,6 +623,8 @@ void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock,
from->count = 0;
to->count_up += from->count_up;
from->count_up = 0;
CHKCULL(to);
}
void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS)
@ -520,6 +661,8 @@ void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock,
from->count = 0;
to->count_up += from->count_up;
from->count_up = 0;
CHKCULL(to);
}
K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS)
@ -592,47 +735,23 @@ K_STORE *_k_free_store(K_STORE *store, KLIST_FFL_ARGS)
store->name, __func__, KLIST_FFL_PASS);
}
if (store->master->lock == NULL)
store->master->stores--;
else {
K_WLOCK(store->master);
// unlink store from the list
if (store->prev_store)
store->prev_store->next_store = store->next_store;
if (store->next_store)
store->next_store->prev_store = store->prev_store;
// correct the head if we are the head
if (store->master->next_store == store)
store->master->next_store = store->next_store;
store->master->stores--;
K_WUNLOCK(store->master);
}
free(store);
return NULL;
}
// Must be locked and none in use and/or unlinked
void _k_cull_list(K_LIST *list, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS)
{
int i;
CHKLIST(list);
_LIST_WRITE(list, chklock, file, func, line);
if (list->is_store) {
quithere(1, "List %s can't %s() a store" KLIST_FFL,
list->name, __func__, KLIST_FFL_PASS);
}
if (list->count != list->total) {
quithere(1, "List %s can't %s() a list in use" KLIST_FFL,
list->name, __func__, KLIST_FFL_PASS);
}
for (i = 0; i < list->item_mem_count; i++)
free(list->item_memory[i]);
free(list->item_memory);
list->item_memory = NULL;
list->item_mem_count = 0;
for (i = 0; i < list->data_mem_count; i++)
free(list->data_memory[i]);
free(list->data_memory);
list->data_memory = NULL;
list->data_mem_count = 0;
list->total = list->count = list->count_up = 0;
list->head = list->tail = NULL;
list->cull_count++;
k_alloc_items(list, KLIST_FFL_PASS);
}

25
src/klist.h

@ -151,8 +151,11 @@ typedef struct k_list {
int data_mem_count; // how many item data memory buffers have been allocated
void **data_memory; // allocated item data memory buffers
void (*dsp_func)(K_ITEM *, FILE *); // optional data display to a file
int cull_count;
int cull_limit; // <1 means don't cull, otherwise total to cull at
int cull_count; // number of times culled
uint64_t ram; // ram allocated for data pointers - code must manage it
struct k_list *next_store; // list of all stores - the head is next_store in the list master
struct k_list *prev_store; // the stores themselves have their prev and next
int stores; // how many stores it currently has
#if LOCK_CHECK
// Since each thread has it's own k_lock no locking is required on this
@ -663,18 +666,23 @@ static inline K_ITEM *list_rtail(K_LIST *list)
#define STORE_HEAD_NOLOCK(_s) LIST_HEAD_NOLOCK(_s)
#define STORE_TAIL_NOLOCK(_s) LIST_TAIL_NOLOCK(_s)
extern K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS);
#define k_new_store(_list) _k_new_store(_list, KLIST_FFL_HERE)
extern void _dsp_kstore(K_STORE *store, char *filename, char *msg, KLIST_FFL_ARGS);
#define dsp_kstore(_store, _file, _msg) _dsp_kstore(_store, _file, _msg, KLIST_FFL_HERE)
extern K_STORE *_k_new_store(K_LIST *list, bool gotlock, KLIST_FFL_ARGS);
#define k_new_store(_list) _k_new_store(_list, false, KLIST_FFL_HERE)
#define k_new_store_locked(_list) _k_new_store(_list, true, KLIST_FFL_HERE)
extern K_LIST *_k_new_list(const char *name, size_t siz, int allocate,
int limit, bool do_tail, bool lock_only,
bool without_lock, bool local_list,
const char *name2, KLIST_FFL_ARGS);
const char *name2, int cull_limit, KLIST_FFL_ARGS);
#define k_new_list(_name, _siz, _allocate, _limit, _do_tail) \
_k_new_list(_name, _siz, _allocate, _limit, _do_tail, false, false, false, NULL, KLIST_FFL_HERE)
_k_new_list(_name, _siz, _allocate, _limit, _do_tail, false, false, false, NULL, 0, KLIST_FFL_HERE)
#define k_lock_only_list(_name) \
_k_new_list(_name, 1, 1, 1, true, true, false, false, NULL, KLIST_FFL_HERE)
_k_new_list(_name, 1, 1, 1, true, true, false, false, NULL, 0, KLIST_FFL_HERE)
#define k_new_tree_list(_name, _siz, _allocate, _limit, _do_tail, _local_tree, _name2) \
_k_new_list(_name, _siz, _allocate, _limit, _do_tail, false, true, _local_tree, _name2, KLIST_FFL_HERE)
_k_new_list(_name, _siz, _allocate, _limit, _do_tail, false, true, _local_tree, _name2, 0, KLIST_FFL_HERE)
#define k_new_list_cull(_name, _siz, _allocate, _limit, _do_tail, _cull) \
_k_new_list(_name, _siz, _allocate, _limit, _do_tail, false, false, false, NULL, _cull, KLIST_FFL_HERE)
extern K_ITEM *_k_unlink_head(K_LIST *list, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS);
#define k_unlink_head(_list) _k_unlink_head(_list, true, KLIST_FFL_HERE)
#define k_unlink_head_nolock(_list) _k_unlink_head(_list, false, KLIST_FFL_HERE)
@ -709,8 +717,5 @@ extern K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS);
#define k_free_list(_list) _k_free_list(_list, KLIST_FFL_HERE)
extern K_STORE *_k_free_store(K_STORE *store, KLIST_FFL_ARGS);
#define k_free_store(_store) _k_free_store(_store, KLIST_FFL_HERE)
extern void _k_cull_list(K_LIST *list, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS);
#define k_cull_list(_list) _k_cull_list(_list, true, KLIST_FFL_HERE)
//#define k_cull_list_nolock(_list) _k_cull_list(_list, false, KLIST_FFL_HERE)
#endif

6
src/ktree.c

@ -182,8 +182,6 @@ void _dsp_ktree(K_TREE *tree, char *filename, char *msg, KTREE_FFL_ARGS)
if (!(tree->master->dsp_func))
FAIL("NULLDSP NULL dsp_func in %s", tree->master->name);
_TREE_READ(tree, true, file, func, line);
now_t = time(NULL);
localtime_r(&now_t, &tm);
snprintf(stamp, sizeof(stamp),
@ -210,12 +208,16 @@ void _dsp_ktree(K_TREE *tree, char *filename, char *msg, KTREE_FFL_ARGS)
if (tree->root->isNil == No)
{
K_RLOCK(tree->master);
item = first_in_ktree(tree, ctx);
while (item)
{
tree->master->dsp_func(item, stream);
item = next_in_ktree(ctx);
}
K_RUNLOCK(tree->master);
fprintf(stream, "End\n\n");
}
else

Loading…
Cancel
Save