Browse Source

Use async send proc in the generator

master
Con Kolivas 10 years ago
parent
commit
3f103ef67e
  1. 26
      src/generator.c

26
src/generator.c

@ -235,7 +235,7 @@ retry:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -381,7 +381,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1); ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1); memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
send_proc(ckp->stratifier, blockmsg); async_send_proc(ckp, ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) { } else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10)) if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
@ -941,7 +941,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int sub
char buf[256]; char buf[256];
sprintf(buf, "deadproxy=%d:%d", id, subid); sprintf(buf, "deadproxy=%d:%d", id, subid);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
} }
/* Remove the subproxy from the proxi list and put it on the dead list. /* Remove the subproxy from the proxi list and put it on the dead list.
@ -1088,7 +1088,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "diff=%s", msg); ASPRINTF(&buf, "diff=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
} }
@ -1116,7 +1116,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "notify=%s", msg); ASPRINTF(&buf, "notify=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
/* Send diff now as stratifier will not accept diff till it has a /* Send diff now as stratifier will not accept diff till it has a
@ -1306,7 +1306,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg); ASPRINTF(&buf, "subscribe=%s", msg);
free(msg); free(msg);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
free(buf); free(buf);
} }
@ -1352,7 +1352,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, int64_t id)
char buf[256]; char buf[256];
sprintf(buf, "reconnclient=%"PRId64, id); sprintf(buf, "reconnclient=%"PRId64, id);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
} }
static void submit_share(gdata_t *gdata, json_t *val) static void submit_share(gdata_t *gdata, json_t *val)
@ -1721,9 +1721,9 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi); create_pthread(&pth, proxy_reconnect, proxi);
} }
static void reconnect_generator(const ckpool_t *ckp) static void reconnect_generator(ckpool_t *ckp)
{ {
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
@ -1780,7 +1780,7 @@ static void *passthrough_recv(void *arg)
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
send_proc(ckp->connector, cs->buf); async_send_proc(ckp, ckp->connector, cs->buf);
} }
return NULL; return NULL;
} }
@ -1985,10 +1985,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
if (ret) if (ret)
break; break;
send_proc(ckp->connector, "reject"); async_send_proc(ckp, ckp->connector, "reject");
sleep(1); sleep(1);
} }
send_proc(ckp->connector, ret ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject");
return ret; return ret;
} }
@ -2017,7 +2017,7 @@ reconnect:
proxi->id, cs->url, cs->port); proxi->id, cs->url, cs->port);
dealloc(buf); dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id); ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf); async_send_proc(ckp, ckp->stratifier, buf);
} }
} }
retry: retry:

Loading…
Cancel
Save