Browse Source

Add disconnected stratum clients to a linked list and use reference counting to know when they are no longer being referenced and can have their data freed.

master
Con Kolivas 10 years ago
parent
commit
0941d5a751
  1. 154
      src/stratifier.c

154
src/stratifier.c

@ -209,6 +209,10 @@ struct stratum_instance {
stratum_instance_t *next;
stratum_instance_t *prev;
/* Reference count for when this instance is used outside of the
* instance_lock */
int ref;
char enonce1[32];
uchar enonce1bin[16];
char enonce1var[12];
@ -303,6 +307,7 @@ struct stratifier_data {
* is sorted by enonce1_64. */
stratum_instance_t *stratum_instances;
stratum_instance_t *disconnected_instances;
stratum_instance_t *dead_instances;
user_instance_t *user_instances;
@ -1042,6 +1047,41 @@ static stratum_instance_t *__instance_by_id(sdata_t *sdata, int64_t id)
return instance;
}
/* Increase the reference count of instance */
static void __inc_instance_ref(stratum_instance_t *instance)
{
instance->ref++;
}
/* Find an __instance_by_id and increase its reference count allowing us to
* use this instance outside of instance_lock without fear of it being
* dereferenced. */
static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id)
{
stratum_instance_t *instance;
ck_wlock(&sdata->instance_lock);
instance = __instance_by_id(sdata, id);
if (instance)
__inc_instance_ref(instance);
ck_wunlock(&sdata->instance_lock);
return instance;
}
/* Decrease the reference count of instance. */
static void __dec_instance_ref(stratum_instance_t *instance)
{
instance->ref--;
}
static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance)
{
ck_wlock(&sdata->instance_lock);
__dec_instance_ref(instance);
ck_wunlock(&sdata->instance_lock);
}
/* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id)
{
@ -1167,7 +1207,7 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance)
static void drop_client(sdata_t *sdata, int64_t id)
{
stratum_instance_t *client = NULL;
stratum_instance_t *client, *tmp;
bool dec = false;
LOGINFO("Stratifier dropping client %ld", id);
@ -1177,6 +1217,7 @@ static void drop_client(sdata_t *sdata, int64_t id)
if (client) {
stratum_instance_t *old_client = NULL;
__inc_instance_ref(client);
if (client->authorised) {
dec = true;
client->authorised = false;
@ -1187,11 +1228,34 @@ static void drop_client(sdata_t *sdata, int64_t id)
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64)
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
else {
if (client->user_instance)
DL_DELETE(client->user_instance->instances, client);
DL_APPEND(sdata->dead_instances, client);
}
}
ck_wunlock(&sdata->instance_lock);
/* Decrease worker count outside of instance_lock to avoid recursive
* locking */
if (dec)
dec_worker(client->ckp, client->user_instance);
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
ck_wlock(&sdata->instance_lock);
if (client)
__dec_instance_ref(client);
DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) {
if (!client->ref) {
LOGINFO("Stratifier discarding instance %ld", client->id);
DL_DELETE(sdata->dead_instances, client);
free(client->workername);
free(client->useragent);
free(client);
}
}
ck_wunlock(&sdata->instance_lock);
}
static void stratum_broadcast_message(sdata_t *sdata, const char *msg)
@ -1472,12 +1536,15 @@ static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1)
ret = false;
goto out;
}
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->enonce1_64 == enonce1) {
ret = false;
break;
}
}
ck_runlock(&sdata->instance_lock);
out:
return ret;
}
@ -1485,7 +1552,8 @@ out:
/* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number
* of bytes we have to work with when we are proxying with a split nonce2.
* When the proxy space is less than 32 bits to work with, we look for an
* unused enonce1 value and reject clients instead if there is no space left */
* unused enonce1 value and reject clients instead if there is no space left.
* Needs to be entered with client holding a ref count. */
static bool new_enonce1(stratum_instance_t *client)
{
sdata_t *sdata = client->ckp->data;
@ -1538,7 +1606,8 @@ static bool new_enonce1(stratum_instance_t *client)
static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg);
/* Extranonce1 must be set here */
/* Extranonce1 must be set here. Needs to be entered with client holding a ref
* count. */
static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, json_t *params_val)
{
sdata_t *sdata = client->ckp->data;
@ -1732,7 +1801,8 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
/* This simply strips off the first part of the workername and matches it to a
* user or creates a new one. */
* user or creates a new one. Needs to be entered with client holding a ref
* count. */
static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
const char *workername)
{
@ -1799,7 +1869,8 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
/* Send this to the database and parse the response to authorise a user
* and get SUID parameters back. We don't add these requests to the sdata->ckdbqueue
* since we have to wait for the response but this is done from the authoriser
* thread so it won't hold anything up but other authorisations. */
* thread so it won't hold anything up but other authorisations. Needs to be
* entered with client holding a ref count. */
static int send_recv_auth(stratum_instance_t *client)
{
user_instance_t *user_instance = client->user_instance;
@ -1886,7 +1957,8 @@ static int send_recv_auth(stratum_instance_t *client)
/* For sending auths to ckdb after we've already decided we can authorise
* these clients while ckdb is offline, based on an existing client of the
* same username already having been authorised. */
* same username already having been authorised. Needs to be entered with
* client holding a ref count. */
static void queue_delayed_auth(stratum_instance_t *client)
{
ckpool_t *ckp = client->ckp;
@ -1912,6 +1984,7 @@ static void queue_delayed_auth(stratum_instance_t *client)
ckdbq_add(ckp, ID_AUTH, val);
}
/* Needs to be entered with client holding a ref count. */
static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val,
const char *address, int *errnum)
{
@ -1982,6 +2055,7 @@ out:
return json_boolean(ret);
}
/* Needs to be entered with client holding a ref count. */
static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client)
{
json_t *json_msg;
@ -1991,6 +2065,7 @@ static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client)
stratum_add_send(sdata, json_msg, client->id);
}
/* Needs to be entered with client holding a ref count. */
static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg)
{
json_t *json_msg;
@ -2020,6 +2095,7 @@ static double sane_tdiff(tv_t *end, tv_t *start)
return tdiff;
}
/* Needs to be entered with client holding a ref count. */
static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid,
bool submit)
{
@ -2149,7 +2225,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool
stratum_send_diff(sdata, client);
}
/* We should already be holding the workbase_lock */
/* We should already be holding the workbase_lock. Needs to be entered with
* client holding a ref count. */
static void
test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, const uchar *hash,
double diff, const char *coinbase, int cblen, const char *nonce2, const char *nonce)
@ -2234,6 +2311,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c
ckdbq_add(ckp, ID_BLOCK, val);
}
/* Needs to be entered with client holding a ref count. */
static double submission_diff(stratum_instance_t *client, workbase_t *wb, const char *nonce2,
uint32_t ntime32, const char *nonce, uchar *hash)
{
@ -2314,7 +2392,8 @@ out_unlock:
return ret;
}
/* Submit a share in proxy mode to the parent pool. workbase_lock is held */
/* Submit a share in proxy mode to the parent pool. workbase_lock is held.
* Needs to be entered with client holding a ref count. */
static void submit_share(stratum_instance_t *client, int64_t jobid, const char *nonce2,
const char *ntime, const char *nonce, int msg_id)
{
@ -2335,6 +2414,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char *
#define JSON_ERR(err) json_string(SHARE_ERR(err))
/* Needs to be entered with client holding a ref count. */
static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
json_t *params_val, json_t **err_val)
{
@ -2631,6 +2711,7 @@ static void send_json_err(sdata_t *sdata, int64_t client_id, json_t *id_val, con
stratum_add_send(sdata, val, client_id);
}
/* Needs to be entered with client holding a ref count. */
static void update_client(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id)
{
stratum_send_update(sdata, client_id, true);
@ -2713,7 +2794,8 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif
}
/* Implement support for the diff in the params as well as the originally
* documented form of placing diff within the method. */
* documented form of placing diff within the method. Needs to be entered with
* client holding a ref count. */
static void suggest_diff(stratum_instance_t *client, const char *method, json_t *params_val)
{
json_t *arr_val = json_array_get(params_val, 0);
@ -2745,10 +2827,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
const char *method;
char buf[256];
ck_rlock(&sdata->instance_lock);
client = __instance_by_id(sdata, client_id);
ck_runlock(&sdata->instance_lock);
client = ref_instance_by_id(sdata, client_id);
if (unlikely(!client)) {
LOGINFO("Failed to find client id %ld in hashtable!", client_id);
return;
@ -2758,7 +2837,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
LOGINFO("Dropping client %d tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%ld", client->id);
send_proc(client->ckp->connector, buf);
return;
goto out;
}
/* Random broken clients send something not an integer as the id so we copy
@ -2779,7 +2858,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
stratum_add_send(sdata, val, client_id);
if (likely(client->subscribed))
update_client(sdata, client, client_id);
return;
goto out;
}
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
@ -2795,14 +2874,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
snprintf(buf, 255, "passthrough=%ld", client->id);
send_proc(client->ckp->connector, buf);
free(client);
return;
goto out;
}
if (cmdmatch(method, "mining.auth") && client->subscribed) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sauthq, jp);
return;
goto out;
}
/* We should only accept authorised requests from here on */
@ -2813,14 +2892,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
LOGINFO("Dropping unauthorised client %ld", client->id);
snprintf(buf, 255, "dropclient=%ld", client->id);
send_proc(client->ckp->connector, buf);
return;
goto out;
}
if (cmdmatch(method, "mining.submit")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp);
return;
goto out;
}
if (cmdmatch(method, "mining.suggest")) {
@ -2833,9 +2912,11 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp);
return;
goto out;
}
/* Unhandled message here */
out:
dec_instance_ref(sdata, client);
}
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg)
@ -2877,7 +2958,6 @@ out:
static void srecv_process(ckpool_t *ckp, char *buf)
{
stratum_instance_t *instance;
sdata_t *sdata = ckp->data;
smsg_t *msg;
json_t *val;
@ -2912,11 +2992,9 @@ static void srecv_process(ckpool_t *ckp, char *buf)
/* Parse the message here */
ck_wlock(&sdata->instance_lock);
instance = __instance_by_id(sdata, msg->client_id);
if (!instance) {
/* client_id instance doesn't exist yet, create one */
instance = __stratum_add_instance(ckp, msg->client_id);
}
/* client_id instance doesn't exist yet, create one */
if (!__instance_by_id(sdata, msg->client_id))
__stratum_add_instance(ckp, msg->client_id);
ck_wunlock(&sdata->instance_lock);
parse_instance_msg(sdata, msg);
@ -2968,17 +3046,14 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp)
client_id = jp->client_id;
ck_rlock(&sdata->instance_lock);
client = __instance_by_id(sdata, client_id);
ck_runlock(&sdata->instance_lock);
client = ref_instance_by_id(sdata, client_id);
if (unlikely(!client)) {
LOGINFO("Share processor failed to find client id %ld in hashtable!", client_id);
goto out;
}
if (unlikely(!client->authorised)) {
LOGDEBUG("Client %ld no longer authorised to submit shares", client_id);
goto out;
goto out_decref;
}
json_msg = json_object();
result_val = parse_submit(client, json_msg, jp->params, &err_val);
@ -2986,6 +3061,8 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp)
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(sdata, json_msg, client_id);
out_decref:
dec_instance_ref(sdata, client);
out:
discard_json_params(&jp);
}
@ -3000,9 +3077,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
client_id = jp->client_id;
ck_rlock(&sdata->instance_lock);
client = __instance_by_id(sdata, client_id);
ck_runlock(&sdata->instance_lock);
client = ref_instance_by_id(sdata, client_id);
if (unlikely(!client)) {
LOGINFO("Authoriser failed to find client id %ld in hashtable!", client_id);
@ -3038,6 +3113,8 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
stratum_send_diff(sdata, client);
}
out:
if (client)
dec_instance_ref(sdata, client);
discard_json_params(&jp);
}
@ -3146,7 +3223,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
{
const char *msg = json_string_value(jp->method),
*params = json_string_value(json_array_get(jp->params, 0));
stratum_instance_t *client;
stratum_instance_t *client = NULL;
sdata_t *sdata = ckp->data;
json_t *val, *hashes;
int64_t job_id = 0;
@ -3183,10 +3260,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
goto out_send;
}
ck_rlock(&sdata->instance_lock);
client = __instance_by_id(sdata, jp->client_id);
ck_runlock(&sdata->instance_lock);
client = ref_instance_by_id(sdata, jp->client_id);
if (unlikely(!client)) {
LOGINFO("send_transactions failed to find client id %ld in hashtable!",
jp->client_id);
@ -3215,6 +3289,8 @@ out_send:
stratum_add_send(sdata, val, jp->client_id);
out:
discard_json_params(&jp);
if (client)
dec_instance_ref(sdata, client);
}
/* Called every 20 seconds, we send the updated stats to ckdb of those users

Loading…
Cancel
Save