Browse Source

ckdb - use psql cursors for larger tables

master
kanoi 9 years ago
parent
commit
eedbd3e2ad
  1. 2
      src/ckdb.h
  2. 277
      src/ckdb_dbio.c

2
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.4" #define DB_VERSION "1.0.4"
#define CKDB_VERSION DB_VERSION"-1.403" #define CKDB_VERSION DB_VERSION"-1.500"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

277
src/ckdb_dbio.c

@ -41,8 +41,13 @@ char *pqerrmsg(PGconn *conn)
if (__col == -1) { \ if (__col == -1) { \
LOGERR("%s(): Unknown field '%s' row %d", __func__, __name, __row); \ LOGERR("%s(): Unknown field '%s' row %d", __func__, __name, __row); \
__ok = false; \ __ok = false; \
} else \ } else { \
__fld = PQgetvalue(__res, __row, __col); \ __fld = PQgetvalue(__res, __row, __col); \
if (__fld == NULL) { \
LOGERR("%s(): Invalid field '%s' or row %d", __func__, __name, __row); \
__ok = false; \
}\
} \
} while (0) } while (0)
// HISTORY FIELDS // HISTORY FIELDS
@ -1561,26 +1566,44 @@ bool workers_fill(PGconn *conn)
ExecStatusType rescode; ExecStatusType rescode;
PGresult *res; PGresult *res;
K_ITEM *item; K_ITEM *item;
int n, i; int n, t, i;
WORKERS *row; WORKERS *row;
char *field; char *field;
char *sel; char *sel;
int fields = 7; int fields = 7;
bool ok; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
sel = "select " sel = "declare wk cursor for select "
"userid,workername,difficultydefault," "userid,workername,difficultydefault,"
"idlenotificationenabled,idlenotificationtime,workerbits" "idlenotificationenabled,idlenotificationtime,workerbits"
HISTORYDATECONTROL HISTORYDATECONTROL
",workerid from workers"; ",workerid from workers";
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexec(conn, sel, CKPQ_READ); res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn); PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in wk", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQnfields(res); n = PQnfields(res);
@ -1588,14 +1611,14 @@ bool workers_fill(PGconn *conn)
LOGERR("%s(): Invalid field count - should be %d, but is %d", LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n); __func__, fields + HISTORYDATECOUNT, n);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQntuples(res); n = 0;
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true; ok = true;
K_WLOCK(workers_free); K_WLOCK(workers_free);
for (i = 0; i < n; i++) { while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(workers_free); item = k_unlink_head(workers_free);
DATA_WORKERS(row, item); DATA_WORKERS(row, item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
@ -1653,16 +1676,30 @@ bool workers_fill(PGconn *conn)
* other tables */ * other tables */
find_create_workerstatus(false, row->userid, row->workername, find_create_workerstatus(false, row->userid, row->workername,
__FILE__, __func__, __LINE__); __FILE__, __func__, __LINE__);
tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in wk", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
} }
if (!ok) if (!ok)
k_add_head(workers_free, item); k_add_head(workers_free, item);
K_WUNLOCK(workers_free); K_WUNLOCK(workers_free);
PQclear(res); PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) { if (ok) {
LOGDEBUG("%s(): built", __func__); LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d workers records", __func__, n); LOGWARNING("%s(): fetched %d workers records", __func__, n);
} }
return ok; return ok;
@ -2160,25 +2197,43 @@ bool payments_fill(PGconn *conn)
PGresult *res; PGresult *res;
K_ITEM *item; K_ITEM *item;
PAYMENTS *row; PAYMENTS *row;
int n, i; int n, t, i;
char *field; char *field;
char *sel; char *sel;
int fields = 11; int fields = 11;
bool ok; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
sel = "select " sel = "declare ps cursor for select "
"paymentid,payoutid,userid,subname,paydate,payaddress," "paymentid,payoutid,userid,subname,paydate,payaddress,"
"originaltxn,amount,diffacc,committxn,commitblockhash" "originaltxn,amount,diffacc,committxn,commitblockhash"
HISTORYDATECONTROL HISTORYDATECONTROL
" from payments"; " from payments";
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexec(conn, sel, CKPQ_READ); res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn); PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in ps", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQnfields(res); n = PQnfields(res);
@ -2186,14 +2241,14 @@ bool payments_fill(PGconn *conn)
LOGERR("%s(): Invalid field count - should be %d, but is %d", LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n); __func__, fields + HISTORYDATECOUNT, n);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQntuples(res); n = 0;
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true; ok = true;
K_WLOCK(payments_free); K_WLOCK(payments_free);
for (i = 0; i < n; i++) { while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(payments_free); item = k_unlink_head(payments_free);
DATA_PAYMENTS(row, item); DATA_PAYMENTS(row, item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
@ -2264,16 +2319,30 @@ bool payments_fill(PGconn *conn)
add_to_ktree(payments_root, item); add_to_ktree(payments_root, item);
k_add_head(payments_store, item); k_add_head(payments_store, item);
tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in ps", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
} }
if (!ok) if (!ok)
k_add_head(payments_free, item); k_add_head(payments_free, item);
K_WUNLOCK(payments_free); K_WUNLOCK(payments_free);
PQclear(res); PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) { if (ok) {
LOGDEBUG("%s(): built", __func__); LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d payments records", __func__, n); LOGWARNING("%s(): fetched %d payments records", __func__, n);
} }
return ok; return ok;
@ -2772,12 +2841,12 @@ bool workinfo_fill(PGconn *conn)
K_ITEM *item; K_ITEM *item;
WORKINFO *row; WORKINFO *row;
char *params[3]; char *params[3];
int n, i, par = 0; int n, t, i, par = 0;
char *field; char *field;
char *sel = NULL; char *sel = NULL;
size_t len, off; size_t len, off;
int fields = 10; int fields = 10;
bool ok; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
@ -2790,7 +2859,7 @@ bool workinfo_fill(PGconn *conn)
APPEND_REALLOC_INIT(sel, off, len); APPEND_REALLOC_INIT(sel, off, len);
APPEND_REALLOC(sel, off, len, APPEND_REALLOC(sel, off, len,
"select " "declare wi cursor for select "
// "workinfoid,poolinstance,transactiontree,merklehash,prevhash," // "workinfoid,poolinstance,transactiontree,merklehash,prevhash,"
"workinfoid,poolinstance,merklehash,prevhash," "workinfoid,poolinstance,merklehash,prevhash,"
"coinbase1,coinbase2,version,bits,ntime,reward" "coinbase1,coinbase2,version,bits,ntime,reward"
@ -2820,12 +2889,31 @@ bool workinfo_fill(PGconn *conn)
params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0); params[par++] = bigint_to_buf(dbload_workinfoid_start, NULL, 0);
params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0); params[par++] = bigint_to_buf(dbload_workinfoid_finish, NULL, 0);
PARCHK(par, params); PARCHK(par, params);
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn); PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in wi", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQnfields(res); n = PQnfields(res);
@ -2833,14 +2921,14 @@ bool workinfo_fill(PGconn *conn)
LOGERR("%s(): Invalid field count - should be %d, but is %d", LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n); __func__, fields + HISTORYDATECOUNT, n);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQntuples(res); n = 0;
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true; ok = true;
//K_WLOCK(workinfo_free); //K_WLOCK(workinfo_free);
for (i = 0; i < n; i++) { while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(workinfo_free); item = k_unlink_head(workinfo_free);
DATA_WORKINFO(row, item); DATA_WORKINFO(row, item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
@ -2925,14 +3013,23 @@ bool workinfo_fill(PGconn *conn)
dbstatus.newest_workinfoid = row->workinfoid; dbstatus.newest_workinfoid = row->workinfoid;
} }
if (i == 0 || ((i+1) % 100000) == 0) { if (n == 0 || ((n+1) % 100000) == 0) {
printf(TICK_PREFIX"wi "); printf(TICK_PREFIX"wi ");
pcom(i+1); pcom(n+1);
putchar('\r'); putchar('\r');
fflush(stdout); fflush(stdout);
} }
tick(); tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in wi", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
} }
if (!ok) { if (!ok) {
free_workinfo_data(item); free_workinfo_data(item);
@ -2942,10 +3039,14 @@ bool workinfo_fill(PGconn *conn)
//K_WUNLOCK(workinfo_free); //K_WUNLOCK(workinfo_free);
PQclear(res); PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) { if (ok) {
LOGDEBUG("%s(): built", __func__); LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d workinfo records", __func__, n); LOGWARNING("%s(): fetched %d workinfo records", __func__, n);
} }
return ok; return ok;
@ -5223,24 +5324,42 @@ bool miningpayouts_fill(PGconn *conn)
PGresult *res; PGresult *res;
K_ITEM *item; K_ITEM *item;
MININGPAYOUTS *row; MININGPAYOUTS *row;
int n, i; int n, t, i;
char *field; char *field;
char *sel; char *sel;
int fields = 4; int fields = 4;
bool ok; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
sel = "select " sel = "declare mp cursor for select "
"payoutid,userid,diffacc,amount" "payoutid,userid,diffacc,amount"
HISTORYDATECONTROL HISTORYDATECONTROL
" from miningpayouts"; " from miningpayouts";
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexec(conn, sel, CKPQ_READ); res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn); PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in mp", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQnfields(res); n = PQnfields(res);
@ -5248,14 +5367,14 @@ bool miningpayouts_fill(PGconn *conn)
LOGERR("%s(): Invalid field count - should be %d, but is %d", LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n); __func__, fields + HISTORYDATECOUNT, n);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQntuples(res); n = 0;
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true; ok = true;
K_WLOCK(miningpayouts_free); K_WLOCK(miningpayouts_free);
for (i = 0; i < n; i++) { while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(miningpayouts_free); item = k_unlink_head(miningpayouts_free);
DATA_MININGPAYOUTS(row, item); DATA_MININGPAYOUTS(row, item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
@ -5291,18 +5410,30 @@ bool miningpayouts_fill(PGconn *conn)
add_to_ktree(miningpayouts_root, item); add_to_ktree(miningpayouts_root, item);
k_add_head(miningpayouts_store, item); k_add_head(miningpayouts_store, item);
tick(); tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in mp", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
} }
if (!ok) if (!ok)
k_add_head(miningpayouts_free, item); k_add_head(miningpayouts_free, item);
K_WUNLOCK(miningpayouts_free); K_WUNLOCK(miningpayouts_free);
PQclear(res); PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) { if (ok) {
LOGDEBUG("%s(): built", __func__); LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d miningpayout records", __func__, n); LOGWARNING("%s(): fetched %d miningpayout records", __func__, n);
} }
return ok; return ok;
@ -6521,12 +6652,12 @@ bool markersummary_fill(PGconn *conn)
ExecStatusType rescode; ExecStatusType rescode;
PGresult *res; PGresult *res;
K_ITEM *item, *p_item; K_ITEM *item, *p_item;
int n, i, p_n; int n, t, i, p_n;
MARKERSUMMARY *row, *p_row; MARKERSUMMARY *row, *p_row;
char *field; char *field;
char *sel; char *sel;
int fields = 20; int fields = 20;
bool ok; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
@ -6534,19 +6665,37 @@ bool markersummary_fill(PGconn *conn)
fflush(stdout); fflush(stdout);
// TODO: limit how far back // TODO: limit how far back
sel = "select " sel = "declare ws cursor for select "
"markerid,userid,workername,diffacc,diffsta,diffdup,diffhi," "markerid,userid,workername,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,firstshareacc," "sharecount,errorcount,firstshare,lastshare,firstshareacc,"
"lastshareacc,lastdiffacc" "lastshareacc,lastdiffacc"
MODIFYDATECONTROL MODIFYDATECONTROL
" from markersummary"; " from markersummary";
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexec(conn, sel, CKPQ_READ); res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn); PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in ws", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQnfields(res); n = PQnfields(res);
@ -6554,14 +6703,14 @@ bool markersummary_fill(PGconn *conn)
LOGERR("%s(): Invalid field count - should be %d, but is %d", LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + MODIFYDATECOUNT, n); __func__, fields + MODIFYDATECOUNT, n);
PQclear(res); PQclear(res);
return false; goto flail;
} }
n = PQntuples(res); n = 0;
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true; ok = true;
//K_WLOCK(markersummary_free); //K_WLOCK(markersummary_free);
for (i = 0; i < n; i++) { while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(markersummary_free); item = k_unlink_head(markersummary_free);
DATA_MARKERSUMMARY(row, item); DATA_MARKERSUMMARY(row, item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
@ -6709,14 +6858,23 @@ bool markersummary_fill(PGconn *conn)
_userinfo_update(NULL, NULL, row, false, false); _userinfo_update(NULL, NULL, row, false, false);
if (i == 0 || ((i+1) % 100000) == 0) { if (n == 0 || ((n+1) % 100000) == 0) {
printf(TICK_PREFIX"ms "); printf(TICK_PREFIX"ms ");
pcom(i+1); pcom(n+1);
putchar('\r'); putchar('\r');
fflush(stdout); fflush(stdout);
} }
tick(); tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in ws", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
} }
if (!ok) { if (!ok) {
free_markersummary_data(item); free_markersummary_data(item);
@ -6727,10 +6885,13 @@ bool markersummary_fill(PGconn *conn)
//K_WUNLOCK(markersummary_free); //K_WUNLOCK(markersummary_free);
PQclear(res); PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) { if (ok) {
LOGDEBUG("%s(): built", __func__); LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d markersummary records", __func__, n); LOGWARNING("%s(): fetched %d markersummary records", __func__, n);
LOGWARNING("%s(): created %d markersummary pool records", __func__, p_n); LOGWARNING("%s(): created %d markersummary pool records", __func__, p_n);
} }

Loading…
Cancel
Save