Browse Source

Send stratifier json structures from the connector where possible, avoiding the duplication of json codec

master
Con Kolivas 9 years ago
parent
commit
84c340c1c4
  1. 14
      src/connector.c
  2. 21
      src/stratifier.c
  3. 3
      src/stratifier.h

14
src/connector.c

@ -20,6 +20,7 @@
#include "libckpool.h"
#include "uthash.h"
#include "utlist.h"
#include "stratifier.h"
#define MAX_MSGSIZE 1024
@ -529,13 +530,16 @@ reparse:
* do this unlocked as the occasional false negative can be
* filtered by the stratifier. */
if (likely(!client->invalid)) {
if (!ckp->passthrough || ckp->node)
send_proc(ckp->stratifier, s);
if (!ckp->passthrough || ckp->node) {
stratifier_add_recv(ckp, val);
val = NULL;
}
if (ckp->passthrough)
send_proc(ckp->generator, s);
}
free(s);
if (val)
json_decref(val);
}
client->bufofs -= buflen;
@ -965,17 +969,13 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
}
if (ckp->node) {
json_t *val = json_loads(buf, 0, NULL);
char *msg;
if (!val) // Can happen if client sent invalid json message
goto out;
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server));
msg = json_dumps(val, JSON_COMPACT);
json_decref(val);
send_proc(ckp->stratifier, msg);
free(msg);
stratifier_add_recv(ckp, val);
}
if (ckp->redirector && !client->redirected)
test_redirector_shares(ckp, client, buf);

21
src/stratifier.c

@ -3914,7 +3914,7 @@ static void get_poolstats(sdata_t *sdata, int *sockd)
_Close(sockd);
}
static void srecv_process(ckpool_t *ckp, char *buf);
static void srecv_process(ckpool_t *ckp, json_t *val);
/* For emergency use only, flushes all pending ckdbq messages */
static void ckdbq_flush(sdata_t *sdata)
@ -6498,21 +6498,16 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat
parse_method(ckp, sdata, client, client_id, id_val, method, params);
}
static void srecv_process(ckpool_t *ckp, char *buf)
static void srecv_process(ckpool_t *ckp, json_t *val)
{
char address[INET6_ADDRSTRLEN], *buf;
bool noid = false, dropped = false;
char address[INET6_ADDRSTRLEN];
sdata_t *sdata = ckp->sdata;
stratum_instance_t *client;
smsg_t *msg;
json_t *val;
int server;
val = json_loads(buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Received unrecognised non-json message: %s", buf);
goto out;
}
buf = json_dumps(val, JSON_COMPACT);
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val;
val = json_object_get(msg->json_msg, "client_id");
@ -6575,10 +6570,16 @@ static void srecv_process(ckpool_t *ckp, char *buf)
dec_instance_ref(sdata, client);
out_freemsg:
free_smsg(msg);
out:
free(buf);
}
void stratifier_add_recv(ckpool_t *ckp, json_t *val)
{
sdata_t *sdata = ckp->sdata;
ckmsgq_add(sdata->srecvs, val);
}
static void ssend_process(ckpool_t *ckp, smsg_t *msg)
{
char *s;

3
src/stratifier.h

@ -10,6 +10,9 @@
#ifndef STRATIFIER_H
#define STRATIFIER_H
#include "ckpool.h"
void stratifier_add_recv(ckpool_t *ckp, json_t *val);
void *stratifier(void *arg);
#endif /* STRATIFIER_H */

Loading…
Cancel
Save