Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 10 years ago
parent
commit
b44502b406
  1. 37
      src/ckdb.c
  2. 10
      src/ckdb.h
  3. 32
      src/ckdb_cmd.c
  4. 197
      src/ckdb_dbio.c

37
src/ckdb.c

@ -31,18 +31,11 @@
* with an ok.queued reply to ckpool, to be processed after the reload
* completes and just process authorise messages immediately while the
* reload runs
* This can't cause a duplicate process of an authorise message since a
* reload will ignore any messages before the last DB auths message,
* however, if ckdb and ckpool get out of sync due to ckpool starting
* during the reload (as mentioned below) it is possible for ckdb to
* find an authorise message in the CCLs that was processed in the
* message queue and thus is already in the DB.
* This error would be very rare and also not an issue
* To avoid this, we start the ckpool message queue after loading
* the users, auths, idcontrol and workers DB tables, before loading the
* We start the ckpool message queue after loading
* the users, idcontrol and workers DB tables, before loading the
* much larger DB tables so that ckdb is effectively ready for messages
* almost immediately
* The first ckpool message also allows us to know where ckpool is up to
* The first ckpool message allows us to know where ckpool is up to
* in the CCLs and thus where to stop processing the CCLs to stay in
* sync with ckpool
* If ckpool isn't running, then the reload will complete at the end of
@ -89,13 +82,14 @@
* TODO: Verify that all DB sharesummaries with complete='n'
* have done this
* DB+RAM workinfo: start from newest DB createdate workinfo
* DB+RAM auths: start from newest DB createdate auths
* RAM auths: none (we store them in RAM only)
* DB+RAM poolstats: newest createdate poolstats
* TODO: subtract how much we need in RAM of the 'between'
* non db records - will depend on TODO: pool stats reporting
* requirements
* RAM userstats: none (we simply store the last one found)
* DB+RAM workers: created by auths so auths will resolve it
* DB+RAM workers: created by auths so are simply ignore if they
* already exist
* DB+RAM blocks: resolved by workinfo - any unsaved blocks (if any)
* will be after the last DB workinfo
* DB+RAM accountbalance (TODO): resolved by shares/workinfo/blocks
@ -269,8 +263,8 @@ bool dbload_only_sharesummary = false;
* markersummaries and pplns payouts may not be correct */
bool sharesummary_marks_limit = false;
// DB users,workers,auth load is complete
bool db_auths_complete = false;
// DB users,workers load is complete
bool db_users_complete = false;
// DB load is complete
bool db_load_complete = false;
// Different input data handling
@ -717,10 +711,7 @@ static bool getdata1()
goto matane;
if (!(ok = users_fill(conn)))
goto matane;
if (!(ok = workers_fill(conn)))
goto matane;
if (!confirm_sharesummary)
ok = auths_fill(conn);
ok = workers_fill(conn);
matane:
@ -792,8 +783,6 @@ static bool reload()
LOGWARNING("%s(): %s newest DB complete sharesummary", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_workinfo), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB workinfo", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_auths), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB auths", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_poolstats), buf, sizeof(buf));
LOGWARNING("%s(): %s newest DB poolstats", __func__, buf);
tv_to_buf(&(dbstatus.newest_createdate_blocks), buf, sizeof(buf));
@ -810,10 +799,6 @@ static bool reload()
copy_tv(&start, &(dbstatus.newest_createdate_workinfo));
reason = "workinfo";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_auths))) {
copy_tv(&start, &(dbstatus.newest_createdate_auths));
reason = "auths";
}
if (!tv_newer(&start, &(dbstatus.newest_createdate_poolstats))) {
copy_tv(&start, &(dbstatus.newest_createdate_poolstats));
reason = "poolstats";
@ -1184,7 +1169,7 @@ static bool setup_data()
if (!getdata1() || everyone_die)
return false;
db_auths_complete = true;
db_users_complete = true;
cksem_post(&socketer_sem);
if (!getdata2() || everyone_die)
@ -2383,7 +2368,7 @@ static void *socketer(__maybe_unused void *arg)
rename_proc("db_socketer");
while (!everyone_die && !db_auths_complete)
while (!everyone_die && !db_users_complete)
cksem_mswait(&socketer_sem, 420);
socketer_using_data = true;

10
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.910"
#define CKDB_VERSION DB_VERSION"-0.920"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -103,7 +103,6 @@ typedef struct loadstatus {
tv_t oldest_sharesummary_firstshare_a;
tv_t newest_sharesummary_firstshare_y;
tv_t newest_createdate_workinfo;
tv_t newest_createdate_auths;
tv_t newest_createdate_poolstats;
tv_t newest_starttimeband_userstats;
tv_t newest_createdate_blocks;
@ -256,8 +255,8 @@ extern bool dbload_only_sharesummary;
* markersummaries and pplns payouts may not be correct */
extern bool sharesummary_marks_limit;
// DB users,workers,auth load is complete
extern bool db_auths_complete;
// DB users,workers load is complete
extern bool db_users_complete;
// DB load is complete
extern bool db_load_complete;
// Different input data handling
@ -1835,9 +1834,8 @@ extern bool miningpayouts_fill(PGconn *conn);
extern bool auths_add(PGconn *conn, char *poolinstance, char *username,
char *workername, char *clientid, char *enonce1,
char *useragent, char *preauth, char *by, char *code,
char *inet, tv_t *cd, bool igndup, K_TREE *trf_root,
char *inet, tv_t *cd, K_TREE *trf_root,
bool addressuser, USERS **users, WORKERS **workers);
extern bool auths_fill(PGconn *conn);
extern bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
char *elapsed, char *users, char *workers,
char *hashrate, char *hashrate5m,

32
src/ckdb_cmd.c

@ -2174,7 +2174,7 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id,
}
static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
char *code, char *inet, tv_t *cd, bool igndup,
char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
char reply[1024] = "";
@ -2241,7 +2241,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
transfer_data(i_enonce1),
transfer_data(i_useragent),
transfer_data(i_preauth),
by, code, inet, cd, igndup, trf_root, false,
by, code, inet, cd, trf_root, false,
&users, &workers);
if (!ok) {
@ -2266,21 +2266,11 @@ static char *cmd_auth(PGconn *conn, char *cmd, char *id,
char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
bool igndup = false;
// confirm_summaries() doesn't call this
if (reloading) {
if (tv_equal(cd, &(dbstatus.newest_createdate_auths)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_auths)))
return NULL;
}
return cmd_auth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root);
return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root);
}
static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by,
char *code, char *inet, tv_t *cd, bool igndup,
char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
char reply[1024] = "";
@ -2329,7 +2319,7 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by,
transfer_data(i_enonce1),
transfer_data(i_useragent),
transfer_data(i_preauth),
by, code, inet, cd, igndup, trf_root, true,
by, code, inet, cd, trf_root, true,
&users, &workers);
if (!ok) {
@ -2354,17 +2344,7 @@ static char *cmd_addrauth(PGconn *conn, char *cmd, char *id,
char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
bool igndup = false;
// confirm_summaries() doesn't call this
if (reloading) {
if (tv_equal(cd, &(dbstatus.newest_createdate_auths)))
igndup = true;
else if (tv_newer(cd, &(dbstatus.newest_createdate_auths)))
return NULL;
}
return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root);
return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root);
}
static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id,

197
src/ckdb_dbio.c

@ -4490,22 +4490,17 @@ bool miningpayouts_fill(PGconn *conn)
return ok;
}
// TODO: discard them from RAM
bool auths_add(PGconn *conn, char *poolinstance, char *username,
char *workername, char *clientid, char *enonce1,
char *useragent, char *preauth, char *by, char *code,
char *inet, tv_t *cd, bool igndup, K_TREE *trf_root,
char *inet, tv_t *cd, K_TREE *trf_root,
bool addressuser, USERS **users, WORKERS **workers)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
K_TREE_CTX ctx[1];
K_ITEM *a_item, *u_item, *w_item;
char cd_buf[DATE_BUFSIZ];
AUTHS *row;
char *ins;
char *params[8 + HISTORYDATECOUNT];
int n, par = 0;
bool ok = false;
LOGDEBUG("%s(): add", __func__);
@ -4521,10 +4516,6 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
K_RUNLOCK(users_free);
if (!u_item) {
if (addressuser) {
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
u_item = users_add(conn, username, EMPTY, EMPTY,
by, code, inet, cd, trf_root);
} else {
@ -4569,14 +4560,10 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
k_add_head(auths_free, a_item);
K_WUNLOCK(auths_free);
if (conned)
PQfinish(conn);
if (!igndup) {
// Shouldn't actually be possible unless twice in the logs
tv_to_buf(cd, cd_buf, sizeof(cd_buf));
LOGERR("%s(): Duplicate auths ignored %s/%s/%s",
__func__, poolinstance, workername, cd_buf);
}
/* Let them mine, that's what matters :)
* though this would normally only be during a reload */
@ -4587,45 +4574,10 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
// Update even if DB fails
workerstatus_update(row, NULL, NULL);
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
row->authid = 1;
row->authid = nextid(conn, "authid", (int64_t)1, cd, by, code, inet);
if (row->authid == 0)
goto unitem;
par = 0;
params[par++] = bigint_to_buf(row->authid, NULL, 0);
params[par++] = str_to_buf(row->poolinstance, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = int_to_buf(row->clientid, NULL, 0);
params[par++] = str_to_buf(row->enonce1, NULL, 0);
params[par++] = str_to_buf(row->useragent, NULL, 0);
params[par++] = str_to_buf(row->preauth, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into auths "
"(authid,poolinstance,userid,workername,clientid,enonce1,useragent,preauth"
HISTORYDATECONTROL ") values (" PQPARAM13 ")";
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
for (n = 0; n < par; n++)
free(params[n]);
unitem:
if (conned)
PQfinish(conn);
K_WLOCK(auths_free);
#if 1
/* To save ram for now, don't store them,
@ -4644,147 +4596,6 @@ unitem:
return ok;
}
bool auths_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
K_ITEM *item;
AUTHS *row;
char *params[2];
int n, i;
int par = 0;
char *field;
char *sel;
int fields = 7;
bool ok;
tv_t now;
LOGDEBUG("%s(): select", __func__);
// TODO: add/update a (single) fake auth every ~10min or 10min after the last one?
sel = "select "
"authid,userid,workername,clientid,enonce1,useragent,preauth"
HISTORYDATECONTROL
" from auths where expirydate=$1 and createdate>=$2";
setnow(&now);
now.tv_sec -= (24 * 60 * 60); // last day worth
par = 0;
params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0);
params[par++] = tv_to_buf((tv_t *)(&now), NULL, 0);
PARCHK(par, params);
res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
return false;
}
#if 0
// Only load the last record for each workername
sel = "with last as ("
"select authid,userid,workername,clientid,enonce1,useragent,preauth"
HISTORYDATECONTROL
",row_number() over(partition by userid,workername "
"order by expirydate desc, createdate desc)"
" as best from auths"
") select * from last where best=1";
res = PQexec(conn, sel, CKPQ_READ);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Select", rescode, conn);
PQclear(res);
return false;
}
#endif
n = PQnfields(res);
if (n != (fields + HISTORYDATECOUNT)) {
LOGERR("%s(): Invalid field count - should be %d, but is %d",
__func__, fields + HISTORYDATECOUNT, n);
PQclear(res);
return false;
}
n = PQntuples(res);
LOGDEBUG("%s(): tree build count %d", __func__, n);
ok = true;
K_WLOCK(auths_free);
for (i = 0; i < n; i++) {
item = k_unlink_head(auths_free);
DATA_AUTHS(row, item);
if (everyone_die) {
ok = false;
break;
}
PQ_GET_FLD(res, i, "authid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("authid", field, row->authid);
PQ_GET_FLD(res, i, "userid", field, ok);
if (!ok)
break;
TXT_TO_BIGINT("userid", field, row->userid);
PQ_GET_FLD(res, i, "workername", field, ok);
if (!ok)
break;
TXT_TO_STR("workername", field, row->workername);
PQ_GET_FLD(res, i, "clientid", field, ok);
if (!ok)
break;
TXT_TO_INT("clientid", field, row->clientid);
PQ_GET_FLD(res, i, "enonce1", field, ok);
if (!ok)
break;
TXT_TO_STR("enonce1", field, row->enonce1);
PQ_GET_FLD(res, i, "useragent", field, ok);
if (!ok)
break;
TXT_TO_STR("useragent", field, row->useragent);
PQ_GET_FLD(res, i, "preauth", field, ok);
if (!ok)
break;
TXT_TO_STR("preauth", field, row->preauth);
HISTORYDATEFLDS(res, i, row, ok);
if (!ok)
break;
auths_root = add_to_ktree(auths_root, item, cmp_auths);
k_add_head(auths_store, item);
workerstatus_update(row, NULL, NULL);
if (tv_newer(&(dbstatus.newest_createdate_auths), &(row->createdate)))
copy_tv(&(dbstatus.newest_createdate_auths), &(row->createdate));
tick();
}
if (!ok)
k_add_head(auths_free, item);
K_WUNLOCK(auths_free);
PQclear(res);
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): loaded %d auth records", __func__, n);
}
return ok;
}
bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
char *elapsed, char *users, char *workers,
char *hashrate, char *hashrate5m,

Loading…
Cancel
Save