Browse Source

ckdb - handle duplicate messages

master
kanoi 10 years ago
parent
commit
3418ff004e
  1. 87
      src/ckdb.c

87
src/ckdb.c

@ -1038,6 +1038,7 @@ static K_STORE *workerstatus_store;
static char logname[512]; static char logname[512];
#define LOGFILE(_msg) rotating_log(logname, _msg) #define LOGFILE(_msg) rotating_log(logname, _msg)
#define LOGDUP "dup."
void logmsg(int loglevel, const char *fmt, ...) void logmsg(int loglevel, const char *fmt, ...)
{ {
@ -6198,14 +6199,18 @@ static void *listener(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
char *end, *ans, *rep, *buf = NULL; char *end, *ans, *rep, *buf = NULL, *dot;
char *last_msg = NULL, *last_reply = NULL;
char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1];
enum cmd_values cmdnum; // Minimise the size in case of garbage
char duptype[16+1];
enum cmd_values cmdnum, last_cmd = 9001;
int sockd, which_cmds; int sockd, which_cmds;
pthread_t summzer; pthread_t summzer;
K_ITEM *item; K_ITEM *item;
size_t siz; size_t siz;
tv_t now; tv_t now;
bool dup;
create_pthread(&summzer, summariser, NULL); create_pthread(&summzer, summariser, NULL);
@ -6246,13 +6251,47 @@ static void *listener(void *arg)
else else
LOGWARNING("Empty message in listener"); LOGWARNING("Empty message in listener");
} else { } else {
// Log everything we get (for now) /* For duplicates:
LOGFILE(buf); * System: shutdown and ping are always processed,
cmdnum = breakdown(buf, &which_cmds, cmd, id); * so for any others, send a ping between them
* if you need to send the same message twice
* Web: if the pool didn't do anything since the original
* then the reply could be wrong for any reply that
* changes over time ... however if the pool is busy
* and we get the same request repeatedly, this will
* reduce the load - thus always send the same reply
* Pool: must not process it, must send back the same reply
* TODO: remember last message/reply per source
*/
if (last_msg && strcmp(last_msg, buf) == 0) {
dup = true;
// This means an exact duplicate of the last non-dup
snprintf(reply, sizeof(reply), "%s%ld,%ld", LOGDUP, now.tv_sec, now.tv_usec);
LOGFILE(reply);
cmdnum = last_cmd;
STRNCPY(duptype, buf);
dot = strchr(duptype, '.');
if (dot)
*dot = '\0';
LOGWARNING("Duplicate '%s' message received", duptype);
} else {
dup = false;
LOGFILE(buf);
cmdnum = breakdown(buf, &which_cmds, cmd, id);
last_cmd = cmdnum;
}
switch (cmdnum) { switch (cmdnum) {
case CMD_REPLY: case CMD_REPLY:
snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); if (dup)
send_unix_msg(sockd, reply); send_unix_msg(sockd, last_reply);
else {
snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec);
if (last_reply)
free(last_reply);
last_reply = strdup(reply);
send_unix_msg(sockd, reply);
}
break; break;
case CMD_SHUTDOWN: case CMD_SHUTDOWN:
LOGWARNING("Listener received shutdown message, terminating ckdb"); LOGWARNING("Listener received shutdown message, terminating ckdb");
@ -6261,22 +6300,30 @@ static void *listener(void *arg)
break; break;
case CMD_PING: case CMD_PING:
LOGDEBUG("Listener received ping request"); LOGDEBUG("Listener received ping request");
// Generate a new reply each time even on dup
snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec);
send_unix_msg(sockd, reply); send_unix_msg(sockd, reply);
break; break;
default: default:
// TODO: optionally get by/code/inet from transfer here instead? if (dup)
ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", send_unix_msg(sockd, last_reply);
(char *)__func__, else {
(char *)"127.0.0.1"); // TODO: optionally get by/code/inet from transfer here instead?
siz = strlen(ans) + strlen(id) + 32; ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code",
rep = malloc(siz); (char *)__func__,
snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); (char *)"127.0.0.1");
free(ans); siz = strlen(ans) + strlen(id) + 32;
ans = NULL; rep = malloc(siz);
send_unix_msg(sockd, rep); snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans);
free(rep); free(ans);
rep = NULL; ans = NULL;
if (last_reply)
free(last_reply);
last_reply = strdup(rep);
send_unix_msg(sockd, rep);
free(rep);
rep = NULL;
}
break; break;
} }
} }
@ -6298,6 +6345,8 @@ static void *listener(void *arg)
} }
dealloc(buf); dealloc(buf);
if (last_reply)
free(last_reply);
close_unix_socket(us->sockd, us->path); close_unix_socket(us->sockd, us->path);
return NULL; return NULL;
} }

Loading…
Cancel
Save