diff --git a/src/generator.c b/src/generator.c index 1eb7e8fb..a47dfe74 100644 --- a/src/generator.c +++ b/src/generator.c @@ -221,7 +221,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - send_proc(ckp->connector, alive ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -367,7 +367,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - send_proc(ckp->stratifier, blockmsg); + async_send_proc(ckp, ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -972,7 +972,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1007,7 +1007,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1195,7 +1195,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1428,7 +1428,7 @@ static void *passthrough_recv(void *arg) if (proxy_alive(ckp, si, proxi, cs, false)) { proxi->alive = true; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1439,13 +1439,13 @@ static void *passthrough_recv(void *arg) while (!proxy_alive(ckp, si, proxi, cs, true)) { if (proxi->alive) { proxi->alive = false; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } sleep(5); } if (!proxi->alive) { proxi->alive = true; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } do { @@ -1456,13 +1456,13 @@ static void *passthrough_recv(void *arg) LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", proxi->id, proxi->si->url); proxi->alive = false; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); continue; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - send_proc(ckp->connector, cs->buf); + async_send_proc(ckp, ckp->connector, cs->buf); } return NULL; } @@ -1496,7 +1496,7 @@ static void *proxy_recv(void *arg) if (proxy_alive(ckp, si, proxi, cs, false)) { proxi->alive = true; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1510,7 +1510,7 @@ static void *proxy_recv(void *arg) while (!proxy_alive(ckp, si, proxi, cs, true)) { if (proxi->alive) { proxi->alive = false; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } sleep(5); proxi->reconnect_time = time(NULL); @@ -1522,7 +1522,7 @@ static void *proxy_recv(void *arg) LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); proxi->alive = true; proxi->reconnect_time = 0; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } now = time(NULL); @@ -1572,7 +1572,7 @@ static void *proxy_recv(void *arg) * pool is up */ proxi->reconnect = false; proxi->alive = false; - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", proxi->id, proxi->si->url); Close(cs->fd); @@ -1644,7 +1644,7 @@ static proxy_instance_t *best_proxy(ckpool_t *ckp, gdata_t *gdata) break; sleep(1); } - send_proc(ckp->connector, ret ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject"); return ret; } @@ -1672,7 +1672,7 @@ reconnect: proxi->id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%d", proxi->id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); /* Send a notify for the new chosen proxy or the * stratifier won't be able to switch. */ send_notify(ckp, proxi); @@ -1857,7 +1857,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } int generator(proc_instance_t *pi)