Browse Source

ckdb - fix lost db connections - and some incomplete pause code

master
kanoi 8 years ago
parent
commit
9e86ce8805
  1. 16
      src/ckdb.c
  2. 44
      src/ckdb.h
  3. 21
      src/ckdb_cmd.c
  4. 3
      src/ckdb_data.c
  5. 269
      src/ckdb_dbio.c

16
src/ckdb.c

@ -211,7 +211,7 @@ static int replier_count = 0;
static cklock_t replier_lock;
char *EMPTY = "";
const char *nullstr = "(null)";
const char *nullstr = NULLSTR;
const char *true_str = "true";
const char *false_str = "false";
@ -457,6 +457,19 @@ char *id_default = "42";
K_LIST *pgdb_free;
// Count of db connections
int pgdb_count;
__thread char *connect_file = NULLSTR;
__thread char *connect_func = NULLSTR;
__thread int connect_line = 0;
__thread bool connect_dis = true;
// Pause all DB IO (permanently)
cklock_t pgdb_pause_lock;
__thread int pause_read_count = 0;
__thread char *pause_read_file = NULLSTR;
__thread char *pause_read_func = NULLSTR;
__thread int pause_read_line = 0;
__thread bool pause_read_unlock = false;
bool pgdb_paused = false;
bool pgdb_pause_disabled = false;
// NULL or poolinstance must match
const char *sys_poolinstance = NULL;
@ -9654,6 +9667,7 @@ int main(int argc, char **argv)
cklock_init(&listener_all_lock);
cklock_init(&last_lock);
cklock_init(&btc_lock);
cklock_init(&pgdb_pause_lock);
cklock_init(&poolinstance_lock);
cklock_init(&seq_found_lock);

44
src/ckdb.h

@ -140,6 +140,7 @@ extern int btc_listener_threads_delta;
#define BLANK " "
extern char *EMPTY;
#define NULLSTR "(null)"
extern const char *nullstr;
extern const char *true_str;
@ -447,7 +448,46 @@ extern char *id_default;
// Emulate a list for lock checking
extern K_LIST *pgdb_free;
// Count of db connections
extern int pgdb_count;
extern __thread char *connect_file;
extern __thread char *connect_func;
extern __thread int connect_line;
extern __thread bool connect_dis;
/* (WHEN FINISHED) Pause all DB IO (permanently) pause.1.name=pgTABconfirm=Y
* Without confirm=Y it will return the pause state
* All DB IO commands must take out a read of this lock before
* starting anything that shouldn't be done partially
* This also means that the whole of CKDB can lock up for a short
* time if e.g. shift processing has taken the read lock shortly before
* the write lock is taken for the pause request to set the flag
* Once pgdb_paused is true, all DB IO will not access the database
* and all web access will be in read only mode i.e. no user changes
* All connections to the DB will close and thus DB changes and outages
* can then occur without affecting CKDB at all
* check the connection count with query.1.request=pg
* Shift generation is unchanged, Payout generation is permanently disabled
* To restart CKDB you must terminate it then rerun it, then CKDB will
* reload/redo all ckpool data it didn't store in the database
* The aim is to look the same as a normal running CKDB except doing no
* DB I/O - that will be deferred until CKDB is later restarted
* You can't pause CKDB until the dbload has completed, but you can
* during the CCL reload
* The function to take out the pause lock increments the thread's
* pause_read_count so each function that expects the lock to be held can
* easily test that and incorrectly calling it multiple times is tracked
* If the read lock code is called incorrectly, i.e. a code bug,
* pgdb_pause_disabled will be set to true and the pause command can no
* longer be activated, however if it was already activated, this wont
* affect anything */
extern cklock_t pgdb_pause_lock;
extern __thread int pause_read_count;
extern __thread char *pause_read_file;
extern __thread char *pause_read_func;
extern __thread int pause_read_line;
extern __thread bool pause_read_unlock;
extern bool pgdb_paused;
extern bool pgdb_pause_disabled;
// Number of seconds per poolinstance message for run
#define POOLINSTANCE_MSG_EVERY 30
@ -3681,6 +3721,10 @@ extern void _CKPQClear(PGresult *res, WHERE_FFL_ARGS);
#define PGLOGEMERG(_str, _rescode, _conn) PGLOG(LOGEMERG, _str, _rescode, _conn)
#define PGLOGNOTICE(_str, _rescode, _conn) PGLOG(LOGNOTICE, _str, _rescode, _conn)
extern void _pause_read_lock(WHERE_FFL_ARGS);
#define pause_read_lock() _pause_read_lock(WHERE_FFL_HERE)
extern void _pause_read_unlock(WHERE_FFL_ARGS);
#define pause_read_unlock() _pause_read_unlock(WHERE_FFL_HERE)
extern char *pqerrmsg(PGconn *conn);
extern bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS);
#define CKPQConn(_conn) _CKPQConn(_conn, WHERE_FFL_HERE)

21
src/ckdb_cmd.c

@ -3919,7 +3919,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
*(dot++) = '\0';
// If we already had a different one, save it to the DB
if (ua_item && strcmp(useratts->attname, attname) != 0) {
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
begun = CKPQBegin(conn);
if (!begun) {
@ -3973,7 +3974,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
t_item = next_in_ktree(ctx);
}
if (ua_item) {
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
begun = CKPQBegin(conn);
if (!begun) {
@ -3993,7 +3995,7 @@ rollback:
CKPQEnd(conn, (reason == NULL));
bats:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (reason) {
if (ua_item) {
K_WLOCK(useratts_free);
@ -4218,7 +4220,8 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value";
goto rollback;
}
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
begun = CKPQBegin(conn);
if (!begun) {
@ -4267,7 +4270,8 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
reason = "Missing value";
goto rollback;
}
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
begun = CKPQBegin(conn);
if (!begun) {
@ -4288,7 +4292,7 @@ rollback:
if (begun)
CKPQEnd(conn, (reason == NULL));
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (reason) {
snprintf(reply, siz, "ERR.%s", reason);
LOGERR("%s.%s.%s", cmd, id, reply);
@ -8479,7 +8483,8 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
if (strcasecmp(action, "store") == 0) {
/* Store the shares_hi_root list in the db now,
* rather than wait for a shift process to do it */
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
count = 0;
do {
did = false;
@ -8494,7 +8499,7 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
count++;
}
} while (did);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (count) {
LOGWARNING("%s() Stored: %d high shares",
__func__, count);

3
src/ckdb_data.c

@ -4973,7 +4973,8 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd)
FLDSEP, cd_buf);
DUP_POINTER(payouts_free, payouts->stats, &buf[0]);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
begun = CKPQBegin(conn);
if (!begun)
goto shazbot;

269
src/ckdb_dbio.c

@ -9,10 +9,60 @@
#include "ckdb.h"
void _pause_read_lock(WHERE_FFL_ARGS)
{
if (pgdb_pause_disabled)
return;
if (pause_read_count > 0) {
LOGEMERG("%s() ERR lock >0 (%d) (%s/%s/%d/%c) pause disabled"
WHERE_FFL,
__func__, pause_read_count, pause_read_file,
pause_read_func, pause_read_line,
pause_read_unlock ? 'U' : 'L', WHERE_FFL_PASS);
pgdb_pause_disabled = true;
return;
}
ck_rlock(&pgdb_pause_lock);
pause_read_count++;
pause_read_file = (char *)file;
pause_read_func = (char *)func;
pause_read_line = line;
pause_read_unlock = false;
}
void _pause_read_unlock(WHERE_FFL_ARGS)
{
if (pgdb_pause_disabled)
return;
if (pause_read_count != 1) {
LOGEMERG("%s() ERR lock !=1 (%d) (%s/%s/%d/%c) pause disabled"
WHERE_FFL,
__func__, pause_read_count, pause_read_file,
pause_read_func, pause_read_line,
pause_read_unlock ? 'U' : 'L', WHERE_FFL_PASS);
pgdb_pause_disabled = true;
return;
}
ck_runlock(&pgdb_pause_lock);
pause_read_count--;
pause_read_file = (char *)file;
pause_read_func = (char *)func;
pause_read_line = line;
pause_read_unlock = true;
}
char *pqerrmsg(PGconn *conn)
{
char *ptr, *buf = strdup(PQerrorMessage(conn));
char *ptr, *buf;
if (pgdb_paused)
return strdup("pgdb_paused");
buf = strdup(PQerrorMessage(conn));
if (!buf)
quithere(1, "malloc OOM");
ptr = buf + strlen(buf) - 1;
@ -36,6 +86,8 @@ char *pqerrmsg(PGconn *conn)
#define FETCHTICK 100000
#define CKPQFUNDEF -1
#define CKPQ_VAL_FLD(__res, __row, __num, __name, __fld, __ok) do { \
if (pgdb_paused) \
break; \
if (__num == CKPQFUNDEF) { \
__num = PQfnumber(__res, __name); \
if (__num == CKPQFUNDEF) { \
@ -76,6 +128,8 @@ char *pqerrmsg(PGconn *conn)
#define HISTORYDATEFLDS(_res, _row, _data, _ok) do { \
char *_fld; \
if (pgdb_paused) \
break; \
CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \
if (!_ok) \
break; \
@ -101,6 +155,8 @@ char *pqerrmsg(PGconn *conn)
#define HISTORYDATEIN(_res, _row, _data, _ok) do { \
char *_fld; \
if (pgdb_paused) \
break; \
CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \
if (!_ok) \
break; \
@ -153,6 +209,8 @@ char *pqerrmsg(PGconn *conn)
#define MODIFYDATEIN(_res, _row, _data, _ok) do { \
char *_fld; \
if (pgdb_paused) \
break; \
CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \
if (!_ok) \
break; \
@ -228,6 +286,8 @@ char *pqerrmsg(PGconn *conn)
#define SIMPLEDATEFLDS(_res, _row, _data, _ok) do { \
char *_fld; \
if (pgdb_paused) \
break; \
CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \
if (!_ok) \
break; \
@ -320,6 +380,9 @@ char *pqerrmsg(PGconn *conn)
// Bug check to ensure no unexpected write txns occur
PGresult *_CKPQExec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
{
if (pgdb_paused)
return NULL;
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
@ -370,6 +433,9 @@ PGresult *_CKPQExecParams(PGconn *conn, const char *qry,
int resultFormat,
bool isread, WHERE_FFL_ARGS)
{
if (pgdb_paused)
return NULL;
// It would slow it down, but could check qry for insert/update/...
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
@ -421,22 +487,44 @@ PGresult *_CKPQExecParams(PGconn *conn, const char *qry,
ExecStatusType _CKPQResultStatus(PGresult *res, WHERE_FFL_ARGS)
{
if (pgdb_paused)
return PGRES_COMMAND_OK;
return PQresultStatus(res);
}
void _CKPQClear(PGresult *res, WHERE_FFL_ARGS)
{
if (!pgdb_paused)
PQclear(res);
}
bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS)
{
if (*conn == NULL) {
if (connect_dis == false) {
LOGEMERG("%s() ERR already (%s/%s/%d)" WHERE_FFL
#if LOCK_CHECK
" @%s"
#endif
, __func__, connect_file, connect_func,
connect_line, WHERE_FFL_PASS
#if LOCK_CHECK
, my_thread_name
#endif
);
}
if (!pgdb_paused) {
LOGDEBUG("%s(): connecting", __func__);
*conn = dbconnect();
K_WLOCK(pgdb_free);
pgdb_count++;
K_WUNLOCK(pgdb_free);
connect_file = (char *)file;
connect_func = (char *)func;
connect_line = line;
connect_dis = false;
}
return true;
}
return false;
@ -451,6 +539,10 @@ bool _CKPQDisco(PGconn **conn, bool conned, WHERE_FFL_ARGS)
K_WLOCK(pgdb_free);
pgdb_count--;
K_WUNLOCK(pgdb_free);
connect_file = (char *)file;
connect_func = (char *)func;
connect_line = line;
connect_dis = true;
}
return false;
}
@ -460,6 +552,9 @@ bool _CKPQBegin(PGconn *conn, WHERE_FFL_ARGS)
ExecStatusType rescode;
PGresult *res;
if (pgdb_paused)
return true;
res = _CKPQExec(conn, "Begin", CKPQ_WRITE, WHERE_FFL_PASS);
rescode = _CKPQResultStatus(res, WHERE_FFL_PASS);
_CKPQClear(res, WHERE_FFL_PASS);
@ -479,6 +574,9 @@ void _CKPQEnd(PGconn *conn, bool commit, WHERE_FFL_ARGS)
ExecStatusType rescode;
PGresult *res;
if (pgdb_paused)
return;
if (commit) {
LOGDEBUG("%s(): commit", __func__);
res = _CKPQExec(conn, "Commit", CKPQ_WRITE, WHERE_FFL_PASS);
@ -526,7 +624,8 @@ int64_t nextid(PGconn *conn, char *idname, int64_t increment,
"where idname='%s' for update",
idname);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExec(conn, qry, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
if (!PGOK(rescode)) {
@ -590,7 +689,7 @@ int64_t nextid(PGconn *conn, char *idname, int64_t increment,
free(params[n]);
cleanup:
K_WUNLOCK(idcontrol_free);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
return lastid;
}
@ -612,6 +711,9 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash,
LOGDEBUG("%s(): change", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
if (oldhash != NULL)
hash = true;
else
@ -654,7 +756,8 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash,
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!CKPQBegin(conn))
goto unparam;
@ -704,7 +807,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -748,6 +851,9 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress,
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused && userbits != USER_MISSING)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
/* 2 attempts to add the same user at the same time will only do it once
* The 2nd attempt will get back the data provided by the 1st
* and thus throw away any differences in the 2nd */
@ -842,7 +948,8 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress,
"secondaryuserid,salt,userdata,userbits"
HISTORYDATECONTROL ") values (" PQPARAM15 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -853,7 +960,7 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress,
ok = true;
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
unitem:
@ -893,6 +1000,9 @@ bool users_replace(PGconn *conn, K_ITEM *u_item, K_ITEM *old_u_item, char *by,
LOGDEBUG("%s(): replace", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
DATA_USERS(users, u_item);
DATA_USERS(old_users, old_u_item);
@ -906,7 +1016,8 @@ bool users_replace(PGconn *conn, K_ITEM *u_item, K_ITEM *old_u_item, char *by,
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!CKPQBegin(conn))
goto unparam;
@ -954,7 +1065,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -1154,6 +1265,9 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun)
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
DATA_USERATTS(useratts, ua_item);
K_RLOCK(useratts_free);
@ -1164,7 +1278,8 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun)
/* N.B. the values of the old ua_item record, if it exists,
* are completely ignored i.e. you must provide all values required */
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
if (!CKPQBegin(conn))
goto unparam;
@ -1224,7 +1339,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -1259,6 +1374,9 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname,
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
K_WLOCK(useratts_free);
item = k_unlink_head(useratts_free);
K_WUNLOCK(useratts_free);
@ -1339,6 +1457,9 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd)
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
DATA_USERATTS(useratts, ua_item);
/* This is pointless if ua_item is part of the tree, however,
@ -1350,7 +1471,8 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd)
if (item) {
DATA_USERATTS(useratts, item);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
upd = "update useratts set "EDDB"=$1 where userid=$2 and "
"attname=$3 and "EDDB"=$4";
par = 0;
@ -1371,7 +1493,7 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd)
ok = true;
unparam:
if (par) {
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
}
@ -1550,7 +1672,8 @@ K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, bool add_ws,
DATA_WORKERS(row, item);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
bzero(row, sizeof(*row));
row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet);
if (row->workerid == 0)
@ -1630,7 +1753,7 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
unitem:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
K_WLOCK(workers_free);
if (!ret)
k_add_head(workers_free, item);
@ -1679,6 +1802,9 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
LOGDEBUG("%s(): update", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
/* Two attempts to update the same worker at the same time
* will determine the final state based on which gets the lock last,
* i.e. randomly, but without overwriting at the same time */
@ -1736,7 +1862,8 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!CKPQBegin(conn))
goto unparam;
@ -1781,7 +1908,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
early:
@ -1975,11 +2102,15 @@ bool paymentaddresses_set(PGconn *conn, int64_t userid, K_STORE *pa_store,
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
// Quick early abort
if (pa_store->count > ABS_ADDR_LIMIT)
return false;
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
/* This means the nextid updates will rollback on an error, but also
* means that it will lock the nextid record for the whole update */
if (!CKPQBegin(conn))
@ -2122,7 +2253,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
FREENULL(upd);
@ -2340,7 +2471,8 @@ bool payments_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item,
*old_p_item = find_payments(row->payoutid, row->userid, row->in_subname);
K_RUNLOCK(payments_free);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!already) {
begun = CKPQBegin(conn);
if (!begun)
@ -2426,7 +2558,7 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (!already)
payments_add_ram(ok, p_item, *old_p_item, cd);
@ -2646,7 +2778,8 @@ bool idcontrol_add(PGconn *conn, char *idname, char *idvalue, char *by,
ins = "insert into idcontrol "
"(idname,lastid" MODIFYDATECONTROL ") values (" PQPARAM10 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -2657,7 +2790,7 @@ bool idcontrol_add(PGconn *conn, char *idname, char *idvalue, char *by,
ok = true;
foil:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -3087,6 +3220,9 @@ K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool beg
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
DATA_OPTIONCONTROL(row, oc_item);
// Enforce the rule that switch_state isn't date/height controlled
@ -3102,7 +3238,8 @@ K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool beg
old_item = find_in_ktree(optioncontrol_root, &look, ctx);
K_RUNLOCK(optioncontrol_free);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
if (!CKPQBegin(conn))
goto nostart;
@ -3160,7 +3297,7 @@ rollback:
CKPQEnd(conn, ok);
nostart:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -3202,6 +3339,9 @@ K_ITEM *optioncontrol_add(PGconn *conn, char *optionname, char *optionvalue,
LOGDEBUG("%s(): add", __func__);
if (pgdb_paused)
LOGEMERG("ERR: %s() called when paused - data lost", __func__);
K_WLOCK(optioncontrol_free);
item = k_unlink_head(optioncontrol_free);
K_WUNLOCK(optioncontrol_free);
@ -3450,7 +3590,8 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr,
"prevhash,coinbase1,coinbase2,version,bits,ntime,reward"
HISTORYDATECONTROL ") values (" PQPARAM16 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -3464,7 +3605,7 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr,
unparam:
if (par) {
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
}
@ -4232,7 +4373,8 @@ bool shares_db(PGconn *conn, K_ITEM *s_item)
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff,address,"
"agent" HISTORYDATECONTROL ") values (" PQPARAM21 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
if (!PGOK(rescode)) {
@ -4250,7 +4392,7 @@ bool shares_db(PGconn *conn, K_ITEM *s_item)
unparam:
if (par) {
CKPQClear(res);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
}
@ -5215,7 +5357,8 @@ dokey:
setnow(&kadd_fin);
setnow(&db_stt);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!CKPQBegin(conn)) {
setnow(&db_fin);
goto flail;
@ -5264,7 +5407,7 @@ rollback:
CKPQEnd(conn, ok);
flail:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (reason) {
// already displayed the full workmarkers detail at the top
@ -5522,7 +5665,8 @@ bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm)
del = "delete from markersummary where markerid=$1";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
if (!PGOK(rescode)) {
@ -5549,7 +5693,7 @@ bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm)
unparam:
CKPQClear(res);
flail:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (!ok) {
if (del_markersummary_store && del_markersummary_store->count) {
@ -6038,7 +6182,8 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!CKPQBegin(conn))
goto unparam;
@ -6093,7 +6238,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -6246,7 +6391,8 @@ bool blocks_add(PGconn *conn, int32_t height, char *blockhash,
"statsconfirmed"
HISTORYDATECONTROL ") values (" PQPARAM23 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -6324,7 +6470,8 @@ bool blocks_add(PGconn *conn, int32_t height, char *blockhash,
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
// New is mostly a copy of the old
copy_blocks(row, oldblocks);
STRNCPY(row->confirmed, confirmed);
@ -6464,7 +6611,7 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
flail:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
K_RLOCK(workinfo_free);
K_WLOCK(blocks_free);
@ -6810,7 +6957,8 @@ bool miningpayouts_add(PGconn *conn, bool add, K_ITEM *mp_item,
*old_mp_item = find_miningpayouts(row->payoutid, row->userid);
K_RUNLOCK(miningpayouts_free);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!already) {
begun = CKPQBegin(conn);
if (!begun)
@ -6879,7 +7027,7 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (!already)
miningpayouts_add_ram(ok, mp_item, *old_mp_item, cd);
@ -7090,7 +7238,8 @@ bool payouts_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item,
*old_p_item = find_payouts(row->height, row->blockhash);
K_RUNLOCK(payouts_free);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!already) {
begun = CKPQBegin(conn);
if (!begun)
@ -7179,7 +7328,7 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (!already)
payouts_add_ram(ok, p_item, *old_p_item, cd);
@ -7221,7 +7370,8 @@ K_ITEM *payouts_full_expire(PGconn *conn, int64_t payoutid, tv_t *now, bool lock
goto matane;
}
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
begun = CKPQBegin(conn);
if (!begun)
goto matane;
@ -7420,7 +7570,7 @@ matane:
K_WUNLOCK(payouts_free);
}
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
if (lock)
K_WUNLOCK(process_pplns_free);
@ -8128,7 +8278,8 @@ bool poolstats_add(PGconn *conn, bool store, INTRANSIENT *in_poolinstance,
"hashrate5m,hashrate1hr,hashrate24hr"
SIMPLEDATECONTROL ") values (" PQPARAM12 ")";
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
if (!PGOK(rescode)) {
@ -8151,7 +8302,7 @@ bool poolstats_add(PGconn *conn, bool store, INTRANSIENT *in_poolinstance,
unparam:
if (store) {
CKPQClear(res);
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
}
@ -8586,7 +8737,8 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
row->diffacc);
FREENULL(st);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -8597,7 +8749,7 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
ok = true;
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -9031,7 +9183,8 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code,
row->diffacc);
FREENULL(st);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = CKPQResultStatus(res);
CKPQClear(res);
@ -9046,7 +9199,7 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code,
ok = true;
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -9103,7 +9256,8 @@ bool _workmarkers_process(PGconn *conn, bool already, bool add,
LOGDEBUG("%s(): updating old", __func__);
DATA_WORKMARKERS(oldworkmarkers, old_wm_item);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!already) {
begun = CKPQBegin(conn);
if (!begun)
@ -9157,7 +9311,8 @@ bool _workmarkers_process(PGconn *conn, bool already, bool add,
DATA_WORKMARKERS(row, wm_item);
bzero(row, sizeof(*row));
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!already && !begun) {
begun = CKPQBegin(conn);
if (!begun)
@ -9215,7 +9370,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);
@ -9461,7 +9616,8 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance,
LOGDEBUG("%s(): updating old", __func__);
DATA_MARKS(oldmarks, old_m_item);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
begun = CKPQBegin(conn);
if (!begun)
goto unparam;
@ -9533,7 +9689,8 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance,
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
conned = CKPQConn(&conn);
if (CKPQConn(&conn))
conned = true;
if (!begun) {
begun = CKPQBegin(conn);
if (!begun)
@ -9555,7 +9712,7 @@ rollback:
CKPQEnd(conn, ok);
unparam:
conned = CKPQDisco(&conn, conned);
CKPQDisco(&conn, conned);
for (n = 0; n < par; n++)
free(params[n]);

Loading…
Cancel
Save