From d96b111653264098e5989434366b67c58886cda5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 13 Jun 2014 22:51:34 +1000 Subject: [PATCH] Send a message from the generator to the connector to tell it when it can accept or should reject incoming connections --- src/connector.c | 14 ++++++++++++++ src/generator.c | 2 ++ src/stratifier.c | 11 ++++------- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 36911e5f..73ad62c3 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,6 +29,7 @@ struct connector_instance { proc_instance_t *pi; int serverfd; int nfds; + bool accept; }; typedef struct connector_instance conn_instance_t; @@ -93,6 +94,8 @@ void *acceptor(void *arg) retry: client = ckzalloc(sizeof(client_instance_t)); client->address_len = sizeof(client->address); + while (!ci->accept) + sleep(1); fd = accept(ci->serverfd, &client->address, &client->address_len); if (unlikely(fd < 0)) { LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); @@ -480,6 +483,7 @@ retry: goto out; } + dealloc(buf); buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in connector_loop"); @@ -490,6 +494,16 @@ retry: send_unix_msg(sockd, "pong"); goto retry; } + if (!strncasecmp(buf, "accept", 6)) { + LOGDEBUG("Connector received accept signal"); + ci->accept = true; + goto retry; + } + if (!strncasecmp(buf, "reject", 6)) { + LOGDEBUG("Connector received reject signal"); + ci->accept = false; + goto retry; + } LOGDEBUG("Connector received message: %s", buf); if (!strncasecmp(buf, "shutdown", 8)) diff --git a/src/generator.c b/src/generator.c index aa1ef93a..cee2ecb9 100644 --- a/src/generator.c +++ b/src/generator.c @@ -166,6 +166,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -1159,6 +1160,7 @@ retry: cond_init(&alive->psend_cond); create_pthread(&alive->pth_psend, proxy_send, alive); out: + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } diff --git a/src/stratifier.c b/src/stratifier.c index 92148ece..5caba009 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1996,7 +1996,6 @@ static void *stratum_receiver(void *arg) rename_proc("sreceiver"); - sleep(1); while (42) { stratum_instance_t *instance; @@ -2442,6 +2441,10 @@ int stratifier(proc_instance_t *pi) cond_init(&stratum_send_cond); create_pthread(&pth_stratum_sender, stratum_sender, ckp); + mutex_init(&stratum_recv_lock); + cond_init(&stratum_recv_cond); + create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); + mutex_init(&sshare_lock); cond_init(&sshare_cond); create_pthread(&pth_share_processer, share_processor, ckp); @@ -2461,12 +2464,6 @@ int stratifier(proc_instance_t *pi) load_users(ckp); - /* Start the receiver last to only process requests once all structures - * are set up. */ - mutex_init(&stratum_recv_lock); - cond_init(&stratum_recv_cond); - create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); - ret = stratum_loop(ckp, pi); out: return process_exit(ckp, pi, ret);