From 48ffb1d1094e01a85ba3459044d1e3db4e5da27e Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 16 Sep 2014 22:14:22 +1000 Subject: [PATCH 1/5] ckdb - optioncontrol table and commands - getopts/setopts --- src/ckdb.c | 675 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 649 insertions(+), 26 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 43a94834..31db9f31 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -49,7 +49,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.303" +#define CKDB_VERSION DB_VERSION"-0.305" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -236,6 +236,7 @@ static LOADSTATUS dbstatus; // Share stats since last block typedef struct poolstatus { int64_t workinfoid; // Last block + int32_t height; double diffacc; double diffinv; // Non-acc double shareacc; @@ -314,6 +315,8 @@ static char *pqerrmsg(PGconn *conn) { char *ptr, *buf = strdup(PQerrorMessage(conn)); + if (!buf) + quithere(1, "malloc OOM"); ptr = buf + strlen(buf) - 1; while (ptr >= buf && (*ptr == '\n' || *ptr == '\r')) *(ptr--) = '\0'; @@ -388,7 +391,7 @@ enum data_type { #define TXT_TO_INT(__nam, __fld, __data) txt_to_int(__nam, __fld, &(__data), sizeof(__data)) #define TXT_TO_TV(__nam, __fld, __data) txt_to_tv(__nam, __fld, &(__data), sizeof(__data)) #define TXT_TO_CTV(__nam, __fld, __data) txt_to_ctv(__nam, __fld, &(__data), sizeof(__data)) -#define TXT_TO_BLOB(__nam, __fld, __data) txt_to_blob(__nam, __fld, __data) +#define TXT_TO_BLOB(__nam, __fld, __data) txt_to_blob(__nam, __fld, &(__data)) #define TXT_TO_DOUBLE(__nam, __fld, __data) txt_to_double(__nam, __fld, &(__data), sizeof(__data)) #define PQ_GET_FLD(__res, __row, __name, __fld, __ok) do { \ @@ -803,6 +806,8 @@ enum cmd_values { CMD_GETATTS, CMD_SETATTS, CMD_EXPATTS, + CMD_GETOPTS, + CMD_SETOPTS, CMD_DSP, CMD_STATS, CMD_PPLNS, @@ -1135,7 +1140,6 @@ typedef struct idcontrol { static K_LIST *idcontrol_free; static K_STORE *idcontrol_store; -/* unused yet // OPTIONCONTROL typedef struct optioncontrol { char optionname[TXT_SML+1]; @@ -1149,11 +1153,14 @@ typedef struct optioncontrol { #define LIMIT_OPTIONCONTROL 0 #define INIT_OPTIONCONTROL(_item) INIT_GENERIC(_item, optioncontrol) #define DATA_OPTIONCONTROL(_var, _item) DATA_GENERIC(_var, _item, optioncontrol, true) +#define DATA_OPTIONCONTROL_NULL(_var, _item) DATA_GENERIC(_var, _item, optioncontrol, false) + +// Value it must default to (to work properly) +#define OPTIONCONTROL_HEIGHT 1 static K_TREE *optioncontrol_root; static K_LIST *optioncontrol_free; static K_STORE *optioncontrol_store; -*/ // TODO: discarding workinfo,shares // WORKINFO workinfo.id.json={...} @@ -1535,6 +1542,8 @@ static K_STORE *userstats_summ; ((_old)->tv_sec < (_new)->tv_sec)) #define tv_equal(_a, _b) (((_a)->tv_sec == (_b)->tv_sec) && \ ((_a)->tv_usec == (_b)->tv_usec)) +// newer OR equal +#define tv_newer_eq(_old, _new) (!(tv_newer(_new, _old))) // WORKERSTATUS from various incoming data typedef struct workerstatus { @@ -1716,9 +1725,9 @@ static void _txt_to_ctv(char *nam, char *fld, tv_t *data, size_t siz, WHERE_FFL_ _txt_to_data(TYPE_CTV, nam, fld, (void *)data, siz, WHERE_FFL_PASS); } -static void _txt_to_blob(char *nam, char *fld, char *data, WHERE_FFL_ARGS) +static void _txt_to_blob(char *nam, char *fld, char **data, WHERE_FFL_ARGS) { - _txt_to_data(TYPE_BLOB, nam, fld, (void *)(&data), 0, WHERE_FFL_PASS); + _txt_to_data(TYPE_BLOB, nam, fld, (void *)data, 0, WHERE_FFL_PASS); } static void _txt_to_double(char *nam, char *fld, double *data, size_t siz, WHERE_FFL_ARGS) @@ -1760,7 +1769,7 @@ static char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, buf = malloc(siz); if (!buf) - quithere(1, "OOM (%d)" WHERE_FFL, (int)siz, WHERE_FFL_PASS); + quithere(1, "(%d) OOM" WHERE_FFL, (int)siz, WHERE_FFL_PASS); } switch (typ) { @@ -1896,6 +1905,8 @@ static void log_queue_message(char *msg) lq_item = k_unlink_head(logqueue_free); DATA_LOGQUEUE(lq, lq_item); lq->msg = strdup(msg); + if (!(lq->msg)) + quithere(1, "malloc (%d) OOM", (int)strlen(msg)); k_add_tail(logqueue_store, lq_item); K_WUNLOCK(logqueue_free); } @@ -3164,7 +3175,7 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu char *upd, *ins; bool ok = false; char *params[9 + HISTORYDATECOUNT]; - int n, par = 0; + int n, par; LOGDEBUG("%s(): add", __func__); @@ -3178,6 +3189,7 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu /* N.B. the values of the old ua_item record, if it exists, * are completely ignored i.e. you must provide all values required */ + par = 0; if (!conn) { conn = dbconnect(); conned = true; @@ -3187,11 +3199,11 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu // Beginning of a write txn res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); goto unparam; } - PQclear(res); } if (old_item) { @@ -3206,6 +3218,7 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); goto unparam; @@ -3235,22 +3248,23 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto rollback; + } + + ok = true; +rollback: if (!begun) { - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; - } - res = PQexec(conn, "Commit", CKPQ_WRITE); - ok = true; - } else { - if (PGOK(rescode)) - ok = true; + PQclear(res); } unparam: - PQclear(res); if (conned) PQfinish(conn); for (n = 0; n < par; n++) @@ -3258,6 +3272,7 @@ unparam: K_WLOCK(useratts_free); if (ok) { + // Update it if (old_item) { useratts_root = remove_from_ktree(useratts_root, old_item, cmp_useratts, ctx); copy_tv(&(old_useratts->expirydate), cd); @@ -3333,10 +3348,11 @@ static __maybe_unused K_ITEM *useratts_add(PGconn *conn, char *username, char *a ok = useratts_item_add(conn, item, cd, begun); unitem: - K_WLOCK(useratts_free); - if (!ok) + if (!ok) { + K_WLOCK(useratts_free); k_add_head(useratts_free, item); - K_WUNLOCK(useratts_free); + K_WUNLOCK(useratts_free); + } if (ok) return item; @@ -4407,6 +4423,350 @@ void payments_reload() PQfinish(conn); } +// order by optionname asc,activationdate asc,activationheight asc,expirydate desc +static cmp_t cmp_optioncontrol(K_ITEM *a, K_ITEM *b) +{ + OPTIONCONTROL *oca, *ocb; + DATA_OPTIONCONTROL(oca, a); + DATA_OPTIONCONTROL(ocb, b); + cmp_t c = CMP_STR(oca->optionname, ocb->optionname); + if (c == 0) { + c = CMP_TV(oca->activationdate, ocb->activationdate); + if (c == 0) { + c = CMP_INT(oca->activationheight, ocb->activationheight); + if (c == 0) + c = CMP_TV(ocb->expirydate, oca->expirydate); + } + } + return c; +} + +// Must be R or W locked before call +static K_ITEM *find_optioncontrol(char *optionname, tv_t *now) +{ + OPTIONCONTROL optioncontrol, *oc, *ocbest; + K_TREE_CTX ctx[1]; + K_ITEM look, *item, *best; + + /* Step through all records having optionaname and check: + * 1) activationdate is <= now + * and + * 2) height <= current + * Remember the active record with the newest activationdate + * If two records have the same activation date, then + * remember the active record with the highest height + * In optioncontrol_add(), when not specified, + * the default activation date is DATE_BEGIN + * and the default height is 1 (OPTIONCONTROL_HEIGHT) + * Thus if records have both values set, then + * activationdate will determine the newests record + * To have activationheight decide selection, + * create all records with only activationheight and then + * activationdate will all be the default value and not + * decide the outcome */ + STRNCPY(optioncontrol.optionname, optionname); + optioncontrol.activationdate.tv_sec = 0L; + optioncontrol.activationdate.tv_usec = 0L; + optioncontrol.activationheight = OPTIONCONTROL_HEIGHT - 1; + optioncontrol.expirydate.tv_sec = default_expiry.tv_sec; + optioncontrol.expirydate.tv_usec = default_expiry.tv_usec; + + INIT_OPTIONCONTROL(&look); + look.data = (void *)(&optioncontrol); + item = find_after_in_ktree(optioncontrol_root, &look, cmp_optioncontrol, ctx); + ocbest = NULL; + best = NULL; + while (item) { + DATA_OPTIONCONTROL(oc, item); + // Ordered first by optionname + if (strcmp(oc->optionname, optionname) != 0) + break; + + // Is oc active? + if (CURRENT(&(oc->expirydate)) && + oc->activationheight <= pool.height && + tv_newer_eq(&(oc->activationdate), now)) { + // Is oc newer than ocbest? + if (!ocbest || + tv_newer(&(ocbest->activationdate), &(oc->activationdate)) || + (tv_equal(&(ocbest->activationdate), &(oc->activationdate)) && + ocbest->activationheight < oc->activationheight)) { + ocbest = oc; + best = item; + } + } + item = next_in_ktree(ctx); + } + return best; +} + +static K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool begun) +{ + ExecStatusType rescode; + bool conned = false; + K_TREE_CTX ctx[1]; + PGresult *res; + K_ITEM *old_item, look; + int n; + OPTIONCONTROL *row; + char *upd, *ins; + bool ok = false; + char *params[4 + HISTORYDATECOUNT]; + int par; + + LOGDEBUG("%s(): add", __func__); + + DATA_OPTIONCONTROL(row, oc_item); + + INIT_OPTIONCONTROL(&look); + look.data = (void *)row; + K_RLOCK(optioncontrol_free); + old_item = find_in_ktree(optioncontrol_root, &look, cmp_optioncontrol, ctx); + K_RUNLOCK(optioncontrol_free); + + par = 0; + if (!conn) { + conn = dbconnect(); + conned = true; + } + + if (!begun) { + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto nostart; + } + } + + if (old_item) { + upd = "update optioncontrol " + "set expirydate=$1 where optionname=$2 and " + "activationdate=$3 and activationheight=$4 and " + "expirydate=$5"; + + par = 0; + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = str_to_buf(row->optionname, NULL, 0); + params[par++] = tv_to_buf(&(row->activationdate), NULL, 0); + params[par++] = int_to_buf(row->activationheight, NULL, 0); + params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); + PARCHKVAL(par, 5, params); + + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Update", rescode, conn); + goto rollback; + } + + for (n = 0; n < par; n++) + free(params[n]); + } + + par = 0; + params[par++] = str_to_buf(row->optionname, NULL, 0); + params[par++] = str_to_buf(row->optionvalue, NULL, 0); + params[par++] = tv_to_buf(&(row->activationdate), NULL, 0); + params[par++] = int_to_buf(row->activationheight, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into optioncontrol " + "(optionname,optionvalue,activationdate,activationheight" + HISTORYDATECONTROL ") values (" PQPARAM9 ")"; + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto rollback; + } + + ok = true; +rollback: + if (!begun) { + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); + } +nostart: + if (conned) + PQfinish(conn); + for (n = 0; n < par; n++) + free(params[n]); + + K_WLOCK(optioncontrol_free); + if (!ok) + k_add_head(optioncontrol_free, oc_item); + else { + // Discard it + if (old_item) { + optioncontrol_root = remove_from_ktree(optioncontrol_root, old_item, + cmp_optioncontrol, ctx); + k_add_head(optioncontrol_free, old_item); + } + optioncontrol_root = add_to_ktree(optioncontrol_root, oc_item, cmp_optioncontrol); + k_add_head(optioncontrol_store, oc_item); + } + K_WUNLOCK(optioncontrol_free); + + if (ok) + return oc_item; + else + return NULL; +} + +static __maybe_unused K_ITEM *optioncontrol_add(PGconn *conn, char *optionname, char *optionvalue, + char *activationdate, char *activationheight, + char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root, bool begun) +{ + K_ITEM *item; + OPTIONCONTROL *row; + bool ok = false; + + LOGDEBUG("%s(): add", __func__); + + K_WLOCK(optioncontrol_free); + item = k_unlink_head(optioncontrol_free); + K_WUNLOCK(optioncontrol_free); + + DATA_OPTIONCONTROL(row, item); + + STRNCPY(row->optionname, optionname); + row->optionvalue = strdup(optionvalue); + if (!(row->optionvalue)) + quithere(1, "malloc (%d) OOM", (int)strlen(optionvalue)); + if (activationdate && *activationdate) { + TXT_TO_CTV("activationdate", activationdate, + row->activationdate); + } else + copy_tv(&(row->activationdate), &date_begin); + if (activationheight && *activationheight) { + TXT_TO_INT("activationheight", activationheight, + row->activationheight); + } else + row->activationheight = 1; + + HISTORYDATEINIT(row, cd, by, code, inet); + HISTORYDATETRANSFER(trf_root, row); + + ok = optioncontrol_item_add(conn, item, cd, begun); + + if (!ok) { + free(row->optionvalue); + K_WLOCK(optioncontrol_free); + k_add_head(optioncontrol_free, item); + K_WUNLOCK(optioncontrol_free); + } + + if (ok) + return item; + else + return NULL; +} + +static bool optioncontrol_fill(PGconn *conn) +{ + ExecStatusType rescode; + PGresult *res; + K_ITEM *item; + int n, i; + OPTIONCONTROL *row; + char *params[1]; + int par; + char *field; + char *sel; + int fields = 4; + bool ok; + + LOGDEBUG("%s(): select", __func__); + + // No need to keep old versions in ram for now ... + sel = "select " + "optionname,optionvalue,activationdate,activationheight" + HISTORYDATECONTROL + " from optioncontrol where expirydate=$1"; + par = 0; + params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); + PARCHK(par, params); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Select", rescode, conn); + PQclear(res); + return false; + } + + n = PQnfields(res); + if (n != (fields + HISTORYDATECOUNT)) { + LOGERR("%s(): Invalid field count - should be %d, but is %d", + __func__, fields + HISTORYDATECOUNT, n); + PQclear(res); + return false; + } + + n = PQntuples(res); + LOGDEBUG("%s(): tree build count %d", __func__, n); + ok = true; + K_WLOCK(optioncontrol_free); + for (i = 0; i < n; i++) { + item = k_unlink_head(optioncontrol_free); + DATA_OPTIONCONTROL(row, item); + + if (everyone_die) { + ok = false; + break; + } + + PQ_GET_FLD(res, i, "optionname", field, ok); + if (!ok) + break; + TXT_TO_STR("optionname", field, row->optionname); + + PQ_GET_FLD(res, i, "optionvalue", field, ok); + if (!ok) + break; + TXT_TO_BLOB("optionvalue", field, row->optionvalue); + + PQ_GET_FLD(res, i, "activationdate", field, ok); + if (!ok) + break; + TXT_TO_TV("activationdate", field, row->activationdate); + + PQ_GET_FLD(res, i, "activationheight", field, ok); + if (!ok) + break; + TXT_TO_INT("activationheight", field, row->activationheight); + + HISTORYDATEFLDS(res, i, row, ok); + if (!ok) + break; + + optioncontrol_root = add_to_ktree(optioncontrol_root, item, cmp_optioncontrol); + k_add_head(optioncontrol_store, item); + } + if (!ok) + k_add_head(optioncontrol_free, item); + + K_WUNLOCK(optioncontrol_free); + PQclear(res); + + if (ok) { + LOGDEBUG("%s(): built", __func__); + LOGWARNING("%s(): loaded %d optioncontrol records", __func__, n); + } + + return ok; +} + // order by workinfoid asc,expirydate asc static cmp_t cmp_workinfo(K_ITEM *a, K_ITEM *b) { @@ -4513,7 +4873,11 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc TXT_TO_BIGINT("workinfoid", workinfoidstr, row->workinfoid); STRNCPY(row->poolinstance, poolinstance); row->transactiontree = strdup(transactiontree); + if (!(row->transactiontree)) + quithere(1, "malloc (%d) OOM", (int)strlen(transactiontree)); row->merklehash = strdup(merklehash); + if (!(row->merklehash)) + quithere(1, "malloc (%d) OOM", (int)strlen(merklehash)); STRNCPY(row->prevhash, prevhash); STRNCPY(row->coinbase1, coinbase1); STRNCPY(row->coinbase2, coinbase2); @@ -6565,6 +6929,7 @@ flail: pool.diffacc, est, pct, cd_buf); if (pool.workinfoid < row->workinfoid) { pool.workinfoid = row->workinfoid; + pool.height = row->height; zero_on_new_block(); } break; @@ -6728,8 +7093,10 @@ static bool blocks_fill(PGconn *conn) if (tv_newer(&(dbstatus.newest_createdate_blocks), &(row->createdate))) copy_tv(&(dbstatus.newest_createdate_blocks), &(row->createdate)); - if (pool.workinfoid < row->workinfoid) + if (pool.workinfoid < row->workinfoid) { pool.workinfoid = row->workinfoid; + pool.height = row->height; + } } if (!ok) k_add_head(blocks_free, item); @@ -8052,6 +8419,8 @@ static bool check_db_version(PGconn *conn) /* Load tables required to support auths,adduser,chkpass and newid * N.B. idcontrol is DB internal so is always ready + * OptionControl is loaded first in case it is needed by other loads + * (though not yet) */ static bool getdata1() { @@ -8060,6 +8429,8 @@ static bool getdata1() if (!(ok = check_db_version(conn))) goto matane; + if (!(ok = optioncontrol_fill(conn))) + goto matane; if (!(ok = users_fill(conn))) goto matane; if (!(ok = workers_fill(conn))) @@ -8293,6 +8664,12 @@ static void alloc_storage() useratts_store = k_new_store(useratts_free); useratts_root = new_ktree(); + optioncontrol_free = k_new_list("OptionControl", sizeof(OPTIONCONTROL), + ALLOC_OPTIONCONTROL, + LIMIT_OPTIONCONTROL, true); + optioncontrol_store = k_new_store(optioncontrol_free); + optioncontrol_root = new_ktree(); + workers_free = k_new_list("Workers", sizeof(WORKERS), ALLOC_WORKERS, LIMIT_WORKERS, true); workers_store = k_new_store(workers_free); @@ -10511,7 +10888,7 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, char reply[1024] = ""; size_t siz = sizeof(reply); TRANSFER *transfer; - USERATTS *useratts; + USERATTS *useratts = NULL; USERS *users; char attname[sizeof(useratts->attname)*2]; char *reason = NULL; @@ -10742,6 +11119,239 @@ rats: return strdup(reply); } +/* Return the list of optioncontrols + * Format is optlist=optionname,optionname,optionname,... + * Replies will be optionname=value + * Any optionnames not in the DB or not yet active will be missing + */ +static char *cmd_getopts(__maybe_unused PGconn *conn, char *cmd, char *id, + tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root) +{ + K_ITEM *i_optlist, *oc_item; + char reply[1024] = ""; + size_t siz = sizeof(reply); + char tmp[1024]; + OPTIONCONTROL *optioncontrol; + char *reason = NULL; + char *answer = NULL; + char *optlist = NULL, *ptr, *comma; + size_t len, off; + bool first; + + LOGDEBUG("%s(): cmd '%s'", __func__, cmd); + + i_optlist = require_name(trf_root, "optlist", 1, NULL, reply, siz); + if (!i_optlist) { + reason = "Missing optlist"; + goto ruts; + } + + APPEND_REALLOC_INIT(answer, off, len); + optlist = ptr = strdup(transfer_data(i_optlist)); + first = true; + while (ptr && *ptr) { + comma = strchr(ptr, ','); + if (comma) + *(comma++) = '\0'; + K_RLOCK(optioncontrol_free); + oc_item = find_optioncontrol(ptr, now); + K_RUNLOCK(optioncontrol_free); + /* web code must check the existance of the optionname + * in the reply since it will be missing if it doesn't + * exist in the DB */ + if (oc_item) { + DATA_OPTIONCONTROL(optioncontrol, oc_item); + snprintf(tmp, sizeof(tmp), "%s%s=%s", + first ? EMPTY : FLDSEPSTR, + optioncontrol->optionname, + optioncontrol->optionvalue); + APPEND_REALLOC(answer, off, len, tmp); + first = false; + } + ptr = comma; + } +ruts: + if (optlist) + free(optlist); + + if (reason) { + if (answer) + free(answer); + snprintf(reply, siz, "ERR.%s", reason); + LOGERR("%s.%s.%s", cmd, id, reply); + return strdup(reply); + } + snprintf(reply, siz, "ok.%s", answer); + LOGDEBUG("%s.%s", id, answer); + free(answer); + return strdup(reply); +} + +// This is the same as att_set_date() for now +#define opt_set_date(_date, _data, _now) att_set_date(_date, _data, _now) + +/* Store optioncontrols in the DB + * Format is 1 or more: oc_optionname.fld=value + * i.e. each starts with the constant "oc_" + * optionname cannot contain Tab . or = + * fld is one of the 3: value, date, height + * value must exist + * None, one or both of date and height can exist + * If a matching optioncontrol (same name, date and height) exists, + * it will have it's expiry date set to now and be replaced with the new value + * The date field requires either an epoch sec,usec + * (usec is optional and defaults to 0) or one of: now or now+NNN + * now is the current epoch value and now+NNN is the epoch + NNN seconds + * See opt_set_date() above */ +static char *cmd_setopts(PGconn *conn, char *cmd, char *id, + tv_t *now, char *by, char *code, char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root) +{ + ExecStatusType rescode; + PGresult *res; + bool conned = false; + K_ITEM *t_item, *oc_item = NULL; + K_TREE_CTX ctx[1]; + char reply[1024] = ""; + size_t siz = sizeof(reply); + TRANSFER *transfer; + OPTIONCONTROL *optioncontrol; + char optionname[sizeof(optioncontrol->optionname)*2]; + char *reason = NULL; + char *dot, *data; + bool begun = false, gotvalue = false; + int db = 0; + + LOGDEBUG("%s(): cmd '%s'", __func__, cmd); + + t_item = first_in_ktree(trf_root, ctx); + while (t_item) { + DATA_TRANSFER(transfer, t_item); + if (strncmp(transfer->name, "oc_", 3) == 0) { + data = transfer_data(t_item); + STRNCPY(optionname, transfer->name + 3); + dot = strchr(optionname, '.'); + if (!dot) { + reason = "Missing field"; + goto rollback; + } + *(dot++) = '\0'; + // If we already had a different one, save it to the DB + if (oc_item && strcmp(optioncontrol->optionname, optionname) != 0) { + if (!gotvalue) { + reason = "Missing value"; + goto rollback; + } + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + if (!begun) { + // Beginning of a write txn + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + reason = "DBERR"; + goto rollback; + } + begun = true; + } + if (optioncontrol_item_add(conn, oc_item, now, begun)) { + oc_item = NULL; + db++; + } else { + reason = "DBERR"; + goto rollback; + } + } + if (!oc_item) { + K_RLOCK(optioncontrol_free); + oc_item = k_unlink_head(optioncontrol_free); + K_RUNLOCK(optioncontrol_free); + DATA_OPTIONCONTROL(optioncontrol, oc_item); + bzero(optioncontrol, sizeof(*optioncontrol)); + STRNCPY(optioncontrol->optionname, optionname); + optioncontrol->activationheight = OPTIONCONTROL_HEIGHT; + HISTORYDATEINIT(optioncontrol, now, by, code, inet); + HISTORYDATETRANSFER(trf_root, optioncontrol); + gotvalue = false; + } + if (strcmp(dot, "value") == 0) { + optioncontrol->optionvalue = strdup(data); + if (!(optioncontrol->optionvalue)) + quithere(1, "malloc (%d) OOM", (int)strlen(data)); + gotvalue = true; + } else if (strcmp(dot, "date") == 0) { + att_to_date(&(optioncontrol->activationdate), data, now); + } else if (strcmp(dot, "height") == 0) { + TXT_TO_INT("height", data, optioncontrol->activationheight); + } else { + reason = "Unknown field"; + goto rollback; + } + } + t_item = next_in_ktree(ctx); + } + if (oc_item) { + if (!gotvalue) { + reason = "Missing value"; + goto rollback; + } + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + if (!begun) { + // Beginning of a write txn + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + reason = "DBERR"; + goto rollback; + } + begun = true; + } + if (!optioncontrol_item_add(conn, oc_item, now, begun)) { + reason = "DBERR"; + goto rollback; + } + db++; + } +rollback: + if (begun) { + if (reason) + res = PQexec(conn, "Rollback", CKPQ_WRITE); + else + res = PQexec(conn, "Commit", CKPQ_WRITE); + + PQclear(res); + } + + if (conned) + PQfinish(conn); + if (reason) { + if (oc_item) { + if (optioncontrol->optionvalue) + free(optioncontrol->optionvalue); + K_WLOCK(optioncontrol_free); + k_add_head(optioncontrol_free, oc_item); + K_WUNLOCK(optioncontrol_free); + } + snprintf(reply, siz, "ERR.%s", reason); + LOGERR("%s.%s.%s", cmd, id, reply); + return strdup(reply); + } + snprintf(reply, siz, "ok.set %d", db); + LOGDEBUG("%s.%s", id, reply); + return strdup(reply); +} + // order by userid asc static cmp_t cmp_mu(K_ITEM *a, K_ITEM *b) { @@ -11327,6 +11937,8 @@ static struct CMDS { { CMD_GETATTS, "getatts", false, false, cmd_getatts, ACCESS_WEB }, { CMD_SETATTS, "setatts", false, false, cmd_setatts, ACCESS_WEB }, { CMD_EXPATTS, "expatts", false, false, cmd_expatts, ACCESS_WEB }, + { CMD_GETOPTS, "getopts", false, false, cmd_getopts, ACCESS_WEB }, + { CMD_SETOPTS, "setopts", false, false, cmd_setopts, ACCESS_WEB }, { CMD_DSP, "dsp", false, false, cmd_dsp, ACCESS_SYSTEM }, { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM }, { CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM }, @@ -11996,6 +12608,7 @@ static void *socketer(__maybe_unused void *arg) char *last_userset = NULL, *reply_userset = NULL; char *last_newid = NULL, *reply_newid = NULL; char *last_setatts = NULL, *reply_setatts = NULL; + char *last_setopts = NULL, *reply_setopts = NULL; char *last_web = NULL, *reply_web = NULL; char *reply_last, duptype[CMD_SIZ+1]; enum cmd_values cmdnum; @@ -12090,6 +12703,9 @@ static void *socketer(__maybe_unused void *arg) } else if (last_setatts && strcmp(last_setatts, buf) == 0) { reply_last = reply_setatts; dup = true; + } else if (last_setopts && strcmp(last_setopts, buf) == 0) { + reply_last = reply_setopts; + dup = true; } else if (last_web && strcmp(last_web, buf) == 0) { reply_last = reply_web; dup = true; @@ -12179,6 +12795,8 @@ static void *socketer(__maybe_unused void *arg) case CMD_GETATTS: case CMD_SETATTS: case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: case CMD_BLOCKLIST: case CMD_NEWID: case CMD_STATS: @@ -12218,6 +12836,9 @@ static void *socketer(__maybe_unused void *arg) case CMD_SETATTS: STORELASTREPLY(setatts); break; + case CMD_SETOPTS: + STORELASTREPLY(setopts); + break; // The rest default: free(rep); @@ -12416,6 +13037,8 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_GETATTS: case CMD_SETATTS: case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: case CMD_DSP: case CMD_STATS: case CMD_PPLNS: @@ -12490,7 +13113,7 @@ static bool reload_from(tv_t *start) reload_buf = malloc(MAX_READ); if (!reload_buf) - quithere(1, "OOM"); + quithere(1, "(%d) OOM", MAX_READ); reloading = true; From 45e29b3da45e92071aa0b65fe4af7a66a31b11cc Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 17 Sep 2014 08:53:07 +1000 Subject: [PATCH 2/5] ckdb - consistent commit/rollback and par initialisation --- src/ckdb.c | 280 ++++++++++++++++++++++++++--------------------------- 1 file changed, 138 insertions(+), 142 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 31db9f31..8ebda2bf 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2251,11 +2251,10 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, PGresult *res; char qry[1024]; char *params[5]; - int par; + int n, par = 0; int64_t lastid; char *field; bool ok; - int n; lastid = 0; @@ -2777,13 +2776,12 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash, K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; - int n; USERS *row, *users; char *upd, *ins; bool ok = false; char *params[5 + HISTORYDATECOUNT]; bool hash; - int par; + int n, par = 0; LOGDEBUG("%s(): change", __func__); @@ -2830,17 +2828,18 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash, // Beginning of a write txn res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); goto unparam; } - PQclear(res); res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - goto unparam; + goto rollback; } for (n = 0; n < par; n++) @@ -2871,14 +2870,18 @@ static bool users_pass_email(PGconn *conn, K_ITEM *u_item, char *oldhash, PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } - res = PQexec(conn, "Commit", CKPQ_WRITE); ok = true; -unparam: +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + PQclear(res); +unparam: if (conned) PQfinish(conn); for (n = 0; n < par; n++) @@ -2911,7 +2914,6 @@ static K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress, bool conned = false; PGresult *res; K_ITEM *item; - int n; USERS *row; char *ins; char tohash[64]; @@ -2919,7 +2921,7 @@ static K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress, __maybe_unused uint64_t tmp; bool ok = false; char *params[8 + HISTORYDATECOUNT]; - int par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -3175,7 +3177,7 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu char *upd, *ins; bool ok = false; char *params[9 + HISTORYDATECOUNT]; - int n, par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -3189,7 +3191,6 @@ static bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begu /* N.B. the values of the old ua_item record, if it exists, * are completely ignored i.e. you must provide all values required */ - par = 0; if (!conn) { conn = dbconnect(); conned = true; @@ -3596,11 +3597,10 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, bool conned = false; PGresult *res; K_ITEM *item, *ret = NULL; - int n; WORKERS *row; char *ins; char *params[6 + HISTORYDATECOUNT]; - int par; + int n, par = 0; int32_t diffdef; int32_t nottime; @@ -3708,12 +3708,11 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, ExecStatusType rescode; bool conned = false; PGresult *res; - int n; WORKERS *row; char *upd, *ins; bool ok = false; char *params[6 + HISTORYDATECOUNT]; - int par; + int n, par = 0; int32_t diffdef; char idlenot; int32_t nottime; @@ -3756,74 +3755,76 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, nottime == row->idlenotificationtime) { ok = true; goto early; - } else { - upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3"; - par = 0; - params[par++] = tv_to_buf(cd, NULL, 0); - params[par++] = bigint_to_buf(row->workerid, NULL, 0); - params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); - PARCHKVAL(par, 3, params); + } - if (conn == NULL) { - conn = dbconnect(); - conned = true; - } + upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3"; + par = 0; + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = bigint_to_buf(row->workerid, NULL, 0); + params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); + PARCHKVAL(par, 3, params); - res = PQexec(conn, "Begin", CKPQ_WRITE); - rescode = PQresultStatus(res); - if (!PGOK(rescode)) { - PGLOGERR("Begin", rescode, conn); - goto unparam; - } - PQclear(res); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } - res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; - } + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } - for (n = 0; n < par; n++) - free(params[n]); + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Update", rescode, conn); + goto rollback; + } - ins = "insert into workers " - "(workerid,userid,workername,difficultydefault," - "idlenotificationenabled,idlenotificationtime" - HISTORYDATECONTROL ") values (" PQPARAM11 ")"; + for (n = 0; n < par; n++) + free(params[n]); - row->difficultydefault = diffdef; - row->idlenotificationenabled[0] = idlenot; - row->idlenotificationenabled[1] = '\0'; - row->idlenotificationtime = nottime; + ins = "insert into workers " + "(workerid,userid,workername,difficultydefault," + "idlenotificationenabled,idlenotificationtime" + HISTORYDATECONTROL ") values (" PQPARAM11 ")"; - par = 0; - params[par++] = bigint_to_buf(row->workerid, NULL, 0); - params[par++] = bigint_to_buf(row->userid, NULL, 0); - params[par++] = str_to_buf(row->workername, NULL, 0); - params[par++] = int_to_buf(row->difficultydefault, NULL, 0); - params[par++] = str_to_buf(row->idlenotificationenabled, NULL, 0); - params[par++] = int_to_buf(row->idlenotificationtime, NULL, 0); - HISTORYDATEPARAMS(params, par, row); - PARCHK(par, params); + row->difficultydefault = diffdef; + row->idlenotificationenabled[0] = idlenot; + row->idlenotificationenabled[1] = '\0'; + row->idlenotificationtime = nottime; - res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; - } + par = 0; + params[par++] = bigint_to_buf(row->workerid, NULL, 0); + params[par++] = bigint_to_buf(row->userid, NULL, 0); + params[par++] = str_to_buf(row->workername, NULL, 0); + params[par++] = int_to_buf(row->difficultydefault, NULL, 0); + params[par++] = str_to_buf(row->idlenotificationenabled, NULL, 0); + params[par++] = int_to_buf(row->idlenotificationtime, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHK(par, params); - res = PQexec(conn, "Commit", CKPQ_WRITE); + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto rollback; } ok = true; -unparam: +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + PQclear(res); +unparam: if (conned) PQfinish(conn); for (n = 0; n < par; n++) @@ -4066,8 +4067,7 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre char *upd, *ins; bool ok = false; char *params[4 + HISTORYDATECOUNT]; - int par; - int n; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -4114,8 +4114,7 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } for (n = 0; n < par; n++) @@ -4135,16 +4134,21 @@ static K_ITEM *paymentaddresses_set(PGconn *conn, int64_t userid, char *payaddre res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - goto unparam; + goto rollback; } - res = PQexec(conn, "Commit", CKPQ_WRITE); - ok = true; -unparam: +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + PQclear(res); +unparam: if (conned) PQfinish(conn); for (n = 0; n < par; n++) @@ -4189,10 +4193,9 @@ static bool paymentaddresses_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; PAYMENTADDRESSES *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 4; @@ -4300,10 +4303,9 @@ static bool payments_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; PAYMENTS *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 8; @@ -4507,12 +4509,11 @@ static K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, b K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *old_item, look; - int n; OPTIONCONTROL *row; char *upd, *ins; bool ok = false; char *params[4 + HISTORYDATECOUNT]; - int par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -4524,7 +4525,6 @@ static K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, b old_item = find_in_ktree(optioncontrol_root, &look, cmp_optioncontrol, ctx); K_RUNLOCK(optioncontrol_free); - par = 0; if (!conn) { conn = dbconnect(); conned = true; @@ -4678,10 +4678,9 @@ static bool optioncontrol_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; OPTIONCONTROL *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 4; @@ -4855,12 +4854,11 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc K_ITEM *item; char cd_buf[DATE_BUFSIZ]; char ndiffbin[TXT_SML+1]; - int n; int64_t workinfoid = -1; WORKINFO *row; char *ins; char *params[11 + HISTORYDATECOUNT]; - int par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -4907,8 +4905,8 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc } K_WUNLOCK(workinfo_free); - par = 0; if (!confirm_sharesummary) { + par = 0; params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); params[par++] = str_to_buf(row->poolinstance, NULL, 0); params[par++] = str_to_buf(row->transactiontree, NULL, 0); @@ -5307,10 +5305,9 @@ static bool workinfo_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; WORKINFO *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 10; @@ -5970,9 +5967,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row if (new || !(row->inserted)) { MODIFYDATEINIT(row, cd, by, code, inet); - par = 0; - if (!confirm_sharesummary) { + par = 0; params[par++] = bigint_to_buf(row->userid, NULL, 0); params[par++] = str_to_buf(row->workername, NULL, 0); params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -6027,8 +6023,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row stats_update = true; if (stats_update) { - par = 0; if (!confirm_sharesummary) { + par = 0; params[par++] = bigint_to_buf(row->userid, NULL, 0); params[par++] = str_to_buf(row->workername, NULL, 0); params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -6074,8 +6070,8 @@ static bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row ok = true; goto late; } else { - par = 0; if (!confirm_sharesummary) { + par = 0; params[par++] = bigint_to_buf(row->userid, NULL, 0); params[par++] = str_to_buf(row->workername, NULL, 0); params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -6465,8 +6461,7 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, char *upd, *ins; char *params[8 + HISTORYDATECOUNT]; bool ok = false, update_old = false; - int par = 0; - int n; + int n, par = 0; LOGDEBUG("%s(): confirm", __func__); @@ -6513,19 +6508,18 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); goto unparam; } - PQclear(res); res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } update_old = true; @@ -6562,15 +6556,18 @@ static bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } - res = PQexec(conn, "Commit", CKPQ_WRITE); - ok = true; -unparam: +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + PQclear(res); +unparam: for (n = 0; n < par; n++) free(params[n]); @@ -6613,9 +6610,8 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, char *upd, *ins; char *params[17 + HISTORYDATECOUNT]; bool ok = false, update_old = false; - int par = 0; + int n, par = 0; char want = '?'; - int n; LOGDEBUG("%s(): add", __func__); @@ -6718,10 +6714,14 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); goto unparam; } + // We didn't use a Begin + ok = true; + goto unparam; break; case BLOCKS_ORPHAN: case BLOCKS_42: @@ -6787,19 +6787,18 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); + PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Begin", rescode, conn); goto unparam; } - PQclear(res); res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Update", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } for (n = 0; n < par; n++) @@ -6852,13 +6851,10 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, PQclear(res); if (!PGOK(rescode)) { PGLOGERR("Insert", rescode, conn); - res = PQexec(conn, "Rollback", CKPQ_WRITE); - goto unparam; + goto rollback; } update_old = true; - - res = PQexec(conn, "Commit", CKPQ_WRITE); break; default: LOGERR("%s(): %s.failed.invalid confirm='%s'", @@ -6867,8 +6863,14 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, } ok = true; -unparam: +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + PQclear(res); +unparam: for (n = 0; n < par; n++) free(params[n]); flail: @@ -7148,12 +7150,11 @@ __maybe_unused static bool miningpayouts_add(PGconn *conn, char *username, char PGresult *res; K_ITEM *m_item, *u_item; bool ok = false; - int n; MININGPAYOUTS *row; USERS *users; char *ins; char *params[5 + HISTORYDATECOUNT]; - int par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -7233,10 +7234,9 @@ static bool miningpayouts_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; MININGPAYOUTS *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 5; @@ -7372,13 +7372,12 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item; char cd_buf[DATE_BUFSIZ]; - int n; USERS *users; AUTHS *row; char *ins; char *secuserid = NULL; char *params[8 + HISTORYDATECOUNT]; - int par; + int n, par = 0; LOGDEBUG("%s(): add", __func__); @@ -7499,10 +7498,9 @@ static bool auths_fill(PGconn *conn) ExecStatusType rescode; PGresult *res; K_ITEM *item; - int n, i; AUTHS *row; char *params[1]; - int par; + int n, i, par = 0; char *field; char *sel; int fields = 7; @@ -7647,11 +7645,10 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, PGresult *res; K_TREE_CTX ctx[1]; K_ITEM *p_item; - int n; POOLSTATS *row; char *ins; char *params[8 + SIMPLEDATECOUNT]; - int par; + int n, par = 0; bool ok = false; LOGDEBUG("%s(): add", __func__); @@ -7683,8 +7680,8 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, return true; } - par = 0; if (store) { + par = 0; params[par++] = str_to_buf(row->poolinstance, NULL, 0); params[par++] = bigint_to_buf(row->elapsed, NULL, 0); params[par++] = int_to_buf(row->users, NULL, 0); @@ -7968,8 +7965,7 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row) char *ins; bool ok = false; char *params[10 + SIMPLEDATECOUNT]; - int par; - int n; + int n, par = 0; LOGDEBUG("%s(): store", __func__); @@ -9458,13 +9454,12 @@ static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, K_ITEM *i_idname, *i_idvalue, *look; IDCONTROL *row; char *params[2 + MODIFYDATECOUNT]; - int par; + int n, par = 0; bool ok = false; ExecStatusType rescode; bool conned = false; PGresult *res; char *ins; - int n; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); @@ -10947,10 +10942,8 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, ua_item = NULL; db++; } else { - res = PQexec(conn, "Rollback", CKPQ_WRITE); - PQclear(res); reason = "DBERR"; - goto bats; + goto rollback; } } if (!ua_item) { @@ -11008,16 +11001,19 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, begun = true; } if (!useratts_item_add(conn, ua_item, now, begun)) { - res = PQexec(conn, "Rollback", CKPQ_WRITE); - PQclear(res); reason = "DBERR"; - goto bats; + goto rollback; } db++; - res = PQexec(conn, "Commit", CKPQ_WRITE); - PQclear(res); } } +rollback: + if (!reason) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); bats: if (conned) PQfinish(conn); From e0b08aadd0f2d82358bc9880af6ca8bb5890544c Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 17 Sep 2014 11:55:22 +1000 Subject: [PATCH 3/5] ckdb - allow retrying the db connection and reconnect often --- src/ckdb.c | 55 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 8ebda2bf..7cbb21c5 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -49,7 +49,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.305" +#define CKDB_VERSION DB_VERSION"-0.310" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2227,8 +2227,9 @@ static K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt, static PGconn *dbconnect() { - char conninfo[128]; + char conninfo[256]; PGconn *conn; + int i, retry = 10; snprintf(conninfo, sizeof(conninfo), "host=127.0.0.1 dbname=%s user=%s%s%s", @@ -2237,9 +2238,20 @@ static PGconn *dbconnect() db_pass ? db_pass : ""); conn = PQconnectdb(conninfo); - if (PQstatus(conn) != CONNECTION_OK) - quithere(1, "ERR: Failed to connect to db '%s'", pqerrmsg(conn)); - + if (PQstatus(conn) != CONNECTION_OK) { + LOGERR("%s(): Failed 1st connect to db '%s'", __func__, pqerrmsg(conn)); + LOGERR("%s(): Retrying for %d seconds ...", __func__, retry); + for (i = 0; i < retry; i++) { + sleep(1); + conn = PQconnectdb(conninfo); + if (PQstatus(conn) == CONNECTION_OK) { + LOGWARNING("%s(): Connected on attempt %d", __func__, i+2); + return conn; + } + } + quithere(1, "ERR: Failed to connect %d times to db '%s'", + retry+1, pqerrmsg(conn)); + } return conn; } @@ -13291,7 +13303,8 @@ static void *listener(void *arg) pthread_t sock_pt; pthread_t summ_pt; K_ITEM *wq_item; - int qc; + time_t now; + int wqcount, wqgot; logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); @@ -13315,23 +13328,34 @@ static void *listener(void *arg) if (!everyone_die) { K_RLOCK(workqueue_store); - qc = workqueue_store->count; + wqcount = workqueue_store->count; K_RUNLOCK(workqueue_store); - LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc); + LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); startup_complete = true; } - if (!everyone_die) - conn = dbconnect(); - // Process queued work + conn = dbconnect(); + now = time(NULL); + wqgot = 0; while (!everyone_die) { K_WLOCK(workqueue_store); wq_item = k_unlink_head(workqueue_store); K_WUNLOCK(workqueue_store); + + /* Don't keep a connection for more than ~10s or ~1000 items + * but always have a connection open */ + if ((time(NULL) - now) > 10 || wqgot > 1000) { + PQfinish(conn); + conn = dbconnect(); + now = time(NULL); + wqgot = 0; + } + if (wq_item) { + wqgot++; process_queued(conn, wq_item); tick(); } else { @@ -13459,6 +13483,15 @@ static void compare_summaries(K_TREE *leftsum, char *leftname, } } +/* TODO: have a seperate option to find/store missing workinfo/shares/etc + * from the reload files, in a supplied UTC time range + * since there is no automatic way to get them in the DB after later ones + * have been stored e.g. a database failure/recovery or short outage but + * later workinfos/shares/etc have been stored so earlier ones will not be + * picked up by the reload + * However, will need to deal with, via reporting an error and/or abort, + * if a stored workinfoid is in a range that has already been paid + * and the payment is now wrong */ static void confirm_reload() { K_TREE *sharesummary_workinfoid_save; From 5798453985c724455f9914230b9dd54db6638d5d Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 17 Sep 2014 12:03:20 +1000 Subject: [PATCH 4/5] ckdb - also report postgresql version at startup --- src/ckdb.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 7cbb21c5..2b5f4db1 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -49,7 +49,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.310" +#define CKDB_VERSION DB_VERSION"-0.311" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -8356,13 +8356,14 @@ static bool check_db_version(PGconn *conn) PGresult *res; char *field; char *sel; - int fields = 2; + char *pgv; + int fields = 3; bool ok; int n; LOGDEBUG("%s(): select", __func__); - sel = "select * from version;"; + sel = "select version() as pgv,* from version;"; res = PQexec(conn, sel, CKPQ_READ); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -8417,10 +8418,19 @@ static bool check_db_version(PGconn *conn) return false; } + PQ_GET_FLD(res, 0, "pgv", field, ok); + if (ok) + pgv = strdup(field); + else + pgv = strdup("Failed to get postgresql version information"); + PQclear(res); LOGWARNING("%s(): DB version (%s) correct (CKDB V%s)", __func__, DB_VERSION, CKDB_VERSION); + LOGWARNING("%s(): %s", __func__, pgv); + + free(pgv); return true; } From fe9ea8b0f52f73148208ff5928da509617fe6a17 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 17 Sep 2014 14:04:10 +1000 Subject: [PATCH 5/5] ckdb - add TZ to screen log and handle fractional hour timezones --- src/ckdb.c | 51 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 2b5f4db1..c8bfd20d 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -49,7 +49,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.2" -#define CKDB_VERSION DB_VERSION"-0.311" +#define CKDB_VERSION DB_VERSION"-0.313" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1605,22 +1605,27 @@ static void _txt_to_data(enum data_type typ, char *nam, char *fld, void *data, s WHERE_FFL, nam, (int)siz, (int)sizeof(tv_t), WHERE_FFL_PASS); } - unsigned int yyyy, mm, dd, HH, MM, SS, uS = 0, tz; + unsigned int yyyy, mm, dd, HH, MM, SS, uS = 0, tz, tzm = 0; char pm[2]; struct tm tm; time_t tim; int n; - n = sscanf(fld, "%u-%u-%u %u:%u:%u%1[+-]%u", - &yyyy, &mm, &dd, &HH, &MM, &SS, pm, &tz); - if (n != 8) { + // A timezone looks like: +10 or +09:30 or -05 etc + n = sscanf(fld, "%u-%u-%u %u:%u:%u%1[+-]%u:%u", + &yyyy, &mm, &dd, &HH, &MM, &SS, pm, &tz, &tzm); + if (n < 8) { // allow uS - n = sscanf(fld, "%u-%u-%u %u:%u:%u.%u%1[+-]%u", - &yyyy, &mm, &dd, &HH, &MM, &SS, &uS, pm, &tz); - if (n != 9) { + n = sscanf(fld, "%u-%u-%u %u:%u:%u.%u%1[+-]%u:%u", + &yyyy, &mm, &dd, &HH, &MM, &SS, &uS, pm, &tz, &tzm); + if (n < 9) { quithere(1, "Field %s tv_t unhandled date '%s' (%d)" WHERE_FFL, nam, fld, n, WHERE_FFL_PASS); } - } + + if (n < 10) + tzm = 0; + } else if (n < 9) + tzm = 0; tm.tm_sec = (int)SS; tm.tm_min = (int)MM; tm.tm_hour = (int)HH; @@ -1633,9 +1638,7 @@ static void _txt_to_data(enum data_type typ, char *nam, char *fld, void *data, s ((tv_t *)data)->tv_sec = default_expiry.tv_sec; ((tv_t *)data)->tv_usec = default_expiry.tv_usec; } else { - // 2 digit tz (vs 4 digit) - if (tz < 25) - tz *= 60; + tz = tz * 60 + tzm; // time was converted ignoring tz - so correct it if (pm[0] == '-') tim += 60 * tz; @@ -1920,20 +1923,40 @@ void logmsg(int loglevel, const char *fmt, ...) va_list ap; char stamp[128]; char *extra = EMPTY; + char tzinfo[16]; + char tzch; + long minoff, hroff; if (loglevel > global_ckp->loglevel) return; now_t = time(NULL); localtime_r(&now_t, &tm); + minoff = timezone / 60; + if (minoff < 0) { + tzch = '+'; + minoff *= -1; + } else + tzch = '-'; + hroff = minoff / 60; + if (minoff % 60) { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld:%02ld", + tzch, hroff, minoff % 60); + } else { + snprintf(tzinfo, sizeof(tzinfo), + "%c%02ld", + tzch, hroff); + } snprintf(stamp, sizeof(stamp), - "[%d-%02d-%02d %02d:%02d:%02d]", + "[%d-%02d-%02d %02d:%02d:%02d%s]", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, - tm.tm_sec); + tm.tm_sec, + tzinfo); if (!fmt) { fprintf(stderr, "%s %s() called without fmt\n", stamp, __func__);