Browse Source

Send the connector json structures from the stratifier to avoid json codec duplication

master
Con Kolivas 9 years ago
parent
commit
43a18946ce
  1. 22
      src/connector.c
  2. 1
      src/connector.h
  3. 10
      src/stratifier.c
  4. 2
      src/stratifier.h

22
src/connector.c

@ -1224,17 +1224,11 @@ out:
return ret; return ret;
} }
static void client_message_processor(ckpool_t *ckp, char *buf) static void client_message_processor(ckpool_t *ckp, json_t *json_msg)
{ {
json_t *json_msg = json_loads(buf, 0, NULL);
int64_t client_id; int64_t client_id;
char *msg; char *msg;
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
goto out;
}
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
@ -1246,8 +1240,13 @@ static void client_message_processor(ckpool_t *ckp, char *buf)
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(ckp->cdata, client_id, msg); send_client(ckp->cdata, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
out: }
free(buf);
void connector_add_message(ckpool_t *ckp, json_t *val)
{
cdata_t *cdata = ckp->cdata;
ckmsgq_add(cdata->cmpq, val);
} }
/* Send the passthrough the terminate node.method */ /* Send the passthrough the terminate node.method */
@ -1358,10 +1357,7 @@ retry:
LOGDEBUG("Connector received message: %s", buf); LOGDEBUG("Connector received message: %s", buf);
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (likely(buf[0] == '{')) { if (cmdmatch(buf, "upstream=")) {
ckmsgq_add(cdata->cmpq, buf);
umsg->buf = NULL;
} else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); char *msg = strdup(buf + 9);
LOGDEBUG("Upstreaming %s", msg); LOGDEBUG("Upstreaming %s", msg);

1
src/connector.h

@ -10,6 +10,7 @@
#ifndef CONNECTOR_H #ifndef CONNECTOR_H
#define CONNECTOR_H #define CONNECTOR_H
void connector_add_message(ckpool_t *ckp, json_t *val);
void *connector(void *arg); void *connector(void *arg);
#endif /* CONNECTOR_H */ #endif /* CONNECTOR_H */

10
src/stratifier.c

@ -27,6 +27,7 @@
#include "stratifier.h" #include "stratifier.h"
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
#include "connector.h"
#define MIN1 60 #define MIN1 60
#define MIN5 300 #define MIN5 300
@ -6582,8 +6583,6 @@ void stratifier_add_recv(ckpool_t *ckp, json_t *val)
static void ssend_process(ckpool_t *ckp, smsg_t *msg) static void ssend_process(ckpool_t *ckp, smsg_t *msg)
{ {
char *s;
if (unlikely(!msg->json_msg)) { if (unlikely(!msg->json_msg)) {
LOGERR("Sent null json msg to stratum_sender"); LOGERR("Sent null json msg to stratum_sender");
free(msg); free(msg);
@ -6593,10 +6592,9 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
/* Add client_id to the json message and send it to the /* Add client_id to the json message and send it to the
* connector process to be delivered */ * connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, JSON_COMPACT); connector_add_message(ckp, msg->json_msg);
send_proc(ckp->connector, s); /* The connector will free msg->json_msg */
free(s); free(msg);
free_smsg(msg);
} }
static void discard_json_params(json_params_t *jp) static void discard_json_params(json_params_t *jp)

2
src/stratifier.h

@ -10,8 +10,6 @@
#ifndef STRATIFIER_H #ifndef STRATIFIER_H
#define STRATIFIER_H #define STRATIFIER_H
#include "ckpool.h"
void stratifier_add_recv(ckpool_t *ckp, json_t *val); void stratifier_add_recv(ckpool_t *ckp, json_t *val);
void *stratifier(void *arg); void *stratifier(void *arg);

Loading…
Cancel
Save