Browse Source

ckdb - make share processing thread safe and enable a cmd socket during key_update

master
kanoi 8 years ago
parent
commit
db77dfc788
  1. 715
      src/ckdb.c
  2. 33
      src/ckdb.h
  3. 14
      src/ckdb_cmd.c
  4. 173
      src/ckdb_data.c
  5. 231
      src/ckdb_dbio.c

715
src/ckdb.c

@ -117,8 +117,15 @@ static bool blistener_using_data;
static bool breakdown_using_data; static bool breakdown_using_data;
static bool replier_using_data; static bool replier_using_data;
// Define the array size for thread data
#define THREAD_LIMIT 99
/* Use -Q to set it higher
* Setting it higher can degrade performance if the server can't
* handle the extra locking or is swapping */
static int queue_threads = 1;
// -B to override calculated value // -B to override calculated value
static int breakdown_threads = -1; static int breakdown_threads = -1;
#define BREAKDOWN_RATIO 3
static int reload_breakdown_count = 0; static int reload_breakdown_count = 0;
static int cmd_breakdown_count = 0; static int cmd_breakdown_count = 0;
/* Lock for access to *breakdown_count /* Lock for access to *breakdown_count
@ -275,6 +282,10 @@ int64_t confirm_last_workinfoid;
#define WORKINFO_AGE 660 #define WORKINFO_AGE 660
static tv_t reload_timestamp; static tv_t reload_timestamp;
// Shared by threads - accessed under breakqueue_free lock
static uint64_t reload_processed = 0;
// Shared by threads - accessed under workqueue_free lock
static uint64_t workqueue_proc0 = 0, workqueue_proc1 = 0;
/* Allow overriding the workinfoid range used in the db load of /* Allow overriding the workinfoid range used in the db load of
* workinfo and sharesummary */ * workinfo and sharesummary */
@ -297,7 +308,7 @@ bool prereload = true;
// Different input data handling // Different input data handling
bool reloading = false; bool reloading = false;
// Start marks processing during a larger reload // Start marks processing during a larger reload
static bool reloaded_N_files = false; bool reloaded_N_files = false;
// Data load is complete // Data load is complete
bool startup_complete = false; bool startup_complete = false;
// Set to true when pool0 completes, pool0 = socket data during reload // Set to true when pool0 completes, pool0 = socket data during reload
@ -389,6 +400,9 @@ K_STORE *logqueue_store;
K_LIST *msgline_free; K_LIST *msgline_free;
K_STORE *msgline_store; K_STORE *msgline_store;
// This can be set with the -q option
static int reload_queue_limit = RELOAD_QUEUE_LIMIT;
// BREAKQUEUE // BREAKQUEUE
K_LIST *breakqueue_free; K_LIST *breakqueue_free;
K_STORE *reload_breakqueue_store; K_STORE *reload_breakqueue_store;
@ -498,6 +512,8 @@ K_TREE *users_root;
K_TREE *userid_root; K_TREE *userid_root;
K_LIST *users_free; K_LIST *users_free;
K_STORE *users_store; K_STORE *users_store;
// Emulate a list for lock checking
K_LIST *users_db_free;
// USERATTS // USERATTS
K_TREE *useratts_root; K_TREE *useratts_root;
@ -556,6 +572,8 @@ tv_t last_bc;
// current network diff // current network diff
double current_ndiff; double current_ndiff;
bool txn_tree_store = true; bool txn_tree_store = true;
// avoid trying to run 2 ages at the same time
bool workinfo_age_lock = false;
// SHARES shares.id.json={...} // SHARES shares.id.json={...}
K_TREE *shares_root; K_TREE *shares_root;
@ -3716,7 +3734,7 @@ static void *breaker(void *arg)
// Is this a reload thread or a cmd thread? // Is this a reload thread or a cmd thread?
reload = *(bool *)(arg); reload = *(bool *)(arg);
if (reload) { if (reload) {
queue_limit = RELOAD_QUEUE_LIMIT; queue_limit = reload_queue_limit;
queue_sleep = RELOAD_QUEUE_SLEEP_MS; queue_sleep = RELOAD_QUEUE_SLEEP_MS;
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000;
@ -4353,7 +4371,7 @@ static void make_a_shift_mark()
int32_t prev_height; int32_t prev_height;
char wi_bits[TXT_SML+1]; char wi_bits[TXT_SML+1];
bool was_block = false, ok, oc_look = true; bool was_block = false, ok, oc_look = true;
char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ], cd_buf3[DATE_BUFSIZ]; char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ];
int used_wid; int used_wid;
/* If there are no CURRENT marks, make the first one by /* If there are no CURRENT marks, make the first one by
@ -4433,11 +4451,10 @@ static void make_a_shift_mark()
if (ss_item) { if (ss_item) {
tv_to_buf(&(sharesummary->lastshareacc), cd_buf, sizeof(cd_buf)); tv_to_buf(&(sharesummary->lastshareacc), cd_buf, sizeof(cd_buf));
tv_to_buf(&(sharesummary->lastshare), cd_buf2, sizeof(cd_buf2)); tv_to_buf(&(sharesummary->lastshare), cd_buf2, sizeof(cd_buf2));
tv_to_buf(&(sharesummary->createdate), cd_buf3, sizeof(cd_buf3)); LOGDEBUG("%s() last sharesummary %s/%s/%"PRId64"/%s/%s",
LOGDEBUG("%s() last sharesummary %s/%s/%"PRId64"/%s/%s/%s",
__func__, sharesummary->complete, __func__, sharesummary->complete,
sharesummary->workername, sharesummary->workername,
ss_age_wid, cd_buf, cd_buf2, cd_buf3); ss_age_wid, cd_buf, cd_buf2);
} }
LOGDEBUG("%s() age sharesummary limit wid %"PRId64, __func__, ss_age_wid); LOGDEBUG("%s() age sharesummary limit wid %"PRId64, __func__, ss_age_wid);
@ -4676,14 +4693,12 @@ static void make_a_shift_mark()
cd_buf, sizeof(cd_buf)); cd_buf, sizeof(cd_buf));
tv_to_buf(&(sharesummary->lastshare), tv_to_buf(&(sharesummary->lastshare),
cd_buf2, sizeof(cd_buf2)); cd_buf2, sizeof(cd_buf2));
tv_to_buf(&(sharesummary->createdate),
cd_buf3, sizeof(cd_buf3));
LOGEMERG("%s() ERR unaged sharesummary " LOGEMERG("%s() ERR unaged sharesummary "
"%s/%s/%"PRId64"/%s/%s/%s", "%s/%s/%"PRId64"/%s/%s",
__func__, sharesummary->complete, __func__, sharesummary->complete,
sharesummary->workername, sharesummary->workername,
sharesummary->workinfoid, sharesummary->workinfoid,
cd_buf, cd_buf2, cd_buf3); cd_buf, cd_buf2);
return; return;
} }
} }
@ -5367,6 +5382,34 @@ static void *process_socket(void *arg)
DATA_BREAKQUEUE(bq, bq_item); DATA_BREAKQUEUE(bq, bq_item);
DATA_MSGLINE(msgline, bq->ml_item); DATA_MSGLINE(msgline, bq->ml_item);
// Limited commands available during key_update
if (key_update) {
switch (bq->cmdnum) {
case CMD_TERMINATE:
case CMD_PING:
case CMD_VERSION:
case CMD_LOGLEVEL:
case CMD_FLUSH:
case CMD_STATS:
case CMD_SHSTA:
case CMD_CHKPASS:
case CMD_GETATTS:
case CMD_HOMEPAGE:
break;
default:
snprintf(reply, sizeof(reply),
"%s.%ld.unavailable.%s",
msgline->id,
bq->now.tv_sec,
msgline->cmd);
setnow(&(msgline->processed));
ckdb_unix_msg(REPLIER_CMD, bq->sockd,
reply, msgline, true);
goto skippy;
}
}
if (SEQALL_LOG) { if (SEQALL_LOG) {
K_ITEM *seqall; K_ITEM *seqall;
if (msgline->trf_root) { if (msgline->trf_root) {
@ -5378,6 +5421,7 @@ static void *process_socket(void *arg)
} }
} }
} }
replied = btc = false; replied = btc = false;
switch (bq->cmdnum) { switch (bq->cmdnum) {
case CMD_AUTH: case CMD_AUTH:
@ -5584,7 +5628,7 @@ static void *process_socket(void *arg)
case CMD_BLOCKSTATUS: case CMD_BLOCKSTATUS:
case CMD_MARKS: case CMD_MARKS:
case CMD_QUERY: case CMD_QUERY:
if (!startup_complete) { if (!startup_complete && !key_update) {
snprintf(reply, sizeof(reply), snprintf(reply, sizeof(reply),
"%s.%ld.loading.%s", "%s.%ld.loading.%s",
msgline->id, msgline->id,
@ -5729,6 +5773,9 @@ static void *process_socket(void *arg)
msgline, true); msgline, true);
break; break;
} }
skippy:
if (bq->sockd >= 0) if (bq->sockd >= 0)
dec_sockd = true; dec_sockd = true;
else else
@ -5905,28 +5952,167 @@ static void *socketer(void *arg)
return NULL; return NULL;
} }
static void *process_reload(__maybe_unused void *arg) static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
{ {
PGconn *conn = NULL;
MSGLINE *msgline = NULL; MSGLINE *msgline = NULL;
K_ITEM *bq_item = NULL;
BREAKQUEUE *bq = NULL; BREAKQUEUE *bq = NULL;
enum cmd_values cmdnum; enum cmd_values cmdnum;
char *ans, *st = NULL; char *ans, *st = NULL;
DATA_BREAKQUEUE(bq, bq_item);
DATA_MSGLINE(msgline, bq->ml_item);
if (SEQALL_LOG) {
K_ITEM *seqall;
if (msgline->trf_root) {
seqall = find_transfer(msgline->trf_root, SEQALL);
if (seqall) {
LOGNOTICE("%s() SEQALL %d %s",
__func__, bq->cmdnum,
transfer_data(seqall));
}
}
}
switch (bq->cmdnum) {
// Ignore
case CMD_REPLY:
case CMD_ALERTEVENT:
case CMD_ALERTOVENT:
break;
// Shouldn't be there
case CMD_TERMINATE:
case CMD_PING:
case CMD_VERSION:
case CMD_LOGLEVEL:
case CMD_FLUSH:
// Non pool commands, shouldn't be there
case CMD_ADDUSER:
case CMD_NEWPASS:
case CMD_CHKPASS:
case CMD_2FA:
case CMD_USERSET:
case CMD_WORKERSET:
case CMD_BLOCKLIST:
case CMD_BLOCKSTATUS:
case CMD_NEWID:
case CMD_PAYMENTS:
case CMD_WORKERS:
case CMD_ALLUSERS:
case CMD_HOMEPAGE:
case CMD_GETATTS:
case CMD_SETATTS:
case CMD_EXPATTS:
case CMD_GETOPTS:
case CMD_SETOPTS:
case CMD_DSP:
case CMD_STATS:
case CMD_PPLNS:
case CMD_PPLNS2:
case CMD_PAYOUTS:
case CMD_MPAYOUTS:
case CMD_SHIFTS:
case CMD_USERSTATUS:
case CMD_MARKS:
case CMD_PSHIFT:
case CMD_SHSTA:
case CMD_USERINFO:
case CMD_BTCSET:
case CMD_QUERY:
case CMD_LOCKS:
case CMD_EVENTS:
case CMD_HIGH:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
__func__, bq->count,
st = safe_text(msgline->msg));
FREENULL(st);
break;
case CMD_HEARTBEAT:
case CMD_POOLSTAT:
case CMD_USERSTAT:
case CMD_WORKERSTAT:
case CMD_BLOCK:
if (key_update)
break;
case CMD_AUTH:
case CMD_ADDRAUTH:
if (confirm_sharesummary)
break;
case CMD_SHARELOG:
// This will return the same cmdnum or DUP
cmdnum = process_seq(msgline);
if (cmdnum != CMD_DUPSEQ) {
ans = ckdb_cmds[msgline->which_cmds].func(conn,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root, true);
FREENULL(ans);
}
// TODO: time stats from each msgline tv_t
break;
default:
// Force this switch to be updated if new cmds are added
quithere(1, "%s line %"PRIu64" '%s' - not "
"handled by reload",
bq->filename, bq->count,
st = safe_text_nonull(msgline->cmd));
// Won't get here ...
FREENULL(st);
break;
}
if (bq->ml_item) {
free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
bq->ml_item = NULL;
}
free(bq->buf);
}
static void *process_reload(__maybe_unused void *arg)
{
pthread_t *procrel_pt;
PGconn *conn = NULL;
K_ITEM *bq_item = NULL;
char buf[128];
time_t now; time_t now;
uint64_t processed = 0; int i, *n, zeros;
ts_t when, when_add; ts_t when, when_add;
int ret; int ret;
pthread_detach(pthread_self()); if (arg)
i = *(int *)(arg);
else {
pthread_detach(pthread_self());
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; n = malloc(queue_threads * sizeof(int));
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; procrel_pt = malloc(queue_threads * sizeof(*procrel_pt));
for (i = 1; i < queue_threads; i++) {
n[i] = i;
create_pthread(&(procrel_pt[i]), process_reload, &(n[i]));
}
i = 0;
LOCK_INIT("db_procreload"); LOGNOTICE("%s() starting", __func__);
rename_proc("db_procreload"); }
LOGNOTICE("%s() starting", __func__); if (queue_threads < 10)
zeros = 1;
else
zeros = (int)log10(queue_threads) + 1;
snprintf(buf, sizeof(buf), "db_p%0*drload", zeros, i);
LOCK_INIT(buf);
rename_proc(buf);
when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000;
conn = dbconnect(); conn = dbconnect();
now = time(NULL); now = time(NULL);
@ -5934,8 +6120,10 @@ static void *process_reload(__maybe_unused void *arg)
while (!everyone_die) { while (!everyone_die) {
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
bq_item = k_unlink_head(reload_done_breakqueue_store); bq_item = k_unlink_head(reload_done_breakqueue_store);
if (bq_item) if (bq_item) {
reload_processing++; reload_processing++;
reload_processed++;
}
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
if (!bq_item) { if (!bq_item) {
@ -5958,8 +6146,6 @@ static void *process_reload(__maybe_unused void *arg)
continue; continue;
} }
processed++;
// Don't keep a connection for more than ~10s ... of processing // Don't keep a connection for more than ~10s ... of processing
if ((time(NULL) - now) > 10) { if ((time(NULL) - now) > 10) {
PQfinish(conn); PQfinish(conn);
@ -5967,120 +6153,7 @@ static void *process_reload(__maybe_unused void *arg)
now = time(NULL); now = time(NULL);
} }
DATA_BREAKQUEUE(bq, bq_item); process_reload_item(conn, bq_item);
DATA_MSGLINE(msgline, bq->ml_item);
if (SEQALL_LOG) {
K_ITEM *seqall;
if (msgline->trf_root) {
seqall = find_transfer(msgline->trf_root, SEQALL);
if (seqall) {
LOGNOTICE("%s() SEQALL %d %s",
__func__, bq->cmdnum,
transfer_data(seqall));
}
}
}
switch (bq->cmdnum) {
// Ignore
case CMD_REPLY:
case CMD_ALERTEVENT:
case CMD_ALERTOVENT:
break;
// Shouldn't be there
case CMD_TERMINATE:
case CMD_PING:
case CMD_VERSION:
case CMD_LOGLEVEL:
case CMD_FLUSH:
// Non pool commands, shouldn't be there
case CMD_ADDUSER:
case CMD_NEWPASS:
case CMD_CHKPASS:
case CMD_2FA:
case CMD_USERSET:
case CMD_WORKERSET:
case CMD_BLOCKLIST:
case CMD_BLOCKSTATUS:
case CMD_NEWID:
case CMD_PAYMENTS:
case CMD_WORKERS:
case CMD_ALLUSERS:
case CMD_HOMEPAGE:
case CMD_GETATTS:
case CMD_SETATTS:
case CMD_EXPATTS:
case CMD_GETOPTS:
case CMD_SETOPTS:
case CMD_DSP:
case CMD_STATS:
case CMD_PPLNS:
case CMD_PPLNS2:
case CMD_PAYOUTS:
case CMD_MPAYOUTS:
case CMD_SHIFTS:
case CMD_USERSTATUS:
case CMD_MARKS:
case CMD_PSHIFT:
case CMD_SHSTA:
case CMD_USERINFO:
case CMD_BTCSET:
case CMD_QUERY:
case CMD_LOCKS:
case CMD_EVENTS:
case CMD_HIGH:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
__func__, bq->count,
st = safe_text(msgline->msg));
FREENULL(st);
break;
case CMD_HEARTBEAT:
case CMD_POOLSTAT:
case CMD_USERSTAT:
case CMD_WORKERSTAT:
case CMD_BLOCK:
if (key_update)
break;
case CMD_AUTH:
case CMD_ADDRAUTH:
if (confirm_sharesummary)
break;
case CMD_SHARELOG:
// This will return the same cmdnum or DUP
cmdnum = process_seq(msgline);
if (cmdnum != CMD_DUPSEQ) {
ans = ckdb_cmds[msgline->which_cmds].func(conn,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root, true);
FREENULL(ans);
}
// TODO: time stats from each msgline tv_t
break;
default:
// Force this switch to be updated if new cmds are added
quithere(1, "%s line %"PRIu64" '%s' - not "
"handled by reload",
bq->filename, bq->count,
st = safe_text_nonull(msgline->cmd));
// Won't get here ...
FREENULL(st);
break;
}
if (bq->ml_item) {
free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
bq->ml_item = NULL;
}
free(bq->buf);
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
reload_processing--; reload_processing--;
@ -6092,7 +6165,13 @@ static void *process_reload(__maybe_unused void *arg)
PQfinish(conn); PQfinish(conn);
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); if (!arg) {
for (i = 1; i < queue_threads; i++)
join_pthread(procrel_pt[i]);
LOGNOTICE("%s() exiting, processed %"PRIu64,
__func__, reload_processed);
}
return NULL; return NULL;
} }
@ -6164,7 +6243,7 @@ static void reload_line(char *filename, char *buf, uint64_t count)
pthread_cond_signal(&bq_reload_waitcond); pthread_cond_signal(&bq_reload_waitcond);
mutex_unlock(&bq_reload_waitlock); mutex_unlock(&bq_reload_waitlock);
while (qcount > RELOAD_QUEUE_LIMIT) { while (qcount > reload_queue_limit) {
cksleep_ms(RELOAD_QUEUE_SLEEP_MS); cksleep_ms(RELOAD_QUEUE_SLEEP_MS);
K_RLOCK(breakqueue_free); K_RLOCK(breakqueue_free);
qcount = reload_breakqueue_store->count; qcount = reload_breakqueue_store->count;
@ -6546,129 +6625,57 @@ static void free_lost(SEQDATA *seqdata)
} }
} }
// TODO: equivalent of api_allow static void *pqproc(void *arg)
static void *listener(void *arg)
{ {
/* Process queued work - ensure pool0 is emptied first,
* even if there is pending pool0 data being processed by breaker() */
static bool pool0 = true;
static tv_t wq_stt, wq_fin;
pthread_t *queue_pt;
PGconn *conn = NULL; PGconn *conn = NULL;
pthread_t log_pt;
pthread_t sock_pt;
pthread_t summ_pt;
pthread_t mark_pt;
pthread_t break_pt;
K_ITEM *wq_item; K_ITEM *wq_item;
time_t now = 0; time_t now = 0;
int bq, bqp, bqd, wq0count, wqcount, wqgot; bool switch_msg = false, complete_msg;
char ooo_buf[256]; int wqcount, wqgot;
tv_t wq_stt, wq_fin; char buf[128];
double min, sec; double min, sec = 0;
SEQSET *seqset = NULL; SEQSET *seqset = NULL;
SEQDATA *seqdata; SEQDATA *seqdata;
K_ITEM *ss_item; K_ITEM *ss_item;
int cpus, i; int i, *n, zeros;
bool reloader, cmder, pool0, switch_msg = false;
uint64_t proc0 = 0, proc1 = 0;
ts_t when, when_add; ts_t when, when_add;
int ret; int ret;
pthread_detach(pthread_self()); if (!arg) {
setnow(&wq_stt);
when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
LOCK_INIT("db_plistener");
rename_proc("db_plistener");
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(logqueue, 94);
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
if (breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1;
breakdown_threads = (int)(cpus / 3) ? : 1;
}
LOGWARNING("%s(): creating %d*2 breaker threads ...",
__func__, breakdown_threads);
reloader = true;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &reloader);
cmder = false;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &cmder);
if (no_data_log == false)
create_pthread(&log_pt, logger, NULL);
if (!confirm_sharesummary)
create_pthread(&sock_pt, socketer, arg);
create_pthread(&summ_pt, summariser, NULL);
create_pthread(&mark_pt, marker, NULL);
plistener_using_data = true;
if (!setup_data()) { n = malloc(queue_threads * sizeof(int));
if (!everyone_die) { queue_pt = malloc(queue_threads * sizeof(*queue_pt));
LOGEMERG("ABORTING"); for (i = 1; i < queue_threads; i++) {
everyone_die = true; n[i] = i;
create_pthread(&(queue_pt[i]), pqproc, &(n[i]));
} }
goto sayonara; } else {
} i = *(int *)(arg);
if (!everyone_die) {
K_RLOCK(workqueue_free);
wq0count = pool0_workqueue_store->count;
wqcount = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
K_RLOCK(breakqueue_free);
bq = cmd_breakqueue_store->count;
bqp = cmd_processing;
bqd = cmd_done_breakqueue_store->count;
K_RUNLOCK(breakqueue_free);
LOGWARNING("reload shares OoO %s",
ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)", if (queue_threads < 10)
__func__, bq+bqp+bqd+wq0count+wqcount, zeros = 1;
bq, bqp, bqd, wq0count, wqcount); else
zeros = (int)log10(queue_threads) + 1;
/* Until startup_complete, the values should be ignored snprintf(buf, sizeof(buf), "db_p%0*dqproc", zeros, i);
* Setting them to 'now' means that they won't time out LOCK_INIT(buf);
* until after startup_complete */ rename_proc(buf);
ck_wlock(&last_lock); }
setnow(&last_heartbeat);
copy_tv(&last_workinfo, &last_heartbeat);
copy_tv(&last_share, &last_heartbeat);
copy_tv(&last_share_acc, &last_heartbeat);
copy_tv(&last_share_inv, &last_heartbeat);
copy_tv(&last_auth, &last_heartbeat);
ck_wunlock(&last_lock);
startup_complete = true; when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
setnow(&wq_stt); now = time(NULL);
conn = dbconnect(); conn = dbconnect();
now = time(NULL); wqgot = 0;
wqgot = 0;
}
LOGNOTICE("%s() processing pool0", __func__);
/* Process queued work - ensure pool0 is emptied first,
* even if there is pending pool0 data being processed by breaker() */
pool0 = true;
// Override checking until pool0 is complete // Override checking until pool0 is complete
wqcount = -1; wqcount = -1;
while (!everyone_die) { while (!everyone_die) {
@ -6688,17 +6695,24 @@ static void *listener(void *arg)
wq_item = k_unlink_head(pool_workqueue_store); wq_item = k_unlink_head(pool_workqueue_store);
wqcount = pool_workqueue_store->count; wqcount = pool_workqueue_store->count;
} }
if (wqcount == 0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin);
if (wq_item) {
if (pool0)
workqueue_proc0++;
else
workqueue_proc1++;
}
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
if (switch_msg) { if (switch_msg) {
switch_msg = false; switch_msg = false;
LOGNOTICE("%s() pool0 complete, processed %"PRIu64, LOGNOTICE("%s() pool0 complete, processed %"PRIu64,
__func__, proc0); __func__, workqueue_proc0);
} }
if (wqcount == 0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin);
/* Don't keep a connection for more than ~10s or ~10000 items /* Don't keep a connection for more than ~10s or ~10000 items
* but always have a connection open */ * but always have a connection open */
if ((time(NULL) - now) > 10 || wqgot > 10000) { if ((time(NULL) - now) > 10 || wqgot > 10000) {
@ -6709,24 +6723,27 @@ static void *listener(void *arg)
} }
if (wq_item) { if (wq_item) {
if (pool0)
proc0++;
else
proc1++;
wqgot++; wqgot++;
process_queued(conn, wq_item); process_queued(conn, wq_item);
tick(); tick();
} }
complete_msg = false;
K_WLOCK(workqueue_free);
if (wqcount == 0 && wq_stt.tv_sec != 0L) { if (wqcount == 0 && wq_stt.tv_sec != 0L) {
sec = tvdiff(&wq_fin, &wq_stt); sec = tvdiff(&wq_fin, &wq_stt);
min = floor(sec / 60.0); complete_msg = true;
sec -= min * 60.0;
LOGWARNING("pool queue completed %.0fm %.3fs", min, sec);
// Used as the flag to display the message once // Used as the flag to display the message once
wq_stt.tv_sec = 0L; wq_stt.tv_sec = 0L;
reload_queue_complete = true; reload_queue_complete = true;
} }
K_WUNLOCK(workqueue_free);
if (complete_msg) {
min = floor(sec / 60.0);
sec -= min * 60.0;
LOGWARNING("%s() pool queue completed %.0fm %.3fs",
__func__, min, sec);
}
/* Checked outside lock but only changed under lock /* Checked outside lock but only changed under lock
* This avoids taking out the lock repeatedly and the cleanup * This avoids taking out the lock repeatedly and the cleanup
@ -6771,15 +6788,127 @@ static void *listener(void *arg)
} }
} }
sayonara: if (conn)
PQfinish(conn);
if (!arg) {
for (i = 1; i < queue_threads; i++)
join_pthread(queue_pt[i]);
}
return NULL;
}
static void *listener(void *arg)
{
pthread_t log_pt;
pthread_t sock_pt;
pthread_t summ_pt;
pthread_t mark_pt;
pthread_t break_pt;
int bq, bqp, bqd, wq0count, wqcount;
char ooo_buf[256];
char buf[128];
int cpus, i;
bool reloader, cmder;
pthread_detach(pthread_self());
snprintf(buf, sizeof(buf), "db_p0qproc");
LOCK_INIT(buf);
rename_proc(buf);
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(logqueue, 94);
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
if (breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (cpus < 1)
cpus = 1;
breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1;
}
LOGWARNING("%s(): creating %d*2 breaker threads ...",
__func__, breakdown_threads);
reloader = true;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &reloader);
cmder = false;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &cmder);
if (no_data_log == false)
create_pthread(&log_pt, logger, NULL);
if (!confirm_sharesummary)
create_pthread(&sock_pt, socketer, arg);
create_pthread(&summ_pt, summariser, NULL);
create_pthread(&mark_pt, marker, NULL);
plistener_using_data = true;
if (!setup_data()) {
if (!everyone_die) {
LOGEMERG("ABORTING");
everyone_die = true;
}
}
if (!everyone_die) {
K_RLOCK(workqueue_free);
wq0count = pool0_workqueue_store->count;
wqcount = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
K_RLOCK(breakqueue_free);
bq = cmd_breakqueue_store->count;
bqp = cmd_processing;
bqd = cmd_done_breakqueue_store->count;
K_RUNLOCK(breakqueue_free);
LOGWARNING("reload shares OoO %s",
ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)",
__func__, bq+bqp+bqd+wq0count+wqcount,
bq, bqp, bqd, wq0count, wqcount);
/* Until startup_complete, the values should be ignored
* Setting them to 'now' means that they won't time out
* until after startup_complete */
ck_wlock(&last_lock);
setnow(&last_heartbeat);
copy_tv(&last_workinfo, &last_heartbeat);
copy_tv(&last_share, &last_heartbeat);
copy_tv(&last_share_acc, &last_heartbeat);
copy_tv(&last_share_inv, &last_heartbeat);
copy_tv(&last_auth, &last_heartbeat);
ck_wunlock(&last_lock);
startup_complete = true;
LOGNOTICE("%s() processing pool0", __func__);
pqproc(NULL);
}
LOGNOTICE("%s() exiting, pool0 %"PRIu64" pool %"PRIu64, LOGNOTICE("%s() exiting, pool0 %"PRIu64" pool %"PRIu64,
__func__, proc0, proc1); __func__, workqueue_proc0, workqueue_proc1);
plistener_using_data = false; plistener_using_data = false;
if (conn)
PQfinish(conn);
POOLINSTANCE_RESET_MSG("exiting"); POOLINSTANCE_RESET_MSG("exiting");
return NULL; return NULL;
@ -7062,14 +7191,14 @@ static void update_check(int64_t markerid_stt, int64_t markerid_fin)
LOGWARNING("update complete %.0fm %.3fs", min, sec); LOGWARNING("update complete %.0fm %.3fs", min, sec);
} }
static void update_keysummary() static void update_keysummary(ckpool_t *ckp)
{ {
int64_t markerid_stt, markerid_fin; int64_t markerid_stt, markerid_fin;
char *tmp, *minus; char *tmp, *minus;
tv_t db_stt, db_fin; tv_t db_stt, db_fin;
pthread_t break_pt; pthread_t break_pt, sock_pt;
double min, sec; double min, sec;
bool reloader; bool reloader, cmder;
int cpus, i; int cpus, i;
// Simple value check to abort early // Simple value check to abort early
@ -7132,14 +7261,20 @@ static void update_keysummary()
DLPRIO(breakqueue, PRIO_TERMINAL); DLPRIO(breakqueue, PRIO_TERMINAL);
#endif #endif
if (breakdown_threads <= 0) { if (breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; cpus = sysconf(_SC_NPROCESSORS_ONLN);
breakdown_threads = (int)(cpus / 3) ? : 1; if (cpus < 1)
cpus = 1;
breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1;
} }
LOGWARNING("%s(): creating %d breaker threads ...", LOGWARNING("%s(): creating %d+1 breaker threads ...",
__func__, breakdown_threads); __func__, breakdown_threads);
reloader = true; reloader = true;
for (i = 0; i < breakdown_threads; i++) for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &reloader); create_pthread(&break_pt, breaker, &reloader);
cmder = false;
// Only needs one
for (i = 0; i < 1; i++)
create_pthread(&break_pt, breaker, &cmder);
alloc_storage(); alloc_storage();
@ -7147,6 +7282,8 @@ static void update_keysummary()
setnow(&db_stt); setnow(&db_stt);
create_pthread(&sock_pt, socketer, &(ckp->main));
if (!getdata1() || everyone_die) if (!getdata1() || everyone_die)
return; return;
@ -7773,6 +7910,8 @@ static struct option long_options[] = {
{ "name", required_argument, 0, 'n' }, { "name", required_argument, 0, 'n' },
{ "dbpass", required_argument, 0, 'p' }, { "dbpass", required_argument, 0, 'p' },
{ "btc-pass", required_argument, 0, 'P' }, { "btc-pass", required_argument, 0, 'P' },
{ "reload-queue-limit", required_argument, 0, 'q' },
{ "queue-threads", required_argument, 0, 'Q' },
{ "ckpool-logdir", required_argument, 0, 'r' }, { "ckpool-logdir", required_argument, 0, 'r' },
{ "logdir", required_argument, 0, 'R' }, { "logdir", required_argument, 0, 'R' },
{ "sockdir", required_argument, 0, 's' }, { "sockdir", required_argument, 0, 's' },
@ -7822,7 +7961,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) {
switch(c) { switch(c) {
case '?': case '?':
case ':': case ':':
@ -7850,11 +7989,11 @@ int main(int argc, char **argv)
case 'B': case 'B':
{ {
int bt = atoi(optarg); int bt = atoi(optarg);
if (bt < 1) { if (bt < 1 || bt > THREAD_LIMIT) {
quit(1, "Invalid breakdown " quit(1, "Invalid breakdown "
"thread count %d " "thread count %d "
"- must be > 0", "- must be >0 and <=%d",
bt); bt, THREAD_LIMIT);
} }
breakdown_threads = bt; breakdown_threads = bt;
} }
@ -7985,6 +8124,29 @@ int main(int argc, char **argv)
while (*kill) while (*kill)
*(kill++) = '\0'; *(kill++) = '\0';
break; break;
case 'q':
{
int rql = atoi(optarg);
if (rql < 1) {
quit(1, "Invalid reload queue "
"limit %d - must be > 0",
rql);
}
reload_queue_limit = rql;
}
break;
case 'Q':
{
int qt = atoi(optarg);
if (qt < 1 || qt > THREAD_LIMIT) {
quit(1, "Invalid queue "
"thread count %d "
"- must be >0 and <=%d",
qt, THREAD_LIMIT);
}
queue_threads = qt;
}
break;
case 'r': case 'r':
restorefrom = strdup(optarg); restorefrom = strdup(optarg);
break; break;
@ -8156,11 +8318,13 @@ int main(int argc, char **argv)
// Emulate a list for lock checking // Emulate a list for lock checking
process_pplns_free = k_lock_only_list("ProcessPPLNS"); process_pplns_free = k_lock_only_list("ProcessPPLNS");
workers_db_free = k_lock_only_list("WorkersDB"); workers_db_free = k_lock_only_list("WorkersDB");
users_db_free = k_lock_only_list("UsersDB");
event_limits_free = k_lock_only_list("EventLimits"); event_limits_free = k_lock_only_list("EventLimits");
#if LOCK_CHECK #if LOCK_CHECK
DLPRIO(process_pplns, 99); DLPRIO(process_pplns, 99);
DLPRIO(workers_db, 98); DLPRIO(workers_db, 98);
DLPRIO(users_db, 97);
DLPRIO(event_limits, 46); // events-2 DLPRIO(event_limits, 46); // events-2
#endif #endif
@ -8173,7 +8337,12 @@ int main(int argc, char **argv)
} }
if (key_update) { if (key_update) {
update_keysummary(); ckp.main.sockname = strdup("klistener");
write_namepid(&ckp.main);
create_process_unixsock(&ckp.main);
fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC);
update_keysummary(&ckp);
everyone_die = true; everyone_die = true;
} else if (confirm_sharesummary) { } else if (confirm_sharesummary) {
// TODO: add a system lock to stop running 2 at once? // TODO: add a system lock to stop running 2 at once?

33
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.305" #define CKDB_VERSION DB_VERSION"-2.400"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -338,6 +338,8 @@ extern bool db_load_complete;
extern bool prereload; extern bool prereload;
// Different input data handling // Different input data handling
extern bool reloading; extern bool reloading;
// Start marks processing during a larger reload
extern bool reloaded_N_files;
// Data load is complete // Data load is complete
extern bool startup_complete; extern bool startup_complete;
// Tell everyone to die // Tell everyone to die
@ -1624,6 +1626,8 @@ extern K_TREE *users_root;
extern K_TREE *userid_root; extern K_TREE *userid_root;
extern K_LIST *users_free; extern K_LIST *users_free;
extern K_STORE *users_store; extern K_STORE *users_store;
// Emulate a list for lock checking
extern K_LIST *users_db_free;
// USERATTS // USERATTS
typedef struct useratts { typedef struct useratts {
@ -1890,6 +1894,8 @@ extern tv_t last_bc;
// current network diff // current network diff
extern double current_ndiff; extern double current_ndiff;
extern bool txn_tree_store; extern bool txn_tree_store;
// avoid trying to run 2 ages at the same time
extern bool workinfo_age_lock;
// Offset in binary coinbase1 of the block number // Offset in binary coinbase1 of the block number
#define BLOCKNUM_OFFSET 42 #define BLOCKNUM_OFFSET 42
@ -2013,7 +2019,6 @@ typedef struct sharesummary {
tv_t lastshareacc; tv_t lastshareacc;
double lastdiffacc; double lastdiffacc;
char complete[TXT_FLAG+1]; char complete[TXT_FLAG+1];
MODIFYDATECONTROLPOINTERS;
} SHARESUMMARY; } SHARESUMMARY;
/* After this many shares added, we need to update the DB record /* After this many shares added, we need to update the DB record
@ -2697,7 +2702,6 @@ typedef struct keysharesummary {
tv_t lastshareacc; tv_t lastshareacc;
double lastdiffacc; double lastdiffacc;
char complete[TXT_FLAG+1]; char complete[TXT_FLAG+1];
SIMPLEDATECONTROLPOINTERS;
} KEYSHARESUMMARY; } KEYSHARESUMMARY;
#define ALLOC_KEYSHARESUMMARY 1000 #define ALLOC_KEYSHARESUMMARY 1000
@ -3142,10 +3146,9 @@ extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b);
#define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx); #define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx);
extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx); extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx);
extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx); extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx);
extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
char *code, char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
tv_t *ss_last, int64_t *ss_count, int64_t *s_count, int64_t *s_count, int64_t *s_diff);
int64_t *s_diff);
extern double coinbase_reward(int32_t height); extern double coinbase_reward(int32_t height);
extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid); extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid);
extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
@ -3166,8 +3169,7 @@ extern void zero_sharesummary(SHARESUMMARY *row);
extern K_ITEM *_find_sharesummary(int64_t userid, char *workername, extern K_ITEM *_find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid, bool pool); int64_t workinfoid, bool pool);
extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername); extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername);
extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, extern void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd);
char *code, char *inet, tv_t *cd);
#define dbhash2btchash(_hash, _buf, _siz) \ #define dbhash2btchash(_hash, _buf, _siz) \
_dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE) _dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS); void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
@ -3411,16 +3413,11 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar
tv_t *cd, K_TREE *trf_root); tv_t *cd, K_TREE *trf_root);
extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm); extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm);
extern char *ooo_status(char *buf, size_t siz); extern char *ooo_status(char *buf, size_t siz);
#define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \ #define sharesummary_update(_s_row, _e_row, _cd) \
_sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \ _sharesummary_update(_s_row, _e_row, _cd, WHERE_FFL_HERE)
WHERE_FFL_HERE) extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd,
extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS); WHERE_FFL_ARGS);
#define sharesummary_age(_ss_item, _by, _code, _inet, _cd) \ extern bool sharesummary_age(K_ITEM *ss_item);
_sharesummary_age(_ss_item, _by, _code, _inet, _cd, WHERE_FFL_HERE)
extern bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
tv_t *cd, WHERE_FFL_ARGS);
extern bool keysharesummary_age(K_ITEM *kss_item); extern bool keysharesummary_age(K_ITEM *kss_item);
extern bool sharesummary_fill(PGconn *conn); extern bool sharesummary_fill(PGconn *conn);
extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash,

14
src/ckdb_cmd.c

@ -2865,18 +2865,20 @@ seconf:
} }
ok = workinfo_age(workinfoid, transfer_data(i_poolinstance), ok = workinfo_age(workinfoid, transfer_data(i_poolinstance),
by, code, inet, cd, &ss_first, &ss_last, cd, &ss_first, &ss_last, &ss_count, &s_count,
&ss_count, &s_count, &s_diff); &s_diff);
if (!ok) { if (!ok) {
LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id);
return strdup("failed.DATA"); return strdup("failed.DATA");
} else { } else {
/* Don't slow down the reload - do them later */ /* Don't slow down the reload - do them later,
if (!reloading || key_update) { * unless it's a long reload since:
* Any pool restarts in the reload data will cause
* unaged workinfos and thus would stop marker() */
if (!reloading || key_update || reloaded_N_files) {
// Aging is a queued item thus the reply is ignored // Aging is a queued item thus the reply is ignored
auto_age_older(workinfoid, auto_age_older(workinfoid,
transfer_data(i_poolinstance), transfer_data(i_poolinstance), cd);
by, code, inet, cd);
} }
} }
LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid); LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid);

173
src/ckdb_data.c

@ -89,12 +89,6 @@ void free_sharesummary_data(K_ITEM *item)
DATA_SHARESUMMARY(sharesummary, item); DATA_SHARESUMMARY(sharesummary, item);
LIST_MEM_SUB(sharesummary_free, sharesummary->workername); LIST_MEM_SUB(sharesummary_free, sharesummary->workername);
FREENULL(sharesummary->workername); FREENULL(sharesummary->workername);
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
} }
void free_optioncontrol_data(K_ITEM *item) void free_optioncontrol_data(K_ITEM *item)
@ -128,9 +122,6 @@ void free_keysharesummary_data(K_ITEM *item)
DATA_KEYSHARESUMMARY(keysharesummary, item); DATA_KEYSHARESUMMARY(keysharesummary, item);
LIST_MEM_SUB(keysharesummary_free, keysharesummary->key); LIST_MEM_SUB(keysharesummary_free, keysharesummary->key);
FREENULL(keysharesummary->key); FREENULL(keysharesummary->key);
SET_CREATEBY(keysharesummary_free, keysharesummary->createby, EMPTY);
SET_CREATECODE(keysharesummary_free, keysharesummary->createcode, EMPTY);
SET_CREATEINET(keysharesummary_free, keysharesummary->createinet, EMPTY);
} }
void free_keysummary_data(K_ITEM *item) void free_keysummary_data(K_ITEM *item)
@ -2144,6 +2135,8 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
SHARES lookshares, *shares; SHARES lookshares, *shares;
K_TREE_CTX s_ctx[1]; K_TREE_CTX s_ctx[1];
char error[1024]; char error[1024];
bool multiple = false;
int64_t curr_userid;
error[0] = '\0'; error[0] = '\0';
INIT_SHARES(&s_look); INIT_SHARES(&s_look);
@ -2154,6 +2147,7 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
DATE_ZERO(&(lookshares.createdate)); DATE_ZERO(&(lookshares.createdate));
s_look.data = (void *)(&lookshares); s_look.data = (void *)(&lookshares);
curr_userid = userid;
K_WLOCK(shares_free); K_WLOCK(shares_free);
s_item = find_after_in_ktree(shares_root, &s_look, s_ctx); s_item = find_after_in_ktree(shares_root, &s_look, s_ctx);
while (s_item) { while (s_item) {
@ -2167,37 +2161,61 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped,
break; break;
} }
// Avoid releasing the lock the first time in
if (curr_userid == DISCARD_ALL)
curr_userid = shares->userid;
/* The shares being removed here wont be touched by any other
* code, so we don't need to hold the shares_free lock the
* whole time, since that would slow down incoming share
* processing too much - this only affects DISCARD_ALL
* TODO: delete the shares when they are summarised in the
* sharesummary */
if (shares->userid != curr_userid) {
K_WUNLOCK(shares_free);
curr_userid = shares->userid;
K_WLOCK(shares_free);
}
(*shares_tot)++; (*shares_tot)++;
if (shares->errn == SE_NONE) if (shares->errn == SE_NONE)
(*diff_tot) += shares->diff; (*diff_tot) += shares->diff;
if (reloading && skipupdate) {
(*shares_dumped)++;
if (error[0])
multiple = true;
else {
snprintf(error, sizeof(error),
"%"PRId64"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
}
tmp_item = next_in_ktree(s_ctx); tmp_item = next_in_ktree(s_ctx);
remove_from_ktree(shares_root, s_item); remove_from_ktree(shares_root, s_item);
k_unlink_item(shares_store, s_item); k_unlink_item(shares_store, s_item);
if (reloading && skipupdate)
(*shares_dumped)++;
if (reloading && skipupdate && !error[0]) {
snprintf(error, sizeof(error),
"reload found aged share: %"PRId64
"/%"PRId64"/%s/%s%.0f",
shares->workinfoid,
shares->userid,
shares->workername,
(shares->errn == SE_NONE) ? "" : "*",
shares->diff);
}
k_add_head(shares_free, s_item); k_add_head(shares_free, s_item);
s_item = tmp_item; s_item = tmp_item;
} }
K_WUNLOCK(shares_free); K_WUNLOCK(shares_free);
if (error[0]) if (error[0]) {
LOGERR("%s(): %s", __func__, error); LOGERR("%s(): reload found %s aged share%s%s: %s",
__func__, multiple ? "multiple" : "an",
multiple ? "s" : EMPTY,
multiple ? ", the first was" : EMPTY,
error);
}
} }
// Duplicates during a reload are set to not show messages // Duplicates during a reload are set to not show messages
bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd,
char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last, tv_t *ss_first, tv_t *ss_last, int64_t *ss_count,
int64_t *ss_count, int64_t *s_count, int64_t *s_diff) int64_t *s_count, int64_t *s_diff)
{ {
K_ITEM *wi_item, ss_look, *ss_item; K_ITEM *wi_item, ss_look, *ss_item;
K_ITEM ks_look, *ks_item, *wm_item; K_ITEM ks_look, *ks_item, *wm_item;
@ -2208,6 +2226,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
int64_t diff_tot; int64_t diff_tot;
KEYSHARESUMMARY lookkeysharesummary, *keysharesummary; KEYSHARESUMMARY lookkeysharesummary, *keysharesummary;
SHARESUMMARY looksharesummary, *sharesummary; SHARESUMMARY looksharesummary, *sharesummary;
char complete[TXT_FLAG+1];
WORKINFO *workinfo; WORKINFO *workinfo;
bool ok = false, ksok = false, skipupdate = false; bool ok = false, ksok = false, skipupdate = false;
@ -2269,8 +2288,10 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
ss_look.data = (void *)(&looksharesummary); ss_look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free); K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item); DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
// complete could change, the id fields wont be changed/removed yet
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
while (ss_item && sharesummary->workinfoid == workinfoid) { while (ss_item && sharesummary->workinfoid == workinfoid) {
ss_tot++; ss_tot++;
skipupdate = false; skipupdate = false;
@ -2278,7 +2299,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
* so finding an aged sharesummary here is an error * so finding an aged sharesummary here is an error
* N.B. this can only happen with (very) old reload files */ * N.B. this can only happen with (very) old reload files */
if (reloading) { if (reloading) {
if (sharesummary->complete[0] == SUMMARY_COMPLETE) { if (complete[0] == SUMMARY_COMPLETE) {
ss_already++; ss_already++;
skipupdate = true; skipupdate = true;
if (confirm_sharesummary) { if (confirm_sharesummary) {
@ -2292,7 +2313,9 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
} }
if (!skipupdate) { if (!skipupdate) {
if (!sharesummary_age(ss_item, by, code, inet, cd)) { K_WLOCK(sharesummary_free);
if (!sharesummary_age(ss_item)) {
K_WUNLOCK(sharesummary_free);
ss_failed++; ss_failed++;
LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64, LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64,
__func__, sharesummary->userid, __func__, sharesummary->userid,
@ -2308,6 +2331,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
copy_tv(ss_first, &(sharesummary->firstshare)); copy_tv(ss_first, &(sharesummary->firstshare));
if (tv_newer(ss_last, &(sharesummary->lastshare))) if (tv_newer(ss_last, &(sharesummary->lastshare)))
copy_tv(ss_last, &(sharesummary->lastshare)); copy_tv(ss_last, &(sharesummary->lastshare));
K_WUNLOCK(sharesummary_free);
} }
} }
@ -2318,8 +2342,9 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code,
K_RLOCK(sharesummary_free); K_RLOCK(sharesummary_free);
ss_item = next_in_ktree(ss_ctx); ss_item = next_in_ktree(ss_ctx);
K_RUNLOCK(sharesummary_free);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item); DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
} }
if (ss_already || ss_failed || shares_dumped) { if (ss_already || ss_failed || shares_dumped) {
@ -2350,8 +2375,10 @@ skip_ss:
ks_look.data = (void *)(&lookkeysharesummary); ks_look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free); K_RLOCK(keysharesummary_free);
ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx); ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx);
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
// complete could change, the id fields wont be changed/removed yet
STRNCPY(complete, keysharesummary->complete);
K_RUNLOCK(keysharesummary_free);
while (ks_item && keysharesummary->workinfoid == workinfoid) { while (ks_item && keysharesummary->workinfoid == workinfoid) {
ks_tot++; ks_tot++;
skipupdate = false; skipupdate = false;
@ -2359,7 +2386,7 @@ skip_ss:
* so finding an aged keysharesummary here is an error * so finding an aged keysharesummary here is an error
* N.B. this can only happen with (very) old reload files */ * N.B. this can only happen with (very) old reload files */
if (reloading && !key_update) { if (reloading && !key_update) {
if (keysharesummary->complete[0] == SUMMARY_COMPLETE) { if (complete[0] == SUMMARY_COMPLETE) {
ks_already++; ks_already++;
skipupdate = true; skipupdate = true;
if (confirm_sharesummary) { if (confirm_sharesummary) {
@ -2373,20 +2400,25 @@ skip_ss:
} }
if (!skipupdate) { if (!skipupdate) {
K_WLOCK(keysharesummary_free);
if (!keysharesummary_age(ks_item)) { if (!keysharesummary_age(ks_item)) {
ks_failed++; ks_failed++;
K_WUNLOCK(keysharesummary_free);
LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s", LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s",
__func__, keysharesummary->workinfoid, __func__, keysharesummary->workinfoid,
keysharesummary->keytype, keysharesummary->keytype,
keysharesummary->key); keysharesummary->key);
ksok = false; ksok = false;
} else {
K_WUNLOCK(keysharesummary_free);
} }
} }
K_RLOCK(keysharesummary_free); K_RLOCK(keysharesummary_free);
ks_item = next_in_ktree(ks_ctx); ks_item = next_in_ktree(ks_ctx);
K_RUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item);
STRNCPY(complete, keysharesummary->complete);
K_RUNLOCK(keysharesummary_free);
} }
/* All shares should have been discarded during sharesummary /* All shares should have been discarded during sharesummary
@ -2527,19 +2559,17 @@ cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b)
void dsp_sharesummary(K_ITEM *item, FILE *stream) void dsp_sharesummary(K_ITEM *item, FILE *stream)
{ {
char createdate_buf[DATE_BUFSIZ];
SHARESUMMARY *s; SHARESUMMARY *s;
if (!item) if (!item)
fprintf(stream, "%s() called with (null) item\n", __func__); fprintf(stream, "%s() called with (null) item\n", __func__);
else { else {
DATA_SHARESUMMARY(s, item); DATA_SHARESUMMARY(s, item);
tv_to_buf(&(s->createdate), createdate_buf, sizeof(createdate_buf));
fprintf(stream, " uid=%"PRId64" wn='%s' wid=%"PRId64" " fprintf(stream, " uid=%"PRId64" wn='%s' wid=%"PRId64" "
"da=%f ds=%f ss=%f c='%s' cd=%s\n", "da=%f ds=%f ss=%f c='%s'\n",
s->userid, s->workername, s->workinfoid, s->userid, s->workername, s->workinfoid,
s->diffacc, s->diffsta, s->sharesta, s->diffacc, s->diffsta, s->sharesta,
s->complete, createdate_buf); s->complete);
} }
} }
@ -2632,8 +2662,7 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername)
} }
// key_update must age keysharesummary directly // key_update must age keysharesummary directly
static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, static void key_auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd)
char *code, char *inet, tv_t *cd)
{ {
static int64_t last_attempted_id = -1; static int64_t last_attempted_id = -1;
static int64_t prev_found = 0; static int64_t prev_found = 0;
@ -2651,6 +2680,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
int64_t age_id, do_id, to_id; int64_t age_id, do_id, to_id;
bool ok, found; bool ok, found;
K_WLOCK(workinfo_free);
if (workinfo_age_lock) {
K_WUNLOCK(workinfo_free);
return;
} else
workinfo_age_lock = true;
K_WUNLOCK(workinfo_free);
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found; age_id = prev_found;
@ -2663,15 +2700,15 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
INIT_KEYSHARESUMMARY(&look); INIT_KEYSHARESUMMARY(&look);
look.data = (void *)(&lookkeysharesummary); look.data = (void *)(&lookkeysharesummary);
K_RLOCK(keysharesummary_free);
kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
DATE_ZERO(&kss_first_min); DATE_ZERO(&kss_first_min);
DATE_ZERO(&kss_last_max); DATE_ZERO(&kss_last_max);
kss_count_tot = s_count_tot = s_diff_tot = 0; kss_count_tot = s_count_tot = s_diff_tot = 0;
found = false; found = false;
K_RLOCK(keysharesummary_free);
kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx);
DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item);
while (kss_item && keysharesummary->workinfoid < workinfoid) { while (kss_item && keysharesummary->workinfoid < workinfoid) {
if (keysharesummary->complete[0] == SUMMARY_NEW) { if (keysharesummary->complete[0] == SUMMARY_NEW) {
age_id = keysharesummary->workinfoid; age_id = keysharesummary->workinfoid;
@ -2686,9 +2723,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found);
// Don't repeat searching old items to avoid accessing their ram // Don't repeat searching old items to avoid accessing their ram
if (!found) if (!found) {
prev_found = workinfoid; prev_found = workinfoid;
else { } else {
/* Process all the consecutive keysharesummaries that's aren't aged /* Process all the consecutive keysharesummaries that's aren't aged
* This way we find each oldest 'batch' of keysharesummaries that have * This way we find each oldest 'batch' of keysharesummaries that have
* been missed and can report the range of data that was aged, * been missed and can report the range of data that was aged,
@ -2700,9 +2737,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
do_id = age_id; do_id = age_id;
to_id = 0; to_id = 0;
do { do {
ok = workinfo_age(do_id, poolinstance, by, code, inet, ok = workinfo_age(do_id, poolinstance, cd, &kss_first,
cd, &kss_first, &kss_last, &kss_count, &kss_last, &kss_count, &s_count,
&s_count, &s_diff); &s_diff);
kss_count_tot += kss_count; kss_count_tot += kss_count;
s_count_tot += s_count; s_count_tot += s_count;
@ -2774,12 +2811,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
idrange, keysharerange); idrange, keysharerange);
} }
} }
K_WLOCK(workinfo_free);
workinfo_age_lock = false;
K_WUNLOCK(workinfo_free);
} }
/* TODO: markersummary checking? /* TODO: markersummary checking?
* However, there should be no issues since the sharesummaries are removed */ * However, there should be no issues since the sharesummaries are removed */
void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd)
char *code, char *inet, tv_t *cd)
{ {
static int64_t last_attempted_id = -1; static int64_t last_attempted_id = -1;
static int64_t prev_found = 0; static int64_t prev_found = 0;
@ -2798,10 +2837,21 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
bool ok, found; bool ok, found;
if (key_update) { if (key_update) {
key_auto_age_older(workinfoid, poolinstance, by, code, inet, cd); key_auto_age_older(workinfoid, poolinstance, cd);
return; return;
} }
/* Simply lock out more than one from running at the same time
* This locks access to prev_found, repeat and last_attempted_id
* If any are missed they'll be aged by the next age_workinfo in 30s */
K_WLOCK(workinfo_free);
if (workinfo_age_lock) {
K_WUNLOCK(workinfo_free);
return;
} else
workinfo_age_lock = true;
K_WUNLOCK(workinfo_free);
LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found);
age_id = prev_found; age_id = prev_found;
@ -2814,15 +2864,15 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
INIT_SHARESUMMARY(&look); INIT_SHARESUMMARY(&look);
look.data = (void *)(&looksharesummary); look.data = (void *)(&looksharesummary);
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
DATE_ZERO(&ss_first_min); DATE_ZERO(&ss_first_min);
DATE_ZERO(&ss_last_max); DATE_ZERO(&ss_last_max);
ss_count_tot = s_count_tot = s_diff_tot = 0; ss_count_tot = s_count_tot = s_diff_tot = 0;
found = false; found = false;
K_RLOCK(sharesummary_free);
ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx);
DATA_SHARESUMMARY_NULL(sharesummary, ss_item);
while (ss_item && sharesummary->workinfoid < workinfoid) { while (ss_item && sharesummary->workinfoid < workinfoid) {
if (sharesummary->complete[0] == SUMMARY_NEW) { if (sharesummary->complete[0] == SUMMARY_NEW) {
age_id = sharesummary->workinfoid; age_id = sharesummary->workinfoid;
@ -2852,9 +2902,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
do_id = age_id; do_id = age_id;
to_id = 0; to_id = 0;
do { do {
ok = workinfo_age(do_id, poolinstance, by, code, inet, ok = workinfo_age(do_id, poolinstance, cd, &ss_first,
cd, &ss_first, &ss_last, &ss_count, &ss_last, &ss_count, &s_count,
&s_count, &s_diff); &s_diff);
ss_count_tot += ss_count; ss_count_tot += ss_count;
s_count_tot += s_count; s_count_tot += s_count;
@ -2926,6 +2976,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by,
idrange, sharerange); idrange, sharerange);
} }
} }
K_WLOCK(workinfo_free);
workinfo_age_lock = false;
K_WUNLOCK(workinfo_free);
} }
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS) void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS)

231
src/ckdb_dbio.c

@ -214,6 +214,17 @@ char *pqerrmsg(PGconn *conn)
#undef PQexec #undef PQexec
#undef PQexecParams #undef PQexecParams
/* Debug level to display write transactions - 0 removes the code
* Also enables checking the isread flag */
#define CKPQ_SHOW_WRITE 0
#define CKPQ_ISREAD1 "select "
#define CKPQ_ISREAD1LEN (sizeof(CKPQ_ISREAD1)-1)
#define CKPQ_ISREAD2 "declare "
#define CKPQ_ISREAD2LEN (sizeof(CKPQ_ISREAD2)-1)
#define CKPQ_ISREAD3 "fetch "
#define CKPQ_ISREAD3LEN (sizeof(CKPQ_ISREAD3)-1)
// Bug check to ensure no unexpected write txns occur // Bug check to ensure no unexpected write txns occur
PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS) PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
{ {
@ -221,6 +232,40 @@ PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS)
if (!isread && confirm_sharesummary) if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm"); quitfrom(1, file, func, line, "BUG: write txn during confirm");
#if CKPQ_SHOW_WRITE
if (isread) {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) {
LOGERR("%s() ERR: query flagged as read, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = false;
}
} else {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) {
LOGERR("%s() ERR: query flagged as write, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = true;
}
}
if (!isread) {
char *buf = NULL, ffl[128];
size_t len, off;
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, __func__);
APPEND_REALLOC(buf, off, len, "() W: '");
APPEND_REALLOC(buf, off, len, qry);
APPEND_REALLOC(buf, off, len, "'");
snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS);
APPEND_REALLOC(buf, off, len, ffl);
LOGMSGBUF(CKPQ_SHOW_WRITE, buf);
FREENULL(buf);
}
#endif
return PQexec(conn, qry); return PQexec(conn, qry);
} }
@ -237,6 +282,47 @@ PGresult *_CKPQexecParams(PGconn *conn, const char *qry,
if (!isread && confirm_sharesummary) if (!isread && confirm_sharesummary)
quitfrom(1, file, func, line, "BUG: write txn during confirm"); quitfrom(1, file, func, line, "BUG: write txn during confirm");
#if CKPQ_SHOW_WRITE
if (isread) {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) &&
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) {
LOGERR("%s() ERR: query flagged as read, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = false;
}
} else {
if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) ||
(strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) {
LOGERR("%s() ERR: query flagged as write, but isn't"
WHERE_FFL, __func__, WHERE_FFL_PASS);
isread = true;
}
}
if (!isread) {
char *buf = NULL, num[16], ffl[128];
size_t len, off;
int i;
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, __func__);
APPEND_REALLOC(buf, off, len, "() W: '");
APPEND_REALLOC(buf, off, len, qry);
APPEND_REALLOC(buf, off, len, "'");
for (i = 0; i < nParams; i++) {
snprintf(num, sizeof(num), " $%d='", i+1);
APPEND_REALLOC(buf, off, len, num);
APPEND_REALLOC(buf, off, len, paramValues[i]);
APPEND_REALLOC(buf, off, len, "'");
}
snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS);
APPEND_REALLOC(buf, off, len, ffl);
LOGMSGBUF(CKPQ_SHOW_WRITE, buf);
FREENULL(buf);
}
#endif
return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths, return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths,
paramFormats, resultFormat); paramFormats, resultFormat);
} }
@ -558,6 +644,17 @@ K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress,
LOGDEBUG("%s(): add", __func__); LOGDEBUG("%s(): add", __func__);
/* 2 attempts to add the same user at the same time will only do it once
* The 2nd attempt will get back the data provided by the 1st
* and thus throw away any differences in the 2nd */
K_WLOCK(users_db_free);
item = find_users(username);
if (item) {
ok = true;
goto already;
}
K_WLOCK(users_free); K_WLOCK(users_free);
item = k_unlink_head(users_free); item = k_unlink_head(users_free);
K_WUNLOCK(users_free); K_WUNLOCK(users_free);
@ -665,6 +762,10 @@ unitem:
} }
K_WUNLOCK(users_free); K_WUNLOCK(users_free);
already:
K_WUNLOCK(users_db_free);
if (ok) if (ok)
return item; return item;
else else
@ -1469,6 +1570,11 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault,
LOGDEBUG("%s(): update", __func__); LOGDEBUG("%s(): update", __func__);
/* Two attempts to update the same worker at the same time
* will determine the final state based on which gets the lock last,
* i.e. randomly, but without overwriting at the same time */
K_WLOCK(workers_db_free);
DATA_WORKERS(row, item); DATA_WORKERS(row, item);
if (check) { if (check) {
@ -1583,6 +1689,9 @@ unparam:
for (n = 0; n < par; n++) for (n = 0; n < par; n++)
free(params[n]); free(params[n]);
early: early:
K_WUNLOCK(workers_db_free);
return ok; return ok;
} }
@ -3399,6 +3508,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
{ {
K_ITEM *w_item, *wm_item, *ss_item; K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary; SHARESUMMARY *sharesummary;
char complete[TXT_FLAG+1];
WORKINFO *workinfo; WORKINFO *workinfo;
char *st = NULL; char *st = NULL;
@ -3479,13 +3589,14 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
K_RLOCK(sharesummary_free); K_RLOCK(sharesummary_free);
ss_item = find_sharesummary(shares->userid, shares->workername, ss_item = find_sharesummary(shares->userid, shares->workername,
shares->workinfoid); shares->workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) { if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item); DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) { if (sharesummary->complete[0] != SUMMARY_NEW) {
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
LOGDEBUG("%s(): '%s' sharesummary exists " LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld", "%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete, __func__, complete,
shares->workinfoid, shares->userid, shares->workinfoid, shares->userid,
st = safe_text_nonull(shares->workername), st = safe_text_nonull(shares->workername),
shares->createdate.tv_sec, shares->createdate.tv_sec,
@ -3495,6 +3606,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
return true; return true;
} }
} }
K_RUNLOCK(sharesummary_free);
} }
if (!key_update && !confirm_sharesummary) { if (!key_update && !confirm_sharesummary) {
@ -3504,8 +3616,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item,
K_WUNLOCK(userinfo_free); K_WUNLOCK(userinfo_free);
} }
sharesummary_update(shares, NULL, shares->createby, shares->createcode, sharesummary_update(shares, NULL, &(shares->createdate));
shares->createinet, &(shares->createdate));
return true; return true;
} }
@ -3660,6 +3771,8 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
K_RLOCK(users_free); K_RLOCK(users_free);
u_item = find_users(username); u_item = find_users(username);
K_RUNLOCK(users_free); K_RUNLOCK(users_free);
/* Can't change outside lock since we don't delete users
* or change their *userid */
if (!u_item) { if (!u_item) {
btv_to_buf(cd, cd_buf, sizeof(cd_buf)); btv_to_buf(cd, cd_buf, sizeof(cd_buf));
/* This should never happen unless there's a bug in ckpool /* This should never happen unless there's a bug in ckpool
@ -3672,7 +3785,6 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
goto tisbad; goto tisbad;
} }
DATA_USERS(users, u_item); DATA_USERS(users, u_item);
shares->userid = users->userid; shares->userid = users->userid;
TXT_TO_BIGINT("workinfoid", workinfoid, shares->workinfoid); TXT_TO_BIGINT("workinfoid", workinfoid, shares->workinfoid);
@ -4142,6 +4254,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
{ {
K_ITEM *w_item, *wm_item, *ss_item; K_ITEM *w_item, *wm_item, *ss_item;
SHARESUMMARY *sharesummary; SHARESUMMARY *sharesummary;
char complete[TXT_FLAG+1];
char *st = NULL; char *st = NULL;
LOGDEBUG("%s() add", __func__); LOGDEBUG("%s() add", __func__);
@ -4185,13 +4298,14 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
ss_item = find_sharesummary(shareerrors->userid, ss_item = find_sharesummary(shareerrors->userid,
shareerrors->workername, shareerrors->workername,
shareerrors->workinfoid); shareerrors->workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) { if (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item); DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->complete[0] != SUMMARY_NEW) { if (sharesummary->complete[0] != SUMMARY_NEW) {
STRNCPY(complete, sharesummary->complete);
K_RUNLOCK(sharesummary_free);
LOGDEBUG("%s(): '%s' sharesummary exists " LOGDEBUG("%s(): '%s' sharesummary exists "
"%"PRId64" %"PRId64"/%s/%ld,%ld", "%"PRId64" %"PRId64"/%s/%ld,%ld",
__func__, sharesummary->complete, __func__, complete,
shareerrors->workinfoid, shareerrors->workinfoid,
shareerrors->userid, shareerrors->userid,
st = safe_text_nonull(shareerrors->workername), st = safe_text_nonull(shareerrors->workername),
@ -4201,11 +4315,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors,
return false; return false;
} }
} }
K_RUNLOCK(sharesummary_free);
} }
sharesummary_update(NULL, shareerrors, shareerrors->createby, sharesummary_update(NULL, shareerrors, &(shareerrors->createdate));
shareerrors->createcode, shareerrors->createinet,
&(shareerrors->createdate));
return true; return true;
} }
@ -5182,14 +5295,13 @@ flail:
return ok; return ok;
} }
// Requires K_WLOCK(sharesummary_free)
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new, SHAREERRORS *e_row, bool new,
double *tdf, double *tdl) double *tdf, double *tdl)
{ {
tv_t *createdate; tv_t *createdate;
K_WLOCK(sharesummary_free);
if (s_row) if (s_row)
createdate = &(s_row->createdate); createdate = &(s_row->createdate);
else else
@ -5251,15 +5363,11 @@ static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
*tdf = tvdiff(createdate, &(row->firstshare)); *tdf = tvdiff(createdate, &(row->firstshare));
*tdl = tvdiff(createdate, &(row->lastshare)); *tdl = tvdiff(createdate, &(row->lastshare));
} }
K_WUNLOCK(sharesummary_free);
} }
static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row, static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row,
bool new) bool new)
{ {
K_WLOCK(keysharesummary_free);
if (new) { if (new) {
zero_keysharesummary(row); zero_keysharesummary(row);
copy_tv(&(row->firstshare), &(s_row->createdate)); copy_tv(&(row->firstshare), &(s_row->createdate));
@ -5307,8 +5415,6 @@ static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row,
row->sharerej++; row->sharerej++;
break; break;
} }
K_WUNLOCK(keysharesummary_free);
} }
/* Keep some simple stats on how often shares are out of order /* Keep some simple stats on how often shares are out of order
@ -5330,15 +5436,15 @@ char *ooo_status(char *buf, size_t siz)
/* sharesummaries are no longer stored in the DB but fields are updated as b4 /* sharesummaries are no longer stored in the DB but fields are updated as b4
* This creates/updates both the sharesummaries and the keysharesummaries */ * This creates/updates both the sharesummaries and the keysharesummaries */
bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd,
char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) WHERE_FFL_ARGS)
{ {
WORKMARKERS *wm; WORKMARKERS *wm;
SHARESUMMARY *row, *p_row; SHARESUMMARY *row, *p_row;
KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL; KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL;
K_ITEM *ss_item, *kiss_item = NULL, *kass_item = NULL, *wm_item, *p_item = NULL; K_ITEM *ss_item, *kiss_item = NULL, *kass_item = NULL, *wm_item, *p_item = NULL;
bool new = false, p_new = false, ki_new = false, ka_new = false; bool new = false, p_new = false, ki_new = false, ka_new = false;
int64_t userid, workinfoid; int64_t userid, workinfoid, markerid;
char *workername, *address = NULL, *agent = NULL; char *workername, *address = NULL, *agent = NULL;
char *st = NULL, *db = NULL; char *st = NULL, *db = NULL;
char ooo_buf[256]; char ooo_buf[256];
@ -5371,33 +5477,30 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
K_RLOCK(workmarkers_free); K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED, wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED,
NULL); NULL);
K_RUNLOCK(workmarkers_free);
if (wm_item) { if (wm_item) {
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
markerid = wm->markerid;
K_RUNLOCK(workmarkers_free);
LOGERR("%s(): attempt to update sharesummary " LOGERR("%s(): attempt to update sharesummary "
"with %s %"PRId64"/%"PRId64"/%s "CDDB" %s" "with %s %"PRId64"/%"PRId64"/%s "CDDB" %s"
" but processed workmarkers %"PRId64" exists", " but processed workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors", __func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, st = safe_text(workername), workinfoid, userid, st = safe_text(workername),
db = ctv_to_buf(cd, NULL, 0), db = ctv_to_buf(cd, NULL, 0), markerid);
wm->markerid);
FREENULL(st); FREENULL(st);
FREENULL(db); FREENULL(db);
return false; return false;
} }
K_RUNLOCK(workmarkers_free);
K_RLOCK(sharesummary_free); K_WLOCK(sharesummary_free);
ss_item = find_sharesummary(userid, workername, workinfoid); ss_item = find_sharesummary(userid, workername, workinfoid);
p_item = find_sharesummary_p(workinfoid);
K_RUNLOCK(sharesummary_free);
if (ss_item) { if (ss_item) {
DATA_SHARESUMMARY(row, ss_item); DATA_SHARESUMMARY(row, ss_item);
} else { } else {
new = true; new = true;
K_WLOCK(sharesummary_free);
ss_item = k_unlink_head(sharesummary_free); ss_item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(row, ss_item); DATA_SHARESUMMARY(row, ss_item);
bzero(row, sizeof(*row)); bzero(row, sizeof(*row));
row->userid = userid; row->userid = userid;
@ -5409,20 +5512,23 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data // N.B. this directly updates the non-key data
set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl); set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl);
if (new) {
add_to_ktree(sharesummary_root, ss_item);
add_to_ktree(sharesummary_workinfoid_root, ss_item);
k_add_head(sharesummary_store, ss_item);
}
K_WUNLOCK(sharesummary_free);
// Ignore shareerrors for keysummaries // Ignore shareerrors for keysummaries
if (s_row) { if (s_row) {
K_RLOCK(keysharesummary_free); K_WLOCK(keysharesummary_free);
kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address); kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address);
kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent);
K_RUNLOCK(keysharesummary_free);
if (kiss_item) { if (kiss_item) {
DATA_KEYSHARESUMMARY(ki_row, kiss_item); DATA_KEYSHARESUMMARY(ki_row, kiss_item);
} else { } else {
ki_new = true; ki_new = true;
K_WLOCK(keysharesummary_free);
kiss_item = k_unlink_head(keysharesummary_free); kiss_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ki_row, kiss_item); DATA_KEYSHARESUMMARY(ki_row, kiss_item);
bzero(ki_row, sizeof(*ki_row)); bzero(ki_row, sizeof(*ki_row));
ki_row->workinfoid = workinfoid; ki_row->workinfoid = workinfoid;
@ -5433,14 +5539,20 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data // N.B. this directly updates the non-key data
set_keysharesummary_stats(ki_row, s_row, ki_new); set_keysharesummary_stats(ki_row, s_row, ki_new);
if (ki_new) {
add_to_ktree(keysharesummary_root, kiss_item);
k_add_head(keysharesummary_store, kiss_item);
}
K_WUNLOCK(keysharesummary_free);
K_WLOCK(keysharesummary_free);
kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent);
if (kass_item) { if (kass_item) {
DATA_KEYSHARESUMMARY(ka_row, kass_item); DATA_KEYSHARESUMMARY(ka_row, kass_item);
} else { } else {
ka_new = true; ka_new = true;
K_WLOCK(keysharesummary_free);
kass_item = k_unlink_head(keysharesummary_free); kass_item = k_unlink_head(keysharesummary_free);
K_WUNLOCK(keysharesummary_free);
DATA_KEYSHARESUMMARY(ka_row, kass_item); DATA_KEYSHARESUMMARY(ka_row, kass_item);
bzero(ka_row, sizeof(*ka_row)); bzero(ka_row, sizeof(*ka_row));
ka_row->workinfoid = workinfoid; ka_row->workinfoid = workinfoid;
@ -5451,6 +5563,11 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
// N.B. this directly updates the non-key data // N.B. this directly updates the non-key data
set_keysharesummary_stats(ka_row, s_row, ka_new); set_keysharesummary_stats(ka_row, s_row, ka_new);
if (ka_new) {
add_to_ktree(keysharesummary_root, kass_item);
k_add_head(keysharesummary_store, kass_item);
}
K_WUNLOCK(keysharesummary_free);
} }
if (!new) { if (!new) {
@ -5506,13 +5623,14 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
} }
} }
K_WLOCK(sharesummary_free);
p_item = find_sharesummary_p(workinfoid);
if (p_item) { if (p_item) {
DATA_SHARESUMMARY(p_row, p_item); DATA_SHARESUMMARY(p_row, p_item);
} else { } else {
p_new = true; p_new = true;
K_WLOCK(sharesummary_free);
p_item = k_unlink_head(sharesummary_free); p_item = k_unlink_head(sharesummary_free);
K_WUNLOCK(sharesummary_free);
DATA_SHARESUMMARY(p_row, p_item); DATA_SHARESUMMARY(p_row, p_item);
bzero(p_row, sizeof(*p_row)); bzero(p_row, sizeof(*p_row));
POOL_SS(p_row); POOL_SS(p_row);
@ -5521,42 +5639,17 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by,
set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl); set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl);
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); if (p_new) {
add_to_ktree(sharesummary_pool_root, p_item);
// Store either new item k_add_head(sharesummary_pool_store, p_item);
if (new || p_new) {
K_WLOCK(sharesummary_free);
if (new) {
add_to_ktree(sharesummary_root, ss_item);
add_to_ktree(sharesummary_workinfoid_root, ss_item);
k_add_head(sharesummary_store, ss_item);
}
if (p_new) {
add_to_ktree(sharesummary_pool_root, p_item);
k_add_head(sharesummary_pool_store, p_item);
}
K_WUNLOCK(sharesummary_free);
}
if (ki_new || ka_new) {
K_WLOCK(keysharesummary_free);
if (ki_new) {
add_to_ktree(keysharesummary_root, kiss_item);
k_add_head(keysharesummary_store, kiss_item);
}
if (ka_new) {
add_to_ktree(keysharesummary_root, kass_item);
k_add_head(keysharesummary_store, kass_item);
}
K_WUNLOCK(keysharesummary_free);
} }
K_WUNLOCK(sharesummary_free);
return true; return true;
} }
// No key fields are modified // No key fields are modified
bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, bool sharesummary_age(K_ITEM *ss_item)
tv_t *cd, WHERE_FFL_ARGS)
{ {
SHARESUMMARY *row; SHARESUMMARY *row;
@ -5566,8 +5659,6 @@ bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet,
row->complete[0] = SUMMARY_COMPLETE; row->complete[0] = SUMMARY_COMPLETE;
row->complete[1] = '\0'; row->complete[1] = '\0';
MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet);
return true; return true;
} }

Loading…
Cancel
Save