Browse Source

Update stratifier to use integers for proxy IDs again

master
Con Kolivas 10 years ago
parent
commit
d590869963
  1. 4
      src/generator.c
  2. 104
      src/stratifier.c

4
src/generator.c

@ -1290,12 +1290,12 @@ out:
return ret; return ret;
} }
static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int64_t id) static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id)
{ {
proxy_instance_t *proxi; proxy_instance_t *proxi;
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_FIND_I64(gdata->proxies, &id, proxi); HASH_FIND_INT(gdata->proxies, &id, proxi);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
return proxi; return proxi;

104
src/stratifier.c

@ -278,7 +278,7 @@ struct stratum_instance {
sdata_t *sdata; /* Which sdata this client is bound to */ sdata_t *sdata; /* Which sdata this client is bound to */
proxy_t *proxy; /* Proxy this is bound to in proxy mode */ proxy_t *proxy; /* Proxy this is bound to in proxy mode */
int64_t proxyid; /* Which proxy id */ int proxyid; /* Which proxy id */
int subproxyid; /* Which subproxy */ int subproxyid; /* Which subproxy */
}; };
@ -295,8 +295,7 @@ struct proxy_base {
UT_hash_handle sh; /* For subproxy hashlist */ UT_hash_handle sh; /* For subproxy hashlist */
proxy_t *next; /* For retired subproxies */ proxy_t *next; /* For retired subproxies */
proxy_t *prev; proxy_t *prev;
int64_t id; int id;
int low_id;
int subid; int subid;
double diff; double diff;
@ -1079,20 +1078,19 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata)
return dsdata; return dsdata;
} }
static proxy_t *__generate_proxy(sdata_t *sdata, const int64_t id) static proxy_t *__generate_proxy(sdata_t *sdata, const int id)
{ {
proxy_t *proxy = ckzalloc(sizeof(proxy_t)); proxy_t *proxy = ckzalloc(sizeof(proxy_t));
proxy->parent = proxy; proxy->parent = proxy;
proxy->id = id; proxy->id = id;
proxy->low_id = id & 0xFFFFFFFF;
proxy->sdata = duplicate_sdata(sdata); proxy->sdata = duplicate_sdata(sdata);
proxy->sdata->subproxy = proxy; proxy->sdata->subproxy = proxy;
proxy->sdata->verbose = true; proxy->sdata->verbose = true;
/* subid == 0 on parent proxy */ /* subid == 0 on parent proxy */
HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy); HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), proxy);
proxy->subproxy_count++; proxy->subproxy_count++;
HASH_ADD_I64(sdata->proxies, id, proxy); HASH_ADD_INT(sdata->proxies, id, proxy);
sdata->proxy_count++; sdata->proxy_count++;
return proxy; return proxy;
} }
@ -1103,7 +1101,6 @@ static proxy_t *__generate_subproxy(sdata_t *sdata, proxy_t *proxy, const int su
subproxy->parent = proxy; subproxy->parent = proxy;
subproxy->id = proxy->id; subproxy->id = proxy->id;
subproxy->low_id = proxy->low_id;
subproxy->subid = subid; subproxy->subid = subid;
HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), subproxy); HASH_ADD(sh, proxy->subproxies, subid, sizeof(int), subproxy);
proxy->subproxy_count++; proxy->subproxy_count++;
@ -1112,22 +1109,22 @@ static proxy_t *__generate_subproxy(sdata_t *sdata, proxy_t *proxy, const int su
return subproxy; return subproxy;
} }
static proxy_t *__existing_proxy(const sdata_t *sdata, const int64_t id) static proxy_t *__existing_proxy(const sdata_t *sdata, const int id)
{ {
proxy_t *proxy; proxy_t *proxy;
HASH_FIND_I64(sdata->proxies, &id, proxy); HASH_FIND_INT(sdata->proxies, &id, proxy);
return proxy; return proxy;
} }
/* Find proxy by id number, generate one if none exist yet by that id */ /* Find proxy by id number, generate one if none exist yet by that id */
static proxy_t *__proxy_by_id(sdata_t *sdata, const int64_t id) static proxy_t *__proxy_by_id(sdata_t *sdata, const int id)
{ {
proxy_t *proxy = __existing_proxy(sdata, id); proxy_t *proxy = __existing_proxy(sdata, id);
if (unlikely(!proxy)) { if (unlikely(!proxy)) {
proxy = __generate_proxy(sdata, id); proxy = __generate_proxy(sdata, id);
LOGNOTICE("Stratifier added new proxy %ld", id); LOGNOTICE("Stratifier added new proxy %d", id);
} }
return proxy; return proxy;
@ -1147,12 +1144,12 @@ static proxy_t *__subproxy_by_id(sdata_t *sdata, proxy_t *proxy, const int subid
if (!subproxy) { if (!subproxy) {
subproxy = __generate_subproxy(sdata, proxy, subid); subproxy = __generate_subproxy(sdata, proxy, subid);
LOGINFO("Stratifier added new subproxy %ld:%d", proxy->id, subid); LOGINFO("Stratifier added new subproxy %d:%d", proxy->id, subid);
} }
return subproxy; return subproxy;
} }
static proxy_t *subproxy_by_id(sdata_t *sdata, const int64_t id, const int subid) static proxy_t *subproxy_by_id(sdata_t *sdata, const int id, const int subid)
{ {
proxy_t *proxy, *subproxy; proxy_t *proxy, *subproxy;
@ -1164,7 +1161,7 @@ static proxy_t *subproxy_by_id(sdata_t *sdata, const int64_t id, const int subid
return subproxy; return subproxy;
} }
static proxy_t *existing_subproxy(sdata_t *sdata, const int64_t id, const int subid) static proxy_t *existing_subproxy(sdata_t *sdata, const int id, const int subid)
{ {
proxy_t *proxy, *subproxy = NULL; proxy_t *proxy, *subproxy = NULL;
@ -1241,7 +1238,7 @@ static void reconnect_clients(sdata_t *sdata)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged for reconnect to proxy %ld", reconnects, LOGNOTICE("%d clients flagged for reconnect to proxy %d", reconnects,
proxy->id); proxy->id);
} }
if (headroom < 0) if (headroom < 0)
@ -1261,7 +1258,7 @@ static proxy_t *current_proxy(sdata_t *sdata)
} }
#endif #endif
static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid) static void dead_proxyid(sdata_t *sdata, const int id, const int subid)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int reconnects = 0, hard = 0; int reconnects = 0, hard = 0;
@ -1271,7 +1268,7 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (proxy) if (proxy)
proxy->dead = true; proxy->dead = true;
LOGINFO("Stratifier dropping clients from proxy %ld:%d", id, subid); LOGINFO("Stratifier dropping clients from proxy %d:%d", id, subid);
headroom = current_headroom(sdata, &proxy); headroom = current_headroom(sdata, &proxy);
ck_rlock(&sdata->instance_lock); ck_rlock(&sdata->instance_lock);
@ -1288,7 +1285,7 @@ static void dead_proxyid(sdata_t *sdata, const int64_t id, const int subid)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
if (reconnects) { if (reconnects) {
LOGNOTICE("%d clients flagged to reconnect from dead proxy %ld:%d", reconnects, LOGNOTICE("%d clients flagged to reconnect from dead proxy %d:%d", reconnects,
id, subid); id, subid);
} }
if (headroom < 0) if (headroom < 0)
@ -1299,9 +1296,8 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
proxy_t *proxy, *old = NULL; proxy_t *proxy, *old = NULL;
int id = 0, subid = 0;
const char *buf; const char *buf;
int64_t id = 0;
int subid = 0;
json_t *val; json_t *val;
if (unlikely(strlen(cmd) < 11)) { if (unlikely(strlen(cmd) < 11)) {
@ -1315,7 +1311,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode subscribe response in update_subscribe %s", buf); LOGWARNING("Failed to json decode subscribe response in update_subscribe %s", buf);
return; return;
} }
if (unlikely(!json_get_int64(&id, val, "proxy"))) { if (unlikely(!json_get_int(&id, val, "proxy"))) {
LOGWARNING("Failed to json decode proxy value in update_subscribe %s", buf); LOGWARNING("Failed to json decode proxy value in update_subscribe %s", buf);
return; return;
} }
@ -1325,9 +1321,9 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
} }
if (!subid) if (!subid)
LOGNOTICE("Got updated subscribe for proxy %ld", id); LOGNOTICE("Got updated subscribe for proxy %d", id);
else else
LOGINFO("Got updated subscribe for proxy %ld:%d", id, subid); LOGINFO("Got updated subscribe for proxy %d:%d", id, subid);
/* Is this a replacement for an existing proxy id? */ /* Is this a replacement for an existing proxy id? */
old = existing_subproxy(sdata, id, subid); old = existing_subproxy(sdata, id, subid);
@ -1363,15 +1359,15 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
/* Is this a replacement proxy for the current one */ /* Is this a replacement proxy for the current one */
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
if (sdata->proxy && sdata->proxy->low_id == proxy->low_id && !proxy->subid) if (sdata->proxy && sdata->proxy->id == proxy->id && !proxy->subid)
sdata->proxy = proxy; sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
if (subid) { if (subid) {
LOGINFO("Upstream pool %ld:%d extranonce2 length %d, max proxy clients %"PRId64, LOGINFO("Upstream pool %d:%d extranonce2 length %d, max proxy clients %"PRId64,
id, subid, proxy->nonce2len, proxy->max_clients); id, subid, proxy->nonce2len, proxy->max_clients);
} else { } else {
LOGNOTICE("Upstream pool %ld extranonce2 length %d, max proxy clients %"PRId64, LOGNOTICE("Upstream pool %d extranonce2 length %d, max proxy clients %"PRId64,
id, proxy->nonce2len, proxy->max_clients); id, proxy->nonce2len, proxy->max_clients);
} }
json_decref(val); json_decref(val);
@ -1382,11 +1378,10 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
bool new_block = false, clean; bool new_block = false, clean;
proxy_t *proxy, *current; proxy_t *proxy, *current;
int i, id = 0, subid = 0;
int64_t current_id; int64_t current_id;
int i, subid = 0;
char header[228]; char header[228];
const char *buf; const char *buf;
int64_t id = 0;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
@ -1402,17 +1397,17 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode in update_notify"); LOGWARNING("Failed to json decode in update_notify");
return; return;
} }
json_get_int64(&id, val, "proxy"); json_get_int(&id, val, "proxy");
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (unlikely(!proxy || !proxy->subscribed)) { if (unlikely(!proxy || !proxy->subscribed)) {
LOGINFO("No valid proxy %ld:%d subscription to update notify yet", id, subid); LOGINFO("No valid proxy %d:%d subscription to update notify yet", id, subid);
goto out; goto out;
} }
if (!subid) if (!subid)
LOGNOTICE("Got updated notify for proxy %ld", id); LOGNOTICE("Got updated notify for proxy %d", id);
else else
LOGINFO("Got updated notify for proxy %ld:%d", id, subid); LOGINFO("Got updated notify for proxy %d:%d", id, subid);
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
wb->ckp = ckp; wb->ckp = ckp;
@ -1466,9 +1461,9 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
add_base(ckp, dsdata, wb, &new_block); add_base(ckp, dsdata, wb, &new_block);
if (new_block) { if (new_block) {
if (subid) if (subid)
LOGINFO("Block hash on proxy %ld:%d changed to %s", id, subid, dsdata->lastswaphash); LOGINFO("Block hash on proxy %d:%d changed to %s", id, subid, dsdata->lastswaphash);
else else
LOGNOTICE("Block hash on proxy %ld changed to %s", id, dsdata->lastswaphash); LOGNOTICE("Block hash on proxy %d changed to %s", id, dsdata->lastswaphash);
} }
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
@ -1481,7 +1476,7 @@ static void update_notify(ckpool_t *ckp, const char *cmd)
if (!proxy->subid && proxy->id == current_id) if (!proxy->subid && proxy->id == current_id)
reconnect_clients(sdata); reconnect_clients(sdata);
clean |= new_block; clean |= new_block;
LOGINFO("Proxy %ld:%d broadcast updated stratum notify with%s clean", id, LOGINFO("Proxy %d:%d broadcast updated stratum notify with%s clean", id,
subid, clean ? "" : "out"); subid, clean ? "" : "out");
stratum_broadcast_update(dsdata, wb, clean); stratum_broadcast_update(dsdata, wb, clean);
out: out:
@ -1495,10 +1490,9 @@ static void update_diff(ckpool_t *ckp, const char *cmd)
sdata_t *sdata = ckp->data, *dsdata; sdata_t *sdata = ckp->data, *dsdata;
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
double old_diff, diff; double old_diff, diff;
int id = 0, subid = 0;
const char *buf; const char *buf;
int64_t id = 0;
proxy_t *proxy; proxy_t *proxy;
int subid = 0;
json_t *val; json_t *val;
if (unlikely(strlen(cmd) < 6)) { if (unlikely(strlen(cmd) < 6)) {
@ -1513,15 +1507,15 @@ static void update_diff(ckpool_t *ckp, const char *cmd)
LOGWARNING("Failed to json decode in update_diff"); LOGWARNING("Failed to json decode in update_diff");
return; return;
} }
json_get_int64(&id, val, "proxy"); json_get_int(&id, val, "proxy");
json_get_int(&subid, val, "subproxy"); json_get_int(&subid, val, "subproxy");
json_dblcpy(&diff, val, "diff"); json_dblcpy(&diff, val, "diff");
json_decref(val); json_decref(val);
LOGINFO("Got updated diff for proxy %ld:%d", id, subid); LOGINFO("Got updated diff for proxy %d:%d", id, subid);
proxy = existing_subproxy(sdata, id, subid); proxy = existing_subproxy(sdata, id, subid);
if (!proxy) { if (!proxy) {
LOGINFO("No existing subproxy %ld:%d to update diff", id, subid); LOGINFO("No existing subproxy %d:%d to update diff", id, subid);
return; return;
} }
@ -1603,12 +1597,12 @@ static void reap_proxies(ckpool_t *ckp, sdata_t *sdata)
if (!subproxy->dead) if (!subproxy->dead)
continue; continue;
if (unlikely(!subproxy->subid)) { if (unlikely(!subproxy->subid)) {
LOGWARNING("Unexepectedly found proxy %ld:%d as subproxy of %ld:%d", LOGWARNING("Unexepectedly found proxy %d:%d as subproxy of %d:%d",
subproxy->id, subproxy->subid, proxy->id, proxy->subid); subproxy->id, subproxy->subid, proxy->id, proxy->subid);
continue; continue;
} }
if (unlikely(subproxy == sdata->proxy)) { if (unlikely(subproxy == sdata->proxy)) {
LOGWARNING("Unexepectedly found proxy %ld:%d as current", LOGWARNING("Unexepectedly found proxy %d:%d as current",
subproxy->id, subproxy->subid); subproxy->id, subproxy->subid);
continue; continue;
} }
@ -2239,25 +2233,24 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
* this proxy we are only given this message if all clients must move. */ * this proxy we are only given this message if all clients must move. */
static void set_proxy(sdata_t *sdata, const char *buf) static void set_proxy(sdata_t *sdata, const char *buf)
{ {
int64_t id = 0; int id = 0;
proxy_t *proxy; proxy_t *proxy;
sscanf(buf, "proxy=%ld", &id); sscanf(buf, "proxy=%d", &id);
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
proxy = __proxy_by_id(sdata, id); proxy = __proxy_by_id(sdata, id);
sdata->proxy = proxy; sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
LOGNOTICE("Stratifier setting active proxy to %ld", id); LOGNOTICE("Stratifier setting active proxy to %d", id);
} }
static void dead_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf)
{ {
int64_t id = 0; int id = 0, subid = 0;
int subid = 0;
sscanf(buf, "deadproxy=%ld:%d", &id, &subid); sscanf(buf, "deadproxy=%d:%d", &id, &subid);
dead_proxyid(sdata, id, subid); dead_proxyid(sdata, id, subid);
} }
@ -2522,8 +2515,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata) static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
{ {
proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub; proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub;
int best_subid = 0, best_lowid; int best_subid = 0, best_id;
int64_t best_id = 0;
if (!ckp->proxy || ckp->passthrough) if (!ckp->proxy || ckp->passthrough)
return ckp_sdata; return ckp_sdata;
@ -2532,7 +2524,7 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
LOGWARNING("No proxy available yet to generate subscribes"); LOGWARNING("No proxy available yet to generate subscribes");
return NULL; return NULL;
} }
best_lowid = ckp_sdata->proxy_count; best_id = ckp_sdata->proxy_count;
mutex_lock(&ckp_sdata->proxy_lock); mutex_lock(&ckp_sdata->proxy_lock);
HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) {
@ -2555,9 +2547,8 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
max_headroom = subproxy_headroom; max_headroom = subproxy_headroom;
} }
} }
if (best && (best->low_id < best_lowid || (best->low_id == best_lowid && best->id > best_id))) { if (best && best->id < best_id) {
best_id = best->id; best_id = best->id;
best_lowid = best->low_id;
best_subid = best->subid; best_subid = best->subid;
if (proxy == current) if (proxy == current)
break; break;
@ -2598,7 +2589,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
return json_string("Initialising"); return json_string("Initialising");
} }
if (ckp->proxy) { if (ckp->proxy) {
LOGINFO("Current %ld, selecting proxy %ld:%d for client %"PRId64, ckp_sdata->proxy->id, LOGINFO("Current %d, selecting proxy %d:%d for client %"PRId64, ckp_sdata->proxy->id,
sdata->subproxy->id, sdata->subproxy->subid, client->id); sdata->subproxy->id, sdata->subproxy->subid, client->id);
} }
client->sdata = sdata; client->sdata = sdata;
@ -3187,7 +3178,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
client->authorised = ret; client->authorised = ret;
user->authorised = ret; user->authorised = ret;
if (ckp->proxy) { if (ckp->proxy) {
LOGNOTICE("Authorised client %"PRId64" to proxy %ld:%d, worker %s as user %s", LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s",
client->id, client->proxyid, client->subproxyid, buf, user->username); client->id, client->proxyid, client->subproxyid, buf, user->username);
} else { } else {
LOGNOTICE("Authorised client %"PRId64" worker %s as user %s", LOGNOTICE("Authorised client %"PRId64" worker %s as user %s",
@ -4902,9 +4893,8 @@ static void *statsupdate(void *arg)
mutex_lock(&sdata->proxy_lock); mutex_lock(&sdata->proxy_lock);
HASH_ITER(hh, sdata->proxies, proxy, proxytmp) { HASH_ITER(hh, sdata->proxies, proxy, proxytmp) {
JSON_CPACK(val, "{sI,si,si,sI,sb}", JSON_CPACK(val, "{sI,si,sI,sb}",
"id", proxy->id, "id", proxy->id,
"priority", proxy->low_id,
"subproxies", proxy->subproxy_count, "subproxies", proxy->subproxy_count,
"clients", proxy->combined_clients, "clients", proxy->combined_clients,
"alive", !proxy->dead); "alive", !proxy->dead);

Loading…
Cancel
Save