Browse Source

Use async senc proc in the generator

master
Con Kolivas 10 years ago
parent
commit
cada930aa8
  1. 37
      src/generator.c

37
src/generator.c

@ -132,6 +132,7 @@ struct generator_data {
proxy_instance_t *proxy_list; /* Linked list of all active proxies */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
ckwq_t *ckwqs;
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -221,7 +222,7 @@ retry:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -367,7 +368,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1); ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1); memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
send_proc(ckp->stratifier, blockmsg); async_send_proc(ckp, ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) { } else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10)) if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true"); send_unix_msg(sockd, "true");
@ -1297,22 +1298,22 @@ static void *proxy_recv(void *arg)
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */ /* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
break; break;
} }
if (parse_method(proxi, cs->buf)) { if (parse_method(proxi, cs->buf)) {
if (proxi->notified) { if (proxi->notified) {
send_proc(ckp->stratifier, "notify"); async_send_proc(ckp, ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
} }
if (proxi->diffed) { if (proxi->diffed) {
send_proc(ckp->stratifier, "diff"); async_send_proc(ckp, ckp->stratifier, "diff");
proxi->diffed = false; proxi->diffed = false;
} }
if (proxi->reconnect) { if (proxi->reconnect) {
proxi->reconnect = false; proxi->reconnect = false;
LOGWARNING("Reconnect issue, dropping existing connection"); LOGWARNING("Reconnect issue, dropping existing connection");
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
break; break;
} }
continue; continue;
@ -1402,13 +1403,13 @@ static void *passthrough_recv(void *arg)
if (ret < 1) { if (ret < 1) {
/* Send ourselves a reconnect message */ /* Send ourselves a reconnect message */
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
break; break;
} }
/* Simply forward the message on, as is, to the connector to /* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
send_proc(ckp->connector, cs->buf); async_send_proc(ckp, ckp->connector, cs->buf);
} }
return NULL; return NULL;
} }
@ -1523,8 +1524,8 @@ retry:
} }
} }
if (!alive) { if (!alive) {
send_proc(ckp->connector, "reject"); async_send_proc(ckp, ckp->connector, "reject");
send_proc(ckp->stratifier, "dropall"); async_send_proc(ckp, ckp->stratifier, "dropall");
LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!");
sleep(5); sleep(5);
goto retry; goto retry;
@ -1543,7 +1544,7 @@ retry:
create_pthread(&alive->pth_psend, proxy_send, alive); create_pthread(&alive->pth_psend, proxy_send, alive);
} }
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject"); async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -1552,8 +1553,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi)
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; connsock_t *cs;
send_proc(ckp->stratifier, "reconnect"); async_send_proc(ckp, ckp->stratifier, "reconnect");
send_proc(ckp->connector, "reject"); async_send_proc(ckp, ckp->connector, "reject");
if (!proxi) // This shouldn't happen if (!proxi) // This shouldn't happen
return; return;
@ -1603,8 +1604,8 @@ reconnect:
/* We've just subscribed and authorised so tell the stratifier to /* We've just subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */ * retrieve the first subscription. */
if (!ckp->passthrough) { if (!ckp->passthrough) {
send_proc(ckp->stratifier, "subscribe"); async_send_proc(ckp, ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify"); async_send_proc(ckp, ckp->stratifier, "notify");
proxi->notified = false; proxi->notified = false;
} }
@ -1792,7 +1793,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
@ -1838,7 +1839,7 @@ static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break; break;
} }
if (alive) if (alive)
send_proc(ckp->generator, "reconnect"); async_send_proc(ckp, ckp->generator, "reconnect");
} }
@ -1851,6 +1852,8 @@ int generator(proc_instance_t *pi)
LOGWARNING("%s generator starting", ckp->name); LOGWARNING("%s generator starting", ckp->name);
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
/* Generator only requires one work queue */
ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1);
if (ckp->proxy) { if (ckp->proxy) {
gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog);
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);

Loading…
Cancel
Save