Browse Source

Revert sessionid code to merge master sessionid code

master
Con Kolivas 10 years ago
parent
commit
505abca664
  1. 133
      src/stratifier.c

133
src/stratifier.c

@ -281,11 +281,6 @@ struct stratum_instance {
proxy_t *proxy; /* Proxy this is bound to in proxy mode */ proxy_t *proxy; /* Proxy this is bound to in proxy mode */
int proxyid; /* Which proxy id */ int proxyid; /* Which proxy id */
int subproxyid; /* Which subproxy */ int subproxyid; /* Which subproxy */
/* What session id we gave this instance to recognise users as they
* reconnect before they've sent their auth information if possible in
* proxy mode instead of supporting stratum resume with it. */
int64_t session_id;
}; };
struct share { struct share {
@ -336,19 +331,10 @@ struct proxy_base {
int subproxy_count; /* Number of subproxies */ int subproxy_count; /* Number of subproxies */
proxy_t *parent; /* Parent proxy of each subproxy */ proxy_t *parent; /* Parent proxy of each subproxy */
proxy_t *subproxies; /* Hashlist of subproxies sorted by subid */ proxy_t *subproxies; /* Hashlist of subproxies sorted by subid */
sdata_t *sdata; /* Unique stratifier data for each subproxy */ sdata_t *sdata; /* Unique stratifer data for each subproxy */
bool dead; bool dead;
}; };
typedef struct user_sessionid user_sessionid_t;
struct user_sessionid {
UT_hash_handle hh;
int64_t session_id;
user_instance_t *user;
time_t added;
};
struct stratifier_data { struct stratifier_data {
ckpool_t *ckp; ckpool_t *ckp;
@ -388,7 +374,6 @@ struct stratifier_data {
ckmsgq_t *stxnq; // Transaction requests ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id; int64_t user_instance_id;
int64_t session_id;
/* Stratum_instances hashlist is stored by id, whereas disconnected_instances /* Stratum_instances hashlist is stored by id, whereas disconnected_instances
* is sorted by enonce1_64. */ * is sorted by enonce1_64. */
@ -423,7 +408,6 @@ struct stratifier_data {
proxy_t *proxies; /* Hashlist of all proxies */ proxy_t *proxies; /* Hashlist of all proxies */
mutex_t proxy_lock; /* Protects all proxy data */ mutex_t proxy_lock; /* Protects all proxy data */
proxy_t *subproxy; /* Which subproxy this sdata belongs to in proxy mode */ proxy_t *subproxy; /* Which subproxy this sdata belongs to in proxy mode */
user_sessionid_t *user_sessionids;
}; };
typedef struct json_entry json_entry_t; typedef struct json_entry json_entry_t;
@ -2778,7 +2762,7 @@ static proxy_t *__best_subproxy(proxy_t *proxy)
* in proxy mode where we find a subproxy based on the current proxy with room * in proxy mode where we find a subproxy based on the current proxy with room
* for more clients. Signal the generator to recruit more subproxies if we are * for more clients. Signal the generator to recruit more subproxies if we are
* running out of room. */ * running out of room. */
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, int userid) static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
{ {
proxy_t *current, *proxy, *tmp, *best = NULL; proxy_t *current, *proxy, *tmp, *best = NULL;
@ -2794,9 +2778,10 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, int userid
* priority */ * priority */
mutex_lock(&ckp_sdata->proxy_lock); mutex_lock(&ckp_sdata->proxy_lock);
HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) { HASH_ITER(hh, ckp_sdata->proxies, proxy, tmp) {
if (proxy->userid < userid) /* FIXME: We need to check the user bound proxies though we
continue; * currently only know users after they've authorised which is
if (proxy->userid > userid) * too late for this. */
if (!proxy->global)
break; break;
best = __best_subproxy(proxy); best = __best_subproxy(proxy);
if (best) if (best)
@ -2805,35 +2790,14 @@ static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, int userid
mutex_unlock(&ckp_sdata->proxy_lock); mutex_unlock(&ckp_sdata->proxy_lock);
if (!best) { if (!best) {
if (!userid) LOGWARNING("Temporarily insufficient subproxies to accept more clients");
LOGWARNING("Temporarily insufficient subproxies to accept more clients");
return NULL; return NULL;
} }
if (!userid && (best->id != current->id || current->headroom < 2)) if (best->id != current->id || current->headroom < 2)
generator_recruit(ckp, 1); generator_recruit(ckp, 1);
return best->sdata; return best->sdata;
} }
static user_instance_t *user_from_sessionid(sdata_t *sdata, const int64_t session_id)
{
user_sessionid_t *user_sessionid;
user_instance_t *user = NULL;
ck_wlock(&sdata->instance_lock);
HASH_FIND_I64(sdata->user_sessionids, &session_id, user_sessionid);
if (user_sessionid)
HASH_DEL(sdata->user_sessionids, user_sessionid);
ck_wunlock(&sdata->instance_lock);
if (user_sessionid) {
user = user_sessionid->user;
LOGINFO("Found matching user %s from sessionid %lx", user->username,
session_id);
dealloc(user_sessionid);
}
return user;
}
/* Extranonce1 must be set here. Needs to be entered with client holding a ref /* Extranonce1 must be set here. Needs to be entered with client holding a ref
* count. */ * count. */
static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_id, const json_t *params_val) static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_id, const json_t *params_val)
@ -2850,12 +2814,17 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
return json_string("params not an array"); return json_string("params not an array");
} }
sdata = select_sdata(ckp, ckp_sdata, 0); sdata = select_sdata(ckp, ckp_sdata);
if (unlikely(!sdata || !sdata->current_workbase)) { if (unlikely(!sdata || !sdata->current_workbase)) {
LOGWARNING("Failed to provide subscription due to no %s", sdata ? "current workbase" : "sdata"); LOGWARNING("Failed to provide subscription due to no %s", sdata ? "current workbase" : "sdata");
stratum_send_message(ckp_sdata, client, "Pool Initialising"); stratum_send_message(ckp_sdata, client, "Pool Initialising");
return json_string("Initialising"); return json_string("Initialising");
} }
if (ckp->proxy) {
LOGINFO("Current %d, selecting proxy %d:%d for client %"PRId64, ckp_sdata->proxy->id,
sdata->subproxy->id, sdata->subproxy->subid, client->id);
}
client->sdata = sdata;
arr_size = json_array_size(params_val); arr_size = json_array_size(params_val);
/* NOTE useragent is NULL prior to this so should not be used in code /* NOTE useragent is NULL prior to this so should not be used in code
@ -2868,41 +2837,23 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
client->useragent = strdup(buf); client->useragent = strdup(buf);
else else
client->useragent = ckzalloc(1); // Set to "" client->useragent = ckzalloc(1); // Set to ""
buf = NULL; if (arr_size > 1 && !ckp->proxy) {
if (arr_size > 1) { /* This would be the session id for reconnect, it will
* not work for clients on a proxied connection. */
buf = json_string_value(json_array_get(params_val, 1)); buf = json_string_value(json_array_get(params_val, 1));
LOGDEBUG("Found old session id %s", buf); LOGDEBUG("Found old session id %s", buf);
if (!ckp->proxy) { /* Add matching here */
/* This would be the session id for reconnect, it will if ((client->enonce1_64 = disconnected_sessionid_exists(sdata, buf, client_id))) {
* not work for clients on a proxied connection. */ sprintf(client->enonce1, "%016lx", client->enonce1_64);
if ((client->enonce1_64 = disconnected_sessionid_exists(sdata, buf, client_id))) { old_match = true;
sprintf(client->enonce1, "%016lx", client->enonce1_64);
old_match = true; ck_rlock(&sdata->workbase_lock);
__fill_enonce1data(sdata->current_workbase, client);
ck_rlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
__fill_enonce1data(sdata->current_workbase, client);
ck_runlock(&sdata->workbase_lock);
}
} else {
int64_t session_id = 0;
sscanf(buf, "%lx", &session_id);
client->user_instance = user_from_sessionid(ckp_sdata, session_id);
if (client->user_instance) {
sdata_t *usdata = select_sdata(ckp, ckp_sdata, client->user_instance->id);
if (usdata)
sdata = usdata;
}
} }
} }
} else } else
client->useragent = ckzalloc(1); client->useragent = ckzalloc(1);
client->sdata = sdata;
if (ckp->proxy) {
LOGINFO("Current %d, selecting proxy %d:%d for client %"PRId64, ckp_sdata->proxy->id,
sdata->subproxy->id, sdata->subproxy->subid, client->id);
}
if (!old_match) { if (!old_match) {
/* Create a new extranonce1 based on a uint64_t pointer */ /* Create a new extranonce1 based on a uint64_t pointer */
if (!new_enonce1(ckp, ckp_sdata, sdata, client)) { if (!new_enonce1(ckp, ckp_sdata, sdata, client)) {
@ -2923,17 +2874,13 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
/* Send a random sessionid in proxy mode so clients don't think we have /* Send a random sessionid in proxy mode so clients don't think we have
* resumed if enonce1 ends up matching on reconnect but we can use it * resumed if enonce1 ends up matching on reconnect. */
* to recognise users reconnecting before they authorise. */
if (ckp->proxy) { if (ckp->proxy) {
char sessionid[20]; unsigned int now = time(NULL);
char nowx[12];
ck_wlock(&ckp_sdata->instance_lock); sprintf(nowx, "%x", now);
client->session_id = ckp_sdata->session_id++; JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", nowx, client->enonce1,
ck_wunlock(&ckp_sdata->instance_lock);
sprintf(sessionid, "%lx", client->session_id);
JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", sessionid, client->enonce1,
n2len); n2len);
} else { } else {
JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1, JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1,
@ -3107,7 +3054,7 @@ static user_instance_t *__create_user(sdata_t *sdata, const char *username)
user->auth_backoff = DEFAULT_AUTH_BACKOFF; user->auth_backoff = DEFAULT_AUTH_BACKOFF;
strcpy(user->username, username); strcpy(user->username, username);
user->id = ++sdata->user_instance_id; user->id = sdata->user_instance_id++;
HASH_ADD_STR(sdata->user_instances, username, user); HASH_ADD_STR(sdata->user_instances, username, user);
return user; return user;
} }
@ -3381,20 +3328,6 @@ static void queue_delayed_auth(stratum_instance_t *client)
ckdbq_add(ckp, ID_AUTH, val); ckdbq_add(ckp, ID_AUTH, val);
} }
static void add_user_sessionid(ckpool_t *ckp, int64_t session_id, user_instance_t *user)
{
user_sessionid_t *user_sessionid = ckalloc(sizeof(user_sessionid_t));
sdata_t *sdata = ckp->data;
user_sessionid->session_id = session_id;
user_sessionid->user = user;
user_sessionid->added = time(NULL);
ck_wlock(&sdata->instance_lock);
HASH_ADD_I64(sdata->user_sessionids, session_id, user_sessionid);
ck_wunlock(&sdata->instance_lock);
}
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with client holding a ref count. */
static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_val, static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_val,
json_t **err_val, int *errnum) json_t **err_val, int *errnum)
@ -3437,8 +3370,6 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
goto out; goto out;
} }
user = generate_user(ckp, client, buf); user = generate_user(ckp, client, buf);
if (ckp->proxy)
add_user_sessionid(ckp, client->session_id, user);
client->user_id = user->id; client->user_id = user->id;
ts_realtime(&now); ts_realtime(&now);
client->start_time = now.tv_sec; client->start_time = now.tv_sec;
@ -5458,8 +5389,6 @@ int stratifier(proc_instance_t *pi)
randomiser <<= 32; randomiser <<= 32;
if (!ckp->proxy) if (!ckp->proxy)
sdata->blockchange_id = sdata->workbase_id = randomiser; sdata->blockchange_id = sdata->workbase_id = randomiser;
else
sdata->session_id = randomiser;
if (!ckp->serverurls) { if (!ckp->serverurls) {
ckp->serverurl[0] = "127.0.0.1"; ckp->serverurl[0] = "127.0.0.1";

Loading…
Cancel
Save