diff --git a/src/generator.c b/src/generator.c index 62e4ed89..fe907444 100644 --- a/src/generator.c +++ b/src/generator.c @@ -235,7 +235,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - send_proc(ckp->connector, alive ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -381,7 +381,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - send_proc(ckp->stratifier, blockmsg); + async_send_proc(ckp, ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -941,7 +941,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int sub char buf[256]; sprintf(buf, "deadproxy=%d:%d", id, subid); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } /* Remove the subproxy from the proxi list and put it on the dead list. @@ -1088,7 +1088,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1116,7 +1116,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_ json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); /* Send diff now as stratifier will not accept diff till it has a @@ -1306,7 +1306,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1352,7 +1352,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, int64_t id) char buf[256]; sprintf(buf, "reconnclient=%"PRId64, id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } static void submit_share(gdata_t *gdata, json_t *val) @@ -1721,9 +1721,9 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } -static void reconnect_generator(const ckpool_t *ckp) +static void reconnect_generator(ckpool_t *ckp) { - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } /* For receiving messages from an upstream pool to pass downstream. Responsible @@ -1780,7 +1780,7 @@ static void *passthrough_recv(void *arg) /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - send_proc(ckp->connector, cs->buf); + async_send_proc(ckp, ckp->connector, cs->buf); } return NULL; } @@ -1985,10 +1985,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) if (ret) break; - send_proc(ckp->connector, "reject"); + async_send_proc(ckp, ckp->connector, "reject"); sleep(1); } - send_proc(ckp->connector, ret ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject"); return ret; } @@ -2017,7 +2017,7 @@ reconnect: proxi->id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%d", proxi->id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } } retry: