Browse Source

Use async send proc in the generator

master
Con Kolivas 10 years ago
parent
commit
5b41b1cad6
  1. 34
      src/generator.c

34
src/generator.c

@ -221,7 +221,7 @@ retry:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -367,7 +367,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1); ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1); memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
send_proc(ckp->stratifier, blockmsg); async_send_proc(ckp, ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) { } else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10)) if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
@ -972,7 +972,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "diff=%s", msg); ASPRINTF(&buf, "diff=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
} }
@ -1007,7 +1007,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "notify=%s", msg); ASPRINTF(&buf, "notify=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
} }
@ -1195,7 +1195,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg); ASPRINTF(&buf, "subscribe=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
} }
@ -1428,7 +1428,7 @@ static void *passthrough_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false)) { if (proxy_alive(ckp, si, proxi, cs, false)) {
proxi->alive = true; proxi->alive = true;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
@ -1439,13 +1439,13 @@ static void *passthrough_recv(void *arg)
while (!proxy_alive(ckp, si, proxi, cs, true)) { while (!proxy_alive(ckp, si, proxi, cs, true)) {
if (proxi->alive) { if (proxi->alive) {
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
sleep(5); sleep(5);
} }
if (!proxi->alive) { if (!proxi->alive) {
proxi->alive = true; proxi->alive = true;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
do { do {
@ -1456,13 +1456,13 @@ static void *passthrough_recv(void *arg)
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
continue; continue;
} }
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
send_proc(ckp->connector, cs->buf); async_send_proc(ckp, ckp->connector, cs->buf);
} }
return NULL; return NULL;
} }
@ -1496,7 +1496,7 @@ static void *proxy_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false)) { if (proxy_alive(ckp, si, proxi, cs, false)) {
proxi->alive = true; proxi->alive = true;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established", LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
} }
@ -1510,7 +1510,7 @@ static void *proxy_recv(void *arg)
while (!proxy_alive(ckp, si, proxi, cs, true)) { while (!proxy_alive(ckp, si, proxi, cs, true)) {
if (proxi->alive) { if (proxi->alive) {
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
sleep(5); sleep(5);
proxi->reconnect_time = time(NULL); proxi->reconnect_time = time(NULL);
@ -1522,7 +1522,7 @@ static void *proxy_recv(void *arg)
LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url);
proxi->alive = true; proxi->alive = true;
proxi->reconnect_time = 0; proxi->reconnect_time = 0;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
now = time(NULL); now = time(NULL);
@ -1572,7 +1572,7 @@ static void *proxy_recv(void *arg)
* pool is up */ * pool is up */
proxi->reconnect = false; proxi->reconnect = false;
proxi->alive = false; proxi->alive = false;
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
proxi->id, proxi->si->url); proxi->id, proxi->si->url);
Close(cs->fd); Close(cs->fd);
@ -1644,7 +1644,7 @@ static proxy_instance_t *best_proxy(ckpool_t *ckp, gdata_t *gdata)
break; break;
sleep(1); sleep(1);
} }
send_proc(ckp->connector, ret ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject");
return ret; return ret;
} }
@ -1672,7 +1672,7 @@ reconnect:
proxi->id, cs->url, cs->port); proxi->id, cs->url, cs->port);
dealloc(buf); dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id); ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
/* Send a notify for the new chosen proxy or the /* Send a notify for the new chosen proxy or the
* stratifier won't be able to switch. */ * stratifier won't be able to switch. */
send_notify(ckp, proxi); send_notify(ckp, proxi);
@ -1857,7 +1857,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)

Loading…
Cancel
Save