Browse Source

Remove use of server_instances by proxies in the generator

master
Con Kolivas 10 years ago
parent
commit
2122b0f689
  1. 228
      src/generator.c

228
src/generator.c

@ -82,15 +82,15 @@ struct proxy_instance {
proxy_instance_t *prev; /* For dead proxy list */
ckpool_t *ckp;
connsock_t *cs;
server_instance_t *si;
connsock_t cs;
bool passthrough;
int64_t id; /* Proxy server id*/
int low_id; /* Low bits of id */
int subid; /* Subproxy id */
const char *auth;
const char *pass;
char *url;
char *auth;
char *pass;
char *enonce1;
char *enonce1bin;
@ -581,7 +581,7 @@ retry:
parsed = true;
if (!(buf = new_proxy_line(cs))) {
LOGNOTICE("Proxy %ld:%d %s failed to receive line in parse_subscribe",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
goto out;
}
LOGDEBUG("parse_subscribe received %s", buf);
@ -614,7 +614,7 @@ retry:
goto retry;
}
LOGNOTICE("Proxy %ld:%d %s failed to parse subscribe response in parse_subscribe",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
goto out;
}
@ -649,7 +649,7 @@ retry:
if (size < 3) {
if (!proxi->subid) {
LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners",
proxi->low_id, proxi->si->url, size);
proxi->low_id, proxi->url, size);
} else {
LOGNOTICE("Proxy %ld:%d Nonce2 length %d too small for fast miners",
proxi->id, proxi->subid, size);
@ -699,7 +699,7 @@ retry:
json_decref(req);
if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to send message in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
goto out;
}
ret = parse_subscribe(cs, proxi);
@ -708,16 +708,16 @@ retry:
if (proxi->no_params) {
LOGNOTICE("Proxy %ld:%d %s failed all subscription options in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
goto out;
}
LOGINFO("Proxy %ld:%d %s failed connecting with parameters in subscribe_stratum, retrying without",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
proxi->no_params = true;
ret = connect_proxy(cs);
if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to reconnect in subscribe_stratum",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
goto out;
}
goto retry;
@ -873,12 +873,11 @@ static bool parse_diff(proxy_instance_t *proxi, json_t *val)
static bool send_version(proxy_instance_t *proxi, json_t *val)
{
json_t *json_msg, *id_val = json_object_dup(val, "id");
connsock_t *cs = proxi->cs;
bool ret;
JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", PACKAGE"/"VERSION,
"error", json_null());
ret = send_json_msg(cs, json_msg);
ret = send_json_msg(&proxi->cs, json_msg);
json_decref(json_msg);
return ret;
}
@ -899,12 +898,11 @@ static bool show_message(json_t *val)
static bool send_pong(proxy_instance_t *proxi, json_t *val)
{
json_t *json_msg, *id_val = json_object_dup(val, "id");
connsock_t *cs = proxi->cs;
bool ret;
JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", "pong",
"error", json_null());
ret = send_json_msg(cs, json_msg);
ret = send_json_msg(&proxi->cs, json_msg);
json_decref(json_msg);
return ret;
}
@ -933,6 +931,9 @@ static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy)
LOGINFO("Recycling data from proxy %ld:%d", proxy->id, proxy->subid);
mutex_lock(&gdata->lock);
dealloc(proxy->url);
dealloc(proxy->auth);
dealloc(proxy->pass);
DL_APPEND(gdata->dead_proxies, proxy);
mutex_unlock(&gdata->lock);
}
@ -952,9 +953,9 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
{
subproxy->alive = false;
send_stratifier_deadproxy(gdata->ckp, subproxy->id, subproxy->subid);
if (subproxy->cs->fd > 0) {
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, subproxy->cs->fd, NULL);
Close(subproxy->cs->fd);
if (subproxy->cs.fd > 0) {
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, subproxy->cs.fd, NULL);
Close(subproxy->cs.fd);
}
if (parent_proxy(subproxy))
return;
@ -973,7 +974,6 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{
server_instance_t *newsi, *si = proxi->si;
proxy_instance_t *parent, *newproxi;
bool sameurl = false, ret = false;
int64_t high_id, low_id, new_id;
@ -997,10 +997,10 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
char *dot_pool, *dot_reconnect;
int len;
dot_pool = strchr(si->url, '.');
dot_pool = strchr(proxi->url, '.');
if (!dot_pool) {
LOGWARNING("Denied stratum reconnect request from server without domain %s",
si->url);
proxi->url);
goto out;
}
dot_reconnect = strchr(new_url, '.');
@ -1012,12 +1012,12 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
len = strlen(dot_reconnect);
if (strncmp(dot_pool, dot_reconnect, len)) {
LOGWARNING("Denied stratum reconnect request from %s to non-matching domain %s",
si->url, new_url);
proxi->url, new_url);
goto out;
}
ASPRINTF(&url, "%s:%d", new_url, new_port);
} else {
url = strdup(si->url);
url = strdup(proxi->url);
sameurl = true;
}
LOGINFO("Processing reconnect request to %s", url);
@ -1043,27 +1043,18 @@ static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
} else
proxi->redirecting = false;
newsi = ckzalloc(sizeof(server_instance_t));
mutex_lock(&gdata->lock);
high_id = proxi->id >> 32; /* Use the high bits for the reconnect id */
high_id++;
high_id <<= 32;
low_id = proxi->id & 0x00000000FFFFFFFFll; /* Use the low bits for the master id */
new_id = high_id | low_id;
ckp->servers[low_id] = newsi;
newsi->url = url;
newsi->auth = strdup(si->auth);
newsi->pass = strdup(si->pass);
newproxi = ckzalloc(sizeof(proxy_instance_t));
newsi->data = newproxi;
newproxi->auth = newsi->auth;
newproxi->pass = newsi->pass;
newproxi->si = newsi;
newproxi->url = url;
newproxi->auth = strdup(proxi->auth);
newproxi->pass = strdup(proxi->pass);
newproxi->ckp = ckp;
newproxi->cs = &newsi->cs;
newproxi->cs->ckp = ckp;
newproxi->cs.ckp = ckp;
newproxi->low_id = low_id;
newproxi->id = new_id;
newproxi->subproxy_count = ++proxi->subproxy_count;
@ -1231,7 +1222,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
json_decref(req);
if (!ret) {
LOGNOTICE("Proxy %ld:%d %s failed to send message in auth_stratum",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
if (cs->fd > 0) {
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL);
Close(cs->fd);
@ -1246,7 +1237,7 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
buf = next_proxy_line(cs, proxi);
if (!buf) {
LOGNOTICE("Proxy %ld:%d %s failed to receive line in auth_stratum",
proxi->id, proxi->subid, proxi->si->url);
proxi->id, proxi->subid, proxi->url);
ret = false;
goto out;
}
@ -1256,27 +1247,27 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
val = json_msg_result(buf, &res_val, &err_val);
if (!val) {
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
proxi->low_id, proxi->subid, proxi->url, buf);
goto out;
}
if (err_val && !json_is_null(err_val)) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
proxi->low_id, proxi->subid, proxi->url, buf);
goto out;
}
if (res_val) {
ret = json_is_true(res_val);
if (!ret) {
LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s",
proxi->low_id, proxi->subid, proxi->si->url, buf);
proxi->low_id, proxi->subid, proxi->url, buf);
goto out;
}
} else {
/* No result and no error but successful val means auth success */
ret = true;
}
LOGINFO("Proxy %ld:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->si->url);
LOGINFO("Proxy %ld:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->url);
out:
if (val)
json_decref(val);
@ -1496,7 +1487,7 @@ out:
static void *proxy_send(void *arg)
{
proxy_instance_t *proxy = (proxy_instance_t *)arg;
connsock_t *cs = proxy->cs;
connsock_t *cs = &proxy->cs;
ckpool_t *ckp = cs->ckp;
gdata_t *gdata = ckp->data;
stratum_msg_t *msg = NULL;
@ -1569,7 +1560,7 @@ static void *proxy_send(void *arg)
subproxy = subproxy_by_id(proxy, subid);
if (subproxy)
cs = subproxy->cs;
cs = &subproxy->cs;
if (jobid && subproxy) {
JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid,
json_object_dup(msg->json_msg, "nonce2"),
@ -1582,7 +1573,7 @@ static void *proxy_send(void *arg)
} else if (!jobid) {
stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Proxy %ld:%s failed to find matching jobid for %sknown subproxy in proxysend",
proxy->id, proxy->si->url, subproxy ? "" : "un");
proxy->id, proxy->url, subproxy ? "" : "un");
} else {
stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Failed to find subproxy %ld:%d to send message to",
@ -1590,7 +1581,7 @@ static void *proxy_send(void *arg)
}
if (!ret && subproxy) {
LOGNOTICE("Proxy %ld:%d %s failed to send msg in proxy_send, dropping to reconnect",
id, subid, proxy->si->url);
id, subid, proxy->url);
disable_subproxy(gdata, proxy, subproxy);
}
}
@ -1616,13 +1607,13 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
{
pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t));
pm->cs = proxi->cs;
pm->cs = &proxi->cs;
ASPRINTF(&pm->msg, "%s\n", msg);
ckmsgq_add(proxi->passsends, pm);
}
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging, int epfd)
static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
bool pinging, int epfd)
{
struct epoll_event event;
bool ret = false;
@ -1630,8 +1621,8 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
/* Has this proxy already been reconnected? */
if (cs->fd > 0)
return true;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
if (!extract_sockaddr(proxi->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", proxi->url);
goto out;
}
if (!connect_proxy(cs)) {
@ -1663,7 +1654,7 @@ static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *
if (!auth_stratum(ckp, cs, proxi)) {
if (!pinging) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass);
cs->url, cs->port, proxi->auth, proxi->pass);
}
goto out;
}
@ -1707,21 +1698,20 @@ static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi
subproxy->disabled = false;
} else {
subproxy = ckzalloc(sizeof(proxy_instance_t));
subproxy->cs = ckzalloc(sizeof(connsock_t));
mutex_init(&subproxy->share_lock);
}
mutex_unlock(&gdata->lock);
subproxy->cs->ckp = subproxy->ckp = proxi->ckp;
subproxy->si = proxi->si;
subproxy->cs.ckp = subproxy->ckp = proxi->ckp;
mutex_lock(&proxi->proxy_lock);
subproxy->subid = ++proxi->subproxy_count;
mutex_unlock(&proxi->proxy_lock);
subproxy->id = proxi->id;
subproxy->auth = proxi->auth;
subproxy->pass = proxi->pass;
subproxy->url = strdup(proxi->url);
subproxy->auth = strdup(proxi->auth);
subproxy->pass = strdup(proxi->pass);
subproxy->parent = proxi;
subproxy->epfd = proxi->epfd;
return subproxy;
@ -1739,7 +1729,7 @@ static void *proxy_recruit(void *arg)
retry:
recruit = false;
proxy = create_subproxy(gdata, parent);
alive = proxy_alive(ckp, proxy->si, proxy, proxy->cs, false, parent->epfd);
alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd);
if (!alive) {
LOGNOTICE("Subproxy failed proxy_alive testing");
store_proxy(gdata, proxy);
@ -1787,12 +1777,11 @@ static void recruit_subproxy(proxy_instance_t *proxi, const char *buf)
static void *proxy_reconnect(void *arg)
{
proxy_instance_t *proxy = (proxy_instance_t *)arg;
server_instance_t *si = proxy->si;
connsock_t *cs = proxy->cs;
connsock_t *cs = &proxy->cs;
ckpool_t *ckp = proxy->ckp;
pthread_detach(pthread_self());
proxy_alive(ckp, si, proxy, cs, true, proxy->epfd);
proxy_alive(ckp, proxy, cs, true, proxy->epfd);
proxy->reconnecting = false;
return NULL;
}
@ -1813,8 +1802,7 @@ static void reconnect_proxy(proxy_instance_t *proxi)
static void *passthrough_recv(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
server_instance_t *si = proxi->si;
connsock_t *cs = proxi->cs;
connsock_t *cs = &proxi->cs;
ckpool_t *ckp = proxi->ckp;
struct epoll_event event;
bool alive;
@ -1828,17 +1816,17 @@ static void *passthrough_recv(void *arg)
return NULL;
}
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
if (proxy_alive(ckp, proxi, cs, false, epfd)) {
reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s connection established",
proxi->low_id, proxi->si->url);
proxi->low_id, proxi->url);
}
alive = proxi->alive;
while (42) {
int ret;
while (!proxy_alive(ckp, si, proxi, cs, true, epfd)) {
while (!proxy_alive(ckp, proxi, cs, true, epfd)) {
if (alive) {
alive = false;
reconnect_generator(ckp);
@ -1854,7 +1842,7 @@ static void *passthrough_recv(void *arg)
ret = read_socket_line(cs, 60);
if (ret < 1) {
LOGWARNING("Proxy %ld:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url);
proxi->id, proxi->url);
alive = proxi->alive = false;
reconnect_generator(ckp);
continue;
@ -1901,8 +1889,7 @@ static void *proxy_recv(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
proxy_instance_t *subproxy, *tmp;
server_instance_t *si = proxi->si;
connsock_t *cs = proxi->cs;
connsock_t *cs = &proxi->cs;
ckpool_t *ckp = proxi->ckp;
gdata_t *gdata = ckp->data;
struct epoll_event event;
@ -1917,9 +1904,9 @@ static void *proxy_recv(void *arg)
return NULL;
}
if (proxy_alive(ckp, si, proxi, cs, false, epfd)) {
if (proxy_alive(ckp, proxi, cs, false, epfd)) {
LOGWARNING("Proxy %d:%s connection established",
proxi->low_id, proxi->si->url);
proxi->low_id, proxi->url);
}
alive = proxi->alive;
@ -1936,7 +1923,7 @@ static void *proxy_recv(void *arg)
reconnect_proxy(proxi);
if (alive) {
LOGWARNING("Proxy %d:%s failed, attempting reconnect",
proxi->low_id, proxi->si->url);
proxi->low_id, proxi->url);
alive = false;
}
sleep(5);
@ -1947,7 +1934,7 @@ static void *proxy_recv(void *arg)
* to prevent switching to unstable pools. */
if (!alive && (!current_proxy(gdata) || time(NULL) - proxi->reconnect_time > 30)) {
reconnect_generator(ckp);
LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->si->url);
LOGWARNING("Proxy %d:%s recovered", proxi->low_id, proxi->url);
proxi->reconnect_time = 0;
alive = true;
}
@ -1980,7 +1967,7 @@ static void *proxy_recv(void *arg)
ret = epoll_wait(epfd, &event, 1, 600000);
if (likely(ret > 0)) {
subproxy = event.data.ptr;
cs = subproxy->cs;
cs = &subproxy->cs;
if (event.events & EPOLLHUP)
ret = -1;
else
@ -1988,7 +1975,7 @@ static void *proxy_recv(void *arg)
}
if (ret < 1) {
LOGNOTICE("Proxy %ld:%d %s failed to epoll/read_socket_line in proxy_recv",
proxi->id, subproxy->subid, subproxy->si->url);
proxi->id, subproxy->subid, subproxy->url);
disable_subproxy(gdata, proxi, subproxy);
continue;
}
@ -2001,7 +1988,7 @@ static void *proxy_recv(void *arg)
disable_subproxy(gdata, proxi, subproxy);
if (parent_proxy(subproxy)) {
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
subproxy->low_id, subproxy->si->url);
subproxy->low_id, subproxy->url);
goto out;
}
}
@ -2017,9 +2004,9 @@ out:
HASH_ITER(sh, proxi->subproxies, subproxy, tmp) {
subproxy->disabled = true;
send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid);
if (subproxy->cs->fd > 0) {
epoll_ctl(epfd, EPOLL_CTL_DEL, subproxy->cs->fd, NULL);
Close(subproxy->cs->fd);
if (subproxy->cs.fd > 0) {
epoll_ctl(epfd, EPOLL_CTL_DEL, subproxy->cs.fd, NULL);
Close(subproxy->cs.fd);
}
HASH_DELETE(sh, proxi->subproxies, subproxy);
}
@ -2039,27 +2026,6 @@ static void prepare_proxy(proxy_instance_t *proxi)
create_pthread(&proxi->pth_precv, proxy_recv, proxi);
}
static void setup_proxies(ckpool_t *ckp, gdata_t *gdata)
{
int i;
for (i = 0; i < ckp->proxies; i++) {
proxy_instance_t *proxi;
server_instance_t *si;
si = ckp->servers[i];
proxi = si->data;
proxi->id = proxi->low_id = i;
HASH_ADD_I64(gdata->proxies, id, proxi);
if (ckp->passthrough) {
create_pthread(&proxi->pth_precv, passthrough_recv, proxi);
proxi->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
} else {
prepare_proxy(proxi);
}
}
}
static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
{
proxy_instance_t *ret = NULL, *proxi, *tmp;
@ -2164,7 +2130,7 @@ out:
send_api_response(val, sockd);
}
static void add_proxy(ckpool_t *ckp, const int num);
static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int num);
static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf)
{
@ -2190,13 +2156,13 @@ static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const
mutex_lock(&gdata->lock);
id = ckp->proxies++;
ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies);
ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies);
ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies);
ckp->proxyurl[id] = strdup(url);
ckp->proxyauth[id] = strdup(auth);
ckp->proxypass[id] = strdup(pass);
add_proxy(ckp, id);
proxy = ckp->servers[id]->data;
proxy->id = proxy->low_id = id;
HASH_ADD_I64(gdata->proxies, id, proxy);
proxy = __add_proxy(ckp, gdata, id);
mutex_unlock(&gdata->lock);
prepare_proxy(proxy);
@ -2215,7 +2181,6 @@ static int proxy_loop(proc_instance_t *pi)
gdata_t *gdata = ckp->data;
char *buf = NULL;
setup_proxies(ckp, gdata);
reconnect:
Close(sockd);
/* This does not necessarily mean we reconnect, but a change has
@ -2226,9 +2191,8 @@ reconnect:
if (proxi != cproxy) {
proxi = cproxy;
if (!ckp->passthrough) {
connsock_t *cs = proxi->cs;
LOGWARNING("Successfully connected to proxy %d %s:%s as proxy",
proxi->low_id, cs->url, cs->port);
proxi->low_id, proxi->url, proxi->cs.port);
dealloc(buf);
ASPRINTF(&buf, "proxy=%ld", proxi->id);
send_proc(ckp->stratifier, buf);
@ -2327,41 +2291,41 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
return ret;
}
static void add_proxy(ckpool_t *ckp, const int num)
static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id)
{
proxy_instance_t *proxy;
server_instance_t *si;
ckp->servers = realloc(ckp->servers, sizeof(server_instance_t *) * (num + 1));
ckp->servers[num] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[num];
si->url = strdup(ckp->proxyurl[num]);
si->auth = strdup(ckp->proxyauth[num]);
si->pass = strdup(ckp->proxypass[num]);
proxy = ckzalloc(sizeof(proxy_instance_t));
si->data = proxy;
proxy->auth = si->auth;
proxy->pass = si->pass;
proxy->si = si;
proxy->id = proxy->low_id = id;
proxy->url = strdup(ckp->proxyurl[id]);
proxy->auth = strdup(ckp->proxyauth[id]);
proxy->pass = strdup(ckp->proxypass[id]);
proxy->ckp = ckp;
proxy->cs = &si->cs;
proxy->cs->ckp = ckp;
proxy->cs.ckp = ckp;
mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock);
HASH_ADD_I64(gdata->proxies, id, proxy);
return proxy;
}
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
{
gdata_t *gdata = ckp->data;
proxy_instance_t *proxy;
server_instance_t *si;
int i, ret;
mutex_init(&gdata->lock);
/* Create all our proxy structures and pointers */
for (i = 0; i < ckp->proxies; i++)
add_proxy(ckp, i);
for (i = 0; i < ckp->proxies; i++) {
proxy = __add_proxy(ckp, gdata, i);
if (ckp->passthrough) {
create_pthread(&proxy->pth_precv, passthrough_recv, proxy);
proxy->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
} else {
prepare_proxy(proxy);
}
}
LOGWARNING("%s generator ready", ckp->name);
@ -2369,24 +2333,20 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
mutex_lock(&gdata->lock);
for (i = 0; i < ckp->proxies; i++) {
si = ckp->servers[i];
Close(si->cs.fd);
proxy = si->data;
continue; // FIXME: Find proxies
Close(proxy->cs.fd);
free(proxy->enonce1);
free(proxy->enonce1bin);
pthread_cancel(proxy->pth_psend);
pthread_cancel(proxy->pth_precv);
join_pthread(proxy->pth_psend);
join_pthread(proxy->pth_precv);
dealloc(si->data);
dealloc(si->url);
dealloc(si->auth);
dealloc(si->pass);
dealloc(si);
dealloc(proxy->url);
dealloc(proxy->auth);
dealloc(proxy->pass);
}
mutex_unlock(&gdata->lock);
dealloc(ckp->servers);
return ret;
}

Loading…
Cancel
Save