Browse Source

ckdb - wrap most PQ functions and simplify

master
kanoi 8 years ago
parent
commit
ac6aff7918
  1. 68
      src/ckdb.c
  2. 28
      src/ckdb.h
  3. 91
      src/ckdb_cmd.c
  4. 6
      src/ckdb_data.c
  5. 1051
      src/ckdb_dbio.c

68
src/ckdb.c

@ -1716,9 +1716,11 @@ PGconn *dbconnect()
*/ */
static bool getdata1() static bool getdata1()
{ {
PGconn *conn = dbconnect(); PGconn *conn = NULL;
bool ok = true; bool ok = true;
CKPQConn(&conn);
if (!(ok = check_db_version(conn))) if (!(ok = check_db_version(conn)))
goto matane; goto matane;
if (!(ok = optioncontrol_fill(conn))) if (!(ok = optioncontrol_fill(conn)))
@ -1731,7 +1733,7 @@ static bool getdata1()
matane: matane:
PQfinish(conn); CKPQFinish(&conn);
return ok; return ok;
} }
@ -1740,19 +1742,25 @@ matane:
*/ */
static bool getdata2() static bool getdata2()
{ {
PGconn *conn = dbconnect(); PGconn *conn = NULL;
bool ok = blocks_fill(conn); bool ok;
CKPQConn(&conn);
ok = blocks_fill(conn);
PQfinish(conn); CKPQFinish(&conn);
return ok; return ok;
} }
static bool getdata3() static bool getdata3()
{ {
PGconn *conn = dbconnect(); PGconn *conn = NULL;
bool ok = true; bool ok = true;
CKPQConn(&conn);
if (!key_update && !confirm_sharesummary) { if (!key_update && !confirm_sharesummary) {
if (!(ok = paymentaddresses_fill(conn)) || everyone_die) if (!(ok = paymentaddresses_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
@ -1762,12 +1770,12 @@ static bool getdata3()
if (!(ok = miningpayouts_fill(conn)) || everyone_die) if (!(ok = miningpayouts_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
} }
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
if (!(ok = workinfo_fill(conn)) || everyone_die) if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
if (!(ok = marks_fill(conn)) || everyone_die) if (!(ok = marks_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
/* must be after workinfo */ /* must be after workinfo */
@ -1778,14 +1786,14 @@ static bool getdata3()
if (!(ok = payouts_fill(conn)) || everyone_die) if (!(ok = payouts_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
} }
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
if (!key_update) { if (!key_update) {
if (!(ok = markersummary_fill(conn)) || everyone_die) if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
} }
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
if (!key_update) { if (!key_update) {
if (!(ok = shares_fill(conn)) || everyone_die) if (!(ok = shares_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
@ -1795,7 +1803,7 @@ static bool getdata3()
sukamudai: sukamudai:
PQfinish(conn); CKPQFinish(&conn);
return ok; return ok;
} }
@ -6153,7 +6161,7 @@ static void *listener_all(void *arg)
else else
clistener_using_data = true; clistener_using_data = true;
conn = dbconnect(); CKPQConn(&conn);
now = time(NULL); now = time(NULL);
while (!everyone_die) { while (!everyone_die) {
@ -6250,8 +6258,8 @@ static void *listener_all(void *arg)
// Don't keep a connection for more than ~10s // Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) { if ((time(NULL) - now) > 10) {
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
now = time(NULL); now = time(NULL);
} }
@ -6294,9 +6302,7 @@ static void *listener_all(void *arg)
} }
} }
} }
CKPQFinish(&conn);
if (conn)
PQfinish(conn);
if (mythread != 0) if (mythread != 0)
LOGNOTICE("%s() %s exiting", __func__, buf); LOGNOTICE("%s() %s exiting", __func__, buf);
@ -7202,7 +7208,7 @@ static void *process_reload(__maybe_unused void *arg)
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000;
conn = dbconnect(); CKPQConn(&conn);
now = time(NULL); now = time(NULL);
while (!everyone_die) { while (!everyone_die) {
@ -7306,8 +7312,8 @@ static void *process_reload(__maybe_unused void *arg)
// Don't keep a connection for more than ~10s ... of processing // Don't keep a connection for more than ~10s ... of processing
if ((time(NULL) - now) > 10) { if ((time(NULL) - now) > 10) {
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
now = time(NULL); now = time(NULL);
} }
@ -7322,9 +7328,7 @@ static void *process_reload(__maybe_unused void *arg)
tick(); tick();
} }
CKPQFinish(&conn);
if (conn)
PQfinish(conn);
if (mythread == 0) { if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) { for (i = 1; i < THREAD_LIMIT; i++) {
@ -7859,7 +7863,7 @@ static void *pqproc(void *arg)
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
now = time(NULL); now = time(NULL);
conn = dbconnect(); CKPQConn(&conn);
wqgot = 0; wqgot = 0;
// Override checking until pool0 is complete // Override checking until pool0 is complete
@ -7972,8 +7976,8 @@ static void *pqproc(void *arg)
/* Don't keep a connection for more than ~10s or ~10000 items /* Don't keep a connection for more than ~10s or ~10000 items
* but always have a connection open */ * but always have a connection open */
if ((time(NULL) - now) > 10 || wqgot > 10000) { if ((time(NULL) - now) > 10 || wqgot > 10000) {
PQfinish(conn); CKPQFinish(&conn);
conn = dbconnect(); CKPQConn(&conn);
now = time(NULL); now = time(NULL);
wqgot = 0; wqgot = 0;
} }
@ -8043,9 +8047,7 @@ static void *pqproc(void *arg)
mutex_unlock(&wq_pool_waitlock); mutex_unlock(&wq_pool_waitlock);
} }
} }
CKPQFinish(&conn);
if (conn)
PQfinish(conn);
if (mythread == 0) { if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) { for (i = 1; i < THREAD_LIMIT; i++) {

28
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.518" #define CKDB_VERSION DB_VERSION"-2.600"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -3614,12 +3614,12 @@ extern void userinfo_block(BLOCKS *blocks, enum info_type isnew, int delta);
#define CKPQ_READ true #define CKPQ_READ true
#define CKPQ_WRITE false #define CKPQ_WRITE false
#define CKPQexec(_conn, _qry, _isread) _CKPQexec(_conn, _qry, _isread, WHERE_FFL_HERE) #define CKPQExec(_conn, _qry, _isread) _CKPQExec(_conn, _qry, _isread, WHERE_FFL_HERE)
extern PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS); extern PGresult *_CKPQExec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS);
#define CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \ #define CKPQExecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, _isread) \
_CKPQexecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \ _CKPQExecParams(_conn, _qry, _p1, _p2, _p3, _p4, _p5, _p6, \
_isread, WHERE_FFL_HERE) _isread, WHERE_FFL_HERE)
extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry, extern PGresult *_CKPQExecParams(PGconn *conn, const char *qry,
int nParams, int nParams,
const Oid *paramTypes, const Oid *paramTypes,
const char *const * paramValues, const char *const * paramValues,
@ -3627,10 +3627,10 @@ extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
const int *paramFormats, const int *paramFormats,
int resultFormat, int resultFormat,
bool isread, WHERE_FFL_ARGS); bool isread, WHERE_FFL_ARGS);
extern ExecStatusType _CKPQResultStatus(PGresult *res, WHERE_FFL_ARGS);
// Force use CKPQ... for PQ functions in use #define CKPQResultStatus(_res) _CKPQResultStatus(_res, WHERE_FFL_HERE)
#define PQexec CKPQexec extern void _CKPQClear(PGresult *res, WHERE_FFL_ARGS);
#define PQexecParams CKPQexecParams #define CKPQClear(_res) _CKPQClear(_res, WHERE_FFL_HERE)
#define PGLOG(__LOG, __str, __rescode, __conn) do { \ #define PGLOG(__LOG, __str, __rescode, __conn) do { \
char *__buf = pqerrmsg(__conn); \ char *__buf = pqerrmsg(__conn); \
@ -3644,12 +3644,16 @@ extern PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
#define PGLOGNOTICE(_str, _rescode, _conn) PGLOG(LOGNOTICE, _str, _rescode, _conn) #define PGLOGNOTICE(_str, _rescode, _conn) PGLOG(LOGNOTICE, _str, _rescode, _conn)
extern char *pqerrmsg(PGconn *conn); extern char *pqerrmsg(PGconn *conn);
extern bool CKPQConn(PGconn **conn); extern bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS);
extern void CKPQDisco(PGconn **conn, bool conned); #define CKPQConn(_conn) _CKPQConn(_conn, WHERE_FFL_HERE)
extern bool _CKPQDisco(PGconn **conn, bool conned, WHERE_FFL_ARGS);
#define CKPQDisco(_conn, _conned) _CKPQDisco(_conn, _conned, WHERE_FFL_HERE)
#define CKPQFinish(_conn) CKPQDisco(_conn, true)
extern bool _CKPQBegin(PGconn *conn, WHERE_FFL_ARGS); extern bool _CKPQBegin(PGconn *conn, WHERE_FFL_ARGS);
#define CKPQBegin(_conn) _CKPQBegin(conn, WHERE_FFL_HERE) #define CKPQBegin(_conn) _CKPQBegin(conn, WHERE_FFL_HERE)
extern void _CKPQEnd(PGconn *conn, bool commit, WHERE_FFL_ARGS); extern void _CKPQEnd(PGconn *conn, bool commit, WHERE_FFL_ARGS);
#define CKPQEnd(_conn, _commit) _CKPQEnd(_conn, _commit, WHERE_FFL_HERE) #define CKPQEnd(_conn, _commit) _CKPQEnd(_conn, _commit, WHERE_FFL_HERE)
#define CKPQCommit(_conn) _CKPQEnd(_conn, true, WHERE_FFL_HERE)
extern int64_t nextid(PGconn *conn, char *idname, int64_t increment, extern int64_t nextid(PGconn *conn, char *idname, int64_t increment,
tv_t *cd, char *by, char *code, char *inet); tv_t *cd, char *by, char *code, char *inet);

91
src/ckdb_cmd.c

@ -3872,8 +3872,6 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root, __maybe_unused tv_t *notcd, K_TREE *trf_root,
__maybe_unused bool reload_data) __maybe_unused bool reload_data)
{ {
ExecStatusType rescode;
PGresult *res;
bool conned = false; bool conned = false;
K_ITEM *t_item, *u_item, *ua_item = NULL; K_ITEM *t_item, *u_item, *ua_item = NULL;
INTRANSIENT *in_username; INTRANSIENT *in_username;
@ -3921,21 +3919,13 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
*(dot++) = '\0'; *(dot++) = '\0';
// If we already had a different one, save it to the DB // If we already had a different one, save it to the DB
if (ua_item && strcmp(useratts->attname, attname) != 0) { if (ua_item && strcmp(useratts->attname, attname) != 0) {
if (conn == NULL) { conned = CKPQConn(&conn);
conn = dbconnect(); if (!begun) {
conned = true; begun = CKPQBegin(conn);
}
if (!begun) { if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
reason = "DBERR"; reason = "DBERR";
goto bats; goto bats;
} }
begun = true;
} }
if (useratts_item_add(conn, ua_item, now, begun)) { if (useratts_item_add(conn, ua_item, now, begun)) {
ua_item = NULL; ua_item = NULL;
@ -3983,21 +3973,13 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
t_item = next_in_ktree(ctx); t_item = next_in_ktree(ctx);
} }
if (ua_item) { if (ua_item) {
if (conn == NULL) { conned = CKPQConn(&conn);
conn = dbconnect(); if (!begun) {
conned = true; begun = CKPQBegin(conn);
}
if (!begun) { if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
reason = "DBERR"; reason = "DBERR";
goto bats; goto bats;
} }
begun = true;
} }
if (!useratts_item_add(conn, ua_item, now, begun)) { if (!useratts_item_add(conn, ua_item, now, begun)) {
reason = "DBERR"; reason = "DBERR";
@ -4007,15 +3989,11 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
} }
} }
rollback: rollback:
if (!reason)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res); CKPQEnd(conn, (reason == NULL));
bats: bats:
if (conned) conned = CKPQDisco(&conn, conned);
PQfinish(conn);
if (reason) { if (reason) {
if (ua_item) { if (ua_item) {
K_WLOCK(useratts_free); K_WLOCK(useratts_free);
@ -4207,8 +4185,6 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notcd, K_TREE *trf_root, __maybe_unused tv_t *notcd, K_TREE *trf_root,
__maybe_unused bool reload_data) __maybe_unused bool reload_data)
{ {
ExecStatusType rescode;
PGresult *res;
bool conned = false; bool conned = false;
K_ITEM *t_item, *oc_item = NULL, *ok = NULL; K_ITEM *t_item, *oc_item = NULL, *ok = NULL;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -4242,21 +4218,13 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value"; reason = "Missing value";
goto rollback; goto rollback;
} }
if (conn == NULL) { conned = CKPQConn(&conn);
conn = dbconnect(); if (!begun) {
conned = true; begun = CKPQBegin(conn);
}
if (!begun) { if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
reason = "DBERR"; reason = "DBERR";
goto rollback; goto rollback;
} }
begun = true;
} }
ok = optioncontrol_item_add(conn, oc_item, now, begun); ok = optioncontrol_item_add(conn, oc_item, now, begun);
oc_item = NULL; oc_item = NULL;
@ -4299,21 +4267,13 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value"; reason = "Missing value";
goto rollback; goto rollback;
} }
if (conn == NULL) { conned = CKPQConn(&conn);
conn = dbconnect(); if (!begun) {
conned = true; begun = CKPQBegin(conn);
}
if (!begun) { if (!begun) {
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
reason = "DBERR"; reason = "DBERR";
goto rollback; goto rollback;
} }
begun = true;
} }
ok = optioncontrol_item_add(conn, oc_item, now, begun); ok = optioncontrol_item_add(conn, oc_item, now, begun);
oc_item = NULL; oc_item = NULL;
@ -4325,17 +4285,10 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
} }
} }
rollback: rollback:
if (begun) { if (begun)
if (reason) CKPQEnd(conn, (reason == NULL));
res = PQexec(conn, "Rollback", CKPQ_WRITE);
else
res = PQexec(conn, "Commit", CKPQ_WRITE);
PQclear(res); conned = CKPQDisco(&conn, conned);
}
if (conned)
PQfinish(conn);
if (reason) { if (reason) {
snprintf(reply, siz, "ERR.%s", reason); snprintf(reply, siz, "ERR.%s", reason);
LOGERR("%s.%s.%s", cmd, id, reply); LOGERR("%s.%s.%s", cmd, id, reply);
@ -8386,10 +8339,7 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
if (strcasecmp(action, "store") == 0) { if (strcasecmp(action, "store") == 0) {
/* Store the shares_hi_root list in the db now, /* Store the shares_hi_root list in the db now,
* rather than wait for a shift process to do it */ * rather than wait for a shift process to do it */
if (!conn) { conned = CKPQConn(&conn);
conn = dbconnect();
conned = true;
}
count = 0; count = 0;
do { do {
did = false; did = false;
@ -8404,8 +8354,7 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
count++; count++;
} }
} while (did); } while (did);
if (conned) conned = CKPQDisco(&conn, conned);
PQfinish(conn);
if (count) { if (count) {
LOGWARNING("%s() Stored: %d high shares", LOGWARNING("%s() Stored: %d high shares",
__func__, count); __func__, count);

6
src/ckdb_data.c

@ -6397,7 +6397,7 @@ K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
bool make_markersummaries(bool msg, char *by, char *code, char *inet, bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root) tv_t *cd, K_TREE *trf_root)
{ {
PGconn *conn; PGconn *conn = NULL;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers; WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL, *s_item = NULL; K_ITEM *wm_item, *wm_last = NULL, *s_item = NULL;
@ -6427,7 +6427,7 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
return false; return false;
} }
conn = dbconnect(); CKPQConn(&conn);
/* Store all shares in the DB before processing the workmarker /* Store all shares in the DB before processing the workmarker
* This way we know that the high shares in the DB will match the start * This way we know that the high shares in the DB will match the start
@ -6486,7 +6486,7 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tvdiff(&proc_lock_fin, &proc_lock_got)); tvdiff(&proc_lock_fin, &proc_lock_got));
flailed: flailed:
PQfinish(conn); CKPQDisco(&conn, true);
if (count > 0) { if (count > 0) {
LOGWARNING("%s() Stored: %d high shares %.3fs", LOGWARNING("%s() Stored: %d high shares %.3fs",

1051
src/ckdb_dbio.c

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save