Browse Source

ckdb - optionally store shares in the DB with a chosen sdiff lower limit -D

master
kanoi 9 years ago
parent
commit
116fda74b9
  1. 6
      sql/ckdb.sql
  2. 30
      sql/v1.0.4-v1.0.5.sql
  3. 27
      src/ckdb.c
  4. 30
      src/ckdb.h
  5. 7
      src/ckdb_cmd.c
  6. 35
      src/ckdb_data.c
  7. 311
      src/ckdb_dbio.c

6
sql/ckdb.sql

@ -192,7 +192,7 @@ CREATE TABLE workinfo (
); );
CREATE TABLE shares ( -- not stored in the db - only in log files CREATE TABLE shares ( -- only shares with sdiff >= minsdiff are stored in the DB
workinfoid bigint NOT NULL, workinfoid bigint NOT NULL,
userid bigint NOT NULL, userid bigint NOT NULL,
workername character varying(256) NOT NULL, workername character varying(256) NOT NULL,
@ -205,6 +205,8 @@ CREATE TABLE shares ( -- not stored in the db - only in log files
errn integer NOT NULL, errn integer NOT NULL,
error character varying(64) DEFAULT ''::character varying NOT NULL, -- optional error character varying(64) DEFAULT ''::character varying NOT NULL, -- optional
secondaryuserid character varying(64) NOT NULL, secondaryuserid character varying(64) NOT NULL,
ntime character varying(64) NOT NULL,
minsdiff float NOT NULL,
createdate timestamp with time zone NOT NULL, createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL, createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL, createcode character varying(128) DEFAULT ''::character varying NOT NULL,
@ -470,4 +472,4 @@ CREATE TABLE version (
PRIMARY KEY (vlock) PRIMARY KEY (vlock)
); );
insert into version (vlock,version) values (1,'1.0.4'); insert into version (vlock,version) values (1,'1.0.5');

30
sql/v1.0.4-v1.0.5.sql

@ -0,0 +1,30 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='1.0.5' where vlock=1 and version='1.0.4';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "1.0.4" - found "%"', ver;
END $$;
ALTER TABLE ONLY shares
ADD COLUMN ntime character varying(64) DEFAULT ''::character varying NOT NULL,
ADD COLUMN minsdiff float DEFAULT 0::float NOT NULL;
ALTER TABLE ONLY shares
ALTER COLUMN ntime DROP DEFAULT,
ALTER COLUMN minsdiff DROP DEFAULT;
END transaction;

27
src/ckdb.c

@ -432,8 +432,12 @@ K_LIST *shares_free;
K_STORE *shares_store; K_STORE *shares_store;
K_TREE *shares_early_root; K_TREE *shares_early_root;
K_STORE *shares_early_store; K_STORE *shares_early_store;
K_TREE *shares_hi_root;
K_TREE *shares_db_root;
K_STORE *shares_hi_store;
double diff_percent = DIFF_VAL(DIFF_PERCENT_DEFAULT); double diff_percent = DIFF_VAL(DIFF_PERCENT_DEFAULT);
double share_min_sdiff = 0;
// SHAREERRORS shareerrors.id.json={...} // SHAREERRORS shareerrors.id.json={...}
K_TREE *shareerrors_root; K_TREE *shareerrors_root;
@ -926,6 +930,8 @@ static bool getdata3()
} }
if (!(ok = markersummary_fill(conn)) || everyone_die) if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai; goto sukamudai;
if (!(ok = shares_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary && !everyone_die) if (!confirm_sharesummary && !everyone_die)
ok = poolstats_fill(conn); ok = poolstats_fill(conn);
@ -1200,6 +1206,9 @@ static void alloc_storage()
shares_early_store = k_new_store(shares_free); shares_early_store = k_new_store(shares_free);
shares_root = new_ktree(NULL, cmp_shares, shares_free); shares_root = new_ktree(NULL, cmp_shares, shares_free);
shares_early_root = new_ktree("SharesEarly", cmp_shares, shares_free); shares_early_root = new_ktree("SharesEarly", cmp_shares, shares_free);
shares_hi_store = k_new_store(shares_free);
shares_hi_root = new_ktree("SharesHi", cmp_shares, shares_free);
shares_db_root = new_ktree("SharesDB", cmp_shares, shares_free);
shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS), shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS),
ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, ALLOC_SHAREERRORS, LIMIT_SHAREERRORS,
@ -1597,6 +1606,8 @@ static void dealloc_storage()
FREE_STORE_DATA(sharesummary); FREE_STORE_DATA(sharesummary);
FREE_LIST_DATA(sharesummary); FREE_LIST_DATA(sharesummary);
LOGWARNING("%s() shares ...", __func__);
if (shareerrors_early_store->count > 0) { if (shareerrors_early_store->count > 0) {
LOGERR("%s() *** shareerrors_early count %d ***", LOGERR("%s() *** shareerrors_early count %d ***",
__func__, shareerrors_early_store->count); __func__, shareerrors_early_store->count);
@ -1618,6 +1629,10 @@ static void dealloc_storage()
FREE_TREE(shareerrors_early); FREE_TREE(shareerrors_early);
FREE_STORE(shareerrors_early); FREE_STORE(shareerrors_early);
FREE_ALL(shareerrors); FREE_ALL(shareerrors);
FREE_TREE(shares_hi);
FREE_TREE(shares_db);
FREE_STORE(shares_hi);
if (shares_early_store->count > 0) { if (shares_early_store->count > 0) {
LOGERR("%s() *** shares_early count %d ***", LOGERR("%s() *** shares_early count %d ***",
__func__, shares_early_store->count); __func__, shares_early_store->count);
@ -1651,6 +1666,8 @@ static void dealloc_storage()
FREE_LIST_DATA(workinfo); FREE_LIST_DATA(workinfo);
} }
LOGWARNING("%s() etc ...", __func__);
FREE_LISTS(idcontrol); FREE_LISTS(idcontrol);
FREE_ALL(accountbalance); FREE_ALL(accountbalance);
FREE_ALL(payments); FREE_ALL(payments);
@ -5808,6 +5825,7 @@ static struct option long_options[] = {
{ "alert", required_argument, 0, 'c' }, { "alert", required_argument, 0, 'c' },
{ "config", required_argument, 0, 'c' }, { "config", required_argument, 0, 'c' },
{ "dbname", required_argument, 0, 'd' }, { "dbname", required_argument, 0, 'd' },
{ "minsdiff", required_argument, 0, 'D' },
{ "free", required_argument, 0, 'f' }, { "free", required_argument, 0, 'f' },
// generate = enable payout pplns auto generation // generate = enable payout pplns auto generation
{ "generate", no_argument, 0, 'g' }, { "generate", no_argument, 0, 'g' },
@ -5863,7 +5881,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:c:d:ghi:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
switch(c) { switch(c) {
case 'a': case 'a':
len = strlen(optarg); len = strlen(optarg);
@ -5882,6 +5900,13 @@ int main(int argc, char **argv)
while (*kill) while (*kill)
*(kill++) = ' '; *(kill++) = ' ';
break; break;
case 'D':
share_min_sdiff = atof(optarg);
if (share_min_sdiff < 0) {
quit(1, "Invalid share_min_sdiff '%s' "
"must be >= 0", optarg);
}
break;
case 'f': case 'f':
if (strcasecmp(optarg, FREE_MODE_ALL_STR) == 0) if (strcasecmp(optarg, FREE_MODE_ALL_STR) == 0)
free_mode = FREE_MODE_ALL; free_mode = FREE_MODE_ALL;

30
src/ckdb.h

@ -50,8 +50,8 @@
* Consider adding row level locking (a per kitem usage count) if needed */ * Consider adding row level locking (a per kitem usage count) if needed */
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.4" #define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-1.960" #define CKDB_VERSION DB_VERSION"-1.970"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1673,6 +1673,8 @@ typedef struct shares {
int32_t errn; int32_t errn;
char error[TXT_SML+1]; char error[TXT_SML+1];
char secondaryuserid[TXT_SML+1]; char secondaryuserid[TXT_SML+1];
char ntime[TXT_SML+1];
double minsdiff;
HISTORYDATECONTROLFIELDS; HISTORYDATECONTROLFIELDS;
int32_t redo; // non-DB field int32_t redo; // non-DB field
int32_t oldcount; // non-DB field int32_t oldcount; // non-DB field
@ -1689,6 +1691,13 @@ extern K_STORE *shares_store;
// shares unexpectedly before the workinfo // shares unexpectedly before the workinfo
extern K_TREE *shares_early_root; extern K_TREE *shares_early_root;
extern K_STORE *shares_early_store; extern K_STORE *shares_early_store;
/* DB stored high sdiff shares N.B. they are duplicated,
* not relinked, since an item can't be in 2 lists
* New high shares are placed in both trees then removed from shares_hi_root
* after they are stored in the db */
extern K_TREE *shares_hi_root;
extern K_TREE *shares_db_root;
extern K_STORE *shares_hi_store;
/* Once a share is this old, it can only once more be /* Once a share is this old, it can only once more be
check for it's workinfoid and then be discarded */ check for it's workinfoid and then be discarded */
@ -1705,6 +1714,11 @@ extern K_STORE *shares_early_store;
extern double diff_percent; extern double diff_percent;
/* Record shares in the DB >= this
* The default of 0 means don't store shares
* This is set only via the runtime parameter -D or --minsdiff */
extern double share_min_sdiff;
// SHAREERRORS shareerrors.id.json={...} // SHAREERRORS shareerrors.id.json={...}
typedef struct shareerrors { typedef struct shareerrors {
int64_t workinfoid; int64_t workinfoid;
@ -3029,10 +3043,14 @@ extern int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc
char *code, char *inet, tv_t *cd, bool igndup, char *code, char *inet, tv_t *cd, bool igndup,
K_TREE *trf_root); K_TREE *trf_root);
extern bool workinfo_fill(PGconn *conn); extern bool workinfo_fill(PGconn *conn);
extern bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, extern bool shares_add(PGconn *conn, char *workinfoid, char *username,
char *clientid, char *errn, char *enonce1, char *nonce2, char *workername, char *clientid, char *errn,
char *nonce, char *diff, char *sdiff, char *secondaryuserid, char *enonce1, char *nonce2, char *nonce, char *diff,
char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); char *sdiff, char *secondaryuserid, char *ntime,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root);
extern bool shares_db(PGconn *conn, K_ITEM *s_item);
extern bool shares_fill(PGconn *conn);
extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
char *workername, char *clientid, char *errn, char *workername, char *clientid, char *errn,
char *error, char *secondaryuserid, char *by, char *error, char *secondaryuserid, char *by,

7
src/ckdb_cmd.c

@ -2601,7 +2601,7 @@ wiconf:
} else if (strcasecmp(cmd, STR_SHARES) == 0) { } else if (strcasecmp(cmd, STR_SHARES) == 0) {
K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_errn; K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_errn;
K_ITEM *i_enonce1, *i_nonce2, *i_nonce, *i_diff, *i_sdiff; K_ITEM *i_enonce1, *i_nonce2, *i_nonce, *i_diff, *i_sdiff;
K_ITEM *i_secondaryuserid; K_ITEM *i_secondaryuserid, *i_ntime;
bool ok; bool ok;
i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz);
@ -2676,6 +2676,10 @@ wiconf:
if (!i_secondaryuserid) if (!i_secondaryuserid)
i_secondaryuserid = &shares_secondaryuserid; i_secondaryuserid = &shares_secondaryuserid;
i_ntime = require_name(trf_root, "ntime", 1, NULL, reply, siz);
if (!i_ntime)
return strdup(reply);
ok = shares_add(conn, transfer_data(i_workinfoid), ok = shares_add(conn, transfer_data(i_workinfoid),
transfer_data(i_username), transfer_data(i_username),
transfer_data(i_workername), transfer_data(i_workername),
@ -2687,6 +2691,7 @@ wiconf:
transfer_data(i_diff), transfer_data(i_diff),
transfer_data(i_sdiff), transfer_data(i_sdiff),
transfer_data(i_secondaryuserid), transfer_data(i_secondaryuserid),
transfer_data(i_ntime),
by, code, inet, cd, trf_root); by, code, inet, cd, trf_root);
if (!ok) { if (!ok) {

35
src/ckdb_data.c

@ -5436,11 +5436,13 @@ K_ITEM *_find_markersummary(int64_t markerid, int64_t workinfoid,
bool make_markersummaries(bool msg, char *by, char *code, char *inet, bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root) tv_t *cd, K_TREE *trf_root)
{ {
PGconn *conn;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers; WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL; K_ITEM *wm_item, *wm_last = NULL, *s_item = NULL;
bool ok, did;
int count = 0;
tv_t now; tv_t now;
bool ok;
K_RLOCK(workmarkers_free); K_RLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
@ -5463,6 +5465,27 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
return false; return false;
} }
conn = dbconnect();
/* Store all shares in the DB before processing the workmarker
* This way we know that the high shares in the DB will match the start
* of, or be after the start of, the shares included in the reload
* All duplicate high shares are ignored */
count = 0;
do {
did = false;
K_WLOCK(shares_free);
s_item = first_in_ktree(shares_hi_root, ctx);
K_WUNLOCK(shares_free);
if (s_item) {
did = true;
ok = shares_db(conn, s_item);
if (!ok)
goto flailed;
count++;
}
} while (did);
DATA_WORKMARKERS(workmarkers, wm_last); DATA_WORKMARKERS(workmarkers, wm_last);
LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/" LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/"
@ -5486,10 +5509,16 @@ bool make_markersummaries(bool msg, char *by, char *code, char *inet,
* payout is being generated * payout is being generated
* N.B. this is a long lock since it stores the markersummaries */ * N.B. this is a long lock since it stores the markersummaries */
K_WLOCK(process_pplns_free); K_WLOCK(process_pplns_free);
ok = sharesummaries_to_markersummaries(NULL, workmarkers, by, code, ok = sharesummaries_to_markersummaries(conn, workmarkers, by, code,
inet, &now, trf_root); inet, &now, trf_root);
K_WUNLOCK(process_pplns_free); K_WUNLOCK(process_pplns_free);
flailed:
PQfinish(conn);
if (count > 0)
LOGWARNING("%s() Stored: %d high shares", __func__, count);
return ok; return ok;
} }

311
src/ckdb_dbio.c

@ -185,6 +185,7 @@ char *pqerrmsg(PGconn *conn)
#define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16" #define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16"
#define PQPARAM17 PQPARAM16 ",$17" #define PQPARAM17 PQPARAM16 ",$17"
#define PQPARAM18 PQPARAM16 ",$17,$18" #define PQPARAM18 PQPARAM16 ",$17,$18"
#define PQPARAM19 PQPARAM16 ",$17,$18,$19"
#define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22" #define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22"
#define PQPARAM23 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23" #define PQPARAM23 PQPARAM16 ",$17,$18,$19,$20,$21,$22,$23"
#define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26" #define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26"
@ -3624,15 +3625,18 @@ discard:
static void shareerrors_process_early(PGconn *conn, int64_t good_wid, static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
tv_t *good_cd, K_TREE *trf_root); tv_t *good_cd, K_TREE *trf_root);
// Memory (and log file) only // DB Shares are stored by by the summariser to ensure the reload is correct
bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername,
char *clientid, char *errn, char *enonce1, char *nonce2, char *clientid, char *errn, char *enonce1, char *nonce2,
char *nonce, char *diff, char *sdiff, char *secondaryuserid, char *nonce, char *diff, char *sdiff, char *secondaryuserid,
char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root) char *ntime, char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{ {
K_ITEM *s_item = NULL, *u_item, *wi_item; K_TREE_CTX ctx[1];
K_ITEM *s_item = NULL, *s2_item = NULL, *u_item, *wi_item, *tmp_item;
char cd_buf[DATE_BUFSIZ]; char cd_buf[DATE_BUFSIZ];
SHARES *shares = NULL; SHARES *shares = NULL, *shares2 = NULL;
double sdiff_amt;
USERS *users; USERS *users;
bool ok = false; bool ok = false;
char *st = NULL; char *st = NULL;
@ -3643,8 +3647,12 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
errn, cd->tv_sec, cd->tv_usec); errn, cd->tv_sec, cd->tv_usec);
FREENULL(st); FREENULL(st);
TXT_TO_DOUBLE("sdiff", sdiff, sdiff_amt);
K_WLOCK(shares_free); K_WLOCK(shares_free);
s_item = k_unlink_head(shares_free); s_item = k_unlink_head(shares_free);
if (share_min_sdiff > 0 && sdiff_amt >= share_min_sdiff)
s2_item = k_unlink_head(shares_free);
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
DATA_SHARES(shares, s_item); DATA_SHARES(shares, s_item);
@ -3692,9 +3700,17 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
} }
} }
STRNCPY(shares->ntime, ntime);
shares->minsdiff = share_min_sdiff;
HISTORYDATEINIT(shares, cd, by, code, inet); HISTORYDATEINIT(shares, cd, by, code, inet);
HISTORYDATETRANSFER(trf_root, shares); HISTORYDATETRANSFER(trf_root, shares);
if (s2_item) {
DATA_SHARES(shares2, s2_item);
memcpy(shares2, shares, sizeof(*shares2));
}
wi_item = find_workinfo(shares->workinfoid, NULL); wi_item = find_workinfo(shares->workinfoid, NULL);
if (!wi_item) { if (!wi_item) {
btv_to_buf(cd, cd_buf, sizeof(cd_buf)); btv_to_buf(cd, cd_buf, sizeof(cd_buf));
@ -3710,6 +3726,19 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
// They need to be sorted by workinfoid // They need to be sorted by workinfoid
add_to_ktree(shares_early_root, s_item); add_to_ktree(shares_early_root, s_item);
k_add_head(shares_early_store, s_item); k_add_head(shares_early_store, s_item);
if (s2_item) {
// Just ignore duplicates
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
if (tmp_item == NULL) {
// Store them in advance - always
add_to_ktree(shares_hi_root, s2_item);
add_to_ktree(shares_db_root, s2_item);
k_add_head(shares_hi_store, s_item);
} else {
k_add_head(shares_free, s2_item);
s2_item = NULL;
}
}
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
/* It was all OK except the missing workinfoid /* It was all OK except the missing workinfoid
* and it was queued, so most likely OK */ * and it was queued, so most likely OK */
@ -3721,6 +3750,18 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
K_WLOCK(shares_free); K_WLOCK(shares_free);
add_to_ktree(shares_root, s_item); add_to_ktree(shares_root, s_item);
k_add_head(shares_store, s_item); k_add_head(shares_store, s_item);
if (s2_item) {
// Just ignore duplicates
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
if (tmp_item == NULL) {
add_to_ktree(shares_hi_root, s2_item);
add_to_ktree(shares_db_root, s2_item);
k_add_head(shares_hi_store, s2_item);
} else {
k_add_head(shares_free, s2_item);
s2_item = NULL;
}
}
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
shares_process_early(conn, wi_item, &(shares->createdate), shares_process_early(conn, wi_item, &(shares->createdate),
@ -3740,6 +3781,268 @@ tisbad:
return false; return false;
} }
bool shares_db(PGconn *conn, K_ITEM *s_item)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
SHARES *row;
char *ins;
char *params[14 + HISTORYDATECOUNT];
int n, par = 0;
bool ok = false;
LOGDEBUG("%s(): store", __func__);
DATA_SHARES(row, s_item);
par = 0;
params[par++] = bigint_to_buf(row->workinfoid, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = int_to_buf(row->clientid, NULL, 0);
params[par++] = str_to_buf(row->enonce1, NULL, 0);
params[par++] = str_to_buf(row->nonce2, NULL, 0);
params[par++] = str_to_buf(row->nonce, NULL, 0);
params[par++] = double_to_buf(row->diff, NULL, 0);
params[par++] = double_to_buf(row->sdiff, NULL, 0);
params[par++] = int_to_buf(row->errn, NULL, 0);
params[par++] = str_to_buf(row->error, NULL, 0);
params[par++] = str_to_buf(row->secondaryuserid, NULL, 0);
params[par++] = str_to_buf(row->ntime, NULL, 0);
params[par++] = double_to_buf(row->minsdiff, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into shares "
"(workinfoid,userid,workername,clientid,enonce1,nonce2,nonce,"
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff"
HISTORYDATECONTROL ") values (" PQPARAM19 ")";
if (!conn) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
if (par) {
PQclear(res);
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
free(params[n]);
}
if (ok) {
K_WLOCK(shares_free);
remove_from_ktree(shares_hi_root, s_item);
K_WUNLOCK(shares_free);
}
return ok;
}
bool shares_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item = NULL;
SHARES *row;
int n, t, i;
char *field;
char *sel = NULL;
int fields = 14;
bool ok = false;
LOGDEBUG("%s(): select", __func__);
printf(TICK_PREFIX"sh 0\r");
fflush(stdout);
sel = "declare sh cursor for select "
"workinfoid,userid,workername,clientid,enonce1,nonce2,nonce,"
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff"
HISTORYDATECONTROL
" from shares";
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
return false;
}
res = PQexec(conn, "Lock table shares in access exclusive mode", CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Lock", rescode, conn);
goto flail;
}
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Declare", rescode, conn);
goto flail;
}
LOGDEBUG("%s(): fetching ...", __func__);
res = PQexec(conn, "fetch 1 in sh", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch first", rescode, conn);
PQclear(res);
goto flail;
}
n = PQnfields(res);
if (n != (fields + HISTORYDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n);
PQclear(res);
goto flail;
}
n = 0;
ok = true;
K_WLOCK(shares_free);
while ((t = PQntuples(res)) > 0) {
for (i = 0; i < t; i++) {
item = k_unlink_head(shares_free);
DATA_SHARES(row, item);
bzero(row, sizeof(*row));
if (everyone_die) {
ok = false;
break;
}
PQ_GET_FLD(res, i, "workinfoid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("workinfoid", field, row->workinfoid);
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_STR("workername", field, row->workername);
PQ_GET_FLD(res, i, "clientid", field, ok);
if (!ok)
break;
TXT_TO_INT("clientid", field, row->clientid);
PQ_GET_FLD(res, i, "enonce1", field, ok);
if (!ok)
break;
TXT_TO_STR("enonce1", field, row->enonce1);
PQ_GET_FLD(res, i, "nonce2", field, ok);
if (!ok)
break;
TXT_TO_STR("nonce2", field, row->nonce2);
PQ_GET_FLD(res, i, "nonce", field, ok);
if (!ok)
break;
TXT_TO_STR("nonce", field, row->nonce);
PQ_GET_FLD(res, i, "diff", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("diff", field, row->diff);
PQ_GET_FLD(res, i, "sdiff", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("sdiff", field, row->sdiff);
PQ_GET_FLD(res, i, "errn", field, ok);
if (!ok)
break;
TXT_TO_INT("errn", field, row->errn);
PQ_GET_FLD(res, i, "error", field, ok);
if (!ok)
break;
TXT_TO_STR("error", field, row->error);
PQ_GET_FLD(res, i, "secondaryuserid", field, ok);
if (!ok)
break;
TXT_TO_STR("secondaryuserid", field, row->secondaryuserid);
PQ_GET_FLD(res, i, "ntime", field, ok);
if (!ok)
break;
TXT_TO_STR("ntime", field, row->ntime);
PQ_GET_FLD(res, i, "minsdiff", field, ok);
if (!ok)
break;
TXT_TO_DOUBLE("minsdiff", field, row->sdiff);
HISTORYDATEFLDS(res, i, row, ok);
if (!ok)
break;
add_to_ktree(shares_db_root, item);
k_add_head(shares_hi_store, item);
if (n == 0 || ((n+1) % 100000) == 0) {
printf(TICK_PREFIX"sh ");
pcom(n+1);
putchar('\r');
fflush(stdout);
}
tick();
n++;
}
PQclear(res);
res = PQexec(conn, "fetch 9999 in sh", CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Fetch next", rescode, conn);
ok = false;
break;
}
}
if (!ok)
k_add_head(shares_free, item);
K_WUNLOCK(shares_free);
PQclear(res);
flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): fetched %d shares records", __func__, n);
}
return ok;
}
static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
K_TREE *trf_root) K_TREE *trf_root)
{ {

Loading…
Cancel
Save