|
|
@ -299,6 +299,10 @@ struct stratifier_data { |
|
|
|
|
|
|
|
|
|
|
|
/* Serialises sends/receives to ckdb if possible */ |
|
|
|
/* Serialises sends/receives to ckdb if possible */ |
|
|
|
mutex_t ckdb_lock; |
|
|
|
mutex_t ckdb_lock; |
|
|
|
|
|
|
|
/* Protects sequence numbers */ |
|
|
|
|
|
|
|
mutex_t ckdb_msg_lock; |
|
|
|
|
|
|
|
/* Incrementing sequence number */ |
|
|
|
|
|
|
|
int ckdb_seq; |
|
|
|
|
|
|
|
|
|
|
|
bool ckdb_offline; |
|
|
|
bool ckdb_offline; |
|
|
|
|
|
|
|
|
|
|
@ -611,12 +615,18 @@ static char *status_chars = "|/-\\"; |
|
|
|
|
|
|
|
|
|
|
|
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
|
|
|
|
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
|
|
|
|
* log and returns the malloced message. */ |
|
|
|
* log and returns the malloced message. */ |
|
|
|
static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype) |
|
|
|
static char *ckdb_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const int idtype) |
|
|
|
{ |
|
|
|
{ |
|
|
|
char *json_msg = json_dumps(val, JSON_COMPACT); |
|
|
|
char *json_msg; |
|
|
|
char logname[512]; |
|
|
|
char logname[512]; |
|
|
|
char *ret = NULL; |
|
|
|
char *ret = NULL; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Set the atomically incrementing sequence number */ |
|
|
|
|
|
|
|
mutex_lock(&sdata->ckdb_msg_lock); |
|
|
|
|
|
|
|
json_set_int(val, "seq", sdata->ckdb_seq++); |
|
|
|
|
|
|
|
mutex_unlock(&sdata->ckdb_msg_lock); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
json_msg = json_dumps(val, JSON_COMPACT); |
|
|
|
if (unlikely(!json_msg)) |
|
|
|
if (unlikely(!json_msg)) |
|
|
|
goto out; |
|
|
|
goto out; |
|
|
|
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg); |
|
|
|
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg); |
|
|
@ -655,7 +665,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char |
|
|
|
if (CKP_STANDALONE(ckp)) |
|
|
|
if (CKP_STANDALONE(ckp)) |
|
|
|
return json_decref(val); |
|
|
|
return json_decref(val); |
|
|
|
|
|
|
|
|
|
|
|
json_msg = ckdb_msg(ckp, val, idtype); |
|
|
|
json_msg = ckdb_msg(ckp, sdata, val, idtype); |
|
|
|
if (unlikely(!json_msg)) { |
|
|
|
if (unlikely(!json_msg)) { |
|
|
|
LOGWARNING("Failed to dump json from %s %s:%d", file, func, line); |
|
|
|
LOGWARNING("Failed to dump json from %s %s:%d", file, func, line); |
|
|
|
return; |
|
|
|
return; |
|
|
@ -2349,9 +2359,9 @@ static int send_recv_auth(stratum_instance_t *client) |
|
|
|
json_set_string(val, "createcode", __func__); |
|
|
|
json_set_string(val, "createcode", __func__); |
|
|
|
json_set_string(val, "createinet", client->address); |
|
|
|
json_set_string(val, "createinet", client->address); |
|
|
|
if (user->btcaddress) |
|
|
|
if (user->btcaddress) |
|
|
|
json_msg = ckdb_msg(ckp, val, ID_ADDRAUTH); |
|
|
|
json_msg = ckdb_msg(ckp, sdata, val, ID_ADDRAUTH); |
|
|
|
else |
|
|
|
else |
|
|
|
json_msg = ckdb_msg(ckp, val, ID_AUTH); |
|
|
|
json_msg = ckdb_msg(ckp, sdata, val, ID_AUTH); |
|
|
|
if (unlikely(!json_msg)) { |
|
|
|
if (unlikely(!json_msg)) { |
|
|
|
LOGWARNING("Failed to dump json in send_recv_auth"); |
|
|
|
LOGWARNING("Failed to dump json in send_recv_auth"); |
|
|
|
goto out; |
|
|
|
goto out; |
|
|
@ -4467,6 +4477,7 @@ int stratifier(proc_instance_t *pi) |
|
|
|
cklock_init(&sdata->instance_lock); |
|
|
|
cklock_init(&sdata->instance_lock); |
|
|
|
|
|
|
|
|
|
|
|
mutex_init(&sdata->ckdb_lock); |
|
|
|
mutex_init(&sdata->ckdb_lock); |
|
|
|
|
|
|
|
mutex_init(&sdata->ckdb_msg_lock); |
|
|
|
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); |
|
|
|
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); |
|
|
|
/* Create half as many share processing threads as there are CPUs */ |
|
|
|
/* Create half as many share processing threads as there are CPUs */ |
|
|
|
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; |
|
|
|
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; |
|
|
|