diff --git a/pool/page_workers.php b/pool/page_workers.php index 9f2d92fc..938aee9a 100644 --- a/pool/page_workers.php +++ b/pool/page_workers.php @@ -15,9 +15,14 @@ function worktitle($data, $user) return $pg; } # +function workhashorder($a, $b) +{ + return $b['w_uhr'] - $a['w_uhr']; +} +# function workuser($data, $user, &$offset, &$totshare, &$totdiff, &$totinvalid, &$totrate, &$blockacc, - &$blockreward, $old = false) + &$blockreward, $old = false, $srt = false) { $ans = getWorkers($user); @@ -28,6 +33,7 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff, $blockacc = $ans['blockacc']; if (isset($ans['blockreward'])) $blockreward = $ans['blockreward']; + $all = array(); $count = $ans['rows']; for ($i = 0; $i < $count; $i++) { @@ -35,48 +41,69 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff, if ($old !== false && $lst > $old) continue; + if ($ans['w_elapsed:'.$i] > 3600) + $uhr = $ans['w_hashrate1hr:'.$i]; + else + $uhr = $ans['w_hashrate5m:'.$i]; + + $all[] = array('workername' => $ans['workername:'.$i], + 'w_lastshare' => $ans['w_lastshare:'.$i], + 'w_lastdiff' => $ans['w_lastdiff:'.$i], + 'w_shareacc' => $ans['w_shareacc:'.$i], + 'w_diffacc' => $ans['w_diffacc:'.$i], + 'w_diffinv' => $ans['w_diffinv:'.$i], + 'w_lastdiff' => $ans['w_lastdiff:'.$i], + 'w_uhr' => $uhr); + } + + if ($srt) + usort($all, 'workhashorder'); + + for ($i = 0; $i < $count; $i++) + { + $lst = $ans['STAMP'] - $all[$i]['w_lastshare']; + if ($old !== false && $lst > $old) + continue; + if ((($offset) % 2) == 0) $row = 'even'; else $row = 'odd'; $pg .= ""; - $pg .= ''.htmlspecialchars($ans['workername:'.$i]).''; - if ($ans['w_lastdiff:'.$i] > 0) - $ld = difffmt($ans['w_lastdiff:'.$i]); + $pg .= ''.htmlspecialchars($all[$i]['workername']).''; + if ($all[$i]['w_lastdiff'] > 0) + $ld = difffmt($all[$i]['w_lastdiff']); else $ld = ' '; $pg .= "$ld"; $pg .= ''.howlongago($lst).''; - $shareacc = number_format($ans['w_shareacc:'.$i], 0); - $totshare += $ans['w_shareacc:'.$i]; - $diffacc = number_format($ans['w_diffacc:'.$i], 0); - $totdiff += $ans['w_diffacc:'.$i]; + $shareacc = number_format($all[$i]['w_shareacc'], 0); + $totshare += $all[$i]['w_shareacc']; + $diffacc = number_format($all[$i]['w_diffacc'], 0); + $totdiff += $all[$i]['w_diffacc']; $pg .= "$shareacc"; $pg .= "$diffacc"; - $dtot = $ans['w_diffacc:'.$i] + $ans['w_diffinv:'.$i]; + $dtot = $all[$i]['w_diffacc'] + $all[$i]['w_diffinv']; if ($dtot > 0) - $rej = number_format(100.0 * $ans['w_diffinv:'.$i] / $dtot, 3); + $rej = number_format(100.0 * $all[$i]['w_diffinv'] / $dtot, 3); else $rej = '0'; - $totinvalid += $ans['w_diffinv:'.$i]; + $totinvalid += $all[$i]['w_diffinv']; $pg .= "$rej%"; if ($blockacc <= 0) $blkpct = ' '; else - $blkpct = number_format(100.0 * $ans['w_diffacc:'.$i] / $blockacc, 3) . '%'; + $blkpct = number_format(100.0 * $all[$i]['w_diffacc'] / $blockacc, 3) . '%'; $pg .= "$blkpct"; - if ($ans['w_elapsed:'.$i] > 3600) - $uhr = $ans['w_hashrate1hr:'.$i]; - else - $uhr = $ans['w_hashrate5m:'.$i]; + $uhr = $all[$i]['w_uhr']; if ($uhr == '?') $uhr = '?GHs'; else @@ -138,7 +165,7 @@ function doworker($data, $user) $pg .= worktitle($data, $user); $pg .= workuser($data, $user, $offset, $totshare, $totdiff, $totinvalid, - $totrate, $blockacc, $blockreward, false); + $totrate, $blockacc, $blockreward, false, true); $pg .= worktotal($offset, $totshare, $totdiff, $totinvalid, $totrate, $blockacc, $blockreward); diff --git a/src/ckdb.c b/src/ckdb.c index 8c57682c..afe545cc 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -150,6 +150,11 @@ * and a warning is displayed if there were any matching shares */ +static bool socketer_using_data; +static bool summariser_using_data; +static bool logger_using_data; +static bool listener_using_data; + char *EMPTY = ""; static char *db_name; @@ -257,6 +262,11 @@ static tv_t confirm_finish; static tv_t reload_timestamp; +/* Allow overriding the workinfoid range used in the db load of + * workinfo and sharesummary */ +int64_t dbload_workinfoid_start = -1; +int64_t dbload_workinfoid_finish = MAXID; + // DB users,workers,auth load is complete bool db_auths_complete = false; // DB load is complete @@ -484,12 +494,12 @@ void logmsg(int loglevel, const char *fmt, ...) now_t = time(NULL); localtime_r(&now_t, &tm); - minoff = timezone / 60; + minoff = tm.tm_gmtoff / 60; if (minoff < 0) { - tzch = '+'; + tzch = '-'; minoff *= -1; } else - tzch = '-'; + tzch = '+'; hroff = minoff / 60; if (minoff % 60) { snprintf(tzinfo, sizeof(tzinfo), @@ -678,13 +688,24 @@ matane: return ok; } +/* Load blocks first to allow data range settings to know + * the blocks info for setting limits for tables in getdata3() + */ static bool getdata2() +{ + PGconn *conn = dbconnect(); + bool ok = blocks_fill(conn); + + PQfinish(conn); + + return ok; +} + +static bool getdata3() { PGconn *conn = dbconnect(); bool ok = true; - if (!(ok = blocks_fill(conn)) || everyone_die) - goto sukamudai; if (!confirm_sharesummary) { if (!(ok = paymentaddresses_fill(conn)) || everyone_die) goto sukamudai; @@ -979,6 +1000,149 @@ static void alloc_storage() workerstatus_root = new_ktree(); } +static void free_workinfo_data(K_ITEM *item) +{ + WORKINFO *workinfo; + + DATA_WORKINFO(workinfo, item); + if (workinfo->transactiontree) + free(workinfo->transactiontree); + if (workinfo->merklehash) + free(workinfo->merklehash); +} + +static void free_sharesummary_data(K_ITEM *item) +{ + SHARESUMMARY *sharesummary; + + DATA_SHARESUMMARY(sharesummary, item); + SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); + SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); + SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); + SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); + SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); + SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); +} + +static void free_optioncontrol_data(K_ITEM *item) +{ + OPTIONCONTROL *optioncontrol; + + DATA_OPTIONCONTROL(optioncontrol, item); + if (optioncontrol->optionvalue) + free(optioncontrol->optionvalue); +} + + +#define FREE_TREE(_tree) \ + if (_tree ## _root) \ + _tree ## _root = free_ktree(_tree ## _root, NULL) \ + +#define FREE_STORE(_list) \ + if (_list ## _store) \ + _list ## _store = k_free_store(_list ## _store) \ + +#define FREE_LIST(_list) \ + if (_list ## _free) \ + _list ## _free = k_free_list(_list ## _free) \ + +#define FREE_ALL(_list) FREE_TREE(_list); FREE_STORE(_list); FREE_LIST(_list) + +#define FREE_LISTS(_list) FREE_STORE(_list); FREE_LIST(_list) + +static void dealloc_storage() +{ + K_ITEM *item; + + FREE_LISTS(logqueue); + FREE_ALL(workerstatus); + + FREE_TREE(userstats_workerstatus); + FREE_TREE(userstats_statsdate); + if (userstats_summ) + userstats_summ = k_free_store(userstats_summ); + FREE_STORE(userstats_eos); + FREE_ALL(userstats); + + FREE_ALL(poolstats); + FREE_ALL(auths); + FREE_ALL(miningpayouts); + FREE_ALL(blocks); + + FREE_TREE(sharesummary_workinfoid); + FREE_TREE(sharesummary); + if (sharesummary_store) { + item = sharesummary_store->head; + while (item) { + free_sharesummary_data(item); + item = item->next; + } + FREE_STORE(sharesummary); + } + if (sharesummary_free) { + item = sharesummary_free->head; + while (item) { + free_sharesummary_data(item); + item = item->next; + } + FREE_LIST(sharesummary); + } + + FREE_ALL(shareerrors); + FREE_ALL(shares); + + FREE_TREE(workinfo_height); + FREE_TREE(workinfo); + if (workinfo_store) { + item = workinfo_store->head; + while (item) { + free_workinfo_data(item); + item = item->next; + } + FREE_STORE(workinfo); + } + if (workinfo_free) { + item = workinfo_free->head; + while (item) { + free_workinfo_data(item); + item = item->next; + } + FREE_LIST(workinfo); + } + + FREE_LISTS(idcontrol); + FREE_ALL(payments); + FREE_ALL(paymentaddresses); + FREE_ALL(workers); + + FREE_TREE(optioncontrol); + if (optioncontrol_store) { + item = optioncontrol_store->head; + while (item) { + free_optioncontrol_data(item); + item = item->next; + } + FREE_STORE(optioncontrol); + } + if (optioncontrol_free) { + item = optioncontrol_free->head; + while (item) { + free_optioncontrol_data(item); + item = item->next; + } + FREE_LIST(optioncontrol); + } + + FREE_ALL(useratts); + + FREE_TREE(userid); + FREE_ALL(users); + + FREE_LIST(transfer); + FREE_LISTS(heartbeatqueue); + FREE_LISTS(workqueue); +} + static bool setup_data() { K_TREE_CTX ctx[1]; @@ -1001,6 +1165,14 @@ static bool setup_data() if (!getdata2() || everyone_die) return false; + if (dbload_workinfoid_start != -1) { + LOGWARNING("WARNING: dbload starting at workinfoid %"PRId64, + dbload_workinfoid_start); + } + + if (!getdata3() || everyone_die) + return false; + db_load_complete = true; if (!reload() || everyone_die) @@ -1434,7 +1606,7 @@ static void summarise_poolstats() // TODO: consider limiting how much/how long this processes each time static void summarise_userstats() { - K_TREE_CTX ctx[1], ctx2[1]; + K_TREE_CTX ctx[1]; K_ITEM *first, *last, *new, *next, *tmp; USERSTATS *userstats, *us_first, *us_last, *us_next; double statrange, factor; @@ -1503,9 +1675,9 @@ static void summarise_userstats() DATA_USERSTATS(userstats, new); memcpy(userstats, us_first, sizeof(USERSTATS)); - userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats, ctx2); + userstats_root = remove_from_ktree(userstats_root, first, cmp_userstats); userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, first, - cmp_userstats_statsdate, ctx2); + cmp_userstats_statsdate); k_unlink_item(userstats_store, first); k_add_head(userstats_summ, first); @@ -1530,9 +1702,9 @@ static void summarise_userstats() userstats->elapsed = us_next->elapsed; userstats->summarycount += us_next->summarycount; - userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats, ctx2); + userstats_root = remove_from_ktree(userstats_root, next, cmp_userstats); userstats_statsdate_root = remove_from_ktree(userstats_statsdate_root, next, - cmp_userstats_statsdate, ctx2); + cmp_userstats_statsdate); k_unlink_item(userstats_store, next); k_add_head(userstats_summ, next); } @@ -1621,6 +1793,8 @@ static void summarise_userstats() static void *summariser(__maybe_unused void *arg) { + int i; + pthread_detach(pthread_self()); rename_proc("db_summariser"); @@ -1628,23 +1802,43 @@ static void *summariser(__maybe_unused void *arg) while (!everyone_die && !db_load_complete) cksleep_ms(42); + summariser_using_data = true; + while (!everyone_die) { - sleep(5); - if (!everyone_die) { + for (i = 0; i < 5; i++) { + if (!everyone_die) + sleep(1); + } + if (everyone_die) + break; + else { if (startup_complete) check_blocks(); - summarise_blocks(); + if (!everyone_die) + summarise_blocks(); } - sleep(4); - if (!everyone_die) + for (i = 0; i < 4; i++) { + if (!everyone_die) + sleep(1); + } + if (everyone_die) + break; + else summarise_poolstats(); - sleep(4); - if (!everyone_die) + for (i = 0; i < 4; i++) { + if (!everyone_die) + sleep(1); + } + if (everyone_die) + break; + else summarise_userstats(); } + summariser_using_data = false; + return NULL; } @@ -1660,6 +1854,8 @@ static void *logger(__maybe_unused void *arg) snprintf(buf, sizeof(buf), "db%s_logger", dbcode); rename_proc(buf); + logger_using_data = true; + setnow(&now); snprintf(buf, sizeof(buf), "logstart.%ld,%ld", now.tv_sec, now.tv_usec); @@ -1691,12 +1887,19 @@ static void *logger(__maybe_unused void *arg) logqueue_store->count, now.tv_sec, now.tv_usec); LOGFILE(buf); - while((lq_item = k_unlink_head(logqueue_store))) { + if (logqueue_store->count) + LOGERR("%s", buf); + lq_item = logqueue_store->head; + while (lq_item) { DATA_LOGQUEUE(lq, lq_item); LOGFILE(lq->msg); + free(lq->msg); + lq_item = lq_item->next; } K_WUNLOCK(logqueue_free); + logger_using_data = false; + setnow(&now); snprintf(buf, sizeof(buf), "logstop.%ld,%ld", now.tv_sec, now.tv_usec); @@ -1752,6 +1955,8 @@ static void *socketer(__maybe_unused void *arg) while (!everyone_die && !db_auths_complete) cksem_mswait(&socketer_sem, 420); + socketer_using_data = true; + want_first = true; while (!everyone_die) { if (buf) @@ -2101,6 +2306,8 @@ static void *socketer(__maybe_unused void *arg) } } + socketer_using_data = false; + if (buf) dealloc(buf); // TODO: if anyone cares, free all the dup buffers :P @@ -2447,6 +2654,8 @@ static void *listener(void *arg) rename_proc("db_listener"); + listener_using_data = true; + if (!setup_data()) { if (!everyone_die) { LOGEMERG("ABORTING"); @@ -2502,6 +2711,8 @@ static void *listener(void *arg) } } + listener_using_data = false; + if (conn) PQfinish(conn); @@ -2834,8 +3045,17 @@ static void confirm_reload() __func__, workinfo->workinfoid); } } else { - start.tv_sec = start.tv_usec = 0; - LOGWARNING("%s() no start workinfo found ... using time 0", __func__); + if (confirm_first_workinfoid == 0) { + start.tv_sec = start.tv_usec = 0; + LOGWARNING("%s() no start workinfo found ... " + "using time 0", __func__); + } else { + // Abort, otherwise reload will reload all log files + LOGERR("%s(): Start workinfoid doesn't exist, " + "use 0 to mean from the beginning of time", + __func__); + return; + } } /* Find the workinfo after confirm_last_workinfoid-1 @@ -2995,6 +3215,8 @@ static void confirm_summaries() return; } free(range); + dbload_workinfoid_start = confirm_range_start - 1; + dbload_workinfoid_finish = confirm_range_finish + 1; break; case 'w': confirm_range_start = atoll(confirm_range+1); @@ -3026,6 +3248,11 @@ static void confirm_summaries() return; } + if (!getdata3()) { + LOGEMERG("%s() ABORTING from getdata3()", __func__); + return; + } + confirm_reload(); } @@ -3071,6 +3298,7 @@ static struct option long_options[] = { { "dbuser", required_argument, 0, 'u' }, { "btc-user", required_argument, 0, 'U' }, { "version", no_argument, 0, 'v' }, + { "workinfoid", required_argument, 0, 'w' }, { "confirm", no_argument, 0, 'y' }, { "confirmrange", required_argument, 0, 'Y' }, { 0, 0, 0, 0 } @@ -3103,7 +3331,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vyY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'c': ckp.config = strdup(optarg); @@ -3189,6 +3417,18 @@ int main(int argc, char **argv) break; case 'v': exit(0); + case 'w': + // Don't use this :) + { + int64_t start = atoll(optarg); + if (start < 0) { + quit(1, "Invalid workinfoid start" + " %"PRId64" - must be >= 0", + start); + } + dbload_workinfoid_start = start; + } + break; case 'y': confirm_sharesummary = true; break; @@ -3269,6 +3509,7 @@ int main(int argc, char **argv) if (confirm_sharesummary) { // TODO: add a system lock to stop running 2 at once? confirm_summaries(); + everyone_die = true; } else { ckp.main.sockname = strdup("listener"); write_namepid(&ckp.main); @@ -3286,6 +3527,36 @@ int main(int argc, char **argv) join_pthread(ckp.pth_listener); } + time_t start, trigger, curr; + char *msg = NULL; + + trigger = start = time(NULL); + while (socketer_using_data || summariser_using_data || + logger_using_data || listener_using_data) { + msg = NULL; + curr = time(NULL); + if (curr - start > 4) { + if (curr - trigger > 4) { + msg = "Shutdown initial delay"; + } else if (curr - trigger > 2) { + msg = "Shutdown delay"; + } + } + if (msg) { + trigger = curr; + printf("%s %ds due to%s%s%s%s\n", + msg, (int)(curr - start), + socketer_using_data ? " socketer" : EMPTY, + summariser_using_data ? " summariser" : EMPTY, + logger_using_data ? " logger" : EMPTY, + listener_using_data ? " listener" : EMPTY); + fflush(stdout); + } + sleep(1); + } + + dealloc_storage(); + clean_up(&ckp); return 0; diff --git a/src/ckdb.h b/src/ckdb.h index fb8977e9..72aa2e6b 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.572" +#define CKDB_VERSION DB_VERSION"-0.602" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -225,6 +225,11 @@ extern int64_t confirm_last_workinfoid; * ckpool uses 10min - but add 1min to be sure */ #define WORKINFO_AGE 660 +/* Allow defining the workinfoid range used in the db load of + * workinfo and sharesummary */ +extern int64_t dbload_workinfoid_start; +extern int64_t dbload_workinfoid_finish; + // DB users,workers,auth load is complete extern bool db_auths_complete; // DB load is complete @@ -998,8 +1003,8 @@ typedef struct blocks { // 42 doesn't actually mean '42' it means matured #define BLOCKS_42 'F' #define BLOCKS_42_STR "F" -// Current block maturity is ... 100 -#define BLOCKS_42_VALUE 100 +// Current block maturity is ... +#define BLOCKS_42_VALUE 101 #define BLOCKS_ORPHAN 'O' #define BLOCKS_ORPHAN_STR "O" /* Block height difference required before checking if it's orphaned diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 7bed4bfb..d8e34e4e 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -2849,8 +2849,10 @@ rollback: PQfinish(conn); if (reason) { if (oc_item) { - if (optioncontrol->optionvalue) + if (optioncontrol->optionvalue) { free(optioncontrol->optionvalue); + optioncontrol->optionvalue = NULL; + } K_WLOCK(optioncontrol_free); k_add_head(optioncontrol_free, oc_item); K_WUNLOCK(optioncontrol_free); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 30a113f3..9985c827 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -1040,6 +1040,7 @@ cmp_t cmp_paymentaddresses(K_ITEM *a, K_ITEM *b) return c; } +// Only one for now ... K_ITEM *find_paymentaddresses(int64_t userid) { PAYMENTADDRESSES paymentaddresses, *pa; @@ -1238,7 +1239,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, int64_t *s_count, int64_t *s_diff) { K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item, *tmp_item; - K_TREE_CTX ss_ctx[1], s_ctx[1], tmp_ctx[1]; + K_TREE_CTX ss_ctx[1], s_ctx[1]; char cd_buf[DATE_BUFSIZ]; int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped; SHARESUMMARY looksharesummary, *sharesummary; @@ -1352,7 +1353,7 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, shares_tot++; tmp_item = next_in_ktree(s_ctx); - shares_root = remove_from_ktree(shares_root, s_item, cmp_shares, tmp_ctx); + shares_root = remove_from_ktree(shares_root, s_item, cmp_shares); k_unlink_item(shares_store, s_item); if (reloading && skipupdate) shares_dumped++; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index f9258915..5c645083 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -310,7 +310,6 @@ bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash, { ExecStatusType rescode; bool conned = false; - K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; USERS *row, *users; @@ -428,8 +427,8 @@ unparam: if (!ok) k_add_head(users_free, item); else { - users_root = remove_from_ktree(users_root, u_item, cmp_users, ctx); - userid_root = remove_from_ktree(userid_root, u_item, cmp_userid, ctx); + users_root = remove_from_ktree(users_root, u_item, cmp_users); + userid_root = remove_from_ktree(userid_root, u_item, cmp_userid); copy_tv(&(users->expirydate), cd); users_root = add_to_ktree(users_root, u_item, cmp_users); userid_root = add_to_ktree(userid_root, u_item, cmp_userid); @@ -684,7 +683,6 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun) { ExecStatusType rescode; bool conned = false; - K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *old_item; USERATTS *old_useratts, *useratts; @@ -789,7 +787,7 @@ unparam: if (ok) { // Update it if (old_item) { - useratts_root = remove_from_ktree(useratts_root, old_item, cmp_useratts, ctx); + useratts_root = remove_from_ktree(useratts_root, old_item, cmp_useratts); copy_tv(&(old_useratts->expirydate), cd); useratts_root = add_to_ktree(useratts_root, old_item, cmp_useratts); } @@ -879,7 +877,6 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd) { ExecStatusType rescode; bool conned = false; - K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; USERATTS *useratts; @@ -934,7 +931,7 @@ unparam: K_WLOCK(useratts_free); if (ok && item) { - useratts_root = remove_from_ktree(useratts_root, item, cmp_useratts, ctx); + useratts_root = remove_from_ktree(useratts_root, item, cmp_useratts); copy_tv(&(useratts->expirydate), cd); useratts_root = add_to_ktree(useratts_root, item, cmp_useratts); } @@ -1424,7 +1421,9 @@ bool workers_fill(PGconn *conn) return ok; } -// Whatever the current paymentaddresses are, replace them with this one +/* Whatever the current paymentaddresses are, replace them with this one + * Code allows for zero, one or more current payment address + * even though there currently can only be zero or one */ K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddress, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) @@ -1432,7 +1431,7 @@ K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddress, ExecStatusType rescode; bool conned = false; PGresult *res; - K_TREE_CTX ctx[1], ctx2[1]; + K_TREE_CTX ctx[1]; K_ITEM *item, *old, *this, look; PAYMENTADDRESSES *row, pa, *thispa; char *upd, *ins; @@ -1529,12 +1528,13 @@ unitem: if (!ok) k_add_head(paymentaddresses_free, item); else { - // Remove from ram, old (unneeded) records + // Change the expiry on all the old ones pa.userid = userid; pa.expirydate.tv_sec = DATE_S_EOT; pa.payaddress[0] = '\0'; INIT_PAYMENTADDRESSES(&look); look.data = (void *)(&pa); + // Tree order is expirydate desc old = find_after_in_ktree(paymentaddresses_root, &look, cmp_paymentaddresses, ctx); while (old) { @@ -1543,9 +1543,15 @@ unitem: if (thispa->userid != userid) break; old = next_in_ktree(ctx); - paymentaddresses_root = remove_from_ktree(paymentaddresses_root, this, - cmp_paymentaddresses, ctx2); - k_add_head(paymentaddresses_free, this); + /* Tree remove+add below doesn't matter since + * this test will avoid reprocessing */ + if (CURRENT(&(thispa->expirydate))) { + paymentaddresses_root = remove_from_ktree(paymentaddresses_root, this, + cmp_paymentaddresses); + copy_tv(&(thispa->expirydate), cd); + paymentaddresses_root = add_to_ktree(paymentaddresses_root, this, + cmp_paymentaddresses); + } } paymentaddresses_root = add_to_ktree(paymentaddresses_root, item, cmp_paymentaddresses); @@ -1565,8 +1571,7 @@ bool paymentaddresses_fill(PGconn *conn) PGresult *res; K_ITEM *item; PAYMENTADDRESSES *row; - char *params[1]; - int n, i, par = 0; + int n, i; char *field; char *sel; int fields = 4; @@ -1577,11 +1582,8 @@ bool paymentaddresses_fill(PGconn *conn) sel = "select " "paymentaddressid,userid,payaddress,payratio" HISTORYDATECONTROL - " from paymentaddresses where expirydate=$1"; - par = 0; - params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); - PARCHK(par, params); - res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); + " from paymentaddresses"; + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -1933,7 +1935,7 @@ nostart: // Discard it if (old_item) { optioncontrol_root = remove_from_ktree(optioncontrol_root, old_item, - cmp_optioncontrol, ctx); + cmp_optioncontrol); k_add_head(optioncontrol_free, old_item); } optioncontrol_root = add_to_ktree(optioncontrol_root, oc_item, cmp_optioncontrol); @@ -2141,7 +2143,9 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance, K_WLOCK(workinfo_free); if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { free(row->transactiontree); + row->transactiontree = NULL; free(row->merklehash); + row->merklehash = NULL; workinfoid = row->workinfoid; k_add_head(workinfo_free, item); K_WUNLOCK(workinfo_free); @@ -2204,7 +2208,9 @@ unparam: K_WLOCK(workinfo_free); if (workinfoid == -1) { free(row->transactiontree); + row->transactiontree = NULL; free(row->merklehash); + row->merklehash = NULL; k_add_head(workinfo_free, item); } else { if (row->transactiontree && *(row->transactiontree)) { @@ -2240,7 +2246,7 @@ bool workinfo_fill(PGconn *conn) PGresult *res; K_ITEM *item; WORKINFO *row; - char *params[1]; + char *params[3]; int n, i, par = 0; char *field; char *sel; @@ -2257,9 +2263,13 @@ bool workinfo_fill(PGconn *conn) "workinfoid,poolinstance,merklehash,prevhash," "coinbase1,coinbase2,version,bits,ntime,reward" HISTORYDATECONTROL - " from workinfo where expirydate=$1"; + " from workinfo where expirydate=$1 and" + " ((workinfoid>=$2 and workinfoid<=$3) or" + " workinfoid in (select workinfoid from blocks) )"; par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); + params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); + params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0); PARCHK(par, params); res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); @@ -2915,8 +2925,9 @@ bool sharesummary_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; + int n, i, par = 0; SHARESUMMARY *row; + char *params[2]; char *field; char *sel; int fields = 19; @@ -2931,8 +2942,12 @@ bool sharesummary_fill(PGconn *conn) "sharecount,errorcount,firstshare,lastshare," "lastdiffacc,complete" MODIFYDATECONTROL - " from sharesummary"; - res = PQexec(conn, sel, CKPQ_READ); + " from sharesummary where workinfoid>=$1 and workinfoid<=$2"; + par = 0; + params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); + params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0); + PARCHK(par, params); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Select", rescode, conn); @@ -3118,7 +3133,6 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, ExecStatusType rescode; bool conned = false; PGresult *res = NULL; - K_TREE_CTX ctx[1]; K_ITEM *b_item, *old_b_item; BLOCKS *row, *oldblocks; char hash_dsp[16+1]; @@ -3243,7 +3257,7 @@ unparam: k_add_head(blocks_free, b_item); else { if (update_old) { - blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx); + blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks); copy_tv(&(oldblocks->expirydate), cd); blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks); } @@ -3265,7 +3279,6 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash, ExecStatusType rescode; bool conned = false; PGresult *res = NULL; - K_TREE_CTX ctx[1]; K_ITEM *b_item, *u_item, *old_b_item; char cd_buf[DATE_BUFSIZ]; char hash_dsp[16+1]; @@ -3546,7 +3559,7 @@ flail: k_add_head(blocks_free, b_item); else { if (update_old) { - blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx); + blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks); copy_tv(&(oldblocks->expirydate), cd); blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks); } diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 9129a43c..c6f4e387 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -15,13 +15,53 @@ #include "libckpool.h" +void mkstamp(char *stamp, size_t siz) +{ + long minoff, hroff; + char tzinfo[16]; + time_t now_t; + struct tm tm; + char tzch; + + now_t = time(NULL); + localtime_r(&now_t, &tm); + minoff = tm.tm_gmtoff / 60; + if (minoff < 0) { + tzch = '-'; + minoff *= -1; + } else + tzch = '+'; + hroff = minoff / 60; + if (minoff % 60) { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld:%02ld", + tzch, hroff, minoff % 60); + } else { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld", + tzch, hroff); + } + snprintf(stamp, siz, + "[%d-%02d-%02d %02d:%02d:%02d%s]", + tm.tm_year + 1900, + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec, + tzinfo); +} + int main(int argc, char **argv) { char *name = NULL, *socket_dir = NULL, *buf = NULL; + int tmo1 = RECV_UNIX_TIMEOUT1; + int tmo2 = RECV_UNIX_TIMEOUT2; bool proxy = false; + char stamp[128]; int c; - while ((c = getopt(argc, argv, "n:s:p")) != -1) { + while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) { switch(c) { case 'n': name = strdup(optarg); @@ -32,6 +72,12 @@ int main(int argc, char **argv) case 'p': proxy = true; break; + case 't': + tmo1 = atoi(optarg); + break; + case 'T': + tmo2 = atoi(optarg); + break; } } if (!socket_dir) @@ -64,7 +110,8 @@ int main(int argc, char **argv) continue; } buf[len - 1] = '\0'; // Strip /n - LOGDEBUG("Got message: %s", buf); + mkstamp(stamp, sizeof(stamp)); + LOGDEBUG("%s Got message: %s", stamp, buf); sockd = open_unix_client(socket_dir); if (sockd < 0) { @@ -76,13 +123,14 @@ int main(int argc, char **argv) break; } dealloc(buf); - buf = recv_unix_msg(sockd); + buf = recv_unix_msg_tmo2(sockd, tmo1, tmo2); close(sockd); if (!buf) { LOGERR("Received empty message"); continue; } - LOGNOTICE("Received response: %s", buf); + mkstamp(stamp, sizeof(stamp)); + LOGNOTICE("%s Received response: %s", stamp, buf); } dealloc(buf); diff --git a/src/ktree.c b/src/ktree.c index 57525c4e..c94d12fb 100644 --- a/src/ktree.c +++ b/src/ktree.c @@ -947,6 +947,18 @@ DBG("@remove after balance=%f :(\n", cmp); return root; } +K_TREE *_remove_from_ktree_free(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), KTREE_FFL_ARGS) +{ + K_TREE_CTX ctx[1]; + + root = _remove_from_ktree(root, data, cmp_funct, ctx, KTREE_FFL_PASS); + + if (*ctx) + free(*ctx); + + return root; +} + static void free_ktree_sub(K_TREE *ktree, void (*free_funct)(void *)) { if (ktree != NULL && ktree != nil) diff --git a/src/ktree.h b/src/ktree.h index dfacb4c1..0721610a 100644 --- a/src/ktree.h +++ b/src/ktree.h @@ -70,7 +70,8 @@ extern K_ITEM *_find_after_in_ktree(K_TREE *ktree, K_ITEM *data, cmp_t (*cmp_fun extern K_ITEM *_find_before_in_ktree(K_TREE *ktree, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), K_TREE_CTX *ctx, KTREE_FFL_ARGS); #define find_before_in_ktree(_ktree, _data, _cmp_funct, _ctx) _find_before_in_ktree(_ktree, _data, _cmp_funct, _ctx, KLIST_FFL_HERE) extern K_TREE *_remove_from_ktree(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), K_TREE_CTX *ctx, KTREE_FFL_ARGS); -#define remove_from_ktree(_root, _data, _cmp_funct, _ctx) _remove_from_ktree(_root, _data, _cmp_funct, _ctx, KLIST_FFL_HERE) +extern K_TREE *_remove_from_ktree_free(K_TREE *root, K_ITEM *data, cmp_t (*cmp_funct)(K_ITEM *, K_ITEM *), KTREE_FFL_ARGS); +#define remove_from_ktree(_root, _data, _cmp_funct) _remove_from_ktree_free(_root, _data, _cmp_funct, KLIST_FFL_HERE) extern K_TREE *_free_ktree(K_TREE *root, void (*free_funct)(void *), KTREE_FFL_ARGS); #define free_ktree(_root, _free_funct) _free_ktree(_root, _free_funct, KLIST_FFL_HERE) diff --git a/src/libckpool.c b/src/libckpool.c index dab6799e..8fb55173 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -739,13 +739,13 @@ int read_length(int sockd, void *buf, int len) /* Use a standard message across the unix sockets: * 4 byte length of message as little endian encoded uint32_t followed by the * string. Return NULL in case of failure. */ -char *_recv_unix_msg(int sockd, const char *file, const char *func, const int line) +char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line) { char *buf = NULL; uint32_t msglen; int ret; - ret = wait_read_select(sockd, 30); + ret = wait_read_select(sockd, timeout1); if (unlikely(ret < 1)) { LOGERR("Select1 failed in recv_unix_msg"); goto out; @@ -761,7 +761,7 @@ char *_recv_unix_msg(int sockd, const char *file, const char *func, const int li LOGWARNING("Invalid message length %u sent to recv_unix_msg", msglen); goto out; } - ret = wait_read_select(sockd, 5); + ret = wait_read_select(sockd, timeout2); if (unlikely(ret < 1)) { LOGERR("Select2 failed in recv_unix_msg"); goto out; diff --git a/src/libckpool.h b/src/libckpool.h index b01967e5..93323d9c 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -441,8 +441,12 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) int wait_read_select(int sockd, int timeout); int read_length(int sockd, void *buf, int len); -char *_recv_unix_msg(int sockd, const char *file, const char *func, const int line); -#define recv_unix_msg(sockd) _recv_unix_msg(sockd, __FILE__, __func__, __LINE__) +char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); +#define RECV_UNIX_TIMEOUT1 30 +#define RECV_UNIX_TIMEOUT2 5 +#define recv_unix_msg(sockd) _recv_unix_msg(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) +#define recv_unix_msg_tmo(sockd, tmo) _recv_unix_msg(sockd, tmo, RECV_UNIX_TIMEOUT2, __FILE__, __func__, __LINE__) +#define recv_unix_msg_tmo2(sockd, tmo1, tmo2) _recv_unix_msg(sockd, tmo1, tmo2, __FILE__, __func__, __LINE__) int wait_write_select(int sockd, int timeout); int write_length(int sockd, const void *buf, int len); bool _send_unix_msg(int sockd, const char *buf, const char *file, const char *func, const int line);