Browse Source

Disable subproxies as they die, moving them to a dead list instead of trying to reuse them and recruit fresh proxies, disconnecting clients connected to them

master
Con Kolivas 10 years ago
parent
commit
b59760bb4a
  1. 70
      src/generator.c

70
src/generator.c

@ -77,6 +77,7 @@ typedef struct proxy_instance proxy_instance_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
UT_hash_handle hh; UT_hash_handle hh;
proxy_instance_t *next; /* For dead proxy list */
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
@ -101,6 +102,7 @@ struct proxy_instance {
bool no_sessionid; /* Doesn't support session id resume on subscribe */ bool no_sessionid; /* Doesn't support session id resume on subscribe */
bool no_params; /* Doesn't want any parameters on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */
bool disabled; /* Subproxy no longer to be used */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
bool alive; bool alive;
@ -137,9 +139,11 @@ struct proxy_instance {
/* Private data for the generator */ /* Private data for the generator */
struct generator_data { struct generator_data {
ckpool_t *ckp;
pthread_mutex_t lock; /* Lock protecting linked lists */ pthread_mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *proxies; /* Hash list of all proxies */
proxy_instance_t *proxy; /* Current proxy */ proxy_instance_t *proxy; /* Current proxy */
proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
}; };
@ -410,7 +414,7 @@ static bool send_json_msg(connsock_t *cs, json_t *json_msg)
sent = write_socket(cs->fd, s, len); sent = write_socket(cs->fd, s, len);
dealloc(s); dealloc(s);
if (sent != len) { if (sent != len) {
LOGWARNING("Failed to send %d bytes sent %d in send_json_msg", len, sent); LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent);
return false; return false;
} }
return true; return true;
@ -896,6 +900,7 @@ static bool send_pong(proxy_instance_t *proxi, json_t *val)
static void prepare_proxy(proxy_instance_t *proxi); static void prepare_proxy(proxy_instance_t *proxi);
static proxy_instance_t *create_subproxy(proxy_instance_t *proxi); static proxy_instance_t *create_subproxy(proxy_instance_t *proxi);
static bool recruit_subproxy(proxy_instance_t *proxi);
static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy) static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy)
{ {
@ -906,6 +911,32 @@ static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy)
mutex_unlock(&proxi->proxy_lock); mutex_unlock(&proxi->proxy_lock);
} }
static proxy_instance_t *__subproxy_by_id(proxy_instance_t *proxy, const int id)
{
proxy_instance_t *subproxy;
HASH_FIND_INT(proxy->subproxies, &id, subproxy);
return subproxy;
}
/* Remove the subproxy from the proxi list. Do not remove its ram in case of
* dangling references */
static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_instance_t *subproxy)
{
mutex_lock(&proxi->proxy_lock);
subproxy->disabled = true;
/* Make sure subproxy is still in the list */
if (__subproxy_by_id(proxi, subproxy->id)) {
HASH_DEL(proxi->subproxies, subproxy);
proxi->client_headroom -= proxi->clients_per_proxy;
LL_PREPEND(gdata->dead_proxies, subproxy);
}
mutex_unlock(&proxi->proxy_lock);
if (proxi->client_headroom < 42 && proxi->alive)
recruit_subproxy(proxi);
}
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val) static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{ {
server_instance_t *newsi, *si = proxi->si; server_instance_t *newsi, *si = proxi->si;
@ -1260,36 +1291,59 @@ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int id)
proxy_instance_t *subproxy; proxy_instance_t *subproxy;
mutex_lock(&proxy->proxy_lock); mutex_lock(&proxy->proxy_lock);
HASH_FIND_INT(proxy->subproxies, &id, subproxy); subproxy = __subproxy_by_id(proxy, id);
if (subproxy && subproxy->disabled)
subproxy = NULL;
mutex_unlock(&proxy->proxy_lock); mutex_unlock(&proxy->proxy_lock);
return subproxy; return subproxy;
} }
static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
{
char buf[256];
sprintf(buf, "dropclient=%"PRId64, id);
send_proc(ckp->stratifier, buf);
}
static void submit_share(gdata_t *gdata, json_t *val) static void submit_share(gdata_t *gdata, json_t *val)
{ {
proxy_instance_t *proxy, *proxi; proxy_instance_t *proxy, *proxi;
ckpool_t *ckp = gdata->ckp;
stratum_msg_t *msg; stratum_msg_t *msg;
share_msg_t *share; share_msg_t *share;
int64_t client_id;
int id, subid; int id, subid;
/* Get the client id so we can tell the stratifier to drop it if the
* proxy it's bound to is not functional */
json_getdel_int64(&client_id, val, "client_id");
json_getdel_int(&id, val, "proxy"); json_getdel_int(&id, val, "proxy");
proxy = proxy_by_id(gdata, id); proxy = proxy_by_id(gdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
LOGWARNING("Failed to find proxy %d to send share to", id); LOGWARNING("Failed to find proxy %d to send share to", id);
stratifier_drop_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
proxi = subproxy_by_id(proxy, subid); proxi = subproxy_by_id(proxy, subid);
if (unlikely(!proxi)) { if (unlikely(!proxi)) {
LOGWARNING("Failed to find subproxy %d to send share to", subid); LOGNOTICE("Failed to find proxy %d:%d to send share to", id, subid);
stratifier_drop_client(ckp, client_id);
return json_decref(val);
}
if (!proxi->alive) {
LOGNOTICE("Client %"PRId64" attempting to send shares to dead proxy %d, dropping",
client_id, id);
stratifier_drop_client(ckp, client_id);
return json_decref(val); return json_decref(val);
} }
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(stratum_msg_t));
share = ckzalloc(sizeof(share_msg_t)); share = ckzalloc(sizeof(share_msg_t));
share->submit_time = time(NULL); share->submit_time = time(NULL);
json_getdel_int(&share->client_id, val, "client_id"); share->client_id = client_id;
json_getdel_int(&share->msg_id, val, "msg_id"); json_getdel_int(&share->msg_id, val, "msg_id");
msg->json_msg = val; msg->json_msg = val;
@ -1361,6 +1415,7 @@ static void *proxy_send(void *arg)
{ {
proxy_instance_t *proxy = (proxy_instance_t *)arg; proxy_instance_t *proxy = (proxy_instance_t *)arg;
connsock_t *cs = proxy->cs; connsock_t *cs = proxy->cs;
gdata_t *gdata = cs->ckp->data;
rename_proc("proxysend"); rename_proc("proxysend");
@ -1405,6 +1460,10 @@ static void *proxy_send(void *arg)
"id", json_object_dup(msg->json_msg, "id"), "id", json_object_dup(msg->json_msg, "id"),
"method", "mining.submit"); "method", "mining.submit");
ret = send_json_msg(cs, val); ret = send_json_msg(cs, val);
if (unlikely(!ret && !subproxy->disabled)) {
LOGWARNING("Proxy %d:%d dead, disabling", proxy->id, subid);
disable_subproxy(gdata, proxy, subproxy);
}
json_decref(val); json_decref(val);
} else { } else {
LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend",
@ -1446,8 +1505,6 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
ckmsgq_add(proxi->passsends, pm); ckmsgq_add(proxi->passsends, pm);
} }
static bool recruit_subproxy(proxy_instance_t *proxi);
static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi, static bool proxy_alive(ckpool_t *ckp, server_instance_t *si, proxy_instance_t *proxi,
connsock_t *cs, bool pinging, int epfd) connsock_t *cs, bool pinging, int epfd)
{ {
@ -2034,6 +2091,7 @@ int generator(proc_instance_t *pi)
LOGWARNING("%s generator starting", ckp->name); LOGWARNING("%s generator starting", ckp->name);
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata; ckp->data = gdata;
gdata->ckp = ckp;
if (ckp->proxy) { if (ckp->proxy) {
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);
} else { } else {

Loading…
Cancel
Save