Browse Source

ckdb - consistent commit/rollback and par initialisation

master
kanoi 10 years ago
parent
commit
45e29b3da4
  1. 280
      src/ckdb.c

280
src/ckdb.c

@ -2251,11 +2251,10 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment,
PGresult *res;
char qry[1024];
char *params[5];
int par;
int n, par = 0;
int64_t lastid;
char *field;
bool ok;
int n;
lastid = 0;
@ -2777,13 +2776,12 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash,
K_TREE_CTX ctx[1];
PGresult *res;
K_ITEM *item;
int n;
USERS *row, *users;
char *upd, *ins;
bool ok = false;
char *params[5 + HISTORYDATECOUNT];
bool hash;
int par;
int n, par = 0;
LOGDEBUG("%s(): change", __func__);
@ -2830,17 +2828,18 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash,
// Beginning of a write txn
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
PQclear(res);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto unparam;
goto rollback;
}
for (n = 0; n < par; n++)
@ -2871,14 +2870,18 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash,
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
res = PQexec(conn, "Commit", CKPQ_WRITE);
ok = true;
unparam:
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
unparam:
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
@ -2911,7 +2914,6 @@ static K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress,
bool conned = false;
PGresult *res;
K_ITEM *item;
int n;
USERS *row;
char *ins;
char tohash[64];
@ -2919,7 +2921,7 @@ static K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress,
__maybe_unused uint64_t tmp;
bool ok = false;
char *params[8 + HISTORYDATECOUNT];
int par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -3175,7 +3177,7 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu
char *upd, *ins;
bool ok = false;
char *params[9 + HISTORYDATECOUNT];
int n, par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -3189,7 +3191,6 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu
/* N.B. the values of the old ua_item record, if it exists,
* are completely ignored i.e. you must provide all values required */
par = 0;
if (!conn) {
conn = dbconnect();
conned = true;
@ -3596,11 +3597,10 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername,
bool conned = false;
PGresult *res;
K_ITEM *item, *ret = NULL;
int n;
WORKERS *row;
char *ins;
char *params[6 + HISTORYDATECOUNT];
int par;
int n, par = 0;
int32_t diffdef;
int32_t nottime;
@ -3708,12 +3708,11 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
ExecStatusType rescode;
bool conned = false;
PGresult *res;
int n;
WORKERS *row;
char *upd, *ins;
bool ok = false;
char *params[6 + HISTORYDATECOUNT];
int par;
int n, par = 0;
int32_t diffdef;
char idlenot;
int32_t nottime;
@ -3756,74 +3755,76 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
nottime == row->idlenotificationtime) {
ok = true;
goto early;
} else {
upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3";
par = 0;
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = bigint_to_buf(row->workerid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
}
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3";
par = 0;
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = bigint_to_buf(row->workerid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
PQclear(res);
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
}
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
for (n = 0; n < par; n++)
free(params[n]);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto rollback;
}
ins = "insert into workers "
"(workerid,userid,workername,difficultydefault,"
"idlenotificationenabled,idlenotificationtime"
HISTORYDATECONTROL ") values (" PQPARAM11 ")";
for (n = 0; n < par; n++)
free(params[n]);
row->difficultydefault = diffdef;
row->idlenotificationenabled[0] = idlenot;
row->idlenotificationenabled[1] = '\0';
row->idlenotificationtime = nottime;
ins = "insert into workers "
"(workerid,userid,workername,difficultydefault,"
"idlenotificationenabled,idlenotificationtime"
HISTORYDATECONTROL ") values (" PQPARAM11 ")";
par = 0;
params[par++] = bigint_to_buf(row->workerid, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = int_to_buf(row->difficultydefault, NULL, 0);
params[par++] = str_to_buf(row->idlenotificationenabled, NULL, 0);
params[par++] = int_to_buf(row->idlenotificationtime, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
row->difficultydefault = diffdef;
row->idlenotificationenabled[0] = idlenot;
row->idlenotificationenabled[1] = '\0';
row->idlenotificationtime = nottime;
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
}
par = 0;
params[par++] = bigint_to_buf(row->workerid, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = int_to_buf(row->difficultydefault, NULL, 0);
params[par++] = str_to_buf(row->idlenotificationenabled, NULL, 0);
params[par++] = int_to_buf(row->idlenotificationtime, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
res = PQexec(conn, "Commit", CKPQ_WRITE);
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto rollback;
}
ok = true;
unparam:
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
unparam:
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
@ -4066,8 +4067,7 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre
char *upd, *ins;
bool ok = false;
char *params[4 + HISTORYDATECOUNT];
int par;
int n;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -4114,8 +4114,7 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
for (n = 0; n < par; n++)
@ -4135,16 +4134,21 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
goto rollback;
}
res = PQexec(conn, "Commit", CKPQ_WRITE);
ok = true;
unparam:
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
unparam:
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
@ -4189,10 +4193,9 @@ static bool paymentaddresses_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
PAYMENTADDRESSES *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 4;
@ -4300,10 +4303,9 @@ static bool payments_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
PAYMENTS *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 8;
@ -4507,12 +4509,11 @@ static K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, b
K_TREE_CTX ctx[1];
PGresult *res;
K_ITEM *old_item, look;
int n;
OPTIONCONTROL *row;
char *upd, *ins;
bool ok = false;
char *params[4 + HISTORYDATECOUNT];
int par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -4524,7 +4525,6 @@ static K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, b
old_item = find_in_ktree(optioncontrol_root, &look, cmp_optioncontrol, ctx);
K_RUNLOCK(optioncontrol_free);
par = 0;
if (!conn) {
conn = dbconnect();
conned = true;
@ -4678,10 +4678,9 @@ static bool optioncontrol_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
OPTIONCONTROL *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 4;
@ -4855,12 +4854,11 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc
K_ITEM *item;
char cd_buf[DATE_BUFSIZ];
char ndiffbin[TXT_SML+1];
int n;
int64_t workinfoid = -1;
WORKINFO *row;
char *ins;
char *params[11 + HISTORYDATECOUNT];
int par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -4907,8 +4905,8 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc
}
K_WUNLOCK(workinfo_free);
par = 0;
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = str_to_buf(row->poolinstance, NULL, 0);
params[par++] = str_to_buf(row->transactiontree, NULL, 0);
@ -5307,10 +5305,9 @@ static bool workinfo_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
WORKINFO *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 10;
@ -5970,9 +5967,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
if (new || !(row->inserted)) {
MODIFYDATEINIT(row, cd, by, code, inet);
par = 0;
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
@ -6027,8 +6023,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
stats_update = true;
if (stats_update) {
par = 0;
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
@ -6074,8 +6070,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row
ok = true;
goto late;
} else {
par = 0;
if (!confirm_sharesummary) {
par = 0;
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
@ -6465,8 +6461,7 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
char *upd, *ins;
char *params[8 + HISTORYDATECOUNT];
bool ok = false, update_old = false;
int par = 0;
int n;
int n, par = 0;
LOGDEBUG("%s(): confirm", __func__);
@ -6513,19 +6508,18 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
PQclear(res);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
update_old = true;
@ -6562,15 +6556,18 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
res = PQexec(conn, "Commit", CKPQ_WRITE);
ok = true;
unparam:
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
unparam:
for (n = 0; n < par; n++)
free(params[n]);
@ -6613,9 +6610,8 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash,
char *upd, *ins;
char *params[17 + HISTORYDATECOUNT];
bool ok = false, update_old = false;
int par = 0;
int n, par = 0;
char want = '?';
int n;
LOGDEBUG("%s(): add", __func__);
@ -6718,10 +6714,14 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash,
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
// We didn't use a Begin
ok = true;
goto unparam;
break;
case BLOCKS_ORPHAN:
case BLOCKS_42:
@ -6787,19 +6787,18 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash,
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
PQclear(res);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
for (n = 0; n < par; n++)
@ -6852,13 +6851,10 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash,
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
res = PQexec(conn, "Rollback", CKPQ_WRITE);
goto unparam;
goto rollback;
}
update_old = true;
res = PQexec(conn, "Commit", CKPQ_WRITE);
break;
default:
LOGERR("%s(): %s.failed.invalid confirm='%s'",
@ -6867,8 +6863,14 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash,
}
ok = true;
unparam:
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
unparam:
for (n = 0; n < par; n++)
free(params[n]);
flail:
@ -7148,12 +7150,11 @@ __maybe_unused static bool miningpayouts_add(PGconn *conn, char *username, char
PGresult *res;
K_ITEM *m_item, *u_item;
bool ok = false;
int n;
MININGPAYOUTS *row;
USERS *users;
char *ins;
char *params[5 + HISTORYDATECOUNT];
int par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -7233,10 +7234,9 @@ static bool miningpayouts_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
MININGPAYOUTS *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 5;
@ -7372,13 +7372,12 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username,
K_TREE_CTX ctx[1];
K_ITEM *a_item, *u_item;
char cd_buf[DATE_BUFSIZ];
int n;
USERS *users;
AUTHS *row;
char *ins;
char *secuserid = NULL;
char *params[8 + HISTORYDATECOUNT];
int par;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
@ -7499,10 +7498,9 @@ static bool auths_fill(PGconn *conn)
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
int n, i;
AUTHS *row;
char *params[1];
int par;
int n, i, par = 0;
char *field;
char *sel;
int fields = 7;
@ -7647,11 +7645,10 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
PGresult *res;
K_TREE_CTX ctx[1];
K_ITEM *p_item;
int n;
POOLSTATS *row;
char *ins;
char *params[8 + SIMPLEDATECOUNT];
int par;
int n, par = 0;
bool ok = false;
LOGDEBUG("%s(): add", __func__);
@ -7683,8 +7680,8 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
return true;
}
par = 0;
if (store) {
par = 0;
params[par++] = str_to_buf(row->poolinstance, NULL, 0);
params[par++] = bigint_to_buf(row->elapsed, NULL, 0);
params[par++] = int_to_buf(row->users, NULL, 0);
@ -7968,8 +7965,7 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row)
char *ins;
bool ok = false;
char *params[10 + SIMPLEDATECOUNT];
int par;
int n;
int n, par = 0;
LOGDEBUG("%s(): store", __func__);
@ -9458,13 +9454,12 @@ static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
K_ITEM *i_idname, *i_idvalue, *look;
IDCONTROL *row;
char *params[2 + MODIFYDATECOUNT];
int par;
int n, par = 0;
bool ok = false;
ExecStatusType rescode;
bool conned = false;
PGresult *res;
char *ins;
int n;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -10947,10 +10942,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
ua_item = NULL;
db++;
} else {
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
reason = "DBERR";
goto bats;
goto rollback;
}
}
if (!ua_item) {
@ -11008,16 +11001,19 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
begun = true;
}
if (!useratts_item_add(conn, ua_item, now, begun)) {
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
reason = "DBERR";
goto bats;
goto rollback;
}
db++;
res = PQexec(conn, "Commit", CKPQ_WRITE);
PQclear(res);
}
}
rollback:
if (!reason)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
bats:
if (conned)
PQfinish(conn);

Loading…
Cancel
Save