From 2980ff40329c99cc1309f86b165d93097b0dd58c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:01:34 +1100 Subject: [PATCH 01/56] Only give unparseable auth message when failed message is not recognised --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 84f9a9a7..43512fdc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2005,7 +2005,8 @@ static int send_recv_auth(stratum_instance_t *client) LOGINFO("Got ckdb response: %s", buf); if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { - LOGWARNING("Got unparseable ckdb auth response: %s", buf); + if (!cmdmatch(response, "failed")) + LOGWARNING("Got unparseable ckdb auth response: %s", buf); goto out_fail; } cmd = response; From b1f7cd833d0090a4d916493fe083b456eedb49cf Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:04:13 +1100 Subject: [PATCH 02/56] Give an outright fail response if failed msg is received from ckdb on auth --- src/stratifier.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 43512fdc..241cc981 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2005,8 +2005,9 @@ static int send_recv_auth(stratum_instance_t *client) LOGINFO("Got ckdb response: %s", buf); if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { - if (!cmdmatch(response, "failed")) - LOGWARNING("Got unparseable ckdb auth response: %s", buf); + if (cmdmatch(response, "failed")) + goto out; + LOGWARNING("Got unparseable ckdb auth response: %s", buf); goto out_fail; } cmd = response; From 9b56c0c54620e42a356e6d104cccd12e27189c8d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:24:42 +1100 Subject: [PATCH 03/56] Fill enonce1 data properly for stratum reconnect support --- src/stratifier.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 241cc981..a3715dfb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1621,6 +1621,16 @@ out: return ret; } +/* Enter holding workbase_lock */ +static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client) +{ + if (wb->enonce1constlen) + memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); + memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); +} + /* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number * of bytes we have to work with when we are proxying with a split nonce2. * When the proxy space is less than 32 bits to work with, we look for an @@ -1669,11 +1679,7 @@ static bool new_enonce1(stratum_instance_t *client) } if (ret) client->enonce1_64 = sdata->enonce1u.u64; - if (wb->enonce1constlen) - memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); - memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); + __fill_enonce1data(wb, client); ck_wunlock(&sdata->workbase_lock); if (unlikely(!ret)) @@ -1722,6 +1728,10 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js if (disconnected_sessionid_exists(sdata, buf, client_id)) { sprintf(client->enonce1, "%016lx", client->enonce1_64); old_match = true; + + ck_rlock(&sdata->workbase_lock); + __fill_enonce1data(sdata->current_workbase, client); + ck_runlock(&sdata->workbase_lock); } } } else From 8964743cd1306be027646df1fb00206c9f6922a7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 22:04:18 +1100 Subject: [PATCH 04/56] Disable resume support till dereference issue in disconnected_instances resolved --- src/stratifier.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index a3715dfb..3a5f26b3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1243,7 +1243,6 @@ static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client, *tmp; user_instance_t *instance = NULL; - time_t now_t = time(NULL); ckpool_t *ckp = NULL; bool dec = false; @@ -1252,14 +1251,16 @@ static void drop_client(sdata_t *sdata, int64_t id) ck_wlock(&sdata->instance_lock); client = __instance_by_id(sdata, id); if (client) { - stratum_instance_t *old_client = NULL; - if (client->authorised) { dec = true; client->authorised = false; } HASH_DEL(sdata->stratum_instances, client); +#if 0 + /* Disable resume support till debugged properly */ + stratum_instance_t *old_client = NULL; + HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { @@ -1267,12 +1268,16 @@ static void drop_client(sdata_t *sdata, int64_t id) sdata->stats.disconnected++; client->disconnected_time = time(NULL); } else +#endif __kill_instance(sdata, client); ckp = client->ckp; instance = client->user_instance; LOGINFO("Stratifer dropped %sauthorised client %ld", dec ? "" : "un", id); } +#if 0 + time_t now_t = time(NULL); + /* Old disconnected instances will not have any valid shares so remove * them from the disconnected instances list if they've been dead for * more than 10 minutes */ @@ -1282,6 +1287,7 @@ static void drop_client(sdata_t *sdata, int64_t id) LOGINFO("Discarding aged disconnected instance %ld", client->id); __del_disconnected(sdata, client); } +#endif /* Cull old unused clients lazily when there are no more reference * counts for them. */ From f1f0290074b57d9a1553e5203411363f0c67a470 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 15 Jan 2015 19:08:31 +1100 Subject: [PATCH 05/56] ckdb - enable, optionally automatic but off by default, workmarkers processing --- src/ckdb.c | 101 ++---------- src/ckdb.h | 65 ++++++-- src/ckdb_cmd.c | 14 +- src/ckdb_data.c | 131 ++++++++++++++- src/ckdb_dbio.c | 425 ++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 621 insertions(+), 115 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 8fa5580d..91bb3fdc 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -169,6 +169,9 @@ static char *status_chars = "|/-\\"; static char *restorefrom; +// Only accessed in here +static bool markersummary_auto; + // disallow: '/' '.' '_' and FLDSEP const char *userpatt = "^[^/\\._"FLDSEPSTR"]*$"; const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.-]*[A-Za-z0-9]$"; @@ -1076,82 +1079,6 @@ static void alloc_storage() marks_root = new_ktree(); } -static void free_workinfo_data(K_ITEM *item) -{ - WORKINFO *workinfo; - - DATA_WORKINFO(workinfo, item); - if (workinfo->transactiontree) - FREENULL(workinfo->transactiontree); - if (workinfo->merklehash) - FREENULL(workinfo->merklehash); -} - -static void free_sharesummary_data(K_ITEM *item) -{ - SHARESUMMARY *sharesummary; - - DATA_SHARESUMMARY(sharesummary, item); - if (sharesummary->workername) { - LIST_MEM_SUB(sharesummary_free, sharesummary->workername); - FREENULL(sharesummary->workername); - } - SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); - SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); - SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); - SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); - SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); - SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); -} - -static void free_optioncontrol_data(K_ITEM *item) -{ - OPTIONCONTROL *optioncontrol; - - DATA_OPTIONCONTROL(optioncontrol, item); - if (optioncontrol->optionvalue) - FREENULL(optioncontrol->optionvalue); -} - -static void free_markersummary_data(K_ITEM *item) -{ - MARKERSUMMARY *markersummary; - - DATA_MARKERSUMMARY(markersummary, item); - if (markersummary->workername) - FREENULL(markersummary->workername); - SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY); - SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY); - SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY); - SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY); - SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY); - SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY); -} - -static void free_workmarkers_data(K_ITEM *item) -{ - WORKMARKERS *workmarkers; - - DATA_WORKMARKERS(workmarkers, item); - if (workmarkers->poolinstance) - FREENULL(workmarkers->poolinstance); - if (workmarkers->description) - FREENULL(workmarkers->description); -} - -static void free_marks_data(K_ITEM *item) -{ - MARKS *marks; - - DATA_MARKS(marks, item); - if (marks->poolinstance && marks->poolinstance != EMPTY) - FREENULL(marks->poolinstance); - if (marks->description && marks->description != EMPTY) - FREENULL(marks->description); - if (marks->extra && marks->extra != EMPTY) - FREENULL(marks->extra); -} - #define FREE_TREE(_tree) \ if (_tree ## _root) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \ @@ -2081,8 +2008,8 @@ static void make_a_shift_mark() K_ITEM wi_look, ss_look; SHARESUMMARY *sharesummary, looksharesummary; WORKINFO *workinfo, lookworkinfo; - BLOCKS *blocks; - MARKS *marks, *sh_marks; + BLOCKS *blocks = NULL; + MARKS *marks = NULL, *sh_marks = NULL; int64_t ss_age_wid, last_marks_wid, marks_wid, prev_wid; bool was_block = false, ok; char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ]; @@ -2197,7 +2124,8 @@ static void make_a_shift_mark() if (m_item) { /* First block after the last mark - * Shift must stop at or before this */ + * Shift must stop at or before this + * N.B. any block, even 'New' */ K_RLOCK(blocks_free); b_item = first_in_ktree(blocks_root, b_ctx); while (b_item) { @@ -2474,16 +2402,16 @@ static void *marker(__maybe_unused void *arg) else make_a_workmarker(); -#if 0 for (i = 0; i < 4; i++) { if (!everyone_die) sleep(1); } if (everyone_die) break; - else - make_markersummaries(); -#endif + else { + if (markersummary_auto) + make_markersummaries(false, NULL, NULL, NULL, NULL, NULL); + } } marker_using_data = false; @@ -4044,6 +3972,8 @@ static struct option long_options[] = { { "help", no_argument, 0, 'h' }, { "killold", no_argument, 0, 'k' }, { "loglevel", required_argument, 0, 'l' }, + // markersummary = enable markersummary auto generation + { "markersummary", no_argument, 0, 'm' }, { "name", required_argument, 0, 'n' }, { "dbpass", required_argument, 0, 'p' }, { "btc-pass", required_argument, 0, 'P' }, @@ -4088,7 +4018,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "c:d:hkl:mn:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'c': ckp.config = strdup(optarg); @@ -4126,6 +4056,9 @@ int main(int argc, char **argv) LOG_EMERG, LOG_DEBUG, ckp.loglevel); } break; + case 'm': + markersummary_auto = true; + break; case 'n': ckp.name = strdup(optarg); break; diff --git a/src/ckdb.h b/src/ckdb.h index 6206887d..341ced0c 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.822" +#define CKDB_VERSION DB_VERSION"-0.830" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -523,6 +523,34 @@ enum cmd_values { _row->pointers = _row->pointers; \ } while (0) +/* Override _row defaults if transfer fields are present + * We don't care about the reply so it can be small + * This is the pointer version - only one required so far */ +#define MODIFYDATETRANSFER(_list, _root, _row) do { \ + if (_root) { \ + char __reply[16]; \ + size_t __siz = sizeof(__reply); \ + K_ITEM *__item; \ + TRANSFER *__transfer; \ + __item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \ + } \ + __item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \ + } \ + __item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \ + } \ + _row->pointers = _row->pointers; \ + } \ + } while (0) + #define SIMPLEDATECONTROL ",createdate,createby,createcode,createinet" #define SIMPLEDATECOUNT 4 #define SIMPLEDATECONTROLFIELDS \ @@ -1480,6 +1508,14 @@ extern PGconn *dbconnect(); // *** ckdb_data.c *** // *** +// Data free functions (first) +extern void free_workinfo_data(K_ITEM *item); +extern void free_sharesummary_data(K_ITEM *item); +extern void free_optioncontrol_data(K_ITEM *item); +extern void free_markersummary_data(K_ITEM *item); +extern void free_workmarkers_data(K_ITEM *item); +extern void free_marks_data(K_ITEM *item); + extern char *safe_text(char *txt); extern void username_trim(USERS *users); extern bool like_address(char *username); @@ -1629,10 +1665,12 @@ extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate); extern void dsp_markersummary(K_ITEM *item, FILE *stream); extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b); -extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, - char *workername); extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername, K_TREE_CTX *ctx); +extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, + char *workername); +extern bool make_markersummaries(bool msg, char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root); extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); @@ -1750,6 +1788,9 @@ extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *error, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); +extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, + char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root); #define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ _sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ WHERE_FFL_HERE) @@ -1796,20 +1837,22 @@ extern bool workerstats_add(char *poolinstance, char *elapsed, char *username, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); extern bool userstats_fill(PGconn *conn); +extern bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root); extern bool markersummary_fill(PGconn *conn); -#define workmarkers_process(_conn, _add, _markerid, _poolinstance, \ +#define workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \ _workinfoidend, _workinfoidstart, _description, \ _status, _by, _code, _inet, _cd, _trf_root) \ - _workmarkers_process(_conn, _add, _markerid, _poolinstance, \ + _workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \ _workinfoidend, _workinfoidstart, _description, \ _status, _by, _code, _inet, _cd, _trf_root, \ WHERE_FFL_HERE) -extern bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, - char *poolinstance, int64_t workinfoidend, - int64_t workinfoidstart, char *description, - char *status, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root, - WHERE_FFL_ARGS); +extern bool _workmarkers_process(PGconn *conn, bool already, bool add, + int64_t markerid, char *poolinstance, + int64_t workinfoidend, int64_t workinfoidstart, + char *description, char *status, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root, WHERE_FFL_ARGS); extern bool workmarkers_fill(PGconn *conn); #define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \ _extra, _marktype, _status, _by, _code, _inet, _cd, \ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 2ec947a6..233b36bf 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -4092,7 +4092,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char * } /* Socket interface to the functions that will be used later to automatically - * create marks, workmarkers and process the workmarkers + * create marks, workmarkers and process the workmarkers and sharesummaries * to generate markersummaries */ static char *cmd_marks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, @@ -4372,7 +4372,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, DATA_WORKMARKERS(workmarkers, wm_item); if (CURRENT(&(workmarkers->expirydate)) && !WMPROCESSED(workmarkers->status)) { - ok = workmarkers_process(conn, false, + ok = workmarkers_process(conn, false, false, workmarkers->markerid, NULL, 0, 0, NULL, NULL, by, code, inet, cd, trf_root); @@ -4391,6 +4391,16 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, "%d workmarkers expunged", count); } } + } else if (strcasecmp(action, "sum") == 0) { + /* For the last available workmarker, + * summarise it's sharesummaries into markersummaries + * No parameters */ + ok = make_markersummaries(true, by, code, inet, cd, trf_root); + if (!ok) { + snprintf(reply, siz, "%s failed", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s.%s", id, reply); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index be92fbd0..abb1b40f 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -9,6 +9,83 @@ #include "ckdb.h" +// Data free functions (added here as needed) +void free_workinfo_data(K_ITEM *item) +{ + WORKINFO *workinfo; + + DATA_WORKINFO(workinfo, item); + if (workinfo->transactiontree) + FREENULL(workinfo->transactiontree); + if (workinfo->merklehash) + FREENULL(workinfo->merklehash); +} + +void free_sharesummary_data(K_ITEM *item) +{ + SHARESUMMARY *sharesummary; + + DATA_SHARESUMMARY(sharesummary, item); + if (sharesummary->workername) { + LIST_MEM_SUB(sharesummary_free, sharesummary->workername); + FREENULL(sharesummary->workername); + } + SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); + SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); + SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); + SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); + SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); + SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); +} + +void free_optioncontrol_data(K_ITEM *item) +{ + OPTIONCONTROL *optioncontrol; + + DATA_OPTIONCONTROL(optioncontrol, item); + if (optioncontrol->optionvalue) + FREENULL(optioncontrol->optionvalue); +} + +void free_markersummary_data(K_ITEM *item) +{ + MARKERSUMMARY *markersummary; + + DATA_MARKERSUMMARY(markersummary, item); + if (markersummary->workername) + FREENULL(markersummary->workername); + SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY); + SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY); + SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY); + SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY); + SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY); + SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY); +} + +void free_workmarkers_data(K_ITEM *item) +{ + WORKMARKERS *workmarkers; + + DATA_WORKMARKERS(workmarkers, item); + if (workmarkers->poolinstance) + FREENULL(workmarkers->poolinstance); + if (workmarkers->description) + FREENULL(workmarkers->description); +} + +void free_marks_data(K_ITEM *item) +{ + MARKS *marks; + + DATA_MARKS(marks, item); + if (marks->poolinstance && marks->poolinstance != EMPTY) + FREENULL(marks->poolinstance); + if (marks->description && marks->description != EMPTY) + FREENULL(marks->description); + if (marks->extra && marks->extra != EMPTY) + FREENULL(marks->extra); +} + // Clear text printable version of txt up to first '\0' char *safe_text(char *txt) { @@ -2390,6 +2467,58 @@ K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername) return ms_item; } +bool make_markersummaries(bool msg, char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root) +{ + K_TREE_CTX ctx[1]; + WORKMARKERS *workmarkers; + K_ITEM *wm_item, *wm_last = NULL; + tv_t now; + + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); + while (wm_item) { + DATA_WORKMARKERS(workmarkers, wm_item); + if (!CURRENT(&(workmarkers->expirydate))) + break; + // find the oldest READY workinfoid + if (WMREADY(workmarkers->status)) + wm_last = wm_item; + wm_item = prev_in_ktree(ctx); + } + K_RUNLOCK(workmarkers_free); + + if (!wm_last) { + if (!msg) + LOGDEBUG("%s() no READY workmarkers", __func__); + else + LOGWARNING("%s() no READY workmarkers", __func__); + return false; + } + + DATA_WORKMARKERS(workmarkers, wm_last); + + LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/" + "Stt %"PRId64"/%s/%s", + __func__, workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, workmarkers->workinfoidstart, + workmarkers->description, workmarkers->status); + + if (by == NULL) + by = (char *)by_default; + if (code == NULL) + code = (char *)__func__; + if (inet == NULL) + inet = (char *)inet_default; + if (cd) + copy_tv(&now, cd); + else + setnow(&now); + + return sharesummaries_to_markersummaries(NULL, workmarkers, by, code, + inet, &now, trf_root); +} + void dsp_workmarkers(K_ITEM *item, FILE *stream) { WORKMARKERS *wm; @@ -2573,7 +2702,7 @@ static bool gen_workmarkers(PGconn *conn, MARKS *stt, bool after, MARKS *fin, stt->description, after ? "++" : "", fin->description, before ? "--" : ""); - ok = workmarkers_process(conn, true, 0, EMPTY, + ok = workmarkers_process(conn, false, true, 0, EMPTY, wi_fin->workinfoid, wi_stt->workinfoid, description, MARKER_READY_STR, by, code, inet, cd, trf_root); diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 9245f968..c4385982 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -1,5 +1,5 @@ /* - * Copyright 1995-2014 Andrew Smith + * Copyright 1995-2015 Andrew Smith * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -168,7 +168,8 @@ char *pqerrmsg(PGconn *conn) #define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15" #define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16" #define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22" -#define PQPARAM27 PQPARAM22 ",$23,$24,$25,$26,$27" +#define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26" +#define PQPARAM27 PQPARAM26 ",$27" #define PARCHK(_par, _params) do { \ if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \ @@ -2751,6 +2752,318 @@ bool shareerrors_fill() return true; } +/* TODO: what to do about a failure? + * since it will repeat every ~13s + * Of course manual intervention is possible via cmd_marks, + * so that is probably the best solution since + * we should be watching the pool all the time :) + * The cause would most likely be either a code bug or a DB problem + * so there many be no obvious automated fix + * and flagging the workmarkers to be skipped may or may not be the solution, + * thus manual intervention will be the rule for now */ +bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, + char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root) +{ + // shorter name for log messages + const char *shortname = "SS_to_MS"; + ExecStatusType rescode; + PGresult *res; + K_TREE_CTX ss_ctx[1], ms_ctx[1]; + SHARESUMMARY *sharesummary, looksharesummary; + MARKERSUMMARY *markersummary, lookmarkersummary; + K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look; + bool ok = false, conned = false; + int64_t diffacc, shareacc; + char *reason = NULL, *tuples = NULL; + char *params[2]; + int n, par = 0, deleted = -7; + int ss_count, ms_count; + char *del; + + LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, workmarkers->workinfoidstart, + workmarkers->description, workmarkers->status); + + K_STORE *old_sharesummary_store = k_new_store(sharesummary_free); + K_STORE *new_markersummary_store = k_new_store(markersummary_free); + K_TREE *ms_root = new_ktree(); + + if (!CURRENT(&(workmarkers->expirydate))) { + reason = "unexpired"; + goto flail; + } + + if (!WMREADY(workmarkers->status)) { + reason = "not ready"; + goto flail; + } + + // Check there aren't already any matching markersummaries + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = 0; + lookmarkersummary.workername = EMPTY; + + INIT_MARKERSUMMARY(&ms_look); + ms_look.data = (void *)(&lookmarkersummary); + K_RLOCK(markersummary_free); + ms_item = find_after_in_ktree(markersummary_root, &ms_look, + cmp_markersummary, ms_ctx); + K_RUNLOCK(markersummary_free); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + if (ms_item && markersummary->markerid == workmarkers->markerid) { + reason = "markersummaries already exist"; + goto flail; + } + + diffacc = shareacc = 0; + ms_item = NULL; + + looksharesummary.workinfoid = workmarkers->workinfoidend; + looksharesummary.userid = MAXID; + looksharesummary.workername = EMPTY; + + INIT_SHARESUMMARY(&ss_look); + ss_look.data = (void *)(&looksharesummary); + /* Since shares come in from ckpool at a high rate, + * we don't want to lock sharesummary for long + * Those incoming shares will not be touching the sharesummaries + * we are processing here */ + K_RLOCK(sharesummary_free); + ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look, + cmp_sharesummary_workinfoid, ss_ctx); + K_RUNLOCK(sharesummary_free); + while (ss_item) { + DATA_SHARESUMMARY(sharesummary, ss_item); + if (sharesummary->workinfoid < workmarkers->workinfoidstart) + break; + K_RLOCK(sharesummary_free); + ss_prev = prev_in_ktree(ss_ctx); + K_RUNLOCK(sharesummary_free); + + // Find/create the markersummary only once per worker change + if (!ms_item || markersummary->userid != sharesummary->userid || + strcmp(markersummary->workername, sharesummary->workername) != 0) { + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = sharesummary->userid; + lookmarkersummary.workername = sharesummary->workername; + + ms_look.data = (void *)(&lookmarkersummary); + ms_item = find_in_ktree(ms_root, &ms_look, + cmp_markersummary, ms_ctx); + if (!ms_item) { + K_WLOCK(markersummary_free); + ms_item = k_unlink_head(markersummary_free); + K_WUNLOCK(markersummary_free); + k_add_head(new_markersummary_store, ms_item); + DATA_MARKERSUMMARY(markersummary, ms_item); + bzero(markersummary, sizeof(*markersummary)); + markersummary->markerid = workmarkers->markerid; + markersummary->userid = sharesummary->userid; + markersummary->workername = strdup(sharesummary->workername); + LIST_MEM_ADD(markersummary_free, markersummary->workername); + ms_root = add_to_ktree(ms_root, ms_item, cmp_markersummary); + + LOGDEBUG("%s() new ms %"PRId64"/%"PRId64"/%s", + shortname, markersummary->markerid, + markersummary->userid, + markersummary->workername); + } else { + DATA_MARKERSUMMARY(markersummary, ms_item); + } + } + markersummary->diffacc += sharesummary->diffacc; + markersummary->diffsta += sharesummary->diffsta; + markersummary->diffdup += sharesummary->diffdup; + markersummary->diffhi += sharesummary->diffhi; + markersummary->diffrej += sharesummary->diffrej; + markersummary->shareacc += sharesummary->shareacc; + markersummary->sharesta += sharesummary->sharesta; + markersummary->sharedup += sharesummary->sharedup; + markersummary->sharehi += sharesummary->sharehi; + markersummary->sharerej += sharesummary->sharerej; + markersummary->sharecount += sharesummary->sharecount; + markersummary->errorcount += sharesummary->errorcount; + if (!markersummary->firstshare.tv_sec || + !tv_newer(&(markersummary->firstshare), &(sharesummary->firstshare))) { + copy_tv(&(markersummary->firstshare), &(sharesummary->firstshare)); + } + if (tv_newer(&(markersummary->lastshare), &(sharesummary->lastshare))) { + copy_tv(&(markersummary->lastshare), &(sharesummary->lastshare)); + markersummary->lastdiffacc = sharesummary->lastdiffacc; + } + + diffacc += sharesummary->diffacc; + shareacc += sharesummary->shareacc; + + k_unlink_item(sharesummary_store, ss_item); + k_add_head(old_sharesummary_store, ss_item); + + ss_item = ss_prev; + } + + if (old_sharesummary_store->count == 0) + reason = "no sharesummaries"; + else { + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto flail; + } + + ms_item = new_markersummary_store->head; + while (ms_item) { + if (!(markersummary_add(conn, ms_item, by, code, inet, + cd, trf_root))) { + reason = "db error"; + goto rollback; + } + ms_item = ms_item->next; + } + + par = 0; + params[par++] = bigint_to_buf(workmarkers->workinfoidstart, NULL, 0); + params[par++] = bigint_to_buf(workmarkers->workinfoidend, NULL, 0); + PARCHK(par, params); + + del = "delete from sharesummary " + "where workinfoid >= $1 and workinfoid <= $2"; + + res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (PGOK(rescode)) { + tuples = PQcmdTuples(res); + if (tuples && *tuples) + deleted = atoi(tuples); + } + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Delete", rescode, conn); + reason = "delete failure"; + goto rollback; + } + + if (deleted != old_sharesummary_store->count) { + LOGERR("%s() processed sharesummaries=%d but deleted=%d", + shortname, old_sharesummary_store->count, deleted); + reason = "delete mismatch"; + goto rollback; + } + + ok = workmarkers_process(conn, true, true, + workmarkers->markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_PROCESSED_STR, + by, code, inet, cd, trf_root); +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); + } +flail: + for (n = 0; n < par; n++) + free(params[n]); + + if (conned) + PQfinish(conn); + + if (reason) { + // already displayed the full workmarkers detail at the top + LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s", + shortname, reason, workmarkers->markerid, + workmarkers->description, workmarkers->status); + + ok = false; + } + + if (!ok) { + if (new_markersummary_store->count > 0) { + // Throw them away (they don't exist anywhere else) + ms_item = new_markersummary_store->head; + while (ms_item) { + free_markersummary_data(ms_item); + ms_item = ms_item->next; + } + K_WLOCK(markersummary_free); + k_list_transfer_to_head(new_markersummary_store, markersummary_free); + K_WUNLOCK(markersummary_free); + } + if (old_sharesummary_store->count > 0) { + // Put them back in the store where they came from + K_WLOCK(sharesummary_free); + k_list_transfer_to_head(old_sharesummary_store, sharesummary_store); + K_WUNLOCK(sharesummary_free); + } + } else { + ms_count = new_markersummary_store->count; + ss_count = old_sharesummary_store->count; + // Deadlock alert for other newer code ... + K_WLOCK(sharesummary_free); + K_WLOCK(markersummary_free); + ms_item = new_markersummary_store->head; + while (ms_item) { + // Move the new markersummaries into the trees/stores + markersummary_root = add_to_ktree(markersummary_root, + ms_item, + cmp_markersummary); + markersummary_userid_root = add_to_ktree(markersummary_userid_root, + ms_item, + cmp_markersummary_userid); + ms_item = ms_item->next; + } + k_list_transfer_to_head(new_markersummary_store, markersummary_store); + + /* For normal shift processing this wont be very quick + * so it will be a 'long' LOCK */ + ss_item = old_sharesummary_store->head; + while (ss_item) { + // remove the old sharesummaries from the trees + sharesummary_root = remove_from_ktree(sharesummary_root, + ss_item, + cmp_sharesummary); + sharesummary_workinfoid_root = remove_from_ktree(sharesummary_workinfoid_root, + ss_item, + cmp_sharesummary_workinfoid); + free_sharesummary_data(ss_item); + + ss_item = ss_item->next; + } + k_list_transfer_to_head(old_sharesummary_store, sharesummary_free); + K_WUNLOCK(markersummary_free); + K_WUNLOCK(sharesummary_free); + + LOGWARNING("%s() Processed: %d ms %d ss %"PRId64" shares " + "%"PRId64" diff for workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, ms_count, ss_count, shareacc, diffacc, + workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + workmarkers->status); + } + ms_root = free_ktree(ms_root, NULL); + new_markersummary_store = k_free_store(new_markersummary_store); + old_sharesummary_store = k_free_store(old_sharesummary_store); + + return ok; +} + bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) { @@ -5204,6 +5517,82 @@ clean: return ok; } +bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res; + MARKERSUMMARY *row; + char *params[18 + MODIFYDATECOUNT]; + int n, par = 0; + char *ins; + bool ok = false; + + LOGDEBUG("%s(): add", __func__); + + DATA_MARKERSUMMARY(row, ms_item); + + MODIFYDATEPOINTERS(markersummary_free, row, cd, by, code, inet); + MODIFYDATETRANSFER(markersummary_free, trf_root, row); + + par = 0; + params[par++] = bigint_to_buf(row->markerid, NULL, 0); + params[par++] = bigint_to_buf(row->userid, NULL, 0); + params[par++] = str_to_buf(row->workername, NULL, 0); + params[par++] = double_to_buf(row->diffacc, NULL, 0); + params[par++] = double_to_buf(row->diffsta, NULL, 0); + params[par++] = double_to_buf(row->diffdup, NULL, 0); + params[par++] = double_to_buf(row->diffhi, NULL, 0); + params[par++] = double_to_buf(row->diffrej, NULL, 0); + params[par++] = double_to_buf(row->shareacc, NULL, 0); + params[par++] = double_to_buf(row->sharesta, NULL, 0); + params[par++] = double_to_buf(row->sharedup, NULL, 0); + params[par++] = double_to_buf(row->sharehi, NULL, 0); + params[par++] = double_to_buf(row->sharerej, NULL, 0); + params[par++] = bigint_to_buf(row->sharecount, NULL, 0); + params[par++] = bigint_to_buf(row->errorcount, NULL, 0); + params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); + params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); + MODIFYDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into markersummary " + "(markerid,userid,workername,diffacc,diffsta,diffdup,diffhi," + "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," + "sharecount,errorcount,firstshare,lastshare,lastdiffacc" + MODIFYDATECONTROL ") values (" PQPARAM26 ")"; + + LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f", + __func__, row->markerid, row->userid, row->workername, + row->diffacc); + + if (!conn) { + conn = dbconnect(); + conned = true; + } + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto unparam; + } + + ok = true; +unparam: + PQclear(res); + if (conned) + PQfinish(conn); + for (n = 0; n < par; n++) + free(params[n]); + + // caller must do tree/list/store changes + + return ok; +} + bool markersummary_fill(PGconn *conn) { ExecStatusType rescode; @@ -5380,10 +5769,10 @@ bool markersummary_fill(PGconn *conn) * since we only check for a CURRENT workmarkers * N.B. also, this returns success if !add and there is no matching * old workmarkers */ -bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, - char *poolinstance, int64_t workinfoidend, - int64_t workinfoidstart, char *description, - char *status, char *by, char *code, +bool _workmarkers_process(PGconn *conn, bool already, bool add, + int64_t markerid, char *poolinstance, + int64_t workinfoidend, int64_t workinfoidstart, + char *description, char *status, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, WHERE_FFL_ARGS) { @@ -5391,7 +5780,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, bool conned = false; PGresult *res = NULL; K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item; - WORKMARKERS *row, *oldworkmarkers; + WORKMARKERS *row, *oldworkmarkers = NULL; char *upd, *ins; char *params[6 + HISTORYDATECOUNT]; bool ok = false, begun = false; @@ -5414,15 +5803,17 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, conn = dbconnect(); conned = true; } - res = PQexec(conn, "Begin", CKPQ_WRITE); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Begin", rescode, conn); - goto unparam; - } + if (!already) { + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } - begun = true; + begun = true; + } upd = "update workmarkers set expirydate=$1 where markerid=$2" " and expirydate=$3"; @@ -5473,7 +5864,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, conned = true; } - if (!begun) { + if (!already && !begun) { res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); @@ -5707,7 +6098,7 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance, bool conned = false; PGresult *res = NULL; K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item; - MARKS *row, *oldmarks; + MARKS *row, *oldmarks = NULL; char *upd, *ins; char *params[6 + HISTORYDATECOUNT]; bool ok = false, begun = false; From cc86234cd0d2c582940d923c716a85a063ace7c1 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 15 Jan 2015 21:16:36 +1100 Subject: [PATCH 06/56] ckdb - allow setting workmarkers as processed via marks command --- src/ckdb.h | 2 +- src/ckdb_cmd.c | 37 +++++++++++++++++++++++++++++++++++-- src/ckdb_dbio.c | 4 +++- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 341ced0c..03184147 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.830" +#define CKDB_VERSION DB_VERSION"-0.831" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 233b36bf..5ece47c8 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -4105,13 +4105,13 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, char msg[1024] = ""; K_ITEM *i_action, *i_workinfoid, *i_marktype, *i_description; K_ITEM *i_height, *i_status, *i_extra, *m_item, *b_item, *w_item; - K_ITEM *wm_item, *wm_item_prev; + K_ITEM *wm_item, *wm_item_prev, *i_markerid; WORKMARKERS *workmarkers; K_TREE_CTX ctx[1]; BLOCKS *blocks; MARKS *marks; char *action; - int64_t workinfoid = -1; + int64_t workinfoid = -1, markerid = -1; char *marktype; int32_t height = 0; char description[TXT_BIG+1] = { '\0' }; @@ -4401,6 +4401,39 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, LOGERR("%s.%s", id, reply); return strdup(reply); } + } else if (strcasecmp(action, "processed") == 0) { + /* Mark a workmarker as processed + * Requires markerid */ + i_markerid = require_name(trf_root, "markerid", 1, (char *)intpatt, reply, siz); + if (!i_markerid) + return strdup(reply); + TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid); + K_RLOCK(workmarkers_free); + wm_item = find_workmarkerid(markerid, true, '\0'); + K_RUNLOCK(workmarkers_free); + if (!wm_item) { + snprintf(reply, siz, + "unknown workmarkers with markerid %"PRId64, markerid); + return strdup(reply); + } + DATA_WORKMARKERS(workmarkers, wm_item); + if (WMPROCESSED(workmarkers->status)) { + snprintf(reply, siz, + "already processed markerid %"PRId64, markerid); + return strdup(reply); + } + ok = workmarkers_process(NULL, false, true, markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_PROCESSED_STR, + by, code, inet, cd, trf_root); + if (!ok) { + snprintf(reply, siz, "%s failed", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s.%s", id, reply); diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index c4385982..74877f58 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -5756,7 +5756,9 @@ bool markersummary_fill(PGconn *conn) return ok; } -/* Add means create a new one and expire the old one if it exists, +/* Already means there is a transaction already in progress + * so don't begin or commit/rollback + * Add means create a new one and expire the old one if it exists, * otherwise we only expire the old one if it exists * Add requires all db fields except markerid, however if markerid * is non-zero, it will be used instead of getting a new one From 23c13f864789aab2bd2052e333834923d1c41204 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 15 Jan 2015 21:25:17 +1100 Subject: [PATCH 07/56] ckdb - workmarkers processing: no sharesummaries is not an error --- src/ckdb.h | 2 +- src/ckdb_dbio.c | 70 ++++++++++++++++++++++++------------------------- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 03184147..b0733d55 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.831" +#define CKDB_VERSION DB_VERSION"-0.832" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 74877f58..6c453461 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -2904,32 +2904,30 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, ss_item = ss_prev; } - if (old_sharesummary_store->count == 0) - reason = "no sharesummaries"; - else { - if (conn == NULL) { - conn = dbconnect(); - conned = true; - } + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } - res = PQexec(conn, "Begin", CKPQ_WRITE); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Begin", rescode, conn); - goto flail; - } + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto flail; + } - ms_item = new_markersummary_store->head; - while (ms_item) { - if (!(markersummary_add(conn, ms_item, by, code, inet, - cd, trf_root))) { - reason = "db error"; - goto rollback; - } - ms_item = ms_item->next; + ms_item = new_markersummary_store->head; + while (ms_item) { + if (!(markersummary_add(conn, ms_item, by, code, inet, + cd, trf_root))) { + reason = "db error"; + goto rollback; } + ms_item = ms_item->next; + } + if (old_sharesummary_store->count > 0) { par = 0; params[par++] = bigint_to_buf(workmarkers->workinfoidstart, NULL, 0); params[par++] = bigint_to_buf(workmarkers->workinfoidend, NULL, 0); @@ -2958,23 +2956,23 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, reason = "delete mismatch"; goto rollback; } + } - ok = workmarkers_process(conn, true, true, - workmarkers->markerid, - workmarkers->poolinstance, - workmarkers->workinfoidend, - workmarkers->workinfoidstart, - workmarkers->description, - MARKER_PROCESSED_STR, - by, code, inet, cd, trf_root); + ok = workmarkers_process(conn, true, true, + workmarkers->markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_PROCESSED_STR, + by, code, inet, cd, trf_root); rollback: - if (ok) - res = PQexec(conn, "Commit", CKPQ_WRITE); - else - res = PQexec(conn, "Rollback", CKPQ_WRITE); + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); - PQclear(res); - } + PQclear(res); flail: for (n = 0; n < par; n++) free(params[n]); From 4d72f49061b92a625388a85fce5324b721ae7f34 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 16 Jan 2015 09:41:50 +1100 Subject: [PATCH 08/56] Preauth workers for the first 10 minutes after a user first authorises --- src/stratifier.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 0b974d5f..bb189934 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -180,6 +180,7 @@ struct user_instance { double dsps1440; double dsps10080; tv_t last_share; + time_t auth_time; }; /* Combined data from workers with the same workername */ @@ -1953,6 +1954,8 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); json_get_int(&worker->mindiff, val, "difficultydefault"); client->suggest_diff = worker->mindiff; + if (!user_instance->auth_time) + user_instance->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || !safecmp(response, "ok.addrauth"))) { @@ -2054,7 +2057,13 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j if (CKP_STANDALONE(ckp)) ret = true; else { - *errnum = send_recv_auth(client); + /* Preauth workers for the first 10 minutes after the user is + * first authorised by ckdb to avoid floods of worker auths. + * *errnum is implied zero already so ret will be set true */ + if (user_instance->auth_time && time(NULL) - user_instance->auth_time < 600) + queue_delayed_auth(client); + else + *errnum = send_recv_auth(client); if (!*errnum) ret = true; else if (*errnum < 0 && user_instance->secondaryuserid) { From d64a0fa5ded801baeb0ecb8d695567ed4c46c31c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 16 Jan 2015 11:52:34 +1100 Subject: [PATCH 09/56] Dead instances are a singly linked list only Conflicts: src/stratifier.c --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index bb189934..8756a851 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1252,7 +1252,7 @@ static void drop_client(sdata_t *sdata, int64_t id) ck_wlock(&sdata->instance_lock); if (client) __dec_instance_ref(client); - DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { + LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { if (!client->ref) { LOGINFO("Stratifier discarding instance %ld", client->id); LL_DELETE(sdata->dead_instances, client); From 19a9b244b3167ba01c0759a846036cab7261d08e Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 16 Jan 2015 14:32:27 +1100 Subject: [PATCH 10/56] ckdb - log steps in the storage dealloc during shutdown --- src/ckdb.c | 16 ++++++++++++++++ src/ckdb.h | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ckdb.c b/src/ckdb.c index 91bb3fdc..3b9858a2 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1117,6 +1117,8 @@ static void alloc_storage() static void dealloc_storage() { + LOGWARNING("%s() logqueue ...", __func__); + FREE_LISTS(logqueue); FREE_TREE(marks); @@ -1128,6 +1130,8 @@ static void dealloc_storage() FREE_STORE_DATA(workmarkers); FREE_LIST_DATA(workmarkers); + LOGWARNING("%s() markersummary ...", __func__); + FREE_TREE(markersummary_userid); FREE_TREE(markersummary); FREE_STORE_DATA(markersummary); @@ -1135,6 +1139,8 @@ static void dealloc_storage() FREE_ALL(workerstatus); + LOGWARNING("%s() userstats ...", __func__); + FREE_TREE(userstats_workerstatus); FREE_TREE(userstats_statsdate); if (userstats_summ) @@ -1142,11 +1148,15 @@ static void dealloc_storage() FREE_STORE(userstats_eos); FREE_ALL(userstats); + LOGWARNING("%s() poolstats ...", __func__); + FREE_ALL(poolstats); FREE_ALL(auths); FREE_ALL(miningpayouts); FREE_ALL(blocks); + LOGWARNING("%s() sharesummary ...", __func__); + FREE_TREE(sharesummary_workinfoid); FREE_TREE(sharesummary); FREE_STORE_DATA(sharesummary); @@ -1155,6 +1165,8 @@ static void dealloc_storage() FREE_ALL(shareerrors); FREE_ALL(shares); + LOGWARNING("%s() workinfo ...", __func__); + FREE_TREE(workinfo_height); FREE_TREE(workinfo); FREE_STORE_DATA(workinfo); @@ -1174,9 +1186,13 @@ static void dealloc_storage() FREE_TREE(userid); FREE_ALL(users); + LOGWARNING("%s() transfer/heartbeatqueue/workqueue ...", __func__); + FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); FREE_LISTS(workqueue); + + LOGWARNING("%s() finished", __func__); } static bool setup_data() diff --git a/src/ckdb.h b/src/ckdb.h index b0733d55..443c32c6 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.832" +#define CKDB_VERSION DB_VERSION"-0.833" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From 468a24c51e88ed42ee8a5f3fc41d5b5e2e9824d1 Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 16 Jan 2015 14:46:34 +1100 Subject: [PATCH 11/56] ckdb - since they are as yet unused, don't store new auths in ram --- src/ckdb.h | 2 +- src/ckdb_dbio.c | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 443c32c6..4de78164 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.833" +#define CKDB_VERSION DB_VERSION"-0.834" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 6c453461..0a903aad 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -4627,12 +4627,18 @@ unitem: if (conned) PQfinish(conn); K_WLOCK(auths_free); +#if 1 + /* To save ram for now, don't store them, + * we don't actually use them anywhere yet */ + k_add_head(auths_free, a_item); +#else if (!ok) k_add_head(auths_free, a_item); else { auths_root = add_to_ktree(auths_root, a_item, cmp_auths); k_add_head(auths_store, a_item); } +#endif K_WUNLOCK(auths_free); return ok; @@ -4644,25 +4650,30 @@ bool auths_fill(PGconn *conn) PGresult *res; K_ITEM *item; AUTHS *row; -// char *params[1]; + char *params[2]; int n, i; -// int par = 0; + int par = 0; char *field; char *sel; - int fields = 7 + 1; // +1 = 'best' + int fields = 7; bool ok; + tv_t now; LOGDEBUG("%s(): select", __func__); // TODO: add/update a (single) fake auth every ~10min or 10min after the last one? -#if 0 + sel = "select " "authid,userid,workername,clientid,enonce1,useragent,preauth" HISTORYDATECONTROL - " from auths where expirydate=$1"; + " from auths where expirydate=$1 and createdate>=$2"; + + setnow(&now); + now.tv_sec -= (24 * 60 * 60); // last day worth par = 0; params[par++] = tv_to_buf((tv_t *)(&default_expiry), NULL, 0); + params[par++] = tv_to_buf((tv_t *)(&now), NULL, 0); PARCHK(par, params); res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); @@ -4671,8 +4682,8 @@ bool auths_fill(PGconn *conn) PQclear(res); return false; } -#endif +#if 0 // Only load the last record for each workername sel = "with last as (" "select authid,userid,workername,clientid,enonce1,useragent,preauth" @@ -4689,6 +4700,7 @@ bool auths_fill(PGconn *conn) PQclear(res); return false; } +#endif n = PQnfields(res); if (n != (fields + HISTORYDATECOUNT)) { From 69ec5e702cc7aabdd2bcb8ee5d84f3eb9fc8ebf6 Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 16 Jan 2015 18:31:55 +1100 Subject: [PATCH 12/56] ckdb - add a flush socket command to flush stdin, stdout and the logmsg log file --- src/ckdb.c | 12 ++++++++++++ src/ckdb.h | 3 ++- src/ckdb_cmd.c | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/ckdb.c b/src/ckdb.c index 3b9858a2..71b22bea 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2720,6 +2720,17 @@ static void *socketer(__maybe_unused void *arg) } send_unix_msg(sockd, reply); break; + case CMD_FLUSH: + LOGDEBUG("Listener received flush request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.splash", + id, now.tv_sec); + send_unix_msg(sockd, reply); + fflush(stdout); + fflush(stderr); + if (global_ckp && global_ckp->logfd) + fflush(global_ckp->logfp); + break; // Always process immediately: case CMD_AUTH: case CMD_ADDRAUTH: @@ -2996,6 +3007,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_PING: case CMD_VERSION: case CMD_LOGLEVEL: + case CMD_FLUSH: // Non pool commands, shouldn't be there case CMD_ADDUSER: case CMD_NEWPASS: diff --git a/src/ckdb.h b/src/ckdb.h index 4de78164..40da3ffa 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.834" +#define CKDB_VERSION DB_VERSION"-0.835" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -290,6 +290,7 @@ enum cmd_values { CMD_PING, CMD_VERSION, CMD_LOGLEVEL, + CMD_FLUSH, CMD_SHARELOG, CMD_AUTH, CMD_ADDRAUTH, diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 5ece47c8..37fbd179 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -4525,6 +4525,7 @@ struct CMDS ckdb_cmds[] = { { CMD_PING, "ping", true, false, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, { CMD_VERSION, "version", true, false, NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB }, { CMD_LOGLEVEL, "loglevel", true, false, NULL, ACCESS_SYSTEM }, + { CMD_FLUSH, "flush", true, false, NULL, ACCESS_SYSTEM }, { CMD_SHARELOG, STR_WORKINFO, false, true, cmd_sharelog, ACCESS_POOL }, { CMD_SHARELOG, STR_SHARES, false, true, cmd_sharelog, ACCESS_POOL }, { CMD_SHARELOG, STR_SHAREERRORS, false, true, cmd_sharelog, ACCESS_POOL }, From 6a35a02c777308c874cfb2c4e15815c3f08ee805 Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 16 Jan 2015 20:22:09 +1100 Subject: [PATCH 13/56] php - convert a failed blocks access to pblocks --- pool/prime.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pool/prime.php b/pool/prime.php index 529e0438..2ac16d2e 100644 --- a/pool/prime.php +++ b/pool/prime.php @@ -22,6 +22,7 @@ function process($p, $user, $menu) $menu['Admin']['PPLNS'] = 'pplns'; $menu['Admin']['AllWork'] = 'allwork'; } +bp: $page = ''; $n = ''; foreach ($menu as $item => $options) @@ -33,6 +34,11 @@ function process($p, $user, $menu) $n = " - $name"; } + if ($page === '' and $p == 'blocks') + { + $p = 'pblocks'; + goto bp; + } if ($page === '') showPage($info, 'index', $menu, '', $user); else From 60a26a5cf060a10cfce617cc4c860609ab3e3700 Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 16 Jan 2015 23:11:31 +1100 Subject: [PATCH 14/56] ckdb - add debug to pplns --- src/ckdb.h | 2 +- src/ckdb_cmd.c | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 40da3ffa..d243a037 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.835" +#define CKDB_VERSION DB_VERSION"-0.836" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 37fbd179..c64775ec 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -3502,6 +3502,8 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, allow_aged = true; } + LOGDEBUG("%s(): height %"PRId32, __func__, height); + block_tv.tv_sec = block_tv.tv_usec = 0L; cd.tv_sec = cd.tv_usec = 0L; lookblocks.height = height + 1; @@ -3538,6 +3540,9 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, blocks_confirmed(BLOCKS_NEW_STR)); return strdup(reply); } + LOGDEBUG("%s(): block %"PRId32"/%"PRId64"/%s/%s/%"PRId64, + __func__, blocks->height, blocks->workinfoid, + blocks->workername, blocks->confirmed, blocks->reward); switch (blocks->confirmed[0]) { case BLOCKS_NEW: block_extra = "Can't be paid out yet"; @@ -3571,7 +3576,9 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, countbacklimit = true; else countbacklimit = false; - + LOGDEBUG("%s(): ndiff %.0f limit=%s", + __func__, ndiff, + countbacklimit ? "true" : "false"); begin_workinfoid = end_workinfoid = 0; total_share_count = acc_share_count = 0; total_diff = 0; @@ -3649,7 +3656,8 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, ss_item = prev_in_ktree(ctx); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); } - + LOGDEBUG("%s(): ss %"PRId64" total %.0f want %.0f", + __func__, ss_count, total_diff, diff_want); /* If we haven't met or exceeded the required N, * move on to the markersummaries */ if (total_diff < diff_want) { @@ -3664,6 +3672,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, wm_item = find_before_in_ktree(workmarkers_workinfoid_root, &wm_look, cmp_workmarkers_workinfoid, wm_ctx); DATA_WORKMARKERS_NULL(workmarkers, wm_item); + LOGDEBUG("%s(): workmarkers < %"PRId64, __func__, lookworkmarkers.workinfoidend); while (total_diff < diff_want && wm_item && CURRENT(&(workmarkers->expirydate))) { if (WMPROCESSED(workmarkers->status)) { // Stop before FIVExWID if necessary @@ -3700,11 +3709,14 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, wm_item = prev_in_ktree(wm_ctx); DATA_WORKMARKERS_NULL(workmarkers, wm_item); } + LOGDEBUG("%s(): wm %"PRId64" ms %"PRId64" total %.0f want %.0f", + __func__, wm_count, ms_count, total_diff, diff_want); } K_RUNLOCK(markersummary_free); K_RUNLOCK(workmarkers_free); K_RUNLOCK(sharesummary_free); + LOGDEBUG("%s(): total %.0f want %.0f", __func__, total_diff, diff_want); if (total_diff == 0.0) { snprintf(reply, siz, "ERR.total share diff 0 before workinfo %"PRId64, From bba8cc5a5491dcf1eec1efaa33a366056b47fd86 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sat, 17 Jan 2015 13:20:41 +1100 Subject: [PATCH 15/56] ckdb - workmarkers has 2 trees, fix so both are always updated --- src/ckdb.h | 2 +- src/ckdb_dbio.c | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/ckdb.h b/src/ckdb.h index d243a037..731607a5 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.836" +#define CKDB_VERSION DB_VERSION"-0.837" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 0a903aad..f2b194cb 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -5978,15 +5978,24 @@ unparam: workmarkers_root = remove_from_ktree(workmarkers_root, old_wm_item, cmp_workmarkers); + workmarkers_workinfoid_root = remove_from_ktree(workmarkers_workinfoid_root, + old_wm_item, + cmp_workmarkers_workinfoid); copy_tv(&(oldworkmarkers->expirydate), cd); workmarkers_root = add_to_ktree(workmarkers_root, old_wm_item, cmp_workmarkers); + workmarkers_workinfoid_root = add_to_ktree(workmarkers_workinfoid_root, + old_wm_item, + cmp_workmarkers_workinfoid); } if (wm_item) { workmarkers_root = add_to_ktree(workmarkers_root, wm_item, cmp_workmarkers); + workmarkers_workinfoid_root = add_to_ktree(workmarkers_workinfoid_root, + wm_item, + cmp_workmarkers_workinfoid); k_add_head(workmarkers_store, wm_item); } } From 6c6912b0cd6420953e0dec7c4ea24a6657764789 Mon Sep 17 00:00:00 2001 From: kanoi Date: Sat, 17 Jan 2015 21:09:58 +1100 Subject: [PATCH 16/56] php - update the payout details to match the new shift code --- pool/page_payout.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool/page_payout.php b/pool/page_payout.php index 5949edf3..2363c1a3 100644 --- a/pool/page_payout.php +++ b/pool/page_payout.php @@ -10,7 +10,7 @@ function dopayout($data, $user) $pg .= ' when a block is found,
'; $pg .= 'but includes the full shift at the start and end of the range,
'; $pg .= 'so it usually will be a bit more than 5N.

'; - $pg .= 'Shifts are ~30s long, however, when any network block is found
'; + $pg .= 'Shifts are ~50min long, however, when we find any pool blocks
'; $pg .= 'the current shift ends at the point the block was found.
'; $pg .= 'A ckpool restart will also start a new shift.

'; $pg .= 'Transaction fees are included in the miner payout.
'; From 5cff5ae477a6f5374871e2234d7e1c5bb9679d38 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 18 Jan 2015 09:04:33 +1100 Subject: [PATCH 17/56] Drop two potentally unhandled reference counts --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 8756a851..bfac1dcc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2897,7 +2897,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val /* Shouldn't happen, sanity check */ if (unlikely(!result_val)) { LOGWARNING("parse_subscribe returned NULL result_val"); - return; + goto out; } val = json_object(); json_object_set_new_nocheck(val, "result", result_val); @@ -2952,7 +2952,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val if (cmdmatch(method, "mining.suggest")) { suggest_diff(client, method, params_val); - return; + goto out; } /* Covers both get_transactions and get_txnhashes */ From 62117bb38004fca58543ff930d1910d17a42a773 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 18 Jan 2015 09:24:30 +1100 Subject: [PATCH 18/56] Reset all best share counters after a block solve --- src/stratifier.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index bfac1dcc..9a86a8eb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1295,6 +1295,26 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) stratum_broadcast(sdata, json_msg); } +static void reset_bestshares(sdata_t *sdata) +{ + user_instance_t *instance, *tmpuser; + stratum_instance_t *client, *tmp; + + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + client->best_diff = 0; + } + HASH_ITER(hh, sdata->user_instances, instance, tmpuser) { + worker_instance_t *worker; + + instance->best_diff = 0; + DL_FOREACH(instance->worker_instances, worker) { + worker->best_diff = 0; + } + } + ck_runlock(&sdata->instance_lock); +} + static void block_solve(ckpool_t *ckp, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; @@ -1348,6 +1368,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) free(msg); LOGWARNING("Solved and confirmed block %d", height); + reset_bestshares(sdata); } static void block_reject(sdata_t *sdata, const char *blockhash) From 6b9c558abc1630de1f48069b519b35ced19e3fc2 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 18 Jan 2015 09:36:47 +1100 Subject: [PATCH 19/56] Add a json_get_double helper --- src/ckpool.c | 20 ++++++++++++++++++++ src/ckpool.h | 1 + 2 files changed, 21 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index bef14365..60374273 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -997,6 +997,26 @@ out: return ret; } +bool json_get_double(double *store, json_t *val, const char *res) +{ + json_t *entry = json_object_get(val, res); + bool ret = false; + + if (!entry) { + LOGDEBUG("Json did not find entry %s", res); + goto out; + } + if (!json_is_real(entry)) { + LOGWARNING("Json entry %s is not a double", res); + goto out; + } + *store = json_real_value(entry); + LOGDEBUG("Json found entry %s: %f", res, *store); + ret = true; +out: + return ret; +} + static bool json_get_bool(bool *store, json_t *val, const char *res) { json_t *entry = json_object_get(val, res); diff --git a/src/ckpool.h b/src/ckpool.h index b8293474..0b28aa40 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -214,5 +214,6 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret); bool json_get_string(char **store, json_t *val, const char *res); bool json_get_int(int *store, json_t *val, const char *res); +bool json_get_double(double *store, json_t *val, const char *res); #endif /* CKPOOL_H */ From 2231a38455451ed3bc30d835487dd18c5ebc5adc Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 18 Jan 2015 09:40:32 +1100 Subject: [PATCH 20/56] Read off best share stats if they exist per user and worker --- src/stratifier.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9a86a8eb..d64d0e08 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1796,9 +1796,10 @@ static void read_userstats(ckpool_t *ckp, user_instance_t *instance) instance->dsps60 = dsps_from_key(val, "hashrate1hr"); instance->dsps1440 = dsps_from_key(val, "hashrate1d"); instance->dsps10080 = dsps_from_key(val, "hashrate7d"); - LOGINFO("Successfully read user %s stats %f %f %f %f %f", instance->username, + json_get_double(&instance->best_diff, val, "bestshare"); + LOGINFO("Successfully read user %s stats %f %f %f %f %f %f", instance->username, instance->dsps1, instance->dsps5, instance->dsps60, instance->dsps1440, - instance->dsps10080); + instance->dsps10080, instance->best_diff); json_decref(val); } @@ -1833,8 +1834,9 @@ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) worker->dsps5 = dsps_from_key(val, "hashrate5m"); worker->dsps60 = dsps_from_key(val, "hashrate1d"); worker->dsps1440 = dsps_from_key(val, "hashrate1d"); - LOGINFO("Successfully read worker %s stats %f %f %f %f", worker->workername, - worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440); + json_get_double(&worker->best_diff, val, "bestshare"); + LOGINFO("Successfully read worker %s stats %f %f %f %f %f", worker->workername, + worker->dsps1, worker->dsps5, worker->dsps60, worker->dsps1440, worker->best_diff); json_decref(val); } From 8c6e786cef78997d683d9354d26034cf36b2a341 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 19:22:13 +1100 Subject: [PATCH 21/56] Check for corrupt ckdb responses in send_recv_auth --- src/stratifier.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index d64d0e08..11a2fcc7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1944,7 +1944,7 @@ static int send_recv_auth(stratum_instance_t *client) json_msg = ckdb_msg(ckp, val, ID_AUTH); if (unlikely(!json_msg)) { LOGWARNING("Failed to dump json in send_recv_auth"); - return ret; + goto out; } /* We want responses from ckdb serialised and not interleaved with @@ -1964,7 +1964,10 @@ static int send_recv_auth(stratum_instance_t *client) json_t *val = NULL; LOGINFO("Got ckdb response: %s", buf); - sscanf(buf, "id.%*d.%s", response); + if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { + LOGWARNING("Got unparseable ckdb auth response: %s", buf); + goto out_fail; + } cmd = response; strsep(&cmd, "="); LOGINFO("User %s Worker %s got auth response: %s cmd: %s", @@ -1972,7 +1975,7 @@ static int send_recv_auth(stratum_instance_t *client) response, cmd); val = json_loads(cmd, 0, &err_val); if (unlikely(!val)) - LOGINFO("AUTH JSON decode failed(%d): %s", err_val.line, err_val.text); + LOGWARNING("AUTH JSON decode failed(%d): %s", err_val.line, err_val.text); else { json_get_string(&secondaryuserid, val, "secondaryuserid"); json_get_int(&worker->mindiff, val, "difficultydefault"); @@ -1990,11 +1993,12 @@ static int send_recv_auth(stratum_instance_t *client) } if (likely(val)) json_decref(val); - } else { - ret = -1; - LOGWARNING("Got no auth response from ckdb :("); + goto out; } - + LOGWARNING("Got no auth response from ckdb :("); +out_fail: + ret = -1; +out: return ret; } From c7600f281b2df15d24c9cb777dfec7469d74dae0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 20:46:33 +1100 Subject: [PATCH 22/56] Differentiate generated authed users from unauthorised and don't do any stats on unauthed users --- src/stratifier.c | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 11a2fcc7..d3511277 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -180,6 +180,8 @@ struct user_instance { double dsps1440; double dsps10080; tv_t last_share; + + bool authorised; /* Has this username ever been authorised? */ time_t auth_time; }; @@ -2078,8 +2080,6 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j client->start_time = now.tv_sec; strcpy(client->address, address); - LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, - user_instance->username); client->workername = strdup(buf); if (CKP_STANDALONE(ckp)) ret = true; @@ -2101,9 +2101,16 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j ret = true; } } - client->authorised = ret; - if (client->authorised) + if (ret) { + client->authorised = ret; + user_instance->authorised = ret; inc_worker(ckp, user_instance); + LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, + user_instance->username); + } else { + LOGNOTICE("Client %ld worker %s failed to authorise as user %s", client->id, buf, + user_instance->username); + } out: return json_boolean(ret); } @@ -3401,6 +3408,9 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) worker_instance_t *worker; uint8_t cycle_mask; + if (!user->authorised) + continue; + /* Select users using a mask to return each user's stats once * every ~10 minutes */ cycle_mask = user->id & 0x1f; @@ -3572,6 +3582,9 @@ static void *statsupdate(void *arg) worker_instance_t *worker; bool idle = false; + if (!instance->authorised) + continue; + /* Decay times per worker */ DL_FOREACH(instance->worker_instances, worker) { per_tdiff = tvdiff(&now, &worker->last_share); From 451256f6cb723b69927562f30a026fc790b04114 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:01:34 +1100 Subject: [PATCH 23/56] Only give unparseable auth message when failed message is not recognised --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index d3511277..aeba3757 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1967,7 +1967,8 @@ static int send_recv_auth(stratum_instance_t *client) LOGINFO("Got ckdb response: %s", buf); if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { - LOGWARNING("Got unparseable ckdb auth response: %s", buf); + if (!cmdmatch(response, "failed")) + LOGWARNING("Got unparseable ckdb auth response: %s", buf); goto out_fail; } cmd = response; From 407f087c63639a3cedb11958ed863309d011be3d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:04:13 +1100 Subject: [PATCH 24/56] Give an outright fail response if failed msg is received from ckdb on auth --- src/stratifier.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index aeba3757..76356493 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1967,8 +1967,9 @@ static int send_recv_auth(stratum_instance_t *client) LOGINFO("Got ckdb response: %s", buf); if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { - if (!cmdmatch(response, "failed")) - LOGWARNING("Got unparseable ckdb auth response: %s", buf); + if (cmdmatch(response, "failed")) + goto out; + LOGWARNING("Got unparseable ckdb auth response: %s", buf); goto out_fail; } cmd = response; From 98bf62ab67f4ca7287a62bb7d55cd28cebd6ffba Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 18 Jan 2015 21:24:42 +1100 Subject: [PATCH 25/56] Fill enonce1 data properly for stratum reconnect support --- src/stratifier.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 76356493..a31abf7e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1584,6 +1584,16 @@ out: return ret; } +/* Enter holding workbase_lock */ +static void __fill_enonce1data(workbase_t *wb, stratum_instance_t *client) +{ + if (wb->enonce1constlen) + memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); + memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); + __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); +} + /* Create a new enonce1 from the 64 bit enonce1_64 value, using only the number * of bytes we have to work with when we are proxying with a split nonce2. * When the proxy space is less than 32 bits to work with, we look for an @@ -1632,11 +1642,7 @@ static bool new_enonce1(stratum_instance_t *client) } if (ret) client->enonce1_64 = sdata->enonce1u.u64; - if (wb->enonce1constlen) - memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); - memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); - __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); + __fill_enonce1data(wb, client); ck_wunlock(&sdata->workbase_lock); if (unlikely(!ret)) @@ -1685,6 +1691,10 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js if (disconnected_sessionid_exists(sdata, buf, client_id)) { sprintf(client->enonce1, "%016lx", client->enonce1_64); old_match = true; + + ck_rlock(&sdata->workbase_lock); + __fill_enonce1data(sdata->current_workbase, client); + ck_runlock(&sdata->workbase_lock); } } } else From b3d251638f608bb011898c753a9eeca69534ee52 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 16 Jan 2015 13:31:54 +1100 Subject: [PATCH 26/56] Only add authorised clients to disconnected list Conflicts: src/stratifier.c --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index a31abf7e..e9de70e3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1234,7 +1234,7 @@ static void drop_client(sdata_t *sdata, int64_t id) HASH_DEL(sdata->stratum_instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ - if (!client->ckp->proxy && !old_client && client->enonce1_64) + if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); else { if (client->user_instance) From ced22bd5ea04973ab8078f7422f911f39e9b71db Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 10:54:25 +1100 Subject: [PATCH 27/56] Only lock/unlock once in drop_client --- src/stratifier.c | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e9de70e3..da9b545b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1216,6 +1216,8 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client, *tmp; + user_instance_t *instance = NULL; + ckpool_t *ckp = NULL; bool dec = false; LOGINFO("Stratifier dropping client %ld", id); @@ -1225,10 +1227,11 @@ static void drop_client(sdata_t *sdata, int64_t id) if (client) { stratum_instance_t *old_client = NULL; - __inc_instance_ref(client); if (client->authorised) { dec = true; client->authorised = false; + ckp = client->ckp; + instance = client->user_instance; } HASH_DEL(sdata->stratum_instances, client); @@ -1242,18 +1245,8 @@ static void drop_client(sdata_t *sdata, int64_t id) LL_PREPEND(sdata->dead_instances, client); } } - ck_wunlock(&sdata->instance_lock); - - /* Decrease worker count outside of instance_lock to avoid recursive - * locking */ - if (dec) - dec_worker(client->ckp, client->user_instance); - /* Cull old unused clients lazily when there are no more reference * counts for them. */ - ck_wlock(&sdata->instance_lock); - if (client) - __dec_instance_ref(client); LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { if (!client->ref) { LOGINFO("Stratifier discarding instance %ld", client->id); @@ -1264,6 +1257,11 @@ static void drop_client(sdata_t *sdata, int64_t id) } } ck_wunlock(&sdata->instance_lock); + + /* Decrease worker count outside of instance_lock to avoid recursive + * locking */ + if (dec) + dec_worker(ckp, instance); } static void stratum_broadcast_message(sdata_t *sdata, const char *msg) From 71e60437194f32efd151a9241dc91bad27b80d8a Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 15:11:15 +1100 Subject: [PATCH 28/56] Perform back off and retry on failed realloc in jansson strbuffer_append_bytes instead of failing --- src/jansson-2.6/src/strbuffer.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/jansson-2.6/src/strbuffer.c b/src/jansson-2.6/src/strbuffer.c index c8abf46e..b93231cd 100644 --- a/src/jansson-2.6/src/strbuffer.c +++ b/src/jansson-2.6/src/strbuffer.c @@ -11,6 +11,7 @@ #include #include +#include #include "jansson_private.h" #include "strbuffer.h" @@ -74,6 +75,7 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size) { if(size >= strbuff->size - strbuff->length) { + int backoff = 1; size_t new_size; char *new_value; @@ -86,9 +88,13 @@ int strbuffer_append_bytes(strbuffer_t *strbuff, const char *data, size_t size) new_size = max(strbuff->size * STRBUFFER_FACTOR, strbuff->length + size + 1); - new_value = realloc(strbuff->value, new_size); - if(!new_value) - return -1; + while (42) { + new_value = realloc(strbuff->value, new_size); + if (new_value) + break; + usleep(backoff * 1000); + backoff <<= 1; + } strbuff->value = new_value; strbuff->size = new_size; From ef805811d2b5ae9d51d20c9cbae29064481a04c7 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 15:14:03 +1100 Subject: [PATCH 29/56] Update copyright years on affected files --- src/ckpool.c | 2 +- src/ckpool.h | 2 +- src/jansson-2.6/src/strbuffer.c | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 60374273..c34b6055 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free diff --git a/src/ckpool.h b/src/ckpool.h index 0b28aa40..83424fc5 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free diff --git a/src/jansson-2.6/src/strbuffer.c b/src/jansson-2.6/src/strbuffer.c index b93231cd..86be2b38 100644 --- a/src/jansson-2.6/src/strbuffer.c +++ b/src/jansson-2.6/src/strbuffer.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2009-2013 Petri Lehtinen + * Copyright (c) 2015 Con Kolivas * * Jansson is free software; you can redistribute it and/or modify * it under the terms of the MIT license. See LICENSE for details. From 17e5f996d807aa22d38fc74611e245ef15e6d2c7 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 15:31:57 +1100 Subject: [PATCH 30/56] Reinstate add idle, disconnected and dead counts to pool stats --- src/stratifier.c | 183 +++++++++++++++++++++++++++-------------------- 1 file changed, 105 insertions(+), 78 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index da9b545b..57a9ef08 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -41,6 +41,8 @@ struct pool_stats { int workers; int users; + int disconnected; + int dead; /* Absolute shares stats */ int64_t unaccounted_shares; @@ -856,6 +858,12 @@ static void update_base(ckpool_t *ckp, int prio) create_pthread(pth, do_update, ur); } +static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) +{ + HASH_DEL(sdata->disconnected_instances, client); + sdata->stats.disconnected--; +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; @@ -868,8 +876,9 @@ static void drop_allclients(ckpool_t *ckp) sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } - HASH_ITER(hh, sdata->disconnected_instances, client, tmp) - HASH_DEL(sdata->disconnected_instances, client); + HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { + __del_disconnected(sdata, client); + } sdata->stats.users = sdata->stats.workers = 0; ck_wunlock(&sdata->instance_lock); } @@ -1213,6 +1222,18 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) mutex_unlock(&sdata->stats_lock); } +static void __add_dead(sdata_t *sdata, stratum_instance_t *client) +{ + LL_PREPEND(sdata->dead_instances, client); + sdata->stats.dead++; +} + +static void __del_dead(sdata_t *sdata, stratum_instance_t *client) +{ + LL_DELETE(sdata->dead_instances, client); + sdata->stats.dead--; +} + static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client, *tmp; @@ -1237,12 +1258,13 @@ static void drop_client(sdata_t *sdata, int64_t id) HASH_DEL(sdata->stratum_instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ - if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) + if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); - else { + sdata->stats.disconnected++; + } else { if (client->user_instance) DL_DELETE(client->user_instance->instances, client); - LL_PREPEND(sdata->dead_instances, client); + __add_dead(sdata, client); } } /* Cull old unused clients lazily when there are no more reference @@ -1250,7 +1272,7 @@ static void drop_client(sdata_t *sdata, int64_t id) LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { if (!client->ref) { LOGINFO("Stratifier discarding instance %ld", client->id); - LL_DELETE(sdata->dead_instances, client); + __del_dead(sdata, client); free(client->workername); free(client->useragent); free(client); @@ -3484,6 +3506,7 @@ static void *statsupdate(void *arg) user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; double sps1, sps5, sps15, sps60; + int idle_workers = 0; char fname[512] = {}; tv_t now, diff; ts_t ts_now; @@ -3496,78 +3519,6 @@ static void *statsupdate(void *arg) timersub(&now, &stats->start_time, &diff); tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000; - ghs1 = stats->dsps1 * nonces; - suffix_string(ghs1, suffix1, 16, 0); - sps1 = stats->sps1; - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 300); - ghs5 = stats->dsps5 * nonces / bias; - sps5 = stats->sps5 / bias; - suffix_string(ghs5, suffix5, 16, 0); - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 900); - ghs15 = stats->dsps15 * nonces / bias; - suffix_string(ghs15, suffix15, 16, 0); - sps15 = stats->sps15 / bias; - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 3600); - ghs60 = stats->dsps60 * nonces / bias; - sps60 = stats->sps60 / bias; - suffix_string(ghs60, suffix60, 16, 0); - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 21600); - ghs360 = stats->dsps360 * nonces / bias; - suffix_string(ghs360, suffix360, 16, 0); - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 86400); - ghs1440 = stats->dsps1440 * nonces / bias; - suffix_string(ghs1440, suffix1440, 16, 0); - - bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 604800); - ghs10080 = stats->dsps10080 * nonces / bias; - suffix_string(ghs10080, suffix10080, 16, 0); - - snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); - fp = fopen(fname, "we"); - if (unlikely(!fp)) - LOGERR("Failed to fopen %s", fname); - - JSON_CPACK(val, "{si,si,si}", - "runtime", diff.tv_sec, - "Users", stats->users, - "Workers", stats->workers); - s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); - json_decref(val); - LOGNOTICE("Pool:%s", s); - fprintf(fp, "%s\n", s); - dealloc(s); - - JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss,ss}", - "hashrate1m", suffix1, - "hashrate5m", suffix5, - "hashrate15m", suffix15, - "hashrate1hr", suffix60, - "hashrate6hr", suffix360, - "hashrate1d", suffix1440, - "hashrate7d", suffix10080); - s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); - json_decref(val); - LOGNOTICE("Pool:%s", s); - fprintf(fp, "%s\n", s); - dealloc(s); - - JSON_CPACK(val, "{sf,sf,sf,sf}", - "SPS1m", sps1, - "SPS5m", sps5, - "SPS15m", sps15, - "SPS1h", sps60); - s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); - json_decref(val); - LOGNOTICE("Pool:%s", s); - fprintf(fp, "%s\n", s); - dealloc(s); - fclose(fp); - ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (!client->authorised) @@ -3582,6 +3533,7 @@ static void *statsupdate(void *arg) decay_time(&client->dsps60, 0, per_tdiff, 3600); decay_time(&client->dsps1440, 0, per_tdiff, 86400); decay_time(&client->dsps10080, 0, per_tdiff, 604800); + idle_workers++; if (per_tdiff > 600) client->idle = true; continue; @@ -3687,6 +3639,81 @@ static void *statsupdate(void *arg) } ck_runlock(&sdata->instance_lock); + ghs1 = stats->dsps1 * nonces; + suffix_string(ghs1, suffix1, 16, 0); + sps1 = stats->sps1; + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 300); + ghs5 = stats->dsps5 * nonces / bias; + sps5 = stats->sps5 / bias; + suffix_string(ghs5, suffix5, 16, 0); + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 900); + ghs15 = stats->dsps15 * nonces / bias; + suffix_string(ghs15, suffix15, 16, 0); + sps15 = stats->sps15 / bias; + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 3600); + ghs60 = stats->dsps60 * nonces / bias; + sps60 = stats->sps60 / bias; + suffix_string(ghs60, suffix60, 16, 0); + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 21600); + ghs360 = stats->dsps360 * nonces / bias; + suffix_string(ghs360, suffix360, 16, 0); + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 86400); + ghs1440 = stats->dsps1440 * nonces / bias; + suffix_string(ghs1440, suffix1440, 16, 0); + + bias = !CKP_STANDALONE(ckp) ? 1.0 : time_bias(tdiff, 604800); + ghs10080 = stats->dsps10080 * nonces / bias; + suffix_string(ghs10080, suffix10080, 16, 0); + + snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); + fp = fopen(fname, "we"); + if (unlikely(!fp)) + LOGERR("Failed to fopen %s", fname); + + JSON_CPACK(val, "{si,si,si,si,si,si}", + "runtime", diff.tv_sec, + "Users", stats->users, + "Workers", stats->workers, + "Idle", idle_workers, + "Disconnected", stats->disconnected, + "Dead", stats->dead); + s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); + json_decref(val); + LOGNOTICE("Pool:%s", s); + fprintf(fp, "%s\n", s); + dealloc(s); + + JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss,ss}", + "hashrate1m", suffix1, + "hashrate5m", suffix5, + "hashrate15m", suffix15, + "hashrate1hr", suffix60, + "hashrate6hr", suffix360, + "hashrate1d", suffix1440, + "hashrate7d", suffix10080); + s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); + json_decref(val); + LOGNOTICE("Pool:%s", s); + fprintf(fp, "%s\n", s); + dealloc(s); + + JSON_CPACK(val, "{sf,sf,sf,sf}", + "SPS1m", sps1, + "SPS5m", sps5, + "SPS15m", sps15, + "SPS1h", sps60); + s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); + json_decref(val); + LOGNOTICE("Pool:%s", s); + fprintf(fp, "%s\n", s); + dealloc(s); + fclose(fp); + ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); JSON_CPACK(val, "{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}", From 36d814d48446a1dd03f7410d573db9c75800e66b Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 15:47:39 +1100 Subject: [PATCH 31/56] Add clients to dead list when dropping all clients instead of losing all references to them --- src/stratifier.c | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 57a9ef08..37e4ea1e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -864,6 +864,18 @@ static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) sdata->stats.disconnected--; } +static void __add_dead(sdata_t *sdata, stratum_instance_t *client) +{ + LL_PREPEND(sdata->dead_instances, client); + sdata->stats.dead++; +} + +static void __del_dead(sdata_t *sdata, stratum_instance_t *client) +{ + LL_DELETE(sdata->dead_instances, client); + sdata->stats.dead--; +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; @@ -873,11 +885,13 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_DEL(sdata->stratum_instances, client); + __add_dead(sdata, client); sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { __del_disconnected(sdata, client); + __add_dead(sdata, client); } sdata->stats.users = sdata->stats.workers = 0; ck_wunlock(&sdata->instance_lock); @@ -1222,18 +1236,6 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) mutex_unlock(&sdata->stats_lock); } -static void __add_dead(sdata_t *sdata, stratum_instance_t *client) -{ - LL_PREPEND(sdata->dead_instances, client); - sdata->stats.dead++; -} - -static void __del_dead(sdata_t *sdata, stratum_instance_t *client) -{ - LL_DELETE(sdata->dead_instances, client); - sdata->stats.dead--; -} - static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client, *tmp; From 34853c598db82e4c43e9f45d63de829ac30fa370 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Mon, 19 Jan 2015 15:58:30 +1100 Subject: [PATCH 32/56] Back off and retry on failed realloc in read_socket_line instead of failing. --- src/ckpool.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index c34b6055..5f775fc5 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -404,6 +404,8 @@ int read_socket_line(connsock_t *cs, int timeout) while (42) { char readbuf[PAGESIZE] = {}; + int backoff = 1; + char *newbuf; ret = wait_read_select(fd, eom ? 0 : timeout); if (eom && !ret) @@ -422,7 +424,16 @@ int read_socket_line(connsock_t *cs, int timeout) goto out; } buflen = cs->bufofs + ret + 1; - cs->buf = realloc(cs->buf, buflen); + while (42) { + newbuf = realloc(cs->buf, buflen); + if (likely(newbuf)) + break; + if (backoff == 1) + fprintf(stderr, "Failed to realloc %d in read_socket_line, retrying\n", (int)buflen); + cksleep_ms(backoff); + backoff <<= 1; + } + cs->buf = newbuf; if (unlikely(!cs->buf)) quit(1, "Failed to alloc buf of %d bytes in read_socket_line", (int)buflen); memcpy(cs->buf + cs->bufofs, readbuf, ret); From 2d688ddb079a4b7951c6a3e8d12df7fc701cfd5f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 14 Jan 2015 20:10:51 +1100 Subject: [PATCH 33/56] Push version number to 0.8.1 --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index 65d311c0..3983bd0d 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.8, kernel@kolivas.org) +AC_INIT(ckpool, 0.8.1, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) From 50f48aa5e3dba75ffb9fde25d367ac1d60238a72 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 19:21:08 +1100 Subject: [PATCH 34/56] Push version to 0.8.2 --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index 3983bd0d..d183ca0b 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.8.1, kernel@kolivas.org) +AC_INIT(ckpool, 0.8.2, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) From cfe29a1e9217c5dc92c2ecfcd473db2b1167db25 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 19:54:31 +1100 Subject: [PATCH 35/56] Fix stratum resume support to work with any supported enonce1 sizes --- src/stratifier.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 37e4ea1e..c0771cd5 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1128,33 +1128,34 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int return instance; } -/* Only supports a full ckpool instance sessionid with an 8 byte sessionid */ -static bool disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id) +static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id) { stratum_instance_t *instance, *tmp; - uint64_t session64; - bool ret = false; + uint64_t enonce1_64 = 0, ret = 0; + int slen; if (!sessionid) goto out; - if (strlen(sessionid) != 16) + slen = strlen(sessionid) / 2; + if (slen < 1 || slen > 8) goto out; + /* Number is in BE but we don't swap either of them */ - hex2bin(&session64, sessionid, 8); + hex2bin(&enonce1_64, sessionid, slen); ck_rlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, instance, tmp) { if (instance->id == id) continue; - if (instance->enonce1_64 == session64) { + if (instance->enonce1_64 == enonce1_64) { /* Only allow one connected instance per enonce1 */ goto out_unlock; } } instance = NULL; - HASH_FIND(hh, sdata->disconnected_instances, &session64, sizeof(uint64_t), instance); + HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance); if (instance) - ret = true; + ret = enonce1_64; out_unlock: ck_runlock(&sdata->instance_lock); out: @@ -1710,7 +1711,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js buf = json_string_value(json_array_get(params_val, 1)); LOGDEBUG("Found old session id %s", buf); /* Add matching here */ - if (disconnected_sessionid_exists(sdata, buf, client_id)) { + if ((client->enonce1_64 = disconnected_sessionid_exists(sdata, buf, client_id))) { sprintf(client->enonce1, "%016lx", client->enonce1_64); old_match = true; @@ -1728,11 +1729,11 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js client->reject = 2; return json_string("proxy full"); } - LOGINFO("Set new subscription %ld to new enonce1 %s", client->id, - client->enonce1); + LOGINFO("Set new subscription %ld to new enonce1 %lx string %s", client->id, + client->enonce1_64, client->enonce1); } else { - LOGINFO("Set new subscription %ld to old matched enonce1 %s", client->id, - client->enonce1); + LOGINFO("Set new subscription %ld to old matched enonce1 %lx string %s", + client->id, client->enonce1_64, client->enonce1); } ck_rlock(&sdata->workbase_lock); From 1b5e73e57be6bccf1b38dda1663095e471fb5e5d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 21:28:08 +1100 Subject: [PATCH 36/56] Fix issues with freeing ram safely on dropping clients and converting them from disconnected to dead clients by removing their instance every time and freeing dead client ram safely when it will no longer be used by the loop. --- src/stratifier.c | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index c0771cd5..c92ad794 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -858,12 +858,6 @@ static void update_base(ckpool_t *ckp, int prio) create_pthread(pth, do_update, ur); } -static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) -{ - HASH_DEL(sdata->disconnected_instances, client); - sdata->stats.disconnected--; -} - static void __add_dead(sdata_t *sdata, stratum_instance_t *client) { LL_PREPEND(sdata->dead_instances, client); @@ -876,6 +870,13 @@ static void __del_dead(sdata_t *sdata, stratum_instance_t *client) sdata->stats.dead--; } +static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) +{ + HASH_DEL(sdata->disconnected_instances, client); + sdata->stats.disconnected--; + __add_dead(sdata, client); +} + static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; @@ -891,7 +892,6 @@ static void drop_allclients(ckpool_t *ckp) } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { __del_disconnected(sdata, client); - __add_dead(sdata, client); } sdata->stats.users = sdata->stats.workers = 0; ck_wunlock(&sdata->instance_lock); @@ -1143,7 +1143,7 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio /* Number is in BE but we don't swap either of them */ hex2bin(&enonce1_64, sessionid, slen); - ck_rlock(&sdata->instance_lock); + ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, instance, tmp) { if (instance->id == id) continue; @@ -1154,10 +1154,14 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio } instance = NULL; HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance); - if (instance) + if (instance) { + /* Delete the entry once we are going to use it since there + * will be a new instance with the enonce1_64 */ + __del_disconnected(sdata, instance); ret = enonce1_64; + } out_unlock: - ck_runlock(&sdata->instance_lock); + ck_wunlock(&sdata->instance_lock); out: return ret; } @@ -1239,7 +1243,7 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) static void drop_client(sdata_t *sdata, int64_t id) { - stratum_instance_t *client, *tmp; + stratum_instance_t *client, *tmp, *client_delete = NULL; user_instance_t *instance = NULL; ckpool_t *ckp = NULL; bool dec = false; @@ -1259,28 +1263,33 @@ static void drop_client(sdata_t *sdata, int64_t id) } HASH_DEL(sdata->stratum_instances, client); + if (client->user_instance) + DL_DELETE(client->user_instance->instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); sdata->stats.disconnected++; } else { - if (client->user_instance) - DL_DELETE(client->user_instance->instances, client); __add_dead(sdata, client); } } /* Cull old unused clients lazily when there are no more reference * counts for them. */ LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { + /* We can't delete the ram safely in this loop, even if we can + * safely remove the entry from the linked list so we do it on + * the next pass through the loop. */ + dealloc(client_delete); if (!client->ref) { LOGINFO("Stratifier discarding instance %ld", client->id); __del_dead(sdata, client); free(client->workername); free(client->useragent); - free(client); + client_delete = client; } } + dealloc(client_delete); ck_wunlock(&sdata->instance_lock); /* Decrease worker count outside of instance_lock to avoid recursive From 99dc8947d28688a46f6beaa8646bdb48ff4e62f4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 21:37:26 +1100 Subject: [PATCH 37/56] Add message about lost ram in block solve and reject. --- src/stratifier.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index c92ad794..e9585512 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1349,6 +1349,8 @@ static void reset_bestshares(sdata_t *sdata) ck_runlock(&sdata->instance_lock); } +/* Ram from blocks is NOT freed at all for now, only their entry is removed + * from the linked list, leaving a very small leak here and reject. */ static void block_solve(ckpool_t *ckp, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; From 330c966556a5edaea1ef50097b70a556534d3911 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 21:42:25 +1100 Subject: [PATCH 38/56] Remove client ram safely in invalidate_client foreach loop --- src/connector.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index c23173e1..6c89b02c 100644 --- a/src/connector.c +++ b/src/connector.c @@ -238,7 +238,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) * count. */ static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { - client_instance_t *tmp; + client_instance_t *tmp, *client_delete = NULL; drop_client(cdata, client); if (ckp->passthrough) @@ -249,12 +249,15 @@ static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t * * counts for them. */ ck_wlock(&cdata->lock); LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { + /* Don't free client ram when loop may still access it */ + dealloc(client_delete); if (!client->ref) { LL_DELETE(cdata->dead_clients, client); LOGINFO("Connector discarding client %ld", client->id); - free(client); + client_delete = client; } } + dealloc(client_delete); ck_wunlock(&cdata->lock); } From b5f65d2e5a9060dc201566a7a2521a1adcc885aa Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 13 Jan 2015 16:30:52 +1100 Subject: [PATCH 39/56] Tell the stratifier to drop a client if the connector is told to drop it and cannot find it any more --- src/connector.c | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/connector.c b/src/connector.c index 6c89b02c..2a4b9b64 100644 --- a/src/connector.c +++ b/src/connector.c @@ -677,19 +677,6 @@ retry: * so look for them first. */ if (likely(buf[0] == '{')) { process_client_msg(cdata, buf); - } else if (cmdmatch(buf, "ping")) { - LOGDEBUG("Connector received ping request"); - send_unix_msg(sockd, "pong"); - } else if (cmdmatch(buf, "accept")) { - LOGDEBUG("Connector received accept signal"); - cdata->accept = true; - } else if (cmdmatch(buf, "reject")) { - LOGDEBUG("Connector received reject signal"); - cdata->accept = false; - } else if (cmdmatch(buf, "loglevel")) { - sscanf(buf, "loglevel=%d", &ckp->loglevel); - } else if (cmdmatch(buf, "shutdown")) { - goto out; } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; @@ -702,12 +689,26 @@ retry: client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %ld to drop", client_id); + stratifier_drop_client(ckp, client_id); goto retry; } ret = drop_client(cdata, client); dec_instance_ref(cdata, client); if (ret >= 0) LOGINFO("Connector dropped client id: %ld", client_id); + } else if (cmdmatch(buf, "ping")) { + LOGDEBUG("Connector received ping request"); + send_unix_msg(sockd, "pong"); + } else if (cmdmatch(buf, "accept")) { + LOGDEBUG("Connector received accept signal"); + cdata->accept = true; + } else if (cmdmatch(buf, "reject")) { + LOGDEBUG("Connector received reject signal"); + cdata->accept = false; + } else if (cmdmatch(buf, "loglevel")) { + sscanf(buf, "loglevel=%d", &ckp->loglevel); + } else if (cmdmatch(buf, "shutdown")) { + goto out; } else if (cmdmatch(buf, "passthrough")) { client_instance_t *client; From 259f8ca4a0094e38f93e5a6c4eaa7928f378c096 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 13 Jan 2015 16:49:16 +1100 Subject: [PATCH 40/56] Use invalidate in the connector when dropping a client to ensure the stratifier drops it as well Conflicts: src/connector.c --- src/connector.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2a4b9b64..811e20b1 100644 --- a/src/connector.c +++ b/src/connector.c @@ -236,13 +236,14 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) * regularly but keep the instances in a linked list until their ref count * drops to zero when we can remove them lazily. Client must hold a reference * count. */ -static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) +static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client) { client_instance_t *tmp, *client_delete = NULL; + int ret; - drop_client(cdata, client); + ret = drop_client(cdata, client); if (ckp->passthrough) - return; + goto out; stratifier_drop_client(ckp, client->id); /* Cull old unused clients lazily when there are no more reference @@ -259,6 +260,9 @@ static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t * } dealloc(client_delete); ck_wunlock(&cdata->lock); + +out: + return ret; } static void send_client(cdata_t *cdata, int64_t id, char *buf); @@ -692,7 +696,7 @@ retry: stratifier_drop_client(ckp, client_id); goto retry; } - ret = drop_client(cdata, client); + ret = invalidate_client(ckp, cdata, client); dec_instance_ref(cdata, client); if (ret >= 0) LOGINFO("Connector dropped client id: %ld", client_id); From bf1ac3b57b7533053fa0daf21f8374137adf7c72 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 19 Jan 2015 22:02:25 +1100 Subject: [PATCH 41/56] Age disconnected instances over 10 minutes old to the dead list to be reaped --- src/stratifier.c | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index e9585512..91c9bb84 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -260,6 +260,7 @@ struct stratum_instance { ckpool_t *ckp; time_t last_txns; /* Last time this worker requested txn hashes */ + time_t disconnected_time; /* Time this instance disconnected */ int64_t suggest_diff; /* Stratum client suggested diff */ double best_diff; /* Best share found by this instance */ @@ -860,18 +861,21 @@ static void update_base(ckpool_t *ckp, int prio) static void __add_dead(sdata_t *sdata, stratum_instance_t *client) { + LOGDEBUG("Adding dead instance %ld", client->id); LL_PREPEND(sdata->dead_instances, client); sdata->stats.dead++; } static void __del_dead(sdata_t *sdata, stratum_instance_t *client) { + LOGDEBUG("Deleting dead instance %ld", client->id); LL_DELETE(sdata->dead_instances, client); sdata->stats.dead--; } static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) { + LOGDEBUG("Deleting disconnected instance %ld", client->id); HASH_DEL(sdata->disconnected_instances, client); sdata->stats.disconnected--; __add_dead(sdata, client); @@ -1245,6 +1249,7 @@ static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client, *tmp, *client_delete = NULL; user_instance_t *instance = NULL; + time_t now_t = time(NULL); ckpool_t *ckp = NULL; bool dec = false; @@ -1268,12 +1273,25 @@ static void drop_client(sdata_t *sdata, int64_t id) HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { + LOGDEBUG("Adding disconnected instance %ld", client->id); HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); sdata->stats.disconnected++; + client->disconnected_time = time(NULL); } else { __add_dead(sdata, client); } } + + /* Old disconnected instances will not have any valid shares so remove + * them from the disconnected instances list if they've been dead for + * more than 10 minutes */ + HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { + if (now_t - client->disconnected_time < 600) + continue; + LOGINFO("Ageing disconnected instance %ld to dead", client->id); + __del_disconnected(sdata, client); + } + /* Cull old unused clients lazily when there are no more reference * counts for them. */ LL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { @@ -1282,7 +1300,7 @@ static void drop_client(sdata_t *sdata, int64_t id) * the next pass through the loop. */ dealloc(client_delete); if (!client->ref) { - LOGINFO("Stratifier discarding instance %ld", client->id); + LOGINFO("Stratifier discarding dead instance %ld", client->id); __del_dead(sdata, client); free(client->workername); free(client->useragent); From e5b9e841294464b13aa6770a3502ef247b1bb485 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 20 Jan 2015 11:09:49 +1100 Subject: [PATCH 42/56] Add a helper function to check a hex string is valid --- src/libckpool.c | 26 +++++++++++++++++++++++++- src/libckpool.h | 4 +++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 5d992e2e..d5873d8c 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -1351,6 +1351,30 @@ const int hex2bin_tbl[256] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, }; +bool _validhex(const char *buf, const char *file, const char *func, const int line) +{ + unsigned int i, slen; + bool ret = false; + + slen = strlen(buf); + if (!slen || slen % 2) { + LOGDEBUG("Invalid hex due to length %u from %s %s:%d", slen, file, func, line); + goto out; + } + for (i = 0; i < slen; i++) { + uchar idx = buf[i]; + + if (hex2bin_tbl[idx] == -1) { + LOGDEBUG("Invalid hex due to value %u at offset %d from %s %s:%d", + idx, i, file, func, line); + goto out; + } + } + ret = true; +out: + return ret; +} + /* Does the reverse of bin2hex but does not allocate any ram */ bool _hex2bin(void *vp, const void *vhexstr, size_t len, const char *file, const char *func, const int line) { diff --git a/src/libckpool.h b/src/libckpool.h index a52628fe..65fb4ae4 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -495,6 +495,8 @@ void _dealloc(void **ptr); extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); +bool _validhex(const char *buf, const char *file, const char *func, const int line); +#define validhex(buf) _validhex(buf, __FILE__, __func__, __LINE__) bool _hex2bin(void *p, const void *vhexstr, size_t len, const char *file, const char *func, const int line); #define hex2bin(p, vhexstr, len) _hex2bin(p, vhexstr, len, __FILE__, __func__, __LINE__) char *http_base64(const char *src); From 9892f3af6441264ca32633fedd366b13861addce Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 20 Jan 2015 11:22:24 +1100 Subject: [PATCH 43/56] Use validhex before checking sessionid in disconnected_sessionid_exists to avoid spewing warnings for client errors --- src/stratifier.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/stratifier.c b/src/stratifier.c index 91c9bb84..1c2d673f 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1144,6 +1144,9 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio if (slen < 1 || slen > 8) goto out; + if (!validhex(sessionid)) + goto out; + /* Number is in BE but we don't swap either of them */ hex2bin(&enonce1_64, sessionid, slen); From b97132dde3956989fac27d72ebba3c9041086900 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 01:52:47 +1100 Subject: [PATCH 44/56] Zero wb->logdir even if we don't write to the share log since we still use the value of it --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1c2d673f..c66cad82 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -647,7 +647,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) wb->network_diff = diff_from_nbits(wb->headerbin + 72); len = strlen(ckp->logdir) + 8 + 1 + 16 + 1; - wb->logdir = ckalloc(len); + wb->logdir = ckzalloc(len); /* In proxy mode, the wb->id is received in the notify update and * we set workbase_id from it. In server mode the stratifier is From 9d067777240c0ff1be75e35b6fca1dc70248a0a2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:06:03 +1100 Subject: [PATCH 45/56] Do not attempt to decref a freed passthrough client --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index c66cad82..18a625aa 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3023,7 +3023,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); free(client); - goto out; + return; } if (cmdmatch(method, "mining.auth") && client->subscribed) { From 09c7086808d8a62abfedf0c20fe621b00683f4c5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:10:16 +1100 Subject: [PATCH 46/56] Fix impossible patch -p1 -i crap.patch warning in bind_socket --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index d5873d8c..03784cf7 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -589,7 +589,7 @@ int bind_socket(char *url, char *port) if (sockd > 0) break; } - if (sockd < 0) { + if (sockd < 0 || p == NULL) { LOGWARNING("Failed to open socket for %s:%s", url, port); goto out; } From ff01a1c414ef7714acc7274b3bdc806cf2532f99 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:11:55 +1100 Subject: [PATCH 47/56] Fix impossible sockd < 1 scenario --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 03784cf7..757c7414 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -589,7 +589,7 @@ int bind_socket(char *url, char *port) if (sockd > 0) break; } - if (sockd < 0 || p == NULL) { + if (sockd < 1 || p == NULL) { LOGWARNING("Failed to open socket for %s:%s", url, port); goto out; } From cc0d18857397a5e0f8f0e1c93077bb93b6a5b71c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:14:38 +1100 Subject: [PATCH 48/56] Fix impossible len >= UNIX_PATH_MAX warning --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 757c7414..8c367bbb 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -726,7 +726,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun if (likely(server_path)) { len = strlen(server_path); - if (unlikely(len < 1 || len > UNIX_PATH_MAX)) { + if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) { LOGERR("Invalid server path length %d in open_unix_server", len); goto out; } From 827f42ba739a8aee3519e8657307558f417cb91f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:15:13 +1100 Subject: [PATCH 49/56] Fix impossible >= UNIX_PATH_MAX warning --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 8c367bbb..f2937a05 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -793,7 +793,7 @@ int _open_unix_client(const char *server_path, const char *file, const char *fun if (likely(server_path)) { len = strlen(server_path); - if (unlikely(len < 1 || len > UNIX_PATH_MAX)) { + if (unlikely(len < 1 || len >= UNIX_PATH_MAX)) { LOGERR("Invalid server path length %d in open_unix_client", len); goto out; } From d6d81093beff952c9b1fb69d9143101c715055c6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 02:17:53 +1100 Subject: [PATCH 50/56] Fix impossible warnings in drop_client --- src/stratifier.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 18a625aa..8e3c6264 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1301,12 +1301,13 @@ static void drop_client(sdata_t *sdata, int64_t id) /* We can't delete the ram safely in this loop, even if we can * safely remove the entry from the linked list so we do it on * the next pass through the loop. */ - dealloc(client_delete); + if (client != client_delete) + dealloc(client_delete); if (!client->ref) { LOGINFO("Stratifier discarding dead instance %ld", client->id); __del_dead(sdata, client); - free(client->workername); - free(client->useragent); + dealloc(client->workername); + dealloc(client->useragent); client_delete = client; } } From 3a40311ece14ea916e92726e1c9a7f2f4a1efda0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 12:29:33 +1100 Subject: [PATCH 51/56] Passthrough clients should be rare so increase verbosity of them being added --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 8e3c6264..b7687b0c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3020,7 +3020,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val HASH_DEL(sdata->stratum_instances, client); ck_wunlock(&sdata->instance_lock); - LOGINFO("Adding passthrough client %ld", client->id); + LOGNOTICE("Adding passthrough client %ld", client->id); snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); free(client); From 712d5c38d6fe455352ee8cd629e89ac659857351 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 12:37:23 +1100 Subject: [PATCH 52/56] Make sure a passthrough client is still in the hash table before deleting it --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index b7687b0c..89f964cb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3017,7 +3017,8 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val * Remove this instance since the client id may well be * reused */ ck_wlock(&sdata->instance_lock); - HASH_DEL(sdata->stratum_instances, client); + if (likely(__instance_by_id(sdata, client_id))) + HASH_DEL(sdata->stratum_instances, client); ck_wunlock(&sdata->instance_lock); LOGNOTICE("Adding passthrough client %ld", client->id); From bb8593bfa0cd7d9662421f9670ab86853dd91fd5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 13:02:56 +1100 Subject: [PATCH 53/56] Take the same attendant precautions when deleting passthrough client ram as regular clients by adding them to the dead list --- src/stratifier.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 89f964cb..7c05def1 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3019,13 +3019,13 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val ck_wlock(&sdata->instance_lock); if (likely(__instance_by_id(sdata, client_id))) HASH_DEL(sdata->stratum_instances, client); + __add_dead(sdata, client); ck_wunlock(&sdata->instance_lock); LOGNOTICE("Adding passthrough client %ld", client->id); snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); - free(client); - return; + goto out; } if (cmdmatch(method, "mining.auth") && client->subscribed) { From b7186cfcd963dda47c8b052136a0c7db898f9f78 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 13:09:38 +1100 Subject: [PATCH 54/56] Use cached instance variable --- src/stratifier.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 7c05def1..9f758fbe 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1263,16 +1263,16 @@ static void drop_client(sdata_t *sdata, int64_t id) if (client) { stratum_instance_t *old_client = NULL; + instance = client->user_instance; if (client->authorised) { dec = true; client->authorised = false; ckp = client->ckp; - instance = client->user_instance; } HASH_DEL(sdata->stratum_instances, client); - if (client->user_instance) - DL_DELETE(client->user_instance->instances, client); + if (instance) + DL_DELETE(instance->instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && dec) { From f11250dd2d31d2d4537e0d54bb07df768ac86f7a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 14:15:29 +1100 Subject: [PATCH 55/56] Do not move clients from hash/linked lists while they hold a ref count --- src/stratifier.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9f758fbe..39a2583d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1260,7 +1260,7 @@ static void drop_client(sdata_t *sdata, int64_t id) ck_wlock(&sdata->instance_lock); client = __instance_by_id(sdata, id); - if (client) { + if (client && likely(!client->ref)) { stratum_instance_t *old_client = NULL; instance = client->user_instance; @@ -1291,6 +1291,8 @@ static void drop_client(sdata_t *sdata, int64_t id) HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { if (now_t - client->disconnected_time < 600) continue; + if (unlikely(client->ref)) + continue; LOGINFO("Ageing disconnected instance %ld to dead", client->id); __del_disconnected(sdata, client); } From f34516bd4d29da360d4c7311a09a6d92050937ad Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 21 Jan 2015 15:20:56 +1100 Subject: [PATCH 56/56] Free buffer used for authorisation message --- src/stratifier.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stratifier.c b/src/stratifier.c index 39a2583d..678062d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3254,6 +3254,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, client->user_instance->username); stratum_send_message(sdata, client, buf); + free(buf); } else { if (errnum < 0) stratum_send_message(sdata, client, "Authorisations temporarily offline :(");