From f01d89234457dcc2a126f72a76a21cd6f23d1711 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sat, 6 Aug 2016 21:15:27 +1000 Subject: [PATCH] ckdb - keysummary generation initial coding --- src/ckdb.c | 610 +++++++++++++++++++++++++++++++++++++++--------- src/ckdb.h | 10 +- src/ckdb_cmd.c | 26 ++- src/ckdb_data.c | 149 +++++++----- src/ckdb_dbio.c | 102 +++++--- 5 files changed, 690 insertions(+), 207 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 5410a490..e429753e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -144,8 +144,10 @@ static char *status_chars = "|/-\\"; static char *restorefrom; static bool ignore_seq = false; +static bool ignore_seqall = false; bool genpayout_auto; bool markersummary_auto; +bool exclusive_db = true; enum free_modes free_mode = FREE_MODE_FAST; @@ -209,6 +211,24 @@ const tv_t default_expiry = { DEFAULT_EXPIRY, 0L }; const tv_t date_eot = { DATE_S_EOT, DATE_uS_EOT }; const tv_t date_begin = { DATE_BEGIN, 0L }; +// argv -K - don't run in ckdb mode, just update keysummary +bool key_update; +/* Requires all workmarkers and workinfo present in the database + * they are all loaded during startup, but you can limit the + * workinfo loaded with -w - if any required are missing, + * it will stop with an error saying what was missing + * keysummary records are not loaded, but SQL stores of duplicate + * keysummary records are reported and ignored + * + * Valid options are: + * mNNN-MMM workmarker markerid range to process + * this determines the CCL file range loaded + * and the workinfoid range processed + */ +static char *key_range; +int64_t key_wi_stt; +int64_t key_wi_fin; + // argv -y - don't run in ckdb mode, just confirm sharesummaries bool confirm_sharesummary; @@ -252,7 +272,6 @@ int64_t confirm_last_workinfoid; /* Stop the reload 11min after the 'last' workinfoid+1 appears * ckpool uses 10min - but add 1min to be sure */ #define WORKINFO_AGE 660 -static tv_t confirm_finish; static tv_t reload_timestamp; @@ -758,6 +777,7 @@ K_STORE *userinfo_store; static char logname_db[512]; static char logname_io[512]; static char *dbcode; +static bool no_data_log = false; // low spec version of rotating_log() - no locking static bool rotating_log_nolock(char *msg, char *prefix) @@ -787,6 +807,9 @@ static void log_queue_message(char *msg, bool db) K_ITEM *lq_item; LOGQUEUE *lq; + if (no_data_log) + return; + K_WLOCK(logqueue_free); lq_item = k_unlink_head(logqueue_free); DATA_LOGQUEUE(lq, lq_item); @@ -1202,7 +1225,7 @@ static bool getdata3() PGconn *conn = dbconnect(); bool ok = true; - if (!confirm_sharesummary) { + if (!key_update && !confirm_sharesummary) { if (!(ok = paymentaddresses_fill(conn)) || everyone_die) goto sukamudai; /* FYI must be after blocks */ @@ -1222,21 +1245,25 @@ static bool getdata3() /* must be after workinfo */ if (!(ok = workmarkers_fill(conn)) || everyone_die) goto sukamudai; - if (!confirm_sharesummary) { + if (!key_update && !confirm_sharesummary) { /* must be after workmarkers */ if (!(ok = payouts_fill(conn)) || everyone_die) goto sukamudai; } PQfinish(conn); conn = dbconnect(); - if (!(ok = markersummary_fill(conn)) || everyone_die) - goto sukamudai; + if (!key_update) { + if (!(ok = markersummary_fill(conn)) || everyone_die) + goto sukamudai; + } PQfinish(conn); conn = dbconnect(); - if (!(ok = shares_fill(conn)) || everyone_die) - goto sukamudai; - if (!confirm_sharesummary && !everyone_die) - ok = poolstats_fill(conn); + if (!key_update) { + if (!(ok = shares_fill(conn)) || everyone_die) + goto sukamudai; + if (!confirm_sharesummary && !everyone_die) + ok = poolstats_fill(conn); + } sukamudai: @@ -1244,7 +1271,7 @@ sukamudai: return ok; } -static bool reload_from(tv_t *start); +static bool reload_from(tv_t *start, const tv_t *finish); static bool reload() { @@ -1297,7 +1324,7 @@ static bool reload() } free(filename); } - return reload_from(&start); + return reload_from(&start, &date_eot); } /* Open the file in path, check if there is a pid in there that still exists @@ -1865,10 +1892,12 @@ static void dealloc_storage() return; } - LOGWARNING("%s() logqueue ...", __func__); - - FREE_LISTS(logqueue); + if (logqueue_free) { + LOGWARNING("%s() logqueue ...", __func__); + FREE_LISTS(logqueue); + } + LOGWARNING("%s() user/marks ...", __func__); FREE_ALL(userinfo); FREE_TREE(marks); @@ -1933,71 +1962,72 @@ static void dealloc_storage() FREE_ALL(miningpayouts); FREE_ALL(blocks); - LOGWARNING("%s() sharesummary ...", __func__); - - FREE_TREE(sharesummary_pool); - k_list_transfer_to_tail_nolock(sharesummary_pool_store, - sharesummary_store); - FREE_STORE(sharesummary_pool); - FREE_TREE(sharesummary_workinfoid); - FREE_TREE(sharesummary); - FREE_STORE_DATA(sharesummary); - FREE_LIST_DATA(sharesummary); - - LOGWARNING("%s() shares ...", __func__); - - if (shareerrors_early_store->count > 0) { - LOGERR("%s() *** shareerrors_early count %d ***", - __func__, shareerrors_early_store->count); - s_item = STORE_HEAD_NOLOCK(shareerrors_early_store); - while (s_item) { - DATA_SHAREERRORS(shareerrors, s_item); - LOGERR("%s(): %"PRId64"/%s/%"PRId32"/%s/%ld,%ld", - __func__, - shareerrors->workinfoid, - st = safe_text_nonull(shareerrors->workername), - shareerrors->errn, - shareerrors->error, - shareerrors->createdate.tv_sec, - shareerrors->createdate.tv_usec); - FREENULL(st); - s_item = s_item->next; - } - } - FREE_TREE(shareerrors_early); - FREE_STORE(shareerrors_early); - FREE_ALL(shareerrors); - - FREE_TREE(shares_hi); - FREE_TREE(shares_db); - FREE_STORE(shares_hi); - if (shares_early_store->count > 0) { - LOGERR("%s() *** shares_early count %d ***", - __func__, shares_early_store->count); - s_item = STORE_HEAD_NOLOCK(shares_early_store); - while (s_item) { - DATA_SHARES(shares, s_item); - LOGERR("%s(): %"PRId64"/%s/%s/%"PRId32"/%ld,%ld", - __func__, - shares->workinfoid, - st = safe_text_nonull(shares->workername), - shares->nonce, - shares->errn, - shares->createdate.tv_sec, - shares->createdate.tv_usec); - FREENULL(st); - s_item = s_item->next; + if (sharesummary_free) { + LOGWARNING("%s() sharesummary ...", __func__); + FREE_TREE(sharesummary_pool); + k_list_transfer_to_tail_nolock(sharesummary_pool_store, + sharesummary_store); + FREE_STORE(sharesummary_pool); + FREE_TREE(sharesummary_workinfoid); + FREE_TREE(sharesummary); + FREE_STORE_DATA(sharesummary); + FREE_LIST_DATA(sharesummary); + } + + if (shares_free) { + LOGWARNING("%s() shares ...", __func__); + if (shareerrors_early_store->count > 0) { + LOGERR("%s() *** shareerrors_early count %d ***", + __func__, shareerrors_early_store->count); + s_item = STORE_HEAD_NOLOCK(shareerrors_early_store); + while (s_item) { + DATA_SHAREERRORS(shareerrors, s_item); + LOGERR("%s(): %"PRId64"/%s/%"PRId32"/%s/%ld,%ld", + __func__, + shareerrors->workinfoid, + st = safe_text_nonull(shareerrors->workername), + shareerrors->errn, + shareerrors->error, + shareerrors->createdate.tv_sec, + shareerrors->createdate.tv_usec); + FREENULL(st); + s_item = s_item->next; + } } + FREE_TREE(shareerrors_early); + FREE_STORE(shareerrors_early); + FREE_ALL(shareerrors); + + FREE_TREE(shares_hi); + FREE_TREE(shares_db); + FREE_STORE(shares_hi); + if (shares_early_store->count > 0) { + LOGERR("%s() *** shares_early count %d ***", + __func__, shares_early_store->count); + s_item = STORE_HEAD_NOLOCK(shares_early_store); + while (s_item) { + DATA_SHARES(shares, s_item); + LOGERR("%s(): %"PRId64"/%s/%s/%"PRId32"/%ld,%ld", + __func__, + shares->workinfoid, + st = safe_text_nonull(shares->workername), + shares->nonce, + shares->errn, + shares->createdate.tv_sec, + shares->createdate.tv_usec); + FREENULL(st); + s_item = s_item->next; + } + } + FREE_TREE(shares_early); + FREE_STORE(shares_early); + FREE_ALL(shares); } - FREE_TREE(shares_early); - FREE_STORE(shares_early); - FREE_ALL(shares); if (free_mode != FREE_MODE_ALL) LOGWARNING("%s() workinfo skipped", __func__); else { LOGWARNING("%s() workinfo ...", __func__); - FREE_TREE(workinfo_height); FREE_TREE(workinfo); FREE_STORE_DATA(workinfo); @@ -2048,21 +2078,23 @@ static void dealloc_storage() FREE_LIST(breakqueue); FREE_LISTS(msgline); - if (free_mode != FREE_MODE_ALL) - LOGWARNING("%s() seqset skipped", __func__); - else { - LOGWARNING("%s() seqset ...", __func__); - sequence_report(false); + if (seqset_free) { + if (free_mode != FREE_MODE_ALL) + LOGWARNING("%s() seqset skipped", __func__); + else { + LOGWARNING("%s() seqset ...", __func__); + sequence_report(false); - FREE_STORE_DATA(seqset); - FREE_LIST_DATA(seqset); - FREE_LISTS(seqset); + FREE_STORE_DATA(seqset); + FREE_LIST_DATA(seqset); + FREE_LISTS(seqset); - // Must be after seqset - FREE_LIST(seqtrans); + // Must be after seqset + FREE_LIST(seqtrans); - for (seq = 0; seq < SEQ_MAX; seq++) - FREENULL(seqnam[seq]); + for (seq = 0; seq < SEQ_MAX; seq++) + FREENULL(seqnam[seq]); + } } LOGWARNING("%s() finished", __func__); @@ -3100,7 +3132,7 @@ setitemdata: static enum cmd_values process_seq(MSGLINE *msgline) { - bool dupall, dupcmd; + bool dupall = false, dupcmd = false; char *st = NULL; if (ignore_seq) @@ -3125,16 +3157,22 @@ static enum cmd_values process_seq(MSGLINE *msgline) msgline->n_seqpid); } - dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, - msgline->n_seqpid, SEQALL, &(msgline->now), - &(msgline->cd), msgline->code, - msgline->seqentryflags, msgline->msg); + if (!ignore_seqall) { + dupall = update_seq(SEQ_ALL, msgline->n_seqall, + msgline->n_seqstt, msgline->n_seqpid, + SEQALL, &(msgline->now), &(msgline->cd), + msgline->code, msgline->seqentryflags, + msgline->msg); + } dupcmd = update_seq(ckdb_cmds[msgline->which_cmds].seq, msgline->n_seqcmd, msgline->n_seqstt, msgline->n_seqpid, msgline->seqcmdnam, &(msgline->now), &(msgline->cd), msgline->code, msgline->seqentryflags, msgline->msg); + if (ignore_seqall) + dupall = dupcmd; + if (dupall != dupcmd) { // Bad/corrupt data or a code bug LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " @@ -4212,7 +4250,7 @@ static char *shift_words[] = "origami", "paru", "quinn", - "rika", + "rem", "sena", "tenshi", "ur", @@ -5933,13 +5971,15 @@ static void *process_reload(__maybe_unused void *arg) st = safe_text(msgline->msg)); FREENULL(st); break; - case CMD_AUTH: - case CMD_ADDRAUTH: case CMD_HEARTBEAT: case CMD_POOLSTAT: case CMD_USERSTAT: case CMD_WORKERSTAT: case CMD_BLOCK: + if (key_update) + break; + case CMD_AUTH: + case CMD_ADDRAUTH: if (confirm_sharesummary) break; case CMD_SHARELOG: @@ -6153,7 +6193,7 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) /* If the reload start file is missing and -r was specified correctly: * touch the filename reported in "Failed to open 'filename'", * if ckdb aborts at the beginning of the reload, then start again */ -static bool reload_from(tv_t *start) +static bool reload_from(tv_t *start, const tv_t *finish) { // proc_pt could exit after this returns static pthread_t proc_pt; @@ -6265,8 +6305,10 @@ static bool reload_from(tv_t *start) if (everyone_die) break; reload_timestamp.tv_sec += ROLL_S; - if (confirm_sharesummary && tv_newer(&confirm_finish, &reload_timestamp)) { - LOGWARNING("%s(): confirm range complete", __func__); + if (tv_newer(finish, &reload_timestamp)) { + tv_to_buf(&reload_timestamp, buf, sizeof(buf)); + LOGWARNING("%s(): finish range (%s) exceeded", + __func__, buf); break; } @@ -6495,9 +6537,11 @@ static void *listener(void *arg) for (i = 0; i < breakdown_threads; i++) create_pthread(&break_pt, breaker, &cmder); - create_pthread(&log_pt, logger, NULL); + if (no_data_log == false) + create_pthread(&log_pt, logger, NULL); - create_pthread(&sock_pt, socketer, arg); + if (!confirm_sharesummary) + create_pthread(&sock_pt, socketer, arg); create_pthread(&summ_pt, summariser, NULL); @@ -6672,6 +6716,341 @@ sayonara: return NULL; } +static bool make_keysummaries() +{ + K_TREE_CTX ctx[1]; + WORKMARKERS *workmarkers; + K_ITEM *wm_item, *wm_last = NULL; + tv_t proc_lock_stt, proc_lock_got, proc_lock_fin, now; + bool ok = false; + + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); + while (wm_item) { + DATA_WORKMARKERS(workmarkers, wm_item); + if (!CURRENT(&(workmarkers->expirydate))) + break; + // find the oldest READY workinfoid + if (WMREADY(workmarkers->status)) + wm_last = wm_item; + wm_item = prev_in_ktree(ctx); + } + K_RUNLOCK(workmarkers_free); + + if (!wm_last) + return false; + + DATA_WORKMARKERS(workmarkers, wm_last); + + LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/" + "Stt %"PRId64"/%s/%s", + __func__, workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, workmarkers->workinfoidstart, + workmarkers->description, workmarkers->status); + + setnow(&now); + setnow(&proc_lock_stt); + K_KLONGWLOCK(process_pplns_free); + setnow(&proc_lock_got); + ok = sharesummaries_to_markersummaries(NULL, workmarkers, by_default, + (char *)__func__, inet_default, + &now, NULL); + K_WUNLOCK(process_pplns_free); + setnow(&proc_lock_fin); + LOGWARNING("%s() pplns lock time %.3fs+%.3fs", + __func__, tvdiff(&proc_lock_got, &proc_lock_stt), + tvdiff(&proc_lock_fin, &proc_lock_got)); + + return ok; +} + +static void *keymarker(__maybe_unused void *arg) +{ + pthread_detach(pthread_self()); + bool ok = true; + + LOCK_INIT("db_keymarker"); + rename_proc("db_keymarker"); + + if (!everyone_die) { + LOGWARNING("%s() Start key processing...", __func__); + marker_using_data = true; + } + + while (!everyone_die && ok) { + if (!everyone_die) + sleep(1); + if (!everyone_die) + ok = make_keysummaries(); + } + + marker_using_data = false; + + // No unprocessed workmarkers, or an error + everyone_die = true; + + return NULL; +} + +static void update_reload(WORKINFO *wi_stt, WORKINFO *wi_fin) +{ + K_TREE_CTX ctx[1]; + WORKMARKERS *workmarkers; + K_ITEM *wm_item, *wm_prev; + pthread_t keymark_pt; + tv_t *start; + tv_t finish; + int counter; + bool status_failure = false; + + /* Now that we know the workmarkers of interest, + * switch them from MARKER_PROCESSED to MARKER_READY + * and remove all after + * The markersummaries already exist + * We are generating the missing keysummaries */ + K_WLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); + while (wm_item) { + wm_prev = prev_in_ktree(ctx); + DATA_WORKMARKERS(workmarkers, wm_item); + if (CURRENT(&(workmarkers->expirydate))) { + if (workmarkers->workinfoidstart > wi_fin->workinfoid) { + remove_from_ktree(workmarkers_workinfoid_root, + wm_item); + remove_from_ktree(workmarkers_root, wm_item); + free_workmarkers_data(wm_item); + k_unlink_item(workmarkers_store, wm_item); + k_add_head(workmarkers_free, wm_item); + } else if (workmarkers->workinfoidstart >= + wi_stt->workinfoid) { + if (!WMPROCESSED(workmarkers->status)) { + status_failure = true; + LOGERR("%s() workmarker %"PRId64 + " invalid status '%s' != %c", + __func__, + workmarkers->markerid, + workmarkers->status, + MARKER_PROCESSED); + } else { + // Not part of either tree key + STRNCPY(workmarkers->status, MARKER_READY_STR); + } + } else + break; + } + wm_item = wm_prev; + } + K_WUNLOCK(workmarkers_free); + + if (status_failure) { + LOGERR("%s() Aborting ...", __func__); + return; + } + + create_pthread(&keymark_pt, keymarker, NULL); + + start = &(wi_stt->createdate); + if (start->tv_sec < DATE_BEGIN) + start = (tv_t *)&date_begin; + + copy_tv(&finish, &(wi_fin->createdate)); + // include the reload file after wi_fin + finish.tv_sec += ROLL_S; + + reload_from(start, &finish); + + // wait for all loaded data to be used + while (!everyone_die) { + K_RLOCK(breakqueue_free); + counter = reload_done_breakqueue_store->count + + reload_breakqueue_store->count + reload_processing; + K_RUNLOCK(breakqueue_free); + if (counter == 0) + break; + cksleep_ms(142); + } + + while (!everyone_die) + cksleep_ms(142); +} + +static void update_check(int64_t markerid_stt, int64_t markerid_fin) +{ + K_ITEM *wm_stt_item, *wm_fin_item, *wi_stt_item, *wi_fin_item; + char buf[DATE_BUFSIZ+1], buf2[DATE_BUFSIZ+1]; + WORKMARKERS *wm_stt = NULL, *wm_fin = NULL; + WORKINFO *wi_stt = NULL, *wi_fin = NULL; + tv_t up_stt, up_fin; + double min, sec; + + K_RLOCK(workmarkers_free); + wm_stt_item = find_workmarkerid(markerid_stt, false, MARKER_PROCESSED); + wm_fin_item = find_workmarkerid(markerid_fin, false, MARKER_PROCESSED); + K_RUNLOCK(workmarkers_free); + + if (!wm_stt_item || !wm_fin_item) { + if (!wm_stt_item) { + LOGERR("%s() unknown start markerid %"PRId64, + __func__, markerid_stt); + } + if (!wm_fin_item) { + LOGERR("%s() unknown finish markerid %"PRId64, + __func__, markerid_fin); + } + return; + } + + DATA_WORKMARKERS(wm_stt, wm_stt_item); + DATA_WORKMARKERS(wm_fin, wm_fin_item); + + key_wi_stt = wm_stt->workinfoidstart; + key_wi_fin = wm_fin->workinfoidend; + + wi_stt_item = find_workinfo(key_wi_stt, NULL); + wi_fin_item = find_workinfo(key_wi_fin, NULL); + + if (!wi_stt_item || !wi_fin_item) { + if (!wi_stt_item) { + LOGEMERG("%s() missing workinfoid data! %"PRId64 + " for start markerid %"PRId64, + __func__, key_wi_stt, markerid_stt); + } + if (!wi_fin_item) { + LOGEMERG("%s() missing workinfoid data! %"PRId64 + " for finish markerid %"PRId64, + __func__, key_wi_fin, markerid_fin); + } + return; + } + + DATA_WORKINFO(wi_stt, wi_stt_item); + DATA_WORKINFO(wi_fin, wi_fin_item); + + tv_to_buf(&(wi_stt->createdate), buf, sizeof(buf)); + tv_to_buf(&(wi_fin->createdate), buf2, sizeof(buf2)); + LOGWARNING("%s() processing from start markerid %"PRId64" %s to " + "finish markerid %"PRId64" %s", + __func__, markerid_stt, buf, markerid_fin, buf2); + + setnow(&up_stt); + + update_reload(wi_stt, wi_fin); + + POOLINSTANCE_RESET_MSG("reload"); + setnow(&up_fin); + sec = tvdiff(&up_fin, &up_stt); + min = floor(sec / 60.0); + sec -= min * 60.0; + LOGWARNING("update complete %.0fm %.3fs", min, sec); +} + +static void update_keysummary() +{ + int64_t markerid_stt, markerid_fin; + char *tmp, *minus; + tv_t db_stt, db_fin; + pthread_t break_pt; + double min, sec; + bool reloader; + int cpus, i; + + // Simple value check to abort early + if (!key_range || !(*key_range)) { + LOGEMERG("%s() -K option can't be blank", __func__); + return; + } + + switch(tolower(key_range[0])) { + case 'm': + tmp = strdup(key_range); + minus = strchr(tmp+1, '-'); + if (!minus || minus == tmp+1) { + LOGEMERG("%s() invalid workmarker range '%s' " + "- must be %cNNN-MMM", + __func__, key_range, tolower(key_range[0])); + return; + } + *(minus++) = '\0'; + markerid_stt = atoll(tmp+1); + if (markerid_stt <= 0) { + LOGEMERG("%s() invalid markerid start in '%s' " + "- must be >0", + __func__, key_range); + return; + } + markerid_fin = atoll(minus); + if (markerid_fin <= 0) { + LOGEMERG("%s() invalid markerid finish in '%s' " + "- must be >0", + __func__, key_range); + return; + } + if (markerid_fin < markerid_stt) { + LOGEMERG("%s() invalid markerid range in '%s' " + "- finish < start", + __func__, key_range); + return; + } + free(tmp); + break; + default: + LOGEMERG("%s() unknown key range '%c' in '%s'", + __func__, key_range[0], key_range); + return; + } + + LOCK_INIT("dbk_updater"); + rename_proc("dbk_updater"); + + breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), + ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); + reload_breakqueue_store = k_new_store(breakqueue_free); + reload_done_breakqueue_store = k_new_store(breakqueue_free); + +#if LOCK_CHECK + DLPRIO(breakqueue, PRIO_TERMINAL); +#endif + if (breakdown_threads <= 0) { + cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; + breakdown_threads = (int)(cpus / 3) ? : 1; + } + LOGWARNING("%s(): creating %d breaker threads ...", + __func__, breakdown_threads); + reloader = true; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &reloader); + + alloc_storage(); + + setnow(&db_stt); + + if (!getdata1() || everyone_die) + return; + + db_users_complete = true; + + if (!getdata2() || everyone_die) + return; + + if (dbload_workinfoid_start != -1) { + LOGWARNING("WARNING: dbload starting at workinfoid %"PRId64, + dbload_workinfoid_start); + } + + if (!getdata3() || everyone_die) + return; + + POOLINSTANCE_RESET_MSG("dbload"); + setnow(&db_fin); + sec = tvdiff(&db_fin, &db_stt); + min = floor(sec / 60.0); + sec -= min * 60.0; + LOGWARNING("dbload complete %.0fm %.3fs", min, sec); + + db_load_complete = true; + + update_check(markerid_stt, markerid_fin); +} #if 0 /* TODO: This will be way faster traversing both trees simultaneously * rather than traversing one and searching the other, then repeating @@ -7086,7 +7465,7 @@ static void confirm_reload() free(filename); } - if (!reload_from(&start)) { + if (!reload_from(&start, &date_eot)) { LOGEMERG("%s() ABORTING from reload_from()", __func__); return; } @@ -7265,6 +7644,8 @@ static struct option long_options[] = { // DON'T use when connected to ckpool { "ignore-seq", required_argument, 0, 'I' }, { "killold", no_argument, 0, 'k' }, + // Generate old keysummary records + { "key", required_argument, 0, 'K' }, { "loglevel", required_argument, 0, 'l' }, // marker = enable mark/workmarker/markersummary auto generation { "marker", no_argument, 0, 'm' }, @@ -7321,8 +7702,12 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { switch(c) { + case '?': + case ':': + quit(1, "exiting"); + break; case 'a': len = strlen(optarg); if (len > MAX_ALERT_CMD) @@ -7419,6 +7804,10 @@ int main(int argc, char **argv) case 'k': ckp.killold = true; break; + case 'K': + key_range = strdup(optarg); + key_update = true; + break; case 'l': ckp.loglevel = atoi(optarg); if (ckp.loglevel < LOG_EMERG || ckp.loglevel > LOG_DEBUG) { @@ -7540,10 +7929,20 @@ int main(int argc, char **argv) btc_auth = http_base64(buf); bzero(buf, sizeof(buf)); - if (confirm_sharesummary) - dbcode = "y"; - else - dbcode = ""; + if (key_update) { + dbcode = "k"; + no_data_log = true; + ignore_seqall = true; + exclusive_db = false; + } else { + if (confirm_sharesummary) { + dbcode = "y"; + no_data_log = true; + ignore_seqall = true; + exclusive_db = false; + } else + dbcode = ""; + } if (!db_name) db_name = "ckdb"; @@ -7650,7 +8049,10 @@ int main(int argc, char **argv) o_limits_max_lifetime = o_limits[i].lifetime; } - if (confirm_sharesummary) { + if (key_update) { + update_keysummary(); + everyone_die = true; + } else if (confirm_sharesummary) { // TODO: add a system lock to stop running 2 at once? confirm_summaries(); everyone_die = true; diff --git a/src/ckdb.h b/src/ckdb.h index 15cdf684..1cb388d5 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -11,6 +11,8 @@ #ifndef CKDB_H #define CKDB_H +#pragma GCC diagnostic ignored "-Wtautological-compare" + #include "config.h" #include @@ -52,7 +54,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.206" +#define CKDB_VERSION DB_VERSION"-2.300" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -98,6 +100,7 @@ extern int switch_state; extern bool genpayout_auto; extern bool markersummary_auto; +extern bool exclusive_db; enum free_modes { FREE_MODE_ALL, @@ -299,6 +302,11 @@ extern const tv_t date_begin; #define BTC_TO_D(_amt) ((double)((_amt) / 100000000.0)) +// argv -K - don't run in ckdb mode, just update keysummaries +extern bool key_update; +extern int64_t key_wi_stt; +extern int64_t key_wi_fin; + // argv -y - don't run in ckdb mode, just confirm sharesummaries extern bool confirm_sharesummary; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index f348fedd..ee34e613 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -2528,6 +2528,10 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, bool igndup = false; char *txn_tree; + // nothing needed by key_update is triggered by the workinfo data + if (key_update) + goto wiconf; + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); @@ -2637,7 +2641,7 @@ wiconf: TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); - if (reloading && !confirm_sharesummary) { + if (reloading && !key_update && !confirm_sharesummary) { /* ISDR (Ignored shares during reload) * This will discard any shares older than the newest * workinfoidend of any workmarker - including ready @@ -2656,6 +2660,11 @@ wiconf: return NULL; } + if (key_update) { + if (workinfoid < key_wi_stt || workinfoid > key_wi_fin) + goto sconf; + } + if (confirm_sharesummary) { if (workinfoid < confirm_first_workinfoid || workinfoid > confirm_last_workinfoid) @@ -2753,6 +2762,10 @@ sconf: K_ITEM *i_error, *i_secondaryuserid; bool ok; + // not summarised in keysummaries + if (key_update) + goto wiconf; + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); @@ -2834,15 +2847,18 @@ seconf: TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); - if (reloading && !confirm_sharesummary) { + if (reloading && !key_update && !confirm_sharesummary) { // This excludes any already summarised if (workinfoid <= dbstatus.newest_workmarker_workinfoid) return NULL; } - if (confirm_sharesummary) { - TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + if (key_update) { + if (workinfoid < key_wi_stt || workinfoid > key_wi_fin) + goto awconf; + } + if (confirm_sharesummary) { if (workinfoid < confirm_first_workinfoid || workinfoid > confirm_last_workinfoid) goto awconf; @@ -2856,7 +2872,7 @@ seconf: return strdup("failed.DATA"); } else { /* Don't slow down the reload - do them later */ - if (!reloading) { + if (!reloading || key_update) { // Aging is a queued item thus the reply is ignored auto_age_older(workinfoid, transfer_data(i_poolinstance), diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 07af1a0c..a2f9ad13 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -1611,16 +1611,17 @@ K_ITEM *new_worker(PGconn *conn, bool update, int64_t userid, char *workername, item = find_workers(false, userid, workername); if (item) { - if (!confirm_sharesummary && update) { + if (!key_update && !confirm_sharesummary && update) { workers_update(conn, item, diffdef, idlenotificationenabled, idlenotificationtime, by, code, inet, cd, trf_root, true); } } else { - if (confirm_sharesummary) { - // Shouldn't be possible since the sharesummary is already aged - LOGERR("%s() %"PRId64"/%s workername not found during confirm", - __func__, userid, workername); + if (key_update || confirm_sharesummary) { + // Shouldn't be possible with old data + LOGERR("%s() %"PRId64"/%s workername not found during %s", + __func__, userid, workername, + key_update ? "keyupdate" : "confirm" ); return NULL; } @@ -2133,14 +2134,74 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx) return item; } +#define DISCARD_ALL -1 +// userid = DISCARD_ALL will dump all shares for the given workinfoid +static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped, + int64_t *diff_tot, bool skipupdate, + int64_t workinfoid, int64_t userid, char *workername) +{ + K_ITEM s_look, *s_item, *tmp_item; + SHARES lookshares, *shares; + K_TREE_CTX s_ctx[1]; + char error[1024]; + + error[0] = '\0'; + INIT_SHARES(&s_look); + + lookshares.workinfoid = workinfoid; + lookshares.userid = userid; + strcpy(lookshares.workername, workername); + DATE_ZERO(&(lookshares.createdate)); + + s_look.data = (void *)(&lookshares); + K_WLOCK(shares_free); + s_item = find_after_in_ktree(shares_root, &s_look, s_ctx); + while (s_item) { + DATA_SHARES(shares, s_item); + if (shares->workinfoid != workinfoid) + break; + + if (userid != DISCARD_ALL) { + if (shares->userid != userid || + strcmp(shares->workername, workername) != 0) + break; + } + + (*shares_tot)++; + if (shares->errn == SE_NONE) + (*diff_tot) += shares->diff; + tmp_item = next_in_ktree(s_ctx); + remove_from_ktree(shares_root, s_item); + k_unlink_item(shares_store, s_item); + if (reloading && skipupdate) + (*shares_dumped)++; + if (reloading && skipupdate && !error[0]) { + snprintf(error, sizeof(error), + "reload found aged share: %"PRId64 + "/%"PRId64"/%s/%s%.0f", + shares->workinfoid, + shares->userid, + shares->workername, + (shares->errn == SE_NONE) ? "" : "*", + shares->diff); + } + k_add_head(shares_free, s_item); + s_item = tmp_item; + } + K_WUNLOCK(shares_free); + + if (error[0]) + LOGERR("%s(): %s", __func__, error); +} + // Duplicates during a reload are set to not show messages bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, int64_t *s_count, int64_t *s_diff) { - K_ITEM *wi_item, ss_look, *ss_item, s_look, *s_item; - K_ITEM ks_look, *ks_item, *wm_item, *tmp_item; - K_TREE_CTX ss_ctx[1], s_ctx[1], ks_ctx[1]; + K_ITEM *wi_item, ss_look, *ss_item; + K_ITEM ks_look, *ks_item, *wm_item; + K_TREE_CTX ss_ctx[1], ks_ctx[1]; char cd_buf[DATE_BUFSIZ]; int64_t ss_tot, ss_already, ss_failed, shares_tot, shares_dumped; int64_t ks_tot, ks_already, ks_failed; @@ -2148,9 +2209,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, KEYSHARESUMMARY lookkeysharesummary, *keysharesummary; SHARESUMMARY looksharesummary, *sharesummary; WORKINFO *workinfo; - SHARES lookshares, *shares; - bool ok = false, ksok = false, skipupdate; - char error[1024]; + bool ok = false, ksok = false, skipupdate = false; LOGDEBUG("%s(): age", __func__); @@ -2193,17 +2252,20 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, goto bye; } + ok = true; + ss_tot = ss_already = ss_failed = shares_tot = shares_dumped = + diff_tot = 0; + + if (key_update) + goto skip_ss; + INIT_SHARESUMMARY(&ss_look); - INIT_SHARES(&s_look); // Find the first matching sharesummary looksharesummary.workinfoid = workinfoid; looksharesummary.userid = -1; looksharesummary.workername = EMPTY; - ok = true; - ss_tot = ss_already = ss_failed = shares_tot = shares_dumped = - diff_tot = 0; ss_look.data = (void *)(&looksharesummary); K_RLOCK(sharesummary_free); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx); @@ -2211,7 +2273,6 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, DATA_SHARESUMMARY_NULL(sharesummary, ss_item); while (ss_item && sharesummary->workinfoid == workinfoid) { ss_tot++; - error[0] = '\0'; skipupdate = false; /* Reloading during a confirm will not have any old data * so finding an aged sharesummary here is an error @@ -2251,50 +2312,14 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, } // Discard the shares either way - lookshares.workinfoid = workinfoid; - lookshares.userid = sharesummary->userid; - strcpy(lookshares.workername, sharesummary->workername); - DATE_ZERO(&(lookshares.createdate)); - - s_look.data = (void *)(&lookshares); - K_WLOCK(shares_free); - s_item = find_after_in_ktree(shares_root, &s_look, s_ctx); - while (s_item) { - DATA_SHARES(shares, s_item); - if (shares->workinfoid != workinfoid || - shares->userid != lookshares.userid || - strcmp(shares->workername, lookshares.workername) != 0) - break; + discard_shares(&shares_tot, &shares_dumped, &diff_tot, skipupdate, + workinfoid, sharesummary->userid, + sharesummary->workername); - shares_tot++; - if (shares->errn == SE_NONE) - diff_tot += shares->diff; - tmp_item = next_in_ktree(s_ctx); - remove_from_ktree(shares_root, s_item); - k_unlink_item(shares_store, s_item); - if (reloading && skipupdate) - shares_dumped++; - if (reloading && skipupdate && !error[0]) { - snprintf(error, sizeof(error), - "reload found aged share: %"PRId64 - "/%"PRId64"/%s/%s%.0f", - shares->workinfoid, - shares->userid, - shares->workername, - (shares->errn == SE_NONE) ? "" : "*", - shares->diff); - } - k_add_head(shares_free, s_item); - s_item = tmp_item; - } - K_WUNLOCK(shares_free); K_RLOCK(sharesummary_free); ss_item = next_in_ktree(ss_ctx); K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); - - if (error[0]) - LOGERR("%s(): %s", __func__, error); } if (ss_already || ss_failed || shares_dumped) { @@ -2311,6 +2336,8 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, } } +skip_ss: + INIT_KEYSHARESUMMARY(&ks_look); // Find the first matching keysharesummary @@ -2331,7 +2358,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, /* Reloading during a confirm will not have any old data * so finding an aged keysharesummary here is an error * N.B. this can only happen with (very) old reload files */ - if (reloading) { + if (reloading && !key_update) { if (keysharesummary->complete[0] == SUMMARY_COMPLETE) { ks_already++; skipupdate = true; @@ -2356,15 +2383,19 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, } } - /* All shares should have been discarded during sharesummary - * processing above */ - K_RLOCK(keysharesummary_free); ks_item = next_in_ktree(ks_ctx); K_RUNLOCK(keysharesummary_free); DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); } + /* All shares should have been discarded during sharesummary + * processing above except during a key_update */ + if (key_update) { + discard_shares(&shares_tot, &shares_dumped, &diff_tot, skipupdate, + workinfoid, -1, EMPTY); + } + if (ks_already) { LOGNOTICE("%s(): Keysummary aging of %"PRId64"/%s " "kstotal=%"PRId64" already=%"PRId64" failed=%"PRId64, diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 242074c6..e69c7859 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3219,12 +3219,14 @@ bool workinfo_fill(PGconn *conn) return false; } - res = PQexec(conn, "Lock table workinfo in access exclusive mode", CKPQ_READ); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Lock", rescode, conn); - goto flail; + if (exclusive_db) { + res = PQexec(conn, "Lock table workinfo in access exclusive mode", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Lock", rescode, conn); + goto flail; + } } res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); @@ -3456,7 +3458,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, return false; } - if (reloading && !confirm_sharesummary) { + if (reloading && !key_update && !confirm_sharesummary) { // We only need to know if the workmarker is processed K_RLOCK(workmarkers_free); wm_item = find_workmarkers(shares->workinfoid, false, @@ -3494,7 +3496,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, } } - if (!confirm_sharesummary) { + if (!key_update && !confirm_sharesummary) { workerstatus_update(NULL, shares, NULL); K_WLOCK(userinfo_free); userinfo_update(shares, NULL, NULL, false); @@ -3949,12 +3951,14 @@ bool shares_fill(PGconn *conn) return false; } - res = PQexec(conn, "Lock table shares in access exclusive mode", CKPQ_READ); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Lock", rescode, conn); - goto flail; + if (exclusive_db) { + res = PQexec(conn, "Lock table shares in access exclusive mode", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Lock", rescode, conn); + goto flail; + } } res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); @@ -4157,6 +4161,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, return false; } + // key_update skips shareerrors if (reloading && !confirm_sharesummary) { // We only need to know if the workmarker is processed K_RLOCK(workmarkers_free); @@ -4490,7 +4495,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, K_ITEM *kss_item, *kss_prev, kss_look; K_ITEM *ms_item, ms_look, *p_ss_item, *p_ms_item; K_ITEM *ks_item, ks_look; - bool ok = false, conned = false; + bool ok = false, conned = false, nonblank = false; int64_t diffacc = 0, shareacc = 0; int64_t kdiffacc = 0, kshareacc = 0; char *reason = NULL; @@ -4520,7 +4525,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, K_STORE *new_keysummary_store = k_new_store(keysummary_free); /* Use the master size for these local trees since - * they're large and doesn't get created often */ + * they're large and don't get created often */ K_TREE *ms_root = new_ktree_local(sshortname, cmp_markersummary, markersummary_free); K_TREE *ks_root = new_ktree_local(kshortname, cmp_keysummary, @@ -4536,6 +4541,9 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, goto flail; } + if (key_update) + goto dokey; + setnow(&add_stt); /* Check there aren't already any matching markersummaries * and assume keysummaries are the same */ @@ -4660,6 +4668,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, } setnow(&add_fin); +dokey: + setnow(&kadd_stt); INIT_KEYSUMMARY(&ks_look); @@ -4684,6 +4694,8 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, DATA_KEYSHARESUMMARY(keysharesummary, kss_item); if (keysharesummary->workinfoid < workmarkers->workinfoidstart) break; + if (keysharesummary->key[0]) + nonblank = true; K_RLOCK(keysharesummary_free); kss_prev = prev_in_ktree(kss_ctx); K_RUNLOCK(keysharesummary_free); @@ -4801,14 +4813,19 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, ks_item = ks_item->next; } - ok = workmarkers_process(conn, true, true, - workmarkers->markerid, - workmarkers->poolinstance, - workmarkers->workinfoidend, - workmarkers->workinfoidstart, - workmarkers->description, - MARKER_PROCESSED_STR, - by, code, inet, cd, trf_root); + if (!key_update) { + ok = workmarkers_process(conn, true, true, + workmarkers->markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_PROCESSED_STR, + by, code, inet, cd, trf_root); + } else { + // Not part of either tree key + STRNCPY(workmarkers->status, MARKER_PROCESSED_STR); + } setnow(&kdb_fin); rollback: if (ok) @@ -4959,7 +4976,7 @@ flail: "k(2*%"PRId64"%s/2*%"PRId64"%s) for workmarkers " "%"PRId64"/%s/End %"PRId64"/Stt %"PRId64"/%s/%s " "add=%.3fs kadd=%.3fs db=%.3fs kdb=%.3fs " - "lck=%.3f+%.3fs", + "lck=%.3f+%.3fs%s", shortname, ms_count, ks_count, ss_count, kss_count, shareacc, diffacc, kshareacc >> 1, (kshareacc & 1) ? ".5" : "", @@ -4973,7 +4990,8 @@ flail: tvdiff(&db_fin, &db_stt), tvdiff(&kdb_fin, &kdb_stt), tvdiff(&lck_got, &lck_stt), - tvdiff(&lck_fin, &lck_got)); + tvdiff(&lck_fin, &lck_got), + nonblank ? EMPTY : " ONLY BLANK KEYS"); // This should never happen if (kshareacc != (shareacc << 1) || kdiffacc != (diffacc << 1)) { @@ -6489,12 +6507,14 @@ bool miningpayouts_fill(PGconn *conn) return false; } - res = PQexec(conn, "Lock table miningpayouts in access exclusive mode", CKPQ_READ); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Lock", rescode, conn); - goto flail; + if (exclusive_db) { + res = PQexec(conn, "Lock table miningpayouts in access exclusive mode", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Lock", rescode, conn); + goto flail; + } } res = PQexec(conn, sel, CKPQ_READ); @@ -8264,12 +8284,14 @@ bool markersummary_fill(PGconn *conn) return false; } - res = PQexec(conn, "Lock table markersummary in access exclusive mode", CKPQ_READ); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Lock", rescode, conn); - goto flail; + if (exclusive_db) { + res = PQexec(conn, "Lock table markersummary in access exclusive mode", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Lock", rescode, conn); + goto flail; + } } res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); @@ -8564,6 +8586,10 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code, rescode = PQresultStatus(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); + /* Don't fail on a duplicate during key_update + * TODO: should only be on a duplicate ... */ + if (key_update) + ok = true; goto unparam; }