From b59760bb4a56da7cbf63e4b748dabd29893361e2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 14 Feb 2015 12:03:41 +1100 Subject: [PATCH] Disable subproxies as they die, moving them to a dead list instead of trying to reuse them and recruit fresh proxies, disconnecting clients connected to them --- src/generator.c | 70 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/src/generator.c b/src/generator.c index e8eec196..83a813e9 100644 --- a/src/generator.c +++ b/src/generator.c @@ -77,6 +77,7 @@ typedef struct proxy_instance proxy_instance_t; /* Per proxied pool instance data */ struct proxy_instance { UT_hash_handle hh; + proxy_instance_t *next; /* For dead proxy list */ ckpool_t *ckp; connsock_t *cs; @@ -101,6 +102,7 @@ struct proxy_instance { bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */ + bool disabled; /* Subproxy no longer to be used */ bool reconnect; /* We need to drop and reconnect */ bool alive; @@ -137,9 +139,11 @@ struct proxy_instance { /* Private data for the generator */ struct generator_data { + ckpool_t *ckp; pthread_mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *proxy; /* Current proxy */ + proxy_instance_t *dead_proxies; /* Disabled proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue }; @@ -410,7 +414,7 @@ static bool send_json_msg(connsock_t *cs, json_t *json_msg) sent = write_socket(cs->fd, s, len); dealloc(s); if (sent != len) { - LOGWARNING("Failed to send %d bytes sent %d in send_json_msg", len, sent); + LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent); return false; } return true; @@ -896,6 +900,7 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val) static void prepare_proxy(proxy_instance_t *proxi); static proxy_instance_t *create_subproxy(proxy_instance_t *proxi); +static bool recruit_subproxy(proxy_instance_t *proxi); static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy) { @@ -906,6 +911,32 @@ static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy) mutex_unlock(&proxi->proxy_lock); } +static proxy_instance_t *__subproxy_by_id(proxy_instance_t *proxy, const int id) +{ + proxy_instance_t *subproxy; + + HASH_FIND_INT(proxy->subproxies, &id, subproxy); + return subproxy; +} + +/* Remove the subproxy from the proxi list. Do not remove its ram in case of + * dangling references */ +static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_instance_t *subproxy) +{ + mutex_lock(&proxi->proxy_lock); + subproxy->disabled = true; + /* Make sure subproxy is still in the list */ + if (__subproxy_by_id(proxi, subproxy->id)) { + HASH_DEL(proxi->subproxies, subproxy); + proxi->client_headroom -= proxi->clients_per_proxy; + LL_PREPEND(gdata->dead_proxies, subproxy); + } + mutex_unlock(&proxi->proxy_lock); + + if (proxi->client_headroom < 42 && proxi->alive) + recruit_subproxy(proxi); +} + static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) { server_instance_t *newsi, *si = proxi->si; @@ -1260,36 +1291,59 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int id) proxy_instance_t *subproxy; mutex_lock(&proxy->proxy_lock); - HASH_FIND_INT(proxy->subproxies, &id, subproxy); + subproxy = __subproxy_by_id(proxy, id); + if (subproxy && subproxy->disabled) + subproxy = NULL; mutex_unlock(&proxy->proxy_lock); return subproxy; } +static void stratifier_drop_client(ckpool_t *ckp, int64_t id) +{ + char buf[256]; + + sprintf(buf, "dropclient=%"PRId64, id); + send_proc(ckp->stratifier, buf); +} + static void submit_share(gdata_t *gdata, json_t *val) { proxy_instance_t *proxy, *proxi; + ckpool_t *ckp = gdata->ckp; stratum_msg_t *msg; share_msg_t *share; + int64_t client_id; int id, subid; + /* Get the client id so we can tell the stratifier to drop it if the + * proxy it's bound to is not functional */ + json_getdel_int64(&client_id, val, "client_id"); json_getdel_int(&id, val, "proxy"); proxy = proxy_by_id(gdata, id); if (unlikely(!proxy)) { LOGWARNING("Failed to find proxy %d to send share to", id); + stratifier_drop_client(ckp, client_id); return json_decref(val); } json_get_int(&subid, val, "subproxy"); proxi = subproxy_by_id(proxy, subid); if (unlikely(!proxi)) { - LOGWARNING("Failed to find subproxy %d to send share to", subid); + LOGNOTICE("Failed to find proxy %d:%d to send share to", id, subid); + stratifier_drop_client(ckp, client_id); + return json_decref(val); + } + if (!proxi->alive) { + LOGNOTICE("Client %"PRId64" attempting to send shares to dead proxy %d, dropping", + client_id, id); + stratifier_drop_client(ckp, client_id); return json_decref(val); } msg = ckzalloc(sizeof(stratum_msg_t)); share = ckzalloc(sizeof(share_msg_t)); share->submit_time = time(NULL); - json_getdel_int(&share->client_id, val, "client_id"); + share->client_id = client_id; json_getdel_int(&share->msg_id, val, "msg_id"); msg->json_msg = val; @@ -1361,6 +1415,7 @@ static void *proxy_send(void *arg) { proxy_instance_t *proxy = (proxy_instance_t *)arg; connsock_t *cs = proxy->cs; + gdata_t *gdata = cs->ckp->data; rename_proc("proxysend"); @@ -1405,6 +1460,10 @@ static void *proxy_send(void *arg) "id", json_object_dup(msg->json_msg, "id"), "method", "mining.submit"); ret = send_json_msg(cs, val); + if (unlikely(!ret && !subproxy->disabled)) { + LOGWARNING("Proxy %d:%d dead, disabling", proxy->id, subid); + disable_subproxy(gdata, proxy, subproxy); + } json_decref(val); } else { LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", @@ -1446,8 +1505,6 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) ckmsgq_add(proxi->passsends, pm); } -static bool recruit_subproxy(proxy_instance_t *proxi); - static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, connsock_t *cs, bool pinging, int epfd) { @@ -2034,6 +2091,7 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; + gdata->ckp = ckp; if (ckp->proxy) { ret = proxy_mode(ckp, pi); } else {