diff --git a/src/ckdb.c b/src/ckdb.c index 1a46b3e0..de8c67d2 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -47,7 +47,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9" -#define CKDB_VERSION DB_VERSION"-0.250" +#define CKDB_VERSION DB_VERSION"-0.251" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -63,6 +63,9 @@ #define TRUE_STR "Y" #define FALSE_STR "N" +#define TRUE_CHR 'Y' +#define FALSE_CHR 'N' + #define coinbase1height(_cb1) _coinbase1height(_cb1, WHERE_FFL_HERE) #define cmp_height(_cb1a, _cb1b) _cmp_height(_cb1a, _cb1b, WHERE_FFL_HERE) @@ -1298,6 +1301,9 @@ typedef struct blocks { #define BLOCKS_ORPHAN 'O' #define BLOCKS_ORPHAN_STR "O" +#define BLOCKS_STATSPENDING FALSE_CHR +#define BLOCKS_STATSCONFIRMED TRUE_CHR + static const char *blocks_new = "New"; static const char *blocks_confirm = "1-Confirm"; static const char *blocks_42 = "42-Confirm"; @@ -2227,7 +2233,7 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, conn = dbconnect(); conned = true; } - + res = PQexec(conn, qry, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -2299,6 +2305,9 @@ static cmp_t cmp_workerstatus(K_ITEM *a, K_ITEM *b) return c; } +/* TODO: replace a lot of the code for all data types that codes finds, + * each with specific functions for finding, to centralise the finds, + * with passed ctx's */ static K_ITEM *get_workerstatus(int64_t userid, char *workername) { WORKERSTATUS workerstatus; @@ -2400,6 +2409,7 @@ static void set_block_share_counters() /* From the end backwards so we can skip the workinfoid's we don't * want by jumping back to just before the current worker when the * workinfoid goes below the limit */ + K_RLOCK(sharesummary_free); ss_item = last_in_ktree(sharesummary_root, ctx); while (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); @@ -2425,6 +2435,7 @@ static void set_block_share_counters() * since it should always exist * However, it is simplest to simply create it * and keep going */ + K_RUNLOCK(sharesummary_free); ws_item = find_workerstatus(sharesummary->userid, sharesummary->workername, __FILE__, __func__, __LINE__); @@ -2433,6 +2444,7 @@ static void set_block_share_counters() sharesummary->workername, __FILE__, __func__, __LINE__); } + K_RLOCK(sharesummary_free); DATA_WORKERSTATUS(workerstatus, ws_item); } @@ -2448,6 +2460,7 @@ static void set_block_share_counters() ss_item = prev_in_ktree(ctx); } + K_RLOCK(sharesummary_free); } /* All data is loaded, now update workerstatus fields @@ -2493,8 +2506,10 @@ static void workerstatus_ready() STRNCPY(looksharesummary.workername, workerstatus->workername); looksharesummary.workinfoid = MAXID; ss_look.data = (void *)(&looksharesummary); + K_RLOCK(sharesummary_free); ss_item = find_before_in_ktree(sharesummary_root, &ss_look, cmp_sharesummary, ss_ctx); + K_RUNLOCK(sharesummary_free); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); if (tv_newer(&(workerstatus->last_share), @@ -3875,7 +3890,7 @@ static K_ITEM *find_workinfo(int64_t workinfoid) { WORKINFO workinfo; K_TREE_CTX ctx[1]; - K_ITEM look; + K_ITEM look, *item; workinfo.workinfoid = workinfoid; workinfo.expirydate.tv_sec = default_expiry.tv_sec; @@ -3883,7 +3898,10 @@ static K_ITEM *find_workinfo(int64_t workinfoid) INIT_WORKINFO(&look); look.data = (void *)(&workinfo); - return find_in_ktree(workinfo_root, &look, cmp_workinfo, ctx); + K_RLOCK(workinfo_free); + item = find_in_ktree(workinfo_root, &look, cmp_workinfo, ctx); + K_RUNLOCK(workinfo_free); + return item; } static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance, @@ -4087,7 +4105,9 @@ static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, ok = true; ss_tot = ss_already = ss_failed = shares_tot = shares_dumped = 0; ss_look.data = (void *)(&looksharesummary); + K_RLOCK(sharesummary_free); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ss_ctx); + K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); while (ss_item && sharesummary->workinfoid == workinfoid) { ss_tot++; @@ -4168,12 +4188,15 @@ static bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, s_item = tmp_item; } K_WUNLOCK(shares_free); + K_RLOCK(sharesummary_free); ss_item = next_in_ktree(ss_ctx); + K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); if (error[0]) LOGERR("%s(): %s", __func__, error); } + K_RUNLOCK(sharesummary_free); if (conned) PQfinish(conn); @@ -4223,6 +4246,8 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, looksharesummary.workername[0] = '\0'; INIT_SHARESUMMARY(&look); look.data = (void *)(&looksharesummary); + + K_RLOCK(sharesummary_free); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, cmp_sharesummary_workinfoid, ctx); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); @@ -4242,6 +4267,7 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, ss_item = next_in_ktree(ctx); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); } + K_RUNLOCK(sharesummary_free); LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); // Don't repeat searching old items to avoid accessing their ram @@ -4277,10 +4303,14 @@ static void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance, to_id = do_id; wid_count++; + K_RLOCK(sharesummary_free); while (ss_item && sharesummary->workinfoid == to_id) { + K_RLOCK(sharesummary_free); ss_item = next_in_ktree(ctx); + K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); } + K_RUNLOCK(sharesummary_free); if (ss_item) { do_id = sharesummary->workinfoid; @@ -4898,7 +4928,9 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row sharecreatedate = &(e_row->createdate); } + K_RLOCK(sharesummary_free); item = find_sharesummary(userid, workername, workinfoid); + K_RUNLOCK(sharesummary_free); if (item) { new = false; DATA_SHARESUMMARY(row, item); @@ -5410,6 +5442,9 @@ static cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b) return c; } +/* TODO: and make sure all block searches use these + * or add new ones as required here */ + // Must be R or W locked before call - gets current status (default_expiry) static K_ITEM *find_blocks(int32_t height, char *blockhash) { @@ -5427,6 +5462,34 @@ static K_ITEM *find_blocks(int32_t height, char *blockhash) return find_in_ktree(blocks_root, &look, cmp_blocks, ctx); } +// Must be R or W locked before call +static K_ITEM *find_prev_blocks(int32_t height) +{ + BLOCKS lookblocks, *blocks; + K_TREE_CTX ctx[1]; + K_ITEM look, *b_item; + + /* TODO: For self orphaned (if that ever happens) + * this will find based on blockhash order if it has two, + * not NEW, blocks, which might not find the right one */ + lookblocks.height = height; + lookblocks.blockhash[0] = '\0'; + lookblocks.expirydate.tv_sec = 0L; + lookblocks.expirydate.tv_usec = 0L; + + INIT_BLOCKS(&look); + look.data = (void *)(&lookblocks); + b_item = find_before_in_ktree(blocks_root, &look, cmp_blocks, ctx); + while (b_item) { + DATA_BLOCKS(blocks, b_item); + if (blocks->confirmed[0] != BLOCKS_NEW && + CURRENT(&(blocks->expirydate))) + return b_item; + b_item = prev_in_ktree(ctx); + } + return NULL; +} + static const char *blocks_confirmed(char *confirmed) { switch (confirmed[0]) { @@ -5442,6 +5505,150 @@ static const char *blocks_confirmed(char *confirmed) return blocks_unknown; } +static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, + double diffacc, double diffinv, double shareacc, + double shareinv, int64_t elapsed, + char *by, char *code, char *inet, tv_t *cd) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res = NULL; + K_TREE_CTX ctx[1]; + K_ITEM *b_item, *old_b_item; + BLOCKS *row, *oldblocks; + char hash_dsp[16+1]; + char *upd, *ins; + char *params[8 + HISTORYDATECOUNT]; + bool ok = false, update_old = false; + int par = 0; + int n; + + LOGDEBUG("%s(): confirm", __func__); + + dsp_hash(blockhash, hash_dsp, sizeof(hash_dsp)); + + K_RLOCK(blocks_free); + old_b_item = find_blocks(height, blockhash); + K_RUNLOCK(blocks_free); + + if (!old_b_item) { + LOGERR("%s(): Non-existent Block: %s/...%s", + __func__, height, hash_dsp); + return false; + } + + DATA_BLOCKS_NULL(oldblocks, old_b_item); + + K_WLOCK(blocks_free); + b_item = k_unlink_head(blocks_free); + K_WUNLOCK(blocks_free); + + DATA_BLOCKS(row, b_item); + memcpy(row, oldblocks, sizeof(*row)); + row->diffacc = diffacc; + row->diffinv = diffinv; + row->shareacc = shareacc; + row->shareinv = shareinv; + row->elapsed = elapsed; + row->statsconfirmed[0] = BLOCKS_STATSCONFIRMED; + row->statsconfirmed[1] = '\0'; + HISTORYDATEINIT(row, cd, by, code, inet); + + upd = "update blocks set expirydate=$1 where blockhash=$2 and expirydate=$3"; + par = 0; + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = str_to_buf(row->blockhash, NULL, 0); + params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); + PARCHKVAL(par, 3, params); + + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } + PQclear(res); + + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Update", rescode, conn); + res = PQexec(conn, "Rollback", CKPQ_WRITE); + goto unparam; + } + + update_old = true; + + for (n = 0; n < par; n++) + free(params[n]); + + par = 0; + params[par++] = str_to_buf(row->blockhash, NULL, 0); + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = double_to_buf(row->diffacc, NULL, 0); + params[par++] = double_to_buf(row->diffinv, NULL, 0); + params[par++] = double_to_buf(row->shareacc, NULL, 0); + params[par++] = double_to_buf(row->shareinv, NULL, 0); + params[par++] = bigint_to_buf(row->elapsed, NULL, 0); + params[par++] = str_to_buf(row->statsconfirmed, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHKVAL(par, 8 + HISTORYDATECOUNT, params); // 13 as per ins + + ins = "insert into blocks " + "(height,blockhash,workinfoid,userid,workername," + "clientid,enonce1,nonce2,nonce,reward,confirmed," + "diffacc,diffinv,shareacc,shareinv,elapsed," + "statsconfirmed" + HISTORYDATECONTROL ") select " + "height,blockhash,workinfoid,userid,workername," + "clientid,enonce1,nonce2,nonce,reward,confirmed," + "$3,$4,$5,$6,$7,$8," + "$9,$10,$11,$12,$13 from blocks where " + "blockhash=$1 and expirydate=$2"; + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + res = PQexec(conn, "Rollback", CKPQ_WRITE); + goto unparam; + } + + res = PQexec(conn, "Commit", CKPQ_WRITE); + + ok = true; +unparam: + PQclear(res); + for (n = 0; n < par; n++) + free(params[n]); + + if (conned) + PQfinish(conn); + + K_WLOCK(blocks_free); + if (!ok) + k_add_head(blocks_free, b_item); + else { + if (update_old) { + blocks_root = remove_from_ktree(blocks_root, old_b_item, cmp_blocks, ctx); + copy_tv(&(oldblocks->expirydate), cd); + blocks_root = add_to_ktree(blocks_root, old_b_item, cmp_blocks); + } + blocks_root = add_to_ktree(blocks_root, b_item, cmp_blocks); + k_add_head(blocks_store, b_item); + } + K_WUNLOCK(blocks_free); + + return ok; +} + static bool blocks_add(PGconn *conn, char *height, char *blockhash, char *confirmed, char *workinfoid, char *username, char *workername, char *clientid, char *enonce1, @@ -6816,7 +7023,7 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row) conn = dbconnect(); conned = true; } - + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -8020,7 +8227,7 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, i_eos = &userstats_eos; eos = (strcasecmp(transfer_data(i_eos), TRUE_STR) == 0); - + ok = userstats_add(transfer_data(i_poolinstance), transfer_data(i_elapsed), transfer_data(i_username), @@ -9340,6 +9547,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, tvs_to_buf(&last_bc, reply, sizeof(reply)); snprintf(tmp, sizeof(tmp), "lastbc=%s%c", reply, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); + K_RLOCK(workinfo_free); if (workinfo_current) { WORKINFO *wic; int32_t hi; @@ -9352,6 +9560,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, snprintf(tmp, sizeof(tmp), "lastheight=?%c", FLDSEP); APPEND_REALLOC(buf, off, len, tmp); } + K_RUNLOCK(workinfo_free); } else { snprintf(tmp, sizeof(tmp), "lastbc=?%clastheight=?%c", FLDSEP, FLDSEP); @@ -9672,9 +9881,11 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, looksharesummary.workername[0] = '\0'; INIT_SHARESUMMARY(&ss_look); ss_look.data = (void *)(&looksharesummary); + K_RLOCK(sharesummary_free); ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look, cmp_sharesummary_workinfoid, ctx); if (!ss_item) { + K_RUNLOCK(sharesummary_free); snprintf(reply, siz, "ERR.no shares found with or before " "workinfo %"PRId64, @@ -9697,6 +9908,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, if (allow_aged) break; default: + K_RUNLOCK(sharesummary_free); snprintf(reply, siz, "ERR.sharesummary1 not ready in " "workinfo %"PRId64, @@ -9724,6 +9936,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, if (allow_aged) break; default: + K_RUNLOCK(sharesummary_free); snprintf(reply, siz, "ERR.sharesummary2 not ready in " "workinfo %"PRId64, @@ -9738,6 +9951,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, ss_item = prev_in_ktree(ctx); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); } + K_RUNLOCK(sharesummary_free); if (total == 0.0) { snprintf(reply, siz, @@ -10304,6 +10518,139 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, return cmds[*which_cmds].cmd_val; } +static void summarise_blocks() +{ + K_ITEM *b_item, *b_prev, *wi_item, ss_look, *ss_item; + K_TREE_CTX ctx[1], ss_ctx[1]; + double diffacc, diffinv, shareacc, shareinv; + tv_t now, elapsed_start, elapsed_finish; + int64_t elapsed, wi_start, wi_finish; + BLOCKS *blocks, *prev_blocks; + WORKINFO *prev_workinfo; + SHARESUMMARY looksharesummary, *sharesummary; + int32_t hi; + bool ok; + + setnow(&now); + + K_RLOCK(blocks_free); + // Find the oldest, stats pending, not new, block + b_item = first_in_ktree(blocks_root, ctx); + while (b_item) { + DATA_BLOCKS(blocks, b_item); + if (CURRENT(&(blocks->expirydate)) && + blocks->statsconfirmed[0] == BLOCKS_STATSPENDING && + blocks->confirmed[0] != BLOCKS_NEW) + break; + b_item = next_in_ktree(ctx); + } + K_RUNLOCK(blocks_free); + + // None + if (!b_item) + return; + + wi_finish = blocks->workinfoid; + hi = 0; + K_RLOCK(workinfo_free); + if (workinfo_current) { + WORKINFO *wic; + DATA_WORKINFO(wic, workinfo_current); + hi = coinbase1height(wic->coinbase1); + } + K_RUNLOCK(workinfo_free); + + // Wait at least for the (badly named) '2nd' confirm + if (hi == 0 || blocks->height >= (hi - 1)) + return; + + diffacc = diffinv = shareacc = shareinv = 0; + elapsed = 0; + K_RLOCK(blocks_free); + b_prev = find_prev_blocks(blocks->height); + K_RUNLOCK(blocks_free); + if (!b_prev) { + wi_start = 0; + elapsed_start.tv_sec = elapsed_start.tv_usec = 0L; + } else { + DATA_BLOCKS(prev_blocks, b_prev); + wi_start = prev_blocks->workinfoid; + wi_item = find_workinfo(wi_start); + if (!wi_item) { + // This will repeat until fixed ... + LOGERR("%s() block %d, but prev %d wid " + "%"PRId64" is missing", + __func__, blocks->height, + prev_blocks->height, + prev_blocks->workinfoid); + return; + } + DATA_WORKINFO(prev_workinfo, wi_item); + copy_tv(&elapsed_start, &(prev_workinfo->createdate)); + } + elapsed_finish.tv_sec = elapsed_finish.tv_usec = 0L; + + // Add up the sharesummaries, abort if any SUMMARY_NEW + looksharesummary.workinfoid = wi_finish; + looksharesummary.userid = MAXID; + looksharesummary.workername[0] = '\0'; + INIT_SHARESUMMARY(&ss_look); + ss_look.data = (void *)(&looksharesummary); + K_RLOCK(sharesummary_free); + ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look, + cmp_sharesummary_workinfoid, ss_ctx); + + if (!ss_item) { + K_RUNLOCK(sharesummary_free); + // This will repeat each call here until fixed ... + LOGERR("%s() block %d, prev %d no sharesummaries " + "on or before %"PRId64, + __func__, blocks->height, + prev_blocks->height, wi_finish); + return; + } + DATA_SHARESUMMARY(sharesummary, ss_item); + while (ss_item && sharesummary->workinfoid > wi_start) { + if (sharesummary->complete[0] == SUMMARY_NEW) { + // Not aged yet + K_RUNLOCK(sharesummary_free); + return; + } + if (elapsed_start.tv_sec == 0 || + !tv_newer(&elapsed_start, &(sharesummary->firstshare))) { + copy_tv(&elapsed_start, &(sharesummary->firstshare)); + } + if (tv_newer(&elapsed_finish, &(sharesummary->lastshare))) + copy_tv(&elapsed_finish, &(sharesummary->lastshare)); + + diffacc += sharesummary->diffacc; + diffinv += sharesummary->diffsta + sharesummary->diffdup + + sharesummary->diffhi + sharesummary-> diffrej; + shareacc += sharesummary->shareacc; + shareinv += sharesummary->sharesta + sharesummary->sharedup + + sharesummary->sharehi + sharesummary-> sharerej; + + ss_item = prev_in_ktree(ss_ctx); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + } + K_RUNLOCK(sharesummary_free); + + elapsed = (int64_t)(tvdiff(&elapsed_finish, &elapsed_start) + 0.5); + ok = blocks_stats(NULL, blocks->height, blocks->blockhash, + diffacc, diffinv, shareacc, shareinv, elapsed, + by_default, (char *)__func__, inet_default, &now); + + if (ok) { + LOGWARNING("%s() block %d, stats confirmed " + "%0.f/%.0f/%.0f/%.0f/%"PRId64, + __func__, blocks->height, + diffacc, diffinv, shareacc, shareinv, elapsed); + } else { + LOGERR("%s() block %d, failed to confirm stats", + __func__, blocks->height); + } +} + static void summarise_poolstats() { // TODO @@ -10508,11 +10855,15 @@ static void *summariser(__maybe_unused void *arg) cksleep_ms(42); while (!everyone_die) { - sleep(13); + sleep(5); + if (!everyone_die) + summarise_blocks(); + sleep(4); if (!everyone_die) summarise_poolstats(); + sleep(4); if (!everyone_die) summarise_userstats(); } @@ -11834,9 +12185,7 @@ static void confirm_summaries() confirm_reload(); } -#define RELOADFILES "ckdb" - -static void check_restore_dir() +static void check_restore_dir(char *name) { struct stat statbuf; @@ -11854,14 +12203,15 @@ static void check_restore_dir() if (stat(restorefrom, &statbuf)) quit(1, "ERR: -r '%s' directory doesn't exist", restorefrom); - restorefrom = realloc(restorefrom, strlen(restorefrom)+sizeof(RELOADFILES)); + restorefrom = realloc(restorefrom, strlen(restorefrom)+strlen(name)+1); if (!restorefrom) quithere(1, "OOM"); - strcat(restorefrom, RELOADFILES); + strcat(restorefrom, name); } static struct option long_options[] = { + { "dbprefix", required_argument, 0, 'b' }, { "config", required_argument, 0, 'c' }, { "dbname", required_argument, 0, 'd' }, { "help", no_argument, 0, 'h' }, @@ -11986,8 +12336,6 @@ int main(int argc, char **argv) else dbcode = ""; - check_restore_dir(); - if (!db_name) db_name = "ckdb"; if (!db_user) @@ -11998,6 +12346,8 @@ int main(int argc, char **argv) prctl(PR_SET_NAME, buf, 0, 0, 0); memset(buf, 0, 15); + check_restore_dir(ckp.name); + if (!ckp.config) { ckp.config = strdup(ckp.name); realloc_strcat(&ckp.config, ".conf");