Browse Source

ckdb - keysummary generation initial coding

master
kanoi 8 years ago
parent
commit
f01d892344
  1. 452
      src/ckdb.c
  2. 10
      src/ckdb.h
  3. 26
      src/ckdb_cmd.c
  4. 149
      src/ckdb_data.c
  5. 38
      src/ckdb_dbio.c

452
src/ckdb.c

@ -144,8 +144,10 @@ static char *status_chars = "|/-\\";
static char *restorefrom;
static bool ignore_seq = false;
static bool ignore_seqall = false;
bool genpayout_auto;
bool markersummary_auto;
bool exclusive_db = true;
enum free_modes free_mode = FREE_MODE_FAST;
@ -209,6 +211,24 @@ const tv_t default_expiry = { DEFAULT_EXPIRY, 0L };
const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT };
const tv_t date_begin = { DATE_BEGIN, 0L };
// argv -K - don't run in ckdb mode, just update keysummary
bool key_update;
/* Requires all workmarkers and workinfo present in the database
* they are all loaded during startup, but you can limit the
* workinfo loaded with -w - if any required are missing,
* it will stop with an error saying what was missing
* keysummary records are not loaded, but SQL stores of duplicate
* keysummary records are reported and ignored
*
* Valid options are:
* mNNN-MMM workmarker markerid range to process
* this determines the CCL file range loaded
* and the workinfoid range processed
*/
static char *key_range;
int64_t key_wi_stt;
int64_t key_wi_fin;
// argv -y - don't run in ckdb mode, just confirm sharesummaries
bool confirm_sharesummary;
@ -252,7 +272,6 @@ int64_t confirm_last_workinfoid;
/* Stop the reload 11min after the 'last' workinfoid+1 appears
* ckpool uses 10min - but add 1min to be sure */
#define WORKINFO_AGE 660
static tv_t confirm_finish;
static tv_t reload_timestamp;
@ -758,6 +777,7 @@ K_STORE *userinfo_store;
static char logname_db[512];
static char logname_io[512];
static char *dbcode;
static bool no_data_log = false;
// low spec version of rotating_log() - no locking
static bool rotating_log_nolock(char *msg, char *prefix)
@ -787,6 +807,9 @@ static void log_queue_message(char *msg, bool db)
K_ITEM *lq_item;
LOGQUEUE *lq;
if (no_data_log)
return;
K_WLOCK(logqueue_free);
lq_item = k_unlink_head(logqueue_free);
DATA_LOGQUEUE(lq, lq_item);
@ -1202,7 +1225,7 @@ static bool getdata3()
PGconn *conn = dbconnect();
bool ok = true;
if (!confirm_sharesummary) {
if (!key_update && !confirm_sharesummary) {
if (!(ok = paymentaddresses_fill(conn)) || everyone_die)
goto sukamudai;
/* FYI must be after blocks */
@ -1222,21 +1245,25 @@ static bool getdata3()
/* must be after workinfo */
if (!(ok = workmarkers_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary) {
if (!key_update && !confirm_sharesummary) {
/* must be after workmarkers */
if (!(ok = payouts_fill(conn)) || everyone_die)
goto sukamudai;
}
PQfinish(conn);
conn = dbconnect();
if (!key_update) {
if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai;
}
PQfinish(conn);
conn = dbconnect();
if (!key_update) {
if (!(ok = shares_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary && !everyone_die)
ok = poolstats_fill(conn);
}
sukamudai:
@ -1244,7 +1271,7 @@ sukamudai:
return ok;
}
static bool reload_from(tv_t *start);
static bool reload_from(tv_t *start, const tv_t *finish);
static bool reload()
{
@ -1297,7 +1324,7 @@ static bool reload()
}
free(filename);
}
return reload_from(&start);
return reload_from(&start, &date_eot);
}
/* Open the file in path, check if there is a pid in there that still exists
@ -1865,10 +1892,12 @@ static void dealloc_storage()
return;
}
if (logqueue_free) {
LOGWARNING("%s() logqueue ...", __func__);
FREE_LISTS(logqueue);
}
LOGWARNING("%s() user/marks ...", __func__);
FREE_ALL(userinfo);
FREE_TREE(marks);
@ -1933,8 +1962,8 @@ static void dealloc_storage()
FREE_ALL(miningpayouts);
FREE_ALL(blocks);
if (sharesummary_free) {
LOGWARNING("%s() sharesummary ...", __func__);
FREE_TREE(sharesummary_pool);
k_list_transfer_to_tail_nolock(sharesummary_pool_store,
sharesummary_store);
@ -1943,9 +1972,10 @@ static void dealloc_storage()
FREE_TREE(sharesummary);
FREE_STORE_DATA(sharesummary);
FREE_LIST_DATA(sharesummary);
}
if (shares_free) {
LOGWARNING("%s() shares ...", __func__);
if (shareerrors_early_store->count > 0) {
LOGERR("%s() *** shareerrors_early count %d ***",
__func__, shareerrors_early_store->count);
@ -1992,12 +2022,12 @@ static void dealloc_storage()
FREE_TREE(shares_early);
FREE_STORE(shares_early);
FREE_ALL(shares);
}
if (free_mode != FREE_MODE_ALL)
LOGWARNING("%s() workinfo skipped", __func__);
else {
LOGWARNING("%s() workinfo ...", __func__);
FREE_TREE(workinfo_height);
FREE_TREE(workinfo);
FREE_STORE_DATA(workinfo);
@ -2048,6 +2078,7 @@ static void dealloc_storage()
FREE_LIST(breakqueue);
FREE_LISTS(msgline);
if (seqset_free) {
if (free_mode != FREE_MODE_ALL)
LOGWARNING("%s() seqset skipped", __func__);
else {
@ -2064,6 +2095,7 @@ static void dealloc_storage()
for (seq = 0; seq < SEQ_MAX; seq++)
FREENULL(seqnam[seq]);
}
}
LOGWARNING("%s() finished", __func__);
}
@ -3100,7 +3132,7 @@ setitemdata:
static enum cmd_values process_seq(MSGLINE *msgline)
{
bool dupall, dupcmd;
bool dupall = false, dupcmd = false;
char *st = NULL;
if (ignore_seq)
@ -3125,16 +3157,22 @@ static enum cmd_values process_seq(MSGLINE *msgline)
msgline->n_seqpid);
}
dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt,
msgline->n_seqpid, SEQALL, &(msgline->now),
&(msgline->cd), msgline->code,
msgline->seqentryflags, msgline->msg);
if (!ignore_seqall) {
dupall = update_seq(SEQ_ALL, msgline->n_seqall,
msgline->n_seqstt, msgline->n_seqpid,
SEQALL, &(msgline->now), &(msgline->cd),
msgline->code, msgline->seqentryflags,
msgline->msg);
}
dupcmd = update_seq(ckdb_cmds[msgline->which_cmds].seq,
msgline->n_seqcmd, msgline->n_seqstt,
msgline->n_seqpid, msgline->seqcmdnam,
&(msgline->now), &(msgline->cd), msgline->code,
msgline->seqentryflags, msgline->msg);
if (ignore_seqall)
dupall = dupcmd;
if (dupall != dupcmd) {
// Bad/corrupt data or a code bug
LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s "
@ -4212,7 +4250,7 @@ static char *shift_words[] =
"origami",
"paru",
"quinn",
"rika",
"rem",
"sena",
"tenshi",
"ur",
@ -5933,13 +5971,15 @@ static void *process_reload(__maybe_unused void *arg)
st = safe_text(msgline->msg));
FREENULL(st);
break;
case CMD_AUTH:
case CMD_ADDRAUTH:
case CMD_HEARTBEAT:
case CMD_POOLSTAT:
case CMD_USERSTAT:
case CMD_WORKERSTAT:
case CMD_BLOCK:
if (key_update)
break;
case CMD_AUTH:
case CMD_ADDRAUTH:
if (confirm_sharesummary)
break;
case CMD_SHARELOG:
@ -6153,7 +6193,7 @@ static bool logopen(char **filename, FILE **fp, bool *apipe)
/* If the reload start file is missing and -r was specified correctly:
* touch the filename reported in "Failed to open 'filename'",
* if ckdb aborts at the beginning of the reload, then start again */
static bool reload_from(tv_t *start)
static bool reload_from(tv_t *start, const tv_t *finish)
{
// proc_pt could exit after this returns
static pthread_t proc_pt;
@ -6265,8 +6305,10 @@ static bool reload_from(tv_t *start)
if (everyone_die)
break;
reload_timestamp.tv_sec += ROLL_S;
if (confirm_sharesummary && tv_newer(&confirm_finish, &reload_timestamp)) {
LOGWARNING("%s(): confirm range complete", __func__);
if (tv_newer(finish, &reload_timestamp)) {
tv_to_buf(&reload_timestamp, buf, sizeof(buf));
LOGWARNING("%s(): finish range (%s) exceeded",
__func__, buf);
break;
}
@ -6495,8 +6537,10 @@ static void *listener(void *arg)
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &cmder);
if (no_data_log == false)
create_pthread(&log_pt, logger, NULL);
if (!confirm_sharesummary)
create_pthread(&sock_pt, socketer, arg);
create_pthread(&summ_pt, summariser, NULL);
@ -6672,6 +6716,341 @@ sayonara:
return NULL;
}
static bool make_keysummaries()
{
K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL;
tv_t proc_lock_stt, proc_lock_got, proc_lock_fin, now;
bool ok = false;
K_RLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
while (wm_item) {
DATA_WORKMARKERS(workmarkers, wm_item);
if (!CURRENT(&(workmarkers->expirydate)))
break;
// find the oldest READY workinfoid
if (WMREADY(workmarkers->status))
wm_last = wm_item;
wm_item = prev_in_ktree(ctx);
}
K_RUNLOCK(workmarkers_free);
if (!wm_last)
return false;
DATA_WORKMARKERS(workmarkers, wm_last);
LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/"
"Stt %"PRId64"/%s/%s",
__func__, workmarkers->markerid, workmarkers->poolinstance,
workmarkers->workinfoidend, workmarkers->workinfoidstart,
workmarkers->description, workmarkers->status);
setnow(&now);
setnow(&proc_lock_stt);
K_KLONGWLOCK(process_pplns_free);
setnow(&proc_lock_got);
ok = sharesummaries_to_markersummaries(NULL, workmarkers, by_default,
(char *)__func__, inet_default,
&now, NULL);
K_WUNLOCK(process_pplns_free);
setnow(&proc_lock_fin);
LOGWARNING("%s() pplns lock time %.3fs+%.3fs",
__func__, tvdiff(&proc_lock_got, &proc_lock_stt),
tvdiff(&proc_lock_fin, &proc_lock_got));
return ok;
}
static void *keymarker(__maybe_unused void *arg)
{
pthread_detach(pthread_self());
bool ok = true;
LOCK_INIT("db_keymarker");
rename_proc("db_keymarker");
if (!everyone_die) {
LOGWARNING("%s() Start key processing...", __func__);
marker_using_data = true;
}
while (!everyone_die && ok) {
if (!everyone_die)
sleep(1);
if (!everyone_die)
ok = make_keysummaries();
}
marker_using_data = false;
// No unprocessed workmarkers, or an error
everyone_die = true;
return NULL;
}
static void update_reload(WORKINFO *wi_stt, WORKINFO *wi_fin)
{
K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_prev;
pthread_t keymark_pt;
tv_t *start;
tv_t finish;
int counter;
bool status_failure = false;
/* Now that we know the workmarkers of interest,
* switch them from MARKER_PROCESSED to MARKER_READY
* and remove all after
* The markersummaries already exist
* We are generating the missing keysummaries */
K_WLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
while (wm_item) {
wm_prev = prev_in_ktree(ctx);
DATA_WORKMARKERS(workmarkers, wm_item);
if (CURRENT(&(workmarkers->expirydate))) {
if (workmarkers->workinfoidstart > wi_fin->workinfoid) {
remove_from_ktree(workmarkers_workinfoid_root,
wm_item);
remove_from_ktree(workmarkers_root, wm_item);
free_workmarkers_data(wm_item);
k_unlink_item(workmarkers_store, wm_item);
k_add_head(workmarkers_free, wm_item);
} else if (workmarkers->workinfoidstart >=
wi_stt->workinfoid) {
if (!WMPROCESSED(workmarkers->status)) {
status_failure = true;
LOGERR("%s() workmarker %"PRId64
" invalid status '%s' != %c",
__func__,
workmarkers->markerid,
workmarkers->status,
MARKER_PROCESSED);
} else {
// Not part of either tree key
STRNCPY(workmarkers->status, MARKER_READY_STR);
}
} else
break;
}
wm_item = wm_prev;
}
K_WUNLOCK(workmarkers_free);
if (status_failure) {
LOGERR("%s() Aborting ...", __func__);
return;
}
create_pthread(&keymark_pt, keymarker, NULL);
start = &(wi_stt->createdate);
if (start->tv_sec < DATE_BEGIN)
start = (tv_t *)&date_begin;
copy_tv(&finish, &(wi_fin->createdate));
// include the reload file after wi_fin
finish.tv_sec += ROLL_S;
reload_from(start, &finish);
// wait for all loaded data to be used
while (!everyone_die) {
K_RLOCK(breakqueue_free);
counter = reload_done_breakqueue_store->count +
reload_breakqueue_store->count + reload_processing;
K_RUNLOCK(breakqueue_free);
if (counter == 0)
break;
cksleep_ms(142);
}
while (!everyone_die)
cksleep_ms(142);
}
static void update_check(int64_t markerid_stt, int64_t markerid_fin)
{
K_ITEM *wm_stt_item, *wm_fin_item, *wi_stt_item, *wi_fin_item;
char buf[DATE_BUFSIZ+1], buf2[DATE_BUFSIZ+1];
WORKMARKERS *wm_stt = NULL, *wm_fin = NULL;
WORKINFO *wi_stt = NULL, *wi_fin = NULL;
tv_t up_stt, up_fin;
double min, sec;
K_RLOCK(workmarkers_free);
wm_stt_item = find_workmarkerid(markerid_stt, false, MARKER_PROCESSED);
wm_fin_item = find_workmarkerid(markerid_fin, false, MARKER_PROCESSED);
K_RUNLOCK(workmarkers_free);
if (!wm_stt_item || !wm_fin_item) {
if (!wm_stt_item) {
LOGERR("%s() unknown start markerid %"PRId64,
__func__, markerid_stt);
}
if (!wm_fin_item) {
LOGERR("%s() unknown finish markerid %"PRId64,
__func__, markerid_fin);
}
return;
}
DATA_WORKMARKERS(wm_stt, wm_stt_item);
DATA_WORKMARKERS(wm_fin, wm_fin_item);
key_wi_stt = wm_stt->workinfoidstart;
key_wi_fin = wm_fin->workinfoidend;
wi_stt_item = find_workinfo(key_wi_stt, NULL);
wi_fin_item = find_workinfo(key_wi_fin, NULL);
if (!wi_stt_item || !wi_fin_item) {
if (!wi_stt_item) {
LOGEMERG("%s() missing workinfoid data! %"PRId64
" for start markerid %"PRId64,
__func__, key_wi_stt, markerid_stt);
}
if (!wi_fin_item) {
LOGEMERG("%s() missing workinfoid data! %"PRId64
" for finish markerid %"PRId64,
__func__, key_wi_fin, markerid_fin);
}
return;
}
DATA_WORKINFO(wi_stt, wi_stt_item);
DATA_WORKINFO(wi_fin, wi_fin_item);
tv_to_buf(&(wi_stt->createdate), buf, sizeof(buf));
tv_to_buf(&(wi_fin->createdate), buf2, sizeof(buf2));
LOGWARNING("%s() processing from start markerid %"PRId64" %s to "
"finish markerid %"PRId64" %s",
__func__, markerid_stt, buf, markerid_fin, buf2);
setnow(&up_stt);
update_reload(wi_stt, wi_fin);
POOLINSTANCE_RESET_MSG("reload");
setnow(&up_fin);
sec = tvdiff(&up_fin, &up_stt);
min = floor(sec / 60.0);
sec -= min * 60.0;
LOGWARNING("update complete %.0fm %.3fs", min, sec);
}
static void update_keysummary()
{
int64_t markerid_stt, markerid_fin;
char *tmp, *minus;
tv_t db_stt, db_fin;
pthread_t break_pt;
double min, sec;
bool reloader;
int cpus, i;
// Simple value check to abort early
if (!key_range || !(*key_range)) {
LOGEMERG("%s() -K option can't be blank", __func__);
return;
}
switch(tolower(key_range[0])) {
case 'm':
tmp = strdup(key_range);
minus = strchr(tmp+1, '-');
if (!minus || minus == tmp+1) {
LOGEMERG("%s() invalid workmarker range '%s' "
"- must be %cNNN-MMM",
__func__, key_range, tolower(key_range[0]));
return;
}
*(minus++) = '\0';
markerid_stt = atoll(tmp+1);
if (markerid_stt <= 0) {
LOGEMERG("%s() invalid markerid start in '%s' "
"- must be >0",
__func__, key_range);
return;
}
markerid_fin = atoll(minus);
if (markerid_fin <= 0) {
LOGEMERG("%s() invalid markerid finish in '%s' "
"- must be >0",
__func__, key_range);
return;
}
if (markerid_fin < markerid_stt) {
LOGEMERG("%s() invalid markerid range in '%s' "
"- finish < start",
__func__, key_range);
return;
}
free(tmp);
break;
default:
LOGEMERG("%s() unknown key range '%c' in '%s'",
__func__, key_range[0], key_range);
return;
}
LOCK_INIT("dbk_updater");
rename_proc("dbk_updater");
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
if (breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1;
breakdown_threads = (int)(cpus / 3) ? : 1;
}
LOGWARNING("%s(): creating %d breaker threads ...",
__func__, breakdown_threads);
reloader = true;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &reloader);
alloc_storage();
setnow(&db_stt);
if (!getdata1() || everyone_die)
return;
db_users_complete = true;
if (!getdata2() || everyone_die)
return;
if (dbload_workinfoid_start != -1) {
LOGWARNING("WARNING: dbload starting at workinfoid %"PRId64,
dbload_workinfoid_start);
}
if (!getdata3() || everyone_die)
return;
POOLINSTANCE_RESET_MSG("dbload");
setnow(&db_fin);
sec = tvdiff(&db_fin, &db_stt);
min = floor(sec / 60.0);
sec -= min * 60.0;
LOGWARNING("dbload complete %.0fm %.3fs", min, sec);
db_load_complete = true;
update_check(markerid_stt, markerid_fin);
}
#if 0
/* TODO: This will be way faster traversing both trees simultaneously
* rather than traversing one and searching the other, then repeating
@ -7086,7 +7465,7 @@ static void confirm_reload()
free(filename);
}
if (!reload_from(&start)) {
if (!reload_from(&start, &date_eot)) {
LOGEMERG("%s() ABORTING from reload_from()", __func__);
return;
}
@ -7265,6 +7644,8 @@ static struct option long_options[] = {
// DON'T use when connected to ckpool
{ "ignore-seq", required_argument, 0, 'I' },
{ "killold", no_argument, 0, 'k' },
// Generate old keysummary records
{ "key", required_argument, 0, 'K' },
{ "loglevel", required_argument, 0, 'l' },
// marker = enable mark/workmarker/markersummary auto generation
{ "marker", no_argument, 0, 'm' },
@ -7321,8 +7702,12 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) {
switch(c) {
case '?':
case ':':
quit(1, "exiting");
break;
case 'a':
len = strlen(optarg);
if (len > MAX_ALERT_CMD)
@ -7419,6 +7804,10 @@ int main(int argc, char **argv)
case 'k':
ckp.killold = true;
break;
case 'K':
key_range = strdup(optarg);
key_update = true;
break;
case 'l':
ckp.loglevel = atoi(optarg);
if (ckp.loglevel < LOG_EMERG || ckp.loglevel > LOG_DEBUG) {
@ -7540,10 +7929,20 @@ int main(int argc, char **argv)
btc_auth = http_base64(buf);
bzero(buf, sizeof(buf));
if (confirm_sharesummary)
if (key_update) {
dbcode = "k";
no_data_log = true;
ignore_seqall = true;
exclusive_db = false;
} else {
if (confirm_sharesummary) {
dbcode = "y";
else
no_data_log = true;
ignore_seqall = true;
exclusive_db = false;
} else
dbcode = "";
}
if (!db_name)
db_name = "ckdb";
@ -7650,7 +8049,10 @@ int main(int argc, char **argv)
o_limits_max_lifetime = o_limits[i].lifetime;
}
if (confirm_sharesummary) {
if (key_update) {
update_keysummary();
everyone_die = true;
} else if (confirm_sharesummary) {
// TODO: add a system lock to stop running 2 at once?
confirm_summaries();
everyone_die = true;

10
src/ckdb.h

@ -11,6 +11,8 @@
#ifndef CKDB_H
#define CKDB_H
#pragma GCC diagnostic ignored "-Wtautological-compare"
#include "config.h"
#include <sys/ioctl.h>
@ -52,7 +54,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.206"
#define CKDB_VERSION DB_VERSION"-2.300"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -98,6 +100,7 @@ extern int switch_state;
extern bool genpayout_auto;
extern bool markersummary_auto;
extern bool exclusive_db;
enum free_modes {
FREE_MODE_ALL,
@ -299,6 +302,11 @@ extern const tv_t date_begin;
#define BTC_TO_D(_amt) ((double)((_amt) / 100000000.0))
// argv -K - don't run in ckdb mode, just update keysummaries
extern bool key_update;
extern int64_t key_wi_stt;
extern int64_t key_wi_fin;
// argv -y - don't run in ckdb mode, just confirm sharesummaries
extern bool confirm_sharesummary;

26
src/ckdb_cmd.c

@ -2528,6 +2528,10 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id,
bool igndup = false;
char *txn_tree;
// nothing needed by key_update is triggered by the workinfo data
if (key_update)
goto wiconf;
i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz);
if (!i_poolinstance)
return strdup(reply);
@ -2637,7 +2641,7 @@ wiconf:
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
if (reloading && !key_update && !confirm_sharesummary) {
/* ISDR (Ignored shares during reload)
* This will discard any shares older than the newest
* workinfoidend of any workmarker - including ready
@ -2656,6 +2660,11 @@ wiconf:
return NULL;
}
if (key_update) {
if (workinfoid < key_wi_stt || workinfoid > key_wi_fin)
goto sconf;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
@ -2753,6 +2762,10 @@ sconf:
K_ITEM *i_error, *i_secondaryuserid;
bool ok;
// not summarised in keysummaries
if (key_update)
goto wiconf;
i_username = require_name(trf_root, "username", 1, NULL, reply, siz);
if (!i_username)
return strdup(reply);
@ -2834,15 +2847,18 @@ seconf:
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (reloading && !confirm_sharesummary) {
if (reloading && !key_update && !confirm_sharesummary) {
// This excludes any already summarised
if (workinfoid <= dbstatus.newest_workmarker_workinfoid)
return NULL;
}
if (confirm_sharesummary) {
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
if (key_update) {
if (workinfoid < key_wi_stt || workinfoid > key_wi_fin)
goto awconf;
}
if (confirm_sharesummary) {
if (workinfoid < confirm_first_workinfoid ||
workinfoid > confirm_last_workinfoid)
goto awconf;
@ -2856,7 +2872,7 @@ seconf:
return strdup("failed.DATA");
} else {
/* Don't slow down the reload - do them later */
if (!reloading) {
if (!reloading || key_update) {
// Aging is a queued item thus the reply is ignored
auto_age_older(workinfoid,
transfer_data(i_poolinstance),

149
src/ckdb_data.c

@ -1611,16 +1611,17 @@ K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *workername,
item = find_workers(false, userid, workername);
if (item) {
if (!confirm_sharesummary && update) {
if (!key_update && !confirm_sharesummary && update) {
workers_update(conn, item, diffdef, idlenotificationenabled,
idlenotificationtime, by, code, inet, cd,
trf_root, true);
}
} else {
if (confirm_sharesummary) {
// Shouldn't be possible since the sharesummary is already aged
LOGERR("%s() %"PRId64"/%s workername not found during confirm",
__func__, userid, workername);
if (key_update || confirm_sharesummary) {
// Shouldn't be possible with old data
LOGERR("%s() %"PRId64"/%s workername not found during %s",
__func__, userid, workername,
key_update ? "keyupdate" : "confirm" );
return NULL;
}
@ -2133,14 +2134,74 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
return item;
}
#define DISCARD_ALL -1
// userid = DISCARD_ALL will dump all shares for the given workinfoid
static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
int64_t *diff_tot, bool skipupdate,
int64_t workinfoid, int64_t userid, char *workername)
{
K_ITEM s_look, *s_item, *tmp_item;
SHARES lookshares, *shares;
K_TREE_CTX s_ctx[1];
char error[1024];
error[0] = '\0';
INIT_SHARES(&s_look);
lookshares.workinfoid = workinfoid;
lookshares.userid = userid;
strcpy(lookshares.workername, workername);
DATE_ZERO(&(lookshares.createdate));
s_look.data = (void *)(&lookshares);
K_WLOCK(shares_free);
s_item = find_after_in_ktree(shares_root, &s_look, s_ctx);
while (s_item) {
DATA_SHARES(shares, s_item);
if (shares->workinfoid != workinfoid)
break;
if (userid != DISCARD_ALL) {
if (shares->userid != userid ||
strcmp(shares->workername, workername) != 0)
break;
}
(*shares_tot)++;
if (shares->errn == SE_NONE)
(*diff_tot) += shares->diff;
tmp_item = next_in_ktree(s_ctx);
remove_from_ktree(shares_root, s_item);
k_unlink_item(shares_store, s_item);
if (reloading && skipupdate)
(*shares_dumped)++;
if (reloading && skipupdate && !error[0]) {
snprintf(error, sizeof(error),
"reload found aged share: %"PRId64
"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
k_add_head(shares_free, s_item);
s_item = tmp_item;
}
K_WUNLOCK(shares_free);
if (error[0])
LOGERR("%s(): %s", __func__, error);
}
// Duplicates during a reload are set to not show messages
bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item;
K_ITEM ks_look, *ks_item, *wm_item, *tmp_item;
K_TREE_CTX ss_ctx[1], s_ctx[1], ks_ctx[1];
K_ITEM *wi_item, ss_look, *ss_item;
K_ITEM ks_look, *ks_item, *wm_item;
K_TREE_CTX ss_ctx[1], ks_ctx[1];
char cd_buf[DATE_BUFSIZ];
int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped;
int64_t ks_tot, ks_already, ks_failed;
@ -2148,9 +2209,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
KEYSHARESUMMARY lookkeysharesummary, *keysharesummary;
SHARESUMMARY looksharesummary, *sharesummary;
WORKINFO *workinfo;
SHARES lookshares, *shares;
bool ok = false, ksok = false, skipupdate;
char error[1024];
bool ok = false, ksok = false, skipupdate = false;
LOGDEBUG("%s(): age", __func__);
@ -2193,17 +2252,20 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
goto bye;
}
ok = true;
ss_tot = ss_already = ss_failed = shares_tot = shares_dumped =
diff_tot = 0;
if (key_update)
goto skip_ss;
INIT_SHARESUMMARY(&ss_look);
INIT_SHARES(&s_look);
// Find the first matching sharesummary
looksharesummary.workinfoid = workinfoid;
looksharesummary.userid = -1;
looksharesummary.workername = EMPTY;
ok = true;
ss_tot = ss_already = ss_failed = shares_tot = shares_dumped =
diff_tot = 0;
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx);
@ -2211,7 +2273,6 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid == workinfoid) {
ss_tot++;
error[0] = '\0';
skipupdate = false;
/* Reloading during a confirm will not have any old data
* so finding an aged sharesummary here is an error
@ -2251,50 +2312,14 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
}
// Discard the shares either way
lookshares.workinfoid = workinfoid;
lookshares.userid = sharesummary->userid;
strcpy(lookshares.workername, sharesummary->workername);
DATE_ZERO(&(lookshares.createdate));
discard_shares(&shares_tot, &shares_dumped, &diff_tot, skipupdate,
workinfoid, sharesummary->userid,
sharesummary->workername);
s_look.data = (void *)(&lookshares);
K_WLOCK(shares_free);
s_item = find_after_in_ktree(shares_root, &s_look, s_ctx);
while (s_item) {
DATA_SHARES(shares, s_item);
if (shares->workinfoid != workinfoid ||
shares->userid != lookshares.userid ||
strcmp(shares->workername, lookshares.workername) != 0)
break;
shares_tot++;
if (shares->errn == SE_NONE)
diff_tot += shares->diff;
tmp_item = next_in_ktree(s_ctx);
remove_from_ktree(shares_root, s_item);
k_unlink_item(shares_store, s_item);
if (reloading && skipupdate)
shares_dumped++;
if (reloading && skipupdate && !error[0]) {
snprintf(error, sizeof(error),
"reload found aged share: %"PRId64
"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
k_add_head(shares_free, s_item);
s_item = tmp_item;
}
K_WUNLOCK(shares_free);
K_RLOCK(sharesummary_free);
ss_item = next_in_ktree(ss_ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
if (error[0])
LOGERR("%s(): %s", __func__, error);
}
if (ss_already || ss_failed || shares_dumped) {
@ -2311,6 +2336,8 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
}
}
skip_ss:
INIT_KEYSHARESUMMARY(&ks_look);
// Find the first matching keysharesummary
@ -2331,7 +2358,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
/* Reloading during a confirm will not have any old data
* so finding an aged keysharesummary here is an error
* N.B. this can only happen with (very) old reload files */
if (reloading) {
if (reloading && !key_update) {
if (keysharesummary->complete[0] == SUMMARY_COMPLETE) {
ks_already++;
skipupdate = true;
@ -2356,15 +2383,19 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
}
}
/* All shares should have been discarded during sharesummary
* processing above */
K_RLOCK(keysharesummary_free);
ks_item = next_in_ktree(ks_ctx);
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
}
/* All shares should have been discarded during sharesummary
* processing above except during a key_update */
if (key_update) {
discard_shares(&shares_tot, &shares_dumped, &diff_tot, skipupdate,
workinfoid, -1, EMPTY);
}
if (ks_already) {
LOGNOTICE("%s(): Keysummary aging of %"PRId64"/%s "
"kstotal=%"PRId64" already=%"PRId64" failed=%"PRId64,

38
src/ckdb_dbio.c

@ -3219,6 +3219,7 @@ bool workinfo_fill(PGconn *conn)
return false;
}
if (exclusive_db) {
res = PQexec(conn, "Lock table workinfo in access exclusive mode", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
@ -3226,6 +3227,7 @@ bool workinfo_fill(PGconn *conn)
PGLOGERR("Lock", rescode, conn);
goto flail;
}
}
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
@ -3456,7 +3458,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
return false;
}
if (reloading && !confirm_sharesummary) {
if (reloading && !key_update && !confirm_sharesummary) {
// We only need to know if the workmarker is processed
K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(shares->workinfoid, false,
@ -3494,7 +3496,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
}
}
if (!confirm_sharesummary) {
if (!key_update && !confirm_sharesummary) {
workerstatus_update(NULL, shares, NULL);
K_WLOCK(userinfo_free);
userinfo_update(shares, NULL, NULL, false);
@ -3949,6 +3951,7 @@ bool shares_fill(PGconn *conn)
return false;
}
if (exclusive_db) {
res = PQexec(conn, "Lock table shares in access exclusive mode", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
@ -3956,6 +3959,7 @@ bool shares_fill(PGconn *conn)
PGLOGERR("Lock", rescode, conn);
goto flail;
}
}
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
@ -4157,6 +4161,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
return false;
}
// key_update skips shareerrors
if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is processed
K_RLOCK(workmarkers_free);
@ -4490,7 +4495,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_ITEM *kss_item, *kss_prev, kss_look;
K_ITEM *ms_item, ms_look, *p_ss_item, *p_ms_item;
K_ITEM *ks_item, ks_look;
bool ok = false, conned = false;
bool ok = false, conned = false, nonblank = false;
int64_t diffacc = 0, shareacc = 0;
int64_t kdiffacc = 0, kshareacc = 0;
char *reason = NULL;
@ -4520,7 +4525,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_STORE *new_keysummary_store = k_new_store(keysummary_free);
/* Use the master size for these local trees since
* they're large and doesn't get created often */
* they're large and don't get created often */
K_TREE *ms_root = new_ktree_local(sshortname, cmp_markersummary,
markersummary_free);
K_TREE *ks_root = new_ktree_local(kshortname, cmp_keysummary,
@ -4536,6 +4541,9 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
goto flail;
}
if (key_update)
goto dokey;
setnow(&add_stt);
/* Check there aren't already any matching markersummaries
* and assume keysummaries are the same */
@ -4660,6 +4668,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
}
setnow(&add_fin);
dokey:
setnow(&kadd_stt);
INIT_KEYSUMMARY(&ks_look);
@ -4684,6 +4694,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
DATA_KEYSHARESUMMARY(keysharesummary, kss_item);
if (keysharesummary->workinfoid < workmarkers->workinfoidstart)
break;
if (keysharesummary->key[0])
nonblank = true;
K_RLOCK(keysharesummary_free);
kss_prev = prev_in_ktree(kss_ctx);
K_RUNLOCK(keysharesummary_free);
@ -4801,6 +4813,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
ks_item = ks_item->next;
}
if (!key_update) {
ok = workmarkers_process(conn, true, true,
workmarkers->markerid,
workmarkers->poolinstance,
@ -4809,6 +4822,10 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
workmarkers->description,
MARKER_PROCESSED_STR,
by, code, inet, cd, trf_root);
} else {
// Not part of either tree key
STRNCPY(workmarkers->status, MARKER_PROCESSED_STR);
}
setnow(&kdb_fin);
rollback:
if (ok)
@ -4959,7 +4976,7 @@ flail:
"k(2*%"PRId64"%s/2*%"PRId64"%s) for workmarkers "
"%"PRId64"/%s/End %"PRId64"/Stt %"PRId64"/%s/%s "
"add=%.3fs kadd=%.3fs db=%.3fs kdb=%.3fs "
"lck=%.3f+%.3fs",
"lck=%.3f+%.3fs%s",
shortname, ms_count, ks_count, ss_count, kss_count,
shareacc, diffacc,
kshareacc >> 1, (kshareacc & 1) ? ".5" : "",
@ -4973,7 +4990,8 @@ flail:
tvdiff(&db_fin, &db_stt),
tvdiff(&kdb_fin, &kdb_stt),
tvdiff(&lck_got, &lck_stt),
tvdiff(&lck_fin, &lck_got));
tvdiff(&lck_fin, &lck_got),
nonblank ? EMPTY : " ONLY BLANK KEYS");
// This should never happen
if (kshareacc != (shareacc << 1) || kdiffacc != (diffacc << 1)) {
@ -6489,6 +6507,7 @@ bool miningpayouts_fill(PGconn *conn)
return false;
}
if (exclusive_db) {
res = PQexec(conn, "Lock table miningpayouts in access exclusive mode", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
@ -6496,6 +6515,7 @@ bool miningpayouts_fill(PGconn *conn)
PGLOGERR("Lock", rescode, conn);
goto flail;
}
}
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
@ -8264,6 +8284,7 @@ bool markersummary_fill(PGconn *conn)
return false;
}
if (exclusive_db) {
res = PQexec(conn, "Lock table markersummary in access exclusive mode", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
@ -8271,6 +8292,7 @@ bool markersummary_fill(PGconn *conn)
PGLOGERR("Lock", rescode, conn);
goto flail;
}
}
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
@ -8564,6 +8586,10 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code,
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
/* Don't fail on a duplicate during key_update
* TODO: should only be on a duplicate ... */
if (key_update)
ok = true;
goto unparam;
}

Loading…
Cancel
Save