Browse Source

ckdb - check queue seq after reload completes

master
kanoi 10 years ago
parent
commit
2d0db60443
  1. 1293
      src/ckdb.c
  2. 142
      src/ckdb.h
  3. 1
      src/ckdb_cmd.c
  4. 51
      src/ckdb_data.c

1293
src/ckdb.c

File diff suppressed because it is too large Load Diff

142
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.080"
#define CKDB_VERSION DB_VERSION"-1.090"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -699,25 +699,47 @@ typedef struct logqueue {
extern K_LIST *logqueue_free;
extern K_STORE *logqueue_store;
// WORKQUEUE
typedef struct workqueue {
char *buf;
// MSGLINE
typedef struct msgline {
int which_cmds;
enum cmd_values cmdnum;
char cmd[CMD_SIZ+1];
char id[ID_SIZ+1];
tv_t now;
char by[TXT_SML+1];
char code[TXT_MED+1];
char inet[TXT_MED+1];
tv_t cd;
char id[ID_SIZ+1];
char cmd[CMD_SIZ+1];
char *msg;
bool hasseq;
char *seqcmdnam;
uint64_t n_seqall;
uint64_t n_seqcmd;
uint64_t n_seqstt;
uint64_t n_seqpid;
int seqentryflags;
char *code;
K_TREE *trf_root;
K_STORE *trf_store;
} MSGLINE;
#define ALLOC_MSGLINE 8192
#define LIMIT_MSGLINE 0
#define CULL_MSGLINE 16
#define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline)
#define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true)
#define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false)
extern K_LIST *msgline_free;
extern K_STORE *msgline_store;
// WORKQUEUE
typedef struct workqueue {
K_ITEM *msgline_item;
char *by;
char *code;
char *inet;
} WORKQUEUE;
#define ALLOC_WORKQUEUE 1024
#define LIMIT_WORKQUEUE 0
#define CULL_WORKQUEUE 16
#define CULL_WORKQUEUE 32
#define INIT_WORKQUEUE(_item) INIT_GENERIC(_item, workqueue)
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
@ -795,28 +817,28 @@ extern tv_t missing_secuser_max;
* missing counters are incremented - maxseq will now be N+5
* Once we reach N+size we need to discard N and use it as N+size
* and increment seqbase N
* When we discard the oldest item due to needing more, if that oldest
* item was missing, it is now considered lost and the lost counters
* When we discard the oldest entry due to needing more, if that oldest
* entry was missing, it is now considered lost and the lost counters
* are incremented (and missing counters decremented)
* If we receive an item N+x where N+x-maxseq>highlimit we reject it as high
* If we receive an entry N+x where N+x-maxseq>highlimit we reject it as high
* and increment the high counters - this avoids creating a high bad sequence
* number and flagging any missing sequence numbers, most likely incorrectly,
* as lost in the range N to N+x-size
* If we receive an item N-x i.e. less than seqbase N, then:
* If we receive an entry N-x i.e. less than seqbase N, then:
* If maxseq-N = size then N-x is considered stale and the stale counters
* are incremented since there's no unused items below N available
* are incremented since there's no unused entries below N available
* This shouldn't normally happen after we've received size seq numbers
* Else maxseq-N is less than size, that means there are unused items below N
* Else maxseq-N is less than size, that means there are unused entries below N
* This will usually only be with a new sequence and the first seq was out
* of order, before lower sequence numbers, thus maxseq should be close to
* seqbase N and no where near N+size and there should be x unused below N
* If there are x unused items below N then we can move seqbase down to N-x
* If there are x unused entries below N then we can move seqbase down to N-x
* after we flag all N-1,N-2,N-3..N-(x-1) as missing
* Else there aren't enough unused items below N, then N-x is considered
* Else there aren't enough unused entries below N, then N-x is considered
* stale and the stale counters are incremented
*
* timelimit is an early limit to flag missing sequence numbers as 'transient'
* Normally, if a missing item at N is later reused, it will be discarded and
* Normally, if a missing entry at N is later reused, it will be discarded and
* reported as lost
* After the reload queue is complete, timelimit reports missing sequence
* numbers early, as transient, if they have been missing for 'timelimit' but
@ -827,7 +849,29 @@ extern tv_t missing_secuser_max;
* reload queue has cleared after ckdb startup, it will report the transient
* missing sequence numbers shortly after the timelimit
* When they are later found or lost they will again be reported, this time as
* found or lost */
* found or lost
*
* The sequence fields are checked for validity when the message arrives
* Sequence order checking of reload lines are done immediately
* Checking of ckpool lines isn't done until after the reload completes
* This solves the problem of the overlap from reload to queue
* If we stop the reload when we first match the queue, there's the issue
* that data order in the reload file may not match the data order in the
* queue and thus we could lose a record that is late in the reload file
* but was before the queue start
* To solve this unlikely (but not impossible) issue we reload all reload
* files to the end and then sequence process the queued data after the
* reload completes
* This however produces another (solvable) problem that the queue start
* may be stale when we finally complete the reload
* To handle this we keep a list of all lost records in the reload and check
* the stale queue records against those to see if we found them
* If a queue line is lower than minseq then that's a sequence error
* If a queue line is stale but above minseq, we check if it is in the
* lost list and then report it as recovered (and process it)
* If it wasn't lost then it's an expected duplicate and ignored
* Once the queue exceeds the reload maxseq, the lost records are no longer
* needed and are discarded */
// ckpool sequence numbers
#define SEQALL "seqall"
@ -860,30 +904,26 @@ enum seq_num {
// Ensure size is a (multiple of 8)-1
#define SEQ_CODE 15
#define SICHR(_sif) (((_sif) == SI_EARLYSOCK) ? 'E' : \
(((_sif) == SI_RELOAD) ? 'R' : \
(((_sif) == SI_SOCKET) ? 'S' : '?')))
#define SECHR(_sif) (((_sif) == SE_EARLYSOCK) ? 'E' : \
(((_sif) == SE_RELOAD) ? 'R' : \
(((_sif) == SE_SOCKET) ? 'S' : '?')))
// Msg from the socket before startup completed - ignore if it was a DUP
#define SI_EARLYSOCK 1
// Msg from the socket before startup completed
#define SE_EARLYSOCK 1
// Msg was from reload
#define SI_RELOAD 2
#define SE_RELOAD 2
// Msg from the socket after startup completed
#define SI_SOCKET 4
/* An SI_EARLYSOCK item vs an SI_RELOAD item is not considered a DUP
* since the reload reads to the end of the reload file after
* the match between the queue and the reload has been found */
#define SE_SOCKET 4
typedef struct seqitem {
typedef struct seqentry {
int flags;
tv_t cd; // sec:0=missing, usec:0=miss !0=trans
tv_t time;
char code[SEQ_CODE+1];
} SEQITEM;
} SEQENTRY;
typedef struct seqdata {
size_t size; // item count - MUST be a power of 2
size_t size; // entry count - MUST be a power of 2
uint64_t highlimit;
int timelimit;
uint64_t minseq;
@ -894,12 +934,15 @@ typedef struct seqdata {
uint64_t lost;
uint64_t stale;
uint64_t high;
uint64_t recovered;
uint64_t ok;
uint64_t reloadmax;
tv_t firsttime;
tv_t lasttime;
tv_t firstcd;
tv_t lastcd;
SEQITEM *item;
SEQENTRY *entry;
K_STORE *reload_lost;
} SEQDATA;
// SEQSET
@ -911,6 +954,7 @@ typedef struct seqset {
uint64_t lost; // total from seqdata
uint64_t stale; // total from seqdata
uint64_t high; // total from seqdata
uint64_t recovered; // total from seqdata
uint64_t ok; // total from seqdata
SEQDATA seqdata[SEQ_MAX];
} SEQSET;
@ -920,15 +964,22 @@ typedef struct seqset {
* the first time it processes a record with sequences */
// SEQALL and SHARES */
#define SEQ_LARGE_LIM 64
#define SEQ_LARGE_SIZ (65536*SEQ_LARGE_LIM)
#define SEQ_LARGE_TRANS_LIM 16
#define SEQ_LARGE_SIZ (65536*SEQ_LARGE_TRANS_LIM)
// WORKERSTATS, AUTH and ADDRAUTH
#define SEQ_MEDIUM_LIM 128
#define SEQ_MEDIUM_TRANS_LIM 32
#define SEQ_MEDIUM_SIZ 65536
// The rest
#define SEQ_SMALL_LIM 128
#define SEQ_SMALL_TRANS_LIM 64
#define SEQ_SMALL_SIZ 16384
// highlimit ratio (shift down bits)
#define HIGH_SHIFT 8
// Smallest highlimit allowed
#define HIGH_MIN 32
// Smallest _SIZ allowed
#define BASE_SIZ (HIGH_MIN << HIGH_SHIFT)
#define ALLOC_SEQSET 1
#define LIMIT_SEQSET 16
#define INIT_SEQSET(_item) INIT_GENERIC(_item, seqset)
@ -941,11 +992,11 @@ extern K_STORE *seqset_store;
// Initialised when seqset_free is allocated
extern char *seqnam[SEQ_MAX];
// SEQTRANS
// SEQTRANS also used for reload_lost
typedef struct seqtrans {
int seq;
uint64_t seqnum;
SEQITEM item;
SEQENTRY entry;
} SEQTRANS;
// The stores are created and freed each time required
@ -953,9 +1004,10 @@ extern K_LIST *seqtrans_free;
#define ALLOC_SEQTRANS 1024
#define LIMIT_SEQTRANS 0
#define CULL_SEQTRANS 65536
#define CULL_SEQTRANS 64
#define INIT_SEQTRANS(_item) INIT_GENERIC(_item, seqtrans)
#define DATA_SEQTRANS(_var, _item) DATA_GENERIC(_var, _item, seqtrans, true)
#define DATA_SEQTRANS_NULL(_var, _item) DATA_GENERIC(_var, _item, seqtrans, false)
// USERS
typedef struct users {
@ -1902,13 +1954,15 @@ extern void sequence_report(bool lock);
#define PPLNSDIFFADD "pplns_diff_add"
// Data free functions (first)
extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull);
extern void free_workinfo_data(K_ITEM *item);
extern void free_sharesummary_data(K_ITEM *item);
extern void free_optioncontrol_data(K_ITEM *item);
extern void free_markersummary_data(K_ITEM *item);
extern void free_workmarkers_data(K_ITEM *item);
extern void free_marks_data(K_ITEM *item);
extern void free_seqset_data(K_ITEM *item);
#define free_seqset_data(_item) _free_seqset_data(_item, false)
extern void _free_seqset_data(K_ITEM *item, bool lock);
#define safe_text(_txt) _safe_text(_txt, true)
#define safe_text_nonull(_txt) _safe_text(_txt, false)

1
src/ckdb_cmd.c

@ -5194,6 +5194,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(poolstats, 1, 1);
USEINFO(userstats, 2, 1);
USEINFO(workerstatus, 1, 1);
USEINFO(msgline, 1, 0);
USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0);
USEINFO(heartbeatqueue, 1, 0);

51
src/ckdb_data.c

@ -11,6 +11,39 @@
#include <math.h>
// Data free functions (added here as needed)
void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull)
{
K_ITEM *t_item = NULL;
TRANSFER *transfer;
MSGLINE *msgline;
DATA_MSGLINE(msgline, item);
if (msgline->trf_root)
msgline->trf_root = free_ktree(msgline->trf_root, NULL);
if (msgline->trf_store) {
t_item = msgline->trf_store->head;
while (t_item) {
DATA_TRANSFER(transfer, t_item);
if (transfer->mvalue != transfer->svalue)
FREENULL(transfer->mvalue);
t_item = t_item->next;
}
if (t_lock)
K_WLOCK(transfer_free);
k_list_transfer_to_head(msgline->trf_store, transfer_free);
if (t_cull) {
if (transfer_free->count == transfer_free->total &&
transfer_free->total >= ALLOC_TRANSFER * CULL_TRANSFER)
k_cull_list(transfer_free);
}
if (t_lock)
K_WUNLOCK(transfer_free);
msgline->trf_store = k_free_store(msgline->trf_store);
}
FREENULL(msgline->msg);
}
void free_workinfo_data(K_ITEM *item)
{
WORKINFO *workinfo;
@ -87,15 +120,27 @@ void free_marks_data(K_ITEM *item)
FREENULL(marks->extra);
}
void free_seqset_data(K_ITEM *item)
void _free_seqset_data(K_ITEM *item, bool lock)
{
K_STORE *reload_lost;
SEQSET *seqset;
int i;
DATA_SEQSET(seqset, item);
if (seqset->seqstt) {
for (i = 0; i < SEQ_MAX; i++)
FREENULL(seqset->seqdata[i].item);
for (i = 0; i < SEQ_MAX; i++) {
reload_lost = seqset->seqdata[i].reload_lost;
if (reload_lost) {
if (lock)
K_WLOCK(seqtrans_free);
k_list_transfer_to_head(reload_lost, seqtrans_free);
if (lock)
K_WUNLOCK(seqtrans_free);
k_free_store(reload_lost);
seqset->seqdata[i].reload_lost = NULL;
}
FREENULL(seqset->seqdata[i].entry);
}
seqset->seqstt = 0;
}
}

Loading…
Cancel
Save