kanoi 10 years ago
parent
commit
ffe030e976
  1. 111
      src/stratifier.c

111
src/stratifier.c

@ -244,6 +244,7 @@ struct stratum_instance {
char address[INET6_ADDRSTRLEN];
bool subscribed;
bool authorised;
bool dropped;
bool idle;
int reject; /* Indicator that this client is having a run of rejects
* or other problem and should be dropped lazily if
@ -285,6 +286,8 @@ struct stratifier_data {
/* Serialises sends/receives to ckdb if possible */
pthread_mutex_t ckdb_lock;
bool ckdb_offline;
/* Variable length enonce1 always refers back to a u64 */
union {
uint64_t u64;
@ -1103,19 +1106,50 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id)
return instance;
}
/* Decrease the reference count of instance. */
static void __dec_instance_ref(stratum_instance_t *instance)
static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance,
int64_t id)
{
instance->ref--;
stratum_instance_t *old_client = NULL;
HASH_DEL(sdata->stratum_instances, client);
if (instance)
DL_DELETE(instance->instances, client);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
LOGNOTICE("Disconnecting client %ld %s %s", id, client->workername,
client->dropped ? "lazily" : "");
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
client->disconnected_time = time(NULL);
} else {
if (client->workername)
LOGNOTICE("Dropping client %ld %s %s", id, client->workername,
client->dropped ? "lazily" : "");
else
LOGINFO("Dropping workerless client %ld %s", id, client->dropped ? "lazily" : "");
__add_dead(sdata, client);
}
}
static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance)
/* Decrease the reference count of instance. */
static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance, const char *file,
const char *func, const int line)
{
ck_wlock(&sdata->instance_lock);
__dec_instance_ref(instance);
if (unlikely(--instance->ref < 0)) {
LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line);
instance->ref = 0;
}
/* See if there are any instances that were dropped that could not be
* moved due to holding a reference and drop them now. */
if (unlikely(instance->dropped && !instance->ref))
__drop_client(sdata, instance, instance->user_instance, instance->id);
ck_wunlock(&sdata->instance_lock);
}
#define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__)
/* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int server)
{
@ -1260,33 +1294,18 @@ static void drop_client(sdata_t *sdata, int64_t id)
ck_wlock(&sdata->instance_lock);
client = __instance_by_id(sdata, id);
if (client && likely(!client->ref)) {
stratum_instance_t *old_client = NULL;
if (client) {
instance = client->user_instance;
if (client->authorised) {
dec = true;
client->authorised = false;
ckp = client->ckp;
}
HASH_DEL(sdata->stratum_instances, client);
if (instance)
DL_DELETE(instance->instances, client);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) {
LOGNOTICE("Disconnecting client %ld %s", client->id, client->workername);
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
client->disconnected_time = time(NULL);
} else {
if (client->workername)
LOGNOTICE("Dropping client %ld %s", client->id, client->workername);
/* If the client is still holding a reference, don't drop them
* now but wait till the reference is dropped */
if (likely(!client->ref))
__drop_client(sdata, client, instance, id);
else
LOGINFO("Dropping workerless client %ld", client->id);
__add_dead(sdata, client);
}
client->dropped = true;
}
/* Old disconnected instances will not have any valid shares so remove
@ -3309,9 +3328,33 @@ static void parse_ckdb_cmd(ckpool_t __maybe_unused *ckp, const char *cmd)
json_decref(val);
}
/* Test a value under lock and set it, returning the original value */
static bool test_and_set(bool *val, pthread_mutex_t *lock)
{
bool ret;
mutex_lock(lock);
ret = *val;
*val = true;
mutex_unlock(lock);
return ret;
}
static bool test_and_clear(bool *val, pthread_mutex_t *lock)
{
bool ret;
mutex_lock(lock);
ret = *val;
*val = false;
mutex_unlock(lock);
return ret;
}
static void ckdbq_process(ckpool_t *ckp, char *msg)
{
static bool failed = false;
sdata_t *sdata = ckp->data;
char *buf = NULL;
@ -3321,18 +3364,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
mutex_unlock(&sdata->ckdb_lock);
if (unlikely(!buf)) {
if (!failed) {
failed = true;
if (!test_and_set(&sdata->ckdb_offline, &sdata->ckdb_lock))
LOGWARNING("Failed to talk to ckdb, queueing messages");
}
sleep(5);
}
}
free(msg);
if (failed) {
failed = false;
if (test_and_clear(&sdata->ckdb_offline, &sdata->ckdb_lock))
LOGWARNING("Successfully resumed talking to ckdb");
}
/* TODO: Process any requests from ckdb that are heartbeat responses
* with specific requests. */
if (likely(buf)) {
@ -3467,6 +3507,11 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
time_t now_t;
ts_t ts_now;
if (sdata->ckdb_offline) {
LOGDEBUG("Not queueing workerstats due to ckdb offline");
return;
}
if (++sdata->stats.userstats_cycle > 0x1f)
sdata->stats.userstats_cycle = 0;

Loading…
Cancel
Save