diff --git a/src/stratifier.c b/src/stratifier.c index d5c052d4..c69221a1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -244,6 +244,7 @@ struct stratum_instance { char address[INET6_ADDRSTRLEN]; bool subscribed; bool authorised; + bool dropped; bool idle; int reject; /* Indicator that this client is having a run of rejects * or other problem and should be dropped lazily if @@ -285,6 +286,8 @@ struct stratifier_data { /* Serialises sends/receives to ckdb if possible */ pthread_mutex_t ckdb_lock; + bool ckdb_offline; + /* Variable length enonce1 always refers back to a u64 */ union { uint64_t u64; @@ -1103,19 +1106,50 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id) return instance; } -/* Decrease the reference count of instance. */ -static void __dec_instance_ref(stratum_instance_t *instance) +static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance, + int64_t id) { - instance->ref--; + stratum_instance_t *old_client = NULL; + + HASH_DEL(sdata->stratum_instances, client); + if (instance) + DL_DELETE(instance->instances, client); + HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); + /* Only keep around one copy of the old client in server mode */ + if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { + LOGNOTICE("Disconnecting client %ld %s %s", id, client->workername, + client->dropped ? "lazily" : ""); + HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); + sdata->stats.disconnected++; + client->disconnected_time = time(NULL); + } else { + if (client->workername) + LOGNOTICE("Dropping client %ld %s %s", id, client->workername, + client->dropped ? "lazily" : ""); + else + LOGINFO("Dropping workerless client %ld %s", id, client->dropped ? "lazily" : ""); + __add_dead(sdata, client); + } } -static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance) +/* Decrease the reference count of instance. */ +static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance, const char *file, + const char *func, const int line) { ck_wlock(&sdata->instance_lock); - __dec_instance_ref(instance); + if (unlikely(--instance->ref < 0)) { + LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); + instance->ref = 0; + } + /* See if there are any instances that were dropped that could not be + * moved due to holding a reference and drop them now. */ + if (unlikely(instance->dropped && !instance->ref)) + __drop_client(sdata, instance, instance->user_instance, instance->id); ck_wunlock(&sdata->instance_lock); } +#define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__) + /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int server) { @@ -1260,33 +1294,18 @@ static void drop_client(sdata_t *sdata, int64_t id) ck_wlock(&sdata->instance_lock); client = __instance_by_id(sdata, id); - if (client && likely(!client->ref)) { - stratum_instance_t *old_client = NULL; - + if (client) { instance = client->user_instance; if (client->authorised) { dec = true; - client->authorised = false; ckp = client->ckp; } - - HASH_DEL(sdata->stratum_instances, client); - if (instance) - DL_DELETE(instance->instances, client); - HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); - /* Only keep around one copy of the old client in server mode */ - if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { - LOGNOTICE("Disconnecting client %ld %s", client->id, client->workername); - HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); - sdata->stats.disconnected++; - client->disconnected_time = time(NULL); - } else { - if (client->workername) - LOGNOTICE("Dropping client %ld %s", client->id, client->workername); - else - LOGINFO("Dropping workerless client %ld", client->id); - __add_dead(sdata, client); - } + /* If the client is still holding a reference, don't drop them + * now but wait till the reference is dropped */ + if (likely(!client->ref)) + __drop_client(sdata, client, instance, id); + else + client->dropped = true; } /* Old disconnected instances will not have any valid shares so remove @@ -3309,9 +3328,33 @@ static void parse_ckdb_cmd(ckpool_t __maybe_unused *ckp, const char *cmd) json_decref(val); } +/* Test a value under lock and set it, returning the original value */ +static bool test_and_set(bool *val, pthread_mutex_t *lock) +{ + bool ret; + + mutex_lock(lock); + ret = *val; + *val = true; + mutex_unlock(lock); + + return ret; +} + +static bool test_and_clear(bool *val, pthread_mutex_t *lock) +{ + bool ret; + + mutex_lock(lock); + ret = *val; + *val = false; + mutex_unlock(lock); + + return ret; +} + static void ckdbq_process(ckpool_t *ckp, char *msg) { - static bool failed = false; sdata_t *sdata = ckp->data; char *buf = NULL; @@ -3321,18 +3364,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) mutex_unlock(&sdata->ckdb_lock); if (unlikely(!buf)) { - if (!failed) { - failed = true; + if (!test_and_set(&sdata->ckdb_offline, &sdata->ckdb_lock)) LOGWARNING("Failed to talk to ckdb, queueing messages"); - } sleep(5); } } free(msg); - if (failed) { - failed = false; + if (test_and_clear(&sdata->ckdb_offline, &sdata->ckdb_lock)) LOGWARNING("Successfully resumed talking to ckdb"); - } + /* TODO: Process any requests from ckdb that are heartbeat responses * with specific requests. */ if (likely(buf)) { @@ -3467,6 +3507,11 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) time_t now_t; ts_t ts_now; + if (sdata->ckdb_offline) { + LOGDEBUG("Not queueing workerstats due to ckdb offline"); + return; + } + if (++sdata->stats.userstats_cycle > 0x1f) sdata->stats.userstats_cycle = 0;