From 9e86ce8805e29e460bf2c7cdde7f499f64213dfe Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 20 Oct 2016 21:32:52 +1100 Subject: [PATCH] ckdb - fix lost db connections - and some incomplete pause code --- src/ckdb.c | 16 ++- src/ckdb.h | 44 ++++++++ src/ckdb_cmd.c | 21 ++-- src/ckdb_data.c | 3 +- src/ckdb_dbio.c | 281 +++++++++++++++++++++++++++++++++++++----------- 5 files changed, 293 insertions(+), 72 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 824db5f0..2ebebbb8 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -211,7 +211,7 @@ static int replier_count = 0; static cklock_t replier_lock; char *EMPTY = ""; -const char *nullstr = "(null)"; +const char *nullstr = NULLSTR; const char *true_str = "true"; const char *false_str = "false"; @@ -457,6 +457,19 @@ char *id_default = "42"; K_LIST *pgdb_free; // Count of db connections int pgdb_count; +__thread char *connect_file = NULLSTR; +__thread char *connect_func = NULLSTR; +__thread int connect_line = 0; +__thread bool connect_dis = true; +// Pause all DB IO (permanently) +cklock_t pgdb_pause_lock; +__thread int pause_read_count = 0; +__thread char *pause_read_file = NULLSTR; +__thread char *pause_read_func = NULLSTR; +__thread int pause_read_line = 0; +__thread bool pause_read_unlock = false; +bool pgdb_paused = false; +bool pgdb_pause_disabled = false; // NULL or poolinstance must match const char *sys_poolinstance = NULL; @@ -9654,6 +9667,7 @@ int main(int argc, char **argv) cklock_init(&listener_all_lock); cklock_init(&last_lock); cklock_init(&btc_lock); + cklock_init(&pgdb_pause_lock); cklock_init(&poolinstance_lock); cklock_init(&seq_found_lock); diff --git a/src/ckdb.h b/src/ckdb.h index 5e163394..1a9419fd 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -140,6 +140,7 @@ extern int btc_listener_threads_delta; #define BLANK " " extern char *EMPTY; +#define NULLSTR "(null)" extern const char *nullstr; extern const char *true_str; @@ -447,7 +448,46 @@ extern char *id_default; // Emulate a list for lock checking extern K_LIST *pgdb_free; +// Count of db connections extern int pgdb_count; +extern __thread char *connect_file; +extern __thread char *connect_func; +extern __thread int connect_line; +extern __thread bool connect_dis; +/* (WHEN FINISHED) Pause all DB IO (permanently) pause.1.name=pgTABconfirm=Y + * Without confirm=Y it will return the pause state + * All DB IO commands must take out a read of this lock before + * starting anything that shouldn't be done partially + * This also means that the whole of CKDB can lock up for a short + * time if e.g. shift processing has taken the read lock shortly before + * the write lock is taken for the pause request to set the flag + * Once pgdb_paused is true, all DB IO will not access the database + * and all web access will be in read only mode i.e. no user changes + * All connections to the DB will close and thus DB changes and outages + * can then occur without affecting CKDB at all + * check the connection count with query.1.request=pg + * Shift generation is unchanged, Payout generation is permanently disabled + * To restart CKDB you must terminate it then rerun it, then CKDB will + * reload/redo all ckpool data it didn't store in the database + * The aim is to look the same as a normal running CKDB except doing no + * DB I/O - that will be deferred until CKDB is later restarted + * You can't pause CKDB until the dbload has completed, but you can + * during the CCL reload + * The function to take out the pause lock increments the thread's + * pause_read_count so each function that expects the lock to be held can + * easily test that and incorrectly calling it multiple times is tracked + * If the read lock code is called incorrectly, i.e. a code bug, + * pgdb_pause_disabled will be set to true and the pause command can no + * longer be activated, however if it was already activated, this wont + * affect anything */ +extern cklock_t pgdb_pause_lock; +extern __thread int pause_read_count; +extern __thread char *pause_read_file; +extern __thread char *pause_read_func; +extern __thread int pause_read_line; +extern __thread bool pause_read_unlock; +extern bool pgdb_paused; +extern bool pgdb_pause_disabled; // Number of seconds per poolinstance message for run #define POOLINSTANCE_MSG_EVERY 30 @@ -3681,6 +3721,10 @@ extern void _CKPQClear(PGresult *res, WHERE_FFL_ARGS); #define PGLOGEMERG(_str, _rescode, _conn) PGLOG(LOGEMERG, _str, _rescode, _conn) #define PGLOGNOTICE(_str, _rescode, _conn) PGLOG(LOGNOTICE, _str, _rescode, _conn) +extern void _pause_read_lock(WHERE_FFL_ARGS); +#define pause_read_lock() _pause_read_lock(WHERE_FFL_HERE) +extern void _pause_read_unlock(WHERE_FFL_ARGS); +#define pause_read_unlock() _pause_read_unlock(WHERE_FFL_HERE) extern char *pqerrmsg(PGconn *conn); extern bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS); #define CKPQConn(_conn) _CKPQConn(_conn, WHERE_FFL_HERE) diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index b4ac3730..b64f2e43 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -3919,7 +3919,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, *(dot++) = '\0'; // If we already had a different one, save it to the DB if (ua_item && strcmp(useratts->attname, attname) != 0) { - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { begun = CKPQBegin(conn); if (!begun) { @@ -3973,7 +3974,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, t_item = next_in_ktree(ctx); } if (ua_item) { - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { begun = CKPQBegin(conn); if (!begun) { @@ -3993,7 +3995,7 @@ rollback: CKPQEnd(conn, (reason == NULL)); bats: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (reason) { if (ua_item) { K_WLOCK(useratts_free); @@ -4218,7 +4220,8 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id, reason = "Missing value"; goto rollback; } - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { begun = CKPQBegin(conn); if (!begun) { @@ -4267,7 +4270,8 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id, reason = "Missing value"; goto rollback; } - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { begun = CKPQBegin(conn); if (!begun) { @@ -4288,7 +4292,7 @@ rollback: if (begun) CKPQEnd(conn, (reason == NULL)); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (reason) { snprintf(reply, siz, "ERR.%s", reason); LOGERR("%s.%s.%s", cmd, id, reply); @@ -8479,7 +8483,8 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id, if (strcasecmp(action, "store") == 0) { /* Store the shares_hi_root list in the db now, * rather than wait for a shift process to do it */ - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; count = 0; do { did = false; @@ -8494,7 +8499,7 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id, count++; } } while (did); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (count) { LOGWARNING("%s() Stored: %d high shares", __func__, count); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 9a7323d5..d7eba801 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -4973,7 +4973,8 @@ bool process_pplns(int32_t height, char *blockhash, tv_t *addr_cd) FLDSEP, cd_buf); DUP_POINTER(payouts_free, payouts->stats, &buf[0]); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; begun = CKPQBegin(conn); if (!begun) goto shazbot; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 2acfabe1..8726892b 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -9,10 +9,60 @@ #include "ckdb.h" +void _pause_read_lock(WHERE_FFL_ARGS) +{ + if (pgdb_pause_disabled) + return; + + if (pause_read_count > 0) { + LOGEMERG("%s() ERR lock >0 (%d) (%s/%s/%d/%c) pause disabled" + WHERE_FFL, + __func__, pause_read_count, pause_read_file, + pause_read_func, pause_read_line, + pause_read_unlock ? 'U' : 'L', WHERE_FFL_PASS); + pgdb_pause_disabled = true; + return; + } + + ck_rlock(&pgdb_pause_lock); + pause_read_count++; + pause_read_file = (char *)file; + pause_read_func = (char *)func; + pause_read_line = line; + pause_read_unlock = false; +} + +void _pause_read_unlock(WHERE_FFL_ARGS) +{ + if (pgdb_pause_disabled) + return; + + if (pause_read_count != 1) { + LOGEMERG("%s() ERR lock !=1 (%d) (%s/%s/%d/%c) pause disabled" + WHERE_FFL, + __func__, pause_read_count, pause_read_file, + pause_read_func, pause_read_line, + pause_read_unlock ? 'U' : 'L', WHERE_FFL_PASS); + pgdb_pause_disabled = true; + return; + } + + ck_runlock(&pgdb_pause_lock); + pause_read_count--; + pause_read_file = (char *)file; + pause_read_func = (char *)func; + pause_read_line = line; + pause_read_unlock = true; +} + char *pqerrmsg(PGconn *conn) { - char *ptr, *buf = strdup(PQerrorMessage(conn)); + char *ptr, *buf; + + if (pgdb_paused) + return strdup("pgdb_paused"); + buf = strdup(PQerrorMessage(conn)); if (!buf) quithere(1, "malloc OOM"); ptr = buf + strlen(buf) - 1; @@ -36,6 +86,8 @@ char *pqerrmsg(PGconn *conn) #define FETCHTICK 100000 #define CKPQFUNDEF -1 #define CKPQ_VAL_FLD(__res, __row, __num, __name, __fld, __ok) do { \ + if (pgdb_paused) \ + break; \ if (__num == CKPQFUNDEF) { \ __num = PQfnumber(__res, __name); \ if (__num == CKPQFUNDEF) { \ @@ -76,6 +128,8 @@ char *pqerrmsg(PGconn *conn) #define HISTORYDATEFLDS(_res, _row, _data, _ok) do { \ char *_fld; \ + if (pgdb_paused) \ + break; \ CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \ if (!_ok) \ break; \ @@ -101,6 +155,8 @@ char *pqerrmsg(PGconn *conn) #define HISTORYDATEIN(_res, _row, _data, _ok) do { \ char *_fld; \ + if (pgdb_paused) \ + break; \ CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \ if (!_ok) \ break; \ @@ -153,6 +209,8 @@ char *pqerrmsg(PGconn *conn) #define MODIFYDATEIN(_res, _row, _data, _ok) do { \ char *_fld; \ + if (pgdb_paused) \ + break; \ CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \ if (!_ok) \ break; \ @@ -228,6 +286,8 @@ char *pqerrmsg(PGconn *conn) #define SIMPLEDATEFLDS(_res, _row, _data, _ok) do { \ char *_fld; \ + if (pgdb_paused) \ + break; \ CKPQ_VAL_FLD_tail(_res, _row, _CDDB, _fld, _ok); \ if (!_ok) \ break; \ @@ -320,6 +380,9 @@ char *pqerrmsg(PGconn *conn) // Bug check to ensure no unexpected write txns occur PGresult *_CKPQExec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS) { + if (pgdb_paused) + return NULL; + // It would slow it down, but could check qry for insert/update/... if (!isread && confirm_sharesummary) quitfrom(1, file, func, line, "BUG: write txn during confirm"); @@ -370,6 +433,9 @@ PGresult *_CKPQExecParams(PGconn *conn, const char *qry, int resultFormat, bool isread, WHERE_FFL_ARGS) { + if (pgdb_paused) + return NULL; + // It would slow it down, but could check qry for insert/update/... if (!isread && confirm_sharesummary) quitfrom(1, file, func, line, "BUG: write txn during confirm"); @@ -421,22 +487,44 @@ PGresult *_CKPQExecParams(PGconn *conn, const char *qry, ExecStatusType _CKPQResultStatus(PGresult *res, WHERE_FFL_ARGS) { + if (pgdb_paused) + return PGRES_COMMAND_OK; + return PQresultStatus(res); } void _CKPQClear(PGresult *res, WHERE_FFL_ARGS) { - PQclear(res); + if (!pgdb_paused) + PQclear(res); } bool _CKPQConn(PGconn **conn, WHERE_FFL_ARGS) { if (*conn == NULL) { - LOGDEBUG("%s(): connecting", __func__); - *conn = dbconnect(); - K_WLOCK(pgdb_free); - pgdb_count++; - K_WUNLOCK(pgdb_free); + if (connect_dis == false) { + LOGEMERG("%s() ERR already (%s/%s/%d)" WHERE_FFL +#if LOCK_CHECK + " @%s" +#endif + , __func__, connect_file, connect_func, + connect_line, WHERE_FFL_PASS +#if LOCK_CHECK + , my_thread_name +#endif + ); + } + if (!pgdb_paused) { + LOGDEBUG("%s(): connecting", __func__); + *conn = dbconnect(); + K_WLOCK(pgdb_free); + pgdb_count++; + K_WUNLOCK(pgdb_free); + connect_file = (char *)file; + connect_func = (char *)func; + connect_line = line; + connect_dis = false; + } return true; } return false; @@ -451,6 +539,10 @@ bool _CKPQDisco(PGconn **conn, bool conned, WHERE_FFL_ARGS) K_WLOCK(pgdb_free); pgdb_count--; K_WUNLOCK(pgdb_free); + connect_file = (char *)file; + connect_func = (char *)func; + connect_line = line; + connect_dis = true; } return false; } @@ -460,6 +552,9 @@ bool _CKPQBegin(PGconn *conn, WHERE_FFL_ARGS) ExecStatusType rescode; PGresult *res; + if (pgdb_paused) + return true; + res = _CKPQExec(conn, "Begin", CKPQ_WRITE, WHERE_FFL_PASS); rescode = _CKPQResultStatus(res, WHERE_FFL_PASS); _CKPQClear(res, WHERE_FFL_PASS); @@ -479,6 +574,9 @@ void _CKPQEnd(PGconn *conn, bool commit, WHERE_FFL_ARGS) ExecStatusType rescode; PGresult *res; + if (pgdb_paused) + return; + if (commit) { LOGDEBUG("%s(): commit", __func__); res = _CKPQExec(conn, "Commit", CKPQ_WRITE, WHERE_FFL_PASS); @@ -526,7 +624,8 @@ int64_t nextid(PGconn *conn, char *idname, int64_t increment, "where idname='%s' for update", idname); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExec(conn, qry, CKPQ_WRITE); rescode = CKPQResultStatus(res); if (!PGOK(rescode)) { @@ -590,7 +689,7 @@ int64_t nextid(PGconn *conn, char *idname, int64_t increment, free(params[n]); cleanup: K_WUNLOCK(idcontrol_free); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); return lastid; } @@ -612,6 +711,9 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash, LOGDEBUG("%s(): change", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + if (oldhash != NULL) hash = true; else @@ -654,7 +756,8 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!CKPQBegin(conn)) goto unparam; @@ -704,7 +807,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -748,6 +851,9 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress, LOGDEBUG("%s(): add", __func__); + if (pgdb_paused && userbits != USER_MISSING) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + /* 2 attempts to add the same user at the same time will only do it once * The 2nd attempt will get back the data provided by the 1st * and thus throw away any differences in the 2nd */ @@ -842,7 +948,8 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress, "secondaryuserid,salt,userdata,userbits" HISTORYDATECONTROL ") values (" PQPARAM15 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -853,7 +960,7 @@ K_ITEM *users_add(PGconn *conn, INTRANSIENT *in_username, char *emailaddress, ok = true; unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); unitem: @@ -893,6 +1000,9 @@ bool users_replace(PGconn *conn, K_ITEM *u_item, K_ITEM *old_u_item, char *by, LOGDEBUG("%s(): replace", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + DATA_USERS(users, u_item); DATA_USERS(old_users, old_u_item); @@ -906,7 +1016,8 @@ bool users_replace(PGconn *conn, K_ITEM *u_item, K_ITEM *old_u_item, char *by, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!CKPQBegin(conn)) goto unparam; @@ -954,7 +1065,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -1154,6 +1265,9 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun) LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + DATA_USERATTS(useratts, ua_item); K_RLOCK(useratts_free); @@ -1164,7 +1278,8 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun) /* N.B. the values of the old ua_item record, if it exists, * are completely ignored i.e. you must provide all values required */ - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { if (!CKPQBegin(conn)) goto unparam; @@ -1224,7 +1339,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -1259,6 +1374,9 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname, LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + K_WLOCK(useratts_free); item = k_unlink_head(useratts_free); K_WUNLOCK(useratts_free); @@ -1339,6 +1457,9 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd) LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + DATA_USERATTS(useratts, ua_item); /* This is pointless if ua_item is part of the tree, however, @@ -1350,7 +1471,8 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd) if (item) { DATA_USERATTS(useratts, item); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; upd = "update useratts set "EDDB"=$1 where userid=$2 and " "attname=$3 and "EDDB"=$4"; par = 0; @@ -1371,7 +1493,7 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd) ok = true; unparam: if (par) { - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); } @@ -1550,7 +1672,8 @@ K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, bool add_ws, DATA_WORKERS(row, item); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; bzero(row, sizeof(*row)); row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet); if (row->workerid == 0) @@ -1630,7 +1753,7 @@ unparam: for (n = 0; n < par; n++) free(params[n]); unitem: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); K_WLOCK(workers_free); if (!ret) k_add_head(workers_free, item); @@ -1679,6 +1802,9 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, LOGDEBUG("%s(): update", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + /* Two attempts to update the same worker at the same time * will determine the final state based on which gets the lock last, * i.e. randomly, but without overwriting at the same time */ @@ -1736,7 +1862,8 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!CKPQBegin(conn)) goto unparam; @@ -1781,7 +1908,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); early: @@ -1975,11 +2102,15 @@ bool paymentaddresses_set(PGconn *conn, int64_t userid, K_STORE *pa_store, LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + // Quick early abort if (pa_store->count > ABS_ADDR_LIMIT) return false; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; /* This means the nextid updates will rollback on an error, but also * means that it will lock the nextid record for the whole update */ if (!CKPQBegin(conn)) @@ -2122,7 +2253,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); FREENULL(upd); @@ -2340,7 +2471,8 @@ bool payments_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item, *old_p_item = find_payments(row->payoutid, row->userid, row->in_subname); K_RUNLOCK(payments_free); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!already) { begun = CKPQBegin(conn); if (!begun) @@ -2426,7 +2558,7 @@ unparam: for (n = 0; n < par; n++) free(params[n]); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (!already) payments_add_ram(ok, p_item, *old_p_item, cd); @@ -2646,7 +2778,8 @@ bool idcontrol_add(PGconn *conn, char *idname, char *idvalue, char *by, ins = "insert into idcontrol " "(idname,lastid" MODIFYDATECONTROL ") values (" PQPARAM10 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -2657,7 +2790,7 @@ bool idcontrol_add(PGconn *conn, char *idname, char *idvalue, char *by, ok = true; foil: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -3087,6 +3220,9 @@ K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool beg LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + DATA_OPTIONCONTROL(row, oc_item); // Enforce the rule that switch_state isn't date/height controlled @@ -3102,7 +3238,8 @@ K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool beg old_item = find_in_ktree(optioncontrol_root, &look, ctx); K_RUNLOCK(optioncontrol_free); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { if (!CKPQBegin(conn)) goto nostart; @@ -3160,7 +3297,7 @@ rollback: CKPQEnd(conn, ok); nostart: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -3202,6 +3339,9 @@ K_ITEM *optioncontrol_add(PGconn *conn, char *optionname, char *optionvalue, LOGDEBUG("%s(): add", __func__); + if (pgdb_paused) + LOGEMERG("ERR: %s() called when paused - data lost", __func__); + K_WLOCK(optioncontrol_free); item = k_unlink_head(optioncontrol_free); K_WUNLOCK(optioncontrol_free); @@ -3450,7 +3590,8 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, "prevhash,coinbase1,coinbase2,version,bits,ntime,reward" HISTORYDATECONTROL ") values (" PQPARAM16 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -3464,7 +3605,7 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, unparam: if (par) { - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); } @@ -4232,7 +4373,8 @@ bool shares_db(PGconn *conn, K_ITEM *s_item) "diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff,address," "agent" HISTORYDATECONTROL ") values (" PQPARAM21 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); if (!PGOK(rescode)) { @@ -4250,7 +4392,7 @@ bool shares_db(PGconn *conn, K_ITEM *s_item) unparam: if (par) { CKPQClear(res); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); } @@ -5215,7 +5357,8 @@ dokey: setnow(&kadd_fin); setnow(&db_stt); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!CKPQBegin(conn)) { setnow(&db_fin); goto flail; @@ -5264,7 +5407,7 @@ rollback: CKPQEnd(conn, ok); flail: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (reason) { // already displayed the full workmarkers detail at the top @@ -5522,7 +5665,8 @@ bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm) del = "delete from markersummary where markerid=$1"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); if (!PGOK(rescode)) { @@ -5549,7 +5693,7 @@ bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm) unparam: CKPQClear(res); flail: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (!ok) { if (del_markersummary_store && del_markersummary_store->count) { @@ -6038,7 +6182,8 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!CKPQBegin(conn)) goto unparam; @@ -6093,7 +6238,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -6246,7 +6391,8 @@ bool blocks_add(PGconn *conn, int32_t height, char *blockhash, "statsconfirmed" HISTORYDATECONTROL ") values (" PQPARAM23 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -6324,7 +6470,8 @@ bool blocks_add(PGconn *conn, int32_t height, char *blockhash, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; // New is mostly a copy of the old copy_blocks(row, oldblocks); STRNCPY(row->confirmed, confirmed); @@ -6464,7 +6611,7 @@ unparam: for (n = 0; n < par; n++) free(params[n]); flail: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); K_RLOCK(workinfo_free); K_WLOCK(blocks_free); @@ -6810,7 +6957,8 @@ bool miningpayouts_add(PGconn *conn, bool add, K_ITEM *mp_item, *old_mp_item = find_miningpayouts(row->payoutid, row->userid); K_RUNLOCK(miningpayouts_free); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!already) { begun = CKPQBegin(conn); if (!begun) @@ -6879,7 +7027,7 @@ unparam: for (n = 0; n < par; n++) free(params[n]); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (!already) miningpayouts_add_ram(ok, mp_item, *old_mp_item, cd); @@ -7090,7 +7238,8 @@ bool payouts_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item, *old_p_item = find_payouts(row->height, row->blockhash); K_RUNLOCK(payouts_free); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!already) { begun = CKPQBegin(conn); if (!begun) @@ -7179,7 +7328,7 @@ unparam: for (n = 0; n < par; n++) free(params[n]); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (!already) payouts_add_ram(ok, p_item, *old_p_item, cd); @@ -7221,7 +7370,8 @@ K_ITEM *payouts_full_expire(PGconn *conn, int64_t payoutid, tv_t *now, bool lock goto matane; } - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; begun = CKPQBegin(conn); if (!begun) goto matane; @@ -7420,7 +7570,7 @@ matane: K_WUNLOCK(payouts_free); } - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); if (lock) K_WUNLOCK(process_pplns_free); @@ -8128,7 +8278,8 @@ bool poolstats_add(PGconn *conn, bool store, INTRANSIENT *in_poolinstance, "hashrate5m,hashrate1hr,hashrate24hr" SIMPLEDATECONTROL ") values (" PQPARAM12 ")"; - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); if (!PGOK(rescode)) { @@ -8151,7 +8302,7 @@ bool poolstats_add(PGconn *conn, bool store, INTRANSIENT *in_poolinstance, unparam: if (store) { CKPQClear(res); - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); } @@ -8586,7 +8737,8 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, row->diffacc); FREENULL(st); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -8597,7 +8749,7 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, ok = true; unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -9031,7 +9183,8 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code, row->diffacc); FREENULL(st); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; res = CKPQExecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = CKPQResultStatus(res); CKPQClear(res); @@ -9046,7 +9199,7 @@ bool keysummary_add(PGconn *conn, K_ITEM *ks_item, char *by, char *code, ok = true; unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -9103,7 +9256,8 @@ bool _workmarkers_process(PGconn *conn, bool already, bool add, LOGDEBUG("%s(): updating old", __func__); DATA_WORKMARKERS(oldworkmarkers, old_wm_item); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!already) { begun = CKPQBegin(conn); if (!begun) @@ -9157,7 +9311,8 @@ bool _workmarkers_process(PGconn *conn, bool already, bool add, DATA_WORKMARKERS(row, wm_item); bzero(row, sizeof(*row)); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!already && !begun) { begun = CKPQBegin(conn); if (!begun) @@ -9215,7 +9370,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]); @@ -9461,7 +9616,8 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance, LOGDEBUG("%s(): updating old", __func__); DATA_MARKS(oldmarks, old_m_item); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; begun = CKPQBegin(conn); if (!begun) goto unparam; @@ -9533,7 +9689,8 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance, HISTORYDATEPARAMS(params, par, row); PARCHK(par, params); - conned = CKPQConn(&conn); + if (CKPQConn(&conn)) + conned = true; if (!begun) { begun = CKPQBegin(conn); if (!begun) @@ -9555,7 +9712,7 @@ rollback: CKPQEnd(conn, ok); unparam: - conned = CKPQDisco(&conn, conned); + CKPQDisco(&conn, conned); for (n = 0; n < par; n++) free(params[n]);