|
|
|
@ -286,6 +286,8 @@ struct stratifier_data {
|
|
|
|
|
/* Serialises sends/receives to ckdb if possible */ |
|
|
|
|
pthread_mutex_t ckdb_lock; |
|
|
|
|
|
|
|
|
|
bool ckdb_offline; |
|
|
|
|
|
|
|
|
|
/* Variable length enonce1 always refers back to a u64 */ |
|
|
|
|
union { |
|
|
|
|
uint64_t u64; |
|
|
|
@ -3326,9 +3328,33 @@ static void parse_ckdb_cmd(ckpool_t __maybe_unused *ckp, const char *cmd)
|
|
|
|
|
json_decref(val); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Test a value under lock and set it, returning the original value */ |
|
|
|
|
static bool test_and_set(bool *val, pthread_mutex_t *lock) |
|
|
|
|
{ |
|
|
|
|
bool ret; |
|
|
|
|
|
|
|
|
|
mutex_lock(lock); |
|
|
|
|
ret = *val; |
|
|
|
|
*val = true; |
|
|
|
|
mutex_unlock(lock); |
|
|
|
|
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool test_and_clear(bool *val, pthread_mutex_t *lock) |
|
|
|
|
{ |
|
|
|
|
bool ret; |
|
|
|
|
|
|
|
|
|
mutex_lock(lock); |
|
|
|
|
ret = *val; |
|
|
|
|
*val = false; |
|
|
|
|
mutex_unlock(lock); |
|
|
|
|
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ckdbq_process(ckpool_t *ckp, char *msg) |
|
|
|
|
{ |
|
|
|
|
static bool failed = false; |
|
|
|
|
sdata_t *sdata = ckp->data; |
|
|
|
|
char *buf = NULL; |
|
|
|
|
|
|
|
|
@ -3338,18 +3364,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
|
|
|
|
|
mutex_unlock(&sdata->ckdb_lock); |
|
|
|
|
|
|
|
|
|
if (unlikely(!buf)) { |
|
|
|
|
if (!failed) { |
|
|
|
|
failed = true; |
|
|
|
|
if (!test_and_set(&sdata->ckdb_offline, &sdata->ckdb_lock)) |
|
|
|
|
LOGWARNING("Failed to talk to ckdb, queueing messages"); |
|
|
|
|
} |
|
|
|
|
sleep(5); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
free(msg); |
|
|
|
|
if (failed) { |
|
|
|
|
failed = false; |
|
|
|
|
if (test_and_clear(&sdata->ckdb_offline, &sdata->ckdb_lock)) |
|
|
|
|
LOGWARNING("Successfully resumed talking to ckdb"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO: Process any requests from ckdb that are heartbeat responses
|
|
|
|
|
* with specific requests. */ |
|
|
|
|
if (likely(buf)) { |
|
|
|
|