Browse Source

Revert "Revert "Add atomically increasing sequence number to all ckdb messages""

master
kanoi 10 years ago
parent
commit
b5946287b0
  1. 21
      src/stratifier.c

21
src/stratifier.c

@ -299,6 +299,10 @@ struct stratifier_data {
/* Serialises sends/receives to ckdb if possible */
mutex_t ckdb_lock;
/* Protects sequence numbers */
mutex_t ckdb_msg_lock;
/* Incrementing sequence number */
int ckdb_seq;
bool ckdb_offline;
@ -611,12 +615,18 @@ static char *status_chars = "|/-\\";
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
* log and returns the malloced message. */
static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype)
static char *ckdb_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const int idtype)
{
char *json_msg = json_dumps(val, JSON_COMPACT);
char *json_msg;
char logname[512];
char *ret = NULL;
/* Set the atomically incrementing sequence number */
mutex_lock(&sdata->ckdb_msg_lock);
json_set_int(val, "seq", sdata->ckdb_seq++);
mutex_unlock(&sdata->ckdb_msg_lock);
json_msg = json_dumps(val, JSON_COMPACT);
if (unlikely(!json_msg))
goto out;
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg);
@ -655,7 +665,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
if (CKP_STANDALONE(ckp))
return json_decref(val);
json_msg = ckdb_msg(ckp, val, idtype);
json_msg = ckdb_msg(ckp, sdata, val, idtype);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json from %s %s:%d", file, func, line);
return;
@ -2349,9 +2359,9 @@ static int send_recv_auth(stratum_instance_t *client)
json_set_string(val, "createcode", __func__);
json_set_string(val, "createinet", client->address);
if (user->btcaddress)
json_msg = ckdb_msg(ckp, val, ID_ADDRAUTH);
json_msg = ckdb_msg(ckp, sdata, val, ID_ADDRAUTH);
else
json_msg = ckdb_msg(ckp, val, ID_AUTH);
json_msg = ckdb_msg(ckp, sdata, val, ID_AUTH);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json in send_recv_auth");
goto out;
@ -4477,6 +4487,7 @@ int stratifier(proc_instance_t *pi)
cklock_init(&sdata->instance_lock);
mutex_init(&sdata->ckdb_lock);
mutex_init(&sdata->ckdb_msg_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;

Loading…
Cancel
Save