|
|
|
@ -1689,7 +1689,7 @@ static void trans_seq(tv_t *now)
|
|
|
|
|
static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, |
|
|
|
|
uint64_t n_seqstt, uint64_t n_seqpid, |
|
|
|
|
char *nam, tv_t *now, tv_t *cd, char *code, |
|
|
|
|
bool warndup, char *msg) |
|
|
|
|
int seqitemflags, char *msg) |
|
|
|
|
{ |
|
|
|
|
char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; |
|
|
|
|
bool firstseq, newseq, expseq, gothigh, gotstale, gotstalestart; |
|
|
|
@ -1697,13 +1697,13 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
|
|
|
|
|
SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; |
|
|
|
|
bool dup, wastrans, doitem, dotime; |
|
|
|
|
SEQDATA *seqdata; |
|
|
|
|
SEQITEM *seqitem; |
|
|
|
|
SEQITEM *seqitem, seqitem_copy; |
|
|
|
|
K_ITEM *seqset_item = NULL, *st_item = NULL; |
|
|
|
|
SEQTRANS *seqtrans = NULL; |
|
|
|
|
size_t siz, end; |
|
|
|
|
void *off0, *offn; |
|
|
|
|
uint64_t u; |
|
|
|
|
int set = -1, highlimit, i; |
|
|
|
|
int set = -1, expset = -1, highlimit, i; |
|
|
|
|
K_STORE *lost; |
|
|
|
|
|
|
|
|
|
// We store the lost items in here
|
|
|
|
@ -1732,10 +1732,10 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
|
|
|
|
|
} |
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Need to get a new seqset
|
|
|
|
|
// Need to setup a new seqset
|
|
|
|
|
newseq = true; |
|
|
|
|
if (!firstseq) { |
|
|
|
|
/* The current seqset (about to become the previous)
|
|
|
|
|
/* The current seqset (may become the previous)
|
|
|
|
|
* If !seqset_store->head (i.e. a bug) this will quit() */ |
|
|
|
|
DATA_SEQSET(seqset0, seqset_store->head); |
|
|
|
|
memcpy(&seqset_pre, seqset0, sizeof(seqset_pre)); |
|
|
|
@ -1796,16 +1796,59 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
|
|
|
|
|
LIST_MEM_ADD_SIZ(seqset_free, end); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// Expire the last set and overwrite it
|
|
|
|
|
seqset_item = k_unlink_tail(seqset_store); |
|
|
|
|
// If !item (i.e. a bug) this will quit()
|
|
|
|
|
// Expire the oldest set and overwrite it
|
|
|
|
|
K_ITEM *ss_item; |
|
|
|
|
SEQSET *ss = NULL; |
|
|
|
|
int s = 0; |
|
|
|
|
seqset = NULL; |
|
|
|
|
seqset_item = NULL; |
|
|
|
|
ss_item = seqset_store->head; |
|
|
|
|
while (ss_item) { |
|
|
|
|
DATA_SEQSET(ss, ss_item); |
|
|
|
|
if (!seqset) { |
|
|
|
|
seqset = ss; |
|
|
|
|
seqset_item = ss_item; |
|
|
|
|
expset = s; |
|
|
|
|
} else { |
|
|
|
|
// choose the last match
|
|
|
|
|
if (ss->seqstt >= seqset->seqstt) { |
|
|
|
|
seqset = ss; |
|
|
|
|
seqset_item = ss_item; |
|
|
|
|
expset = s; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
ss_item = ss_item->next; |
|
|
|
|
s++; |
|
|
|
|
} |
|
|
|
|
// If !seqset_item (i.e. a bug) k_unlink_item() will quit()
|
|
|
|
|
k_unlink_item(seqset_store, seqset_item); |
|
|
|
|
DATA_SEQSET(seqset, seqset_item); |
|
|
|
|
memcpy(&seqset_exp, seqset, sizeof(seqset_exp)); |
|
|
|
|
expseq = true; |
|
|
|
|
RESETSET(seqset, n_seqstt, n_seqpid); |
|
|
|
|
} |
|
|
|
|
/* Since the pool queue is active during the reload, sets can be out
|
|
|
|
|
* of order, so each new one should be added depending upon the value |
|
|
|
|
* of seqstt so the current pool is first, to minimise searching |
|
|
|
|
* seqset_store, but the order of the rest isn't as important |
|
|
|
|
* N.B. a new set is only created once per pool start */ |
|
|
|
|
if (firstseq) { |
|
|
|
|
k_add_head(seqset_store, seqset_item); |
|
|
|
|
set = 0; |
|
|
|
|
} else { |
|
|
|
|
// seqset0 already is the head
|
|
|
|
|
if (n_seqstt >= seqset0->seqstt) { |
|
|
|
|
// if new set is >= head then make it the head
|
|
|
|
|
k_add_head(seqset_store, seqset_item); |
|
|
|
|
set = 0; |
|
|
|
|
} else { |
|
|
|
|
// put it next after the head
|
|
|
|
|
k_insert_after(seqset_store, seqset_item, |
|
|
|
|
seqset_store->head); |
|
|
|
|
set = 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gotseqset: |
|
|
|
|
doitem = dotime = false; |
|
|
|
@ -1895,6 +1938,7 @@ gotseqset:
|
|
|
|
|
if (!ITEMISMIS(seqitem)) { |
|
|
|
|
dup = true; |
|
|
|
|
memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); |
|
|
|
|
memcpy(&seqitem_copy, seqitem, sizeof(seqitem_copy)); |
|
|
|
|
} else { |
|
|
|
|
// Found a missing one
|
|
|
|
|
seqdata->missing--; |
|
|
|
@ -1942,6 +1986,7 @@ gotseqset:
|
|
|
|
|
setitemdata: |
|
|
|
|
// Store the new seq if flagged to do so
|
|
|
|
|
if (doitem) { |
|
|
|
|
seqitem->flags = seqitemflags; |
|
|
|
|
copy_tv(&(seqitem->time), now); |
|
|
|
|
copy_tv(&(seqitem->cd), cd); |
|
|
|
|
STRNCPY(seqitem->code, code); |
|
|
|
@ -1969,13 +2014,13 @@ setitemdata:
|
|
|
|
|
nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); |
|
|
|
|
} else { |
|
|
|
|
if (newseq) { |
|
|
|
|
// previous set is set 1
|
|
|
|
|
SEQSETWARN(1, &seqset_pre, "previous", EMPTY); |
|
|
|
|
} |
|
|
|
|
if (expseq) { |
|
|
|
|
// set -1 means it was the discarded/removed last set
|
|
|
|
|
SEQSETWARN(-1, &seqset_exp, "discarded old", " for:"); |
|
|
|
|
if (set == 0) |
|
|
|
|
SEQSETWARN(0, &seqset_pre, "previous", EMPTY); |
|
|
|
|
else |
|
|
|
|
SEQSETWARN(0, &seqset_pre, "current", EMPTY); |
|
|
|
|
} |
|
|
|
|
if (expseq) |
|
|
|
|
SEQSETWARN(expset, &seqset_exp, "discarded old", " for:"); |
|
|
|
|
if (newseq || expseq) { |
|
|
|
|
btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); |
|
|
|
|
LOGWARNING("Seq created new: %s %"PRIu64" " |
|
|
|
@ -1985,15 +2030,20 @@ setitemdata:
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (dup) { |
|
|
|
|
int level = LOG_DEBUG; |
|
|
|
|
if (warndup) |
|
|
|
|
level = LOG_WARNING; |
|
|
|
|
int level = LOG_WARNING; |
|
|
|
|
/* If one is SI_RELOAD and the other is SI_EARLYSOCK then it's
|
|
|
|
|
* not unexpected so only LOG_DEBUG */ |
|
|
|
|
if (((seqitem_copy.flags | seqitemflags) & SI_RELOAD) && |
|
|
|
|
((seqitem_copy.flags | seqitemflags) & SI_EARLYSOCK)) |
|
|
|
|
level = LOG_DEBUG; |
|
|
|
|
btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); |
|
|
|
|
bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); |
|
|
|
|
LOGMSG(level, "SEQ dup %s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 |
|
|
|
|
" %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 |
|
|
|
|
"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 |
|
|
|
|
"/OK%"PRIu64" cmd=%.42s...", |
|
|
|
|
LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set %d/%"PRIu64 |
|
|
|
|
"=%s/%"PRIu64" %s/%s v%"PRIu64"/^%"PRIu64 |
|
|
|
|
"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64 |
|
|
|
|
"/H%"PRIu64"/OK%"PRIu64" cmd=%.42s...", |
|
|
|
|
(level == LOG_DEBUG) ? "*" : EMPTY, |
|
|
|
|
SICHR(seqitemflags), SICHR(seqitem_copy.flags), |
|
|
|
|
nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, |
|
|
|
|
t_buf2, code, |
|
|
|
|
seqset_copy.seqdata[seq].minseq, |
|
|
|
@ -2009,12 +2059,9 @@ setitemdata:
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (wastrans) { |
|
|
|
|
int level = LOG_DEBUG; |
|
|
|
|
if (warndup) |
|
|
|
|
level = LOG_WARNING; |
|
|
|
|
btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); |
|
|
|
|
bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); |
|
|
|
|
LOGMSG(level, "SEQ found trans %s %"PRIu64" set %d/%"PRIu64 |
|
|
|
|
LOGWARNING("SEQ found trans %s %"PRIu64" set %d/%"PRIu64 |
|
|
|
|
"=%s/%"PRIu64" %s/%s", |
|
|
|
|
nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, |
|
|
|
|
t_buf2, code); |
|
|
|
@ -2080,13 +2127,13 @@ setitemdata:
|
|
|
|
|
|
|
|
|
|
static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, |
|
|
|
|
tv_t *now, char *msg, K_TREE *trf_root, |
|
|
|
|
bool wantauth) |
|
|
|
|
bool wantauth, int seqitemflags) |
|
|
|
|
{ |
|
|
|
|
uint64_t n_seqall, n_seqstt, n_seqpid, n_seqcmd; |
|
|
|
|
K_ITEM *seqstt, *seqpid, *seqcmd, *i_code; |
|
|
|
|
char *err = NULL, *st = NULL; |
|
|
|
|
size_t len, off; |
|
|
|
|
bool dupall, dupcmd, warndup; |
|
|
|
|
bool dupall, dupcmd; |
|
|
|
|
char *code = NULL; |
|
|
|
|
char buf[64]; |
|
|
|
|
|
|
|
|
@ -2169,19 +2216,10 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd,
|
|
|
|
|
code = EMPTY; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!startup_complete) |
|
|
|
|
warndup = true; |
|
|
|
|
else { |
|
|
|
|
if (reload_queue_complete) |
|
|
|
|
warndup = true; |
|
|
|
|
else |
|
|
|
|
warndup = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dupall = update_seq(SEQ_ALL, n_seqall, n_seqstt, n_seqpid, SEQALL, |
|
|
|
|
now, cd, code, warndup, msg); |
|
|
|
|
now, cd, code, seqitemflags, msg); |
|
|
|
|
dupcmd = update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, |
|
|
|
|
buf, now, cd, code, warndup, msg); |
|
|
|
|
buf, now, cd, code, seqitemflags, msg); |
|
|
|
|
|
|
|
|
|
if (dupall != dupcmd) { |
|
|
|
|
// Bad/corrupt data or a code bug
|
|
|
|
@ -2212,7 +2250,8 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd,
|
|
|
|
|
|
|
|
|
|
static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, |
|
|
|
|
char *buf, int *which_cmds, char *cmd, |
|
|
|
|
char *id, tv_t *now, tv_t *cd, bool wantauth) |
|
|
|
|
char *id, tv_t *now, tv_t *cd, bool wantauth, |
|
|
|
|
int seqitemflags) |
|
|
|
|
{ |
|
|
|
|
char reply[1024] = ""; |
|
|
|
|
TRANSFER *transfer; |
|
|
|
@ -2530,7 +2569,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
|
|
|
|
|
if (seqall) { |
|
|
|
|
enum cmd_values ret; |
|
|
|
|
ret = process_seq(seqall, *which_cmds, cd, now, buf, |
|
|
|
|
*trf_root, wantauth); |
|
|
|
|
*trf_root, wantauth, seqitemflags); |
|
|
|
|
free(cmdptr); |
|
|
|
|
return ret; |
|
|
|
|
} else { |
|
|
|
@ -3594,10 +3633,13 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
else |
|
|
|
|
LOGDEBUG("Duplicate '%s' message received", duptype); |
|
|
|
|
} else { |
|
|
|
|
int seqitemflags = SI_SOCKET; |
|
|
|
|
if (!reload_queue_complete) |
|
|
|
|
seqitemflags = SI_EARLYSOCK; |
|
|
|
|
LOGQUE(buf); |
|
|
|
|
cmdnum = breakdown(&trf_root, &trf_store, buf, |
|
|
|
|
&which_cmds, cmd, id, &now, |
|
|
|
|
&cd, true); |
|
|
|
|
&cd, true, seqitemflags); |
|
|
|
|
switch (cmdnum) { |
|
|
|
|
case CMD_DUPSEQ: |
|
|
|
|
snprintf(reply, sizeof(reply), "%s.%ld.dup.", id, now.tv_sec); |
|
|
|
@ -3902,7 +3944,7 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) |
|
|
|
|
static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) |
|
|
|
|
{ |
|
|
|
|
char cmd[CMD_SIZ+1], id[ID_SIZ+1]; |
|
|
|
|
enum cmd_values cmdnum; |
|
|
|
@ -3913,7 +3955,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
|
|
|
|
|
TRANSFER *transfer; |
|
|
|
|
K_ITEM *item; |
|
|
|
|
tv_t now, cd; |
|
|
|
|
bool finished; |
|
|
|
|
bool matched; |
|
|
|
|
|
|
|
|
|
// Once we've read the message
|
|
|
|
|
setnow(&now); |
|
|
|
@ -3929,19 +3971,19 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
|
|
|
|
|
else |
|
|
|
|
LOGERR("%s() Empty message line %"PRIu64, __func__, count); |
|
|
|
|
} else { |
|
|
|
|
finished = false; |
|
|
|
|
matched = false; |
|
|
|
|
ck_wlock(&fpm_lock); |
|
|
|
|
if (first_pool_message && strcmp(first_pool_message, buf) == 0) |
|
|
|
|
finished = true; |
|
|
|
|
if (first_pool_message && strcmp(first_pool_message, buf) == 0) { |
|
|
|
|
matched = true; |
|
|
|
|
FREENULL(first_pool_message); |
|
|
|
|
} |
|
|
|
|
ck_wunlock(&fpm_lock); |
|
|
|
|
if (finished) { |
|
|
|
|
if (matched) |
|
|
|
|
LOGERR("%s() reload ckpool queue match at line %"PRIu64, __func__, count); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LOGQUE(buf); |
|
|
|
|
cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, |
|
|
|
|
cmd, id, &now, &cd, false); |
|
|
|
|
cmd, id, &now, &cd, false, SI_RELOAD); |
|
|
|
|
switch (cmdnum) { |
|
|
|
|
// Don't ever attempt to double process reload data
|
|
|
|
|
case CMD_DUPSEQ: |
|
|
|
@ -4029,8 +4071,6 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tick(); |
|
|
|
|
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 10Mb for now - transactiontree can be large
|
|
|
|
@ -4112,10 +4152,10 @@ static bool reload_from(tv_t *start)
|
|
|
|
|
PGconn *conn = NULL; |
|
|
|
|
char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; |
|
|
|
|
size_t rflen = strlen(restorefrom); |
|
|
|
|
char *missingfirst = NULL, *missinglast = NULL; |
|
|
|
|
char *missingfirst = NULL, *missinglast = NULL, *st = NULL; |
|
|
|
|
int missing_count; |
|
|
|
|
int processing; |
|
|
|
|
bool finished = false, matched = false, ret = true, ok, apipe = false; |
|
|
|
|
bool finished = false, ret = true, ok, apipe = false; |
|
|
|
|
char *filename = NULL; |
|
|
|
|
uint64_t count, total; |
|
|
|
|
tv_t now, begin; |
|
|
|
@ -4160,8 +4200,9 @@ static bool reload_from(tv_t *start)
|
|
|
|
|
* aborting early and not get the few slightly later out of |
|
|
|
|
* order messages in the log file */ |
|
|
|
|
while (!everyone_die &&
|
|
|
|
|
logline(reload_buf, MAX_READ, fp, filename)) |
|
|
|
|
matched = reload_line(conn, filename, ++count, reload_buf); |
|
|
|
|
logline(reload_buf, MAX_READ, fp, filename)) { |
|
|
|
|
reload_line(conn, filename, ++count, reload_buf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", |
|
|
|
|
__func__, |
|
|
|
@ -4244,16 +4285,14 @@ static bool reload_from(tv_t *start)
|
|
|
|
|
if (everyone_die) |
|
|
|
|
return true; |
|
|
|
|
|
|
|
|
|
if (!matched) { |
|
|
|
|
ck_wlock(&fpm_lock); |
|
|
|
|
if (first_pool_message) { |
|
|
|
|
LOGERR("%s() reload completed without finding ckpool queue match '%.32s'...", |
|
|
|
|
__func__, first_pool_message); |
|
|
|
|
LOGERR("%s() restart ckdb to resolve this", __func__); |
|
|
|
|
ret = false; |
|
|
|
|
LOGERR("%s() reload didn't finding first ckpool queue '%.32s...", |
|
|
|
|
__func__, st = safe_text(first_pool_message)); |
|
|
|
|
FREENULL(st); |
|
|
|
|
FREENULL(first_pool_message); |
|
|
|
|
} |
|
|
|
|
ck_wunlock(&fpm_lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
reloading = false; |
|
|
|
|
FREENULL(reload_buf); |
|
|
|
|