Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 10 years ago
parent
commit
f08eb38061
  1. 61
      pool/page_workers.php
  2. 305
      src/ckdb.c
  3. 11
      src/ckdb.h
  4. 4
      src/ckdb_cmd.c
  5. 5
      src/ckdb_data.c
  6. 71
      src/ckdb_dbio.c
  7. 56
      src/ckpmsg.c
  8. 12
      src/ktree.c
  9. 3
      src/ktree.h
  10. 6
      src/libckpool.c
  11. 8
      src/libckpool.h

61
pool/page_workers.php

@ -15,9 +15,14 @@ function worktitle($data, $user)
return $pg;
}
#
function workhashorder($a, $b)
{
return $b['w_uhr'] - $a['w_uhr'];
}
#
function workuser($data, $user, &$offset, &$totshare, &$totdiff,
&$totinvalid, &$totrate, &$blockacc,
&$blockreward, $old = false)
&$blockreward, $old = false, $srt = false)
{
$ans = getWorkers($user);
@ -28,6 +33,7 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff,
$blockacc = $ans['blockacc'];
if (isset($ans['blockreward']))
$blockreward = $ans['blockreward'];
$all = array();
$count = $ans['rows'];
for ($i = 0; $i < $count; $i++)
{
@ -35,48 +41,69 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff,
if ($old !== false && $lst > $old)
continue;
if ($ans['w_elapsed:'.$i] > 3600)
$uhr = $ans['w_hashrate1hr:'.$i];
else
$uhr = $ans['w_hashrate5m:'.$i];
$all[] = array('workername' => $ans['workername:'.$i],
'w_lastshare' => $ans['w_lastshare:'.$i],
'w_lastdiff' => $ans['w_lastdiff:'.$i],
'w_shareacc' => $ans['w_shareacc:'.$i],
'w_diffacc' => $ans['w_diffacc:'.$i],
'w_diffinv' => $ans['w_diffinv:'.$i],
'w_lastdiff' => $ans['w_lastdiff:'.$i],
'w_uhr' => $uhr);
}
if ($srt)
usort($all, 'workhashorder');
for ($i = 0; $i < $count; $i++)
{
$lst = $ans['STAMP'] - $all[$i]['w_lastshare'];
if ($old !== false && $lst > $old)
continue;
if ((($offset) % 2) == 0)
$row = 'even';
else
$row = 'odd';
$pg .= "<tr class=$row>";
$pg .= '<td class=dl>'.htmlspecialchars($ans['workername:'.$i]).'</td>';
if ($ans['w_lastdiff:'.$i] > 0)
$ld = difffmt($ans['w_lastdiff:'.$i]);
$pg .= '<td class=dl>'.htmlspecialchars($all[$i]['workername']).'</td>';
if ($all[$i]['w_lastdiff'] > 0)
$ld = difffmt($all[$i]['w_lastdiff']);
else
$ld = '&nbsp;';
$pg .= "<td class=dr>$ld</td>";
$pg .= '<td class=dr>'.howlongago($lst).'</td>';
$shareacc = number_format($ans['w_shareacc:'.$i], 0);
$totshare += $ans['w_shareacc:'.$i];
$diffacc = number_format($ans['w_diffacc:'.$i], 0);
$totdiff += $ans['w_diffacc:'.$i];
$shareacc = number_format($all[$i]['w_shareacc'], 0);
$totshare += $all[$i]['w_shareacc'];
$diffacc = number_format($all[$i]['w_diffacc'], 0);
$totdiff += $all[$i]['w_diffacc'];
$pg .= "<td class=dr>$shareacc</td>";
$pg .= "<td class=dr>$diffacc</td>";
$dtot = $ans['w_diffacc:'.$i] + $ans['w_diffinv:'.$i];
$dtot = $all[$i]['w_diffacc'] + $all[$i]['w_diffinv'];
if ($dtot > 0)
$rej = number_format(100.0 * $ans['w_diffinv:'.$i] / $dtot, 3);
$rej = number_format(100.0 * $all[$i]['w_diffinv'] / $dtot, 3);
else
$rej = '0';
$totinvalid += $ans['w_diffinv:'.$i];
$totinvalid += $all[$i]['w_diffinv'];
$pg .= "<td class=dr>$rej%</td>";
if ($blockacc <= 0)
$blkpct = '&nbsp;';
else
$blkpct = number_format(100.0 * $ans['w_diffacc:'.$i] / $blockacc, 3) . '%';
$blkpct = number_format(100.0 * $all[$i]['w_diffacc'] / $blockacc, 3) . '%';
$pg .= "<td class=dr>$blkpct</td>";
if ($ans['w_elapsed:'.$i] > 3600)
$uhr = $ans['w_hashrate1hr:'.$i];
else
$uhr = $ans['w_hashrate5m:'.$i];
$uhr = $all[$i]['w_uhr'];
if ($uhr == '?')
$uhr = '?GHs';
else
@ -138,7 +165,7 @@ function doworker($data, $user)
$pg .= worktitle($data, $user);
$pg .= workuser($data, $user, $offset, $totshare, $totdiff, $totinvalid,
$totrate, $blockacc, $blockreward, false);
$totrate, $blockacc, $blockreward, false, true);
$pg .= worktotal($offset, $totshare, $totdiff, $totinvalid, $totrate,
$blockacc, $blockreward);

305
src/ckdb.c

@ -150,6 +150,11 @@
* and a warning is displayed if there were any matching shares
*/
static bool socketer_using_data;
static bool summariser_using_data;
static bool logger_using_data;
static bool listener_using_data;
char *EMPTY = "";
static char *db_name;
@ -257,6 +262,11 @@ static tv_t confirm_finish;
static tv_t reload_timestamp;
/* Allow overriding the workinfoid range used in the db load of
* workinfo and sharesummary */
int64_t dbload_workinfoid_start = -1;
int64_t dbload_workinfoid_finish = MAXID;
// DB users,workers,auth load is complete
bool db_auths_complete = false;
// DB load is complete
@ -484,12 +494,12 @@ void logmsg(int loglevel, const char *fmt, ...)
now_t = time(NULL);
localtime_r(&now_t, &tm);
minoff = timezone / 60;
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '+';
tzch = '-';
minoff *= -1;
} else
tzch = '-';
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
@ -678,13 +688,24 @@ matane:
return ok;
}
/* Load blocks first to allow data range settings to know
* the blocks info for setting limits for tables in getdata3()
*/
static bool getdata2()
{
PGconn *conn = dbconnect();
bool ok = blocks_fill(conn);
PQfinish(conn);
return ok;
}
static bool getdata3()
{
PGconn *conn = dbconnect();
bool ok = true;
if (!(ok = blocks_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary) {
if (!(ok = paymentaddresses_fill(conn)) || everyone_die)
goto sukamudai;
@ -979,6 +1000,149 @@ static void alloc_storage()
workerstatus_root = new_ktree();
}
static void free_workinfo_data(K_ITEM *item)
{
WORKINFO *workinfo;
DATA_WORKINFO(workinfo, item);
if (workinfo->transactiontree)
free(workinfo->transactiontree);
if (workinfo->merklehash)
free(workinfo->merklehash);
}
static void free_sharesummary_data(K_ITEM *item)
{
SHARESUMMARY *sharesummary;
DATA_SHARESUMMARY(sharesummary, item);
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
}
static void free_optioncontrol_data(K_ITEM *item)
{
OPTIONCONTROL *optioncontrol;
DATA_OPTIONCONTROL(optioncontrol, item);
if (optioncontrol->optionvalue)
free(optioncontrol->optionvalue);
}
#define FREE_TREE(_tree) \
if (_tree ## _root) \
_tree ## _root = free_ktree(_tree ## _root, NULL) \
#define FREE_STORE(_list) \
if (_list ## _store) \
_list ## _store = k_free_store(_list ## _store) \
#define FREE_LIST(_list) \
if (_list ## _free) \
_list ## _free = k_free_list(_list ## _free) \
#define FREE_ALL(_list) FREE_TREE(_list); FREE_STORE(_list); FREE_LIST(_list)
#define FREE_LISTS(_list) FREE_STORE(_list); FREE_LIST(_list)
static void dealloc_storage()
{
K_ITEM *item;
FREE_LISTS(logqueue);
FREE_ALL(workerstatus);
FREE_TREE(userstats_workerstatus);
FREE_TREE(userstats_statsdate);
if (userstats_summ)
userstats_summ = k_free_store(userstats_summ);
FREE_STORE(userstats_eos);
FREE_ALL(userstats);
FREE_ALL(poolstats);
FREE_ALL(auths);
FREE_ALL(miningpayouts);
FREE_ALL(blocks);
FREE_TREE(sharesummary_workinfoid);
FREE_TREE(sharesummary);
if (sharesummary_store) {
item = sharesummary_store->head;
while (item) {
free_sharesummary_data(item);
item = item->next;
}
FREE_STORE(sharesummary);
}
if (sharesummary_free) {
item = sharesummary_free->head;
while (item) {
free_sharesummary_data(item);
item = item->next;
}
FREE_LIST(sharesummary);
}
FREE_ALL(shareerrors);
FREE_ALL(shares);
FREE_TREE(workinfo_height);
FREE_TREE(workinfo);
if (workinfo_store) {
item = workinfo_store->head;
while (item) {
free_workinfo_data(item);
item = item->next;
}
FREE_STORE(workinfo);
}
if (workinfo_free) {
item = workinfo_free->head;
while (item) {
free_workinfo_data(item);
item = item->next;
}
FREE_LIST(workinfo);
}
FREE_LISTS(idcontrol);
FREE_ALL(payments);
FREE_ALL(paymentaddresses);
FREE_ALL(workers);
FREE_TREE(optioncontrol);
if (optioncontrol_store) {
item = optioncontrol_store->head;
while (item) {
free_optioncontrol_data(item);
item = item->next;
}
FREE_STORE(optioncontrol);
}
if (optioncontrol_free) {
item = optioncontrol_free->head;
while (item) {
free_optioncontrol_data(item);
item = item->next;
}
FREE_LIST(optioncontrol);
}
FREE_ALL(useratts);
FREE_TREE(userid);
FREE_ALL(users);
FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue);
FREE_LISTS(workqueue);
}
static bool setup_data()
{
K_TREE_CTX ctx[1];
@ -1001,6 +1165,14 @@ static bool setup_data()
if (!getdata2() || everyone_die)
return false;
if (dbload_workinfoid_start != -1) {
LOGWARNING("WARNING: dbload starting at workinfoid %"PRId64,
dbload_workinfoid_start);
}
if (!getdata3() || everyone_die)
return false;
db_load_complete = true;
if (!reload() || everyone_die)
@ -1434,7 +1606,7 @@ static void summarise_poolstats()
// TODO: consider limiting how much/how long this processes each time
static void summarise_userstats()
{
K_TREE_CTX ctx[1], ctx2[1];
K_TREE_CTX ctx[1];
K_ITEM *first, *last, *new, *next, *tmp;
USERSTATS *userstats, *us_first, *us_last, *us_next;
double statrange, factor;
@ -1503,9 +1675,9 @@ static void summarise_userstats()
DATA_USERSTATS(userstats, new);
memcpy(userstats, us_first, sizeof(USERSTATS));
userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats, ctx2);
userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first,
cmp_userstats_statsdate, ctx2);
cmp_userstats_statsdate);
k_unlink_item(userstats_store, first);
k_add_head(userstats_summ, first);
@ -1530,9 +1702,9 @@ static void summarise_userstats()
userstats->elapsed = us_next->elapsed;
userstats->summarycount += us_next->summarycount;
userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats, ctx2);
userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats);
userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next,
cmp_userstats_statsdate, ctx2);
cmp_userstats_statsdate);
k_unlink_item(userstats_store, next);
k_add_head(userstats_summ, next);
}
@ -1621,6 +1793,8 @@ static void summarise_userstats()
static void *summariser(__maybe_unused void *arg)
{
int i;
pthread_detach(pthread_self());
rename_proc("db_summariser");
@ -1628,23 +1802,43 @@ static void *summariser(__maybe_unused void *arg)
while (!everyone_die && !db_load_complete)
cksleep_ms(42);
summariser_using_data = true;
while (!everyone_die) {
sleep(5);
if (!everyone_die) {
for (i = 0; i < 5; i++) {
if (!everyone_die)
sleep(1);
}
if (everyone_die)
break;
else {
if (startup_complete)
check_blocks();
if (!everyone_die)
summarise_blocks();
}
sleep(4);
for (i = 0; i < 4; i++) {
if (!everyone_die)
sleep(1);
}
if (everyone_die)
break;
else
summarise_poolstats();
sleep(4);
for (i = 0; i < 4; i++) {
if (!everyone_die)
sleep(1);
}
if (everyone_die)
break;
else
summarise_userstats();
}
summariser_using_data = false;
return NULL;
}
@ -1660,6 +1854,8 @@ static void *logger(__maybe_unused void *arg)
snprintf(buf, sizeof(buf), "db%s_logger", dbcode);
rename_proc(buf);
logger_using_data = true;
setnow(&now);
snprintf(buf, sizeof(buf), "logstart.%ld,%ld",
now.tv_sec, now.tv_usec);
@ -1691,12 +1887,19 @@ static void *logger(__maybe_unused void *arg)
logqueue_store->count,
now.tv_sec, now.tv_usec);
LOGFILE(buf);
while((lq_item = k_unlink_head(logqueue_store))) {
if (logqueue_store->count)
LOGERR("%s", buf);
lq_item = logqueue_store->head;
while (lq_item) {
DATA_LOGQUEUE(lq, lq_item);
LOGFILE(lq->msg);
free(lq->msg);
lq_item = lq_item->next;
}
K_WUNLOCK(logqueue_free);
logger_using_data = false;
setnow(&now);
snprintf(buf, sizeof(buf), "logstop.%ld,%ld",
now.tv_sec, now.tv_usec);
@ -1752,6 +1955,8 @@ static void *socketer(__maybe_unused void *arg)
while (!everyone_die && !db_auths_complete)
cksem_mswait(&socketer_sem, 420);
socketer_using_data = true;
want_first = true;
while (!everyone_die) {
if (buf)
@ -2101,6 +2306,8 @@ static void *socketer(__maybe_unused void *arg)
}
}
socketer_using_data = false;
if (buf)
dealloc(buf);
// TODO: if anyone cares, free all the dup buffers :P
@ -2447,6 +2654,8 @@ static void *listener(void *arg)
rename_proc("db_listener");
listener_using_data = true;
if (!setup_data()) {
if (!everyone_die) {
LOGEMERG("ABORTING");
@ -2502,6 +2711,8 @@ static void *listener(void *arg)
}
}
listener_using_data = false;
if (conn)
PQfinish(conn);
@ -2834,8 +3045,17 @@ static void confirm_reload()
__func__, workinfo->workinfoid);
}
} else {
if (confirm_first_workinfoid == 0) {
start.tv_sec = start.tv_usec = 0;
LOGWARNING("%s() no start workinfo found ... using time 0", __func__);
LOGWARNING("%s() no start workinfo found ... "
"using time 0", __func__);
} else {
// Abort, otherwise reload will reload all log files
LOGERR("%s(): Start workinfoid doesn't exist, "
"use 0 to mean from the beginning of time",
__func__);
return;
}
}
/* Find the workinfo after confirm_last_workinfoid-1
@ -2995,6 +3215,8 @@ static void confirm_summaries()
return;
}
free(range);
dbload_workinfoid_start = confirm_range_start - 1;
dbload_workinfoid_finish = confirm_range_finish + 1;
break;
case 'w':
confirm_range_start = atoll(confirm_range+1);
@ -3026,6 +3248,11 @@ static void confirm_summaries()
return;
}
if (!getdata3()) {
LOGEMERG("%s() ABORTING from getdata3()", __func__);
return;
}
confirm_reload();
}
@ -3071,6 +3298,7 @@ static struct option long_options[] = {
{ "dbuser", required_argument, 0, 'u' },
{ "btc-user", required_argument, 0, 'U' },
{ "version", no_argument, 0, 'v' },
{ "workinfoid", required_argument, 0, 'w' },
{ "confirm", no_argument, 0, 'y' },
{ "confirmrange", required_argument, 0, 'Y' },
{ 0, 0, 0, 0 }
@ -3103,7 +3331,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vyY:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
switch(c) {
case 'c':
ckp.config = strdup(optarg);
@ -3189,6 +3417,18 @@ int main(int argc, char **argv)
break;
case 'v':
exit(0);
case 'w':
// Don't use this :)
{
int64_t start = atoll(optarg);
if (start < 0) {
quit(1, "Invalid workinfoid start"
" %"PRId64" - must be >= 0",
start);
}
dbload_workinfoid_start = start;
}
break;
case 'y':
confirm_sharesummary = true;
break;
@ -3269,6 +3509,7 @@ int main(int argc, char **argv)
if (confirm_sharesummary) {
// TODO: add a system lock to stop running 2 at once?
confirm_summaries();
everyone_die = true;
} else {
ckp.main.sockname = strdup("listener");
write_namepid(&ckp.main);
@ -3286,6 +3527,36 @@ int main(int argc, char **argv)
join_pthread(ckp.pth_listener);
}
time_t start, trigger, curr;
char *msg = NULL;
trigger = start = time(NULL);
while (socketer_using_data || summariser_using_data ||
logger_using_data || listener_using_data) {
msg = NULL;
curr = time(NULL);
if (curr - start > 4) {
if (curr - trigger > 4) {
msg = "Shutdown initial delay";
} else if (curr - trigger > 2) {
msg = "Shutdown delay";
}
}
if (msg) {
trigger = curr;
printf("%s %ds due to%s%s%s%s\n",
msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY,
summariser_using_data ? " summariser" : EMPTY,
logger_using_data ? " logger" : EMPTY,
listener_using_data ? " listener" : EMPTY);
fflush(stdout);
}
sleep(1);
}
dealloc_storage();
clean_up(&ckp);
return 0;

11
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.9.2"
#define CKDB_VERSION DB_VERSION"-0.572"
#define CKDB_VERSION DB_VERSION"-0.602"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -225,6 +225,11 @@ extern int64_t confirm_last_workinfoid;
* ckpool uses 10min - but add 1min to be sure */
#define WORKINFO_AGE 660
/* Allow defining the workinfoid range used in the db load of
* workinfo and sharesummary */
extern int64_t dbload_workinfoid_start;
extern int64_t dbload_workinfoid_finish;
// DB users,workers,auth load is complete
extern bool db_auths_complete;
// DB load is complete
@ -998,8 +1003,8 @@ typedef struct blocks {
// 42 doesn't actually mean '42' it means matured
#define BLOCKS_42 'F'
#define BLOCKS_42_STR "F"
// Current block maturity is ... 100
#define BLOCKS_42_VALUE 100
// Current block maturity is ...
#define BLOCKS_42_VALUE 101
#define BLOCKS_ORPHAN 'O'
#define BLOCKS_ORPHAN_STR "O"
/* Block height difference required before checking if it's orphaned

4
src/ckdb_cmd.c

@ -2849,8 +2849,10 @@ rollback:
PQfinish(conn);
if (reason) {
if (oc_item) {
if (optioncontrol->optionvalue)
if (optioncontrol->optionvalue) {
free(optioncontrol->optionvalue);
optioncontrol->optionvalue = NULL;
}
K_WLOCK(optioncontrol_free);
k_add_head(optioncontrol_free, oc_item);
K_WUNLOCK(optioncontrol_free);

5
src/ckdb_data.c

@ -1040,6 +1040,7 @@ cmp_t cmp_paymentaddresses(K_ITEM *a, K_ITEM *b)
return c;
}
// Only one for now ...
K_ITEM *find_paymentaddresses(int64_t userid)
{
PAYMENTADDRESSES paymentaddresses, *pa;
@ -1238,7 +1239,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1];
K_TREE_CTX ss_ctx[1], s_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
SHARESUMMARY looksharesummary, *sharesummary;
@ -1352,7 +1353,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
shares_tot++;
tmp_item = next_in_ktree(s_ctx);
shares_root = remove_from_ktree(shares_root, s_item, cmp_shares, tmp_ctx);
shares_root = remove_from_ktree(shares_root, s_item, cmp_shares);
k_unlink_item(shares_store, s_item);
if (reloading && skipupdate)
shares_dumped++;

71
src/ckdb_dbio.c

@ -310,7 +310,6 @@ bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash,
{
ExecStatusType rescode;
bool conned = false;
K_TREE_CTX ctx[1];
PGresult *res;
K_ITEM *item;
USERS *row, *users;
@ -428,8 +427,8 @@ unparam:
if (!ok)
k_add_head(users_free, item);
else {
users_root = remove_from_ktree(users_root, u_item, cmp_users, ctx);
userid_root = remove_from_ktree(userid_root, u_item, cmp_userid, ctx);
users_root = remove_from_ktree(users_root, u_item, cmp_users);
userid_root = remove_from_ktree(userid_root, u_item, cmp_userid);
copy_tv(&(users->expirydate), cd);
users_root = add_to_ktree(users_root, u_item, cmp_users);
userid_root = add_to_ktree(userid_root, u_item, cmp_userid);
@ -684,7 +683,6 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun)
{
ExecStatusType rescode;
bool conned = false;
K_TREE_CTX ctx[1];
PGresult *res;
K_ITEM *old_item;
USERATTS *old_useratts, *useratts;
@ -789,7 +787,7 @@ unparam:
if (ok) {
// Update it
if (old_item) {
useratts_root = remove_from_ktree(useratts_root, old_item, cmp_useratts, ctx);
useratts_root = remove_from_ktree(useratts_root, old_item, cmp_useratts);
copy_tv(&(old_useratts->expirydate), cd);
useratts_root = add_to_ktree(useratts_root, old_item, cmp_useratts);
}
@ -879,7 +877,6 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd)
{
ExecStatusType rescode;
bool conned = false;
K_TREE_CTX ctx[1];
PGresult *res;
K_ITEM *item;
USERATTS *useratts;
@ -934,7 +931,7 @@ unparam:
K_WLOCK(useratts_free);
if (ok && item) {
useratts_root = remove_from_ktree(useratts_root, item, cmp_useratts, ctx);
useratts_root = remove_from_ktree(useratts_root, item, cmp_useratts);
copy_tv(&(useratts->expirydate), cd);
useratts_root = add_to_ktree(useratts_root, item, cmp_useratts);
}
@ -1424,7 +1421,9 @@ bool workers_fill(PGconn *conn)
return ok;
}
// Whatever the current paymentaddresses are, replace them with this one
/* Whatever the current paymentaddresses are, replace them with this one
* Code allows for zero, one or more current payment address
* even though there currently can only be zero or one */
K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddress,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
@ -1432,7 +1431,7 @@ K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddress,
ExecStatusType rescode;
bool conned = false;
PGresult *res;
K_TREE_CTX ctx[1], ctx2[1];
K_TREE_CTX ctx[1];
K_ITEM *item, *old, *this, look;
PAYMENTADDRESSES *row, pa, *thispa;
char *upd, *ins;
@ -1529,12 +1528,13 @@ unitem:
if (!ok)
k_add_head(paymentaddresses_free, item);
else {
// Remove from ram, old (unneeded) records
// Change the expiry on all the old ones
pa.userid = userid;
pa.expirydate.tv_sec = DATE_S_EOT;
pa.payaddress[0] = '\0';
INIT_PAYMENTADDRESSES(&look);
look.data = (void *)(&pa);
// Tree order is expirydate desc
old = find_after_in_ktree(paymentaddresses_root, &look,
cmp_paymentaddresses, ctx);
while (old) {
@ -1543,9 +1543,15 @@ unitem:
if (thispa->userid != userid)
break;
old = next_in_ktree(ctx);
/* Tree remove+add below doesn't matter since
* this test will avoid reprocessing */
if (CURRENT(&(thispa->expirydate))) {
paymentaddresses_root = remove_from_ktree(paymentaddresses_root, this,
cmp_paymentaddresses, ctx2);
k_add_head(paymentaddresses_free, this);
cmp_paymentaddresses);
copy_tv(&(thispa->expirydate), cd);
paymentaddresses_root = add_to_ktree(paymentaddresses_root, this,
cmp_paymentaddresses);
}
}
paymentaddresses_root = add_to_ktree(paymentaddresses_root, item,
cmp_paymentaddresses);
@ -1565,8 +1571,7 @@ bool paymentaddresses_fill(PGconn *conn)
PGresult *res;
K_ITEM *item;
PAYMENTADDRESSES *row;
char *params[1];
int n, i, par = 0;
int n, i;
char *field;
char *sel;
int fields = 4;
@ -1577,11 +1582,8 @@ bool paymentaddresses_fill(PGconn *conn)
sel = "select "
"paymentaddressid,userid,payaddress,payratio"
HISTORYDATECONTROL
" from paymentaddresses where expirydate=$1";
par = 0;
params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
" from paymentaddresses";
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
@ -1933,7 +1935,7 @@ nostart:
// Discard it
if (old_item) {
optioncontrol_root = remove_from_ktree(optioncontrol_root, old_item,
cmp_optioncontrol, ctx);
cmp_optioncontrol);
k_add_head(optioncontrol_free, old_item);
}
optioncontrol_root = add_to_ktree(optioncontrol_root, oc_item, cmp_optioncontrol);
@ -2141,7 +2143,9 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance,
K_WLOCK(workinfo_free);
if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) {
free(row->transactiontree);
row->transactiontree = NULL;
free(row->merklehash);
row->merklehash = NULL;
workinfoid = row->workinfoid;
k_add_head(workinfo_free, item);
K_WUNLOCK(workinfo_free);
@ -2204,7 +2208,9 @@ unparam:
K_WLOCK(workinfo_free);
if (workinfoid == -1) {
free(row->transactiontree);
row->transactiontree = NULL;
free(row->merklehash);
row->merklehash = NULL;
k_add_head(workinfo_free, item);
} else {
if (row->transactiontree && *(row->transactiontree)) {
@ -2240,7 +2246,7 @@ bool workinfo_fill(PGconn *conn)
PGresult *res;
K_ITEM *item;
WORKINFO *row;
char *params[1];
char *params[3];
int n, i, par = 0;
char *field;
char *sel;
@ -2257,9 +2263,13 @@ bool workinfo_fill(PGconn *conn)
"workinfoid,poolinstance,merklehash,prevhash,"
"coinbase1,coinbase2,version,bits,ntime,reward"
HISTORYDATECONTROL
" from workinfo where expirydate=$1";
" from workinfo where expirydate=$1 and"
" ((workinfoid>=$2 and workinfoid<=$3) or"
" workinfoid in (select workinfoid from blocks) )";
par = 0;
params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
@ -2915,8 +2925,9 @@ bool sharesummary_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
int n, i, par = 0;
SHARESUMMARY *row;
char *params[2];
char *field;
char *sel;
int fields = 19;
@ -2931,8 +2942,12 @@ bool sharesummary_fill(PGconn *conn)
"sharecount,errorcount,firstshare,lastshare,"
"lastdiffacc,complete"
MODIFYDATECONTROL
" from sharesummary";
res = PQexec(conn, sel, CKPQ_READ);
" from sharesummary where workinfoid>=$1 and workinfoid<=$2";
par = 0;
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
@ -3118,7 +3133,6 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
ExecStatusType rescode;
bool conned = false;
PGresult *res = NULL;
K_TREE_CTX ctx[1];
K_ITEM *b_item, *old_b_item;
BLOCKS *row, *oldblocks;
char hash_dsp[16+1];
@ -3243,7 +3257,7 @@ unparam:
k_add_head(blocks_free, b_item);
else {
if (update_old) {
blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx);
blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks);
copy_tv(&(oldblocks->expirydate), cd);
blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks);
}
@ -3265,7 +3279,6 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash,
ExecStatusType rescode;
bool conned = false;
PGresult *res = NULL;
K_TREE_CTX ctx[1];
K_ITEM *b_item, *u_item, *old_b_item;
char cd_buf[DATE_BUFSIZ];
char hash_dsp[16+1];
@ -3546,7 +3559,7 @@ flail:
k_add_head(blocks_free, b_item);
else {
if (update_old) {
blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx);
blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks);
copy_tv(&(oldblocks->expirydate), cd);
blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks);
}

56
src/ckpmsg.c

@ -15,13 +15,53 @@
#include "libckpool.h"
void mkstamp(char *stamp, size_t siz)
{
long minoff, hroff;
char tzinfo[16];
time_t now_t;
struct tm tm;
char tzch;
now_t = time(NULL);
localtime_r(&now_t, &tm);
minoff = tm.tm_gmtoff / 60;
if (minoff < 0) {
tzch = '-';
minoff *= -1;
} else
tzch = '+';
hroff = minoff / 60;
if (minoff % 60) {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld:%02ld",
tzch, hroff, minoff % 60);
} else {
snprintf(tzinfo, sizeof(tzinfo),
"%c%02ld",
tzch, hroff);
}
snprintf(stamp, siz,
"[%d-%02d-%02d %02d:%02d:%02d%s]",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec,
tzinfo);
}
int main(int argc, char **argv)
{
char *name = NULL, *socket_dir = NULL, *buf = NULL;
int tmo1 = RECV_UNIX_TIMEOUT1;
int tmo2 = RECV_UNIX_TIMEOUT2;
bool proxy = false;
char stamp[128];
int c;
while ((c = getopt(argc, argv, "n:s:p")) != -1) {
while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) {
switch(c) {
case 'n':
name = strdup(optarg);
@ -32,6 +72,12 @@ int main(int argc, char **argv)
case 'p':
proxy = true;
break;
case 't':
tmo1 = atoi(optarg);
break;
case 'T':
tmo2 = atoi(optarg);
break;
}
}
if (!socket_dir)
@ -64,7 +110,8 @@ int main(int argc, char **argv)
continue;
}
buf[len - 1] = '\0'; // Strip /n
LOGDEBUG("Got message: %s", buf);
mkstamp(stamp, sizeof(stamp));
LOGDEBUG("%s Got message: %s", stamp, buf);
sockd = open_unix_client(socket_dir);
if (sockd < 0) {
@ -76,13 +123,14 @@ int main(int argc, char **argv)
break;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
buf = recv_unix_msg_tmo2(sockd, tmo1, tmo2);
close(sockd);
if (!buf) {
LOGERR("Received empty message");
continue;
}
LOGNOTICE("Received response: %s", buf);
mkstamp(stamp, sizeof(stamp));
LOGNOTICE("%s Received response: %s", stamp, buf);
}
dealloc(buf);

12
src/ktree.c

@ -947,6 +947,18 @@ DBG("@remove after balance=%f :(\n", cmp);
return root;
}
K_TREE *_remove_from_ktree_free(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), KTREE_FFL_ARGS)
{
K_TREE_CTX ctx[1];
root = _remove_from_ktree(root, data, cmp_funct, ctx, KTREE_FFL_PASS);
if (*ctx)
free(*ctx);
return root;
}
static void free_ktree_sub(K_TREE *ktree, void (*free_funct)(void *))
{
if (ktree != NULL && ktree != nil)

3
src/ktree.h

@ -70,7 +70,8 @@ extern K_ITEM *_find_after_in_ktree(K_TREE *ktree, K_ITEM *data, cmp_t (*cmp_fun
extern K_ITEM *_find_before_in_ktree(K_TREE *ktree, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), K_TREE_CTX *ctx, KTREE_FFL_ARGS);
#define find_before_in_ktree(_ktree, _data, _cmp_funct, _ctx) _find_before_in_ktree(_ktree, _data, _cmp_funct, _ctx, KLIST_FFL_HERE)
extern K_TREE *_remove_from_ktree(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), K_TREE_CTX *ctx, KTREE_FFL_ARGS);
#define remove_from_ktree(_root, _data, _cmp_funct, _ctx) _remove_from_ktree(_root, _data, _cmp_funct, _ctx, KLIST_FFL_HERE)
extern K_TREE *_remove_from_ktree_free(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), KTREE_FFL_ARGS);
#define remove_from_ktree(_root, _data, _cmp_funct) _remove_from_ktree_free(_root, _data, _cmp_funct, KLIST_FFL_HERE)
extern K_TREE *_free_ktree(K_TREE *root, void (*free_funct)(void *), KTREE_FFL_ARGS);
#define free_ktree(_root, _free_funct) _free_ktree(_root, _free_funct, KLIST_FFL_HERE)

6
src/libckpool.c

@ -739,13 +739,13 @@ int read_length(int sockd, void *buf, int len)
/* Use a standard message across the unix sockets:
* 4 byte length of message as little endian encoded uint32_t followed by the
* string. Return NULL in case of failure. */
char *_recv_unix_msg(int sockd, const char *file, const char *func, const int line)
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line)
{
char *buf = NULL;
uint32_t msglen;
int ret;
ret = wait_read_select(sockd, 30);
ret = wait_read_select(sockd, timeout1);
if (unlikely(ret < 1)) {
LOGERR("Select1 failed in recv_unix_msg");
goto out;
@ -761,7 +761,7 @@ char *_recv_unix_msg(int sockd, const char *file, const char *func, const int li
LOGWARNING("Invalid message length %u sent to recv_unix_msg", msglen);
goto out;
}
ret = wait_read_select(sockd, 5);
ret = wait_read_select(sockd, timeout2);
if (unlikely(ret < 1)) {
LOGERR("Select2 failed in recv_unix_msg");
goto out;

8
src/libckpool.h

@ -441,8 +441,12 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_read_select(int sockd, int timeout);
int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, const char *file, const char *func, const int line);
#define recv_unix_msg(sockd) _recv_unix_msg(sockd, __FILE__, __func__, __LINE__)
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);
#define RECV_UNIX_TIMEOUT1 30
#define RECV_UNIX_TIMEOUT2 5
#define recv_unix_msg(sockd) _recv_unix_msg(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__)
#define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__)
int wait_write_select(int sockd, int timeout);
int write_length(int sockd, const void *buf, int len);
bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line);

Loading…
Cancel
Save