|
|
|
@ -388,6 +388,10 @@ static char *seqnam[SEQ_MAX];
|
|
|
|
|
#define SEQLOCK() K_WLOCK(seqset_free); |
|
|
|
|
#define SEQUNLOCK() K_WUNLOCK(seqset_free); |
|
|
|
|
|
|
|
|
|
/* Set to 1 to enable compiling in the SEQALL_LOG logging
|
|
|
|
|
* Any compiler optimisation should remove the code if it's 0 */ |
|
|
|
|
#define SEQALL_LOG 0 |
|
|
|
|
|
|
|
|
|
// SEQTRANS
|
|
|
|
|
K_LIST *seqtrans_free; |
|
|
|
|
|
|
|
|
@ -2817,6 +2821,10 @@ static enum cmd_values process_seq(MSGLINE *msgline)
|
|
|
|
|
msgline->n_seqpid == 0) |
|
|
|
|
return ckdb_cmds[msgline->which_cmds].cmd_val; |
|
|
|
|
|
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %"PRIu64, |
|
|
|
|
__func__, msgline->n_seqall); |
|
|
|
|
} |
|
|
|
|
dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, |
|
|
|
|
msgline->n_seqpid, SEQALL, &(msgline->now), |
|
|
|
|
&(msgline->cd), msgline->code, |
|
|
|
@ -2845,6 +2853,10 @@ static enum cmd_values process_seq(MSGLINE *msgline)
|
|
|
|
|
if (!dupall || !dupcmd) |
|
|
|
|
return ckdb_cmds[msgline->which_cmds].cmd_val; |
|
|
|
|
|
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
LOGNOTICE("%s() SEQALL DUP %"PRIu64, |
|
|
|
|
__func__, msgline->n_seqall); |
|
|
|
|
} |
|
|
|
|
/* It's a dup */ |
|
|
|
|
return CMD_DUPSEQ; |
|
|
|
|
} |
|
|
|
@ -3375,6 +3387,22 @@ static void *breaker(void *arg)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags); |
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
MSGLINE *msgline; |
|
|
|
|
K_ITEM *seqall; |
|
|
|
|
if (bq->ml_item) { |
|
|
|
|
DATA_MSGLINE(msgline, bq->ml_item); |
|
|
|
|
if (msgline->trf_root) { |
|
|
|
|
seqall = find_transfer(msgline->trf_root, SEQALL); |
|
|
|
|
if (seqall) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %s %s", |
|
|
|
|
__func__, |
|
|
|
|
reload ? "reload" : "cmd", |
|
|
|
|
transfer_data(seqall)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
K_WLOCK(breakqueue_free); |
|
|
|
|
if (reload) |
|
|
|
|
k_add_tail(reload_done_breakqueue_store, bq_item); |
|
|
|
@ -4558,6 +4586,17 @@ static void *process_socket(void *arg)
|
|
|
|
|
|
|
|
|
|
DATA_BREAKQUEUE(bq, bq_item); |
|
|
|
|
DATA_MSGLINE(msgline, bq->ml_item); |
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
K_ITEM *seqall; |
|
|
|
|
if (msgline->trf_root) { |
|
|
|
|
seqall = find_transfer(msgline->trf_root, SEQALL); |
|
|
|
|
if (seqall) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %d %s", |
|
|
|
|
__func__, bq->cmdnum, |
|
|
|
|
transfer_data(seqall)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
replied = btc = false; |
|
|
|
|
switch (bq->cmdnum) { |
|
|
|
|
case CMD_AUTH: |
|
|
|
@ -4949,6 +4988,25 @@ static void *socketer(void *arg)
|
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
char *pos, *col, *com; |
|
|
|
|
pos = strstr(buf, SEQALL); |
|
|
|
|
if (pos) { |
|
|
|
|
col = strchr(pos, JSON_VALUE); |
|
|
|
|
if (col) { |
|
|
|
|
com = strchr(col, JSON_SEP); |
|
|
|
|
if (com) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %s %.*s", |
|
|
|
|
__func__, |
|
|
|
|
seqentryflags == SE_SOCKET |
|
|
|
|
? "S" : "ES", |
|
|
|
|
(int)(com-col-1), |
|
|
|
|
col+1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Don't limit the speed filling up cmd_breakqueue_store
|
|
|
|
|
K_WLOCK(breakqueue_free); |
|
|
|
|
bq_item = k_unlink_head(breakqueue_free); |
|
|
|
@ -5018,6 +5076,17 @@ static void *process_reload(__maybe_unused void *arg)
|
|
|
|
|
|
|
|
|
|
DATA_BREAKQUEUE(bq, bq_item); |
|
|
|
|
DATA_MSGLINE(msgline, bq->ml_item); |
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
K_ITEM *seqall; |
|
|
|
|
if (msgline->trf_root) { |
|
|
|
|
seqall = find_transfer(msgline->trf_root, SEQALL); |
|
|
|
|
if (seqall) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %d %s", |
|
|
|
|
__func__, bq->cmdnum, |
|
|
|
|
transfer_data(seqall)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
switch (bq->cmdnum) { |
|
|
|
|
// Ignore
|
|
|
|
|
case CMD_REPLY: |
|
|
|
@ -5155,6 +5224,22 @@ static void reload_line(char *filename, char *buf, uint64_t count)
|
|
|
|
|
__func__, count); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
char *pos, *col, *com; |
|
|
|
|
pos = strstr(buf, SEQALL); |
|
|
|
|
if (pos) { |
|
|
|
|
col = strchr(pos, JSON_VALUE); |
|
|
|
|
if (col) { |
|
|
|
|
com = strchr(col, JSON_SEP); |
|
|
|
|
if (com) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %.*s", |
|
|
|
|
__func__, |
|
|
|
|
(int)(com-col-1), |
|
|
|
|
col+1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
K_WLOCK(breakqueue_free); |
|
|
|
|
bq_item = k_unlink_head(breakqueue_free); |
|
|
|
|
K_WUNLOCK(breakqueue_free); |
|
|
|
@ -5484,6 +5569,18 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item)
|
|
|
|
|
DATA_WORKQUEUE(workqueue, wq_item); |
|
|
|
|
ml_item = workqueue->msgline_item; |
|
|
|
|
DATA_MSGLINE(msgline, ml_item); |
|
|
|
|
if (SEQALL_LOG) { |
|
|
|
|
K_ITEM *seqall; |
|
|
|
|
if (msgline->trf_root) { |
|
|
|
|
seqall = find_transfer(msgline->trf_root, SEQALL); |
|
|
|
|
if (seqall) { |
|
|
|
|
LOGNOTICE("%s() SEQALL %d %s", |
|
|
|
|
__func__, |
|
|
|
|
ckdb_cmds[msgline->which_cmds].cmd_val, |
|
|
|
|
transfer_data(seqall)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Queued messages haven't had their seq number check yet
|
|
|
|
|
* This will return the entries cmdnum or DUP */ |
|
|
|
|