Browse Source

Rework sessionid matching to not store client structure

master
ckolivas 10 years ago
parent
commit
c7dbb0114d
  1. 154
      src/stratifier.c

154
src/stratifier.c

@ -227,6 +227,7 @@ struct stratum_instance {
uchar enonce1bin[16]; uchar enonce1bin[16];
char enonce1var[12]; char enonce1var[12];
uint64_t enonce1_64; uint64_t enonce1_64;
int session_id;
int64_t diff; /* Current diff */ int64_t diff; /* Current diff */
int64_t old_diff; /* Previous diff */ int64_t old_diff; /* Previous diff */
@ -278,6 +279,16 @@ struct share {
typedef struct share share_t; typedef struct share share_t;
typedef struct session session_t;
struct session {
UT_hash_handle hh;
int session_id;
uint64_t enonce1_64;
int64_t client_id;
time_t added;
};
struct stratifier_data { struct stratifier_data {
char pubkeytxnbin[25]; char pubkeytxnbin[25];
char donkeytxnbin[25]; char donkeytxnbin[25];
@ -309,6 +320,7 @@ struct stratifier_data {
int64_t workbase_id; int64_t workbase_id;
int64_t blockchange_id; int64_t blockchange_id;
int session_id;
char lasthash[68]; char lasthash[68];
char lastswaphash[68]; char lastswaphash[68];
@ -321,14 +333,12 @@ struct stratifier_data {
int64_t user_instance_id; int64_t user_instance_id;
/* Stratum_instances hashlist is stored by id, whereas disconnected_instances
* is sorted by enonce1_64. */
stratum_instance_t *stratum_instances; stratum_instance_t *stratum_instances;
stratum_instance_t *disconnected_instances;
stratum_instance_t *recycled_instances; stratum_instance_t *recycled_instances;
int stratum_generated; int stratum_generated;
int disconnected_generated; int disconnected_generated;
session_t *disconnected_sessions;
user_instance_t *user_instances; user_instance_t *user_instances;
@ -938,13 +948,6 @@ static void __kill_instance(sdata_t *sdata, stratum_instance_t *client)
DL_APPEND(sdata->recycled_instances, client); DL_APPEND(sdata->recycled_instances, client);
} }
static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client)
{
HASH_DEL(sdata->disconnected_instances, client);
sdata->stats.disconnected--;
__kill_instance(sdata, client);
}
/* Called with instance_lock held. Note stats.users is protected by /* Called with instance_lock held. Note stats.users is protected by
* instance lock to avoid recursive locking. */ * instance lock to avoid recursive locking. */
static void __inc_worker(sdata_t *sdata, user_instance_t *instance) static void __inc_worker(sdata_t *sdata, user_instance_t *instance)
@ -961,6 +964,35 @@ static void __dec_worker(sdata_t *sdata, user_instance_t *instance)
sdata->stats.users--; sdata->stats.users--;
} }
static void __disconnect_session(sdata_t *sdata, const stratum_instance_t *client)
{
time_t now_t = time(NULL);
session_t *session, *tmp;
/* Opportunity to age old sessions */
HASH_ITER(hh, sdata->disconnected_sessions, session, tmp) {
if (now_t - session->added > 600) {
HASH_DEL(sdata->disconnected_sessions, session);
dealloc(session);
sdata->stats.disconnected--;
}
}
if (!client->enonce1_64)
return;
HASH_FIND_INT(sdata->disconnected_sessions, &client->session_id, session);
if (session)
return;
session = ckalloc(sizeof(session_t));
session->enonce1_64 = client->enonce1_64;
session->session_id = client->session_id;
session->client_id = client->id;
session->added = now_t;
HASH_ADD_INT(sdata->disconnected_sessions, session_id, session);
sdata->stats.disconnected++;
sdata->disconnected_generated++;
}
/* Removes a client instance we know is on the stratum_instances list and from /* Removes a client instance we know is on the stratum_instances list and from
* the user client list if it's been placed on it */ * the user client list if it's been placed on it */
static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user) static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user)
@ -970,13 +1002,14 @@ static void __del_client(sdata_t *sdata, stratum_instance_t *client, user_instan
DL_DELETE(user->clients, client); DL_DELETE(user->clients, client);
__dec_worker(sdata, user); __dec_worker(sdata, user);
} }
} }
static void drop_allclients(ckpool_t *ckp) static void drop_allclients(ckpool_t *ckp)
{ {
stratum_instance_t *client, *tmp; stratum_instance_t *client, *tmp;
int disconnects = 0, kills = 0;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
int kills = 0;
char buf[128]; char buf[128];
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
@ -992,15 +1025,9 @@ static void drop_allclients(ckpool_t *ckp)
sprintf(buf, "dropclient=%"PRId64, client_id); sprintf(buf, "dropclient=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
} }
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
disconnects++;
__del_disconnected(sdata, client);
}
sdata->stats.users = sdata->stats.workers = 0; sdata->stats.users = sdata->stats.workers = 0;
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
if (disconnects)
LOGNOTICE("Disconnected %d instances", disconnects);
if (kills) if (kills)
LOGNOTICE("Dropped %d instances", kills); LOGNOTICE("Dropped %d instances", kills);
} }
@ -1223,40 +1250,11 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, const int64_t id)
return client; return client;
} }
/* Has this client_id already been used and is now in one of the dropped lists */
static bool __dropped_instance(sdata_t *sdata, const int64_t id)
{
stratum_instance_t *client, *tmp;
bool ret = false;
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
if (unlikely(client->id == id)) {
ret = true;
goto out;
}
}
out:
return ret;
}
/* Ret = 1 is disconnected, 2 is killed, 3 is workerless killed */
static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user, static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *user,
bool lazily, char **msg) bool lazily, char **msg)
{ {
stratum_instance_t *old_client = NULL;
__del_client(sdata, client, user); __del_client(sdata, client, user);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s disconnected %s",
client->id, client->address, user->throttled ? "throttled " : "",
user->username, client->workername, lazily ? "lazily" : "");
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
sdata->disconnected_generated++;
client->disconnected_time = time(NULL);
} else {
if (client->workername) { if (client->workername) {
if (user) { if (user) {
ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s", ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s",
@ -1272,7 +1270,6 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta
client->id, client->address, lazily ? "lazily" : ""); client->id, client->address, lazily ? "lazily" : "");
} }
__kill_instance(sdata, client); __kill_instance(sdata, client);
}
} }
/* Decrease the reference count of instance. */ /* Decrease the reference count of instance. */
@ -1326,6 +1323,7 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i
client = __recruit_stratum_instance(sdata); client = __recruit_stratum_instance(sdata);
client->id = id; client->id = id;
client->session_id = ++sdata->session_id;
strcpy(client->address, address); strcpy(client->address, address);
client->server = server; client->server = server;
client->diff = client->old_diff = ckp->startdiff; client->diff = client->old_diff = ckp->startdiff;
@ -1337,42 +1335,31 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i
static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, const int64_t id) static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, const int64_t id)
{ {
stratum_instance_t *client, *tmp; int session_id, slen;
uint64_t enonce1_64 = 0, ret = 0; session_t *session;
int64_t old_id = 0; int64_t old_id = 0;
int slen; uint64_t ret = 0;
if (!sessionid) if (!sessionid)
goto out; goto out;
slen = strlen(sessionid) / 2; slen = strlen(sessionid) / 2;
if (slen < 1 || slen > 8) if (slen < 1 || slen > 4)
goto out; goto out;
if (!validhex(sessionid)) if (!validhex(sessionid))
goto out; goto out;
/* Number is in BE but we don't swap either of them */ sscanf(sessionid, "%x", &session_id);
hex2bin(&enonce1_64, sessionid, slen);
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_FIND_INT(sdata->disconnected_sessions, &session_id, session);
if (client->id == id) if (!session)
continue;
if (client->enonce1_64 == enonce1_64) {
/* Only allow one connected instance per enonce1 */
goto out_unlock; goto out_unlock;
} HASH_DEL(sdata->disconnected_sessions, session);
} sdata->stats.disconnected--;
client = NULL; ret = session->enonce1_64;
HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), client); old_id = session->client_id;
if (client) { dealloc(session);
/* Delete the entry once we are going to use it since there
* will be a new instance with the enonce1_64 */
old_id = client->id;
__del_disconnected(sdata, client);
ret = enonce1_64;
}
out_unlock: out_unlock:
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
out: out:
@ -1444,11 +1431,9 @@ static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_i
static void drop_client(sdata_t *sdata, const int64_t id) static void drop_client(sdata_t *sdata, const int64_t id)
{ {
stratum_instance_t *client, *tmp;
char_entry_t *entries = NULL; char_entry_t *entries = NULL;
user_instance_t *user = NULL; user_instance_t *user = NULL;
time_t now_t = time(NULL); stratum_instance_t *client;
int aged = 0;
char *msg; char *msg;
LOGINFO("Stratifier asked to drop client %"PRId64, id); LOGINFO("Stratifier asked to drop client %"PRId64, id);
@ -1456,6 +1441,7 @@ static void drop_client(sdata_t *sdata, const int64_t id)
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
client = __instance_by_id(sdata, id); client = __instance_by_id(sdata, id);
if (client && !client->dropped) { if (client && !client->dropped) {
__disconnect_session(sdata, client);
user = client->user_instance; user = client->user_instance;
/* If the client is still holding a reference, don't drop them /* If the client is still holding a reference, don't drop them
* now but wait till the reference is dropped */ * now but wait till the reference is dropped */
@ -1465,21 +1451,9 @@ static void drop_client(sdata_t *sdata, const int64_t id)
} else } else
client->dropped = true; client->dropped = true;
} }
/* Old disconnected instances will not have any valid shares so remove
* them from the disconnected instances list if they've been dead for
* more than 10 minutes */
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
if (now_t - client->disconnected_time < 600)
continue;
aged++;
__del_disconnected(sdata, client);
}
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
notice_msg_entries(&entries); notice_msg_entries(&entries);
if (aged)
LOGINFO("Aged %d disconnected instances to dead", aged);
} }
static void stratum_broadcast_message(sdata_t *sdata, const char *msg) static void stratum_broadcast_message(sdata_t *sdata, const char *msg)
@ -1696,7 +1670,8 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
objects = sdata->stats.disconnected; objects = sdata->stats.disconnected;
generated = sdata->disconnected_generated; generated = sdata->disconnected_generated;
memsize = sizeof(stratum_instance_t) * sdata->stats.disconnected; memsize = SAFE_HASH_OVERHEAD(sdata->disconnected_sessions);
memsize += sizeof(session_t) * sdata->stats.disconnected;
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "disconnected", subval); json_set_object(val, "disconnected", subval);
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
@ -1978,6 +1953,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
{ {
sdata_t *sdata = client->ckp->data; sdata_t *sdata = client->ckp->data;
bool old_match = false; bool old_match = false;
char sessionid[12];
int arr_size; int arr_size;
json_t *ret; json_t *ret;
int n2len; int n2len;
@ -2039,7 +2015,8 @@ static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_
n2len = sdata->workbases->enonce2varlen; n2len = sdata->workbases->enonce2varlen;
else else
n2len = 8; n2len = 8;
JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1, sprintf(sessionid, "%x", client->session_id);
JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", sessionid, client->enonce1,
n2len); n2len);
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
@ -3572,11 +3549,8 @@ static void srecv_process(ckpool_t *ckp, char *buf)
client = __instance_by_id(sdata, msg->client_id); client = __instance_by_id(sdata, msg->client_id);
/* If client_id instance doesn't exist yet, create one */ /* If client_id instance doesn't exist yet, create one */
if (unlikely(!client)) { if (unlikely(!client)) {
if (likely(!__dropped_instance(sdata, msg->client_id))) {
noid = true; noid = true;
client = __stratum_add_instance(ckp, msg->client_id, address, server); client = __stratum_add_instance(ckp, msg->client_id, address, server);
} else
dropped = true;
} else if (unlikely(client->dropped)) } else if (unlikely(client->dropped))
dropped = true; dropped = true;
if (likely(!dropped)) if (likely(!dropped))

Loading…
Cancel
Save