From 3418ff004e656e65cb74e000fb0930b2dbdcf9b6 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sun, 27 Jul 2014 10:45:48 +1000 Subject: [PATCH] ckdb - handle duplicate messages --- src/ckdb.c | 87 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 19 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 47be772f..69cd6e6e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1038,6 +1038,7 @@ static K_STORE *workerstatus_store; static char logname[512]; #define LOGFILE(_msg) rotating_log(logname, _msg) +#define LOGDUP "dup." void logmsg(int loglevel, const char *fmt, ...) { @@ -6198,14 +6199,18 @@ static void *listener(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; unixsock_t *us = &pi->us; - char *end, *ans, *rep, *buf = NULL; + char *end, *ans, *rep, *buf = NULL, *dot; + char *last_msg = NULL, *last_reply = NULL; char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; - enum cmd_values cmdnum; + // Minimise the size in case of garbage + char duptype[16+1]; + enum cmd_values cmdnum, last_cmd = 9001; int sockd, which_cmds; pthread_t summzer; K_ITEM *item; size_t siz; tv_t now; + bool dup; create_pthread(&summzer, summariser, NULL); @@ -6246,13 +6251,47 @@ static void *listener(void *arg) else LOGWARNING("Empty message in listener"); } else { - // Log everything we get (for now) - LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id); + /* For duplicates: + * System: shutdown and ping are always processed, + * so for any others, send a ping between them + * if you need to send the same message twice + * Web: if the pool didn't do anything since the original + * then the reply could be wrong for any reply that + * changes over time ... however if the pool is busy + * and we get the same request repeatedly, this will + * reduce the load - thus always send the same reply + * Pool: must not process it, must send back the same reply + * TODO: remember last message/reply per source + */ + if (last_msg && strcmp(last_msg, buf) == 0) { + dup = true; + // This means an exact duplicate of the last non-dup + snprintf(reply, sizeof(reply), "%s%ld,%ld", LOGDUP, now.tv_sec, now.tv_usec); + LOGFILE(reply); + cmdnum = last_cmd; + + STRNCPY(duptype, buf); + dot = strchr(duptype, '.'); + if (dot) + *dot = '\0'; + LOGWARNING("Duplicate '%s' message received", duptype); + } else { + dup = false; + LOGFILE(buf); + cmdnum = breakdown(buf, &which_cmds, cmd, id); + last_cmd = cmdnum; + } switch (cmdnum) { case CMD_REPLY: - snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); - send_unix_msg(sockd, reply); + if (dup) + send_unix_msg(sockd, last_reply); + else { + snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); + if (last_reply) + free(last_reply); + last_reply = strdup(reply); + send_unix_msg(sockd, reply); + } break; case CMD_SHUTDOWN: LOGWARNING("Listener received shutdown message, terminating ckdb"); @@ -6261,22 +6300,30 @@ static void *listener(void *arg) break; case CMD_PING: LOGDEBUG("Listener received ping request"); + // Generate a new reply each time even on dup snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); send_unix_msg(sockd, reply); break; default: - // TODO: optionally get by/code/inet from transfer here instead? - ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", - (char *)__func__, - (char *)"127.0.0.1"); - siz = strlen(ans) + strlen(id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); - free(ans); - ans = NULL; - send_unix_msg(sockd, rep); - free(rep); - rep = NULL; + if (dup) + send_unix_msg(sockd, last_reply); + else { + // TODO: optionally get by/code/inet from transfer here instead? + ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", + (char *)__func__, + (char *)"127.0.0.1"); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + free(ans); + ans = NULL; + if (last_reply) + free(last_reply); + last_reply = strdup(rep); + send_unix_msg(sockd, rep); + free(rep); + rep = NULL; + } break; } } @@ -6298,6 +6345,8 @@ static void *listener(void *arg) } dealloc(buf); + if (last_reply) + free(last_reply); close_unix_socket(us->sockd, us->path); return NULL; }