From 2d0db60443fce22d4ad14ea0caf856cfb931a25b Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 29 Apr 2015 23:38:44 +1000 Subject: [PATCH] ckdb - check queue seq after reload completes --- src/ckdb.c | 1293 ++++++++++++++++++++++++++++------------------- src/ckdb.h | 142 ++++-- src/ckdb_cmd.c | 1 + src/ckdb_data.c | 51 +- 4 files changed, 933 insertions(+), 554 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 598237de..bf579226 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -280,6 +280,8 @@ bool startup_complete = false; static bool reload_queue_complete = false; // Tell everyone to die bool everyone_die = false; +// Set to true every time a store is created +static bool seqdata_reload_lost = false; /* These are included in cmd_homepage * to help identify when ckpool locks up (or dies) */ @@ -306,6 +308,10 @@ char *id_default = "42"; K_LIST *logqueue_free; K_STORE *logqueue_store; +// MSGLINE +K_LIST *msgline_free; +K_STORE *msgline_store; + // WORKQUEUE K_LIST *workqueue_free; K_STORE *workqueue_store; @@ -636,7 +642,8 @@ void setnow(tv_t *now) now->tv_usec = spec.tv_nsec / 1000; } -// Limits are all +/-1s since on the live machine all were well within that +/* Limits are all +/-1s since on the live machine all were well within that + * TODO: not thread safe */ static void check_createdate_ccl(char *cmd, tv_t *cd) { static tv_t last_cd; @@ -969,7 +976,7 @@ static void alloc_storage() seq, seqnam[ckdb_cmds[seq].seq], ckdb_cmds[seq].cmd_str); } - len = strlen(SEQRPE) + strlen(ckdb_cmds[seq].cmd_str) + 1; + len = strlen(SEQPRE) + strlen(ckdb_cmds[seq].cmd_str) + 1; seqnam[ckdb_cmds[seq].seq] = malloc(len); if (!(seqnam[ckdb_cmds[seq].seq])) quithere(1, "malloc (%d) OOM", (int)len); @@ -987,6 +994,10 @@ static void alloc_storage() seqtrans_free = k_new_list("SeqTrans", sizeof(SEQTRANS), ALLOC_SEQTRANS, LIMIT_SEQTRANS, true); + msgline_free = k_new_list("MsgLine", sizeof(MSGLINE), + ALLOC_MSGLINE, LIMIT_MSGLINE, true); + msgline_store = k_new_store(msgline_free); + workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); workqueue_store = k_new_store(workqueue_free); @@ -1139,15 +1150,17 @@ static void alloc_storage() #define SEQSETWARN(_set, _seqset, _msgtxt, _endtxt) do { \ char _t_buf[DATE_BUFSIZ]; \ btu64_to_buf(&((_seqset)->seqstt), _t_buf, sizeof(_t_buf)); \ - LOGWARNING("SEQ %s: %d/"SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ - "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 \ - " %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64 \ - "/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64 \ - "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \ + LOGWARNING("SEQ %s: %d/"SEQSTT":%"PRIu64"=%s "SEQPID":%"PRIu64 \ + " M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \ + "/R%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 \ + "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/R%"PRIu64 \ + "/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64 \ + "/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/R%"PRIu64 \ "/OK%"PRIu64"%s", \ - _msgtxt, _set, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ - (_seqset)->missing, (_seqset)->trans, (_seqset)->lost, \ - (_seqset)->stale, (_seqset)->high, (_seqset)->ok, \ + _msgtxt, _set, (_seqset)->seqstt, _t_buf, \ + (_seqset)->seqpid, (_seqset)->missing, (_seqset)->trans, \ + (_seqset)->lost, (_seqset)->stale, (_seqset)->high, \ + (_seqset)->recovered, (_seqset)->ok, \ seqnam[SEQ_ALL], \ (_seqset)->seqdata[SEQ_ALL].minseq, \ (_seqset)->seqdata[SEQ_ALL].maxseq, \ @@ -1156,6 +1169,7 @@ static void alloc_storage() (_seqset)->seqdata[SEQ_ALL].lost, \ (_seqset)->seqdata[SEQ_ALL].stale, \ (_seqset)->seqdata[SEQ_ALL].high, \ + (_seqset)->seqdata[SEQ_ALL].recovered, \ (_seqset)->seqdata[SEQ_ALL].ok, \ seqnam[SEQ_SHARES], \ (_seqset)->seqdata[SEQ_SHARES].minseq, \ @@ -1165,6 +1179,7 @@ static void alloc_storage() (_seqset)->seqdata[SEQ_SHARES].lost, \ (_seqset)->seqdata[SEQ_SHARES].stale, \ (_seqset)->seqdata[SEQ_SHARES].high, \ + (_seqset)->seqdata[SEQ_SHARES].recovered, \ (_seqset)->seqdata[SEQ_SHARES].ok, _endtxt); \ } while(0) @@ -1384,16 +1399,18 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); FREE_LISTS(workqueue); + FREE_LISTS(msgline); LOGWARNING("%s() seqset ...", __func__); sequence_report(false); - FREE_LIST(seqtrans); - FREE_STORE_DATA(seqset); FREE_LIST_DATA(seqset); FREE_LISTS(seqset); + // Must be after seqset + FREE_LIST(seqtrans); + for (seq = 0; seq < SEQ_MAX; seq++) FREENULL(seqnam[seq]); @@ -1498,6 +1515,7 @@ static bool setup_data() (_seqset)->seqdata[_i].lost = \ (_seqset)->seqdata[_i].stale = \ (_seqset)->seqdata[_i].high = \ + (_seqset)->seqdata[_i].recovered = \ (_seqset)->seqdata[_i].ok = 0; \ (_seqset)->seqdata[_i].SEQINUSE = \ (_seqset)->seqdata[_i].firstcd.tv_usec = \ @@ -1510,43 +1528,40 @@ static bool setup_data() } \ } while (0); -#define BASE_SIZ 64 -#define HIGH_MIN 4 - #define MISSFLAG cd.tv_sec #define TRANSFLAG cd.tv_usec // Test if an item is missing/transient -#define ITEMISMIS(_seqitem) ((_seqitem)->MISSFLAG == 0) +#define ENTRYISMIS(_seqentry) ((_seqentry)->MISSFLAG == 0) #define DATAISMIS(_seqdata, _u) \ - ITEMISMIS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + ENTRYISMIS(&((_seqdata)->entry[(_u) & ((_seqdata)->size - 1)])) -#define ITEMONLYMIS(_seqitem) (((_seqitem)->MISSFLAG == 0) && \ - ((_seqitem)->TRANSFLAG == 0)) +#define ENTRYONLYMIS(_seqentry) (((_seqentry)->MISSFLAG == 0) && \ + ((_seqentry)->TRANSFLAG == 0)) -#define ITEMISTRANS(_seqitem) (((_seqitem)->MISSFLAG == 0) && \ - ((_seqitem)->TRANSFLAG != 0)) +#define ENTRYISTRANS(_seqentry) (((_seqentry)->MISSFLAG == 0) && \ + ((_seqentry)->TRANSFLAG != 0)) #define DATAISTRANS(_seqdata, _u) \ - ITEMISTRANS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + ENTRYISTRANS(&((_seqdata)->entry[(_u) & ((_seqdata)->size - 1)])) // Flag an item as missing -#define ITEMSETMIS(_seqitem, _now) do { \ - (_seqitem)->MISSFLAG = 0; \ - (_seqitem)->TRANSFLAG = 0; \ - (_seqitem)->time.tv_sec = now->tv_sec; \ - (_seqitem)->time.tv_usec = now->tv_usec; \ +#define ENTRYSETMIS(_seqentry, _now) do { \ + (_seqentry)->MISSFLAG = 0; \ + (_seqentry)->TRANSFLAG = 0; \ + (_seqentry)->time.tv_sec = now->tv_sec; \ + (_seqentry)->time.tv_usec = now->tv_usec; \ } while(0) #define DATASETMIS(_seqdata, _u, _now) \ - ITEMSETMIS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)]), _now) + ENTRYSETMIS(&((_seqdata)->entry[(_u) & ((_seqdata)->size - 1)]), _now) /* Flag an item directly as transient missing - * it will already be missing but we set both flags */ -#define ITEMSETTRANS(_seqitem) do { \ - (_seqitem)->MISSFLAG = 0; \ - (_seqitem)->TRANSFLAG = 1; \ +#define ENTRYSETTRANS(_seqentry) do { \ + (_seqentry)->MISSFLAG = 0; \ + (_seqentry)->TRANSFLAG = 1; \ } while(0) #define DATASETTRANS(_seqdata, _u) \ - ITEMSETTRANS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + ENTRYSETTRANS(&((_seqdata)->entry[(_u) & ((_seqdata)->size - 1)])) // Check for transient missing every 2s #define TRANCHECKLIMIT 2.0 @@ -1557,13 +1572,13 @@ static tv_t last_trancheck; /* time (now) is used, not cd, since cd is only relevant to reloading * and we don't run trans_process() during reloading - * we also only know now, not cd, for a missing item - * This fills in store with a copy of the defails of all the new - * transients */ + * We also only know now, not cd, for a missing item + * This fills in store with a copy of the details of all the new transients + * N.B. this is called under seq_lock */ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) { SEQDATA *seqdata = NULL; - SEQITEM *seqitem = NULL; + SEQENTRY *seqentry = NULL; uint64_t zero, top, u; SEQTRANS *seqtrans; K_ITEM *st_item; @@ -1575,32 +1590,32 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) continue; /* run as 2 loops from seqbase to top, then bottom to maxseq - * thus we can use seqitem++ rather than calculating it + * thus we can use seqentry++ rather than calculating it * each time */ zero = seqdata->seqbase - (seqdata->seqbase & (seqdata->size - 1)); top = zero + seqdata->size - 1; if (top > seqdata->maxseq) top = seqdata->maxseq; u = seqdata->seqbase; - seqitem = &(seqdata->item[seqdata->seqbase & (seqdata->size - 1)]); + seqentry = &(seqdata->entry[seqdata->seqbase & (seqdata->size - 1)]); while (u <= top) { - if (ITEMONLYMIS(seqitem) && - tvdiff(now, &(seqitem->time)) > seqdata->timelimit) { - ITEMSETTRANS(seqitem); + if (ENTRYONLYMIS(seqentry) && + tvdiff(now, &(seqentry->time)) > seqdata->timelimit) { + ENTRYSETTRANS(seqentry); seqdata->trans++; seqset->trans++; - + // N.B. lock inside lock K_WLOCK(seqtrans_free); st_item = k_unlink_head(seqtrans_free); K_WUNLOCK(seqtrans_free); DATA_SEQTRANS(seqtrans, st_item); seqtrans->seq = seq; seqtrans->seqnum = u; - memcpy(&(seqtrans->item), seqitem, sizeof(SEQITEM)); + memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); k_add_head(store, st_item); } u++; - seqitem++; + seqentry++; } // 2nd loop isn't needed, we've already covered the full range @@ -1608,25 +1623,25 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) continue; u = zero + seqdata->size; - seqitem = &(seqdata->item[0]); + seqentry = &(seqdata->entry[0]); while (u <= seqdata->maxseq) { - if (ITEMONLYMIS(seqitem) && - tvdiff(now, &(seqitem->time)) > seqdata->timelimit) { - ITEMSETTRANS(seqitem); + if (ENTRYONLYMIS(seqentry) && + tvdiff(now, &(seqentry->time)) > seqdata->timelimit) { + ENTRYSETTRANS(seqentry); seqdata->trans++; seqset->trans++; - + // N.B. lock inside lock K_WLOCK(seqtrans_free); st_item = k_unlink_head(seqtrans_free); K_WUNLOCK(seqtrans_free); DATA_SEQTRANS(seqtrans, st_item); seqtrans->seq = seq; seqtrans->seqnum = u; - memcpy(&(seqtrans->item), seqitem, sizeof(SEQITEM)); + memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); k_add_head(store, st_item); } u++; - seqitem++; + seqentry++; } } } @@ -1678,48 +1693,83 @@ static void trans_seq(tv_t *now) while (st_item) { DATA_SEQTRANS(seqtrans, st_item); btu64_to_buf(&seqstt, t_buf, sizeof(t_buf)); - bt_to_buf(&(seqtrans->item.time.tv_sec), t_buf2, + bt_to_buf(&(seqtrans->entry.time.tv_sec), t_buf2, sizeof(t_buf2)); - LOGWARNING("SEQ trans %s %"PRIu64" set %d/%"PRIu64 + LOGWARNING("Seq trans %s %"PRIu64" set:%d/%"PRIu64 "=%s/%"PRIu64" %s/%s", seqnam[seqtrans->seq], seqtrans->seqnum, i, seqstt, t_buf, seqpid, - t_buf2, seqtrans->item.code); + t_buf2, seqtrans->entry.code); st_item = st_item->prev; } if (store->head) { K_WLOCK(seqtrans_free); k_list_transfer_to_head(store, seqtrans_free); + if (seqtrans_free->count == seqtrans_free->total && + seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS) + k_cull_list(seqtrans_free); K_WUNLOCK(seqtrans_free); } } store = k_free_store(store); } +/* Only called once just before flagging reload as finished + * Set all the reloadmax variables in all seqdata */ +static void seq_reloadmax() +{ + K_ITEM *seqset_item; + SEQSET *seqset; + SEQDATA *seqdata; + int i; + + ck_wlock(&seq_lock); + if (seqset_store->count > 0) { + seqset_item = seqset_store->head; + while (seqset_item) { + DATA_SEQSET(seqset, seqset_item); + if (seqset->seqstt) { + seqdata = seqset->seqdata; + for (i = 0; i < SEQ_MAX; i++) { + if (seqdata->SEQINUSE) + seqdata->reloadmax = seqdata->maxseq; + seqdata++; + } + } + seqset_item = seqset_item->next; + } + } + ck_wunlock(&seq_lock); +} + +/* Most of the extra message logic in here is to avoid putting too many + * messages or incorrect messages on the console when errors occur + * It wont lose msglines from the reload or the queue, since if there is any + * problem with any msgline, it will be processed rather than skipped + * Only valid duplicates, with all 3 sequence numbers (cmd, stt, pid) matching + * a previous msgline, are flagged DUP to be skipped by the sequence code */ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, uint64_t n_seqstt, uint64_t n_seqpid, char *nam, tv_t *now, tv_t *cd, char *code, - int seqitemflags, char *msg) + int seqentryflags, char *msg) { char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; - bool firstseq, newseq, expseq, gothigh, gotstale, gotstalestart; + bool firstseq, newseq, expseq, gothigh, okhi, gotstale, gotstalestart; SEQSET *seqset = NULL, *seqset0 = NULL, seqset_pre = { 0 }; SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; - bool dup, wastrans, doitem, dotime; + bool dup, wastrans, doitem, dotime, gotrecover; SEQDATA *seqdata; - SEQITEM *seqitem, seqitem_copy; - K_ITEM *seqset_item = NULL, *st_item = NULL; - SEQTRANS *seqtrans = NULL; + SEQENTRY *seqentry, seqentry_copy, *u_entry; + K_ITEM *seqset_item = NULL, *st_item = NULL, *stl_item = NULL; + SEQTRANS *seqtrans = NULL, *seqtrans2 = NULL; size_t siz, end; void *off0, *offn; uint64_t u; int set = -1, expset = -1, highlimit, i; - K_STORE *lost; + K_STORE *lost = NULL; - // We store the lost items in here - lost = k_new_store(seqtrans_free); - firstseq = newseq = expseq = gothigh = gotstale = gotstalestart = - dup = wastrans = false; + firstseq = newseq = expseq = gothigh = okhi = gotstale = + gotstalestart = dup = wastrans = gotrecover = false; ck_wlock(&seq_lock); // Get the seqset if (seqset_store->count == 0) @@ -1745,9 +1795,9 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, // Need to setup a new seqset newseq = true; if (!firstseq) { - /* The current seqset (may become the previous) - * If !seqset_store->head (i.e. a bug) this will quit() */ + // If !seqset_store->head (i.e. a bug) this will quit() DATA_SEQSET(seqset0, seqset_store->head); + // The current seqset (may become the previous) memcpy(&seqset_pre, seqset0, sizeof(seqset_pre)); } seqset_item = k_unlink_head_zero(seqset_free); @@ -1763,21 +1813,22 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, case SEQ_ALL: case SEQ_SHARES: seqset->seqdata[i].size = SEQ_LARGE_SIZ; - seqset->seqdata[i].timelimit = SEQ_LARGE_LIM; + seqset->seqdata[i].timelimit = SEQ_LARGE_TRANS_LIM; break; case SEQ_WORKERSTAT: case SEQ_AUTH: case SEQ_ADDRAUTH: seqset->seqdata[i].size = SEQ_MEDIUM_SIZ; - seqset->seqdata[i].timelimit = SEQ_MEDIUM_LIM; + seqset->seqdata[i].timelimit = SEQ_MEDIUM_TRANS_LIM; break; default: seqset->seqdata[i].size = SEQ_SMALL_SIZ; - seqset->seqdata[i].timelimit = SEQ_SMALL_LIM; + seqset->seqdata[i].timelimit = SEQ_SMALL_TRANS_LIM; break; } siz = seqset->seqdata[i].size; if (siz < BASE_SIZ || (siz & (siz-1))) { + // On the first ever seq record quithere(1, "seqdata[%d] size %d (0x%x) is %s %d", i, (int)siz, (int)siz, (siz < BASE_SIZ) ? @@ -1785,7 +1836,7 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, "not a power of", (siz < BASE_SIZ) ? BASE_SIZ : 2); } - highlimit = siz >> 2; // 1/4 + highlimit = siz >> HIGH_SHIFT; if (highlimit < HIGH_MIN) { // On the first ever seq record quithere(1, "seqdata[%d] highlimit %d (0x%x) " @@ -1793,10 +1844,10 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, i, highlimit, highlimit, HIGH_MIN); } seqset->seqdata[i].highlimit = highlimit; - seqset->seqdata[i].item = calloc(siz, sizeof(SEQITEM)); - end = siz * sizeof(SEQITEM); - off0 = &(seqset->seqdata[i].item[0]); - offn = &(seqset->seqdata[i].item[siz]); + seqset->seqdata[i].entry = calloc(siz, sizeof(SEQENTRY)); + end = siz * sizeof(SEQENTRY); + off0 = &(seqset->seqdata[i].entry[0]); + offn = &(seqset->seqdata[i].entry[siz]); if ((int)end != (offn - off0)) { // On the first ever seq record quithere(1, "memory size (%d) != structure " @@ -1806,7 +1857,25 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, LIST_MEM_ADD_SIZ(seqset_free, end); } } else { - // Expire the oldest set and overwrite it + /* Expire the oldest set and overwrite it + * If this happens during a reload and the expired set is in + * the queue, then this will most likely end up spitting out + * lots of errors since this will cause a chain reaction, + * after the reload completes, of expiring the last set, that + * is ahead in the unprocessed queue, each time the queue + * advances to the next set + * This can only happen if the pool restarts SEQ_MAX times + * during the reload ... so it's extremely unlikely but + * also will be obvious in the console log if it happens + * The (very unlikely) side effect would be that it could miss + * spotting lost sequence numbers: between the maxseq of a set + * in the reload, and the base of the same set in the queue + * The fix is simply to stop ckdb immediately if ckpool is + * constantly failing (SEQ_MAX times during the reload) and + * restart ckdb when ckpool isn't aborting, or if ckpool is + * in a problematic state, start ckdb while ckpool isn't + * running, then start ckpool when ckdb completes it's full + * startup */ K_ITEM *ss_item; SEQSET *ss = NULL; int s = 0; @@ -1837,11 +1906,11 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, expseq = true; RESETSET(seqset, n_seqstt, n_seqpid); } - /* Since the pool queue is active during the reload, sets can be out - * of order, so each new one should be added depending upon the value - * of seqstt so the current pool is first, to minimise searching - * seqset_store, but the order of the rest isn't as important - * N.B. a new set is only created once per pool start */ + /* To avoid putting an old set as first in the list, if they are out + * of order, each new set is added depending upon the value of seqstt + * so the current pool is first, to minimise searching seqset_store + * The order of the rest isn't as important + * N.B. a new set is only created once per ckpool start */ if (firstseq) { k_add_head(seqset_store, seqset_item); set = 0; @@ -1857,14 +1926,13 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, seqset_store->head); set = 1; } - } gotseqset: doitem = dotime = false; seqdata = &(seqset->seqdata[seq]); - seqitem = &(seqdata->item[n_seqcmd & (seqdata->size - 1)]); + seqentry = &(seqdata->entry[n_seqcmd & (seqdata->size - 1)]); if (seqdata->SEQINUSE == 0) { // First n_seqcmd for the given seq copy_tv(&(seqdata->firsttime), now); @@ -1875,21 +1943,39 @@ gotseqset: seqdata->seqbase = n_seqcmd; doitem = true; goto setitemdata; + // TODO: add a check/message if minseq!=0 and + // it's a 2nd or later set } if (n_seqcmd > seqdata->maxseq) { - /* New seq above maxseq - * N.B. it must be at most highlimit above maxseq - * This is somewhat arbitrary but some limit is necessary to - * ensure a high bad value doesn't cause many lower lost/stale - * messages, but instead just one high message and possibly - * later trans/lost messages */ + /* New seq above maxseq */ if ((n_seqcmd - seqdata->maxseq) > seqdata->highlimit) { + /* During a reload there may be missing data/gaps if + * there was some problem with the reload data + * When we switch from the reload data to the queue + * data, it is also flagged ok since it may also be + * due to lost data at the end or missing reload files + * In both these cases the message will be 'OKHI' + * instead of 'HIGH' + * If however this is caused by a corrupt seq number + * then the result will be a large range of lost + * msglines followed by continuous stale msglines + * until it catches up to the corrupt seq number + * Currently the only fix to get rid of the console + * messages is to fix/remove the corrupt high + * msgline from the reload data and restart ckdb + * You would have to stop ckdb first if it is in the + * current hour reload file + * Note of course if it came from the queue it will + * be in the reload data when you restart ckdb */ + if ((seqentryflags & SE_RELOAD) || + (seqdata->maxseq == seqdata->reloadmax)) + okhi = true; + seqdata->high++; seqset->high++; memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); gothigh = true; - goto setitemdata; } for (u = seqdata->maxseq + 1; u <= n_seqcmd; u++) { if ((u - seqdata->seqbase) < seqdata->size) { @@ -1902,20 +1988,26 @@ gotseqset: } } else { // u is used by (u-size) - if (DATAISMIS(seqdata, u)) { - // (u-size) was missing + u_entry = &(seqdata->entry[u & (seqdata->size - 1)]); + if (ENTRYISMIS(u_entry)) { + /* (u-size) was missing + * N.B. lock inside lock */ K_WLOCK(seqtrans_free); st_item = k_unlink_head(seqtrans_free); + if (u_entry->flags & SE_RELOAD) + stl_item = k_unlink_head(seqtrans_free); K_WUNLOCK(seqtrans_free); DATA_SEQTRANS(seqtrans, st_item); seqtrans->seqnum = u - seqdata->size; - memcpy(&(seqtrans->item), - &(seqdata->item[u & (seqdata->size - 1)]), - sizeof(SEQITEM)); - k_add_head(lost, st_item); + memcpy(&(seqtrans->entry), u_entry, + sizeof(SEQENTRY)); + if (!lost) + lost = k_new_store(seqtrans_free); + k_add_tail(lost, st_item); seqdata->lost++; seqset->lost++; - if (DATAISTRANS(seqdata, u)) { + if (ENTRYISTRANS(u_entry)) { + // no longer trans seqdata->trans--; seqset->trans--; } @@ -1925,7 +2017,23 @@ gotseqset: seqset->missing--; } else { // new will also be missing - DATASETMIS(seqdata, u, now); + ENTRYSETMIS(u_entry, now); + } + + /* Store stale reload entries so we can + * check against any stale queue + * entries and flag them as recovered + * or DUP */ + if (u_entry->flags & SE_RELOAD) { + DATA_SEQTRANS(seqtrans, stl_item); + seqtrans->seqnum = u - seqdata->size; + memcpy(&(seqtrans->entry), u_entry, + sizeof(SEQENTRY)); + if (!seqdata->reload_lost) { + seqdata->reload_lost = k_new_store(seqtrans_free); + seqdata_reload_lost = true; + } + k_add_tail(seqdata->reload_lost, stl_item); } } else { // (u-size) wasn't missing @@ -1944,16 +2052,16 @@ gotseqset: doitem = true; dotime = true; } else if (n_seqcmd >= seqdata->seqbase) { - /* It's within the range thus dup or missing */ - if (!ITEMISMIS(seqitem)) { + /* It's within the range thus DUP or missing */ + if (!ENTRYISMIS(seqentry)) { dup = true; memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); - memcpy(&seqitem_copy, seqitem, sizeof(seqitem_copy)); + memcpy(&seqentry_copy, seqentry, sizeof(seqentry_copy)); } else { // Found a missing one seqdata->missing--; seqset->missing--; - if (ITEMISTRANS(seqitem)) { + if (ENTRYISTRANS(seqentry)) { seqdata->trans--; seqset->trans--; wastrans = true; @@ -1986,7 +2094,60 @@ gotseqset: gotstalestart = true; } } else { - // >=minseq but =minseq but reload_lost) { + st_item = seqdata->reload_lost->head; + // seqnum order is not guaranteed + while (st_item) { + DATA_SEQTRANS(seqtrans, st_item); + if (seqtrans->seqnum == n_seqcmd) + break; + st_item = st_item->next; + } + } + if (st_item) { + // recovered a lost entry + k_unlink_item(seqtrans_free, st_item); + // N.B. lock inside lock + K_WLOCK(seqtrans_free); + k_add_head(seqtrans_free, st_item); + K_WUNLOCK(seqtrans_free); + seqdata->lost--; + seqset->lost--; + seqdata->recovered++; + seqset->recovered++; + seqdata->ok++; + seqset->ok++; + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + gotrecover = true; + goto setitemdata; + } + + /* Reload will still have all the lost entries + * thus since it's not lost/recovered, it must be a DUP */ + if (seqentryflags & SE_RELOAD) { + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + dup = true; + goto setitemdata; + } + + /* Before we've synced up the queue and still not discarded + * the reload lost list, if it's not lost/recovered but + * still in the reload range, it must be a DUP */ + if (seqentryflags & SE_EARLYSOCK && n_seqcmd <= seqdata->reloadmax) { + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + dup = true; + goto setitemdata; + } + + // Reload lost list is gone or it's after the reload range seqdata->stale++; seqset->stale++; memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); @@ -1996,10 +2157,10 @@ gotseqset: setitemdata: // Store the new seq if flagged to do so if (doitem) { - seqitem->flags = seqitemflags; - copy_tv(&(seqitem->time), now); - copy_tv(&(seqitem->cd), cd); - STRNCPY(seqitem->code, code); + seqentry->flags = seqentryflags; + copy_tv(&(seqentry->time), now); + copy_tv(&(seqentry->cd), cd); + STRNCPY(seqentry->code, code); seqdata->ok++; seqset->ok++; } @@ -2019,8 +2180,8 @@ setitemdata: if (firstseq) { // The first ever SEQ_ALL btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); - LOGWARNING("Seq first init: %s %"PRIu64" " - SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, + LOGWARNING("Seq first init: %s:%"PRIu64" " + SEQSTT":%"PRIu64"=%s "SEQPID":%"PRIu64, nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); } else { if (newseq) { @@ -2033,8 +2194,8 @@ setitemdata: SEQSETWARN(expset, &seqset_exp, "discarded old", " for:"); if (newseq || expseq) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); - LOGWARNING("Seq created new: set %d %s %"PRIu64" " - SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, + LOGWARNING("Seq created new: set:%d %s:%"PRIu64" " + SEQSTT":%"PRIu64"=%s "SEQPID":%"PRIu64, set, nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); } @@ -2042,19 +2203,19 @@ setitemdata: if (dup) { int level = LOG_WARNING; - /* If one is SI_RELOAD and the other is SI_EARLYSOCK then it's + /* If one is SE_RELOAD and the other is SE_EARLYSOCK then it's * not unexpected so only LOG_DEBUG */ - if (((seqitem_copy.flags | seqitemflags) & SI_RELOAD) && - ((seqitem_copy.flags | seqitemflags) & SI_EARLYSOCK)) + if (((seqentry_copy.flags | seqentryflags) & SE_RELOAD) && + ((seqentry_copy.flags | seqentryflags) & SE_EARLYSOCK)) level = LOG_DEBUG; btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set %d/%"PRIu64 + LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set:%d/%"PRIu64 "=%s/%"PRIu64" %s/%s v%"PRIu64"/^%"PRIu64 "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64 "/H%"PRIu64"/OK%"PRIu64" cmd=%.42s...", (level == LOG_DEBUG) ? "*" : EMPTY, - SICHR(seqitemflags), SICHR(seqitem_copy.flags), + SECHR(seqentryflags), SECHR(seqentry_copy.flags), nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code, seqset_copy.seqdata[seq].minseq, @@ -2069,11 +2230,12 @@ setitemdata: FREENULL(st); } - if (wastrans) { + if (wastrans || gotrecover) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGWARNING("SEQ found trans %s %"PRIu64" set %d/%"PRIu64 - "=%s/%"PRIu64" %s/%s", + LOGWARNING("%s %s %"PRIu64" set:%d/%"PRIu64"=%s/%"PRIu64 + " %s/%s", + gotrecover ? "SEQ recovered" : "Seq found trans", nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code); } @@ -2081,11 +2243,11 @@ setitemdata: if (gotstale || gotstalestart || gothigh) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGWARNING("SEQ %s %s%s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 + LOGWARNING("SEQ %s %s%s %"PRIu64" set:%d/%"PRIu64"=%s/%"PRIu64 " %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64 "/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 " cmd=%.42s...", - gothigh ? "high" : "stale", + gothigh ? (okhi ? "OKHI" : "HIGH") : "stale", gotstalestart ? "STARTUP " : EMPTY, nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code, @@ -2111,73 +2273,144 @@ setitemdata: } } - if (lost->head) { + if (lost && lost->head) { + int tran = 0, miss = 0; + uint64_t prev = 0; + char range_buf[256]; + bool isrange = false; st_item = lost->head; while (st_item) { DATA_SEQTRANS(seqtrans, st_item); + st_item = st_item->next; + DATA_SEQTRANS_NULL(seqtrans2, st_item); + + if (ENTRYISTRANS(&(seqtrans->entry))) + tran++; + else + miss++; + + // Output the messages as ranges if they are + if (st_item && seqtrans2->seqnum == (seqtrans->seqnum+1)) { + if (!prev) + prev = seqtrans->seqnum; + continue; + } + if (prev) { + isrange = true; + snprintf(range_buf, sizeof(range_buf), + "RANGE %d tran %d miss %"PRIu64 + "-%"PRIu64, + tran, miss, prev, + seqtrans->seqnum); + prev = 0; + } else { + isrange = false; + snprintf(range_buf, sizeof(range_buf), + "%s %"PRIu64, + ENTRYISTRANS(&(seqtrans->entry)) ? + "tran" : "miss", + seqtrans->seqnum); + } + // consumed + tran = miss = 0; + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); - bt_to_buf(&(seqtrans->item.time.tv_sec), t_buf2, + bt_to_buf(&(seqtrans->entry.time.tv_sec), t_buf2, sizeof(t_buf2)); - LOGWARNING("SEQ lost %s %s %"PRIu64" set %d/%"PRIu64 - "=%s/%"PRIu64" %s/%s", - ITEMISTRANS(&(seqtrans->item)) ? - "trans" : "missing", - seqnam[seq], seqtrans->seqnum, set, - n_seqstt, t_buf, n_seqpid, t_buf2, - seqtrans->item.code); - st_item = st_item->next; + LOGWARNING("SEQ lost %s %s set:%d/%"PRIu64"=%s/%"PRIu64 + " %s%s/%s", + seqnam[seq], range_buf, set, + n_seqstt, t_buf, n_seqpid, + isrange ? "last: " : EMPTY, + t_buf2, seqtrans->entry.code); } K_WLOCK(seqtrans_free); k_list_transfer_to_head(lost, seqtrans_free); + if (seqtrans_free->count == seqtrans_free->total && + seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS) + k_cull_list(seqtrans_free); K_WUNLOCK(seqtrans_free); } - lost = k_free_store(lost); + + if (lost) + lost = k_free_store(lost); return dup; } -static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, - tv_t *now, char *msg, K_TREE *trf_root, - bool wantauth, int seqitemflags) +static enum cmd_values process_seq(MSGLINE *msgline) +{ + bool dupall, dupcmd; + char *st = NULL; + + dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, + msgline->n_seqpid, SEQALL, &(msgline->now), + &(msgline->cd), msgline->code, + msgline->seqentryflags, msgline->msg); + dupcmd = update_seq(ckdb_cmds[msgline->which_cmds].seq, + msgline->n_seqcmd, msgline->n_seqstt, + msgline->n_seqpid, msgline->seqcmdnam, + &(msgline->now), &(msgline->cd), msgline->code, + msgline->seqentryflags, msgline->msg); + + if (dupall != dupcmd) { + // Bad/corrupt data or a code bug + LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " + "cmd=%.32s...", + seqnam[SEQ_ALL], msgline->n_seqall, + dupall ? "DUP" : "notdup", + msgline->seqcmdnam, msgline->n_seqcmd, + dupcmd ? "DUP" : "notdup", + st = safe_text_nonull(msgline->msg)); + FREENULL(st); + } + + /* The norm is: neither is a dup, so reply with ok to process + * If only one is a dup then that's a corrupt message or a bug + * so simply try to process it as normal */ + if (!dupall || !dupcmd) + return ckdb_cmds[msgline->which_cmds].cmd_val; + + /* It's a dup */ + return CMD_DUPSEQ; +} + +static void setup_seq(K_ITEM *seqall, MSGLINE *msgline) { - uint64_t n_seqall, n_seqstt, n_seqpid, n_seqcmd; K_ITEM *seqstt, *seqpid, *seqcmd, *i_code; char *err = NULL, *st = NULL; size_t len, off; - bool dupall, dupcmd; - char *code = NULL; - char *seqcmdnam; - n_seqstt = n_seqpid = n_seqcmd = 0; - n_seqall = atol(transfer_data(seqall)); - if ((seqstt = find_transfer(trf_root, SEQSTT))) - n_seqstt = atol(transfer_data(seqstt)); + msgline->n_seqall = atol(transfer_data(seqall)); + if ((seqstt = find_transfer(msgline->trf_root, SEQSTT))) + msgline->n_seqstt = atol(transfer_data(seqstt)); /* Ignore SEQSTTIGN sequence information * This allows us to manually generate ckpool data and send it * to ckdb using ckpmsg - if SEQSTT == SEQSTTIGN * SEQALL must exist but the value is ignored * SEQPID and SEQcmd don't need to exist and are ignored */ - if (n_seqstt == SEQSTTIGN) { + if (msgline->n_seqstt == SEQSTTIGN) { LOGWARNING("%s(): SEQIGN in %.42s...", - __func__, st = safe_text(msg)); + __func__, st = safe_text(msgline->msg)); FREENULL(st); - return ckdb_cmds[which].cmd_val; + return; } - if ((seqpid = find_transfer(trf_root, SEQPID))) - n_seqpid = atol(transfer_data(seqpid)); - seqcmdnam = seqnam[ckdb_cmds[which].seq]; - if ((seqcmd = find_transfer(trf_root, seqcmdnam))) - n_seqcmd = atol(transfer_data(seqcmd)); + if ((seqpid = find_transfer(msgline->trf_root, SEQPID))) + msgline->n_seqpid = atol(transfer_data(seqpid)); + msgline->seqcmdnam = seqnam[ckdb_cmds[msgline->which_cmds].seq]; + if ((seqcmd = find_transfer(msgline->trf_root, msgline->seqcmdnam))) + msgline->n_seqcmd = atol(transfer_data(seqcmd)); /* Make one message with the initial seq missing/value problems ... * that should never happen if seqall is present */ if (!seqstt || !seqpid || !seqcmd || - (seqstt && n_seqstt < DATE_BEGIN) || (seqpid && n_seqpid <= 0)) { + (seqstt && msgline->n_seqstt < DATE_BEGIN) || + (seqpid && msgline->n_seqpid <= 0)) { APPEND_REALLOC_INIT(err, off, len); - APPEND_REALLOC(err, off, len, "ERROR: command "); - APPEND_REALLOC(err, off, len, ckdb_cmds[which].cmd_str); + APPEND_REALLOC(err, off, len, "SEQ ERROR: command "); + APPEND_REALLOC(err, off, len, msgline->cmd); if (!seqstt || !seqpid || !seqcmd) { APPEND_REALLOC(err, off, len, " - missing"); if (!seqstt) @@ -2186,98 +2419,70 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, APPEND_REALLOC(err, off, len, BLANK SEQPID); if (!seqcmd) { APPEND_REALLOC(err, off, len, BLANK); - APPEND_REALLOC(err, off, len, seqcmdnam); + APPEND_REALLOC(err, off, len, + msgline->seqcmdnam); } } - if (seqstt && n_seqstt < DATE_BEGIN) { - APPEND_REALLOC(err, off, len, " - invalid " SEQSTT " '"); + if (seqstt && msgline->n_seqstt < DATE_BEGIN) { + APPEND_REALLOC(err, off, len, " - invalid "SEQSTT":'"); st = safe_text_nonull(transfer_data(seqstt)); APPEND_REALLOC(err, off, len, st); FREENULL(st); APPEND_REALLOC(err, off, len, "'"); } - if (seqpid && n_seqpid <= 0) { - APPEND_REALLOC(err, off, len, " - invalid " SEQPID " '"); + if (seqpid && msgline->n_seqpid <= 0) { + APPEND_REALLOC(err, off, len, " - invalid "SEQPID":'"); st = safe_text_nonull(transfer_data(seqpid)); APPEND_REALLOC(err, off, len, st); FREENULL(st); APPEND_REALLOC(err, off, len, "'"); } APPEND_REALLOC(err, off, len, " - msg='"); - APPEND_REALLOC(err, off, len, msg); + APPEND_REALLOC(err, off, len, msgline->msg); APPEND_REALLOC(err, off, len, "'"); LOGMSGBUF(LOG_EMERG, err); FREENULL(err); - /* Just process it normally - - * this will of course produce missing seq messages later - * if msg was supposed to have proper seq numbers */ - return ckdb_cmds[which].cmd_val; + return; } - if ((i_code = find_transfer(trf_root, CODETRF))) { - code = transfer_data(i_code); - if (!(*code)) - code = NULL; + msgline->hasseq = true; + + if ((i_code = find_transfer(msgline->trf_root, CODETRF))) { + msgline->code = transfer_data(i_code); + if (!(*(msgline->code))) + msgline->code = NULL; } - if (!code) { - if ((i_code = find_transfer(trf_root, BYTRF))) - code = transfer_data(i_code); + if (!(msgline->code)) { + if ((i_code = find_transfer(msgline->trf_root, BYTRF))) + msgline->code = transfer_data(i_code); else - code = EMPTY; - } - - dupall = update_seq(SEQ_ALL, n_seqall, n_seqstt, n_seqpid, SEQALL, - now, cd, code, seqitemflags, msg); - dupcmd = update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, - seqcmdnam, now, cd, code, seqitemflags, msg); - - if (dupall != dupcmd) { - // Bad/corrupt data or a code bug - LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " - "cmd=%.32s...", - seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", - seqnam[ckdb_cmds[which].seq], n_seqcmd, - dupcmd ? "DUP" : "notdup", - st = safe_text_nonull(msg)); - FREENULL(st); + msgline->code = EMPTY; } - - /* The norm is: neither is a dup, so reply with ok to process - * If only one is a dup then that's a corrupt message or a bug - * so simply try to process it as normal */ - if (!dupall || !dupcmd) - return ckdb_cmds[which].cmd_val; - - /* Reprocess AUTH messages anyway, since they shouldn't cause any errors - * but the reply may be needed the 2nd time */ - if (wantauth && (ckdb_cmds[which].cmd_val == CMD_AUTH || - ckdb_cmds[which].cmd_val == CMD_ADDRAUTH)) - return ckdb_cmds[which].cmd_val; - - /* It's a dup so ignore it */ - return CMD_DUPSEQ; } -static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, - char *buf, int *which_cmds, char *cmd, - char *id, tv_t *now, tv_t *cd, bool wantauth, - int seqitemflags) +static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, + int seqentryflags) { char reply[1024] = ""; TRANSFER *transfer; K_TREE_CTX ctx[1]; - K_ITEM *item = NULL, *seqall; + MSGLINE *msgline; + K_ITEM *t_item = NULL, *cd_item = NULL, *seqall; char *cmdptr, *idptr, *next, *eq, *end, *was; char *data = NULL, *st = NULL, *st2 = NULL; bool noid = false; size_t siz; - *trf_root = NULL; - *trf_store = NULL; - *which_cmds = CMD_UNSET; - *cmd = *id = '\0'; - copy_tv(cd, now); // default cd to 'now' + K_WLOCK(msgline_free); + *ml_item = k_unlink_head_zero(msgline_free); + K_WUNLOCK(msgline_free); + DATA_MSGLINE(msgline, *ml_item); + msgline->which_cmds = CMD_UNSET; + copy_tv(&(msgline->now), now); + copy_tv(&(msgline->cd), now); // default cd to 'now' + msgline->msg = strdup(buf); + msgline->seqentryflags = seqentryflags; cmdptr = strdup(buf); idptr = strchr(cmdptr, '.'); @@ -2288,40 +2493,39 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, data = strchr(idptr, '.'); if (data) *(data++) = '\0'; - STRNCPYSIZ(id, idptr, ID_SIZ); + STRNCPY(msgline->id, idptr); } - STRNCPYSIZ(cmd, cmdptr, CMD_SIZ); - for (*which_cmds = 0; ckdb_cmds[*which_cmds].cmd_val != CMD_END; (*which_cmds)++) { - if (strcasecmp(cmd, ckdb_cmds[*which_cmds].cmd_str) == 0) + STRNCPY(msgline->cmd, cmdptr); + for (msgline->which_cmds = 0; + ckdb_cmds[msgline->which_cmds].cmd_val != CMD_END; + (msgline->which_cmds)++) { + if (strcasecmp(msgline->cmd, + ckdb_cmds[msgline->which_cmds].cmd_str) == 0) break; } - if (ckdb_cmds[*which_cmds].cmd_val == CMD_END) { - LOGERR("Listener received unknown command: '%s'", + if (ckdb_cmds[msgline->which_cmds].cmd_val == CMD_END) { + LOGERR("Listener received unknown command: '%.42s...", st2 = safe_text(buf)); FREENULL(st2); - free(cmdptr); - return CMD_REPLY; + goto nogood; } if (noid) { - if (ckdb_cmds[*which_cmds].noid) { - *id = '\0'; + if (ckdb_cmds[msgline->which_cmds].noid) { free(cmdptr); - return ckdb_cmds[*which_cmds].cmd_val; + return ckdb_cmds[msgline->which_cmds].cmd_val; } - STRNCPYSIZ(id, cmdptr, ID_SIZ); - LOGERR("Listener received invalid (noid) message: '%s'", + LOGERR("Listener received invalid (noid) message: '%.42s...", st2 = safe_text(buf)); FREENULL(st2); - free(cmdptr); - return CMD_REPLY; + goto nogood; } - *trf_root = new_ktree(); - *trf_store = k_new_store(transfer_free); + msgline->trf_root = new_ktree(); + msgline->trf_store = k_new_store(transfer_free); next = data; if (next && strncmp(next, JSON_TRANSFER, JSON_TRANSFER_LEN) == 0) { // It's json @@ -2335,8 +2539,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - return CMD_REPLY; + goto nogood; } next++; // while we have a new quoted name @@ -2355,8 +2558,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - return CMD_REPLY; + goto nogood; } if (next == end) { LOGERR("JSON zero length name was:%.32s..." @@ -2365,14 +2567,13 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - return CMD_REPLY; + goto nogood; } *(end++) = '\0'; K_WLOCK(transfer_free); - item = k_unlink_head(transfer_free); + t_item = k_unlink_head(transfer_free); K_WUNLOCK(transfer_free); - DATA_TRANSFER(transfer, item); + DATA_TRANSFER(transfer, t_item); STRNCPY(transfer->name, next); was = next = end; while (*next == ' ') @@ -2386,11 +2587,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } was = ++next; while (*next == ' ') @@ -2410,11 +2607,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } siz = end - next; end++; @@ -2426,11 +2619,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, transfer->name, st2 = safe_text(buf)); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } end = ++next; /* No structure testing for now since we @@ -2447,11 +2636,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } siz = end - next; end++; @@ -2470,11 +2655,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } if (next == end) { LOGERR("JSON '%s' zero length value " @@ -2484,11 +2665,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - return CMD_REPLY; + goto nogood; } siz = end - next; } @@ -2499,9 +2676,10 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, STRNCPYSIZ(transfer->svalue, next, siz+1); transfer->mvalue = transfer->svalue; } - *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); - k_add_head(*trf_store, item); - item = NULL; + msgline->trf_root = add_to_ktree(msgline->trf_root, + t_item, cmp_transfer); + k_add_head(msgline->trf_store, t_item); + t_item = NULL; // find the separator then move to the next name next = end; @@ -2519,13 +2697,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, st2 = safe_text(buf)); FREENULL(st); FREENULL(st2); - free(cmdptr); - if (item) { - K_WLOCK(transfer_free); - k_add_head(transfer_free, item); - K_WUNLOCK(transfer_free); - } - return CMD_REPLY; + goto nogood; } } else { K_WLOCK(transfer_free); @@ -2541,54 +2713,55 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, else *(eq++) = '\0'; - item = k_unlink_head(transfer_free); - DATA_TRANSFER(transfer, item); + t_item = k_unlink_head(transfer_free); + DATA_TRANSFER(transfer, t_item); STRNCPY(transfer->name, data); STRNCPY(transfer->svalue, eq); transfer->mvalue = transfer->svalue; - if (find_in_ktree(*trf_root, item, cmp_transfer, ctx)) { + // Discard duplicates + if (find_in_ktree(msgline->trf_root, t_item, + cmp_transfer, ctx)) { if (transfer->mvalue != transfer->svalue) FREENULL(transfer->mvalue); - k_add_head(transfer_free, item); + k_add_head(transfer_free, t_item); } else { - *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); - k_add_head(*trf_store, item); + msgline->trf_root = add_to_ktree(msgline->trf_root, + t_item, + cmp_transfer); + k_add_head(msgline->trf_store, t_item); } } K_WUNLOCK(transfer_free); } - seqall = find_transfer(*trf_root, SEQALL); - if (ckdb_cmds[*which_cmds].createdate) { - item = require_name(*trf_root, CDTRF, 10, NULL, reply, sizeof(reply)); - if (!item) { - free(cmdptr); - return CMD_REPLY; - } - - DATA_TRANSFER(transfer, item); - txt_to_ctv(CDTRF, transfer->mvalue, cd, sizeof(*cd)); - if (cd->tv_sec == 0) { + seqall = find_transfer(msgline->trf_root, SEQALL); + if (ckdb_cmds[msgline->which_cmds].createdate) { + cd_item = require_name(msgline->trf_root, CDTRF, 10, NULL, + reply, sizeof(reply)); + if (!cd_item) + goto nogood; + + DATA_TRANSFER(transfer, cd_item); + txt_to_ctv(CDTRF, transfer->mvalue, &(msgline->cd), + sizeof(msgline->cd)); + if (msgline->cd.tv_sec == 0) { LOGERR("%s(): failed, %s has invalid "CDTRF" '%s'", __func__, cmdptr, transfer->mvalue); - free(cmdptr); - return CMD_REPLY; + goto nogood; } if (confirm_check_createdate) - check_createdate_ccl(cmd, cd); + check_createdate_ccl(msgline->cmd, &(msgline->cd)); if (seqall) { - enum cmd_values ret; - ret = process_seq(seqall, *which_cmds, cd, now, buf, - *trf_root, wantauth, seqitemflags); + setup_seq(seqall, msgline); free(cmdptr); - return ret; + return ckdb_cmds[msgline->which_cmds].cmd_val; } else { /* It's OK to load old data from the CCLs * but socket data must contain SEQALL ... * even in manually generated data */ if (startup_complete) { - LOGEMERG("%s(): *** ckpool needs upgrading - " + LOGEMERG("%s(): *** ckpool needs upgrading? - " "missing "SEQALL" from '%s' ckpool " "data in '%s'", __func__, cmdptr, @@ -2597,7 +2770,8 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } } } else { - // Bug somewhere or createdate flag missing + /* Bug somewhere or createdate flag missing + * This ignores/discards all seq info */ if (seqall) { LOGWARNING("%s(): msg '%s' shouldn't contain "SEQALL " in '%s'", @@ -2607,7 +2781,15 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } } free(cmdptr); - return ckdb_cmds[*which_cmds].cmd_val; + return ckdb_cmds[msgline->which_cmds].cmd_val; +nogood: + if (t_item) { + K_WLOCK(transfer_free); + k_add_head(transfer_free, t_item); + K_WUNLOCK(transfer_free); + } + free(cmdptr); + return CMD_REPLY; } static void check_blocks() @@ -3507,7 +3689,6 @@ static void *socketer(__maybe_unused void *arg) proc_instance_t *pi = (proc_instance_t *)arg; unixsock_t *us = &pi->us; char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; - char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; char *last_auth = NULL, *reply_auth = NULL; char *last_addrauth = NULL, *reply_addrauth = NULL; char *last_chkpass = NULL, *reply_chkpass = NULL; @@ -3522,15 +3703,14 @@ static void *socketer(__maybe_unused void *arg) char *last_web = NULL, *reply_web = NULL; char *reply_last, duptype[CMD_SIZ+1]; enum cmd_values cmdnum; - int sockd, which_cmds; + int sockd; + K_ITEM *wq_item = NULL, *ml_item = NULL; WORKQUEUE *workqueue; - TRANSFER *transfer; - K_STORE *trf_store; - K_TREE *trf_root; - K_ITEM *item; + MSGLINE *msgline; + char reply[1024+1]; size_t siz; - tv_t now, cd; - bool dup, want_first, show_dup; + tv_t now; + bool dup, want_first, show_dup, replied; int loglevel, oldloglevel; pthread_detach(pthread_self()); @@ -3553,8 +3733,6 @@ static void *socketer(__maybe_unused void *arg) } cmdnum = CMD_UNSET; - trf_root = NULL; - trf_store = NULL; buf = recv_unix_msg(sockd); // Once we've read the message @@ -3644,94 +3822,107 @@ static void *socketer(__maybe_unused void *arg) else LOGDEBUG("Duplicate '%s' message received", duptype); } else { - int seqitemflags = SI_SOCKET; + int seqentryflags = SE_SOCKET; if (!reload_queue_complete) - seqitemflags = SI_EARLYSOCK; + seqentryflags = SE_EARLYSOCK; LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, - &which_cmds, cmd, id, &now, - &cd, true, seqitemflags); + cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); + DATA_MSGLINE(msgline, ml_item); + replied = false; switch (cmdnum) { - case CMD_DUPSEQ: - snprintf(reply, sizeof(reply), "%s.%ld.dup.", id, now.tv_sec); - send_unix_msg(sockd, reply); - break; case CMD_REPLY: - snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); + snprintf(reply, sizeof(reply), + "%s.%ld.?.", + msgline->id, + now.tv_sec); send_unix_msg(sockd, reply); break; case CMD_TERMINATE: - LOGWARNING("Listener received terminate message, terminating ckdb"); - snprintf(reply, sizeof(reply), "%s.%ld.ok.exiting", id, now.tv_sec); + LOGWARNING("Listener received" + " terminate message," + " terminating ckdb"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.exiting", + msgline->id, + now.tv_sec); send_unix_msg(sockd, reply); everyone_die = true; break; case CMD_PING: - LOGDEBUG("Listener received ping request"); - snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); + LOGDEBUG("Listener received ping" + " request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.pong", + msgline->id, + now.tv_sec); send_unix_msg(sockd, reply); break; case CMD_VERSION: - LOGDEBUG("Listener received version request"); + LOGDEBUG("Listener received" + " version request"); snprintf(reply, sizeof(reply), "%s.%ld.ok.CKDB V%s", - id, now.tv_sec, CKDB_VERSION); + msgline->id, + now.tv_sec, + CKDB_VERSION); send_unix_msg(sockd, reply); break; case CMD_LOGLEVEL: - if (!*id) { - LOGDEBUG("Listener received loglevel, currently %d", + if (!*(msgline->id)) { + LOGDEBUG("Listener received" + " loglevel, currently %d", pi->ckp->loglevel); snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel currently %d", - id, now.tv_sec, + "%s.%ld.ok.loglevel" + " currently %d", + msgline->id, + now.tv_sec, pi->ckp->loglevel); } else { oldloglevel = pi->ckp->loglevel; - loglevel = atoi(id); - LOGDEBUG("Listener received loglevel %d currently %d A", + loglevel = atoi(msgline->id); + LOGDEBUG("Listener received loglevel" + " %d currently %d A", loglevel, oldloglevel); - if (loglevel < LOG_EMERG || loglevel > LOG_DEBUG) { + if (loglevel < LOG_EMERG || + loglevel > LOG_DEBUG) { snprintf(reply, sizeof(reply), - "%s.%ld.ERR.invalid loglevel %d" + "%s.%ld.ERR.invalid" + " loglevel %d" " - currently %d", - id, now.tv_sec, - loglevel, oldloglevel); + msgline->id, + now.tv_sec, + loglevel, + oldloglevel); } else { pi->ckp->loglevel = loglevel; snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel now %d - was %d", - id, now.tv_sec, - pi->ckp->loglevel, oldloglevel); + "%s.%ld.ok.loglevel" + " now %d - was %d", + msgline->id, + now.tv_sec, + pi->ckp->loglevel, + oldloglevel); } // Do this twice since the loglevel may have changed - LOGDEBUG("Listener received loglevel %d currently %d B", + LOGDEBUG("Listener received loglevel" + " %d currently %d B", loglevel, oldloglevel); } send_unix_msg(sockd, reply); break; case CMD_FLUSH: - LOGDEBUG("Listener received flush request"); + LOGDEBUG("Listener received" + " flush request"); snprintf(reply, sizeof(reply), "%s.%ld.ok.splash", - id, now.tv_sec); + msgline->id, now.tv_sec); send_unix_msg(sockd, reply); fflush(stdout); fflush(stderr); if (global_ckp && global_ckp->logfd) fflush(global_ckp->logfp); break; - // Always process immediately: - case CMD_AUTH: - case CMD_ADDRAUTH: - case CMD_HEARTBEAT: - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } case CMD_CHKPASS: case CMD_ADDUSER: case CMD_NEWPASS: @@ -3747,14 +3938,20 @@ static void *socketer(__maybe_unused void *arg) case CMD_STATS: case CMD_USERSTATUS: case CMD_SHSTA: - ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, - by_default, - (char *)__func__, - inet_default, - &cd, trf_root); - siz = strlen(ans) + strlen(id) + 32; + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + now.tv_sec, ans); send_unix_msg(sockd, rep); FREENULL(ans); switch (cmdnum) { @@ -3813,17 +4010,26 @@ static void *socketer(__maybe_unused void *arg) if (!startup_complete) { snprintf(reply, sizeof(reply), "%s.%ld.loading.%s", - id, now.tv_sec, cmd); + msgline->id, + now.tv_sec, + msgline->cmd); send_unix_msg(sockd, reply); } else { - ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, - by_default, - (char *)__func__, - inet_default, - &cd, trf_root); - siz = strlen(ans) + strlen(id) + 32; + DATA_MSGLINE(msgline, ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + now.tv_sec, ans); send_unix_msg(sockd, rep); FREENULL(ans); if (cmdnum == CMD_DSP) @@ -3846,28 +4052,35 @@ static void *socketer(__maybe_unused void *arg) if (!startup_complete) { snprintf(reply, sizeof(reply), "%s.%ld.loading.%s", - id, now.tv_sec, cmd); + msgline->id, + now.tv_sec, + msgline->cmd); send_unix_msg(sockd, reply); } else { - ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, - by_default, - (char *)__func__, - inet_default, - &cd, trf_root); - siz = strlen(ans) + strlen(id) + 32; + DATA_MSGLINE(msgline, ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + now.tv_sec, ans); send_unix_msg(sockd, rep); FREENULL(ans); FREENULL(rep); } break; - // Always queue (ok.queued) - case CMD_SHARELOG: - case CMD_POOLSTAT: - case CMD_USERSTAT: - case CMD_WORKERSTAT: - case CMD_BLOCK: + // Always process immediately: + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: // First message from the pool if (want_first) { want_first = false; @@ -3875,47 +4088,68 @@ static void *socketer(__maybe_unused void *arg) first_pool_message = strdup(buf); ck_wunlock(&fpm_lock); } - - snprintf(reply, sizeof(reply), - "%s.%ld.ok.queued", - id, now.tv_sec); - send_unix_msg(sockd, reply); - - K_WLOCK(workqueue_free); - item = k_unlink_head(workqueue_free); - K_WUNLOCK(workqueue_free); - - DATA_WORKQUEUE(workqueue, item); - workqueue->buf = buf; - buf = NULL; - workqueue->which_cmds = which_cmds; - workqueue->cmdnum = cmdnum; - STRNCPY(workqueue->cmd, cmd); - STRNCPY(workqueue->id, id); - copy_tv(&(workqueue->now), &now); - STRNCPY(workqueue->by, by_default); - STRNCPY(workqueue->code, __func__); - STRNCPY(workqueue->inet, inet_default); - copy_tv(&(workqueue->cd), &cd); - workqueue->trf_root = trf_root; - trf_root = NULL; - workqueue->trf_store = trf_store; - trf_store = NULL; + DATA_MSGLINE(msgline, ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + now.tv_sec, ans); + send_unix_msg(sockd, rep); + FREENULL(ans); + replied = true; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + if (!replied) { + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(buf); + ck_wunlock(&fpm_lock); + } + snprintf(reply, sizeof(reply), + "%s.%ld.ok.queued", + msgline->id, + now.tv_sec); + send_unix_msg(sockd, reply); + } K_WLOCK(workqueue_free); - k_add_tail(workqueue_store, item); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + k_add_tail(workqueue_store, wq_item); K_WUNLOCK(workqueue_free); + ml_item = NULL; mutex_lock(&wq_waitlock); pthread_cond_signal(&wq_waitcond); mutex_unlock(&wq_waitlock); break; // Code error default: - LOGEMERG("%s() CODE ERROR unhandled message %d %.32s...", + LOGEMERG("%s() CODE ERROR unhandled" + " message %d %.32s...", __func__, cmdnum, buf); snprintf(reply, sizeof(reply), "%s.%ld.failed.code", - id, now.tv_sec); + msgline->id, + now.tv_sec); send_unix_msg(sockd, reply); break; } @@ -3923,26 +4157,15 @@ static void *socketer(__maybe_unused void *arg) } close(sockd); - tick(); - - if (trf_root) - trf_root = free_ktree(trf_root, NULL); - if (trf_store) { - item = trf_store->head; - while (item) { - DATA_TRANSFER(transfer, item); - if (transfer->mvalue != transfer->svalue) - FREENULL(transfer->mvalue); - item = item->next; - } - K_WLOCK(transfer_free); - k_list_transfer_to_head(trf_store, transfer_free); - trf_store = k_free_store(trf_store); - if (transfer_free->count == transfer_free->total && - transfer_free->total > ALLOC_TRANSFER * CULL_TRANSFER) - k_cull_list(transfer_free); - K_WUNLOCK(transfer_free); + if (ml_item) { + free_msgline_data(ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + ml_item = NULL; } + + tick(); } socketer_using_data = false; @@ -3957,15 +4180,11 @@ static void *socketer(__maybe_unused void *arg) static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) { - char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; - char *end, *ans; - int which_cmds; - K_STORE *trf_store = NULL; - K_TREE *trf_root = NULL; - TRANSFER *transfer; - K_ITEM *item; - tv_t now, cd; + char *end, *ans, *st = NULL; + MSGLINE *msgline; + K_ITEM *ml_item; + tv_t now; bool matched; // Once we've read the message @@ -3977,28 +4196,32 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) *(end--) = '\0'; } if (!buf || !*buf) { - if (!buf) - LOGERR("%s() NULL message line %"PRIu64, __func__, count); - else - LOGERR("%s() Empty message line %"PRIu64, __func__, count); + if (!buf) { + LOGERR("%s() NULL message line %"PRIu64, + __func__, count); + } else { + LOGERR("%s() Empty message line %"PRIu64, + __func__, count); + } } else { matched = false; ck_wlock(&fpm_lock); - if (first_pool_message && strcmp(first_pool_message, buf) == 0) { + if (first_pool_message && + strcmp(first_pool_message, buf) == 0) { matched = true; FREENULL(first_pool_message); } ck_wunlock(&fpm_lock); - if (matched) - LOGERR("%s() reload ckpool queue match at line %"PRIu64, __func__, count); + if (matched) { + LOGERR("%s() reload ckpool queue match at line %"PRIu64, + __func__, count); + } LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, - cmd, id, &now, &cd, false, SI_RELOAD); + // ml_item is set for all but CMD_REPLY + cmdnum = breakdown(&ml_item, buf, &now, SE_RELOAD); + DATA_MSGLINE(msgline, ml_item); switch (cmdnum) { - // Don't ever attempt to double process reload data - case CMD_DUPSEQ: - break; // Ignore case CMD_REPLY: break; @@ -4037,8 +4260,11 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_MARKS: case CMD_PSHIFT: case CMD_SHSTA: - LOGERR("%s() INVALID message line %"PRIu64" '%s' - ignored", - __func__, count, cmd); + LOGERR("%s() INVALID message line %"PRIu64 + " ignored '%.42s...", + __func__, count, + st = safe_text(msgline->msg)); + FREENULL(st); break; case CMD_AUTH: case CMD_ADDRAUTH: @@ -4050,34 +4276,38 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) if (confirm_sharesummary) break; case CMD_SHARELOG: - ans = ckdb_cmds[which_cmds].func(conn, cmd, id, &now, - by_default, - (char *)__func__, - inet_default, - &cd, trf_root); - FREENULL(ans); + // This will return the same cmdnum or DUP + cmdnum = process_seq(msgline); + if (cmdnum != CMD_DUPSEQ) { + ans = ckdb_cmds[msgline->which_cmds].func(conn, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + FREENULL(ans); + } break; default: // Force this switch to be updated if new cmds are added - quithere(1, "%s line %"PRIu64" '%s' - not handled by reload", - filename, count, cmd); + quithere(1, "%s line %"PRIu64" '%s' - not " + "handled by reload", + filename, count, + st = safe_text_nonull(msgline->cmd)); + // Won't get here ... + FREENULL(st); break; } - if (trf_root) - trf_root = free_ktree(trf_root, NULL); - if (trf_store) { - item = trf_store->head; - while (item) { - DATA_TRANSFER(transfer, item); - if (transfer->mvalue != transfer->svalue) - FREENULL(transfer->mvalue); - item = item->next; - } - K_WLOCK(transfer_free); - k_list_transfer_to_head(trf_store, transfer_free); - K_WUNLOCK(transfer_free); - trf_store = k_free_store(trf_store); + if (ml_item) { + free_msgline_data(ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + ml_item = NULL; } } @@ -4305,6 +4535,8 @@ static bool reload_from(tv_t *start) } ck_wunlock(&fpm_lock); + seq_reloadmax(); + reloading = false; FREENULL(reload_buf); return ret; @@ -4312,48 +4544,66 @@ static bool reload_from(tv_t *start) static void process_queued(PGconn *conn, K_ITEM *wq_item) { - static char *last_buf = NULL; + enum cmd_values cmdnum; WORKQUEUE *workqueue; - TRANSFER *transfer; - K_ITEM *item; + MSGLINE *msgline; + K_ITEM *ml_item; char *ans; DATA_WORKQUEUE(workqueue, wq_item); - - // Simply ignore the (very rare) duplicates - if (!last_buf || strcmp(workqueue->buf, last_buf)) { - ans = ckdb_cmds[workqueue->which_cmds].func(conn, workqueue->cmd, workqueue->id, - &(workqueue->now), workqueue->by, - workqueue->code, workqueue->inet, - &(workqueue->cd), workqueue->trf_root); - FREENULL(ans); + ml_item = workqueue->msgline_item; + DATA_MSGLINE(msgline, ml_item); + + /* Queued messages haven't had their seq number check yet + * This will return the entries cmdnum or DUP */ + cmdnum = process_seq(msgline); + switch (cmdnum) { + case CMD_DUPSEQ: + // Already replied + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: + break; + default: + ans = ckdb_cmds[msgline->which_cmds].func(conn, + msgline->cmd, + msgline->id, + &(msgline->now), + workqueue->by, + workqueue->code, + workqueue->inet, + &(msgline->cd), + msgline->trf_root); + FREENULL(ans); + break; } - if (last_buf) - free(last_buf); - last_buf = workqueue->buf; - - workqueue->trf_root = free_ktree(workqueue->trf_root, NULL); - item = workqueue->trf_store->head; - while (item) { - DATA_TRANSFER(transfer, item); - if (transfer->mvalue != transfer->svalue) - FREENULL(transfer->mvalue); - item = item->next; - } - K_WLOCK(transfer_free); - k_list_transfer_to_head(workqueue->trf_store, transfer_free); - K_WUNLOCK(transfer_free); - workqueue->trf_store = k_free_store(workqueue->trf_store); + free_msgline_data(ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); K_WLOCK(workqueue_free); k_add_head(workqueue_free, wq_item); if (workqueue_free->count == workqueue_free->total && - workqueue_free->total > ALLOC_WORKQUEUE * CULL_WORKQUEUE) + workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE) k_cull_list(workqueue_free); K_WUNLOCK(workqueue_free); } +static void free_lost(SEQDATA *seqdata) +{ + if (seqdata->reload_lost) { + K_WLOCK(seqtrans_free); + k_list_transfer_to_head(seqdata->reload_lost, seqtrans_free); + if (seqtrans_free->count == seqtrans_free->total && + seqtrans_free->total >= ALLOC_SEQTRANS * CULL_SEQTRANS) + k_cull_list(seqtrans_free); + K_WUNLOCK(seqtrans_free); + seqdata->reload_lost = NULL; + } +} + // TODO: equivalent of api_allow static void *listener(void *arg) { @@ -4369,6 +4619,10 @@ static void *listener(void *arg) tv_t wq_stt, wq_fin; double min, sec; int left; + SEQSET *seqset = NULL; + SEQDATA *seqdata; + K_ITEM *ss_item; + int i; logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); @@ -4458,6 +4712,31 @@ static void *listener(void *arg) reload_queue_complete = true; } + /* Checked outside lock but only changed under lock + * This avoids taking out the lock repeatedly and the cleanup + * code is ok if there is nothing to clean up + * This would normally only ever be done once */ + if (seqdata_reload_lost && reload_queue_complete) { + /* Cleanup all the reload_lost stores since + * they should no longer be needed and the ram + * they use should be freed by the next cull */ + ck_wlock(&seq_lock); + if (seqset_store->count > 0) { + ss_item = seqset_store->head; + while (ss_item) { + DATA_SEQSET(seqset, ss_item); + if (seqset->seqstt) { + seqdata = &(seqset->seqdata[0]); + for (i = 0; i < SEQ_MAX; i++) { + free_lost(seqdata); + seqdata++; + } + } + } + } + seqdata_reload_lost = false; + ck_wunlock(&seq_lock); + } if (!wq_item) { const ts_t tsdiff = {0, 420000000}; diff --git a/src/ckdb.h b/src/ckdb.h index ba269993..16296c7e 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.080" +#define CKDB_VERSION DB_VERSION"-1.090" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -699,25 +699,47 @@ typedef struct logqueue { extern K_LIST *logqueue_free; extern K_STORE *logqueue_store; -// WORKQUEUE -typedef struct workqueue { - char *buf; +// MSGLINE +typedef struct msgline { int which_cmds; - enum cmd_values cmdnum; - char cmd[CMD_SIZ+1]; - char id[ID_SIZ+1]; tv_t now; - char by[TXT_SML+1]; - char code[TXT_MED+1]; - char inet[TXT_MED+1]; tv_t cd; + char id[ID_SIZ+1]; + char cmd[CMD_SIZ+1]; + char *msg; + bool hasseq; + char *seqcmdnam; + uint64_t n_seqall; + uint64_t n_seqcmd; + uint64_t n_seqstt; + uint64_t n_seqpid; + int seqentryflags; + char *code; K_TREE *trf_root; K_STORE *trf_store; +} MSGLINE; + +#define ALLOC_MSGLINE 8192 +#define LIMIT_MSGLINE 0 +#define CULL_MSGLINE 16 +#define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline) +#define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true) +#define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false) + +extern K_LIST *msgline_free; +extern K_STORE *msgline_store; + +// WORKQUEUE +typedef struct workqueue { + K_ITEM *msgline_item; + char *by; + char *code; + char *inet; } WORKQUEUE; #define ALLOC_WORKQUEUE 1024 #define LIMIT_WORKQUEUE 0 -#define CULL_WORKQUEUE 16 +#define CULL_WORKQUEUE 32 #define INIT_WORKQUEUE(_item) INIT_GENERIC(_item, workqueue) #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) @@ -795,28 +817,28 @@ extern tv_t missing_secuser_max; * missing counters are incremented - maxseq will now be N+5 * Once we reach N+size we need to discard N and use it as N+size * and increment seqbase N - * When we discard the oldest item due to needing more, if that oldest - * item was missing, it is now considered lost and the lost counters + * When we discard the oldest entry due to needing more, if that oldest + * entry was missing, it is now considered lost and the lost counters * are incremented (and missing counters decremented) - * If we receive an item N+x where N+x-maxseq>highlimit we reject it as high + * If we receive an entry N+x where N+x-maxseq>highlimit we reject it as high * and increment the high counters - this avoids creating a high bad sequence * number and flagging any missing sequence numbers, most likely incorrectly, * as lost in the range N to N+x-size - * If we receive an item N-x i.e. less than seqbase N, then: + * If we receive an entry N-x i.e. less than seqbase N, then: * If maxseq-N = size then N-x is considered stale and the stale counters - * are incremented since there's no unused items below N available + * are incremented since there's no unused entries below N available * This shouldn't normally happen after we've received size seq numbers - * Else maxseq-N is less than size, that means there are unused items below N + * Else maxseq-N is less than size, that means there are unused entries below N * This will usually only be with a new sequence and the first seq was out * of order, before lower sequence numbers, thus maxseq should be close to * seqbase N and no where near N+size and there should be x unused below N - * If there are x unused items below N then we can move seqbase down to N-x + * If there are x unused entries below N then we can move seqbase down to N-x * after we flag all N-1,N-2,N-3..N-(x-1) as missing - * Else there aren't enough unused items below N, then N-x is considered + * Else there aren't enough unused entries below N, then N-x is considered * stale and the stale counters are incremented * * timelimit is an early limit to flag missing sequence numbers as 'transient' - * Normally, if a missing item at N is later reused, it will be discarded and + * Normally, if a missing entry at N is later reused, it will be discarded and * reported as lost * After the reload queue is complete, timelimit reports missing sequence * numbers early, as transient, if they have been missing for 'timelimit' but @@ -827,7 +849,29 @@ extern tv_t missing_secuser_max; * reload queue has cleared after ckdb startup, it will report the transient * missing sequence numbers shortly after the timelimit * When they are later found or lost they will again be reported, this time as - * found or lost */ + * found or lost + * + * The sequence fields are checked for validity when the message arrives + * Sequence order checking of reload lines are done immediately + * Checking of ckpool lines isn't done until after the reload completes + * This solves the problem of the overlap from reload to queue + * If we stop the reload when we first match the queue, there's the issue + * that data order in the reload file may not match the data order in the + * queue and thus we could lose a record that is late in the reload file + * but was before the queue start + * To solve this unlikely (but not impossible) issue we reload all reload + * files to the end and then sequence process the queued data after the + * reload completes + * This however produces another (solvable) problem that the queue start + * may be stale when we finally complete the reload + * To handle this we keep a list of all lost records in the reload and check + * the stale queue records against those to see if we found them + * If a queue line is lower than minseq then that's a sequence error + * If a queue line is stale but above minseq, we check if it is in the + * lost list and then report it as recovered (and process it) + * If it wasn't lost then it's an expected duplicate and ignored + * Once the queue exceeds the reload maxseq, the lost records are no longer + * needed and are discarded */ // ckpool sequence numbers #define SEQALL "seqall" @@ -860,30 +904,26 @@ enum seq_num { // Ensure size is a (multiple of 8)-1 #define SEQ_CODE 15 -#define SICHR(_sif) (((_sif) == SI_EARLYSOCK) ? 'E' : \ - (((_sif) == SI_RELOAD) ? 'R' : \ - (((_sif) == SI_SOCKET) ? 'S' : '?'))) +#define SECHR(_sif) (((_sif) == SE_EARLYSOCK) ? 'E' : \ + (((_sif) == SE_RELOAD) ? 'R' : \ + (((_sif) == SE_SOCKET) ? 'S' : '?'))) -// Msg from the socket before startup completed - ignore if it was a DUP -#define SI_EARLYSOCK 1 +// Msg from the socket before startup completed +#define SE_EARLYSOCK 1 // Msg was from reload -#define SI_RELOAD 2 +#define SE_RELOAD 2 // Msg from the socket after startup completed -#define SI_SOCKET 4 - -/* An SI_EARLYSOCK item vs an SI_RELOAD item is not considered a DUP - * since the reload reads to the end of the reload file after - * the match between the queue and the reload has been found */ +#define SE_SOCKET 4 -typedef struct seqitem { +typedef struct seqentry { int flags; tv_t cd; // sec:0=missing, usec:0=miss !0=trans tv_t time; char code[SEQ_CODE+1]; -} SEQITEM; +} SEQENTRY; typedef struct seqdata { - size_t size; // item count - MUST be a power of 2 + size_t size; // entry count - MUST be a power of 2 uint64_t highlimit; int timelimit; uint64_t minseq; @@ -894,12 +934,15 @@ typedef struct seqdata { uint64_t lost; uint64_t stale; uint64_t high; + uint64_t recovered; uint64_t ok; + uint64_t reloadmax; tv_t firsttime; tv_t lasttime; tv_t firstcd; tv_t lastcd; - SEQITEM *item; + SEQENTRY *entry; + K_STORE *reload_lost; } SEQDATA; // SEQSET @@ -911,6 +954,7 @@ typedef struct seqset { uint64_t lost; // total from seqdata uint64_t stale; // total from seqdata uint64_t high; // total from seqdata + uint64_t recovered; // total from seqdata uint64_t ok; // total from seqdata SEQDATA seqdata[SEQ_MAX]; } SEQSET; @@ -920,15 +964,22 @@ typedef struct seqset { * the first time it processes a record with sequences */ // SEQALL and SHARES */ -#define SEQ_LARGE_LIM 64 -#define SEQ_LARGE_SIZ (65536*SEQ_LARGE_LIM) +#define SEQ_LARGE_TRANS_LIM 16 +#define SEQ_LARGE_SIZ (65536*SEQ_LARGE_TRANS_LIM) // WORKERSTATS, AUTH and ADDRAUTH -#define SEQ_MEDIUM_LIM 128 +#define SEQ_MEDIUM_TRANS_LIM 32 #define SEQ_MEDIUM_SIZ 65536 // The rest -#define SEQ_SMALL_LIM 128 +#define SEQ_SMALL_TRANS_LIM 64 #define SEQ_SMALL_SIZ 16384 +// highlimit ratio (shift down bits) +#define HIGH_SHIFT 8 +// Smallest highlimit allowed +#define HIGH_MIN 32 +// Smallest _SIZ allowed +#define BASE_SIZ (HIGH_MIN << HIGH_SHIFT) + #define ALLOC_SEQSET 1 #define LIMIT_SEQSET 16 #define INIT_SEQSET(_item) INIT_GENERIC(_item, seqset) @@ -941,11 +992,11 @@ extern K_STORE *seqset_store; // Initialised when seqset_free is allocated extern char *seqnam[SEQ_MAX]; -// SEQTRANS +// SEQTRANS also used for reload_lost typedef struct seqtrans { int seq; uint64_t seqnum; - SEQITEM item; + SEQENTRY entry; } SEQTRANS; // The stores are created and freed each time required @@ -953,9 +1004,10 @@ extern K_LIST *seqtrans_free; #define ALLOC_SEQTRANS 1024 #define LIMIT_SEQTRANS 0 -#define CULL_SEQTRANS 65536 +#define CULL_SEQTRANS 64 #define INIT_SEQTRANS(_item) INIT_GENERIC(_item, seqtrans) #define DATA_SEQTRANS(_var, _item) DATA_GENERIC(_var, _item, seqtrans, true) +#define DATA_SEQTRANS_NULL(_var, _item) DATA_GENERIC(_var, _item, seqtrans, false) // USERS typedef struct users { @@ -1902,13 +1954,15 @@ extern void sequence_report(bool lock); #define PPLNSDIFFADD "pplns_diff_add" // Data free functions (first) +extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull); extern void free_workinfo_data(K_ITEM *item); extern void free_sharesummary_data(K_ITEM *item); extern void free_optioncontrol_data(K_ITEM *item); extern void free_markersummary_data(K_ITEM *item); extern void free_workmarkers_data(K_ITEM *item); extern void free_marks_data(K_ITEM *item); -extern void free_seqset_data(K_ITEM *item); +#define free_seqset_data(_item) _free_seqset_data(_item, false) +extern void _free_seqset_data(K_ITEM *item, bool lock); #define safe_text(_txt) _safe_text(_txt, true) #define safe_text_nonull(_txt) _safe_text(_txt, false) diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 5e145ebd..ab0c68a8 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -5194,6 +5194,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(poolstats, 1, 1); USEINFO(userstats, 2, 1); USEINFO(workerstatus, 1, 1); + USEINFO(msgline, 1, 0); USEINFO(workqueue, 1, 0); USEINFO(transfer, 0, 0); USEINFO(heartbeatqueue, 1, 0); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 17e3d431..ac4e0136 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -11,6 +11,39 @@ #include // Data free functions (added here as needed) + +void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull) +{ + K_ITEM *t_item = NULL; + TRANSFER *transfer; + MSGLINE *msgline; + + DATA_MSGLINE(msgline, item); + if (msgline->trf_root) + msgline->trf_root = free_ktree(msgline->trf_root, NULL); + if (msgline->trf_store) { + t_item = msgline->trf_store->head; + while (t_item) { + DATA_TRANSFER(transfer, t_item); + if (transfer->mvalue != transfer->svalue) + FREENULL(transfer->mvalue); + t_item = t_item->next; + } + if (t_lock) + K_WLOCK(transfer_free); + k_list_transfer_to_head(msgline->trf_store, transfer_free); + if (t_cull) { + if (transfer_free->count == transfer_free->total && + transfer_free->total >= ALLOC_TRANSFER * CULL_TRANSFER) + k_cull_list(transfer_free); + } + if (t_lock) + K_WUNLOCK(transfer_free); + msgline->trf_store = k_free_store(msgline->trf_store); + } + FREENULL(msgline->msg); +} + void free_workinfo_data(K_ITEM *item) { WORKINFO *workinfo; @@ -87,15 +120,27 @@ void free_marks_data(K_ITEM *item) FREENULL(marks->extra); } -void free_seqset_data(K_ITEM *item) +void _free_seqset_data(K_ITEM *item, bool lock) { + K_STORE *reload_lost; SEQSET *seqset; int i; DATA_SEQSET(seqset, item); if (seqset->seqstt) { - for (i = 0; i < SEQ_MAX; i++) - FREENULL(seqset->seqdata[i].item); + for (i = 0; i < SEQ_MAX; i++) { + reload_lost = seqset->seqdata[i].reload_lost; + if (reload_lost) { + if (lock) + K_WLOCK(seqtrans_free); + k_list_transfer_to_head(reload_lost, seqtrans_free); + if (lock) + K_WUNLOCK(seqtrans_free); + k_free_store(reload_lost); + seqset->seqdata[i].reload_lost = NULL; + } + FREENULL(seqset->seqdata[i].entry); + } seqset->seqstt = 0; } }