From cc6e431b13f8c2b852e8a9184c0da339f7461fe9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 23 Sep 2016 13:04:19 +1000 Subject: [PATCH] Properly handle userproxy priorities --- src/stratifier.c | 55 +++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 304a5928..1d8592fe 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2081,19 +2081,25 @@ static int64_t prio_sort(proxy_t *a, proxy_t *b) return (a->priority - b->priority); } +/* Masked increment */ +static int64_t masked_inc(int64_t value, int64_t mask) +{ + value &= ~mask; + value++; + value |= mask; + return value; +} + /* Priority values can be sparse, they do not need to be sequential */ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) { proxy_t *tmpa, *tmpb, *exists = NULL; - int64_t next_prio = 0; + int64_t mask, next_prio = 0; /* Encode the userid as the high bits in priority */ - if (!proxy->global) { - int64_t high_bits = proxy->userid; - - high_bits <<= 32; - priority |= high_bits; - } + mask = proxy->userid; + mask <<= 32; + priority |= mask; /* See if the priority is already in use */ HASH_ITER(hh, sdata->proxies, tmpa, tmpb) { @@ -2101,7 +2107,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) break; if (tmpa->priority == priority) { exists = tmpa; - next_prio = exists->priority + 1; + next_prio = masked_inc(priority, mask); break; } } @@ -2109,7 +2115,7 @@ static void __set_proxy_prio(sdata_t *sdata, proxy_t *proxy, int64_t priority) HASH_ITER(hh, exists, tmpa, tmpb) { if (tmpa->priority > next_prio) break; - tmpa->priority++; + tmpa->priority = masked_inc(tmpa->priority, mask); next_prio++; } proxy->priority = priority; @@ -2225,11 +2231,15 @@ static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid) return subproxy; } +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid); + static void set_proxy_prio(sdata_t *sdata, proxy_t *proxy, const int priority) { mutex_lock(&sdata->proxy_lock); __set_proxy_prio(sdata, proxy, priority); mutex_unlock(&sdata->proxy_lock); + + check_userproxies(sdata, proxy, proxy->userid); } /* Set proxy to the current proxy and calculate how much headroom it has */ @@ -2544,7 +2554,7 @@ static void recruit_best_userproxy(sdata_t *sdata, const int userid, const int r /* Check how much headroom the userid proxies have and reconnect any clients * that are not bound to it that should be */ -static void check_userproxies(sdata_t *sdata, const int userid) +static void check_userproxies(sdata_t *sdata, proxy_t *proxy, const int userid) { int64_t headroom = proxy_headroom(sdata, userid); stratum_instance_t *client, *tmpclient; @@ -2558,8 +2568,10 @@ static void check_userproxies(sdata_t *sdata, const int userid) continue; if (client->user_id != userid) continue; - /* Is this client bound to a dead proxy? */ - if (!client->reconnect && client->proxy->userid == userid) + /* Is the client already bound to a proxy of its own userid of + * a higher priority than this one. */ + if (client->proxy->userid == userid && + client->proxy->parent->priority <= proxy->parent->priority) continue; if (headroom-- < 1) continue; @@ -2681,7 +2693,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd) if (proxy->parent != best_proxy(sdata)->parent) reconnect_clients(sdata); } else - check_userproxies(sdata, proxy->userid); + check_userproxies(sdata, proxy, proxy->userid); clean |= new_block; LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id, subid, clean ? "" : "out"); @@ -4355,19 +4367,16 @@ static proxy_t *__best_subproxy(proxy_t *proxy) * running out of room. */ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) { - proxy_t *current, *proxy, *tmp, *best = NULL; + proxy_t *global, *proxy, *tmp, *best = NULL; if (!ckp->proxy || ckp->passthrough) return ckp_sdata; - current = ckp_sdata->proxy; - if (!current) { - LOGWARNING("No proxy available yet to generate subscribes"); - return NULL; - } /* Proxies are ordered by priority so first available will be the best * priority */ mutex_lock(&ckp_sdata->proxy_lock); + best = global = ckp_sdata->proxy; + HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { if (proxy->userid < userid) continue; @@ -4381,12 +4390,14 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int if (!best) { if (!userid) - LOGWARNING("Temporarily insufficient subproxies to accept more clients"); + LOGWARNING("Temporarily insufficient proxies to accept more clients"); + else + LOGNOTICE("Temporarily insufficient proxies for userid %d to accept more clients", userid); return NULL; } if (!userid) { - if (best->id != current->id || current_headroom(ckp_sdata, &proxy) < 2) - generator_recruit(ckp, current->id, 1); + if (best->id != global->id || current_headroom(ckp_sdata, &proxy) < 2) + generator_recruit(ckp, global->id, 1); } else { if (proxy_headroom(ckp_sdata, userid) < 2) generator_recruit(ckp, best->id, 1);