kanoi 10 years ago
parent
commit
ef33a2e4de
  1. 8
      src/ckpool.c
  2. 2
      src/ckpool.h
  3. 106
      src/connector.c
  4. 13
      src/jansson-2.6/src/dump.c
  5. 3
      src/jansson-2.6/src/jansson_private.h
  6. 25
      src/jansson-2.6/src/memory.c
  7. 5
      src/jansson-2.6/src/strbuffer.c
  8. 162
      src/stratifier.c
  9. 22
      src/utlist.h

8
src/ckpool.c

@ -276,7 +276,7 @@ static void *listener(void *arg)
proc_instance_t *pi = (proc_instance_t *)arg;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
char *buf = NULL;
char *buf = NULL, *msg;
int sockd;
rename_proc(pi->sockname);
@ -351,11 +351,13 @@ retry:
execv(ckp->initial_args[0], (char *const *)ckp->initial_args);
}
} else if (cmdmatch(buf, "stratifierstats")) {
char *msg;
LOGDEBUG("Listener received stratifierstats request");
msg = send_recv_proc(ckp->stratifier, "stats");
send_unix_msg(sockd, msg);
} else if (cmdmatch(buf, "connectorstats")) {
LOGDEBUG("Listener received connectorstats request");
msg = send_recv_proc(ckp->connector, "stats");
send_unix_msg(sockd, msg);
} else {
LOGINFO("Listener received unhandled message: %s", buf);
send_unix_msg(sockd, "unknown");

2
src/ckpool.h

@ -190,6 +190,8 @@ struct ckpool_instance {
#define CKP_STANDALONE(CKP) (true)
#endif
#define SAFE_HASH_OVERHEAD(HASHLIST) (HASHLIST ? HASH_OVERHEAD(hh, HASHLIST) : 0)
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);

106
src/connector.c

@ -36,6 +36,7 @@ struct client_instance {
/* For dead_clients list */
struct client_instance *next;
struct client_instance *prev;
struct sockaddr address;
char address_name[INET6_ADDRSTRLEN];
@ -82,12 +83,18 @@ struct connector_data {
/* Linked list of dead clients no longer in use but may still have references */
client_instance_t *dead_clients;
int clients_generated;
int dead_generated;
int64_t client_id;
/* For the linked list of pending sends */
sender_send_t *sender_sends;
sender_send_t *delayed_sends;
int64_t sends_generated;
int64_t sends_delayed;
/* For protecting the pending sends list */
pthread_mutex_t sender_lock;
pthread_cond_t sender_cond;
@ -193,6 +200,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
__inc_instance_ref(client);
ck_wlock(&cdata->lock);
cdata->clients_generated++;
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
@ -211,10 +219,11 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
if (fd != -1) {
Close(client->fd);
HASH_DEL(cdata->clients, client);
LL_PREPEND(cdata->dead_clients, client);
DL_APPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the
* epoll list. */
__dec_instance_ref(client);
cdata->dead_generated++;
}
ck_wunlock(&cdata->lock);
@ -249,9 +258,9 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
ck_wlock(&cdata->lock);
LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
if (!client->ref) {
LL_DELETE(cdata->dead_clients, client);
DL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %ld", client->id);
dealloc(client);
}
@ -494,6 +503,7 @@ void *sender(void *arg)
* This is the only function that alters it so no
* locking is required. Keep the client ref. */
DL_APPEND(cdata->delayed_sends, sender_send);
cdata->sends_delayed++;
continue;
}
sent = true;
@ -567,6 +577,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
sender_send->len = len;
mutex_lock(&cdata->sender_lock);
cdata->sends_generated++;
DL_APPEND(cdata->sender_sends, sender_send);
pthread_cond_signal(&cdata->sender_cond);
mutex_unlock(&cdata->sender_lock);
@ -626,12 +637,74 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_decref(json_msg);
}
static char *connector_stats(cdata_t *cdata)
{
json_t *val = json_object(), *subval;
client_instance_t *client;
int objects, generated;
sender_send_t *send;
int64_t memsize;
char *buf;
ck_rlock(&cdata->lock);
objects = HASH_COUNT(cdata->clients);
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects;
generated = cdata->clients_generated;
ck_runlock(&cdata->lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "clients", subval);
ck_rlock(&cdata->lock);
DL_COUNT(cdata->dead_clients, client, objects);
generated = cdata->dead_generated;
ck_runlock(&cdata->lock);
memsize = objects * sizeof(client_instance_t);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "dead", subval);
objects = 0;
memsize = 0;
mutex_lock(&cdata->sender_lock);
generated = cdata->sends_generated;
DL_FOREACH(cdata->sender_sends, send) {
objects++;
memsize += sizeof(sender_send_t) + send->len + 1;
}
mutex_unlock(&cdata->sender_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "sends", subval);
objects = 0;
memsize = 0;
mutex_lock(&cdata->sender_lock);
generated = cdata->sends_delayed;
DL_FOREACH(cdata->delayed_sends, send) {
objects++;
memsize += sizeof(sender_send_t) + send->len + 1;
}
mutex_unlock(&cdata->sender_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "delays", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
LOGNOTICE("Connector stats: %s", buf);
return buf;
}
static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{
int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
uint8_t test_cycle = 0;
char *buf = NULL;
do {
@ -646,15 +719,18 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
LOGWARNING("%s connector ready", ckp->name);
retry:
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
if (!++test_cycle) {
/* Test for pthread join every 256 messages */
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
}
}
Close(sockd);
@ -704,6 +780,12 @@ retry:
} else if (cmdmatch(buf, "reject")) {
LOGDEBUG("Connector received reject signal");
cdata->accept = false;
} else if (cmdmatch(buf, "stats")) {
char *msg;
LOGDEBUG("Connector received stats request");
msg = connector_stats(cdata);
send_unix_msg(sockd, msg);
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) {

13
src/jansson-2.6/src/dump.c

@ -1,5 +1,6 @@
/*
* Copyright (c) 2009-2013 Petri Lehtinen <petri@digip.org>
* Copyright (c) 2015 Con Kolivas <kernel@kolivas.org>
*
* Jansson is free software; you can redistribute it and/or modify
* it under the terms of the MIT license. See LICENSE for details.
@ -427,13 +428,6 @@ static int do_dump(const json_t *json, size_t flags, int depth,
}
}
char *json_dump_dup(const char *str, size_t flags)
{
if (flags & JSON_EOL)
return jsonp_eolstrdup(str);
return jsonp_strdup(str);
}
char *json_dumps(const json_t *json, size_t flags)
{
strbuffer_t strbuff;
@ -444,10 +438,11 @@ char *json_dumps(const json_t *json, size_t flags)
if(json_dump_callback(json, dump_to_strbuffer, (void *)&strbuff, flags))
result = NULL;
else if (flags & JSON_EOL)
result = jsonp_eolstrsteal(&strbuff);
else
result = json_dump_dup(strbuffer_value(&strbuff), flags);
result = jsonp_strsteal(&strbuff);
strbuffer_close(&strbuff);
return result;
}

3
src/jansson-2.6/src/jansson_private.h

@ -86,7 +86,8 @@ void _jsonp_free(void **ptr);
char *jsonp_strndup(const char *str, size_t length);
char *jsonp_strdup(const char *str);
char *jsonp_eolstrdup(const char *str);
char *jsonp_strsteal(strbuffer_t *strbuff);
char *jsonp_eolstrsteal(strbuffer_t *strbuff);
/* Windows compatibility */
#ifdef _WIN32

25
src/jansson-2.6/src/memory.c

@ -51,23 +51,22 @@ char *jsonp_strdup(const char *str)
return new_str;
}
char *jsonp_eolstrdup(const char *str)
char *jsonp_strsteal(strbuffer_t *strbuff)
{
char *new_str;
size_t len;
size_t len = strbuff->length + 1;
char *ret = realloc(strbuff->value, len);
len = strlen(str);
if(len == (size_t)-1)
return NULL;
return ret;
}
new_str = jsonp_malloc(len + 2);
if(!new_str)
return NULL;
char *jsonp_eolstrsteal(strbuffer_t *strbuff)
{
size_t len = strbuff->length + 2;
char *ret = realloc(strbuff->value, len);
memcpy(new_str, str, len);
new_str[len] = '\n';
new_str[len + 1] = '\0';
return new_str;
ret[strbuff->length] = '\n';
ret[strbuff->length + 1] = '\0';
return ret;
}
void json_set_alloc_funcs(json_malloc_t malloc_fn, json_free_t free_fn)

5
src/jansson-2.6/src/strbuffer.c

@ -16,7 +16,7 @@
#include "jansson_private.h"
#include "strbuffer.h"
#define STRBUFFER_MIN_SIZE 16
#define STRBUFFER_MIN_SIZE 4096
#define STRBUFFER_FACTOR 2
#define STRBUFFER_SIZE_MAX ((size_t)-1)
@ -74,7 +74,8 @@ int strbuffer_append_byte(strbuffer_t *strbuff, char byte)
int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size)
{
if(size >= strbuff->size - strbuff->length)
/* Leave room for EOL and NULL bytes */
if(size + 2 > strbuff->size - strbuff->length)
{
int backoff = 1;
size_t new_size;

162
src/stratifier.c

@ -524,6 +524,7 @@ static void clear_workbase(workbase_t *wb)
free(wb);
}
/* Remove all shares with a workbase id less than wb_id for block changes */
static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id)
{
share_t *share, *tmp;
@ -543,6 +544,26 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id)
LOGINFO("Cleared %d shares from share hashtable", purged);
}
/* Remove all shares with a workbase id == wb_id being discarded */
static void age_share_hashtable(sdata_t *sdata, int64_t wb_id)
{
share_t *share, *tmp;
int aged = 0;
ck_wlock(&sdata->share_lock);
HASH_ITER(hh, sdata->shares, share, tmp) {
if (share->workbase_id == wb_id) {
HASH_DEL(sdata->shares, share);
dealloc(share);
aged++;
}
}
ck_wunlock(&sdata->share_lock);
if (aged)
LOGINFO("Aged %d shares from share hashtable", aged);
}
static char *status_chars = "|/-\\";
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
@ -692,6 +713,8 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
HASH_ITER(hh, sdata->workbases, tmp, tmpa) {
if (HASH_COUNT(sdata->workbases) < 3)
break;
if (wb == tmp)
continue;
/* Age old workbases older than 10 minutes old */
if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) {
HASH_DEL(sdata->workbases, tmp);
@ -712,6 +735,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
* to prevent taking recursive locks */
if (aged) {
send_ageworkinfo(ckp, aged->id);
age_share_hashtable(sdata, aged->id);
clear_workbase(aged);
}
}
@ -873,7 +897,7 @@ static void update_base(ckpool_t *ckp, int prio)
static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Adding dead instance %ld", client->id);
LL_PREPEND(sdata->dead_instances, client);
DL_APPEND(sdata->dead_instances, client);
sdata->stats.dead++;
sdata->dead_generated++;
}
@ -881,7 +905,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
static void __del_dead(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Deleting dead instance %ld", client->id);
LL_DELETE(sdata->dead_instances, client);
DL_DELETE_INIT(sdata->dead_instances, client);
sdata->stats.dead--;
}
@ -1036,7 +1060,7 @@ static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client);
static void update_diff(ckpool_t *ckp)
{
stratum_instance_t *client;
stratum_instance_t *client, *tmp;
sdata_t *sdata = ckp->data;
double old_diff, diff;
json_t *val;
@ -1075,7 +1099,7 @@ static void update_diff(ckpool_t *ckp)
/* If the diff has dropped, iterate over all the clients and check
* they're at or below the new diff, and update it if not. */
ck_rlock(&sdata->instance_lock);
for (client = sdata->stratum_instances; client != NULL; client = client->hh.next) {
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->diff > diff) {
client->diff = diff;
stratum_send_diff(sdata, client);
@ -1122,7 +1146,7 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta
HASH_DEL(sdata->stratum_instances, client);
if (instance)
DL_DELETE(instance->instances, client);
DL_DELETE_INIT(instance->instances, client);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
@ -1308,6 +1332,7 @@ static void drop_client(sdata_t *sdata, int64_t id)
if (client) {
instance = client->user_instance;
if (client->authorised) {
client->authorised = false;
dec = true;
ckp = client->ckp;
}
@ -1333,7 +1358,7 @@ static void drop_client(sdata_t *sdata, int64_t id)
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) {
DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) {
if (!client->ref) {
LOGINFO("Stratifier discarding dead instance %ld", client->id);
__del_dead(sdata, client);
@ -1431,7 +1456,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
if (!strcmp(solvehash, blockhash)) {
dealloc(solvehash);
found = block;
DL_DELETE(sdata->block_solves, block);
DL_DELETE_INIT(sdata->block_solves, block);
break;
}
dealloc(solvehash);
@ -1478,7 +1503,7 @@ static void block_reject(sdata_t *sdata, const char *blockhash)
if (!strcmp(solvehash, blockhash)) {
dealloc(solvehash);
found = block;
DL_DELETE(sdata->block_solves, block);
DL_DELETE_INIT(sdata->block_solves, block);
break;
}
dealloc(solvehash);
@ -1512,8 +1537,6 @@ static void broadcast_ping(sdata_t *sdata)
stratum_broadcast(sdata, json_msg);
}
#define SAFE_HASH_OVERHEAD(HASHLIST) (HASHLIST ? HASH_OVERHEAD(hh, HASHLIST) : 0)
static void ckmsgq_stats(ckmsgq_t *ckmsgq, int size, json_t **val)
{
int objects, generated;
@ -1741,7 +1764,8 @@ static void *blockupdate(void *arg)
return NULL;
}
static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1)
/* Enter holding instance_lock */
static bool __enonce1_free(sdata_t *sdata, uint64_t enonce1)
{
stratum_instance_t *client, *tmp;
bool ret = true;
@ -1751,14 +1775,12 @@ static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1)
goto out;
}
ck_rlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
if (client->enonce1_64 == enonce1) {
ret = false;
break;
}
}
ck_runlock(&sdata->instance_lock);
out:
return ret;
}
@ -1781,13 +1803,19 @@ static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client)
static bool new_enonce1(stratum_instance_t *client)
{
sdata_t *sdata = client->ckp->data;
int enonce1varlen, i;
bool ret = false;
workbase_t *wb;
int i;
ck_wlock(&sdata->workbase_lock);
wb = sdata->current_workbase;
switch(wb->enonce1varlen) {
/* Extract the enonce1varlen from the current workbase which may be
* a different workbase to when we __fill_enonce1data but the value
* will not change and this avoids grabbing recursive locks */
ck_rlock(&sdata->workbase_lock);
enonce1varlen = sdata->current_workbase->enonce1varlen;
ck_runlock(&sdata->workbase_lock);
/* instance_lock protects sdata->enonce1u */
ck_wlock(&sdata->instance_lock);
switch(enonce1varlen) {
case 8:
sdata->enonce1u.u64++;
ret = true;
@ -1803,7 +1831,7 @@ static bool new_enonce1(stratum_instance_t *client)
case 2:
for (i = 0; i < 65536; i++) {
sdata->enonce1u.u16++;
ret = enonce1_free(sdata, sdata->enonce1u.u64);
ret = __enonce1_free(sdata, sdata->enonce1u.u64);
if (ret)
break;
}
@ -1811,18 +1839,21 @@ static bool new_enonce1(stratum_instance_t *client)
case 1:
for (i = 0; i < 256; i++) {
sdata->enonce1u.u8++;
ret = enonce1_free(sdata, sdata->enonce1u.u64);
ret = __enonce1_free(sdata, sdata->enonce1u.u64);
if (ret)
break;
}
break;
default:
quit(0, "Invalid enonce1varlen %d", wb->enonce1varlen);
quit(0, "Invalid enonce1varlen %d", enonce1varlen);
}
if (ret)
client->enonce1_64 = sdata->enonce1u.u64;
__fill_enonce1data(wb, client);
ck_wunlock(&sdata->workbase_lock);
ck_wunlock(&sdata->instance_lock);
ck_rlock(&sdata->workbase_lock);
__fill_enonce1data(sdata->current_workbase, client);
ck_runlock(&sdata->workbase_lock);
if (unlikely(!ret))
LOGWARNING("Enonce1 space exhausted! Proxy rejecting clients");
@ -2624,24 +2655,26 @@ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const
return ret;
}
/* Optimised for the common case where shares are new */
static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id)
{
share_t *share, *match = NULL;
bool ret = false;
share_t *share = ckzalloc(sizeof(share_t)), *match = NULL;
bool ret = true;
ck_wlock(&sdata->share_lock);
HASH_FIND(hh, sdata->shares, hash, 32, match);
if (match)
goto out_unlock;
share = ckzalloc(sizeof(share_t));
memcpy(share->hash, hash, 32);
share->workbase_id = wb_id;
ck_wlock(&sdata->share_lock);
sdata->shares_generated++;
HASH_ADD(hh, sdata->shares, hash, 32, share);
ret = true;
out_unlock:
HASH_FIND(hh, sdata->shares, hash, 32, match);
if (likely(!match))
HASH_ADD(hh, sdata->shares, hash, 32, share);
ck_wunlock(&sdata->share_lock);
if (unlikely(match)) {
dealloc(share);
ret = false;
}
return ret;
}
@ -2857,7 +2890,7 @@ out_unlock:
json_set_double(val, "sdiff", sdiff);
json_set_string(val, "hash", hexhash);
json_set_bool(val, "result", result);
json_object_set(val, "reject-reason", json_object_dup(json_msg, "reject-reason"));
json_object_set(val, "reject-reason", json_object_get(json_msg, "reject-reason"));
json_object_set(val, "error", *err_val);
json_set_int(val, "errn", err);
json_set_string(val, "createdate", cdfield);
@ -3095,24 +3128,18 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t
stratum_send_diff(sdata, client);
}
static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val,
json_t *method_val, json_t *params_val, char *address)
/* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, char *address)
{
stratum_instance_t *client;
const char *method;
char buf[256];
client = ref_instance_by_id(sdata, client_id);
if (unlikely(!client)) {
LOGINFO("Failed to find client id %ld in hashtable!", client_id);
return;
}
if (unlikely(client->reject == 2)) {
LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%ld", client->id);
snprintf(buf, 255, "dropclient=%ld", client_id);
send_proc(client->ckp->connector, buf);
goto out;
return;
}
/* Random broken clients send something not an integer as the id so we copy
@ -3124,7 +3151,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
/* Shouldn't happen, sanity check */
if (unlikely(!result_val)) {
LOGWARNING("parse_subscribe returned NULL result_val");
goto out;
return;
}
val = json_object();
json_object_set_new_nocheck(val, "result", result_val);
@ -3133,10 +3160,11 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
stratum_add_send(sdata, val, client_id);
if (likely(client->subscribed))
update_client(sdata, client, client_id);
goto out;
return;
}
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
LOGNOTICE("Adding passthrough client %ld", client_id);
/* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly.
* Remove this instance since the client id may well be
@ -3147,17 +3175,16 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
__add_dead(sdata, client);
ck_wunlock(&sdata->instance_lock);
LOGNOTICE("Adding passthrough client %ld", client->id);
snprintf(buf, 255, "passthrough=%ld", client->id);
snprintf(buf, 255, "passthrough=%ld", client_id);
send_proc(client->ckp->connector, buf);
goto out;
return;
}
if (cmdmatch(method, "mining.auth") && client->subscribed) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sauthq, jp);
goto out;
return;
}
/* We should only accept authorised requests from here on */
@ -3165,22 +3192,22 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
/* Dropping unauthorised clients here also allows the
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %ld", client->id);
snprintf(buf, 255, "dropclient=%ld", client->id);
LOGINFO("Dropping unauthorised client %ld", client_id);
snprintf(buf, 255, "dropclient=%ld", client_id);
send_proc(client->ckp->connector, buf);
goto out;
return;
}
if (cmdmatch(method, "mining.submit")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp);
goto out;
return;
}
if (cmdmatch(method, "mining.suggest")) {
suggest_diff(client, method, params_val);
goto out;
return;
}
/* Covers both get_transactions and get_txnhashes */
@ -3188,14 +3215,15 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp);
goto out;
return;
}
/* Unhandled message here */
out:
dec_instance_ref(sdata, client);
LOGINFO("Unhandled client %ld method %s", client_id, method);
return;
}
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg)
/* Entered with client holding ref count */
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client)
{
json_t *val = msg->json_msg, *id_val, *method, *params;
int64_t client_id = msg->client_id;
@ -3226,7 +3254,7 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg)
send_json_err(sdata, client_id, id_val, "-1:params not found");
goto out;
}
parse_method(sdata, client_id, id_val, method, params, msg->address);
parse_method(sdata, client, client_id, id_val, method, params, msg->address);
out:
json_decref(val);
free(msg);
@ -3235,6 +3263,7 @@ out:
static void srecv_process(ckpool_t *ckp, char *buf)
{
sdata_t *sdata = ckp->data;
stratum_instance_t *client;
smsg_t *msg;
json_t *val;
int server;
@ -3279,12 +3308,15 @@ static void srecv_process(ckpool_t *ckp, char *buf)
/* Parse the message here */
ck_wlock(&sdata->instance_lock);
/* client_id instance doesn't exist yet, create one */
if (!__instance_by_id(sdata, msg->client_id))
__stratum_add_instance(ckp, msg->client_id, server);
client = __instance_by_id(sdata, msg->client_id);
/* If client_id instance doesn't exist yet, create one */
if (unlikely(!client))
client = __stratum_add_instance(ckp, msg->client_id, server);
__inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock);
parse_instance_msg(sdata, msg);
parse_instance_msg(sdata, msg, client);
dec_instance_ref(sdata, client);
out:
free(buf);
}

22
src/utlist.h

@ -572,6 +572,28 @@ do {
} \
} while (0)
#define DL_DELETE_INIT(head,del) \
DL_DELETE3(head,del,prev,next)
#define DL_DELETE3(head,del,prev,next) \
do { \
assert((del)->prev != NULL); \
if ((del)->prev == (del)) { \
(head)=NULL; \
} else if ((del)==(head)) { \
(del)->next->prev = (del)->prev; \
(head) = (del)->next; \
} else { \
(del)->prev->next = (del)->next; \
if ((del)->next) { \
(del)->next->prev = (del)->prev; \
} else { \
(head)->prev = (del)->prev; \
} \
} \
(del)->prev = (del)->next = NULL; \
} while (0)
#define DL_COUNT(head,el,counter) \
DL_COUNT2(head,el,counter,next) \

Loading…
Cancel
Save