diff --git a/configure.ac b/configure.ac index 3983bd0d..d183ca0b 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.8.1, kernel@kolivas.org) +AC_INIT(ckpool, 0.8.2, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) diff --git a/src/ckpool.c b/src/ckpool.c index 60374273..5f775fc5 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -404,6 +404,8 @@ int read_socket_line(connsock_t *cs, int timeout) while (42) { char readbuf[PAGESIZE] = {}; + int backoff = 1; + char *newbuf; ret = wait_read_select(fd, eom ? 0 : timeout); if (eom && !ret) @@ -422,7 +424,16 @@ int read_socket_line(connsock_t *cs, int timeout) goto out; } buflen = cs->bufofs + ret + 1; - cs->buf = realloc(cs->buf, buflen); + while (42) { + newbuf = realloc(cs->buf, buflen); + if (likely(newbuf)) + break; + if (backoff == 1) + fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); + cksleep_ms(backoff); + backoff <<= 1; + } + cs->buf = newbuf; if (unlikely(!cs->buf)) quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); memcpy(cs->buf + cs->bufofs, readbuf, ret); diff --git a/src/ckpool.h b/src/ckpool.h index 0b28aa40..83424fc5 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free diff --git a/src/connector.c b/src/connector.c index 0bfae189..811e20b1 100644 --- a/src/connector.c +++ b/src/connector.c @@ -238,7 +238,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) * count. */ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - client_instance_t *tmp; + client_instance_t *tmp, *client_delete = NULL; int ret; ret = drop_client(cdata, client); @@ -250,12 +250,15 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c * counts for them. */ ck_wlock(&cdata->lock); LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { + /* Don't free client ram when loop may still access it */ + dealloc(client_delete); if (!client->ref) { LL_DELETE(cdata->dead_clients, client); LOGINFO("Connector discarding client %ld", client->id); - free(client); + client_delete = client; } } + dealloc(client_delete); ck_wunlock(&cdata->lock); out: diff --git a/src/jansson-2.6/src/strbuffer.c b/src/jansson-2.6/src/strbuffer.c index c8abf46e..86be2b38 100644 --- a/src/jansson-2.6/src/strbuffer.c +++ b/src/jansson-2.6/src/strbuffer.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2009-2013 Petri Lehtinen + * Copyright (c) 2015 Con Kolivas * * Jansson is free software; you can redistribute it and/or modify * it under the terms of the MIT license. See LICENSE for details. @@ -11,6 +12,7 @@ #include #include +#include #include "jansson_private.h" #include "strbuffer.h" @@ -74,6 +76,7 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size) { if(size >= strbuff->size - strbuff->length) { + int backoff = 1; size_t new_size; char *new_value; @@ -86,9 +89,13 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size) new_size = max(strbuff->size * STRBUFFER_FACTOR, strbuff->length + size + 1); - new_value = realloc(strbuff->value, new_size); - if(!new_value) - return -1; + while (42) { + new_value = realloc(strbuff->value, new_size); + if (new_value) + break; + usleep(backoff * 1000); + backoff <<= 1; + } strbuff->value = new_value; strbuff->size = new_size; diff --git a/src/libckpool.c b/src/libckpool.c index 5d992e2e..f2937a05 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -589,7 +589,7 @@ int bind_socket(char *url, char *port) if (sockd > 0) break; } - if (sockd < 0) { + if (sockd < 1 || p == NULL) { LOGWARNING("Failed to open socket for %s:%s", url, port); goto out; } @@ -726,7 +726,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun if (likely(server_path)) { len = strlen(server_path); - if (unlikely(len < 1 || len > UNIX_PATH_MAX)) { + if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) { LOGERR("Invalid server path length %d in open_unix_server", len); goto out; } @@ -793,7 +793,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun if (likely(server_path)) { len = strlen(server_path); - if (unlikely(len < 1 || len > UNIX_PATH_MAX)) { + if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) { LOGERR("Invalid server path length %d in open_unix_client", len); goto out; } @@ -1351,6 +1351,30 @@ const int hex2bin_tbl[256] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, }; +bool _validhex(const char *buf, const char *file, const char *func, const int line) +{ + unsigned int i, slen; + bool ret = false; + + slen = strlen(buf); + if (!slen || slen % 2) { + LOGDEBUG("Invalid hex due to length %u from %s %s:%d", slen, file, func, line); + goto out; + } + for (i = 0; i < slen; i++) { + uchar idx = buf[i]; + + if (hex2bin_tbl[idx] == -1) { + LOGDEBUG("Invalid hex due to value %u at offset %d from %s %s:%d", + idx, i, file, func, line); + goto out; + } + } + ret = true; +out: + return ret; +} + /* Does the reverse of bin2hex but does not allocate any ram */ bool _hex2bin(void *vp, const void *vhexstr, size_t len, const char *file, const char *func, const int line) { diff --git a/src/libckpool.h b/src/libckpool.h index a52628fe..65fb4ae4 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -495,6 +495,8 @@ void _dealloc(void **ptr); extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); +bool _validhex(const char *buf, const char *file, const char *func, const int line); +#define validhex(buf) _validhex(buf, __FILE__, __func__, __LINE__) bool _hex2bin(void *p, const void *vhexstr, size_t len, const char *file, const char *func, const int line); #define hex2bin(p, vhexstr, len) _hex2bin(p, vhexstr, len, __FILE__, __func__, __LINE__) char *http_base64(const char *src); diff --git a/src/stratifier.c b/src/stratifier.c index 84f9a9a7..678062d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -171,7 +171,6 @@ struct user_instance { /* A linked list of all connected workers of this user */ worker_instance_t *worker_instances; - int workernames; /* How many different workernames exist */ int workers; @@ -648,7 +647,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) wb->network_diff = diff_from_nbits(wb->headerbin + 72); len = strlen(ckp->logdir) + 8 + 1 + 16 + 1; - wb->logdir = ckalloc(len); + wb->logdir = ckzalloc(len); /* In proxy mode, the wb->id is received in the notify update and * we set workbase_id from it. In server mode the stratifier is @@ -860,22 +859,26 @@ static void update_base(ckpool_t *ckp, int prio) create_pthread(pth, do_update, ur); } -/* Add a stratum instance to the dead instances list */ -static void __kill_instance(sdata_t *sdata, stratum_instance_t *client) +static void __add_dead(sdata_t *sdata, stratum_instance_t *client) { - user_instance_t *instance = client->user_instance; - - if (likely(instance)) - DL_DELETE(instance->instances, client); + LOGDEBUG("Adding dead instance %ld", client->id); LL_PREPEND(sdata->dead_instances, client); sdata->stats.dead++; } +static void __del_dead(sdata_t *sdata, stratum_instance_t *client) +{ + LOGDEBUG("Deleting dead instance %ld", client->id); + LL_DELETE(sdata->dead_instances, client); + sdata->stats.dead--; +} + static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) { + LOGDEBUG("Deleting disconnected instance %ld", client->id); HASH_DEL(sdata->disconnected_instances, client); sdata->stats.disconnected--; - __kill_instance(sdata, client); + __add_dead(sdata, client); } static void drop_allclients(ckpool_t *ckp) @@ -887,11 +890,13 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_DEL(sdata->stratum_instances, client); + __add_dead(sdata, client); sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } - HASH_ITER(hh, sdata->disconnected_instances, client, tmp) + HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { __del_disconnected(sdata, client); + } sdata->stats.users = sdata->stats.workers = 0; ck_wunlock(&sdata->instance_lock); } @@ -1127,36 +1132,40 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int return instance; } -/* Only supports a full ckpool instance sessionid with an 8 byte sessionid */ -static bool disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id) +static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id) { stratum_instance_t *instance, *tmp; - uint64_t session64; - bool ret = false; + uint64_t enonce1_64 = 0, ret = 0; + int slen; if (!sessionid) goto out; - if (strlen(sessionid) != 16) + slen = strlen(sessionid) / 2; + if (slen < 1 || slen > 8) + goto out; + + if (!validhex(sessionid)) goto out; + /* Number is in BE but we don't swap either of them */ - hex2bin(&session64, sessionid, 8); + hex2bin(&enonce1_64, sessionid, slen); ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, instance, tmp) { if (instance->id == id) continue; - if (instance->enonce1_64 == session64) { + if (instance->enonce1_64 == enonce1_64) { /* Only allow one connected instance per enonce1 */ goto out_unlock; } } instance = NULL; - HASH_FIND(hh, sdata->disconnected_instances, &session64, sizeof(uint64_t), instance); + HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance); if (instance) { - /* If we've found a matching disconnected instance, use it only - * once and discard it */ + /* Delete the entry once we are going to use it since there + * will be a new instance with the enonce1_64 */ __del_disconnected(sdata, instance); - ret = true; + ret = enonce1_64; } out_unlock: ck_wunlock(&sdata->instance_lock); @@ -1241,36 +1250,39 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) static void drop_client(sdata_t *sdata, int64_t id) { - stratum_instance_t *client, *tmp; + stratum_instance_t *client, *tmp, *client_delete = NULL; user_instance_t *instance = NULL; time_t now_t = time(NULL); ckpool_t *ckp = NULL; bool dec = false; - LOGINFO("Stratifier requested to drop client %ld", id); + LOGINFO("Stratifier dropping client %ld", id); ck_wlock(&sdata->instance_lock); client = __instance_by_id(sdata, id); - if (client) { + if (client && likely(!client->ref)) { stratum_instance_t *old_client = NULL; + instance = client->user_instance; if (client->authorised) { dec = true; client->authorised = false; + ckp = client->ckp; } HASH_DEL(sdata->stratum_instances, client); + if (instance) + DL_DELETE(instance->instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { + LOGDEBUG("Adding disconnected instance %ld", client->id); HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); sdata->stats.disconnected++; client->disconnected_time = time(NULL); - } else - __kill_instance(sdata, client); - ckp = client->ckp; - instance = client->user_instance; - LOGINFO("Stratifer dropped %sauthorised client %ld", dec ? "" : "un", id); + } else { + __add_dead(sdata, client); + } } /* Old disconnected instances will not have any valid shares so remove @@ -1279,26 +1291,33 @@ static void drop_client(sdata_t *sdata, int64_t id) HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { if (now_t - client->disconnected_time < 600) continue; - LOGINFO("Discarding aged disconnected instance %ld", client->id); + if (unlikely(client->ref)) + continue; + LOGINFO("Ageing disconnected instance %ld to dead", client->id); __del_disconnected(sdata, client); } /* Cull old unused clients lazily when there are no more reference * counts for them. */ LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { + /* We can't delete the ram safely in this loop, even if we can + * safely remove the entry from the linked list so we do it on + * the next pass through the loop. */ + if (client != client_delete) + dealloc(client_delete); if (!client->ref) { - LOGINFO("Stratifier discarding instance %ld", client->id); - LL_DELETE(sdata->dead_instances, client); - sdata->stats.dead--; - free(client->workername); - free(client->useragent); - free(client); + LOGINFO("Stratifier discarding dead instance %ld", client->id); + __del_dead(sdata, client); + dealloc(client->workername); + dealloc(client->useragent); + client_delete = client; } } + dealloc(client_delete); ck_wunlock(&sdata->instance_lock); /* Decrease worker count outside of instance_lock to avoid recursive - * locking. ckp and instance are guaranteed to be set if dec is true */ + * locking */ if (dec) dec_worker(ckp, instance); } @@ -1354,6 +1373,8 @@ static void reset_bestshares(sdata_t *sdata) ck_runlock(&sdata->instance_lock); } +/* Ram from blocks is NOT freed at all for now, only their entry is removed + * from the linked list, leaving a very small leak here and reject. */ static void block_solve(ckpool_t *ckp, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; @@ -1621,6 +1642,16 @@ out: return ret; } +/* Enter holding workbase_lock */ +static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client) +{ + if (wb->enonce1constlen) + memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); + memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); +} + /* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number * of bytes we have to work with when we are proxying with a split nonce2. * When the proxy space is less than 32 bits to work with, we look for an @@ -1669,11 +1700,7 @@ static bool new_enonce1(stratum_instance_t *client) } if (ret) client->enonce1_64 = sdata->enonce1u.u64; - if (wb->enonce1constlen) - memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); - memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); + __fill_enonce1data(wb, client); ck_wunlock(&sdata->workbase_lock); if (unlikely(!ret)) @@ -1719,9 +1746,13 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js buf = json_string_value(json_array_get(params_val, 1)); LOGDEBUG("Found old session id %s", buf); /* Add matching here */ - if (disconnected_sessionid_exists(sdata, buf, client_id)) { + if ((client->enonce1_64 = disconnected_sessionid_exists(sdata, buf, client_id))) { sprintf(client->enonce1, "%016lx", client->enonce1_64); old_match = true; + + ck_rlock(&sdata->workbase_lock); + __fill_enonce1data(sdata->current_workbase, client); + ck_runlock(&sdata->workbase_lock); } } } else @@ -1733,11 +1764,11 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js client->reject = 2; return json_string("proxy full"); } - LOGINFO("Set new subscription %ld to new enonce1 %s", client->id, - client->enonce1); + LOGINFO("Set new subscription %ld to new enonce1 %lx string %s", client->id, + client->enonce1_64, client->enonce1); } else { - LOGINFO("Set new subscription %ld to old matched enonce1 %s", client->id, - client->enonce1); + LOGINFO("Set new subscription %ld to old matched enonce1 %lx string %s", + client->id, client->enonce1_64, client->enonce1); } ck_rlock(&sdata->workbase_lock); @@ -1931,7 +1962,6 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, read_workerstats(ckp, worker); worker->start_time = time(NULL); client->worker_instance = worker; - instance->workernames++; } DL_APPEND(instance->instances, client); ck_wunlock(&sdata->instance_lock); @@ -2005,6 +2035,8 @@ static int send_recv_auth(stratum_instance_t *client) LOGINFO("Got ckdb response: %s", buf); if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { + if (cmdmatch(response, "failed")) + goto out; LOGWARNING("Got unparseable ckdb auth response: %s", buf); goto out_fail; } @@ -2987,13 +3019,14 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val * Remove this instance since the client id may well be * reused */ ck_wlock(&sdata->instance_lock); - HASH_DEL(sdata->stratum_instances, client); + if (likely(__instance_by_id(sdata, client_id))) + HASH_DEL(sdata->stratum_instances, client); + __add_dead(sdata, client); ck_wunlock(&sdata->instance_lock); - LOGINFO("Adding passthrough client %ld", client->id); + LOGNOTICE("Adding passthrough client %ld", client->id); snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); - free(client); goto out; } @@ -3221,6 +3254,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, client->user_instance->username); stratum_send_message(sdata, client, buf); + free(buf); } else { if (errnum < 0) stratum_send_message(sdata, client, "Authorisations temporarily offline :("); @@ -3539,15 +3573,15 @@ static void *statsupdate(void *arg) decay_time(&client->dsps60, 0, per_tdiff, 3600); decay_time(&client->dsps1440, 0, per_tdiff, 86400); decay_time(&client->dsps10080, 0, per_tdiff, 604800); + idle_workers++; if (per_tdiff > 600) client->idle = true; - idle_workers++; + continue; } } HASH_ITER(hh, sdata->user_instances, instance, tmpuser) { worker_instance_t *worker; - int iterations = 0; bool idle = false; if (!instance->authorised) @@ -3555,12 +3589,6 @@ static void *statsupdate(void *arg) /* Decay times per worker */ DL_FOREACH(instance->worker_instances, worker) { - /* Sanity check, should never happen */ - if (unlikely(iterations++ > instance->workernames)) { - LOGWARNING("Statsupdate trying to iterate more than %d existing workers for worker %s", - instance->workernames, worker->workername); - break; - } per_tdiff = tvdiff(&now, &worker->last_share); if (per_tdiff > 60) { decay_time(&worker->dsps1, 0, per_tdiff, 60);