From eedbd3e2adc1a5f165ce37831574f281936b0c9d Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 27 Oct 2015 02:14:07 +1100 Subject: [PATCH] ckdb - use psql cursors for larger tables --- src/ckdb.h | 2 +- src/ckdb_dbio.c | 935 ++++++++++++++++++++++++++++-------------------- 2 files changed, 549 insertions(+), 388 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 204d9ee9..ab48fccb 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.4" -#define CKDB_VERSION DB_VERSION"-1.403" +#define CKDB_VERSION DB_VERSION"-1.500" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 6d1277ca..6319aed4 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -41,8 +41,13 @@ char *pqerrmsg(PGconn *conn) if (__col == -1) { \ LOGERR("%s(): Unknown field '%s' row %d", __func__, __name, __row); \ __ok = false; \ - } else \ + } else { \ __fld = PQgetvalue(__res, __row, __col); \ + if (__fld == NULL) { \ + LOGERR("%s(): Invalid field '%s' or row %d", __func__, __name, __row); \ + __ok = false; \ + }\ + } \ } while (0) // HISTORY FIELDS @@ -1561,26 +1566,44 @@ bool workers_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; + int n, t, i; WORKERS *row; char *field; char *sel; int fields = 7; - bool ok; + bool ok = false; LOGDEBUG("%s(): select", __func__); - sel = "select " + sel = "declare wk cursor for select " "userid,workername,difficultydefault," "idlenotificationenabled,idlenotificationtime,workerbits" HISTORYDATECONTROL ",workerid from workers"; + res = PQexec(conn, "Begin", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + return false; + } + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); + PGLOGERR("Declare", rescode, conn); + goto flail; + } + + LOGDEBUG("%s(): fetching ...", __func__); + + res = PQexec(conn, "fetch 1 in wk", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch first", rescode, conn); PQclear(res); - return false; + goto flail; } n = PQnfields(res); @@ -1588,81 +1611,95 @@ bool workers_fill(PGconn *conn) LOGERR("%s(): Invalid field count - should be %d, but is %d", __func__, fields + HISTORYDATECOUNT, n); PQclear(res); - return false; + goto flail; } - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); + n = 0; ok = true; K_WLOCK(workers_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(workers_free); - DATA_WORKERS(row, item); - bzero(row, sizeof(*row)); - - if (everyone_die) { - ok = false; - break; - } + while ((t = PQntuples(res)) > 0) { + for (i = 0; i < t; i++) { + item = k_unlink_head(workers_free); + DATA_WORKERS(row, item); + bzero(row, sizeof(*row)); + + if (everyone_die) { + ok = false; + break; + } - PQ_GET_FLD(res, i, "userid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("userid", field, row->userid); + PQ_GET_FLD(res, i, "userid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("userid", field, row->userid); - PQ_GET_FLD(res, i, "workername", field, ok); - if (!ok) - break; - TXT_TO_STR("workername", field, row->workername); + PQ_GET_FLD(res, i, "workername", field, ok); + if (!ok) + break; + TXT_TO_STR("workername", field, row->workername); - PQ_GET_FLD(res, i, "difficultydefault", field, ok); - if (!ok) - break; - TXT_TO_INT("difficultydefault", field, row->difficultydefault); + PQ_GET_FLD(res, i, "difficultydefault", field, ok); + if (!ok) + break; + TXT_TO_INT("difficultydefault", field, row->difficultydefault); - PQ_GET_FLD(res, i, "idlenotificationenabled", field, ok); - if (!ok) - break; - TXT_TO_STR("idlenotificationenabled", field, row->idlenotificationenabled); + PQ_GET_FLD(res, i, "idlenotificationenabled", field, ok); + if (!ok) + break; + TXT_TO_STR("idlenotificationenabled", field, row->idlenotificationenabled); - PQ_GET_FLD(res, i, "idlenotificationtime", field, ok); - if (!ok) - break; - TXT_TO_INT("idlenotificationtime", field, row->idlenotificationtime); + PQ_GET_FLD(res, i, "idlenotificationtime", field, ok); + if (!ok) + break; + TXT_TO_INT("idlenotificationtime", field, row->idlenotificationtime); - PQ_GET_FLD(res, i, "workerbits", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("workerbits", field, row->workerbits); + PQ_GET_FLD(res, i, "workerbits", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("workerbits", field, row->workerbits); - HISTORYDATEFLDS(res, i, row, ok); - if (!ok) - break; + HISTORYDATEFLDS(res, i, row, ok); + if (!ok) + break; - PQ_GET_FLD(res, i, "workerid", field, ok); - if (!ok) + PQ_GET_FLD(res, i, "workerid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("workerid", field, row->workerid); + + add_to_ktree(workers_root, item); + k_add_head(workers_store, item); + + /* Make sure a workerstatus exists for each worker + * This is to ensure that code can use the workerstatus tree + * to reference other tables and not miss workers in the + * other tables */ + find_create_workerstatus(false, row->userid, row->workername, + __FILE__, __func__, __LINE__); + tick(); + n++; + } + PQclear(res); + res = PQexec(conn, "fetch 9999 in wk", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch next", rescode, conn); + ok = false; break; - TXT_TO_BIGINT("workerid", field, row->workerid); - - add_to_ktree(workers_root, item); - k_add_head(workers_store, item); - - /* Make sure a workerstatus exists for each worker - * This is to ensure that code can use the workerstatus tree - * to reference other tables and not miss workers in the - * other tables */ - find_create_workerstatus(false, row->userid, row->workername, - __FILE__, __func__, __LINE__); + } } if (!ok) k_add_head(workers_free, item); K_WUNLOCK(workers_free); PQclear(res); +flail: + res = PQexec(conn, "Commit", CKPQ_READ); + PQclear(res); if (ok) { LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d workers records", __func__, n); + LOGWARNING("%s(): fetched %d workers records", __func__, n); } return ok; @@ -2160,25 +2197,43 @@ bool payments_fill(PGconn *conn) PGresult *res; K_ITEM *item; PAYMENTS *row; - int n, i; + int n, t, i; char *field; char *sel; int fields = 11; - bool ok; + bool ok = false; LOGDEBUG("%s(): select", __func__); - sel = "select " + sel = "declare ps cursor for select " "paymentid,payoutid,userid,subname,paydate,payaddress," "originaltxn,amount,diffacc,committxn,commitblockhash" HISTORYDATECONTROL " from payments"; + res = PQexec(conn, "Begin", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + return false; + } + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); + PGLOGERR("Declare", rescode, conn); + goto flail; + } + + LOGDEBUG("%s(): fetching ...", __func__); + + res = PQexec(conn, "fetch 1 in ps", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch first", rescode, conn); PQclear(res); - return false; + goto flail; } n = PQnfields(res); @@ -2186,94 +2241,108 @@ bool payments_fill(PGconn *conn) LOGERR("%s(): Invalid field count - should be %d, but is %d", __func__, fields + HISTORYDATECOUNT, n); PQclear(res); - return false; + goto flail; } - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); + n = 0; ok = true; K_WLOCK(payments_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(payments_free); - DATA_PAYMENTS(row, item); - bzero(row, sizeof(*row)); + while ((t = PQntuples(res)) > 0) { + for (i = 0; i < t; i++) { + item = k_unlink_head(payments_free); + DATA_PAYMENTS(row, item); + bzero(row, sizeof(*row)); + + if (everyone_die) { + ok = false; + break; + } - if (everyone_die) { - ok = false; - break; - } + PQ_GET_FLD(res, i, "paymentid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("paymentid", field, row->paymentid); - PQ_GET_FLD(res, i, "paymentid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("paymentid", field, row->paymentid); + PQ_GET_FLD(res, i, "payoutid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("payoutid", field, row->payoutid); - PQ_GET_FLD(res, i, "payoutid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("payoutid", field, row->payoutid); + PQ_GET_FLD(res, i, "userid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("userid", field, row->userid); - PQ_GET_FLD(res, i, "userid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("userid", field, row->userid); + PQ_GET_FLD(res, i, "subname", field, ok); + if (!ok) + break; + TXT_TO_STR("subname", field, row->subname); - PQ_GET_FLD(res, i, "subname", field, ok); - if (!ok) - break; - TXT_TO_STR("subname", field, row->subname); + PQ_GET_FLD(res, i, "paydate", field, ok); + if (!ok) + break; + TXT_TO_TV("paydate", field, row->paydate); - PQ_GET_FLD(res, i, "paydate", field, ok); - if (!ok) - break; - TXT_TO_TV("paydate", field, row->paydate); + PQ_GET_FLD(res, i, "payaddress", field, ok); + if (!ok) + break; + TXT_TO_STR("payaddress", field, row->payaddress); - PQ_GET_FLD(res, i, "payaddress", field, ok); - if (!ok) - break; - TXT_TO_STR("payaddress", field, row->payaddress); + PQ_GET_FLD(res, i, "originaltxn", field, ok); + if (!ok) + break; + TXT_TO_STR("originaltxn", field, row->originaltxn); - PQ_GET_FLD(res, i, "originaltxn", field, ok); - if (!ok) - break; - TXT_TO_STR("originaltxn", field, row->originaltxn); + PQ_GET_FLD(res, i, "amount", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("amount", field, row->amount); - PQ_GET_FLD(res, i, "amount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("amount", field, row->amount); + PQ_GET_FLD(res, i, "diffacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffacc", field, row->diffacc); - PQ_GET_FLD(res, i, "diffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffacc", field, row->diffacc); + PQ_GET_FLD(res, i, "committxn", field, ok); + if (!ok) + break; + TXT_TO_STR("committxn", field, row->committxn); - PQ_GET_FLD(res, i, "committxn", field, ok); - if (!ok) - break; - TXT_TO_STR("committxn", field, row->committxn); + PQ_GET_FLD(res, i, "commitblockhash", field, ok); + if (!ok) + break; + TXT_TO_STR("commitblockhash", field, row->commitblockhash); - PQ_GET_FLD(res, i, "commitblockhash", field, ok); - if (!ok) - break; - TXT_TO_STR("commitblockhash", field, row->commitblockhash); + HISTORYDATEFLDS(res, i, row, ok); + if (!ok) + break; - HISTORYDATEFLDS(res, i, row, ok); - if (!ok) + add_to_ktree(payments_root, item); + k_add_head(payments_store, item); + tick(); + n++; + } + PQclear(res); + res = PQexec(conn, "fetch 9999 in ps", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch next", rescode, conn); + ok = false; break; - - add_to_ktree(payments_root, item); - k_add_head(payments_store, item); + } } if (!ok) k_add_head(payments_free, item); K_WUNLOCK(payments_free); PQclear(res); +flail: + res = PQexec(conn, "Commit", CKPQ_READ); + PQclear(res); if (ok) { LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d payments records", __func__, n); + LOGWARNING("%s(): fetched %d payments records", __func__, n); } return ok; @@ -2772,12 +2841,12 @@ bool workinfo_fill(PGconn *conn) K_ITEM *item; WORKINFO *row; char *params[3]; - int n, i, par = 0; + int n, t, i, par = 0; char *field; char *sel = NULL; size_t len, off; int fields = 10; - bool ok; + bool ok = false; LOGDEBUG("%s(): select", __func__); @@ -2790,7 +2859,7 @@ bool workinfo_fill(PGconn *conn) APPEND_REALLOC_INIT(sel, off, len); APPEND_REALLOC(sel, off, len, - "select " + "declare wi cursor for select " // "workinfoid,poolinstance,transactiontree,merklehash,prevhash," "workinfoid,poolinstance,merklehash,prevhash," "coinbase1,coinbase2,version,bits,ntime,reward" @@ -2820,12 +2889,31 @@ bool workinfo_fill(PGconn *conn) params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0); PARCHK(par, params); + + res = PQexec(conn, "Begin", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + return false; + } + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Declare", rescode, conn); + goto flail; + } + + LOGDEBUG("%s(): fetching ...", __func__); + + res = PQexec(conn, "fetch 1 in wi", CKPQ_READ); + rescode = PQresultStatus(res); if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); + PGLOGERR("Fetch first", rescode, conn); PQclear(res); - return false; + goto flail; } n = PQnfields(res); @@ -2833,106 +2921,115 @@ bool workinfo_fill(PGconn *conn) LOGERR("%s(): Invalid field count - should be %d, but is %d", __func__, fields + HISTORYDATECOUNT, n); PQclear(res); - return false; + goto flail; } - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); + n = 0; ok = true; //K_WLOCK(workinfo_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(workinfo_free); - DATA_WORKINFO(row, item); - bzero(row, sizeof(*row)); - - if (everyone_die) { - ok = false; - break; - } + while ((t = PQntuples(res)) > 0) { + for (i = 0; i < t; i++) { + item = k_unlink_head(workinfo_free); + DATA_WORKINFO(row, item); + bzero(row, sizeof(*row)); + + if (everyone_die) { + ok = false; + break; + } - PQ_GET_FLD(res, i, "workinfoid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("workinfoid", field, row->workinfoid); + PQ_GET_FLD(res, i, "workinfoid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("workinfoid", field, row->workinfoid); - PQ_GET_FLD(res, i, "poolinstance", field, ok); - if (!ok) - break; - TXT_TO_STR("poolinstance", field, row->poolinstance); + PQ_GET_FLD(res, i, "poolinstance", field, ok); + if (!ok) + break; + TXT_TO_STR("poolinstance", field, row->poolinstance); /* Not currently needed in RAM - PQ_GET_FLD(res, i, "transactiontree", field, ok); - if (!ok) - break; - TXT_TO_BLOB("transactiontree", field, row->transactiontree); - LIST_MEM_ADD(workinfo_free, row->transactiontree); + PQ_GET_FLD(res, i, "transactiontree", field, ok); + if (!ok) + break; + TXT_TO_BLOB("transactiontree", field, row->transactiontree); + LIST_MEM_ADD(workinfo_free, row->transactiontree); */ - row->transactiontree = EMPTY; + row->transactiontree = EMPTY; - PQ_GET_FLD(res, i, "merklehash", field, ok); - if (!ok) - break; - TXT_TO_BLOB("merklehash", field, row->merklehash); - LIST_MEM_ADD(workinfo_free, row->merklehash); + PQ_GET_FLD(res, i, "merklehash", field, ok); + if (!ok) + break; + TXT_TO_BLOB("merklehash", field, row->merklehash); + LIST_MEM_ADD(workinfo_free, row->merklehash); - PQ_GET_FLD(res, i, "prevhash", field, ok); - if (!ok) - break; - TXT_TO_STR("prevhash", field, row->prevhash); + PQ_GET_FLD(res, i, "prevhash", field, ok); + if (!ok) + break; + TXT_TO_STR("prevhash", field, row->prevhash); - PQ_GET_FLD(res, i, "coinbase1", field, ok); - if (!ok) - break; - TXT_TO_STR("coinbase1", field, row->coinbase1); + PQ_GET_FLD(res, i, "coinbase1", field, ok); + if (!ok) + break; + TXT_TO_STR("coinbase1", field, row->coinbase1); - PQ_GET_FLD(res, i, "coinbase2", field, ok); - if (!ok) - break; - TXT_TO_STR("coinbase2", field, row->coinbase2); + PQ_GET_FLD(res, i, "coinbase2", field, ok); + if (!ok) + break; + TXT_TO_STR("coinbase2", field, row->coinbase2); - PQ_GET_FLD(res, i, "version", field, ok); - if (!ok) - break; - TXT_TO_STR("version", field, row->version); + PQ_GET_FLD(res, i, "version", field, ok); + if (!ok) + break; + TXT_TO_STR("version", field, row->version); - PQ_GET_FLD(res, i, "bits", field, ok); - if (!ok) - break; - TXT_TO_STR("bits", field, row->bits); + PQ_GET_FLD(res, i, "bits", field, ok); + if (!ok) + break; + TXT_TO_STR("bits", field, row->bits); - PQ_GET_FLD(res, i, "ntime", field, ok); - if (!ok) - break; - TXT_TO_STR("ntime", field, row->ntime); + PQ_GET_FLD(res, i, "ntime", field, ok); + if (!ok) + break; + TXT_TO_STR("ntime", field, row->ntime); - PQ_GET_FLD(res, i, "reward", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("reward", field, row->reward); - pool.reward = row->reward; + PQ_GET_FLD(res, i, "reward", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("reward", field, row->reward); + pool.reward = row->reward; - HISTORYDATEFLDS(res, i, row, ok); - if (!ok) - break; + HISTORYDATEFLDS(res, i, row, ok); + if (!ok) + break; - add_to_ktree(workinfo_root, item); - if (!confirm_sharesummary) - add_to_ktree(workinfo_height_root, item); - k_add_head(workinfo_store, item); + add_to_ktree(workinfo_root, item); + if (!confirm_sharesummary) + add_to_ktree(workinfo_height_root, item); + k_add_head(workinfo_store, item); - if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) { - copy_tv(&(dbstatus.newest_createdate_workinfo), &(row->createdate)); - dbstatus.newest_workinfoid = row->workinfoid; - } + if (tv_newer(&(dbstatus.newest_createdate_workinfo), &(row->createdate))) { + copy_tv(&(dbstatus.newest_createdate_workinfo), &(row->createdate)); + dbstatus.newest_workinfoid = row->workinfoid; + } - if (i == 0 || ((i+1) % 100000) == 0) { - printf(TICK_PREFIX"wi "); - pcom(i+1); - putchar('\r'); - fflush(stdout); + if (n == 0 || ((n+1) % 100000) == 0) { + printf(TICK_PREFIX"wi "); + pcom(n+1); + putchar('\r'); + fflush(stdout); + } + tick(); + n++; + } + PQclear(res); + res = PQexec(conn, "fetch 9999 in wi", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch next", rescode, conn); + ok = false; + break; } - - tick(); } if (!ok) { free_workinfo_data(item); @@ -2942,10 +3039,14 @@ bool workinfo_fill(PGconn *conn) //K_WUNLOCK(workinfo_free); PQclear(res); +flail: + res = PQexec(conn, "Commit", CKPQ_READ); + PQclear(res); + if (ok) { LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d workinfo records", __func__, n); + LOGWARNING("%s(): fetched %d workinfo records", __func__, n); } return ok; @@ -5223,24 +5324,42 @@ bool miningpayouts_fill(PGconn *conn) PGresult *res; K_ITEM *item; MININGPAYOUTS *row; - int n, i; + int n, t, i; char *field; char *sel; int fields = 4; - bool ok; + bool ok = false; LOGDEBUG("%s(): select", __func__); - sel = "select " + sel = "declare mp cursor for select " "payoutid,userid,diffacc,amount" HISTORYDATECONTROL " from miningpayouts"; + res = PQexec(conn, "Begin", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + return false; + } + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); + PGLOGERR("Declare", rescode, conn); + goto flail; + } + + LOGDEBUG("%s(): fetching ...", __func__); + + res = PQexec(conn, "fetch 1 in mp", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch first", rescode, conn); PQclear(res); - return false; + goto flail; } n = PQnfields(res); @@ -5248,61 +5367,73 @@ bool miningpayouts_fill(PGconn *conn) LOGERR("%s(): Invalid field count - should be %d, but is %d", __func__, fields + HISTORYDATECOUNT, n); PQclear(res); - return false; + goto flail; } - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); + n = 0; ok = true; K_WLOCK(miningpayouts_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(miningpayouts_free); - DATA_MININGPAYOUTS(row, item); - bzero(row, sizeof(*row)); + while ((t = PQntuples(res)) > 0) { + for (i = 0; i < t; i++) { + item = k_unlink_head(miningpayouts_free); + DATA_MININGPAYOUTS(row, item); + bzero(row, sizeof(*row)); + + if (everyone_die) { + ok = false; + break; + } - if (everyone_die) { - ok = false; - break; - } + PQ_GET_FLD(res, i, "payoutid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("payoutid", field, row->payoutid); - PQ_GET_FLD(res, i, "payoutid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("payoutid", field, row->payoutid); + PQ_GET_FLD(res, i, "userid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("userid", field, row->userid); - PQ_GET_FLD(res, i, "userid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("userid", field, row->userid); + PQ_GET_FLD(res, i, "diffacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffacc", field, row->diffacc); - PQ_GET_FLD(res, i, "diffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffacc", field, row->diffacc); + PQ_GET_FLD(res, i, "amount", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("amount", field, row->amount); - PQ_GET_FLD(res, i, "amount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("amount", field, row->amount); + HISTORYDATEFLDS(res, i, row, ok); + if (!ok) + break; - HISTORYDATEFLDS(res, i, row, ok); - if (!ok) + add_to_ktree(miningpayouts_root, item); + k_add_head(miningpayouts_store, item); + tick(); + n++; + } + PQclear(res); + res = PQexec(conn, "fetch 9999 in mp", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch next", rescode, conn); + ok = false; break; - - add_to_ktree(miningpayouts_root, item); - k_add_head(miningpayouts_store, item); - - tick(); + } } if (!ok) k_add_head(miningpayouts_free, item); K_WUNLOCK(miningpayouts_free); PQclear(res); +flail: + res = PQexec(conn, "Commit", CKPQ_READ); + PQclear(res); if (ok) { LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d miningpayout records", __func__, n); + LOGWARNING("%s(): fetched %d miningpayout records", __func__, n); } return ok; @@ -6521,12 +6652,12 @@ bool markersummary_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item, *p_item; - int n, i, p_n; + int n, t, i, p_n; MARKERSUMMARY *row, *p_row; char *field; char *sel; int fields = 20; - bool ok; + bool ok = false; LOGDEBUG("%s(): select", __func__); @@ -6534,19 +6665,37 @@ bool markersummary_fill(PGconn *conn) fflush(stdout); // TODO: limit how far back - sel = "select " + sel = "declare ws cursor for select " "markerid,userid,workername,diffacc,diffsta,diffdup,diffhi," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," "sharecount,errorcount,firstshare,lastshare,firstshareacc," "lastshareacc,lastdiffacc" MODIFYDATECONTROL " from markersummary"; + res = PQexec(conn, "Begin", CKPQ_READ); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + return false; + } + res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { - PGLOGERR("Select", rescode, conn); + PGLOGERR("Declare", rescode, conn); + goto flail; + } + + LOGDEBUG("%s(): fetching ...", __func__); + + res = PQexec(conn, "fetch 1 in ws", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch first", rescode, conn); PQclear(res); - return false; + goto flail; } n = PQnfields(res); @@ -6554,169 +6703,178 @@ bool markersummary_fill(PGconn *conn) LOGERR("%s(): Invalid field count - should be %d, but is %d", __func__, fields + MODIFYDATECOUNT, n); PQclear(res); - return false; + goto flail; } - n = PQntuples(res); - LOGDEBUG("%s(): tree build count %d", __func__, n); + n = 0; ok = true; //K_WLOCK(markersummary_free); - for (i = 0; i < n; i++) { - item = k_unlink_head(markersummary_free); - DATA_MARKERSUMMARY(row, item); - bzero(row, sizeof(*row)); - - if (everyone_die) { - ok = false; - break; - } + while ((t = PQntuples(res)) > 0) { + for (i = 0; i < t; i++) { + item = k_unlink_head(markersummary_free); + DATA_MARKERSUMMARY(row, item); + bzero(row, sizeof(*row)); + + if (everyone_die) { + ok = false; + break; + } - PQ_GET_FLD(res, i, "markerid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("markerid", field, row->markerid); + PQ_GET_FLD(res, i, "markerid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("markerid", field, row->markerid); - PQ_GET_FLD(res, i, "userid", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("userid", field, row->userid); + PQ_GET_FLD(res, i, "userid", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("userid", field, row->userid); - PQ_GET_FLD(res, i, "workername", field, ok); - if (!ok) - break; - TXT_TO_PTR("workername", field, row->workername); - LIST_MEM_ADD(markersummary_free, row->workername); + PQ_GET_FLD(res, i, "workername", field, ok); + if (!ok) + break; + TXT_TO_PTR("workername", field, row->workername); + LIST_MEM_ADD(markersummary_free, row->workername); - PQ_GET_FLD(res, i, "diffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffacc", field, row->diffacc); + PQ_GET_FLD(res, i, "diffacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffacc", field, row->diffacc); - PQ_GET_FLD(res, i, "diffsta", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffsta", field, row->diffsta); + PQ_GET_FLD(res, i, "diffsta", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffsta", field, row->diffsta); - PQ_GET_FLD(res, i, "diffdup", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffdup", field, row->diffdup); + PQ_GET_FLD(res, i, "diffdup", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffdup", field, row->diffdup); - PQ_GET_FLD(res, i, "diffhi", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffhi", field, row->diffhi); + PQ_GET_FLD(res, i, "diffhi", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffhi", field, row->diffhi); - PQ_GET_FLD(res, i, "diffrej", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("diffrej", field, row->diffrej); + PQ_GET_FLD(res, i, "diffrej", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("diffrej", field, row->diffrej); - PQ_GET_FLD(res, i, "shareacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("shareacc", field, row->shareacc); + PQ_GET_FLD(res, i, "shareacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("shareacc", field, row->shareacc); - PQ_GET_FLD(res, i, "sharesta", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharesta", field, row->sharesta); + PQ_GET_FLD(res, i, "sharesta", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("sharesta", field, row->sharesta); - PQ_GET_FLD(res, i, "sharedup", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharedup", field, row->sharedup); + PQ_GET_FLD(res, i, "sharedup", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("sharedup", field, row->sharedup); - PQ_GET_FLD(res, i, "sharehi", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharehi", field, row->sharehi); + PQ_GET_FLD(res, i, "sharehi", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("sharehi", field, row->sharehi); - PQ_GET_FLD(res, i, "sharerej", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("sharerej", field, row->sharerej); + PQ_GET_FLD(res, i, "sharerej", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("sharerej", field, row->sharerej); - PQ_GET_FLD(res, i, "sharecount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("sharecount", field, row->sharecount); + PQ_GET_FLD(res, i, "sharecount", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("sharecount", field, row->sharecount); - PQ_GET_FLD(res, i, "errorcount", field, ok); - if (!ok) - break; - TXT_TO_BIGINT("errorcount", field, row->errorcount); + PQ_GET_FLD(res, i, "errorcount", field, ok); + if (!ok) + break; + TXT_TO_BIGINT("errorcount", field, row->errorcount); - PQ_GET_FLD(res, i, "firstshare", field, ok); - if (!ok) - break; - TXT_TO_TV("firstshare", field, row->firstshare); + PQ_GET_FLD(res, i, "firstshare", field, ok); + if (!ok) + break; + TXT_TO_TV("firstshare", field, row->firstshare); - PQ_GET_FLD(res, i, "lastshare", field, ok); - if (!ok) - break; - TXT_TO_TV("lastshare", field, row->lastshare); + PQ_GET_FLD(res, i, "lastshare", field, ok); + if (!ok) + break; + TXT_TO_TV("lastshare", field, row->lastshare); - PQ_GET_FLD(res, i, "firstshareacc", field, ok); - if (!ok) - break; - TXT_TO_TV("firstshareacc", field, row->firstshareacc); + PQ_GET_FLD(res, i, "firstshareacc", field, ok); + if (!ok) + break; + TXT_TO_TV("firstshareacc", field, row->firstshareacc); - PQ_GET_FLD(res, i, "lastshareacc", field, ok); - if (!ok) - break; - TXT_TO_TV("lastshareacc", field, row->lastshareacc); + PQ_GET_FLD(res, i, "lastshareacc", field, ok); + if (!ok) + break; + TXT_TO_TV("lastshareacc", field, row->lastshareacc); - PQ_GET_FLD(res, i, "lastdiffacc", field, ok); - if (!ok) - break; - TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc); + PQ_GET_FLD(res, i, "lastdiffacc", field, ok); + if (!ok) + break; + TXT_TO_DOUBLE("lastdiffacc", field, row->lastdiffacc); - MODIFYDATEFLDPOINTERS(markersummary_free, res, i, row, ok); - if (!ok) - break; + MODIFYDATEFLDPOINTERS(markersummary_free, res, i, row, ok); + if (!ok) + break; - /* Save having to do this everywhere in the code for old data - * It's not always accurate, but soon after when it's not, - * and also what was used before the 2 fields were added */ - if (row->diffacc > 0) { - if (row->firstshareacc.tv_sec == 0L) - copy_tv(&(row->firstshareacc), &(row->firstshare)); - if (row->lastshareacc.tv_sec == 0L) - copy_tv(&(row->lastshareacc), &(row->lastshare)); - } + /* Save having to do this everywhere in the code for old data + * It's not always accurate, but soon after when it's not, + * and also what was used before the 2 fields were added */ + if (row->diffacc > 0) { + if (row->firstshareacc.tv_sec == 0L) + copy_tv(&(row->firstshareacc), &(row->firstshare)); + if (row->lastshareacc.tv_sec == 0L) + copy_tv(&(row->lastshareacc), &(row->lastshare)); + } - add_to_ktree(markersummary_root, item); - add_to_ktree(markersummary_userid_root, item); - k_add_head(markersummary_store, item); - - p_item = find_markersummary_p(row->markerid); - if (!p_item) { - /* N.B. this could be false due to the markerid - * having the wrong status TODO: deal with that? */ - p_item = k_unlink_head(markersummary_free); - DATA_MARKERSUMMARY(p_row, p_item); - bzero(p_row, sizeof(*p_row)); - p_row->markerid = row->markerid; - POOL_MS(p_row); - add_to_ktree(markersummary_pool_root, p_item); - k_add_head(markersummary_pool_store, p_item); - } else { - DATA_MARKERSUMMARY(p_row, p_item); - } + add_to_ktree(markersummary_root, item); + add_to_ktree(markersummary_userid_root, item); + k_add_head(markersummary_store, item); + + p_item = find_markersummary_p(row->markerid); + if (!p_item) { + /* N.B. this could be false due to the markerid + * having the wrong status TODO: deal with that? */ + p_item = k_unlink_head(markersummary_free); + DATA_MARKERSUMMARY(p_row, p_item); + bzero(p_row, sizeof(*p_row)); + p_row->markerid = row->markerid; + POOL_MS(p_row); + add_to_ktree(markersummary_pool_root, p_item); + k_add_head(markersummary_pool_store, p_item); + } else { + DATA_MARKERSUMMARY(p_row, p_item); + } - markersummary_to_pool(p_row, row); + markersummary_to_pool(p_row, row); - _userinfo_update(NULL, NULL, row, false, false); + _userinfo_update(NULL, NULL, row, false, false); - if (i == 0 || ((i+1) % 100000) == 0) { - printf(TICK_PREFIX"ms "); - pcom(i+1); - putchar('\r'); - fflush(stdout); + if (n == 0 || ((n+1) % 100000) == 0) { + printf(TICK_PREFIX"ms "); + pcom(n+1); + putchar('\r'); + fflush(stdout); + } + tick(); + n++; + } + PQclear(res); + res = PQexec(conn, "fetch 9999 in ws", CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Fetch next", rescode, conn); + ok = false; + break; } - - tick(); } if (!ok) { free_markersummary_data(item); @@ -6727,10 +6885,13 @@ bool markersummary_fill(PGconn *conn) //K_WUNLOCK(markersummary_free); PQclear(res); +flail: + res = PQexec(conn, "Commit", CKPQ_READ); + PQclear(res); if (ok) { LOGDEBUG("%s(): built", __func__); - LOGWARNING("%s(): loaded %d markersummary records", __func__, n); + LOGWARNING("%s(): fetched %d markersummary records", __func__, n); LOGWARNING("%s(): created %d markersummary pool records", __func__, p_n); }