Browse Source

Send a message from the generator to the connector to tell it when it can accept or should reject incoming connections

master
Con Kolivas 11 years ago
parent
commit
d96b111653
  1. 14
      src/connector.c
  2. 2
      src/generator.c
  3. 11
      src/stratifier.c

14
src/connector.c

@ -29,6 +29,7 @@ struct connector_instance {
proc_instance_t *pi; proc_instance_t *pi;
int serverfd; int serverfd;
int nfds; int nfds;
bool accept;
}; };
typedef struct connector_instance conn_instance_t; typedef struct connector_instance conn_instance_t;
@ -93,6 +94,8 @@ void *acceptor(void *arg)
retry: retry:
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
client->address_len = sizeof(client->address); client->address_len = sizeof(client->address);
while (!ci->accept)
sleep(1);
fd = accept(ci->serverfd, &client->address, &client->address_len); fd = accept(ci->serverfd, &client->address, &client->address_len);
if (unlikely(fd < 0)) { if (unlikely(fd < 0)) {
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
@ -480,6 +483,7 @@ retry:
goto out; goto out;
} }
dealloc(buf);
buf = recv_unix_msg(sockd); buf = recv_unix_msg(sockd);
if (!buf) { if (!buf) {
LOGWARNING("Failed to get message in connector_loop"); LOGWARNING("Failed to get message in connector_loop");
@ -490,6 +494,16 @@ retry:
send_unix_msg(sockd, "pong"); send_unix_msg(sockd, "pong");
goto retry; goto retry;
} }
if (!strncasecmp(buf, "accept", 6)) {
LOGDEBUG("Connector received accept signal");
ci->accept = true;
goto retry;
}
if (!strncasecmp(buf, "reject", 6)) {
LOGDEBUG("Connector received reject signal");
ci->accept = false;
goto retry;
}
LOGDEBUG("Connector received message: %s", buf); LOGDEBUG("Connector received message: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) if (!strncasecmp(buf, "shutdown", 8))

2
src/generator.c

@ -166,6 +166,7 @@ retry:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }
@ -1159,6 +1160,7 @@ retry:
cond_init(&alive->psend_cond); cond_init(&alive->psend_cond);
create_pthread(&alive->pth_psend, proxy_send, alive); create_pthread(&alive->pth_psend, proxy_send, alive);
out: out:
send_proc(ckp->connector, alive ? "accept" : "reject");
return alive; return alive;
} }

11
src/stratifier.c

@ -1996,7 +1996,6 @@ static void *stratum_receiver(void *arg)
rename_proc("sreceiver"); rename_proc("sreceiver");
sleep(1);
while (42) { while (42) {
stratum_instance_t *instance; stratum_instance_t *instance;
@ -2442,6 +2441,10 @@ int stratifier(proc_instance_t *pi)
cond_init(&stratum_send_cond); cond_init(&stratum_send_cond);
create_pthread(&pth_stratum_sender, stratum_sender, ckp); create_pthread(&pth_stratum_sender, stratum_sender, ckp);
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
mutex_init(&sshare_lock); mutex_init(&sshare_lock);
cond_init(&sshare_cond); cond_init(&sshare_cond);
create_pthread(&pth_share_processer, share_processor, ckp); create_pthread(&pth_share_processer, share_processor, ckp);
@ -2461,12 +2464,6 @@ int stratifier(proc_instance_t *pi)
load_users(ckp); load_users(ckp);
/* Start the receiver last to only process requests once all structures
* are set up. */
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
ret = stratum_loop(ckp, pi); ret = stratum_loop(ckp, pi);
out: out:
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);

Loading…
Cancel
Save