From 0941d5a7516f6fa38000ceb3da16459ed9cb26c7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 14 Nov 2014 21:41:54 +1100 Subject: [PATCH] Add disconnected stratum clients to a linked list and use reference counting to know when they are no longer being referenced and can have their data freed. --- src/stratifier.c | 154 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 39 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e6292f3a..f48f7833 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -209,6 +209,10 @@ struct stratum_instance { stratum_instance_t *next; stratum_instance_t *prev; + /* Reference count for when this instance is used outside of the + * instance_lock */ + int ref; + char enonce1[32]; uchar enonce1bin[16]; char enonce1var[12]; @@ -303,6 +307,7 @@ struct stratifier_data { * is sorted by enonce1_64. */ stratum_instance_t *stratum_instances; stratum_instance_t *disconnected_instances; + stratum_instance_t *dead_instances; user_instance_t *user_instances; @@ -1042,6 +1047,41 @@ static stratum_instance_t *__instance_by_id(sdata_t *sdata, int64_t id) return instance; } +/* Increase the reference count of instance */ +static void __inc_instance_ref(stratum_instance_t *instance) +{ + instance->ref++; +} + +/* Find an __instance_by_id and increase its reference count allowing us to + * use this instance outside of instance_lock without fear of it being + * dereferenced. */ +static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id) +{ + stratum_instance_t *instance; + + ck_wlock(&sdata->instance_lock); + instance = __instance_by_id(sdata, id); + if (instance) + __inc_instance_ref(instance); + ck_wunlock(&sdata->instance_lock); + + return instance; +} + +/* Decrease the reference count of instance. */ +static void __dec_instance_ref(stratum_instance_t *instance) +{ + instance->ref--; +} + +static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance) +{ + ck_wlock(&sdata->instance_lock); + __dec_instance_ref(instance); + ck_wunlock(&sdata->instance_lock); +} + /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id) { @@ -1167,7 +1207,7 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) static void drop_client(sdata_t *sdata, int64_t id) { - stratum_instance_t *client = NULL; + stratum_instance_t *client, *tmp; bool dec = false; LOGINFO("Stratifier dropping client %ld", id); @@ -1177,6 +1217,7 @@ static void drop_client(sdata_t *sdata, int64_t id) if (client) { stratum_instance_t *old_client = NULL; + __inc_instance_ref(client); if (client->authorised) { dec = true; client->authorised = false; @@ -1187,11 +1228,34 @@ static void drop_client(sdata_t *sdata, int64_t id) /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64) HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); + else { + if (client->user_instance) + DL_DELETE(client->user_instance->instances, client); + DL_APPEND(sdata->dead_instances, client); + } } ck_wunlock(&sdata->instance_lock); + /* Decrease worker count outside of instance_lock to avoid recursive + * locking */ if (dec) dec_worker(client->ckp, client->user_instance); + + /* Cull old unused clients lazily when there are no more reference + * counts for them. */ + ck_wlock(&sdata->instance_lock); + if (client) + __dec_instance_ref(client); + DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { + if (!client->ref) { + LOGINFO("Stratifier discarding instance %ld", client->id); + DL_DELETE(sdata->dead_instances, client); + free(client->workername); + free(client->useragent); + free(client); + } + } + ck_wunlock(&sdata->instance_lock); } static void stratum_broadcast_message(sdata_t *sdata, const char *msg) @@ -1472,12 +1536,15 @@ static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1) ret = false; goto out; } + + ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->enonce1_64 == enonce1) { ret = false; break; } } + ck_runlock(&sdata->instance_lock); out: return ret; } @@ -1485,7 +1552,8 @@ out: /* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number * of bytes we have to work with when we are proxying with a split nonce2. * When the proxy space is less than 32 bits to work with, we look for an - * unused enonce1 value and reject clients instead if there is no space left */ + * unused enonce1 value and reject clients instead if there is no space left. + * Needs to be entered with client holding a ref count. */ static bool new_enonce1(stratum_instance_t *client) { sdata_t *sdata = client->ckp->data; @@ -1538,7 +1606,8 @@ static bool new_enonce1(stratum_instance_t *client) static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg); -/* Extranonce1 must be set here */ +/* Extranonce1 must be set here. Needs to be entered with client holding a ref + * count. */ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, json_t *params_val) { sdata_t *sdata = client->ckp->data; @@ -1732,7 +1801,8 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) /* This simply strips off the first part of the workername and matches it to a - * user or creates a new one. */ + * user or creates a new one. Needs to be entered with client holding a ref + * count. */ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, const char *workername) { @@ -1799,7 +1869,8 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, /* Send this to the database and parse the response to authorise a user * and get SUID parameters back. We don't add these requests to the sdata->ckdbqueue * since we have to wait for the response but this is done from the authoriser - * thread so it won't hold anything up but other authorisations. */ + * thread so it won't hold anything up but other authorisations. Needs to be + * entered with client holding a ref count. */ static int send_recv_auth(stratum_instance_t *client) { user_instance_t *user_instance = client->user_instance; @@ -1886,7 +1957,8 @@ static int send_recv_auth(stratum_instance_t *client) /* For sending auths to ckdb after we've already decided we can authorise * these clients while ckdb is offline, based on an existing client of the - * same username already having been authorised. */ + * same username already having been authorised. Needs to be entered with + * client holding a ref count. */ static void queue_delayed_auth(stratum_instance_t *client) { ckpool_t *ckp = client->ckp; @@ -1912,6 +1984,7 @@ static void queue_delayed_auth(stratum_instance_t *client) ckdbq_add(ckp, ID_AUTH, val); } +/* Needs to be entered with client holding a ref count. */ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val, const char *address, int *errnum) { @@ -1982,6 +2055,7 @@ out: return json_boolean(ret); } +/* Needs to be entered with client holding a ref count. */ static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client) { json_t *json_msg; @@ -1991,6 +2065,7 @@ static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client) stratum_add_send(sdata, json_msg, client->id); } +/* Needs to be entered with client holding a ref count. */ static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg) { json_t *json_msg; @@ -2020,6 +2095,7 @@ static double sane_tdiff(tv_t *end, tv_t *start) return tdiff; } +/* Needs to be entered with client holding a ref count. */ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool valid, bool submit) { @@ -2149,7 +2225,8 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool stratum_send_diff(sdata, client); } -/* We should already be holding the workbase_lock */ +/* We should already be holding the workbase_lock. Needs to be entered with + * client holding a ref count. */ static void test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, const uchar *hash, double diff, const char *coinbase, int cblen, const char *nonce2, const char *nonce) @@ -2234,6 +2311,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c ckdbq_add(ckp, ID_BLOCK, val); } +/* Needs to be entered with client holding a ref count. */ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const char *nonce2, uint32_t ntime32, const char *nonce, uchar *hash) { @@ -2314,7 +2392,8 @@ out_unlock: return ret; } -/* Submit a share in proxy mode to the parent pool. workbase_lock is held */ +/* Submit a share in proxy mode to the parent pool. workbase_lock is held. + * Needs to be entered with client holding a ref count. */ static void submit_share(stratum_instance_t *client, int64_t jobid, const char *nonce2, const char *ntime, const char *nonce, int msg_id) { @@ -2335,6 +2414,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char * #define JSON_ERR(err) json_string(SHARE_ERR(err)) +/* Needs to be entered with client holding a ref count. */ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, json_t *params_val, json_t **err_val) { @@ -2631,6 +2711,7 @@ static void send_json_err(sdata_t *sdata, int64_t client_id, json_t *id_val, con stratum_add_send(sdata, val, client_id); } +/* Needs to be entered with client holding a ref count. */ static void update_client(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id) { stratum_send_update(sdata, client_id, true); @@ -2713,7 +2794,8 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif } /* Implement support for the diff in the params as well as the originally - * documented form of placing diff within the method. */ + * documented form of placing diff within the method. Needs to be entered with + * client holding a ref count. */ static void suggest_diff(stratum_instance_t *client, const char *method, json_t *params_val) { json_t *arr_val = json_array_get(params_val, 0); @@ -2745,10 +2827,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val const char *method; char buf[256]; - ck_rlock(&sdata->instance_lock); - client = __instance_by_id(sdata, client_id); - ck_runlock(&sdata->instance_lock); - + client = ref_instance_by_id(sdata, client_id); if (unlikely(!client)) { LOGINFO("Failed to find client id %ld in hashtable!", client_id); return; @@ -2758,7 +2837,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val LOGINFO("Dropping client %d tagged for lazy invalidation", client_id); snprintf(buf, 255, "dropclient=%ld", client->id); send_proc(client->ckp->connector, buf); - return; + goto out; } /* Random broken clients send something not an integer as the id so we copy @@ -2779,7 +2858,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val stratum_add_send(sdata, val, client_id); if (likely(client->subscribed)) update_client(sdata, client, client_id); - return; + goto out; } if (unlikely(cmdmatch(method, "mining.passthrough"))) { @@ -2795,14 +2874,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); free(client); - return; + goto out; } if (cmdmatch(method, "mining.auth") && client->subscribed) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->sauthq, jp); - return; + goto out; } /* We should only accept authorised requests from here on */ @@ -2813,14 +2892,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val LOGINFO("Dropping unauthorised client %ld", client->id); snprintf(buf, 255, "dropclient=%ld", client->id); send_proc(client->ckp->connector, buf); - return; + goto out; } if (cmdmatch(method, "mining.submit")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->sshareq, jp); - return; + goto out; } if (cmdmatch(method, "mining.suggest")) { @@ -2833,9 +2912,11 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); ckmsgq_add(sdata->stxnq, jp); - return; + goto out; } /* Unhandled message here */ +out: + dec_instance_ref(sdata, client); } static void parse_instance_msg(sdata_t *sdata, smsg_t *msg) @@ -2877,7 +2958,6 @@ out: static void srecv_process(ckpool_t *ckp, char *buf) { - stratum_instance_t *instance; sdata_t *sdata = ckp->data; smsg_t *msg; json_t *val; @@ -2912,11 +2992,9 @@ static void srecv_process(ckpool_t *ckp, char *buf) /* Parse the message here */ ck_wlock(&sdata->instance_lock); - instance = __instance_by_id(sdata, msg->client_id); - if (!instance) { - /* client_id instance doesn't exist yet, create one */ - instance = __stratum_add_instance(ckp, msg->client_id); - } + /* client_id instance doesn't exist yet, create one */ + if (!__instance_by_id(sdata, msg->client_id)) + __stratum_add_instance(ckp, msg->client_id); ck_wunlock(&sdata->instance_lock); parse_instance_msg(sdata, msg); @@ -2968,17 +3046,14 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) client_id = jp->client_id; - ck_rlock(&sdata->instance_lock); - client = __instance_by_id(sdata, client_id); - ck_runlock(&sdata->instance_lock); - + client = ref_instance_by_id(sdata, client_id); if (unlikely(!client)) { LOGINFO("Share processor failed to find client id %ld in hashtable!", client_id); goto out; } if (unlikely(!client->authorised)) { LOGDEBUG("Client %ld no longer authorised to submit shares", client_id); - goto out; + goto out_decref; } json_msg = json_object(); result_val = parse_submit(client, json_msg, jp->params, &err_val); @@ -2986,6 +3061,8 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_nocheck(json_msg, "id", jp->id_val); stratum_add_send(sdata, json_msg, client_id); +out_decref: + dec_instance_ref(sdata, client); out: discard_json_params(&jp); } @@ -3000,9 +3077,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) client_id = jp->client_id; - ck_rlock(&sdata->instance_lock); - client = __instance_by_id(sdata, client_id); - ck_runlock(&sdata->instance_lock); + client = ref_instance_by_id(sdata, client_id); if (unlikely(!client)) { LOGINFO("Authoriser failed to find client id %ld in hashtable!", client_id); @@ -3038,6 +3113,8 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) stratum_send_diff(sdata, client); } out: + if (client) + dec_instance_ref(sdata, client); discard_json_params(&jp); } @@ -3146,7 +3223,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) { const char *msg = json_string_value(jp->method), *params = json_string_value(json_array_get(jp->params, 0)); - stratum_instance_t *client; + stratum_instance_t *client = NULL; sdata_t *sdata = ckp->data; json_t *val, *hashes; int64_t job_id = 0; @@ -3183,10 +3260,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out_send; } - ck_rlock(&sdata->instance_lock); - client = __instance_by_id(sdata, jp->client_id); - ck_runlock(&sdata->instance_lock); - + client = ref_instance_by_id(sdata, jp->client_id); if (unlikely(!client)) { LOGINFO("send_transactions failed to find client id %ld in hashtable!", jp->client_id); @@ -3215,6 +3289,8 @@ out_send: stratum_add_send(sdata, val, jp->client_id); out: discard_json_params(&jp); + if (client) + dec_instance_ref(sdata, client); } /* Called every 20 seconds, we send the updated stats to ckdb of those users