From d02862d1fe3c9de82b9aa0ddff2d08b069cf5370 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 6 Feb 2015 20:24:53 +1100 Subject: [PATCH] Update subscriptions and notifications for proxies that aren't the current proxy for faster switching --- src/generator.c | 29 ++++++++++++++++++++++++++++- src/stratifier.c | 5 ++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/generator.c b/src/generator.c index 53908353..afc295e6 100644 --- a/src/generator.c +++ b/src/generator.c @@ -129,6 +129,7 @@ struct proxy_instance { struct generator_data { pthread_mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxies; /* Hash list of all proxies */ + proxy_instance_t *proxy; /* Current proxy */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -1366,6 +1367,18 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) ckmsgq_add(proxi->passsends, pm); } +static int current_proxy_id(gdata_t *gdata) +{ + int ret = 0; + + mutex_lock(&gdata->lock); + if (gdata->proxy) + ret = gdata->proxy->id; + mutex_unlock(&gdata->lock); + + return ret; +} + static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging) { @@ -1414,8 +1427,19 @@ out: if (!ret) { /* Close and invalidate the file handle */ Close(cs->fd); - } else + } else { keep_sockalive(cs->fd); + /* If this isn't a new higher priority proxy we won't be + * issuing a reconnect so tell the stratifier to get the + * subscription and notification data without reconnecting. */ + if (proxi->id > current_proxy_id(ckp->data)) { + char *msg; + + ASPRINTF(&msg, "subscribe=%d", proxi->id); + send_proc(ckp->stratifier, msg); + free(msg); + } + } return ret; } @@ -1605,6 +1629,7 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata) while (42) { if (!ping_main(ckp)) break; + mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { if (proxi->alive) { if (!ret) { @@ -1615,6 +1640,8 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata) ret = proxi; } } + gdata->proxy = ret; + mutex_unlock(&gdata->lock); if (ret) break; sleep(1); diff --git a/src/stratifier.c b/src/stratifier.c index 199895b8..b60ec695 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1039,6 +1039,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd) return; } + LOGNOTICE("Got updated subscribe for proxy %d", id); + proxy = proxy_by_id(sdata, id); json_get_bool(&reconnect, val, "reconnect"); @@ -1121,12 +1123,13 @@ static void _update_notify(ckpool_t *ckp, const int id) LOGWARNING("Failed to get notify from generator in update_notify"); return; } + LOGDEBUG("Update notify: %s", buf); if (unlikely(!safecmp(buf, "notready"))) { LOGNOTICE("Generator not ready to send notify to stratifier"); return; } - LOGDEBUG("Update notify: %s", buf); + LOGINFO("Got updated notify for proxy %d", id); wb = ckzalloc(sizeof(workbase_t)); val = json_loads(buf, 0, NULL); dealloc(buf);