Browse Source

Update subscriptions and notifications for proxies that aren't the current proxy for faster switching

master
Con Kolivas 10 years ago
parent
commit
d02862d1fe
  1. 29
      src/generator.c
  2. 5
      src/stratifier.c

29
src/generator.c

@ -129,6 +129,7 @@ struct proxy_instance {
struct generator_data { struct generator_data {
pthread_mutex_t lock; /* Lock protecting linked lists */ pthread_mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *proxies; /* Hash list of all proxies */
proxy_instance_t *proxy; /* Current proxy */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
}; };
@ -1366,6 +1367,18 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
ckmsgq_add(proxi->passsends, pm); ckmsgq_add(proxi->passsends, pm);
} }
static int current_proxy_id(gdata_t *gdata)
{
int ret = 0;
mutex_lock(&gdata->lock);
if (gdata->proxy)
ret = gdata->proxy->id;
mutex_unlock(&gdata->lock);
return ret;
}
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging) connsock_t *cs, bool pinging)
{ {
@ -1414,8 +1427,19 @@ out:
if (!ret) { if (!ret) {
/* Close and invalidate the file handle */ /* Close and invalidate the file handle */
Close(cs->fd); Close(cs->fd);
} else } else {
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
/* If this isn't a new higher priority proxy we won't be
* issuing a reconnect so tell the stratifier to get the
* subscription and notification data without reconnecting. */
if (proxi->id > current_proxy_id(ckp->data)) {
char *msg;
ASPRINTF(&msg, "subscribe=%d", proxi->id);
send_proc(ckp->stratifier, msg);
free(msg);
}
}
return ret; return ret;
} }
@ -1605,6 +1629,7 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata)
while (42) { while (42) {
if (!ping_main(ckp)) if (!ping_main(ckp))
break; break;
mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->proxies, proxi, tmp) { HASH_ITER(hh, gdata->proxies, proxi, tmp) {
if (proxi->alive) { if (proxi->alive) {
if (!ret) { if (!ret) {
@ -1615,6 +1640,8 @@ static proxy_instance_t *current_proxy(ckpool_t *ckp, gdata_t *gdata)
ret = proxi; ret = proxi;
} }
} }
gdata->proxy = ret;
mutex_unlock(&gdata->lock);
if (ret) if (ret)
break; break;
sleep(1); sleep(1);

5
src/stratifier.c

@ -1039,6 +1039,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
return; return;
} }
LOGNOTICE("Got updated subscribe for proxy %d", id);
proxy = proxy_by_id(sdata, id); proxy = proxy_by_id(sdata, id);
json_get_bool(&reconnect, val, "reconnect"); json_get_bool(&reconnect, val, "reconnect");
@ -1121,12 +1123,13 @@ static void _update_notify(ckpool_t *ckp, const int id)
LOGWARNING("Failed to get notify from generator in update_notify"); LOGWARNING("Failed to get notify from generator in update_notify");
return; return;
} }
LOGDEBUG("Update notify: %s", buf);
if (unlikely(!safecmp(buf, "notready"))) { if (unlikely(!safecmp(buf, "notready"))) {
LOGNOTICE("Generator not ready to send notify to stratifier"); LOGNOTICE("Generator not ready to send notify to stratifier");
return; return;
} }
LOGDEBUG("Update notify: %s", buf); LOGINFO("Got updated notify for proxy %d", id);
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
val = json_loads(buf, 0, NULL); val = json_loads(buf, 0, NULL);
dealloc(buf); dealloc(buf);

Loading…
Cancel
Save