Browse Source

Pass the generator json structures avoiding duplicating codec

master
Con Kolivas 9 years ago
parent
commit
0d8746249e
  1. 29
      src/connector.c
  2. 31
      src/generator.c
  3. 1
      src/generator.h

29
src/connector.c

@ -21,6 +21,7 @@
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
#include "stratifier.h" #include "stratifier.h"
#include "generator.h"
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
@ -363,15 +364,11 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client)
{ {
json_t *val; json_t *val;
char *s;
JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address",
client->address_name, "server", client->server, "method", "mining.term", client->address_name, "server", client->server, "method", "mining.term",
"params"); "params");
s = json_dumps(val, JSON_COMPACT); generator_add_send(ckp, val);
json_decref(val);
send_proc(ckp->generator, s);
free(s);
} }
static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) static void stratifier_drop_id(ckpool_t *ckp, const int64_t id)
@ -509,8 +506,6 @@ reparse:
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
return false; return false;
} else { } else {
char *s;
if (client->passthrough) { if (client->passthrough) {
int64_t passthrough_id; int64_t passthrough_id;
@ -524,22 +519,18 @@ reparse:
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "address", json_string(client->address_name));
} }
json_object_set_new_nocheck(val, "server", json_integer(client->server)); json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, JSON_COMPACT);
/* Do not send messages of clients we've already dropped. We /* Do not send messages of clients we've already dropped. We
* do this unlocked as the occasional false negative can be * do this unlocked as the occasional false negative can be
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(!client->invalid)) { if (likely(!client->invalid)) {
if (!ckp->passthrough || ckp->node) { if (!ckp->passthrough)
stratifier_add_recv(ckp, val); stratifier_add_recv(ckp, val);
val = NULL; if (ckp->node)
} stratifier_add_recv(ckp, json_deep_copy(val));
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); generator_add_send(ckp, val);
} } else
free(s);
if (val)
json_decref(val); json_decref(val);
} }
client->bufofs -= buflen; client->bufofs -= buflen;
@ -1357,7 +1348,11 @@ retry:
LOGDEBUG("Connector received message: %s", buf); LOGDEBUG("Connector received message: %s", buf);
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (cmdmatch(buf, "upstream=")) { if (likely(buf[0] == '{')) {
json_t *val = json_loads(buf, JSON_DISABLE_EOF_CHECK, NULL);
ckmsgq_add(cdata->cmpq, val);
} else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); char *msg = strdup(buf + 9);
LOGDEBUG("Upstreaming %s", msg); LOGDEBUG("Upstreaming %s", msg);

31
src/generator.c

@ -163,6 +163,8 @@ struct generator_data {
mutex_t share_lock; mutex_t share_lock;
share_msg_t *shares; share_msg_t *shares;
int64_t share_id; int64_t share_id;
proxy_instance_t *current_proxy;
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -1731,16 +1733,39 @@ out:
free(pm); free(pm);
} }
static void passthrough_add_send(proxy_instance_t *proxy, const char *msg) static void passthrough_add_send(proxy_instance_t *proxy, char *msg)
{ {
pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t));
pm->proxy = proxy; pm->proxy = proxy;
pm->cs = &proxy->cs; pm->cs = &proxy->cs;
ASPRINTF(&pm->msg, "%s\n", msg); pm->msg = msg;
ckmsgq_add(proxy->passsends, pm); ckmsgq_add(proxy->passsends, pm);
} }
void generator_add_send(ckpool_t *ckp, json_t *val)
{
gdata_t *gdata = ckp->gdata;
char *buf;
if (!ckp->passthrough) {
submit_share(gdata, val);
return;
}
if (unlikely(!gdata->current_proxy)) {
LOGWARNING("No current proxy to send passthrough data to");
goto out;
}
buf = json_dumps(val, JSON_COMPACT | JSON_EOL);
if (unlikely(!buf)) {
LOGWARNING("Unable to decode json in generator_add_send");
goto out;
}
passthrough_add_send(gdata->current_proxy, buf);
out:
json_decref(val);
}
static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs,
bool pinging) bool pinging)
{ {
@ -2692,7 +2717,7 @@ reconnect:
if (!cproxy) if (!cproxy)
goto out; goto out;
if (proxi != cproxy) { if (proxi != cproxy) {
proxi = cproxy; gdata->current_proxy = proxi = cproxy;
LOGWARNING("Successfully connected to pool %d %s as proxy%s", LOGWARNING("Successfully connected to pool %d %s as proxy%s",
proxi->id, proxi->url, ckp->passthrough ? " in passthrough mode" : ""); proxi->id, proxi->url, ckp->passthrough ? " in passthrough mode" : "");
} }

1
src/generator.h

@ -12,6 +12,7 @@
#include "config.h" #include "config.h"
void generator_add_send(ckpool_t *ckp, json_t *val);
void *generator(void *arg); void *generator(void *arg);
#endif /* GENERATOR_H */ #endif /* GENERATOR_H */

Loading…
Cancel
Save