Browse Source

Merge branch 'master' into segwit-rebased

master
Con Kolivas 8 years ago
parent
commit
7ddc30d61c
  1. 2
      configure.ac
  2. 1917
      src/ckdb.c
  3. 78
      src/ckdb.h
  4. 155
      src/ckdb_cmd.c
  5. 214
      src/ckdb_data.c
  6. 287
      src/ckdb_dbio.c
  7. 1
      src/klist.c
  8. 27
      src/klist.h

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.9.3, kernel@kolivas.org)
AC_INIT(ckpool, 0.9.4, kernel@kolivas.org)
AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4])

1917
src/ckdb.c

File diff suppressed because it is too large Load Diff

78
src/ckdb.h

@ -11,8 +11,11 @@
#ifndef CKDB_H
#define CKDB_H
// Remove this line if you have an old GCC version
#ifdef __GNUC__
#if __GNUC__ >= 6
#pragma GCC diagnostic ignored "-Wtautological-compare"
#endif
#endif
#include "config.h"
@ -55,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.305"
#define CKDB_VERSION DB_VERSION"-2.420"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -115,6 +118,17 @@ enum free_modes {
extern enum free_modes free_mode;
// Define the array size for thread data
#define THREAD_LIMIT 99
/* To notify thread changes
* Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */
extern int reload_queue_threads_delta;
extern int proc_queue_threads_delta;
// To notify thread changes
extern int reload_breakdown_threads_delta;
extern int cmd_breakdown_threads_delta;
#define BLANK " "
extern char *EMPTY;
extern const char *nullstr;
@ -265,7 +279,9 @@ enum data_type {
TYPE_BLOB,
TYPE_DOUBLE,
TYPE_T,
TYPE_BT
TYPE_BT,
TYPE_HMS,
TYPE_MS
};
// BLOB does what PTR needs
@ -338,6 +354,8 @@ extern bool db_load_complete;
extern bool prereload;
// Different input data handling
extern bool reloading;
// Start marks processing during a larger reload
extern bool reloaded_N_files;
// Data load is complete
extern bool startup_complete;
// Tell everyone to die
@ -720,6 +738,7 @@ enum cmd_values {
CMD_LOCKS,
CMD_EVENTS,
CMD_HIGH,
CMD_THREADS,
CMD_END
};
@ -1096,6 +1115,23 @@ enum cmd_values {
(_row)->pointers = (_row)->pointers; \
} while (0)
// IOQUEUE
typedef struct ioqueue {
char *msg;
tv_t when;
int errn;
bool logfd;
bool logout;
bool logerr;
bool eol;
bool flush;
} IOQUEUE;
#define ALLOC_IOQUEUE 1024
#define LIMIT_IOQUEUE 0
#define INIT_IOQUEUE(_item) INIT_GENERIC(_item, ioqueue)
#define DATA_IOQUEUE(_var, _item) DATA_GENERIC(_var, _item, ioqueue, true)
// LOGQUEUE
typedef struct logqueue {
char *msg;
@ -1624,6 +1660,8 @@ extern K_TREE *users_root;
extern K_TREE *userid_root;
extern K_LIST *users_free;
extern K_STORE *users_store;
// Emulate a list for lock checking
extern K_LIST *users_db_free;
// USERATTS
typedef struct useratts {
@ -1890,6 +1928,8 @@ extern tv_t last_bc;
// current network diff
extern double current_ndiff;
extern bool txn_tree_store;
// avoid trying to run 2 ages at the same time
extern bool workinfo_age_lock;
// Offset in binary coinbase1 of the block number
#define BLOCKNUM_OFFSET 42
@ -2013,7 +2053,6 @@ typedef struct sharesummary {
tv_t lastshareacc;
double lastdiffacc;
char complete[TXT_FLAG+1];
MODIFYDATECONTROLPOINTERS;
} SHARESUMMARY;
/* After this many shares added, we need to update the DB record
@ -2697,7 +2736,6 @@ typedef struct keysharesummary {
tv_t lastshareacc;
double lastdiffacc;
char complete[TXT_FLAG+1];
SIMPLEDATECONTROLPOINTERS;
} KEYSHARESUMMARY;
#define ALLOC_KEYSHARESUMMARY 1000
@ -2924,6 +2962,7 @@ enum reply_type {
};
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnowts(ts_t *now);
extern void setnow(tv_t *now);
extern void tick();
extern PGconn *dbconnect();
@ -3018,6 +3057,8 @@ extern char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz,
#define t_to_buf(_data, _buf, _siz) _t_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define bt_to_buf(_data, _buf, _siz) _bt_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define btu64_to_buf(_data, _buf, _siz) _btu64_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define hms_to_buf(_data, _buf, _siz) _hms_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
#define ms_to_buf(_data, _buf, _siz) _ms_to_buf(_data, _buf, _siz, WHERE_FFL_HERE)
extern char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS);
extern char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS);
@ -3040,6 +3081,10 @@ extern char *_t_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
// Convert seconds (only) time to (brief) M-DD/HH:MM:SS
extern char *_bt_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
extern char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
// Convert to HH:MM:SS
extern char *_hms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
// Convert to MM:SS
extern char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS);
extern void dsp_transfer(K_ITEM *item, FILE *stream);
@ -3142,10 +3187,9 @@ extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
#define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx);
extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx);
extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd, tv_t *ss_first,
tv_t *ss_last, int64_t *ss_count, int64_t *s_count,
int64_t *s_diff);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff);
extern double coinbase_reward(int32_t height);
extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid);
extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
@ -3166,8 +3210,7 @@ extern void zero_sharesummary(SHARESUMMARY *row);
extern K_ITEM *_find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid, bool pool);
extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername);
extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd);
extern void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd);
#define dbhash2btchash(_hash, _buf, _siz) \
_dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
@ -3411,16 +3454,11 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar
tv_t *cd, K_TREE *trf_root);
extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm);
extern char *ooo_status(char *buf, size_t siz);
#define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \
_sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \
WHERE_FFL_HERE)
extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
char *code, char *inet, tv_t *cd,
#define sharesummary_update(_s_row, _e_row, _cd) \
_sharesummary_update(_s_row, _e_row, _cd, WHERE_FFL_HERE)
extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd,
WHERE_FFL_ARGS);
#define sharesummary_age(_ss_item, _by, _code, _inet, _cd) \
_sharesummary_age(_ss_item, _by, _code, _inet, _cd, WHERE_FFL_HERE)
extern bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
tv_t *cd, WHERE_FFL_ARGS);
extern bool sharesummary_age(K_ITEM *ss_item);
extern bool keysharesummary_age(K_ITEM *kss_item);
extern bool sharesummary_fill(PGconn *conn);
extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,

155
src/ckdb_cmd.c

@ -2865,18 +2865,20 @@ seconf:
}
ok = workinfo_age(workinfoid, transfer_data(i_poolinstance),
by, code, inet, cd, &ss_first, &ss_last,
&ss_count, &s_count, &s_diff);
cd, &ss_first, &ss_last, &ss_count, &s_count,
&s_diff);
if (!ok) {
LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
return strdup("failed.DATA");
} else {
/* Don't slow down the reload - do them later */
if (!reloading || key_update) {
/* Don't slow down the reload - do them later,
* unless it's a long reload since:
* Any pool restarts in the reload data will cause
* unaged workinfos and thus would stop marker() */
if (!reloading || key_update || reloaded_N_files) {
// Aging is a queued item thus the reply is ignored
auto_age_older(workinfoid,
transfer_data(i_poolinstance),
by, code, inet, cd);
transfer_data(i_poolinstance), cd);
}
}
LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);
@ -7663,6 +7665,67 @@ static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
"Payouts", FLDSEP, "", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
ok = true;
} else if (strcasecmp(request, "shareinfo") == 0) {
/* return share information for the workinfo with wid>=value
* if wid=0 then find the oldest workinfo that has shares */
K_ITEM *i_wid, s_look, *s_item;
SHARES lookshares, *shares;
int64_t selwid, wid, s_count = 0, s_diff = 0, s_sdiff = 0;
bool found;
i_wid = require_name(trf_root, "wid",
1, (char *)intpatt,
reply, siz);
if (!i_wid)
return strdup(reply);
TXT_TO_BIGINT("wid", transfer_data(i_wid), selwid);
INIT_SHARES(&s_look);
lookshares.workinfoid = selwid;
lookshares.userid = -1;
lookshares.workername[0] = '\0';
DATE_ZERO(&(lookshares.createdate));
s_look.data = (void *)(&lookshares);
found = false;
K_RLOCK(shares_free);
s_item = find_after_in_ktree(shares_root, &s_look, ctx);
if (s_item) {
found = true;
DATA_SHARES(shares, s_item);
wid = shares->workinfoid;
while (s_item) {
DATA_SHARES(shares, s_item);
if (shares->workinfoid != wid)
break;
s_count++;
s_diff += shares->diff;
if (s_sdiff < shares->sdiff)
s_sdiff = shares->sdiff;
s_item = next_in_ktree(ctx);
}
}
K_RUNLOCK(shares_free);
if (found) {
snprintf(tmp, sizeof(tmp), "selwid=%"PRId64"%c",
selwid, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "wid=%"PRId64"%c",
wid, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "shares=%"PRId64"%c",
s_count, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diff=%"PRId64"%c",
s_diff, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "maxsdiff=%"PRId64"%c",
s_sdiff, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
rows++;
}
ok = true;
} else {
free(buf);
@ -8354,6 +8417,85 @@ static char *cmd_high(PGconn *conn, char *cmd, char *id,
return buf;
}
// Running thread adjustments
static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd, K_TREE *trf_root,
__maybe_unused bool reload_data)
{
K_ITEM *i_name, *i_delta;
char *name, *delta;
char reply[1024] = "";
size_t siz = sizeof(reply);
char *buf = NULL;
int delta_value = 0;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_name = require_name(trf_root, "name", 1, NULL, reply, siz);
if (!i_name)
return strdup(reply);
name = transfer_data(i_name);
i_delta = require_name(trf_root, "delta", 2, NULL, reply, siz);
if (!i_delta)
return strdup(reply);
delta = transfer_data(i_delta);
if (*delta != '+' && *delta != '-') {
snprintf(reply, siz, "invalid delta '%s'", delta);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
delta_value = atoi(delta+1);
if (delta_value < 1 || delta_value >= THREAD_LIMIT) {
snprintf(reply, siz, "invalid delta range '%s'", delta);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
if (*delta == '-')
delta_value = -delta_value;
if (strcasecmp(name, "pr") == 0 ||
strcasecmp(name, "process_reload") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
reload_queue_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "pq") == 0 ||
strcasecmp(name, "pqproc") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
proc_queue_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "rb") == 0 ||
strcasecmp(name, "reload_breaker") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
reload_breakdown_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "cb") == 0 ||
strcasecmp(name, "cmd_breaker") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
cmd_breakdown_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else {
snprintf(reply, siz, "unknown name '%s'", name);
LOGERR("%s() %s.%s", __func__, id, reply);
return strdup(reply);
}
return buf;
}
/* The socket command format is as follows:
* Basic structure:
* cmd.ID.fld1=value1 FLDSEP fld2=value2 FLDSEP fld3=...
@ -8466,5 +8608,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_LOCKS, "locks", false, false, cmd_locks, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_EVENTS, "events", false, false, cmd_events, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_HIGH, "high", false, false, cmd_high, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_THREADS, "threads", false, false, cmd_threads, SEQ_NONE, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, SEQ_NONE, 0 }
};

214
src/ckdb_data.c

@ -89,12 +89,6 @@ void free_sharesummary_data(K_ITEM *item)
DATA_SHARESUMMARY(sharesummary, item);
LIST_MEM_SUB(sharesummary_free, sharesummary->workername);
FREENULL(sharesummary->workername);
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
}
void free_optioncontrol_data(K_ITEM *item)
@ -128,9 +122,6 @@ void free_keysharesummary_data(K_ITEM *item)
DATA_KEYSHARESUMMARY(keysharesummary, item);
LIST_MEM_SUB(keysharesummary_free, keysharesummary->key);
FREENULL(keysharesummary->key);
SET_CREATEBY(keysharesummary_free, keysharesummary->createby, EMPTY);
SET_CREATECODE(keysharesummary_free, keysharesummary->createcode, EMPTY);
SET_CREATEINET(keysharesummary_free, keysharesummary->createinet, EMPTY);
}
void free_keysummary_data(K_ITEM *item)
@ -520,6 +511,8 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_
case TYPE_BTV:
case TYPE_T:
case TYPE_BT:
case TYPE_HMS:
case TYPE_MS:
siz = DATE_BUFSIZ;
break;
case TYPE_CTV:
@ -606,6 +599,19 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_
tm.tm_min,
tm.tm_sec);
break;
case TYPE_HMS:
gmtime_r((time_t *)data, &tm);
snprintf(buf, siz, "%02d:%02d:%02d",
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
break;
case TYPE_MS:
gmtime_r((time_t *)data, &tm);
snprintf(buf, siz, "%02d:%02d",
tm.tm_min,
tm.tm_sec);
break;
}
return buf;
@ -683,6 +689,18 @@ char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
return _data_to_buf(TYPE_BT, (void *)&t, buf, siz, WHERE_FFL_PASS);
}
// Convert to HH:MM:SS
char *_hms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_HMS, (void *)data, buf, siz, WHERE_FFL_PASS);
}
// Convert to MM:SS
char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS)
{
return _data_to_buf(TYPE_MS, (void *)data, buf, siz, WHERE_FFL_PASS);
}
// For mutiple variable function calls that need the data
char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS)
{
@ -2135,6 +2153,11 @@ K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx)
}
#define DISCARD_ALL -1
/* No longer required since we already discard the shares after being added
* to the sharesummary */
#if 1
#define discard_shares(...)
#else
// userid = DISCARD_ALL will dump all shares for the given workinfoid
static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
int64_t *diff_tot, bool skipupdate,
@ -2144,6 +2167,8 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
SHARES lookshares, *shares;
K_TREE_CTX s_ctx[1];
char error[1024];
bool multiple = false;
int64_t curr_userid;
error[0] = '\0';
INIT_SHARES(&s_look);
@ -2154,6 +2179,7 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
DATE_ZERO(&(lookshares.createdate));
s_look.data = (void *)(&lookshares);
curr_userid = userid;
K_WLOCK(shares_free);
s_item = find_after_in_ktree(shares_root, &s_look, s_ctx);
while (s_item) {
@ -2167,37 +2193,62 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
break;
}
// Avoid releasing the lock the first time in
if (curr_userid == DISCARD_ALL)
curr_userid = shares->userid;
/* The shares being removed here wont be touched by any other
* code, so we don't need to hold the shares_free lock the
* whole time, since that would slow down incoming share
* processing too much - this only affects DISCARD_ALL
* TODO: delete the shares when they are summarised in the
* sharesummary */
if (shares->userid != curr_userid) {
K_WUNLOCK(shares_free);
curr_userid = shares->userid;
K_WLOCK(shares_free);
}
(*shares_tot)++;
if (shares->errn == SE_NONE)
(*diff_tot) += shares->diff;
if (reloading && skipupdate) {
(*shares_dumped)++;
if (error[0])
multiple = true;
else {
snprintf(error, sizeof(error),
"%"PRId64"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
}
tmp_item = next_in_ktree(s_ctx);
remove_from_ktree(shares_root, s_item);
k_unlink_item(shares_store, s_item);
if (reloading && skipupdate)
(*shares_dumped)++;
if (reloading && skipupdate && !error[0]) {
snprintf(error, sizeof(error),
"reload found aged share: %"PRId64
"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
k_add_head(shares_free, s_item);
s_item = tmp_item;
}
K_WUNLOCK(shares_free);
if (error[0])
LOGERR("%s(): %s", __func__, error);
if (error[0]) {
LOGERR("%s(): reload found %s aged share%s%s: %s",
__func__, multiple ? "multiple" : "an",
multiple ? "s" : EMPTY,
multiple ? ", the first was" : EMPTY,
error);
}
}
#endif
// Duplicates during a reload are set to not show messages
bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff)
bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *s_count, int64_t *s_diff)
{
K_ITEM *wi_item, ss_look, *ss_item;
K_ITEM ks_look, *ks_item, *wm_item;
@ -2208,6 +2259,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
int64_t diff_tot;
KEYSHARESUMMARY lookkeysharesummary, *keysharesummary;
SHARESUMMARY looksharesummary, *sharesummary;
char complete[TXT_FLAG+1];
WORKINFO *workinfo;
bool ok = false, ksok = false, skipupdate = false;
@ -2269,8 +2321,12 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
// complete could change, the id fields wont be changed/removed yet
STRNCPY(complete, sharesummary->complete);
}
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid == workinfoid) {
ss_tot++;
skipupdate = false;
@ -2278,7 +2334,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
* so finding an aged sharesummary here is an error
* N.B. this can only happen with (very) old reload files */
if (reloading) {
if (sharesummary->complete[0] == SUMMARY_COMPLETE) {
if (complete[0] == SUMMARY_COMPLETE) {
ss_already++;
skipupdate = true;
if (confirm_sharesummary) {
@ -2292,7 +2348,9 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
}
if (!skipupdate) {
if (!sharesummary_age(ss_item, by, code, inet, cd)) {
K_WLOCK(sharesummary_free);
if (!sharesummary_age(ss_item)) {
K_WUNLOCK(sharesummary_free);
ss_failed++;
LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64,
__func__, sharesummary->userid,
@ -2308,6 +2366,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
copy_tv(ss_first, &(sharesummary->firstshare));
if (tv_newer(ss_last, &(sharesummary->lastshare)))
copy_tv(ss_last, &(sharesummary->lastshare));
K_WUNLOCK(sharesummary_free);
}
}
@ -2318,8 +2377,11 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
K_RLOCK(sharesummary_free);
ss_item = next_in_ktree(ss_ctx);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
STRNCPY(complete, sharesummary->complete);
}
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
}
if (ss_already || ss_failed || shares_dumped) {
@ -2350,8 +2412,12 @@ skip_ss:
ks_look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free);
ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx);
if (ks_item) {
DATA_KEYSHARESUMMARY(keysharesummary, ks_item);
// complete could change, the id fields wont be changed/removed yet
STRNCPY(complete, keysharesummary->complete);
}
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
while (ks_item && keysharesummary->workinfoid == workinfoid) {
ks_tot++;
skipupdate = false;
@ -2359,7 +2425,7 @@ skip_ss:
* so finding an aged keysharesummary here is an error
* N.B. this can only happen with (very) old reload files */
if (reloading && !key_update) {
if (keysharesummary->complete[0] == SUMMARY_COMPLETE) {
if (complete[0] == SUMMARY_COMPLETE) {
ks_already++;
skipupdate = true;
if (confirm_sharesummary) {
@ -2373,20 +2439,27 @@ skip_ss:
}
if (!skipupdate) {
K_WLOCK(keysharesummary_free);
if (!keysharesummary_age(ks_item)) {
ks_failed++;
K_WUNLOCK(keysharesummary_free);
LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s",
__func__, keysharesummary->workinfoid,
keysharesummary->keytype,
keysharesummary->key);
ksok = false;
} else {
K_WUNLOCK(keysharesummary_free);
}
}
K_RLOCK(keysharesummary_free);
ks_item = next_in_ktree(ks_ctx);
if (ks_item) {
DATA_KEYSHARESUMMARY(keysharesummary, ks_item);
STRNCPY(complete, keysharesummary->complete);
}
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
}
/* All shares should have been discarded during sharesummary
@ -2527,19 +2600,17 @@ cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b)
void dsp_sharesummary(K_ITEM *item, FILE *stream)
{
char createdate_buf[DATE_BUFSIZ];
SHARESUMMARY *s;
if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_SHARESUMMARY(s, item);
tv_to_buf(&(s->createdate), createdate_buf, sizeof(createdate_buf));
fprintf(stream, " uid=%"PRId64" wn='%s' wid=%"PRId64" "
"da=%f ds=%f ss=%f c='%s' cd=%s\n",
"da=%f ds=%f ss=%f c='%s'\n",
s->userid, s->workername, s->workinfoid,
s->diffacc, s->diffsta, s->sharesta,
s->complete, createdate_buf);
s->complete);
}
}
@ -2632,8 +2703,7 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
}
// key_update must age keysharesummary directly
static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd)
static void key_auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd)
{
static int64_t last_attempted_id = -1;
static int64_t prev_found = 0;
@ -2651,6 +2721,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
int64_t age_id, do_id, to_id;
bool ok, found;
K_WLOCK(workinfo_free);
if (workinfo_age_lock) {
K_WUNLOCK(workinfo_free);
return;
} else
workinfo_age_lock = true;
K_WUNLOCK(workinfo_free);
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found;
@ -2663,15 +2741,15 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
INIT_KEYSHARESUMMARY(&look);
look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free);
kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
DATE_ZERO(&kss_first_min);
DATE_ZERO(&kss_last_max);
kss_count_tot = s_count_tot = s_diff_tot = 0;
found = false;
K_RLOCK(keysharesummary_free);
kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
while (kss_item && keysharesummary->workinfoid < workinfoid) {
if (keysharesummary->complete[0] == SUMMARY_NEW) {
age_id = keysharesummary->workinfoid;
@ -2686,9 +2764,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found);
// Don't repeat searching old items to avoid accessing their ram
if (!found)
if (!found) {
prev_found = workinfoid;
else {
} else {
/* Process all the consecutive keysharesummaries that's aren't aged
* This way we find each oldest 'batch' of keysharesummaries that have
* been missed and can report the range of data that was aged,
@ -2700,9 +2778,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
do_id = age_id;
to_id = 0;
do {
ok = workinfo_age(do_id, poolinstance, by, code, inet,
cd, &kss_first, &kss_last, &kss_count,
&s_count, &s_diff);
ok = workinfo_age(do_id, poolinstance, cd, &kss_first,
&kss_last, &kss_count, &s_count,
&s_diff);
kss_count_tot += kss_count;
s_count_tot += s_count;
@ -2774,12 +2852,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
idrange, keysharerange);
}
}
K_WLOCK(workinfo_free);
workinfo_age_lock = false;
K_WUNLOCK(workinfo_free);
}
/* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */
void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
char *code, char *inet, tv_t *cd)
void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd)
{
static int64_t last_attempted_id = -1;
static int64_t prev_found = 0;
@ -2798,10 +2878,21 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
bool ok, found;
if (key_update) {
key_auto_age_older(workinfoid, poolinstance, by, code, inet, cd);
key_auto_age_older(workinfoid, poolinstance, cd);
return;
}
/* Simply lock out more than one from running at the same time
* This locks access to prev_found, repeat and last_attempted_id
* If any are missed they'll be aged by the next age_workinfo in 30s */
K_WLOCK(workinfo_free);
if (workinfo_age_lock) {
K_WUNLOCK(workinfo_free);
return;
} else
workinfo_age_lock = true;
K_WUNLOCK(workinfo_free);
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found;
@ -2814,15 +2905,15 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
INIT_SHARESUMMARY(&look);
look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
DATE_ZERO(&ss_first_min);
DATE_ZERO(&ss_last_max);
ss_count_tot = s_count_tot = s_diff_tot = 0;
found = false;
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid < workinfoid) {
if (sharesummary->complete[0] == SUMMARY_NEW) {
age_id = sharesummary->workinfoid;
@ -2852,9 +2943,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
do_id = age_id;
to_id = 0;
do {
ok = workinfo_age(do_id, poolinstance, by, code, inet,
cd, &ss_first, &ss_last, &ss_count,
&s_count, &s_diff);
ok = workinfo_age(do_id, poolinstance, cd, &ss_first,
&ss_last, &ss_count, &s_count,
&s_diff);
ss_count_tot += ss_count;
s_count_tot += s_count;
@ -2926,6 +3017,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
idrange, sharerange);
}
}
K_WLOCK(workinfo_free);
workinfo_age_lock = false;
K_WUNLOCK(workinfo_free);
}
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS)

287
src/ckdb_dbio.c

@ -214,6 +214,17 @@ char *pqerrmsg(PGconn *conn)
#undef PQexec
#undef PQexecParams
/* Debug level to display write transactions - 0 removes the code
* Also enables checking the isread flag */
#define CKPQ_SHOW_WRITE 0
#define CKPQ_ISREAD1 "select "
#define CKPQ_ISREAD1LEN (sizeof(CKPQ_ISREAD1)-1)
#define CKPQ_ISREAD2 "declare "
#define CKPQ_ISREAD2LEN (sizeof(CKPQ_ISREAD2)-1)
#define CKPQ_ISREAD3 "fetch "
#define CKPQ_ISREAD3LEN (sizeof(CKPQ_ISREAD3)-1)
// Bug check to ensure no unexpected write txns occur
PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
{
@ -221,6 +232,40 @@ PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
#if CKPQ_SHOW_WRITE
if (isread) {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) {
LOGERR("%s() ERR: query flagged as read, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = false;
}
} else {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) {
LOGERR("%s() ERR: query flagged as write, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = true;
}
}
if (!isread) {
char *buf = NULL, ffl[128];
size_t len, off;
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, __func__);
APPEND_REALLOC(buf, off, len, "() W: '");
APPEND_REALLOC(buf, off, len, qry);
APPEND_REALLOC(buf, off, len, "'");
snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS);
APPEND_REALLOC(buf, off, len, ffl);
LOGMSGBUF(CKPQ_SHOW_WRITE, buf);
FREENULL(buf);
}
#endif
return PQexec(conn, qry);
}
@ -237,6 +282,47 @@ PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm");
#if CKPQ_SHOW_WRITE
if (isread) {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) {
LOGERR("%s() ERR: query flagged as read, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = false;
}
} else {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) {
LOGERR("%s() ERR: query flagged as write, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = true;
}
}
if (!isread) {
char *buf = NULL, num[16], ffl[128];
size_t len, off;
int i;
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, __func__);
APPEND_REALLOC(buf, off, len, "() W: '");
APPEND_REALLOC(buf, off, len, qry);
APPEND_REALLOC(buf, off, len, "'");
for (i = 0; i < nParams; i++) {
snprintf(num, sizeof(num), " $%d='", i+1);
APPEND_REALLOC(buf, off, len, num);
APPEND_REALLOC(buf, off, len, paramValues[i]);
APPEND_REALLOC(buf, off, len, "'");
}
snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS);
APPEND_REALLOC(buf, off, len, ffl);
LOGMSGBUF(CKPQ_SHOW_WRITE, buf);
FREENULL(buf);
}
#endif
return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths,
paramFormats, resultFormat);
}
@ -558,6 +644,19 @@ K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress,
LOGDEBUG("%s(): add", __func__);
/* 2 attempts to add the same user at the same time will only do it once
* The 2nd attempt will get back the data provided by the 1st
* and thus throw away any differences in the 2nd */
K_WLOCK(users_db_free);
K_RLOCK(users_free);
item = find_users(username);
K_RUNLOCK(users_free);
if (item) {
ok = true;
goto already;
}
K_WLOCK(users_free);
item = k_unlink_head(users_free);
K_WUNLOCK(users_free);
@ -665,6 +764,10 @@ unitem:
}
K_WUNLOCK(users_free);
already:
K_WUNLOCK(users_db_free);
if (ok)
return item;
else
@ -1469,6 +1572,11 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
LOGDEBUG("%s(): update", __func__);
/* Two attempts to update the same worker at the same time
* will determine the final state based on which gets the lock last,
* i.e. randomly, but without overwriting at the same time */
K_WLOCK(workers_db_free);
DATA_WORKERS(row, item);
if (check) {
@ -1583,6 +1691,9 @@ unparam:
for (n = 0; n < par; n++)
free(params[n]);
early:
K_WUNLOCK(workers_db_free);
return ok;
}
@ -3399,6 +3510,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
{
K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary;
char complete[TXT_FLAG+1];
WORKINFO *workinfo;
char *st = NULL;
@ -3434,9 +3546,9 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
100.0 * pool.diffacc /
workinfo->diff_target);
}
LOGWARNING("%s %s Diff %.1f%% (%.0f/%.1f) %s "
"Pool %.1f%s%s",
block ? "BLOCK!" : "Share",
LOGWARNING("%s (%"PRIu32") %s Diff %.1f%% (%.0f/%.1f) "
"%s Pool %.1f%s%s",
block ? "BLOCK!" : "Share", workinfo->height,
(sta == NULL) ? "ok" : sta,
100.0 * shares->sdiff / workinfo->diff_target,
shares->sdiff, workinfo->diff_target,
@ -3479,13 +3591,14 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
K_RLOCK(sharesummary_free);
ss_item = find_sharesummary(shares->userid, shares->workername,
shares->workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) {
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete,
__func__, complete,
shares->workinfoid, shares->userid,
st = safe_text_nonull(shares->workername),
shares->createdate.tv_sec,
@ -3495,6 +3608,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
return true;
}
}
K_RUNLOCK(sharesummary_free);
}
if (!key_update && !confirm_sharesummary) {
@ -3504,8 +3618,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
K_WUNLOCK(userinfo_free);
}
sharesummary_update(shares, NULL, shares->createby, shares->createcode,
shares->createinet, &(shares->createdate));
sharesummary_update(shares, NULL, &(shares->createdate));
return true;
}
@ -3601,8 +3714,8 @@ keep:
early_shares->oldcount, early_shares->redo);
FREENULL(st);
K_WLOCK(shares_free);
add_to_ktree(shares_root, es_item);
k_add_head(shares_store, es_item);
// Discard it, it's been processed
k_add_head(shares_free, es_item);
K_WUNLOCK(shares_free);
return;
discard:
@ -3639,6 +3752,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
USERS *users;
bool ok = false, dup = false;
char *st = NULL;
tv_t share_cd;
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
__func__,
@ -3660,6 +3774,8 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
K_RLOCK(users_free);
u_item = find_users(username);
K_RUNLOCK(users_free);
/* Can't change outside lock since we don't delete users
* or change their *userid */
if (!u_item) {
btv_to_buf(cd, cd_buf, sizeof(cd_buf));
/* This should never happen unless there's a bug in ckpool
@ -3672,7 +3788,6 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
goto tisbad;
}
DATA_USERS(users, u_item);
shares->userid = users->userid;
TXT_TO_BIGINT("workinfoid", workinfoid, shares->workinfoid);
@ -3762,9 +3877,10 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
ok = shares_process(conn, shares, wi_item, trf_root);
if (ok) {
copy_tv(&share_cd, &(shares->createdate));
K_WLOCK(shares_free);
add_to_ktree(shares_root, s_item);
k_add_head(shares_store, s_item);
// Discard it, it's been processed
k_add_head(shares_free, s_item);
if (s2_item) {
// Discard duplicates
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
@ -3791,11 +3907,10 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
FREENULL(st);
}
shares_process_early(conn, wi_item, &(shares->createdate),
trf_root);
shares_process_early(conn, wi_item, &share_cd, trf_root);
// Call both since shareerrors may be rare
shareerrors_process_early(conn, shares->workinfoid,
&(shares->createdate), trf_root);
&share_cd, trf_root);
// The original share was ok
return true;
@ -4142,6 +4257,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
{
K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary;
char complete[TXT_FLAG+1];
char *st = NULL;
LOGDEBUG("%s() add", __func__);
@ -4185,13 +4301,14 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
ss_item = find_sharesummary(shareerrors->userid,
shareerrors->workername,
shareerrors->workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) {
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete,
__func__, complete,
shareerrors->workinfoid,
shareerrors->userid,
st = safe_text_nonull(shareerrors->workername),
@ -4201,11 +4318,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
return false;
}
}
K_RUNLOCK(sharesummary_free);
}
sharesummary_update(NULL, shareerrors, shareerrors->createby,
shareerrors->createcode, shareerrors->createinet,
&(shareerrors->createdate));
sharesummary_update(NULL, shareerrors, &(shareerrors->createdate));
return true;
}
@ -4671,6 +4787,39 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
dokey:
if (key_update) {
setnow(&add_stt);
// Discard the sharesummaries
looksharesummary.workinfoid = workmarkers->workinfoidend;
looksharesummary.userid = MAXID;
looksharesummary.workername = EMPTY;
INIT_SHARESUMMARY(&ss_look);
ss_look.data = (void *)(&looksharesummary);
/* Since shares come in from ckpool at a high rate,
* we don't want to lock sharesummary for long
* Those incoming shares will not be touching the sharesummaries
* we are processing here */
K_RLOCK(sharesummary_free);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root,
&ss_look, ss_ctx);
K_RUNLOCK(sharesummary_free);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->workinfoid < workmarkers->workinfoidstart)
break;
K_WLOCK(sharesummary_free);
ss_prev = prev_in_ktree(ss_ctx);
k_unlink_item(sharesummary_store, ss_item);
K_WUNLOCK(sharesummary_free);
k_add_head_nolock(old_sharesummary_store, ss_item);
ss_item = ss_prev;
}
setnow(&add_fin);
}
setnow(&kadd_stt);
INIT_KEYSUMMARY(&ks_look);
@ -5182,14 +5331,13 @@ flail:
return ok;
}
// Requires K_WLOCK(sharesummary_free)
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new,
double *tdf, double *tdl)
{
tv_t *createdate;
K_WLOCK(sharesummary_free);
if (s_row)
createdate = &(s_row->createdate);
else
@ -5251,15 +5399,11 @@ static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
*tdf = tvdiff(createdate, &(row->firstshare));
*tdl = tvdiff(createdate, &(row->lastshare));
}
K_WUNLOCK(sharesummary_free);
}
static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row,
bool new)
{
K_WLOCK(keysharesummary_free);
if (new) {
zero_keysharesummary(row);
copy_tv(&(row->firstshare), &(s_row->createdate));
@ -5307,8 +5451,6 @@ static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row,
row->sharerej++;
break;
}
K_WUNLOCK(keysharesummary_free);
}
/* Keep some simple stats on how often shares are out of order
@ -5330,15 +5472,15 @@ char *ooo_status(char *buf, size_t siz)
/* sharesummaries are no longer stored in the DB but fields are updated as b4
* This creates/updates both the sharesummaries and the keysharesummaries */
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS)
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd,
WHERE_FFL_ARGS)
{
WORKMARKERS *wm;
SHARESUMMARY *row, *p_row;
KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL;
K_ITEM *ss_item, *kiss_item = NULL, *kass_item = NULL, *wm_item, *p_item = NULL;
bool new = false, p_new = false, ki_new = false, ka_new = false;
int64_t userid, workinfoid;
int64_t userid, workinfoid, markerid;
char *workername, *address = NULL, *agent = NULL;
char *st = NULL, *db = NULL;
char ooo_buf[256];
@ -5371,33 +5513,30 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED,
NULL);
K_RUNLOCK(workmarkers_free);
if (wm_item) {
DATA_WORKMARKERS(wm, wm_item);
markerid = wm->markerid;
K_RUNLOCK(workmarkers_free);
LOGERR("%s(): attempt to update sharesummary "
"with %s %"PRId64"/%"PRId64"/%s "CDDB" %s"
" but processed workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, st = safe_text(workername),
db = ctv_to_buf(cd, NULL, 0),
wm->markerid);
db = ctv_to_buf(cd, NULL, 0), markerid);
FREENULL(st);
FREENULL(db);
return false;
}
K_RUNLOCK(workmarkers_free);
K_RLOCK(sharesummary_free);
K_WLOCK(sharesummary_free);
ss_item = find_sharesummary(userid, workername, workinfoid);
p_item = find_sharesummary_p(workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) {
DATA_SHARESUMMARY(row, ss_item);
} else {
new = true;
K_WLOCK(sharesummary_free);
ss_item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(row, ss_item);
bzero(row, sizeof(*row));
row->userid = userid;
@ -5409,20 +5548,23 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data
set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl);
if (new) {
add_to_ktree(sharesummary_root, ss_item);
add_to_ktree(sharesummary_workinfoid_root, ss_item);
k_add_head(sharesummary_store, ss_item);
}
K_WUNLOCK(sharesummary_free);
// Ignore shareerrors for keysummaries
if (s_row) {
K_RLOCK(keysharesummary_free);
K_WLOCK(keysharesummary_free);
kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address);
kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent);
K_RUNLOCK(keysharesummary_free);
if (kiss_item) {
DATA_KEYSHARESUMMARY(ki_row, kiss_item);
} else {
ki_new = true;
K_WLOCK(keysharesummary_free);
kiss_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ki_row, kiss_item);
bzero(ki_row, sizeof(*ki_row));
ki_row->workinfoid = workinfoid;
@ -5433,14 +5575,20 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data
set_keysharesummary_stats(ki_row, s_row, ki_new);
if (ki_new) {
add_to_ktree(keysharesummary_root, kiss_item);
k_add_head(keysharesummary_store, kiss_item);
}
K_WUNLOCK(keysharesummary_free);
K_WLOCK(keysharesummary_free);
kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent);
if (kass_item) {
DATA_KEYSHARESUMMARY(ka_row, kass_item);
} else {
ka_new = true;
K_WLOCK(keysharesummary_free);
kass_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ka_row, kass_item);
bzero(ka_row, sizeof(*ka_row));
ka_row->workinfoid = workinfoid;
@ -5451,6 +5599,11 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data
set_keysharesummary_stats(ka_row, s_row, ka_new);
if (ka_new) {
add_to_ktree(keysharesummary_root, kass_item);
k_add_head(keysharesummary_store, kass_item);
}
K_WUNLOCK(keysharesummary_free);
}
if (!new) {
@ -5506,13 +5659,14 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
}
}
K_WLOCK(sharesummary_free);
p_item = find_sharesummary_p(workinfoid);
if (p_item) {
DATA_SHARESUMMARY(p_row, p_item);
} else {
p_new = true;
K_WLOCK(sharesummary_free);
p_item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row));
POOL_SS(p_row);
@ -5521,42 +5675,17 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl);
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
// Store either new item
if (new || p_new) {
K_WLOCK(sharesummary_free);
if (new) {
add_to_ktree(sharesummary_root, ss_item);
add_to_ktree(sharesummary_workinfoid_root, ss_item);
k_add_head(sharesummary_store, ss_item);
}
if (p_new) {
add_to_ktree(sharesummary_pool_root, p_item);
k_add_head(sharesummary_pool_store, p_item);
}
K_WUNLOCK(sharesummary_free);
}
if (ki_new || ka_new) {
K_WLOCK(keysharesummary_free);
if (ki_new) {
add_to_ktree(keysharesummary_root, kiss_item);
k_add_head(keysharesummary_store, kiss_item);
}
if (ka_new) {
add_to_ktree(keysharesummary_root, kass_item);
k_add_head(keysharesummary_store, kass_item);
}
K_WUNLOCK(keysharesummary_free);
if (p_new) {
add_to_ktree(sharesummary_pool_root, p_item);
k_add_head(sharesummary_pool_store, p_item);
}
K_WUNLOCK(sharesummary_free);
return true;
}
// No key fields are modified
bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
tv_t *cd, WHERE_FFL_ARGS)
bool sharesummary_age(K_ITEM *ss_item)
{
SHARESUMMARY *row;
@ -5566,8 +5695,6 @@ bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
row->complete[0] = SUMMARY_COMPLETE;
row->complete[1] = '\0';
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
return true;
}

1
src/klist.c

@ -13,6 +13,7 @@
const char *tree_node_list_name = "TreeNodes";
#if LOCK_CHECK
bool disable_checks = false;
bool check_locks = true;
const char *thread_noname = "UNSET";
int next_thread_id = 0;

27
src/klist.h

@ -75,10 +75,13 @@ extern const char *tree_node_list_name;
* default to false,
* or turn off check_locks during ckdb startup with a ckpmsg 'locks.ID.locks'
* If you turn deadlock prediction on with ckpmsg 'locks.1.deadlocks=y'
* it will not re-enable it for any thread that has alread predicted
* it will not re-enable it for any thread that has already predicted
* a deadlock */
#if LOCK_CHECK
// Disable all lock checks permanently if thread limits are exceeded
extern bool disable_checks;
// We disable lock checking if an error is encountered
extern bool check_locks;
/* Maximum number of threads preallocated
@ -239,7 +242,9 @@ retry:
#if LOCK_CHECK
#define LOCK_MAYBE
/* The simple lock_check_init check is in case someone incorrectly changes ckdb.c ...
* It's not fool proof :P */
* It's not fool proof :P
* If LOCK_INIT() is called too many times, i.e. too many threads,
* it will report and disable lock checking */
#define LOCK_INIT(_name) do { \
if (!lock_check_init) { \
quithere(1, "In thread %s, lock_check_lock has not been " \
@ -248,6 +253,12 @@ retry:
ck_wlock(&lock_check_lock); \
my_thread_id = next_thread_id++; \
ck_wunlock(&lock_check_lock); \
if (my_thread_id >= MAX_THREADS) { \
disable_checks = true; \
LOGERR("WARNING: all lock checking disabled due to " \
"initialising too many threads - limit %d", \
MAX_THREADS); \
} \
my_thread_name = strdup(_name); \
} while (0)
#define FIRST_LOCK_INIT(_name) do { \
@ -313,7 +324,7 @@ retry:
static const char *_fl = __FILE__; \
static const char *_f = __func__; \
static const int _l = __LINE__; \
if (my_check_locks && check_locks) { \
if (!disable_checks && my_check_locks && check_locks) { \
if (_mode == LOCK_MODE_LOCK) { \
if (THRLCK(_list).first_held || \
(THRLCK(_list).r_count != 0) || \
@ -384,7 +395,7 @@ retry:
} \
} \
} \
if (check_deadlocks && my_check_deadlocks) { \
if (!disable_checks && check_deadlocks && my_check_deadlocks) { \
int _dp = (_list)->deadlock_priority; \
if (my_lock_level == 0) { \
if (_mode == LOCK_MODE_LOCK) { \
@ -477,7 +488,7 @@ retry:
LOCK_MODE_UNLOCK, LOCK_TYPE_READ)
#define _LIST_WRITE(_list, _chklock, _file, _func, _line) do { \
if (my_check_locks && check_locks && _chklock) { \
if (!disable_checks && my_check_locks && check_locks && _chklock) { \
if (!THRLCK(_list).first_held || \
(THRLCK(_list).r_count != 0) || \
(THRLCK(_list).w_count != 1)) { \
@ -498,7 +509,7 @@ retry:
} \
} while (0)
#define _LIST_WRITE2(_list, _chklock) do { \
if (my_check_locks && check_locks && _chklock) { \
if (!disable_checks && my_check_locks && check_locks && _chklock) { \
if (!THRLCK(_list).first_held || \
(THRLCK(_list).r_count != 0) || \
(THRLCK(_list).w_count != 1)) { \
@ -519,7 +530,7 @@ retry:
} while (0)
// read is ok under read or write
#define _LIST_READ(_list, _chklock, _file, _func, _line) do { \
if (my_check_locks && check_locks && _chklock) { \
if (!disable_checks && my_check_locks && check_locks && _chklock) { \
if (!THRLCK(_list).first_held || \
(THRLCK(_list).r_count + \
THRLCK(_list).w_count) != 1) { \
@ -540,7 +551,7 @@ retry:
} \
} while (0)
#define _LIST_READ2(_list, _chklock) do { \
if (my_check_locks && check_locks && _chklock) { \
if (!disable_checks && my_check_locks && check_locks && _chklock) { \
if (!THRLCK(_list).first_held || \
(THRLCK(_list).r_count + \
THRLCK(_list).w_count) != 1) { \

Loading…
Cancel
Save