kanoi 10 years ago
parent
commit
04c1d3c058
  1. 2
      configure.ac
  2. 15
      src/ckpool.c
  3. 2
      src/ckpool.h
  4. 7
      src/connector.c
  5. 11
      src/jansson-2.6/src/strbuffer.c
  6. 32
      src/libckpool.c
  7. 4
      src/libckpool.h
  8. 144
      src/stratifier.c

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.8.1, kernel@kolivas.org)
AC_INIT(ckpool, 0.8.2, kernel@kolivas.org)
AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4])

15
src/ckpool.c

@ -1,5 +1,5 @@
/*
* Copyright 2014 Con Kolivas
* Copyright 2014-2015 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
@ -404,6 +404,8 @@ int read_socket_line(connsock_t *cs, int timeout)
while (42) {
char readbuf[PAGESIZE] = {};
int backoff = 1;
char *newbuf;
ret = wait_read_select(fd, eom ? 0 : timeout);
if (eom && !ret)
@ -422,7 +424,16 @@ int read_socket_line(connsock_t *cs, int timeout)
goto out;
}
buflen = cs->bufofs + ret + 1;
cs->buf = realloc(cs->buf, buflen);
while (42) {
newbuf = realloc(cs->buf, buflen);
if (likely(newbuf))
break;
if (backoff == 1)
fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen);
cksleep_ms(backoff);
backoff <<= 1;
}
cs->buf = newbuf;
if (unlikely(!cs->buf))
quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen);
memcpy(cs->buf + cs->bufofs, readbuf, ret);

2
src/ckpool.h

@ -1,5 +1,5 @@
/*
* Copyright 2014 Con Kolivas
* Copyright 2014-2015 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free

7
src/connector.c

@ -238,7 +238,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
* count. */
static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{
client_instance_t *tmp;
client_instance_t *tmp, *client_delete = NULL;
int ret;
ret = drop_client(cdata, client);
@ -250,12 +250,15 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
* counts for them. */
ck_wlock(&cdata->lock);
LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
/* Don't free client ram when loop may still access it */
dealloc(client_delete);
if (!client->ref) {
LL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %ld", client->id);
free(client);
client_delete = client;
}
}
dealloc(client_delete);
ck_wunlock(&cdata->lock);
out:

11
src/jansson-2.6/src/strbuffer.c

@ -1,5 +1,6 @@
/*
* Copyright (c) 2009-2013 Petri Lehtinen <petri@digip.org>
* Copyright (c) 2015 Con Kolivas <kernel@kolivas.org>
*
* Jansson is free software; you can redistribute it and/or modify
* it under the terms of the MIT license. See LICENSE for details.
@ -11,6 +12,7 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "jansson_private.h"
#include "strbuffer.h"
@ -74,6 +76,7 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size)
{
if(size >= strbuff->size - strbuff->length)
{
int backoff = 1;
size_t new_size;
char *new_value;
@ -86,9 +89,13 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size)
new_size = max(strbuff->size * STRBUFFER_FACTOR,
strbuff->length + size + 1);
while (42) {
new_value = realloc(strbuff->value, new_size);
if(!new_value)
return -1;
if (new_value)
break;
usleep(backoff * 1000);
backoff <<= 1;
}
strbuff->value = new_value;
strbuff->size = new_size;

32
src/libckpool.c

@ -1,5 +1,5 @@
/*
* Copyright 2014 Con Kolivas
* Copyright 2014-2015 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
@ -589,7 +589,7 @@ int bind_socket(char *url, char *port)
if (sockd > 0)
break;
}
if (sockd < 0) {
if (sockd < 1 || p == NULL) {
LOGWARNING("Failed to open socket for %s:%s", url, port);
goto out;
}
@ -726,7 +726,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
if (likely(server_path)) {
len = strlen(server_path);
if (unlikely(len < 1 || len > UNIX_PATH_MAX)) {
if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) {
LOGERR("Invalid server path length %d in open_unix_server", len);
goto out;
}
@ -793,7 +793,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun
if (likely(server_path)) {
len = strlen(server_path);
if (unlikely(len < 1 || len > UNIX_PATH_MAX)) {
if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) {
LOGERR("Invalid server path length %d in open_unix_client", len);
goto out;
}
@ -1351,6 +1351,30 @@ const int hex2bin_tbl[256] = {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
};
bool _validhex(const char *buf, const char *file, const char *func, const int line)
{
unsigned int i, slen;
bool ret = false;
slen = strlen(buf);
if (!slen || slen % 2) {
LOGDEBUG("Invalid hex due to length %u from %s %s:%d", slen, file, func, line);
goto out;
}
for (i = 0; i < slen; i++) {
uchar idx = buf[i];
if (hex2bin_tbl[idx] == -1) {
LOGDEBUG("Invalid hex due to value %u at offset %d from %s %s:%d",
idx, i, file, func, line);
goto out;
}
}
ret = true;
out:
return ret;
}
/* Does the reverse of bin2hex but does not allocate any ram */
bool _hex2bin(void *vp, const void *vhexstr, size_t len, const char *file, const char *func, const int line)
{

4
src/libckpool.h

@ -1,5 +1,5 @@
/*
* Copyright 2014 Con Kolivas
* Copyright 2014-2015 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
@ -495,6 +495,8 @@ void _dealloc(void **ptr);
extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len);
bool _validhex(const char *buf, const char *file, const char *func, const int line);
#define validhex(buf) _validhex(buf, __FILE__, __func__, __LINE__)
bool _hex2bin(void *p, const void *vhexstr, size_t len, const char *file, const char *func, const int line);
#define hex2bin(p, vhexstr, len) _hex2bin(p, vhexstr, len, __FILE__, __func__, __LINE__)
char *http_base64(const char *src);

144
src/stratifier.c

@ -171,7 +171,6 @@ struct user_instance {
/* A linked list of all connected workers of this user */
worker_instance_t *worker_instances;
int workernames; /* How many different workernames exist */
int workers;
@ -648,7 +647,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
wb->network_diff = diff_from_nbits(wb->headerbin + 72);
len = strlen(ckp->logdir) + 8 + 1 + 16 + 1;
wb->logdir = ckalloc(len);
wb->logdir = ckzalloc(len);
/* In proxy mode, the wb->id is received in the notify update and
* we set workbase_id from it. In server mode the stratifier is
@ -860,22 +859,26 @@ static void update_base(ckpool_t *ckp, int prio)
create_pthread(pth, do_update, ur);
}
/* Add a stratum instance to the dead instances list */
static void __kill_instance(sdata_t *sdata, stratum_instance_t *client)
static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
{
user_instance_t *instance = client->user_instance;
if (likely(instance))
DL_DELETE(instance->instances, client);
LOGDEBUG("Adding dead instance %ld", client->id);
LL_PREPEND(sdata->dead_instances, client);
sdata->stats.dead++;
}
static void __del_dead(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Deleting dead instance %ld", client->id);
LL_DELETE(sdata->dead_instances, client);
sdata->stats.dead--;
}
static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Deleting disconnected instance %ld", client->id);
HASH_DEL(sdata->disconnected_instances, client);
sdata->stats.disconnected--;
__kill_instance(sdata, client);
__add_dead(sdata, client);
}
static void drop_allclients(ckpool_t *ckp)
@ -887,11 +890,13 @@ static void drop_allclients(ckpool_t *ckp)
ck_wlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
HASH_DEL(sdata->stratum_instances, client);
__add_dead(sdata, client);
sprintf(buf, "dropclient=%ld", client->id);
send_proc(ckp->connector, buf);
}
HASH_ITER(hh, sdata->disconnected_instances, client, tmp)
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
__del_disconnected(sdata, client);
}
sdata->stats.users = sdata->stats.workers = 0;
ck_wunlock(&sdata->instance_lock);
}
@ -1127,36 +1132,40 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int
return instance;
}
/* Only supports a full ckpool instance sessionid with an 8 byte sessionid */
static bool disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id)
static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id)
{
stratum_instance_t *instance, *tmp;
uint64_t session64;
bool ret = false;
uint64_t enonce1_64 = 0, ret = 0;
int slen;
if (!sessionid)
goto out;
if (strlen(sessionid) != 16)
slen = strlen(sessionid) / 2;
if (slen < 1 || slen > 8)
goto out;
if (!validhex(sessionid))
goto out;
/* Number is in BE but we don't swap either of them */
hex2bin(&session64, sessionid, 8);
hex2bin(&enonce1_64, sessionid, slen);
ck_wlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, instance, tmp) {
if (instance->id == id)
continue;
if (instance->enonce1_64 == session64) {
if (instance->enonce1_64 == enonce1_64) {
/* Only allow one connected instance per enonce1 */
goto out_unlock;
}
}
instance = NULL;
HASH_FIND(hh, sdata->disconnected_instances, &session64, sizeof(uint64_t), instance);
HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance);
if (instance) {
/* If we've found a matching disconnected instance, use it only
* once and discard it */
/* Delete the entry once we are going to use it since there
* will be a new instance with the enonce1_64 */
__del_disconnected(sdata, instance);
ret = true;
ret = enonce1_64;
}
out_unlock:
ck_wunlock(&sdata->instance_lock);
@ -1241,36 +1250,39 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance)
static void drop_client(sdata_t *sdata, int64_t id)
{
stratum_instance_t *client, *tmp;
stratum_instance_t *client, *tmp, *client_delete = NULL;
user_instance_t *instance = NULL;
time_t now_t = time(NULL);
ckpool_t *ckp = NULL;
bool dec = false;
LOGINFO("Stratifier requested to drop client %ld", id);
LOGINFO("Stratifier dropping client %ld", id);
ck_wlock(&sdata->instance_lock);
client = __instance_by_id(sdata, id);
if (client) {
if (client && likely(!client->ref)) {
stratum_instance_t *old_client = NULL;
instance = client->user_instance;
if (client->authorised) {
dec = true;
client->authorised = false;
ckp = client->ckp;
}
HASH_DEL(sdata->stratum_instances, client);
if (instance)
DL_DELETE(instance->instances, client);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) {
LOGDEBUG("Adding disconnected instance %ld", client->id);
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
client->disconnected_time = time(NULL);
} else
__kill_instance(sdata, client);
ckp = client->ckp;
instance = client->user_instance;
LOGINFO("Stratifer dropped %sauthorised client %ld", dec ? "" : "un", id);
} else {
__add_dead(sdata, client);
}
}
/* Old disconnected instances will not have any valid shares so remove
@ -1279,26 +1291,33 @@ static void drop_client(sdata_t *sdata, int64_t id)
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
if (now_t - client->disconnected_time < 600)
continue;
LOGINFO("Discarding aged disconnected instance %ld", client->id);
if (unlikely(client->ref))
continue;
LOGINFO("Ageing disconnected instance %ld to dead", client->id);
__del_disconnected(sdata, client);
}
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) {
/* We can't delete the ram safely in this loop, even if we can
* safely remove the entry from the linked list so we do it on
* the next pass through the loop. */
if (client != client_delete)
dealloc(client_delete);
if (!client->ref) {
LOGINFO("Stratifier discarding instance %ld", client->id);
LL_DELETE(sdata->dead_instances, client);
sdata->stats.dead--;
free(client->workername);
free(client->useragent);
free(client);
LOGINFO("Stratifier discarding dead instance %ld", client->id);
__del_dead(sdata, client);
dealloc(client->workername);
dealloc(client->useragent);
client_delete = client;
}
}
dealloc(client_delete);
ck_wunlock(&sdata->instance_lock);
/* Decrease worker count outside of instance_lock to avoid recursive
* locking. ckp and instance are guaranteed to be set if dec is true */
* locking */
if (dec)
dec_worker(ckp, instance);
}
@ -1354,6 +1373,8 @@ static void reset_bestshares(sdata_t *sdata)
ck_runlock(&sdata->instance_lock);
}
/* Ram from blocks is NOT freed at all for now, only their entry is removed
* from the linked list, leaving a very small leak here and reject. */
static void block_solve(ckpool_t *ckp, const char *blockhash)
{
ckmsg_t *block, *tmp, *found = NULL;
@ -1621,6 +1642,16 @@ out:
return ret;
}
/* Enter holding workbase_lock */
static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client)
{
if (wb->enonce1constlen)
memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen);
memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen);
__bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen);
__bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen);
}
/* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number
* of bytes we have to work with when we are proxying with a split nonce2.
* When the proxy space is less than 32 bits to work with, we look for an
@ -1669,11 +1700,7 @@ static bool new_enonce1(stratum_instance_t *client)
}
if (ret)
client->enonce1_64 = sdata->enonce1u.u64;
if (wb->enonce1constlen)
memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen);
memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen);
__bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen);
__bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen);
__fill_enonce1data(wb, client);
ck_wunlock(&sdata->workbase_lock);
if (unlikely(!ret))
@ -1719,9 +1746,13 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js
buf = json_string_value(json_array_get(params_val, 1));
LOGDEBUG("Found old session id %s", buf);
/* Add matching here */
if (disconnected_sessionid_exists(sdata, buf, client_id)) {
if ((client->enonce1_64 = disconnected_sessionid_exists(sdata, buf, client_id))) {
sprintf(client->enonce1, "%016lx", client->enonce1_64);
old_match = true;
ck_rlock(&sdata->workbase_lock);
__fill_enonce1data(sdata->current_workbase, client);
ck_runlock(&sdata->workbase_lock);
}
}
} else
@ -1733,11 +1764,11 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js
client->reject = 2;
return json_string("proxy full");
}
LOGINFO("Set new subscription %ld to new enonce1 %s", client->id,
client->enonce1);
LOGINFO("Set new subscription %ld to new enonce1 %lx string %s", client->id,
client->enonce1_64, client->enonce1);
} else {
LOGINFO("Set new subscription %ld to old matched enonce1 %s", client->id,
client->enonce1);
LOGINFO("Set new subscription %ld to old matched enonce1 %lx string %s",
client->id, client->enonce1_64, client->enonce1);
}
ck_rlock(&sdata->workbase_lock);
@ -1931,7 +1962,6 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
read_workerstats(ckp, worker);
worker->start_time = time(NULL);
client->worker_instance = worker;
instance->workernames++;
}
DL_APPEND(instance->instances, client);
ck_wunlock(&sdata->instance_lock);
@ -2005,6 +2035,8 @@ static int send_recv_auth(stratum_instance_t *client)
LOGINFO("Got ckdb response: %s", buf);
if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) {
if (cmdmatch(response, "failed"))
goto out;
LOGWARNING("Got unparseable ckdb auth response: %s", buf);
goto out_fail;
}
@ -2987,13 +3019,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
* Remove this instance since the client id may well be
* reused */
ck_wlock(&sdata->instance_lock);
if (likely(__instance_by_id(sdata, client_id)))
HASH_DEL(sdata->stratum_instances, client);
__add_dead(sdata, client);
ck_wunlock(&sdata->instance_lock);
LOGINFO("Adding passthrough client %ld", client->id);
LOGNOTICE("Adding passthrough client %ld", client->id);
snprintf(buf, 255, "passthrough=%ld", client->id);
send_proc(client->ckp->connector, buf);
free(client);
goto out;
}
@ -3221,6 +3254,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name,
client->user_instance->username);
stratum_send_message(sdata, client, buf);
free(buf);
} else {
if (errnum < 0)
stratum_send_message(sdata, client, "Authorisations temporarily offline :(");
@ -3539,15 +3573,15 @@ static void *statsupdate(void *arg)
decay_time(&client->dsps60, 0, per_tdiff, 3600);
decay_time(&client->dsps1440, 0, per_tdiff, 86400);
decay_time(&client->dsps10080, 0, per_tdiff, 604800);
idle_workers++;
if (per_tdiff > 600)
client->idle = true;
idle_workers++;
continue;
}
}
HASH_ITER(hh, sdata->user_instances, instance, tmpuser) {
worker_instance_t *worker;
int iterations = 0;
bool idle = false;
if (!instance->authorised)
@ -3555,12 +3589,6 @@ static void *statsupdate(void *arg)
/* Decay times per worker */
DL_FOREACH(instance->worker_instances, worker) {
/* Sanity check, should never happen */
if (unlikely(iterations++ > instance->workernames)) {
LOGWARNING("Statsupdate trying to iterate more than %d existing workers for worker %s",
instance->workernames, worker->workername);
break;
}
per_tdiff = tvdiff(&now, &worker->last_share);
if (per_tdiff > 60) {
decay_time(&worker->dsps1, 0, per_tdiff, 60);

Loading…
Cancel
Save