From 15d16df1fe5e4e1aa70f378f1f26c9f48221750f Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 20 Apr 2015 18:17:49 +1000 Subject: [PATCH 1/6] ckdb - process/report sequence numbers and #define some db/transfer names --- src/ckdb.c | 976 +++++++++++++++++++++++++++++++++++++++++++++--- src/ckdb.h | 241 +++++++++++- src/ckdb_cmd.c | 102 ++--- src/ckdb_data.c | 59 ++- src/ckdb_dbio.c | 128 +++---- src/libckpool.h | 3 + 6 files changed, 1330 insertions(+), 179 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 1ae99ea2..e695f1ce 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -276,6 +276,8 @@ bool db_load_complete = false; bool reloading = false; // Data load is complete bool startup_complete = false; +// Set to true the first time workqueue reaches 0 after startup +static bool reload_queue_complete = false; // Tell everyone to die bool everyone_die = false; @@ -317,6 +319,15 @@ K_STORE *heartbeatqueue_store; // TRANSFER K_LIST *transfer_free; +// SEQSET +K_LIST *seqset_free; +K_STORE *seqset_store; +char *seqnam[SEQ_MAX]; +static cklock_t seq_lock; + +// SEQTRANS +K_LIST *seqtrans_free; + // USERS K_TREE *users_root; K_TREE *userid_root; @@ -940,6 +951,36 @@ static void clean_up(ckpool_t *ckp) static void alloc_storage() { + int seq; + + seqset_free = k_new_list("SeqSet", sizeof(SEQSET), + ALLOC_SEQSET, LIMIT_SEQSET, true); + seqset_store = k_new_store(seqset_free); + + // Map the SEQ_NNN values to their cmd names + seqnam[0] = SEQALL; + for (seq = 0; ckdb_cmds[seq].cmd_val != CMD_END; seq++) { + if (ckdb_cmds[seq].seq != SEQ_NONE) { + if (seqnam[ckdb_cmds[seq].seq]) { + quithere(1, "Name map in cddb_cmds[] to seqnam" + " isn't unique - seq %d is listed" + " twice as %s and %s", + seq, seqnam[ckdb_cmds[seq].seq], + ckdb_cmds[seq].cmd_str); + } + seqnam[ckdb_cmds[seq].seq] = ckdb_cmds[seq].cmd_str; + } + } + for (seq = 0; seq < SEQ_MAX; seq++) { + if (seqnam[seq] == NULL) { + quithere(1, "Name map in cddb_cmds[] is incomplete - " + " %d is missing", seq); + } + } + + seqtrans_free = k_new_list("SeqTrans", sizeof(SEQTRANS), + ALLOC_SEQTRANS, LIMIT_SEQTRANS, true); + workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); workqueue_store = k_new_store(workqueue_free); @@ -1089,6 +1130,38 @@ static void alloc_storage() marks_root = new_ktree(); } +#define SEQSETWARN(_seqset, _msgtxt, _endtxt) do { \ + char _t_buf[DATE_BUFSIZ]; \ + btu64_to_buf(&((_seqset)->seqstt), _t_buf, sizeof(_t_buf)); \ + LOGWARNING("SEQ %s: "SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ + "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 \ + " %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64 \ + "/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64 \ + "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \ + "/OK%"PRIu64"%s", \ + _msgtxt, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ + (_seqset)->missing, (_seqset)->trans, (_seqset)->lost, \ + (_seqset)->stale, (_seqset)->high, (_seqset)->ok, \ + seqnam[SEQ_ALL], \ + (_seqset)->seqdata[SEQ_ALL].minseq, \ + (_seqset)->seqdata[SEQ_ALL].maxseq, \ + (_seqset)->seqdata[SEQ_ALL].missing, \ + (_seqset)->seqdata[SEQ_ALL].trans, \ + (_seqset)->seqdata[SEQ_ALL].lost, \ + (_seqset)->seqdata[SEQ_ALL].stale, \ + (_seqset)->seqdata[SEQ_ALL].high, \ + (_seqset)->seqdata[SEQ_ALL].ok, \ + seqnam[SEQ_SHARES], \ + (_seqset)->seqdata[SEQ_SHARES].minseq, \ + (_seqset)->seqdata[SEQ_SHARES].maxseq, \ + (_seqset)->seqdata[SEQ_SHARES].missing, \ + (_seqset)->seqdata[SEQ_SHARES].trans, \ + (_seqset)->seqdata[SEQ_SHARES].lost, \ + (_seqset)->seqdata[SEQ_SHARES].stale, \ + (_seqset)->seqdata[SEQ_SHARES].high, \ + (_seqset)->seqdata[SEQ_SHARES].ok, _endtxt); \ + } while(0) + #define FREE_TREE(_tree) \ if (_tree ## _root) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \ @@ -1128,9 +1201,10 @@ static void alloc_storage() static void dealloc_storage() { SHAREERRORS *shareerrors; - SHARES *shares; - K_ITEM *s_item; + K_ITEM *s_item, *ss_item; char *st = NULL; + SHARES *shares; + SEQSET *seqset; LOGWARNING("%s() logqueue ...", __func__); @@ -1192,7 +1266,7 @@ static void dealloc_storage() LOGERR("%s(): %"PRId64"/%s/%"PRId32"/%s/%ld,%ld", __func__, shareerrors->workinfoid, - st = safe_text(shareerrors->workername), + st = safe_text_nonull(shareerrors->workername), shareerrors->errn, shareerrors->error, shareerrors->createdate.tv_sec, @@ -1213,7 +1287,7 @@ static void dealloc_storage() LOGERR("%s(): %"PRId64"/%s/%s/%"PRId32"/%ld,%ld", __func__, shares->workinfoid, - st = safe_text(shares->workername), + st = safe_text_nonull(shares->workername), shares->nonce, shares->errn, shares->createdate.tv_sec, @@ -1256,6 +1330,25 @@ static void dealloc_storage() FREE_LISTS(heartbeatqueue); FREE_LISTS(workqueue); + LOGWARNING("%s() seqset ...", __func__); + + ss_item = seqset_store->head; + while (ss_item) { + DATA_SEQSET(seqset, ss_item); + /* Missing/Trans/Lost should all be 0 for shares */ + if (seqset->seqstt && (seqset->seqdata[SEQ_SHARES].missing || + seqset->seqdata[SEQ_SHARES].trans || + seqset->seqdata[SEQ_SHARES].lost)) { + SEQSETWARN(seqset, "SHARES MISSING", EMPTY); + } + ss_item = ss_item->next; + } + FREE_LIST(seqtrans); + + FREE_STORE_DATA(seqset); + FREE_LIST_DATA(seqset); + FREE_LISTS(seqset); + LOGWARNING("%s() finished", __func__); } @@ -1342,16 +1435,740 @@ static bool setup_data() return true; } +#define SEQINUSE firstcd.tv_sec + +#define RESETSET(_seqset, _n_seqstt, _n_seqpid) do { \ + int _i; \ + (_seqset)->seqstt = (_n_seqstt); \ + (_seqset)->seqpid = (_n_seqpid); \ + for (_i = 0; _i < SEQ_MAX; _i++) { \ + (_seqset)->seqdata[_i].minseq = \ + (_seqset)->seqdata[_i].maxseq = \ + (_seqset)->seqdata[_i].seqbase = \ + (_seqset)->seqdata[_i].missing = \ + (_seqset)->seqdata[_i].trans = \ + (_seqset)->seqdata[_i].lost = \ + (_seqset)->seqdata[_i].stale = \ + (_seqset)->seqdata[_i].high = \ + (_seqset)->seqdata[_i].ok = 0; \ + (_seqset)->seqdata[_i].SEQINUSE = \ + (_seqset)->seqdata[_i].firstcd.tv_usec = \ + (_seqset)->seqdata[_i].lastcd.tv_sec = \ + (_seqset)->seqdata[_i].lastcd.tv_usec = \ + (_seqset)->seqdata[_i].firsttime.tv_sec = \ + (_seqset)->seqdata[_i].firsttime.tv_usec = \ + (_seqset)->seqdata[_i].lasttime.tv_sec = \ + (_seqset)->seqdata[_i].lasttime.tv_usec = 0; \ + } \ + } while (0); + +#define BASE_SIZ 64 +#define HIGH_MIN 4 + +#define MISSFLAG cd.tv_sec +#define TRANSFLAG cd.tv_usec + +// Test if an item is missing/transient +#define ITEMISMIS(_seqitem) ((_seqitem)->MISSFLAG == 0) +#define DATAISMIS(_seqdata, _u) \ + ITEMISMIS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + +#define ITEMONLYMIS(_seqitem) (((_seqitem)->MISSFLAG == 0) && \ + ((_seqitem)->TRANSFLAG == 0)) + +#define ITEMISTRANS(_seqitem) (((_seqitem)->MISSFLAG == 0) && \ + ((_seqitem)->TRANSFLAG != 0)) +#define DATAISTRANS(_seqdata, _u) \ + ITEMISTRANS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + +// Flag an item as missing +#define ITEMSETMIS(_seqitem, _now) do { \ + (_seqitem)->MISSFLAG = 0; \ + (_seqitem)->TRANSFLAG = 0; \ + (_seqitem)->time.tv_sec = now->tv_sec; \ + (_seqitem)->time.tv_usec = now->tv_usec; \ + } while(0) +#define DATASETMIS(_seqdata, _u, _now) \ + ITEMSETMIS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)]), _now) + +/* Flag an item directly as transient missing - + * it will already be missing but we set both flags */ +#define ITEMSETTRANS(_seqitem) do { \ + (_seqitem)->MISSFLAG = 0; \ + (_seqitem)->TRANSFLAG = 1; \ + } while(0) +#define DATASETTRANS(_seqdata, _u) \ + ITEMSETTRANS(&((_seqdata)->item[(_u) & ((_seqdata)->size - 1)])) + +// Check for transient missing every 2s +#define TRANCHECKLIMIT 2.0 +static tv_t last_trancheck; +// Don't let these messages be slowed down by a trans_process() +#define TRANCHKSEQOK(_seq) ((_seq) != SEQ_SHARES && (_seq) != SEQ_AUTH && \ + (_seq) != SEQ_ADDRAUTH && (_seq) != SEQ_BLOCK) + +/* time (now) is used, not cd, since cd is only relevant to reloading + * and we don't run trans_process() during reloading + * we also only know now, not cd, for a missing item + * This fills in store with a copy of the defails of all the new + * transients */ +static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) +{ + SEQDATA *seqdata = NULL; + SEQITEM *seqitem = NULL; + uint64_t zero, top, u; + SEQTRANS *seqtrans; + K_ITEM *st_item; + int seq; + + for (seq = 0; seq < SEQ_MAX; seq++) { + seqdata = &(seqset->seqdata[seq]); + if (seqdata->SEQINUSE == 0) + continue; + + /* run as 2 loops from seqbase to top, then bottom to maxseq + * thus we can use seqitem++ rather than calculating it + * each time */ + zero = seqdata->seqbase - (seqdata->seqbase & (seqdata->size - 1)); + top = zero + seqdata->size - 1; + if (top > seqdata->maxseq) + top = seqdata->maxseq; + u = seqdata->seqbase; + seqitem = &(seqdata->item[seqdata->seqbase & (seqdata->size - 1)]); + while (u <= top) { + if (ITEMONLYMIS(seqitem) && + tvdiff(now, &(seqitem->time)) > seqdata->timelimit) { + ITEMSETTRANS(seqitem); + seqdata->trans++; + seqset->trans++; + + K_WLOCK(seqtrans_free); + st_item = k_unlink_head(seqtrans_free); + K_WUNLOCK(seqtrans_free); + DATA_SEQTRANS(seqtrans, st_item); + seqtrans->seq = seq; + seqtrans->seqnum = u; + memcpy(&(seqtrans->item), seqitem, sizeof(SEQITEM)); + k_add_head(store, st_item); + } + u++; + seqitem++; + } + + // 2nd loop isn't needed, we've already covered the full range + if (top == seqdata->maxseq) + continue; + + u = zero + seqdata->size; + seqitem = &(seqdata->item[0]); + while (u <= seqdata->maxseq) { + if (ITEMONLYMIS(seqitem) && + tvdiff(now, &(seqitem->time)) > seqdata->timelimit) { + ITEMSETTRANS(seqitem); + seqdata->trans++; + seqset->trans++; + + K_WLOCK(seqtrans_free); + st_item = k_unlink_head(seqtrans_free); + K_WUNLOCK(seqtrans_free); + DATA_SEQTRANS(seqtrans, st_item); + seqtrans->seq = seq; + seqtrans->seqnum = u; + memcpy(&(seqtrans->item), seqitem, sizeof(SEQITEM)); + k_add_head(store, st_item); + } + u++; + seqitem++; + } + } +} + +static void trans_seq(tv_t *now) +{ + char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ]; + K_STORE *store; + SEQSET *seqset = NULL; + K_ITEM *item = NULL, *lastitem = NULL; + SEQTRANS *seqtrans; + K_ITEM *st_item; + uint64_t seqstt = 0, seqpid = 0; + bool more = true; + int i, j; + + store = k_new_store(seqtrans_free); + for (i = 0; more; i++) { + ck_wlock(&seq_lock); + if (seqset_store->count <= i) + more = false; + else { + item = seqset_store->head; + for (j = 0; item && j < 0; j++) + item = item->next; + if (!item) + more = false; + else { + /* Avoid wasting time reprocessing the item + * if a new set was added outside the lock + * and pushed the sets along one */ + if (item != lastitem) { + DATA_SEQSET(seqset, item); + seqstt = seqset->seqstt; + seqpid = seqset->seqpid; + if (seqstt) + trans_process(seqset, now, store); + else + more = false; + } + lastitem = item; + } + } + if (seqset_store->count <= (i + 1)) + more = false; + ck_wunlock(&seq_lock); + + st_item = store->tail; + while (st_item) { + DATA_SEQTRANS(seqtrans, st_item); + btu64_to_buf(&seqstt, t_buf, sizeof(t_buf)); + bt_to_buf(&(seqtrans->item.time.tv_sec), t_buf2, + sizeof(t_buf2)); + LOGWARNING("SEQ trans %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", + seqnam[seqtrans->seq], seqtrans->seqnum, + i, seqstt, t_buf, seqpid, + t_buf2, seqtrans->item.code); + st_item = st_item->prev; + } + if (store->head) { + K_WLOCK(seqtrans_free); + k_list_transfer_to_head(store, seqtrans_free); + K_WUNLOCK(seqtrans_free); + } + } + store = k_free_store(store); +} + +static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, + uint64_t n_seqstt, uint64_t n_seqpid, + char *nam, tv_t *now, tv_t *cd, char *code, + bool warndup, char *msg) +{ + char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; + bool firstseq, newseq, expseq, gothigh, gotstale, gotstalestart; + SEQSET *seqset = NULL, *seqset0 = NULL, seqset_pre = { 0 }; + SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; + bool dup, wastrans, doitem, dotime; + SEQDATA *seqdata; + SEQITEM *seqitem; + K_ITEM *seqset_item = NULL, *st_item = NULL; + SEQTRANS *seqtrans = NULL; + size_t siz, end; + void *off0, *offn; + uint64_t u; + int set = -1, highlimit, i; + K_STORE *lost; + + // We store the lost items in here + lost = k_new_store(seqtrans_free); + firstseq = newseq = expseq = gothigh = gotstale = gotstalestart = + dup = wastrans = false; + ck_wlock(&seq_lock); + // Get the seqset + if (seqset_store->count == 0) + firstseq = true; + else { + // Normal processing is: count=1 and head is current + seqset_item = seqset_store->head; + DATA_SEQSET(seqset, seqset_item); + set = 0; + if (n_seqstt == seqset->seqstt && n_seqpid == seqset->seqpid) + goto gotseqset; + // It's not the current set, check the older ones + while ((seqset_item = seqset_item->next)) { + DATA_SEQSET(seqset, seqset_item); + set++; + if (seqset->seqstt && n_seqstt == seqset->seqstt && + n_seqpid == seqset->seqpid) { + goto gotseqset; + } + } + } + + // Need to get a new seqset + newseq = true; + if (!firstseq) { + /* The current seqset (about to become the previous) + * If !seqset_store->head (i.e. a bug) this will quit() */ + DATA_SEQSET(seqset0, seqset_store->head); + memcpy(&seqset_pre, seqset0, sizeof(seqset_pre)); + } + seqset_item = k_unlink_head_zero(seqset_free); + if (seqset_item) { + // Setup a new set - everything is already zero + DATA_SEQSET(seqset, seqset_item); + seqset->seqstt = n_seqstt; + seqset->seqpid = n_seqpid; + for (i = 0; i < SEQ_MAX; i++) { + // Unnecessary - but as a reminder + seqset->seqdata[i].SEQINUSE = 0; + switch (i) { + case SEQ_ALL: + case SEQ_SHARES: + seqset->seqdata[i].size = SEQ_LARGE_SIZ; + seqset->seqdata[i].timelimit = SEQ_LARGE_LIM; + break; + case SEQ_WORKERSTAT: + case SEQ_AUTH: + case SEQ_ADDRAUTH: + seqset->seqdata[i].size = SEQ_MEDIUM_SIZ; + seqset->seqdata[i].timelimit = SEQ_MEDIUM_LIM; + break; + default: + seqset->seqdata[i].size = SEQ_SMALL_SIZ; + seqset->seqdata[i].timelimit = SEQ_SMALL_LIM; + break; + } + siz = seqset->seqdata[i].size; + if (siz < BASE_SIZ || (siz & (siz-1))) { + quithere(1, "seqdata[%d] size %d (0x%x) is %s %d", + i, (int)siz, (int)siz, + (siz < BASE_SIZ) ? + "too small, must be >=" : + "not a power of", + (siz < BASE_SIZ) ? BASE_SIZ : 2); + } + highlimit = siz >> 2; // 1/4 + if (highlimit < HIGH_MIN) { + // On the first ever seq record + quithere(1, "seqdata[%d] highlimit %d (0x%x) " + "is too small, must be >= %d", + i, highlimit, highlimit, HIGH_MIN); + } + seqset->seqdata[i].highlimit = highlimit; + seqset->seqdata[i].item = calloc(siz, sizeof(SEQITEM)); + end = siz * sizeof(SEQITEM); + off0 = &(seqset->seqdata[i].item[0]); + offn = &(seqset->seqdata[i].item[siz]); + if ((int)end != (offn - off0)) { + // On the first ever seq record + quithere(1, "memory size (%d) != structure " + "offset (%d) - your cc sux", + (int)end, (int)(offn - off0)); + } + LIST_MEM_ADD_SIZ(seqset_free, end); + } + } else { + // Expire the last set and overwrite it + seqset_item = k_unlink_tail(seqset_store); + // If !item (i.e. a bug) this will quit() + DATA_SEQSET(seqset, seqset_item); + memcpy(&seqset_exp, seqset, sizeof(seqset_exp)); + expseq = true; + RESETSET(seqset, n_seqstt, n_seqpid); + } + k_add_head(seqset_store, seqset_item); + set = 0; + +gotseqset: + doitem = dotime = false; + + seqdata = &(seqset->seqdata[seq]); + seqitem = &(seqdata->item[n_seqcmd & (seqdata->size - 1)]); + if (seqdata->SEQINUSE == 0) { + // First n_seqcmd for the given seq + copy_tv(&(seqdata->firsttime), now); + copy_tv(&(seqdata->lasttime), now); + copy_tv(&(seqdata->firstcd), cd); // In use + copy_tv(&(seqdata->lastcd), cd); + seqdata->minseq = seqdata->maxseq = + seqdata->seqbase = n_seqcmd; + doitem = true; + goto setitemdata; + } + + if (n_seqcmd > seqdata->maxseq) { + /* New seq above maxseq + * N.B. it must be at most highlimit above maxseq + * This is somewhat arbitrary but some limit is necessary to + * ensure a high bad value doesn't cause many lower lost/stale + * messages, but instead just one high message and possibly + * later trans/lost messages */ + if ((n_seqcmd - seqdata->maxseq) > seqdata->highlimit) { + seqdata->high++; + seqset->high++; + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + gothigh = true; + goto setitemdata; + } + for (u = seqdata->maxseq + 1; u <= n_seqcmd; u++) { + if ((u - seqdata->seqbase) < seqdata->size) { + // u is unused + if (u < n_seqcmd) { + // Flag skipped ones as missing + DATASETMIS(seqdata, u, now); + seqdata->missing++; + seqset->missing++; + } + } else { + // u is used by (u-size) + if (DATAISMIS(seqdata, u)) { + // (u-size) was missing + K_WLOCK(seqtrans_free); + st_item = k_unlink_head(seqtrans_free); + K_WUNLOCK(seqtrans_free); + DATA_SEQTRANS(seqtrans, st_item); + seqtrans->seqnum = u - seqdata->size; + memcpy(&(seqtrans->item), + &(seqdata->item[u & (seqdata->size - 1)]), + sizeof(SEQITEM)); + k_add_head(lost, st_item); + seqdata->lost++; + seqset->lost++; + if (DATAISTRANS(seqdata, u)) { + seqdata->trans--; + seqset->trans--; + } + if (u == n_seqcmd) { + // new wont be missing + seqdata->missing--; + seqset->missing--; + } else { + // new will also be missing + DATASETMIS(seqdata, u, now); + } + } else { + // (u-size) wasn't missing + if (u < n_seqcmd) { + // Flag skipped as missing + DATASETMIS(seqdata, u, now); + seqdata->missing++; + seqset->missing++; + } + } + seqdata->seqbase++; + } + seqdata->maxseq++; + } + // store n_seqcmd + doitem = true; + dotime = true; + } else if (n_seqcmd >= seqdata->seqbase) { + /* It's within the range thus dup or missing */ + if (!ITEMISMIS(seqitem)) { + dup = true; + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + } else { + // Found a missing one + seqdata->missing--; + seqset->missing--; + if (ITEMISTRANS(seqitem)) { + seqdata->trans--; + seqset->trans--; + wastrans = true; + } + doitem = true; + dotime = true; + } + } else if (n_seqcmd < seqdata->minseq) { + /* This would be early during startup, less than minseq + * This requires lowering minseq if there's space to lower it + * to n_seqcmd */ + if ((n_seqcmd + seqdata->size) > seqdata->maxseq) { + // Set all after n_seqcmd but before minseq as missing + for (u = seqdata->minseq - 1; u > n_seqcmd; u--) { + DATASETMIS(seqdata, u, now); + seqdata->minseq--; + seqdata->seqbase--; + seqset->missing++; + seqdata->missing++; + } + seqdata->minseq--; + seqdata->seqbase--; + doitem = true; + dotime = true; + } else { + // Can't go back that far, so it's stale + seqdata->stale++; + seqset->stale++; + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + gotstalestart = true; + } + } else { + // >=minseq but stale++; + seqset->stale++; + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + gotstale = true; + } + +setitemdata: + // Store the new seq if flagged to do so + if (doitem) { + copy_tv(&(seqitem->time), now); + copy_tv(&(seqitem->cd), cd); + STRNCPY(seqitem->code, code); + seqdata->ok++; + seqset->ok++; + } + if (dotime) { + if (tv_newer(now, &(seqdata->firsttime))) + copy_tv(&(seqdata->firsttime), now); + if (tv_newer(&(seqdata->lasttime), now)) + copy_tv(&(seqdata->lasttime), now); + if (tv_newer(cd, &(seqdata->firstcd))) + copy_tv(&(seqdata->firstcd), cd); + if (tv_newer(&(seqdata->lastcd), cd)) + copy_tv(&(seqdata->lastcd), cd); + } + + ck_wunlock(&seq_lock); + + if (firstseq) { + // The first ever SEQ_ALL + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + LOGWARNING("Seq first init: %s %"PRIu64" " + SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, + nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); + } else { + if (newseq) + SEQSETWARN(&seqset_pre, "previous", EMPTY); + if (expseq) + SEQSETWARN(&seqset_exp, "discarded old", " for:"); + if (newseq || expseq) { + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + LOGWARNING("Seq created new: %s %"PRIu64" " + SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, + nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); + } + } + + if (dup) { + int level = LOG_DEBUG; + if (warndup) + level = LOG_WARNING; + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); + LOGMSG(level, "SEQ dup %s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 + " %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 + "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 + "/OK%"PRIu64" cmd=%.42s...", + nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, + t_buf2, code, + seqset_copy.seqdata[seq].minseq, + seqset_copy.seqdata[seq].maxseq, + seqset_copy.seqdata[seq].missing, + seqset_copy.seqdata[seq].trans, + seqset_copy.seqdata[seq].lost, + seqset_copy.seqdata[seq].stale, + seqset_copy.seqdata[seq].high, + seqset_copy.seqdata[seq].ok, + st = safe_text(msg)); + FREENULL(st); + } + + if (wastrans) { + int level = LOG_DEBUG; + if (warndup) + level = LOG_WARNING; + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); + LOGMSG(level, "SEQ found trans %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", + nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, + t_buf2, code); + } + + if (gotstale || gotstalestart || gothigh) { + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); + LOGWARNING("SEQ %s %s%s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 + " %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64 + "/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 + " cmd=%.42s...", + gothigh ? "high" : "stale", + gotstalestart ? "STARTUP " : EMPTY, + nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, + t_buf2, code, + seqset_copy.seqdata[seq].minseq, + seqset_copy.seqdata[seq].maxseq, + seqset_copy.seqdata[seq].missing, + seqset_copy.seqdata[seq].trans, + seqset_copy.seqdata[seq].lost, + seqset_copy.seqdata[seq].stale, + seqset_copy.seqdata[seq].high, + seqset_copy.seqdata[seq].ok, + st = safe_text(msg)); + FREENULL(st); + } + if (reload_queue_complete && TRANCHKSEQOK(seq)) { + if (last_trancheck.tv_sec == 0) + setnow(&last_trancheck); + else { + if (tvdiff(now, &last_trancheck) > TRANCHECKLIMIT) { + trans_seq(now); + setnow(&last_trancheck); + } + } + } + + if (lost->head) { + st_item = lost->head; + while (st_item) { + DATA_SEQTRANS(seqtrans, st_item); + btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); + bt_to_buf(&(seqtrans->item.time.tv_sec), t_buf2, + sizeof(t_buf2)); + LOGWARNING("SEQ lost %s %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", + ITEMISTRANS(&(seqtrans->item)) ? + "trans" : "missing", + seqnam[seq], seqtrans->seqnum, set, + n_seqstt, t_buf, n_seqpid, t_buf2, + seqtrans->item.code); + st_item = st_item->next; + } + K_WLOCK(seqtrans_free); + k_list_transfer_to_head(lost, seqtrans_free); + K_WUNLOCK(seqtrans_free); + } + lost = k_free_store(lost); + + return dup; +} + +static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, + tv_t *now, char *msg, K_TREE *trf_root, + bool wantauth) +{ + uint64_t n_seqall, n_seqstt, n_seqpid, n_seqcmd; + K_ITEM *seqstt, *seqpid, *seqcmd, *i_code; + char *err = NULL, *st = NULL; + size_t len, off; + bool dupall, dupcmd, warndup; + char *code = NULL; + char buf[64]; + + n_seqstt = n_seqpid = n_seqcmd = 0; + n_seqall = atol(transfer_data(seqall)); + if ((seqstt = find_transfer(trf_root, SEQSTT))) + n_seqstt = atol(transfer_data(seqstt)); + + /* Ignore SEQSTTIGN sequence information + * This allows us to manually generate ckpool data and send it + * to ckdb using ckpmsg - if SEQSTT == SEQSTTIGN + * SEQALL must exist but the value is ignored + * SEQPID and SEQcmd don't need to exist and are ignored */ + if (n_seqstt == SEQSTTIGN) { + LOGWARNING("%s(): SEQIGN in %.42s...", + __func__, st = safe_text(msg)); + FREENULL(st); + return ckdb_cmds[which].cmd_val; + } + + if ((seqpid = find_transfer(trf_root, SEQPID))) + n_seqpid = atol(transfer_data(seqpid)); + snprintf(buf, sizeof(buf), "%s%s", SEQPRE, ckdb_cmds[which].cmd_str); + if ((seqcmd = find_transfer(trf_root, buf))) + n_seqcmd = atol(transfer_data(seqcmd)); + + /* Make one message with the initial seq missing/value problems ... + * that should never happen if seqall is present */ + if (!seqstt || !seqpid || !seqcmd || + (seqstt && n_seqstt < DATE_BEGIN) || (seqpid && n_seqpid <= 0)) { + APPEND_REALLOC_INIT(err, off, len); + APPEND_REALLOC(err, off, len, "ERROR: command "); + APPEND_REALLOC(err, off, len, ckdb_cmds[which].cmd_str); + if (!seqstt || !seqpid || !seqcmd) { + APPEND_REALLOC(err, off, len, " - missing"); + if (!seqstt) + APPEND_REALLOC(err, off, len, BLANK SEQSTT); + if (!seqpid) + APPEND_REALLOC(err, off, len, BLANK SEQPID); + if (!seqcmd) { + APPEND_REALLOC(err, off, len, BLANK); + APPEND_REALLOC(err, off, len, buf); + } + } + if (seqstt && n_seqstt < DATE_BEGIN) { + APPEND_REALLOC(err, off, len, " - invalid " SEQSTT " '"); + st = safe_text_nonull(transfer_data(seqstt)); + APPEND_REALLOC(err, off, len, st); + FREENULL(st); + APPEND_REALLOC(err, off, len, "'"); + } + if (seqpid && n_seqpid <= 0) { + APPEND_REALLOC(err, off, len, " - invalid " SEQPID " '"); + st = safe_text_nonull(transfer_data(seqpid)); + APPEND_REALLOC(err, off, len, st); + FREENULL(st); + APPEND_REALLOC(err, off, len, "'"); + } + APPEND_REALLOC(err, off, len, " - msg='"); + APPEND_REALLOC(err, off, len, msg); + APPEND_REALLOC(err, off, len, "'"); + LOGMSGBUF(LOG_EMERG, err); + FREENULL(err); + + /* Just process it normally - + * this will of course produce missing seq messages later + * if msg was supposed to have proper seq numbers */ + return ckdb_cmds[which].cmd_val; + } + + if ((i_code = find_transfer(trf_root, CODETRF))) { + code = transfer_data(i_code); + if (!(*code)) + code = NULL; + } + if (!code) { + if ((i_code = find_transfer(trf_root, BYTRF))) + code = transfer_data(i_code); + else + code = EMPTY; + } + + if (!startup_complete) + warndup = true; + else { + if (reload_queue_complete) + warndup = true; + else + warndup = false; + } + + dupall = update_seq(SEQ_ALL, n_seqall, n_seqstt, n_seqpid, SEQALL, + now, cd, code, warndup, msg); + dupcmd = update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, + buf, now, cd, code, warndup, msg); + + // zzz // if dupall != dupcmd report a problem ... inside update_seq() ? + + /* The norm is: neither is a dup, so reply with ok to process + * If only one is a dup then that's a corrupt message or a bug + * so simply try to process it as normal */ + if (!dupall || !dupcmd) + return ckdb_cmds[which].cmd_val; + + /* Reprocess AUTH messages anyway, since they shouldn't cause any errors + * but the reply may be needed the 2nd time */ + if (wantauth && (ckdb_cmds[which].cmd_val == CMD_AUTH || + ckdb_cmds[which].cmd_val == CMD_ADDRAUTH)) + return ckdb_cmds[which].cmd_val; + + /* It's a dup so ignore it */ + return CMD_DUPSEQ; +} + static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, char *buf, int *which_cmds, char *cmd, - char *id, tv_t *now, tv_t *cd) + char *id, tv_t *now, tv_t *cd, bool wantauth) { char reply[1024] = ""; TRANSFER *transfer; K_TREE_CTX ctx[1]; - K_ITEM *item = NULL; + K_ITEM *item = NULL, *seqall; char *cmdptr, *idptr, *next, *eq, *end, *was; - char *data = NULL, *tmp; + char *data = NULL, *st = NULL, *st2 = NULL; bool noid = false; size_t siz; @@ -1380,7 +2197,9 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } if (ckdb_cmds[*which_cmds].cmd_val == CMD_END) { - LOGERR("Listener received unknown command: '%s'", buf); + LOGERR("Listener received unknown command: '%s'", + st2 = safe_text(buf)); + FREENULL(st2); free(cmdptr); return CMD_REPLY; } @@ -1393,7 +2212,9 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } STRNCPYSIZ(id, cmdptr, ID_SIZ); - LOGERR("Listener received invalid (noid) message: '%s'", buf); + LOGERR("Listener received invalid (noid) message: '%s'", + st2 = safe_text(buf)); + FREENULL(st2); free(cmdptr); return CMD_REPLY; } @@ -1408,9 +2229,11 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, while (*next == ' ') next++; if (*next != JSON_BEGIN) { - LOGERR("JSON_BEGIN '%c' was: %.32s...", - JSON_BEGIN, tmp = safe_text(was)); - free(tmp); + LOGERR("JSON_BEGIN '%c' was:%.32s... buf=%.32s...", + JSON_BEGIN, st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); return CMD_REPLY; } @@ -1425,16 +2248,22 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, end++; } if (!*end) { - LOGERR("JSON name no trailing '%c' was: %.32s...", - JSON_STR, tmp = safe_text(was)); - free(tmp); + LOGERR("JSON name no trailing '%c' " + "was:%.32s... buf=%.32s...", + JSON_STR, st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); return CMD_REPLY; } if (next == end) { - LOGERR("JSON zero length name was: %.32s...", - tmp = safe_text(was)); - free(tmp); + LOGERR("JSON zero length name was:%.32s..." + " buf=%.32s...", + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); return CMD_REPLY; } @@ -1449,10 +2278,13 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, next++; // we have a name, now expect a value after it if (*next != JSON_VALUE) { - LOGERR("JSON_VALUE '%c' '%s' was: %.32s...", + LOGERR("JSON_VALUE '%c' '%s' was:%.32s..." + " buf=%.32s...", JSON_VALUE, transfer->name, - tmp = safe_text(was)); - free(tmp); + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1470,10 +2302,13 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, end++; } if (!*end) { - LOGERR("JSON '%s' value was: %.32s...", + LOGERR("JSON '%s' value was:%.32s..." + " buf=%.32s...", transfer->name, - tmp = safe_text(was)); - free(tmp); + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1485,8 +2320,11 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } else if (*next == JSON_ARRAY) { // Only merklehash for now if (strcmp(transfer->name, "merklehash")) { - LOGERR("JSON '%s' can't be an array", - transfer->name); + LOGERR("JSON '%s' can't be an array" + " buf=%.32s...", + transfer->name, + st2 = safe_text(buf)); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1502,10 +2340,12 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, end++; if (end < next+1) { LOGERR("JSON '%s' zero length value " - "was: %.32s...", + "was:%.32s... buf=%.32s...", transfer->name, - tmp = safe_text(was)); - free(tmp); + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1522,10 +2362,13 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, end++; } if (!*end) { - LOGERR("JSON '%s' value was: %.32s...", + LOGERR("JSON '%s' value was:%.32s..." + " buf=%.32s...", transfer->name, - tmp = safe_text(was)); - free(tmp); + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1534,10 +2377,12 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } if (next == end) { LOGERR("JSON '%s' zero length value " - "was: %.32s...", + "was:%.32s... buf=%.32s...", transfer->name, - tmp = safe_text(was)); - free(tmp); + st = safe_text(was), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); K_WLOCK(transfer_free); k_add_head(transfer_free, item); @@ -1555,6 +2400,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); k_add_head(*trf_store, item); + item = NULL; // find the separator then move to the next name next = end; @@ -1567,9 +2413,11 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } } if (*next != JSON_END) { - LOGERR("JSON_END '%c' was: %.32s...", - JSON_END, tmp = safe_text(next)); - free(tmp); + LOGERR("JSON_END '%c' was:%.32s... buf=%.32s...", + JSON_END, st = safe_text(next), + st2 = safe_text(buf)); + FREENULL(st); + FREENULL(st2); free(cmdptr); if (item) { K_WLOCK(transfer_free); @@ -1609,23 +2457,53 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, } K_WUNLOCK(transfer_free); } + + seqall = find_transfer(*trf_root, SEQALL); if (ckdb_cmds[*which_cmds].createdate) { - item = require_name(*trf_root, "createdate", 10, NULL, reply, sizeof(reply)); + item = require_name(*trf_root, CDTRF, 10, NULL, reply, sizeof(reply)); if (!item) { free(cmdptr); return CMD_REPLY; } DATA_TRANSFER(transfer, item); - txt_to_ctv("createdate", transfer->mvalue, cd, sizeof(*cd)); + txt_to_ctv(CDTRF, transfer->mvalue, cd, sizeof(*cd)); if (cd->tv_sec == 0) { - LOGERR("%s(): failed, %s has invalid createdate '%s'", + LOGERR("%s(): failed, %s has invalid "CDTRF" '%s'", __func__, cmdptr, transfer->mvalue); free(cmdptr); return CMD_REPLY; } if (confirm_check_createdate) check_createdate_ccl(cmd, cd); + if (seqall) { + enum cmd_values ret; + ret = process_seq(seqall, *which_cmds, cd, now, buf, + *trf_root, wantauth); + free(cmdptr); + return ret; + } else { + /* It's OK to load old data from the CCLs + * but socket data must contain SEQALL ... + * even in manually generated data */ + if (startup_complete) { + LOGEMERG("%s(): *** ckpool needs upgrading - " + "missing "SEQALL" from '%s' ckpool " + "data in '%s'", + __func__, cmdptr, + st = safe_text_nonull(buf)); + FREENULL(st); + } + } + } else { + // Bug somewhere or createdate flag missing + if (seqall) { + LOGWARNING("%s(): msg '%s' shouldn't contain "SEQALL + " in '%s'", + __func__, cmdptr, + st = safe_text_nonull(buf)); + FREENULL(st); + } } free(cmdptr); return ckdb_cmds[*which_cmds].cmd_val; @@ -2666,8 +3544,14 @@ static void *socketer(__maybe_unused void *arg) LOGDEBUG("Duplicate '%s' message received", duptype); } else { LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, + &which_cmds, cmd, id, &now, + &cd, true); switch (cmdnum) { + case CMD_DUPSEQ: + snprintf(reply, sizeof(reply), "%s.%ld.dup.", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; case CMD_REPLY: snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); send_unix_msg(sockd, reply); @@ -3004,8 +3888,12 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) } LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, + cmd, id, &now, &cd, false); switch (cmdnum) { + // Don't ever attempt to double process reload data + case CMD_DUPSEQ: + break; // Ignore case CMD_REPLY: break; @@ -3458,6 +4346,7 @@ static void *listener(void *arg) LOGWARNING("reload queue completed %.0fm %.3fs", min, sec); // Used as the flag to display the message once wq_stt.tv_sec = 0L; + reload_queue_complete = true; } @@ -4286,6 +5175,7 @@ int main(int argc, char **argv) ckp.main.processname = strdup("main"); cklock_init(&last_lock); + cklock_init(&seq_lock); cklock_init(&process_pplns_lock); if (confirm_sharesummary) { diff --git a/src/ckdb.h b/src/ckdb.h index 7edb433e..80b606bf 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.066" +#define CKDB_VERSION DB_VERSION"-1.070" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -102,8 +102,10 @@ extern int switch_state; #define SWITCH_STATE_AUTHWORKERS 1 #define SWITCH_STATE_ALL 666666 +#define BLANK " " extern char *EMPTY; +// Field patterns extern const char *userpatt; extern const char *mailpatt; extern const char *idpatt; @@ -237,7 +239,9 @@ enum data_type { TYPE_CTV, TYPE_FTV, TYPE_BLOB, - TYPE_DOUBLE + TYPE_DOUBLE, + TYPE_T, + TYPE_BT }; // BLOB does what PTR needs @@ -336,12 +340,27 @@ extern char *btc_server; extern char *btc_auth; extern int btc_timeout; +#define EDDB "expirydate" +#define CDDB "createdate" +#define CDTRF CDDB +#define BYDB "createby" +#define BYTRF BYDB +#define CODEDB "createcode" +#define CODETRF CODEDB +#define INETDB "createinet" +#define INETTRF INETDB +#define MDDB "modifydate" +#define MBYDB "modifyby" +#define MCODEDB "modifycode" +#define MINETDB "modifyinet" + extern char *by_default; extern char *inet_default; extern char *id_default; enum cmd_values { CMD_UNSET, + CMD_DUPSEQ, // Ignore, we've already got it CMD_REPLY, // Means something was wrong - send back reply CMD_TERMINATE, CMD_PING, @@ -426,12 +445,16 @@ enum cmd_values { // The size strdup will allocate multiples of #define MEMBASE 4 +#define LIST_MEM_ADD_SIZ(_list, _siz) do { \ + if (_siz % MEMBASE) \ + _siz += MEMBASE - (_siz % MEMBASE); \ + _list->ram += (int)_siz; \ + } while (0) + #define LIST_MEM_ADD(_list, _fld) do { \ size_t __siz; \ __siz = strlen(_fld) + 1; \ - if (__siz % MEMBASE) \ - __siz += MEMBASE - (__siz % MEMBASE); \ - _list->ram += (int)__siz; \ + LIST_MEM_ADD_SIZ(_list, __siz); \ } while (0) #define LIST_MEM_SUB(_list, _fld) do { \ @@ -470,7 +493,7 @@ enum cmd_values { #define SET_MODIFYCODE(_list, _fld, _val) SET_POINTER(_list, _fld, _val, EMPTY) #define SET_MODIFYINET(_list, _fld, _val) SET_POINTER(_list, _fld, _val, inet_default) -#define HISTORYDATECONTROL ",createdate,createby,createcode,createinet,expirydate" +#define HISTORYDATECONTROL ","CDDB","BYDB","CODEDB","INETDB","EDDB #define HISTORYDATECOUNT 5 #define HISTORYDATECONTROLFIELDS \ tv_t createdate; \ @@ -497,17 +520,17 @@ enum cmd_values { size_t __siz = sizeof(__reply); \ K_ITEM *__item; \ TRANSFER *__transfer; \ - __item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createby, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createcode, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createinet, __transfer->mvalue); \ @@ -515,8 +538,8 @@ enum cmd_values { } \ } while (0) -#define MODIFYDATECONTROL ",createdate,createby,createcode,createinet" \ - ",modifydate,modifyby,modifycode,modifyinet" +#define MODIFYDATECONTROL ","CDDB","BYDB","CODEDB","INETDB \ + ","MDDB","MBYDB","MCODEDB","MINETDB #define MODIFYDATECOUNT 8 #define MODIFYUPDATECOUNT 4 #define MODIFYDATECONTROLFIELDS \ @@ -595,17 +618,17 @@ enum cmd_values { size_t __siz = sizeof(__reply); \ K_ITEM *__item; \ TRANSFER *__transfer; \ - __item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \ @@ -614,7 +637,7 @@ enum cmd_values { } \ } while (0) -#define SIMPLEDATECONTROL ",createdate,createby,createcode,createinet" +#define SIMPLEDATECONTROL ","CDDB","BYDB","CODEDB","INETDB #define SIMPLEDATECOUNT 4 #define SIMPLEDATECONTROLFIELDS \ tv_t createdate; \ @@ -645,17 +668,17 @@ enum cmd_values { size_t __siz = sizeof(__reply); \ K_ITEM *__item; \ TRANSFER *__transfer; \ - __item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, BYTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createby, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, CODETRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createcode, __transfer->mvalue); \ } \ - __item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \ + __item = optional_name(_root, INETTRF, 1, NULL, __reply, __siz); \ if (__item) { \ DATA_TRANSFER(__transfer, __item); \ STRNCPY(_row->createinet, __transfer->mvalue); \ @@ -751,6 +774,172 @@ extern K_ITEM shareerrors_secondaryuserid; extern tv_t missing_secuser_min; extern tv_t missing_secuser_max; +/* The aim of the sequence data is to identify and ignore duplicate records - + * which should usually only be during a reload - + * and to identify missing records when their sequence numbers are skipped + * + * If at any time there is a problem with a sequence number, an error is + * reported, then the record being processed is simply processed normally by + * the cmd_func() it was intended for - which may or may not produce other + * processing error messages depending on the validity of the rest of the data + * + * Field explanation: + * Normal sequence processing would be that we first get seq N, then N+1 + * then N+2 ... etc. + * seqbase,minseq,maxseq are all set to the first N for the given sequence + * maxseq is incremented to the current maximum N+x each time a record is + * processed + * If we get up to N+3 but it is unexpectedly followed by N+5, that means N+4 + * is currently missing - so it is flagged as missing by time=0 and the + * missing counters are incremented - maxseq will now be N+5 + * Once we reach N+size we need to discard N and use it as N+size + * and increment seqbase N + * When we discard the oldest item due to needing more, if that oldest + * item was missing, it is now considered lost and the lost counters + * are incremented (and missing counters decremented) + * If we receive an item N+x where N+x-maxseq>highlimit we reject it as high + * and increment the high counters - this avoids creating a high bad sequence + * number and flagging any missing sequence numbers, most likely incorrectly, + * as lost in the range N to N+x-size + * If we receive an item N-x i.e. less than seqbase N, then: + * If maxseq-N = size then N-x is considered stale and the stale counters + * are incremented since there's no unused items below N available + * this shouldn't normally happen after we've received size seq numbers + * Else maxseq-N is less than size, that means there are unused items below N + * This will usually only be with a new sequence and the first seq was out + * of order, before lower sequence numbers, thus maxseq should be close to + * seqbase N and no where near N+size and there should be x unused below N + * If there are x unused items below N then we can move seqbase down to N-x + * after we flag all N-1,N-2,N-3..N-(x-1) as missing + * Else there aren't enough unused items below N, then N-x is considered + * stale and the stale counters are incremented + * + * timelimit is an early limit to flag missing sequence numbers as 'transient' + * Normally, if a missing item at N is later reused, it will be discarded and + * reported as lost + * After the reload queue is complete, timelimit reports missing sequence + * numbers early, as transient, if they have been missing for 'timelimit' but + * not already lost + * missing isn't decremented, since it is still treated as missing + * This is needed in case a size limit on a sequence means it may take a long + * time before it reports messages as lost - this also means that after the + * reload queue has cleared after ckdb startup, it will report the transient + * missing sequence numbers shortly after the timelimit + * When they are later found or lost they will again be reported, this time as + * found or lost */ + +// ckpool sequence numbers +#define SEQALL "seqall" +#define SEQSTT "seqstart" +#define SEQPID "seqpid" +#define SEQPRE "seq" + +/* Value to use for SEQSTT when manually sending messages, + * to make ckdb not check the seq numbers + * The message must have a SEQALL but the value is ignored + * SEQPID and SEQcmd don't need to exist and are ignored */ +#define SEQSTTIGN 42 + +enum seq_num { + SEQ_NONE = -1, // Invalid to have ckpool seq numbers + SEQ_ALL, + SEQ_BLOCK, + SEQ_SHARES, + SEQ_WORKINFO, + SEQ_AGEWORKINFO, + SEQ_AUTH, + SEQ_ADDRAUTH, + SEQ_HEARTBEAT, + SEQ_SHAREERRORS, + SEQ_WORKERSTAT, + SEQ_POOLSTATS, + SEQ_MAX +}; + +// Ensure size is a (multiple of 8)-1 +#define SEQ_CODE 15 + +typedef struct seqitem { + tv_t cd; // sec:0=missing, usec:0=miss !0=trans + tv_t time; + char code[SEQ_CODE+1]; +} SEQITEM; + +typedef struct seqdata { + size_t size; // item count - MUST be a power of 2 + uint64_t highlimit; + int timelimit; + uint64_t minseq; + uint64_t maxseq; + uint64_t seqbase; + uint64_t missing; + uint64_t trans; + uint64_t lost; + uint64_t stale; + uint64_t high; + uint64_t ok; + tv_t firsttime; + tv_t lasttime; + tv_t firstcd; + tv_t lastcd; + SEQITEM *item; +} SEQDATA; + +// SEQSET +typedef struct seqset { + uint64_t seqstt; // 0 if unused/unallocated + uint64_t seqpid; + uint64_t missing; // total from seqdata + uint64_t trans; // total from seqdata + uint64_t lost; // total from seqdata + uint64_t stale; // total from seqdata + uint64_t high; // total from seqdata + uint64_t ok; // total from seqdata + SEQDATA seqdata[SEQ_MAX]; +} SEQSET; + +/* All *_SIZ must be >= 64 and a power of 2 + * but if any aren't, the code checks this and quit()s + * the first time it processes a record with sequences */ + +// SEQALL and SHARES */ +#define SEQ_LARGE_LIM 64 +#define SEQ_LARGE_SIZ (65536*SEQ_LARGE_LIM) +// WORKERSTATS, AUTH and ADDRAUTH +#define SEQ_MEDIUM_LIM 128 +#define SEQ_MEDIUM_SIZ 65536 +// The rest +#define SEQ_SMALL_LIM 128 +#define SEQ_SMALL_SIZ 16384 + +#define ALLOC_SEQSET 1 +#define LIMIT_SEQSET 16 +#define INIT_SEQSET(_item) INIT_GENERIC(_item, seqset) +#define DATA_SEQSET(_var, _item) DATA_GENERIC(_var, _item, seqset, true) + +extern K_LIST *seqset_free; +// each new seqset is added to the head, so head is the current one +extern K_STORE *seqset_store; + +// Initialised when seqset_free is allocated +extern char *seqnam[SEQ_MAX]; + +// SEQTRANS +typedef struct seqtrans { + int seq; + uint64_t seqnum; + SEQITEM item; +} SEQTRANS; + +// The stores are created and freed each time required +extern K_LIST *seqtrans_free; + +#define ALLOC_SEQTRANS 1024 +#define LIMIT_SEQTRANS 0 +#define CULL_SEQTRANS 65536 +#define INIT_SEQTRANS(_item) INIT_GENERIC(_item, seqtrans) +#define DATA_SEQTRANS(_var, _item) DATA_GENERIC(_var, _item, seqtrans, true) + // USERS typedef struct users { int64_t userid; @@ -1701,8 +1890,11 @@ extern void free_optioncontrol_data(K_ITEM *item); extern void free_markersummary_data(K_ITEM *item); extern void free_workmarkers_data(K_ITEM *item); extern void free_marks_data(K_ITEM *item); +extern void free_seqset_data(K_ITEM *item); -extern char *safe_text(char *txt); +#define safe_text(_txt) _safe_text(_txt, true) +#define safe_text_nonull(_txt) _safe_text(_txt, false) +extern char *_safe_text(char *txt, bool shownull); extern void username_trim(USERS *users); extern bool like_address(char *username); @@ -1738,6 +1930,9 @@ extern char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, #define btv_to_buf(_data, _buf, _siz) _btv_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) //#define blob_to_buf(_data, _buf, _siz) _blob_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) #define double_to_buf(_data, _buf, _siz) _double_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define t_to_buf(_data, _buf, _siz) _t_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define bt_to_buf(_data, _buf, _siz) _bt_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) +#define btu64_to_buf(_data, _buf, _siz) _btu64_to_buf(_data, _buf, _siz, WHERE_FFL_HERE) extern char *_str_to_buf(char data[], char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_bigint_to_buf(int64_t data, char *buf, size_t siz, WHERE_FFL_ARGS); @@ -1749,12 +1944,17 @@ extern char *_ctv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_ftv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); // Convert tv to seconds (ignore uS) extern char *_tvs_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); -// Convert tv to (brief) DD HH:MM:SS +// Convert tv to (brief) DD/HH:MM:SS extern char *_btv_to_buf(tv_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); /* unused yet extern char *_blob_to_buf(char *data, char *buf, size_t siz, WHERE_FFL_ARGS); */ extern char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS); +// Convert seconds (only) time to date +extern char *_t_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); +// Convert seconds (only) time to (brief) M-DD/HH:MM:SS +extern char *_bt_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); +extern char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS); extern void dsp_transfer(K_ITEM *item, FILE *stream); @@ -2131,6 +2331,7 @@ struct CMDS { bool createdate; // requires a createdate char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, char *, tv_t *, K_TREE *); + enum seq_num seq; char *access; }; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 35a00ddd..6187d467 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -945,12 +945,12 @@ redo: APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), - "firstcreatedate:%d=%ld%c", rows, + "first"CDTRF":%d=%ld%c", rows, first_cd.tv_sec, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), - "createdate:%d=%ld%c", rows, + CDTRF":%d=%ld%c", rows, blocks->createdate.tv_sec, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); @@ -1077,8 +1077,8 @@ redo: snprintf(tmp, sizeof(tmp), "rows=%d%cflds=%s%c", rows, FLDSEP, - "seq,height,blockhash,nonce,reward,workername,firstcreatedate," - "createdate,status,statsconf,diffacc,diffinv,shareacc," + "seq,height,blockhash,nonce,reward,workername,first"CDTRF"," + CDTRF",status,statsconf,diffacc,diffinv,shareacc," "shareinv,elapsed,netdiff,diffratio,cdf,luck", FLDSEP); APPEND_REALLOC(buf, off, len, tmp); @@ -2683,7 +2683,7 @@ static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id, snprintf(tmp, sizeof(tmp), "%s{\"workername\":\"%s\"," "\"difficultydefault\":%d," - "\"createdate\":\"%ld,%ld\"}", + "\""CDTRF"\":\"%ld,%ld\"}", first ? "" : ",", heartbeatqueue->workername, heartbeatqueue->difficultydefault, @@ -5198,6 +5198,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(transfer, 0, 0); USEINFO(heartbeatqueue, 1, 0); USEINFO(logqueue, 1, 0); + USEINFO(seqset, 1, 0); + USEINFO(seqtrans, 0, 0); snprintf(tmp, sizeof(tmp), "totalram=%"PRIu64"%c", tot, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); @@ -5908,50 +5910,50 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, * For the heartbeat pulse reply it has no '={}' */ -// cmd_val cmd_str noid createdate func access +// cmd_val cmd_str noid createdate func seq access struct CMDS ckdb_cmds[] = { - { CMD_TERMINATE, "terminate", true, false, NULL, ACCESS_SYSTEM }, - { CMD_PING, "ping", true, false, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, - { CMD_VERSION, "version", true, false, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, - { CMD_LOGLEVEL, "loglevel", true, false, NULL, ACCESS_SYSTEM }, - { CMD_FLUSH, "flush", true, false, NULL, ACCESS_SYSTEM }, - { CMD_SHARELOG, STR_WORKINFO, false, true, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_SHARES, false, true, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_SHAREERRORS, false, true, cmd_sharelog, ACCESS_POOL }, - { CMD_SHARELOG, STR_AGEWORKINFO, false, true, cmd_sharelog, ACCESS_POOL }, - { CMD_AUTH, "authorise", false, true, cmd_auth, ACCESS_POOL }, - { CMD_ADDRAUTH, "addrauth", false, true, cmd_addrauth, ACCESS_POOL }, - { CMD_HEARTBEAT,"heartbeat", false, true, cmd_heartbeat, ACCESS_POOL }, - { CMD_ADDUSER, "adduser", false, false, cmd_adduser, ACCESS_WEB }, - { CMD_NEWPASS, "newpass", false, false, cmd_newpass, ACCESS_WEB }, - { CMD_CHKPASS, "chkpass", false, false, cmd_chkpass, ACCESS_WEB }, - { CMD_USERSET, "usersettings", false, false, cmd_userset, ACCESS_WEB }, - { CMD_WORKERSET,"workerset", false, false, cmd_workerset, ACCESS_WEB }, - { CMD_POOLSTAT, "poolstats", false, true, cmd_poolstats, ACCESS_POOL }, - { CMD_USERSTAT, "userstats", false, true, cmd_userstats, ACCESS_POOL }, - { CMD_WORKERSTAT,"workerstats", false, true, cmd_workerstats,ACCESS_POOL }, - { CMD_BLOCK, "block", false, true, cmd_blocks, ACCESS_POOL }, - { CMD_BLOCKLIST,"blocklist", false, false, cmd_blocklist, ACCESS_WEB }, - { CMD_BLOCKSTATUS,"blockstatus",false, false, cmd_blockstatus,ACCESS_SYSTEM }, - { CMD_NEWID, "newid", false, false, cmd_newid, ACCESS_SYSTEM }, - { CMD_PAYMENTS, "payments", false, false, cmd_payments, ACCESS_WEB }, - { CMD_WORKERS, "workers", false, false, cmd_workers, ACCESS_WEB }, - { CMD_ALLUSERS, "allusers", false, false, cmd_allusers, ACCESS_WEB }, - { CMD_HOMEPAGE, "homepage", false, false, cmd_homepage, ACCESS_WEB }, - { CMD_GETATTS, "getatts", false, false, cmd_getatts, ACCESS_WEB }, - { CMD_SETATTS, "setatts", false, false, cmd_setatts, ACCESS_WEB }, - { CMD_EXPATTS, "expatts", false, false, cmd_expatts, ACCESS_WEB }, - { CMD_GETOPTS, "getopts", false, false, cmd_getopts, ACCESS_WEB }, - { CMD_SETOPTS, "setopts", false, false, cmd_setopts, ACCESS_WEB }, - { CMD_DSP, "dsp", false, false, cmd_dsp, ACCESS_SYSTEM }, - { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_PPLNS2, "pplns2", false, false, cmd_pplns2, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_PAYOUTS, "payouts", false, false, cmd_payouts, ACCESS_SYSTEM }, - { CMD_MPAYOUTS, "mpayouts", false, false, cmd_mpayouts, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_SHIFTS, "shifts", false, false, cmd_shifts, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_MARKS, "marks", false, false, cmd_marks, ACCESS_SYSTEM }, - { CMD_PSHIFT, "pshift", false, false, cmd_pshift, ACCESS_SYSTEM ACCESS_WEB }, - { CMD_END, NULL, false, false, NULL, NULL } + { CMD_TERMINATE, "terminate", true, false, NULL, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_PING, "ping", true, false, NULL, SEQ_NONE, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, + { CMD_VERSION, "version", true, false, NULL, SEQ_NONE, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, + { CMD_LOGLEVEL, "loglevel", true, false, NULL, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_FLUSH, "flush", true, false, NULL, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_SHARELOG, STR_WORKINFO, false, true, cmd_sharelog, SEQ_WORKINFO, ACCESS_POOL }, + { CMD_SHARELOG, STR_SHARES, false, true, cmd_sharelog, SEQ_SHARES, ACCESS_POOL }, + { CMD_SHARELOG, STR_SHAREERRORS,false, true, cmd_sharelog, SEQ_SHAREERRORS,ACCESS_POOL }, + { CMD_SHARELOG, STR_AGEWORKINFO,false, true, cmd_sharelog, SEQ_AGEWORKINFO,ACCESS_POOL }, + { CMD_AUTH, "authorise", false, true, cmd_auth, SEQ_AUTH, ACCESS_POOL }, + { CMD_ADDRAUTH, "addrauth", false, true, cmd_addrauth, SEQ_ADDRAUTH, ACCESS_POOL }, + { CMD_HEARTBEAT,"heartbeat", false, true, cmd_heartbeat, SEQ_HEARTBEAT, ACCESS_POOL }, + { CMD_ADDUSER, "adduser", false, false, cmd_adduser, SEQ_NONE, ACCESS_WEB }, + { CMD_NEWPASS, "newpass", false, false, cmd_newpass, SEQ_NONE, ACCESS_WEB }, + { CMD_CHKPASS, "chkpass", false, false, cmd_chkpass, SEQ_NONE, ACCESS_WEB }, + { CMD_USERSET, "usersettings", false, false, cmd_userset, SEQ_NONE, ACCESS_WEB }, + { CMD_WORKERSET,"workerset", false, false, cmd_workerset, SEQ_NONE, ACCESS_WEB }, + { CMD_POOLSTAT, "poolstats", false, true, cmd_poolstats, SEQ_POOLSTATS, ACCESS_POOL }, + { CMD_USERSTAT, "userstats", false, true, cmd_userstats, SEQ_NONE, ACCESS_POOL }, + { CMD_WORKERSTAT,"workerstats", false, true, cmd_workerstats,SEQ_WORKERSTAT, ACCESS_POOL }, + { CMD_BLOCK, "block", false, true, cmd_blocks, SEQ_BLOCK, ACCESS_POOL }, + { CMD_BLOCKLIST,"blocklist", false, false, cmd_blocklist, SEQ_NONE, ACCESS_WEB }, + { CMD_BLOCKSTATUS,"blockstatus",false, false, cmd_blockstatus,SEQ_NONE, ACCESS_SYSTEM }, + { CMD_NEWID, "newid", false, false, cmd_newid, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_PAYMENTS, "payments", false, false, cmd_payments, SEQ_NONE, ACCESS_WEB }, + { CMD_WORKERS, "workers", false, false, cmd_workers, SEQ_NONE, ACCESS_WEB }, + { CMD_ALLUSERS, "allusers", false, false, cmd_allusers, SEQ_NONE, ACCESS_WEB }, + { CMD_HOMEPAGE, "homepage", false, false, cmd_homepage, SEQ_NONE, ACCESS_WEB }, + { CMD_GETATTS, "getatts", false, false, cmd_getatts, SEQ_NONE, ACCESS_WEB }, + { CMD_SETATTS, "setatts", false, false, cmd_setatts, SEQ_NONE, ACCESS_WEB }, + { CMD_EXPATTS, "expatts", false, false, cmd_expatts, SEQ_NONE, ACCESS_WEB }, + { CMD_GETOPTS, "getopts", false, false, cmd_getopts, SEQ_NONE, ACCESS_WEB }, + { CMD_SETOPTS, "setopts", false, false, cmd_setopts, SEQ_NONE, ACCESS_WEB }, + { CMD_DSP, "dsp", false, false, cmd_dsp, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_STATS, "stats", true, false, cmd_stats, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_PPLNS, "pplns", false, false, cmd_pplns, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_PPLNS2, "pplns2", false, false, cmd_pplns2, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_PAYOUTS, "payouts", false, false, cmd_payouts, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_MPAYOUTS, "mpayouts", false, false, cmd_mpayouts, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_SHIFTS, "shifts", false, false, cmd_shifts, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_MARKS, "marks", false, false, cmd_marks, SEQ_NONE, ACCESS_SYSTEM }, + { CMD_PSHIFT, "pshift", false, false, cmd_pshift, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_END, NULL, false, false, NULL, SEQ_NONE, NULL } }; diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 8f5539d1..deee3f5d 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -87,8 +87,21 @@ void free_marks_data(K_ITEM *item) FREENULL(marks->extra); } +void free_seqset_data(K_ITEM *item) +{ + SEQSET *seqset; + int i; + + DATA_SEQSET(seqset, item); + if (seqset->seqstt) { + for (i = 0; i < SEQ_MAX; i++) + FREENULL(seqset->seqdata[i].item); + seqset->seqstt = 0; + } +} + // Clear text printable version of txt up to first '\0' -char *safe_text(char *txt) +char *_safe_text(char *txt, bool shownull) { unsigned char *ptr = (unsigned char *)txt; size_t len; @@ -114,7 +127,11 @@ char *safe_text(char *txt) buf += 4; } } - strcpy(buf, "0x00"); + if (shownull) + strcpy(buf, "0x00"); + else + *buf = '\0'; + return ret; } @@ -375,6 +392,9 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_ break; case TYPE_TV: case TYPE_TVS: + case TYPE_BTV: + case TYPE_T: + case TYPE_BT: siz = DATE_BUFSIZ; break; case TYPE_CTV: @@ -441,6 +461,25 @@ char *_data_to_buf(enum data_type typ, void *data, char *buf, size_t siz, WHERE_ case TYPE_DOUBLE: snprintf(buf, siz, "%f", *((double *)data)); break; + case TYPE_T: + gmtime_r((time_t *)data, &tm); + snprintf(buf, siz, "%d-%02d-%02d %02d:%02d:%02d+00", + tm.tm_year + 1900, + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec); + break; + case TYPE_BT: + gmtime_r((time_t *)data, &tm); + snprintf(buf, siz, "%d-%02d %02d:%02d:%02d", + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec); + break; } return buf; @@ -502,6 +541,22 @@ char *_double_to_buf(double data, char *buf, size_t siz, WHERE_FFL_ARGS) return _data_to_buf(TYPE_DOUBLE, (void *)(&data), buf, siz, WHERE_FFL_PASS); } +char *_t_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + return _data_to_buf(TYPE_T, (void *)data, buf, siz, WHERE_FFL_PASS); +} + +char *_bt_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + return _data_to_buf(TYPE_BT, (void *)data, buf, siz, WHERE_FFL_PASS); +} + +char *_btu64_to_buf(uint64_t *data, char *buf, size_t siz, WHERE_FFL_ARGS) +{ + time_t t = *data; + return _data_to_buf(TYPE_BT, (void *)&t, buf, siz, WHERE_FFL_PASS); +} + // For mutiple variable function calls that need the data char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS) { diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index bb0a9011..37b9974c 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -37,26 +37,26 @@ char *pqerrmsg(PGconn *conn) // HISTORY FIELDS #define HISTORYDATEFLDS(_res, _row, _data, _ok) do { \ char *_fld; \ - PQ_GET_FLD(_res, _row, "createdate", _fld, _ok); \ + PQ_GET_FLD(_res, _row, CDDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_TV("createdate", _fld, (_data)->createdate); \ - PQ_GET_FLD(_res, _row, "createby", _fld, _ok); \ + TXT_TO_TV(CDDB, _fld, (_data)->createdate); \ + PQ_GET_FLD(_res, _row, BYDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createby", _fld, (_data)->createby); \ - PQ_GET_FLD(_res, _row, "createcode", _fld, _ok); \ + TXT_TO_STR(BYDB, _fld, (_data)->createby); \ + PQ_GET_FLD(_res, _row, CODEDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createcode", _fld, (_data)->createcode); \ - PQ_GET_FLD(_res, _row, "createinet", _fld, _ok); \ + TXT_TO_STR(CODEDB, _fld, (_data)->createcode); \ + PQ_GET_FLD(_res, _row, INETDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createinet", _fld, (_data)->createinet); \ - PQ_GET_FLD(_res, _row, "expirydate", _fld, _ok); \ + TXT_TO_STR(INETDB, _fld, (_data)->createinet); \ + PQ_GET_FLD(_res, _row, EDDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_TV("expirydate", _fld, (_data)->expirydate); \ + TXT_TO_TV(EDDB, _fld, (_data)->expirydate); \ } while (0) #define HISTORYDATEPARAMS(_params, _his_pos, _row) do { \ @@ -70,35 +70,35 @@ char *pqerrmsg(PGconn *conn) // MODIFY FIELDS #define MODIFYDATEFLDPOINTERS(_list, _res, _row, _data, _ok) do { \ char *_fld; \ - PQ_GET_FLD(_res, _row, "createdate", _fld, _ok); \ + PQ_GET_FLD(_res, _row, CDDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_TV("createdate", _fld, (_data)->createdate); \ - PQ_GET_FLD(_res, _row, "createby", _fld, _ok); \ + TXT_TO_TV(CDDB, _fld, (_data)->createdate); \ + PQ_GET_FLD(_res, _row, BYDB, _fld, _ok); \ if (!_ok) \ break; \ SET_CREATEBY(_list, (_data)->createby, _fld); \ - PQ_GET_FLD(_res, _row, "createcode", _fld, _ok); \ + PQ_GET_FLD(_res, _row, CODEDB, _fld, _ok); \ if (!_ok) \ break; \ SET_CREATECODE(_list, (_data)->createcode, _fld); \ - PQ_GET_FLD(_res, _row, "createinet", _fld, _ok); \ + PQ_GET_FLD(_res, _row, INETDB, _fld, _ok); \ if (!_ok) \ break; \ SET_CREATEINET(_list, (_data)->createinet, _fld); \ - PQ_GET_FLD(_res, _row, "modifydate", _fld, _ok); \ + PQ_GET_FLD(_res, _row, MDDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_TV("modifydate", _fld, (_data)->modifydate); \ - PQ_GET_FLD(_res, _row, "modifyby", _fld, _ok); \ + TXT_TO_TV(MDDB, _fld, (_data)->modifydate); \ + PQ_GET_FLD(_res, _row, MBYDB, _fld, _ok); \ if (!_ok) \ break; \ SET_MODIFYBY(_list, (_data)->modifyby, _fld); \ - PQ_GET_FLD(_res, _row, "modifycode", _fld, _ok); \ + PQ_GET_FLD(_res, _row, MCODEDB, _fld, _ok); \ if (!_ok) \ break; \ SET_MODIFYCODE(_list, (_data)->modifycode, _fld); \ - PQ_GET_FLD(_res, _row, "modifyinet", _fld, _ok); \ + PQ_GET_FLD(_res, _row, MINETDB, _fld, _ok); \ if (!_ok) \ break; \ SET_MODIFYINET(_list, (_data)->modifyinet, _fld); \ @@ -125,22 +125,22 @@ char *pqerrmsg(PGconn *conn) // SIMPLE FIELDS #define SIMPLEDATEFLDS(_res, _row, _data, _ok) do { \ char *_fld; \ - PQ_GET_FLD(_res, _row, "createdate", _fld, _ok); \ + PQ_GET_FLD(_res, _row, CDDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_TV("createdate", _fld, (_data)->createdate); \ - PQ_GET_FLD(_res, _row, "createby", _fld, _ok); \ + TXT_TO_TV(CDDB, _fld, (_data)->createdate); \ + PQ_GET_FLD(_res, _row, BYDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createby", _fld, (_data)->createby); \ - PQ_GET_FLD(_res, _row, "createcode", _fld, _ok); \ + TXT_TO_STR(BYDB, _fld, (_data)->createby); \ + PQ_GET_FLD(_res, _row, CODEDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createcode", _fld, (_data)->createcode); \ - PQ_GET_FLD(_res, _row, "createinet", _fld, _ok); \ + TXT_TO_STR(CODEDB, _fld, (_data)->createcode); \ + PQ_GET_FLD(_res, _row, INETDB, _fld, _ok); \ if (!_ok) \ break; \ - TXT_TO_STR("createinet", _fld, (_data)->createinet); \ + TXT_TO_STR(INETDB, _fld, (_data)->createinet); \ } while (0) #define SIMPLEDATEPARAMS(_params, _his_pos, _row) do { \ @@ -342,8 +342,8 @@ int64_t nextid(PGconn *conn, char *idname, int64_t increment, lastid += increment; snprintf(qry, sizeof(qry), "update idcontrol set " - "lastid=$1, modifydate=$2, modifyby=$3, " - "modifycode=$4, modifyinet=$5 " + "lastid=$1,"MDDB"=$2,"MBYDB"=$3," + MCODEDB"=$4,"MINETDB"=$5 " "where idname='%s'", idname); @@ -421,7 +421,7 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash, HISTORYDATEINIT(row, cd, by, code, inet); HISTORYDATETRANSFER(trf_root, row); - upd = "update users set expirydate=$1 where userid=$2 and expirydate=$3"; + upd = "update users set "EDDB"=$1 where userid=$2 and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(row->userid, NULL, 0); @@ -472,7 +472,7 @@ bool users_update(PGconn *conn, K_ITEM *u_item, char *oldhash, "userid,username,$3,$4,joineddate," "$5,secondaryuserid,$6," "$7,$8,$9,$10,$11 from users where " - "userid=$1 and expirydate=$2"; + "userid=$1 and "EDDB"=$2"; res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); @@ -795,8 +795,8 @@ bool useratts_item_add(PGconn *conn, K_ITEM *ua_item, tv_t *cd, bool begun) } if (old_item) { - upd = "update useratts set expirydate=$1 where userid=$2 and " - "attname=$3 and expirydate=$4"; + upd = "update useratts set "EDDB"=$1 where userid=$2 and " + "attname=$3 and "EDDB"=$4"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(old_useratts->userid, NULL, 0); @@ -985,8 +985,8 @@ bool useratts_item_expire(PGconn *conn, K_ITEM *ua_item, tv_t *cd) conned = true; } - upd = "update useratts set expirydate=$1 where userid=$2 and " - "attname=$3 and expirydate=$4"; + upd = "update useratts set "EDDB"=$1 where userid=$2 and " + "attname=$3 and "EDDB"=$4"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(useratts->userid, NULL, 0); @@ -1327,7 +1327,7 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, HISTORYDATEINIT(row, cd, by, code, inet); HISTORYDATETRANSFER(trf_root, row); - upd = "update workers set expirydate=$1 where workerid=$2 and expirydate=$3"; + upd = "update workers set "EDDB"=$1 where workerid=$2 and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(row->workerid, NULL, 0); @@ -1551,8 +1551,8 @@ bool paymentaddresses_set(PGconn *conn, int64_t userid, K_STORE *pa_store, count = matches = 0; APPEND_REALLOC_INIT(upd, off, len); APPEND_REALLOC(upd, off, len, - "update paymentaddresses set expirydate=$1 where " - "userid=$2 and expirydate=$3 and payaddress in ("); + "update paymentaddresses set "EDDB"=$1 where " + "userid=$2 and "EDDB"=$3 and payaddress in ("); par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(userid, NULL, 0); @@ -1905,8 +1905,8 @@ bool payments_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item, DATA_PAYMENTS(oldp, *old_p_item); - upd = "update payments set expirydate=$1 where paymentid=$2" - " and expirydate=$3"; + upd = "update payments set "EDDB"=$1 where paymentid=$2" + " and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(oldp->paymentid, NULL, 0); @@ -2220,9 +2220,9 @@ K_ITEM *optioncontrol_item_add(PGconn *conn, K_ITEM *oc_item, tv_t *cd, bool beg if (old_item) { upd = "update optioncontrol " - "set expirydate=$1 where optionname=$2 and " + "set "EDDB"=$1 where optionname=$2 and " "activationdate=$3 and activationheight=$4 and " - "expirydate=$5"; + EDDB"=$5"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); @@ -2367,7 +2367,7 @@ bool optioncontrol_fill(PGconn *conn) sel = "select " "optionname,optionvalue,activationdate,activationheight" HISTORYDATECONTROL - " from optioncontrol where expirydate=$1"; + " from optioncontrol where "EDDB"=$1"; par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); PARCHK(par, params); @@ -2635,7 +2635,7 @@ bool workinfo_fill(PGconn *conn) "workinfoid,poolinstance,merklehash,prevhash," "coinbase1,coinbase2,version,bits,ntime,reward" HISTORYDATECONTROL - " from workinfo where expirydate=$1 and" + " from workinfo where "EDDB"=$1 and" " ((workinfoid>=$2 and workinfoid<=$3)"); // If we aren't loading the full range, ensure the necessary ones are loaded @@ -3921,7 +3921,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE if (wm_item) { DATA_WORKMARKERS(wm, wm_item); LOGERR("%s(): attempt to update sharesummary " - "with %s %"PRId64"/%"PRId64"/%s createdate %s" + "with %s %"PRId64"/%"PRId64"/%s "CDDB" %s" " but processed workmarkers %"PRId64" exists", __func__, s_row ? "shares" : "shareerrors", workinfoid, userid, st = safe_text(workername), @@ -3969,7 +3969,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE level = LOG_WARNING; } else ooof0++; - LOGMSG(level, "%s(): OoO %s createdate (%s) is < summary" + LOGMSG(level, "%s(): OoO %s "CDDB" (%s) is < summary" " firstshare (%s) (%s)", __func__, s_row ? "shares" : "shareerrors", (tmp1 = ctv_to_buf(createdate, NULL, 0)), @@ -3990,7 +3990,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE level = LOG_WARNING; } else oool0++; - LOGMSG(level, "%s(): OoO %s createdate (%s) is < summary" + LOGMSG(level, "%s(): OoO %s "CDDB" (%s) is < summary" " lastshare (%s) (%s)", __func__, s_row ? "shares" : "shareerrors", (tmp1 = ctv_to_buf(createdate, NULL, 0)), @@ -4126,7 +4126,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE "shareacc=$9,sharesta=$10,sharedup=$11,sharehi=$12," "sharerej=$13,firstshare=$14,lastshare=$15," "sharecount=$16,errorcount=$17,lastdiffacc=$18,complete=$19" - ",modifydate=$20,modifyby=$21,modifycode=$22,modifyinet=$23 " + ","MDDB"=$20,"MBYDB"=$21,"MCODEDB"=$22,"MINETDB"=$23 " "where userid=$1 and workername=$2 and workinfoid=$3"; res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); @@ -4154,7 +4154,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE PARCHKVAL(par, 8, params); upd = "update sharesummary " - "set complete=$4,modifydate=$5,modifyby=$6,modifycode=$7,modifyinet=$8 " + "set complete=$4,"MDDB"=$5,"MBYDB"=$6,"MCODEDB"=$7,"MINETDB"=$8 " "where userid=$1 and workername=$2 and workinfoid=$3"; res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); @@ -4509,7 +4509,7 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, row->statsconfirmed[1] = '\0'; HISTORYDATEINIT(row, cd, by, code, inet); - upd = "update blocks set expirydate=$1 where blockhash=$2 and expirydate=$3"; + upd = "update blocks set "EDDB"=$1 where blockhash=$2 and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = str_to_buf(row->blockhash, NULL, 0); @@ -4564,7 +4564,7 @@ bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, "clientid,enonce1,nonce2,nonce,reward,confirmed," "$3,$4,$5,$6,$7,$8," "$9,$10,$11,$12,$13 from blocks where " - "blockhash=$1 and expirydate=$2"; + "blockhash=$1 and "EDDB"=$2"; res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); rescode = PQresultStatus(res); @@ -4782,7 +4782,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash, goto flail; } - upd = "update blocks set expirydate=$1 where blockhash=$2 and expirydate=$3"; + upd = "update blocks set "EDDB"=$1 where blockhash=$2 and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = str_to_buf(row->blockhash, NULL, 0); @@ -4849,7 +4849,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash, "clientid,enonce1,nonce2,nonce,reward," "$3,$4,$5,$6,$7,elapsed,statsconfirmed," "$8,$9,$10,$11,$12 from blocks where " - "blockhash=$1 and expirydate=$2"; + "blockhash=$1 and "EDDB"=$2"; } else { HISTORYDATEPARAMS(params, par, row); PARCHKVAL(par, 3 + HISTORYDATECOUNT, params); // 8 as per ins @@ -4865,7 +4865,7 @@ bool blocks_add(PGconn *conn, char *height, char *blockhash, "$3,diffacc,diffinv,shareacc,shareinv,elapsed," "statsconfirmed," "$4,$5,$6,$7,$8 from blocks where " - "blockhash=$1 and expirydate=$2"; + "blockhash=$1 and "EDDB"=$2"; } res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); @@ -5208,8 +5208,8 @@ bool miningpayouts_add(PGconn *conn, bool add, K_ITEM *mp_item, DATA_MININGPAYOUTS(oldmp, *old_mp_item); - upd = "update miningpayouts set expirydate=$1 where payoutid=$2" - " and userid=$3 and expirydate=$4"; + upd = "update miningpayouts set "EDDB"=$1 where payoutid=$2" + " and userid=$3 and "EDDB"=$4"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(oldmp->payoutid, NULL, 0); @@ -5428,8 +5428,8 @@ bool payouts_add(PGconn *conn, bool add, K_ITEM *p_item, K_ITEM **old_p_item, DATA_PAYOUTS(oldpayouts, *old_p_item); - upd = "update payouts set expirydate=$1 where payoutid=$2" - " and expirydate=$3"; + upd = "update payouts set "EDDB"=$1 where payoutid=$2" + " and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(oldpayouts->payoutid, NULL, 0); @@ -5922,7 +5922,7 @@ bool poolstats_fill(PGconn *conn) "poolinstance,elapsed,users,workers,hashrate," "hashrate5m,hashrate1hr,hashrate24hr" SIMPLEDATECONTROL - " from poolstats where createdate>"); + " from poolstats where "CDDB">"); APPEND_REALLOC(sel, off, len, stamp); res = PQexec(conn, sel, CKPQ_READ); @@ -6545,8 +6545,8 @@ bool _workmarkers_process(PGconn *conn, bool already, bool add, begun = true; } - upd = "update workmarkers set expirydate=$1 where markerid=$2" - " and expirydate=$3"; + upd = "update workmarkers set "EDDB"=$1 where markerid=$2" + " and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(oldworkmarkers->markerid, NULL, 0); @@ -6873,8 +6873,8 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance, begun = true; - upd = "update marks set expirydate=$1 where workinfoid=$2" - " and expirydate=$3"; + upd = "update marks set "EDDB"=$1 where workinfoid=$2" + " and "EDDB"=$3"; par = 0; params[par++] = tv_to_buf(cd, NULL, 0); params[par++] = bigint_to_buf(workinfoid, NULL, 0); diff --git a/src/libckpool.h b/src/libckpool.h index 28ea14c9..19158427 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -194,6 +194,9 @@ void logmsg(int loglevel, const char *fmt, ...); #define DEFLOGBUFSIZ 512 +#define LOGMSGBUF(__lvl, __buf) do { \ + logmsg(__lvl, "%s", __buf); \ + } while(0) #define LOGMSGSIZ(__siz, __lvl, __fmt, ...) do { \ char tmp42[__siz]; \ snprintf(tmp42, sizeof(tmp42), __fmt, ##__VA_ARGS__); \ From 3bc394ffb90b5eaedd6418d373e0312f2bcb5396 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 20 Apr 2015 18:33:05 +1000 Subject: [PATCH 2/6] ckdb - abort code that adds an item already in a list/store --- src/klist.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/klist.c b/src/klist.c index ab6b40b0..65e30448 100644 --- a/src/klist.c +++ b/src/klist.c @@ -205,6 +205,11 @@ void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) list->name, __func__, item->name, KLIST_FFL_PASS); } + if (item->prev || item->next) { + quithere(1, "%s() added item %s still linked" KLIST_FFL, + __func__, item->name, KLIST_FFL_PASS); + } + item->prev = NULL; item->next = list->head; if (list->head) @@ -241,6 +246,11 @@ void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) list->name, __func__, KLIST_FFL_PASS); } + if (item->prev || item->next) { + quithere(1, "%s() added item %s still linked" KLIST_FFL, + __func__, item->name, KLIST_FFL_PASS); + } + item->prev = list->tail; item->next = NULL; if (list->tail) From da2d33a497b5874ea6c2269eae268791674007fb Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 20 Apr 2015 18:41:15 +1000 Subject: [PATCH 3/6] ckdb - use safe_text_null where trailing null isn't needed --- src/ckdb.h | 6 +++--- src/ckdb_dbio.c | 54 ++++++++++++++++++++++++------------------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 80b606bf..77f01746 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.070" +#define CKDB_VERSION DB_VERSION"-1.071" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -790,7 +790,7 @@ extern tv_t missing_secuser_max; * maxseq is incremented to the current maximum N+x each time a record is * processed * If we get up to N+3 but it is unexpectedly followed by N+5, that means N+4 - * is currently missing - so it is flagged as missing by time=0 and the + * is currently missing - so it is flagged as missing by MISSFLAG=0 and the * missing counters are incremented - maxseq will now be N+5 * Once we reach N+size we need to discard N and use it as N+size * and increment seqbase N @@ -804,7 +804,7 @@ extern tv_t missing_secuser_max; * If we receive an item N-x i.e. less than seqbase N, then: * If maxseq-N = size then N-x is considered stale and the stale counters * are incremented since there's no unused items below N available - * this shouldn't normally happen after we've received size seq numbers + * This shouldn't normally happen after we've received size seq numbers * Else maxseq-N is less than size, that means there are unused items below N * This will usually only be with a new sequence and the first seq was out * of order, before lower sequence numbers, thus maxseq should be close to diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 37b9974c..2c04c4af 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -901,7 +901,7 @@ K_ITEM *useratts_add(PGconn *conn, char *username, char *attname, if (!u_item) { LOGERR("%s(): unknown user '%s'", __func__, - st = safe_text(username)); + st = safe_text_nonull(username)); FREENULL(st); goto unitem; } @@ -2797,7 +2797,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) if (!w_item) { LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld", __func__, shares->userid, - st = safe_text(shares->workername), + st = safe_text_nonull(shares->workername), shares->createdate.tv_sec, shares->createdate.tv_usec); FREENULL(st); return false; @@ -2811,7 +2811,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) LOGDEBUG("%s(): workmarker exists for wid %"PRId64 " %"PRId64"/%s/%ld,%ld", __func__, shares->workinfoid, shares->userid, - st = safe_text(shares->workername), + st = safe_text_nonull(shares->workername), shares->createdate.tv_sec, shares->createdate.tv_usec); FREENULL(st); @@ -2827,7 +2827,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_TREE *trf_root) "%"PRId64" %"PRId64"/%s/%ld,%ld", __func__, sharesummary->complete, shares->workinfoid, shares->userid, - st = safe_text(shares->workername), + st = safe_text_nonull(shares->workername), shares->createdate.tv_sec, shares->createdate.tv_usec); FREENULL(st); @@ -2938,7 +2938,7 @@ keep: LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 " Early share procured", __func__, early_shares->workinfoid, - st = safe_text(early_shares->workername), + st = safe_text_nonull(early_shares->workername), early_shares->createdate.tv_sec, early_shares->createdate.tv_usec, cd_buf, early_shares->oldcount, early_shares->redo); @@ -2953,7 +2953,7 @@ discard: LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 " Early share discarded!%s", __func__, early_shares->workinfoid, - st = safe_text(early_shares->workername), + st = safe_text_nonull(early_shares->workername), early_shares->createdate.tv_sec, early_shares->createdate.tv_usec, cd_buf, early_shares->oldcount, early_shares->redo, why); @@ -2982,7 +2982,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", __func__, - workinfoid, st = safe_text(workername), nonce, + workinfoid, st = safe_text_nonull(workername), nonce, errn, cd->tv_sec, cd->tv_usec); FREENULL(st); @@ -3002,7 +3002,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername or the authentication information got to ckdb after the shares ... which shouldn't ever happen */ LOGERR("%s() %s/%ld,%ld %s no user! Share discarded!", - __func__, st = safe_text(username), + __func__, st = safe_text_nonull(username), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); goto tisbad; @@ -3029,7 +3029,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername btv_to_buf(cd, cd_buf, sizeof(cd_buf)); LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! " "Share corrected", - __func__, st = safe_text(username), + __func__, st = safe_text_nonull(username), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); } @@ -3044,7 +3044,7 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " "Early share queued!", __func__, shares->workinfoid, - st = safe_text(workername), + st = safe_text_nonull(workername), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); shares->redo = 0; @@ -3102,7 +3102,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, if (!w_item) { LOGDEBUG("%s(): new_default_worker failed %"PRId64"/%s/%ld,%ld", __func__, shareerrors->userid, - st = safe_text(shareerrors->workername), + st = safe_text_nonull(shareerrors->workername), shareerrors->createdate.tv_sec, shareerrors->createdate.tv_usec); FREENULL(st); @@ -3118,7 +3118,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, " %"PRId64"/%s/%ld,%ld", __func__, shareerrors->workinfoid, shareerrors->userid, - st = safe_text(shareerrors->workername), + st = safe_text_nonull(shareerrors->workername), shareerrors->createdate.tv_sec, shareerrors->createdate.tv_usec); FREENULL(st); @@ -3136,7 +3136,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, __func__, sharesummary->complete, shareerrors->workinfoid, shareerrors->userid, - st = safe_text(shareerrors->workername), + st = safe_text_nonull(shareerrors->workername), shareerrors->createdate.tv_sec, shareerrors->createdate.tv_usec); FREENULL(st); @@ -3249,7 +3249,7 @@ keep: LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 " Early share procured", __func__, early_shareerrors->workinfoid, - st = safe_text(early_shareerrors->workername), + st = safe_text_nonull(early_shareerrors->workername), early_shareerrors->createdate.tv_sec, early_shareerrors->createdate.tv_usec, cd_buf, early_shareerrors->oldcount, early_shareerrors->redo); @@ -3264,7 +3264,7 @@ discard: LOGERR("%s() %"PRId64"/%s/%ld,%ld %s/%"PRId32"/%"PRId32 " Early share discarded!%s", __func__, early_shareerrors->workinfoid, - st = safe_text(early_shareerrors->workername), + st = safe_text_nonull(early_shareerrors->workername), early_shareerrors->createdate.tv_sec, early_shareerrors->createdate.tv_usec, cd_buf, early_shareerrors->oldcount, early_shareerrors->redo, why); @@ -3290,7 +3290,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", __func__, - workinfoid, st = safe_text(workername), errn, + workinfoid, st = safe_text_nonull(workername), errn, error, cd->tv_sec, cd->tv_usec); FREENULL(st); @@ -3307,7 +3307,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, if (!u_item) { btv_to_buf(cd, cd_buf, sizeof(cd_buf)); LOGERR("%s() %s/%ld,%ld %s no user! Shareerror discarded!", - __func__, st = safe_text(username), + __func__, st = safe_text_nonull(username), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); goto tisbad; @@ -3330,7 +3330,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, btv_to_buf(cd, cd_buf, sizeof(cd_buf)); LOGERR("%s() %s/%ld,%ld %s missing secondaryuserid! " "Sharerror corrected", - __func__, st = safe_text(username), + __func__, st = safe_text_nonull(username), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); } @@ -3345,7 +3345,7 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, LOGERR("%s() %"PRId64"/%s/%ld,%ld %s no workinfo! " "Early shareerror queued!", __func__, shareerrors->workinfoid, - st = safe_text(workername), + st = safe_text_nonull(workername), cd->tv_sec, cd->tv_usec, cd_buf); FREENULL(st); shareerrors->redo = 0; @@ -4004,7 +4004,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE LOGDEBUG("%s(): updating sharesummary not '%c'" " %"PRId64"/%s/%"PRId64"/%s", __func__, SUMMARY_NEW, row->userid, - st = safe_text(row->workername), + st = safe_text_nonull(row->workername), row->workinfoid, row->complete); FREENULL(st); } @@ -4956,7 +4956,7 @@ flail: snprintf(tmp, sizeof(tmp), " Reward: %f, Worker: %s, ShareEst: %.1f %s%s%% UTC:%s", BTC_TO_D(row->reward), - st = safe_text(row->workername), + st = safe_text_nonull(row->workername), pool.diffacc, est, pct, cd_buf); FREENULL(st); if (pool.workinfoid < row->workinfoid) { @@ -5686,7 +5686,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, } else { LOGDEBUG("%s(): unknown user '%s'", __func__, - st = safe_text(username)); + st = safe_text_nonull(username)); FREENULL(st); } if (!u_item) @@ -5727,7 +5727,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, // Shouldn't actually be possible unless twice in the logs tv_to_buf(cd, cd_buf, sizeof(cd_buf)); LOGERR("%s(): Duplicate auths ignored %s/%s/%s", - __func__, poolinstance, st = safe_text(workername), + __func__, poolinstance, st = safe_text_nonull(workername), cd_buf); FREENULL(st); @@ -6056,7 +6056,7 @@ bool userstats_add(char *poolinstance, char *elapsed, char *username, if (!u_item) { LOGERR("%s(): unknown user '%s'", __func__, - st = safe_text(username)); + st = safe_text_nonull(username)); FREENULL(st); return false; } @@ -6166,8 +6166,8 @@ bool workerstats_add(char *poolinstance, char *elapsed, char *username, char *usr = NULL, *wrk = NULL; LOGERR("%s(): unknown user '%s' (worker=%s)", __func__, - usr = safe_text(username), - wrk = safe_text(workername)); + usr = safe_text_nonull(username), + wrk = safe_text_nonull(workername)); FREENULL(usr); FREENULL(wrk); return false; @@ -6261,7 +6261,7 @@ bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f", __func__, row->markerid, row->userid, - st = safe_text(row->workername), + st = safe_text_nonull(row->workername), row->diffacc); FREENULL(st); From 6fb7879a7eefbd711153177af1038ef2da2e40c1 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 20 Apr 2015 19:14:47 +1000 Subject: [PATCH 4/6] ckdb - complete the reload to the end of the log file --- src/ckdb.c | 11 ++++++++--- src/ckdb.h | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index e695f1ce..df758acc 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -3883,7 +3883,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) finished = true; ck_wunlock(&fpm_lock); if (finished) { - LOGERR("%s() reload completed, ckpool queue match at line %"PRIu64, __func__, count); + LOGERR("%s() reload ckpool queue match at line %"PRIu64, __func__, count); return true; } @@ -4101,7 +4101,12 @@ static bool reload_from(tv_t *start) processing++; count = 0; - while (!everyone_die && !matched && + /* Don't abort when matched since breakdown() will remove + * the matching message sequence numbers queued from ckpool + * Also since ckpool messages are not in order, we could be + * aborting early and not get the few slightly later out of + * order messages in the log file */ + while (!everyone_die && logline(reload_buf, MAX_READ, fp, filename)) matched = reload_line(conn, filename, ++count, reload_buf); @@ -4121,7 +4126,7 @@ static bool reload_from(tv_t *start) } else fclose(fp); free(filename); - if (everyone_die || matched) + if (everyone_die) break; reload_timestamp.tv_sec += ROLL_S; if (confirm_sharesummary && tv_newer(&confirm_finish, &reload_timestamp)) { diff --git a/src/ckdb.h b/src/ckdb.h index 77f01746..0d49c012 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.071" +#define CKDB_VERSION DB_VERSION"-1.072" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From 998a3587e4eb48232e513dca728258f97ee2c7e1 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 20 Apr 2015 20:15:35 +1000 Subject: [PATCH 5/6] ckdb - add dup inimical message --- src/ckdb.c | 10 +++++++++- src/ckdb.h | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index df758acc..2c901d5b 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2141,7 +2141,15 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, dupcmd = update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, buf, now, cd, code, warndup, msg); - // zzz // if dupall != dupcmd report a problem ... inside update_seq() ? + if (dupall != dupcmd) { + LOGERR("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " + "cmd=%.32s...", + seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", + seqnam[ckdb_cmds[which].seq], n_seqcmd, + dupcmd ? "DUP" : "notdup", + st = safe_text_nonull(msg)); + FREENULL(st); + } /* The norm is: neither is a dup, so reply with ok to process * If only one is a dup then that's a corrupt message or a bug diff --git a/src/ckdb.h b/src/ckdb.h index 0d49c012..636e7922 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.072" +#define CKDB_VERSION DB_VERSION"-1.073" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From 15a735f805993c8f6bc8229800a4dc7b2c4cca78 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 21 Apr 2015 10:55:47 +1000 Subject: [PATCH 6/6] Broadcast in message queues instead of signalling for when there are multiple queue parsing threads --- src/ckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 70725e08..218139de 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -175,7 +175,7 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons } /* Generic function for adding messages to a ckmsgq linked list and signal the - * ckmsgq parsing thread to wake up and process it. */ + * ckmsgq parsing thread(s) to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) { ckmsg_t *msg = ckalloc(sizeof(ckmsg_t)); @@ -185,7 +185,7 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(ckmsgq->cond); + pthread_cond_broadcast(ckmsgq->cond); mutex_unlock(ckmsgq->lock); }