diff --git a/src/stratifier.c b/src/stratifier.c index a8347a64..839e70eb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -286,6 +286,8 @@ struct stratifier_data { /* Serialises sends/receives to ckdb if possible */ pthread_mutex_t ckdb_lock; + bool ckdb_offline; + /* Variable length enonce1 always refers back to a u64 */ union { uint64_t u64; @@ -3326,9 +3328,33 @@ static void parse_ckdb_cmd(ckpool_t __maybe_unused *ckp, const char *cmd) json_decref(val); } +/* Test a value under lock and set it, returning the original value */ +static bool test_and_set(bool *val, pthread_mutex_t *lock) +{ + bool ret; + + mutex_lock(lock); + ret = *val; + *val = true; + mutex_unlock(lock); + + return ret; +} + +static bool test_and_clear(bool *val, pthread_mutex_t *lock) +{ + bool ret; + + mutex_lock(lock); + ret = *val; + *val = false; + mutex_unlock(lock); + + return ret; +} + static void ckdbq_process(ckpool_t *ckp, char *msg) { - static bool failed = false; sdata_t *sdata = ckp->data; char *buf = NULL; @@ -3338,18 +3364,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) mutex_unlock(&sdata->ckdb_lock); if (unlikely(!buf)) { - if (!failed) { - failed = true; + if (!test_and_set(&sdata->ckdb_offline, &sdata->ckdb_lock)) LOGWARNING("Failed to talk to ckdb, queueing messages"); - } sleep(5); } } free(msg); - if (failed) { - failed = false; + if (test_and_clear(&sdata->ckdb_offline, &sdata->ckdb_lock)) LOGWARNING("Successfully resumed talking to ckdb"); - } + /* TODO: Process any requests from ckdb that are heartbeat responses * with specific requests. */ if (likely(buf)) {