From 01784be9efc18e12cb6f076cd899e56e9ca069fd Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 17 Mar 2016 22:24:34 +1100 Subject: [PATCH 01/15] ckdb - ignore duplicate high shares based on the DB index --- src/ckdb.c | 4 ++-- src/ckdb.h | 3 ++- src/ckdb_data.c | 32 ++++++++++++++++++++++++++++++++ src/ckdb_dbio.c | 12 +++++------- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 30141ca4..b47578ac 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1204,8 +1204,8 @@ static void alloc_storage() shares_root = new_ktree(NULL, cmp_shares, shares_free); shares_early_root = new_ktree("SharesEarly", cmp_shares, shares_free); shares_hi_store = k_new_store(shares_free); - shares_hi_root = new_ktree("SharesHi", cmp_shares, shares_free); - shares_db_root = new_ktree("SharesDB", cmp_shares, shares_free); + shares_hi_root = new_ktree("SharesHi", cmp_shares_db, shares_free); + shares_db_root = new_ktree("SharesDB", cmp_shares_db, shares_free); shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS), ALLOC_SHAREERRORS, LIMIT_SHAREERRORS, diff --git a/src/ckdb.h b/src/ckdb.h index f19b174a..031f79d5 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.984" +#define CKDB_VERSION DB_VERSION"-1.985" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2814,6 +2814,7 @@ extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, extern double coinbase_reward(int32_t height); extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid); extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b); +extern cmp_t cmp_shares_db(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b); extern void dsp_sharesummary(K_ITEM *item, FILE *stream); extern cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index c8e8b3c9..77cdca55 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -2333,6 +2333,38 @@ cmp_t cmp_shares(K_ITEM *a, K_ITEM *b) return c; } +/* order by workinfoid asc,userid asc,workername asc,enonce1 asc,nonce2 asc, + * nonce asc,expirydate desc + * i.e. match the DB table index so duplicates are ignored and all new shares_db + * can always go in the DB */ +cmp_t cmp_shares_db(K_ITEM *a, K_ITEM *b) +{ + SHARES *sa, *sb; + DATA_SHARES(sa, a); + DATA_SHARES(sb, b); + cmp_t c = CMP_BIGINT(sa->workinfoid, sb->workinfoid); + if (c == 0) { + c = CMP_BIGINT(sa->userid, sb->userid); + if (c == 0) { + c = CMP_STR(sa->workername, sb->workername); + if (c == 0) { + c = CMP_STR(sa->enonce1, sb->enonce1); + if (c == 0) { + c = CMP_STR(sa->nonce2, sb->nonce2); + if (c == 0) { + c = CMP_STR(sa->nonce, sb->nonce); + if (c == 0) { + c = CMP_TV(sb->expirydate, + sa->expirydate); + } + } + } + } + } + } + return c; +} + // order by workinfoid asc,userid asc,createdate asc,nonce asc,expirydate desc cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b) { diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 8d707403..c7747561 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3625,7 +3625,7 @@ discard: static void shareerrors_process_early(PGconn *conn, int64_t good_wid, tv_t *good_cd, K_TREE *trf_root); -// DB Shares are stored by by the summariser to ensure the reload is correct +// DB Shares are stored by the summariser to ensure the reload is correct bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff, char *secondaryuserid, @@ -3640,7 +3640,6 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername USERS *users; bool ok = false; char *st = NULL; - int errn_int; LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld", __func__, @@ -3649,13 +3648,10 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername FREENULL(st); TXT_TO_DOUBLE("sdiff", sdiff, sdiff_amt); - TXT_TO_INT("errn", errn, errn_int); K_WLOCK(shares_free); s_item = k_unlink_head(shares_free); - // Don't store duplicates since they will already exist - if (errn_int != SE_DUPE && share_min_sdiff > 0 && - sdiff_amt >= share_min_sdiff) + if (share_min_sdiff > 0 && sdiff_amt >= share_min_sdiff) s2_item = k_unlink_head(shares_free); K_WUNLOCK(shares_free); @@ -3731,7 +3727,9 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername add_to_ktree(shares_early_root, s_item); k_add_head(shares_early_store, s_item); if (s2_item) { - // Just ignore duplicates + /* Just ignore duplicates - this matches the DB index + N.B. a duplicate share doesn't have to be SE_DUPE, + two shares can be SE_NONE and SE_STALE */ tmp_item = find_in_ktree(shares_db_root, s2_item, ctx); if (tmp_item == NULL) { // Store them in advance - always From 418a898c8e103d7f6dc183db12b0076d68b7c003 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 17 Mar 2016 22:41:44 +1100 Subject: [PATCH 02/15] ckdb - optionally ignore seq values with -I - don't use when connected to ckpool --- src/ckdb.c | 33 ++++++++++++++++++++++----------- src/ckdb.h | 2 +- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index b47578ac..d8decf1c 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -128,6 +128,7 @@ static char *status_chars = "|/-\\"; static char *restorefrom; +static bool ignore_seq = false; bool genpayout_auto; bool markersummary_auto; @@ -2680,6 +2681,9 @@ static enum cmd_values process_seq(MSGLINE *msgline) bool dupall, dupcmd; char *st = NULL; + if (ignore_seq) + return ckdb_cmds[msgline->which_cmds].cmd_val; + dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, msgline->n_seqpid, SEQALL, &(msgline->now), &(msgline->cd), msgline->code, @@ -3095,7 +3099,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, if (confirm_check_createdate) check_createdate_ccl(msgline->cmd, &(msgline->cd)); if (seqall) { - setup_seq(seqall, msgline); + if (!ignore_seq) + setup_seq(seqall, msgline); free(cmdptr); return ckdb_cmds[msgline->which_cmds].cmd_val; } else { @@ -5821,7 +5826,7 @@ static void check_restore_dir(char *name) static struct option long_options[] = { // script to call when alerts happen - { "alert", required_argument, 0, 'c' }, + { "alert", required_argument, 0, 'a' }, { "config", required_argument, 0, 'c' }, { "dbname", required_argument, 0, 'd' }, { "minsdiff", required_argument, 0, 'D' }, @@ -5830,6 +5835,9 @@ static struct option long_options[] = { { "generate", no_argument, 0, 'g' }, { "help", no_argument, 0, 'h' }, { "pool-instance", required_argument, 0, 'i' }, + // only use 'I' for reloading lots of known valid data via CKDB, + // DON'T use when connected to ckpool + { "ignore-seq", required_argument, 0, 'I' }, { "killold", no_argument, 0, 'k' }, { "loglevel", required_argument, 0, 'l' }, // marker = enable mark/workmarker/markersummary auto generation @@ -5880,7 +5888,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:I:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'a': len = strlen(optarg); @@ -5921,14 +5929,6 @@ int main(int argc, char **argv) optarg); } break; - /* WARNING - enabling -i will require a DB data update - * if you've used ckdb before 1.920 - * All (old) marks and workmarkers in the DB will need - * to have poolinstance set to the given -i value - * since they will be blank */ - case 'i': - poolinstance = (const char *)strdup(optarg); - break; case 'g': genpayout_auto = true; break; @@ -5949,6 +5949,17 @@ int main(int argc, char **argv) printf("-%c | --%s\n", jopt->val, jopt->name); } exit(0); + /* WARNING - enabling -i will require a DB data update + * if you've used ckdb before 1.920 + * All (old) marks and workmarkers in the DB will need + * to have poolinstance set to the given -i value + * since they will be blank */ + case 'i': + poolinstance = (const char *)strdup(optarg); + break; + case 'I': + ignore_seq = true; + break; case 'k': ckp.killold = true; break; diff --git a/src/ckdb.h b/src/ckdb.h index 031f79d5..52aa3a04 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.985" +#define CKDB_VERSION DB_VERSION"-1.986" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From a30a75bd3fd03779863600f6b379a4894ef87cbc Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 18 Mar 2016 18:51:51 +1100 Subject: [PATCH 03/15] ckdb - load only one day of shares - or specify a begin workinfo with -b --- src/ckdb.c | 24 +++++++++++++++++++++++- src/ckdb.h | 5 ++++- src/ckdb_dbio.c | 46 ++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index d8decf1c..75085444 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -436,6 +436,7 @@ K_STORE *shares_hi_store; double diff_percent = DIFF_VAL(DIFF_PERCENT_DEFAULT); double share_min_sdiff = 0; +int64_t shares_begin = -1; // SHAREERRORS shareerrors.id.json={...} K_TREE *shareerrors_root; @@ -1728,6 +1729,9 @@ static bool setup_data() mutex_init(&wq_waitlock); cond_init(&wq_waitcond); + LOGWARNING("%sSequence processing is %s", + ignore_seq ? "ALERT: " : "", + ignore_seq ? "Off" : "On"); LOGWARNING("%sStartup payout generation state is %s", genpayout_auto ? "" : "WARNING: ", genpayout_auto ? "On" : "Off"); @@ -2684,6 +2688,12 @@ static enum cmd_values process_seq(MSGLINE *msgline) if (ignore_seq) return ckdb_cmds[msgline->which_cmds].cmd_val; + /* If non-seqall data was in a CCL reload file, + * it can't be processed by update_seq(), so don't */ + if (msgline->n_seqall == 0 && msgline->n_seqstt == 0 && + msgline->n_seqpid == 0) + return ckdb_cmds[msgline->which_cmds].cmd_val; + dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt, msgline->n_seqpid, SEQALL, &(msgline->now), &(msgline->cd), msgline->code, @@ -5827,6 +5837,8 @@ static void check_restore_dir(char *name) static struct option long_options[] = { // script to call when alerts happen { "alert", required_argument, 0, 'a' }, + // workinfo to start shares_fill() default is 1 day + { "shares-begin", required_argument, 0, 'b' }, { "config", required_argument, 0, 'c' }, { "dbname", required_argument, 0, 'd' }, { "minsdiff", required_argument, 0, 'D' }, @@ -5888,7 +5900,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:I:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'a': len = strlen(optarg); @@ -5898,6 +5910,16 @@ int main(int argc, char **argv) (int)len, MAX_ALERT_CMD); ckdb_alert_cmd = strdup(optarg); break; + case 'b': + { + int64_t beg = atoll(optarg); + if (beg < 0) { + quit(1, "Invalid shares begin " + "%"PRId64" - must be >= 0", + beg); + } + shares_begin = beg; + } case 'c': ckp.config = strdup(optarg); break; diff --git a/src/ckdb.h b/src/ckdb.h index 52aa3a04..0d831a6b 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.986" +#define CKDB_VERSION DB_VERSION"-1.987" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1723,6 +1723,9 @@ extern double diff_percent; * This is set only via the runtime parameter -D or --minsdiff */ extern double share_min_sdiff; +// workinfoid to start loading shares, unset = shares_fill() decides +extern int64_t shares_begin; + // SHAREERRORS shareerrors.id.json={...} typedef struct shareerrors { int64_t workinfoid; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index c7747561..80117809 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3858,16 +3858,51 @@ bool shares_fill(PGconn *conn) { ExecStatusType rescode; PGresult *res; - K_ITEM *item = NULL; + K_TREE_CTX ctx[1]; + K_ITEM *item = NULL, *wi_item; + WORKINFO *workinfo = NULL; SHARES *row; int n, t, i; char *field; char *sel = NULL; - int fields = 14; + char *params[1]; + int fields = 14, par = 0; bool ok = false; + int64_t workinfoid; + tv_t old; LOGDEBUG("%s(): select", __func__); + if (shares_begin >= 0) + workinfoid = shares_begin; + else { + /* Workinfo is already loaded + * CKDB doesn't currently use shares_db in processing, + * but make sure we have enough to avoid loading duplicates + * 1 day should be more than enough for normal running, + * however, if more than 1 day is needed, + * use -b to set the shares_begin workinfoid */ + setnow(&old); + old.tv_sec -= 60 * 60 * 24; // 1 day + K_RLOCK(workinfo_free); + wi_item = last_in_ktree(workinfo_root, ctx); + while (wi_item) { + DATA_WORKINFO(workinfo, wi_item); + if (!tv_newer(&old, &(workinfo->createdate))) + break; + wi_item = prev_in_ktree(ctx); + } + if (wi_item) + workinfoid = workinfo->workinfoid; + else { + // none old enough, so just load from them all + workinfoid = 0; + } + K_RUNLOCK(workinfo_free); + } + + LOGWARNING("%s(): loading from workinfoid>=%"PRId64, __func__, workinfoid); + printf(TICK_PREFIX"sh 0\r"); fflush(stdout); @@ -3875,7 +3910,10 @@ bool shares_fill(PGconn *conn) "workinfoid,userid,workername,clientid,enonce1,nonce2,nonce," "diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff" HISTORYDATECONTROL - " from shares"; + " from shares where workinfoid>=$1"; + par = 0; + params[par++] = bigint_to_buf(workinfoid, NULL, 0); + PARCHK(par, params); res = PQexec(conn, "Begin", CKPQ_READ); rescode = PQresultStatus(res); @@ -3893,7 +3931,7 @@ bool shares_fill(PGconn *conn) goto flail; } - res = PQexec(conn, sel, CKPQ_READ); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { From 4d7a54d12e88b86a0b373b0322585fe94d5ed03b Mon Sep 17 00:00:00 2001 From: kanoi Date: Fri, 18 Mar 2016 21:55:54 +1100 Subject: [PATCH 04/15] ckdb - modify -M to allow more options - days or shifts --- src/ckdb.c | 27 ++++++++++++++-- src/ckdb.h | 5 +-- src/ckdb_dbio.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 108 insertions(+), 9 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 75085444..5ed5b1d5 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -595,7 +595,8 @@ K_TREE *markersummary_pool_root; K_STORE *markersummary_pool_store; // The markerid load start for markersummary -char *mark_start = NULL; +char mark_start_type = '\0'; +int64_t mark_start = -1; // WORKMARKERS K_TREE *workmarkers_root; @@ -5996,7 +5997,29 @@ int main(int argc, char **argv) markersummary_auto = true; break; case 'M': - mark_start = strdup(optarg); + { + bool ok = true; + switch (optarg[0]) { + case 'D': // Days * mark_start + mark_start_type = 'D'; + mark_start = atoll(optarg+1); + break; + case 'S': // Shifts * mark_start + mark_start_type = 'S'; + mark_start = atoll(optarg+1); + break; + case 'M': // Markerid = mark_start + mark_start_type = 'M'; + mark_start = atoll(optarg+1); + break; + default: + ok = false; + break; + } + if (!ok || mark_start <= 0) + quit(1, "Invalid -M must be D, S or" + " M followed by a number>0"); + } break; case 'n': ckp.name = strdup(optarg); diff --git a/src/ckdb.h b/src/ckdb.h index 0d831a6b..9e2f71eb 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.987" +#define CKDB_VERSION DB_VERSION"-1.988" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2428,7 +2428,8 @@ extern K_TREE *markersummary_pool_root; extern K_STORE *markersummary_pool_store; // The markerid load start for markersummary -extern char *mark_start; +extern char mark_start_type; +extern int64_t mark_start; // WORKMARKERS typedef struct workmarkers { diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 80117809..f4c33b50 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -7755,17 +7755,86 @@ bool markersummary_fill(PGconn *conn) { ExecStatusType rescode; PGresult *res; - K_ITEM *item = NULL, *p_item; + K_ITEM *item = NULL, *p_item, *wm_item = NULL; + K_TREE_CTX ctx[1]; + char cd_buf[DATE_BUFSIZ]; + char *cd = NULL, *what = NULL; int n, t, i, p_n; MARKERSUMMARY *row, *p_row; + WORKMARKERS *workmarkers; char *params[1]; char *field; char *sel; int fields = 20, par = 0; + int64_t ms = 0, amt = 0; bool ok = false; + tv_t old; LOGDEBUG("%s(): select", __func__); + if (mark_start < 0) + mark_start = 0; + else { + amt = ms = mark_start; + switch (mark_start_type) { + case 'D': // mark_start days + setnow(&old); + old.tv_sec -= 60 * 60 * 24 * ms; + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_root, ctx); + while (wm_item) { + // Newest processed workmarker <= old + DATA_WORKMARKERS(workmarkers, wm_item); + if (CURRENT(&(workmarkers->expirydate)) && + WMPROCESSED(workmarkers->status) && + !tv_newer(&old, &(workmarkers->createdate))) + break; + wm_item = prev_in_ktree(ctx); + } + if (!wm_item) + mark_start = 0; + else { + mark_start = workmarkers->markerid; + tv_to_buf(&(workmarkers->createdate), + cd_buf, sizeof(cd_buf)); + cd = cd_buf; + what = "days"; + } + K_RUNLOCK(workmarkers_free); + break; + case 'S': // mark_start shifts (workmarkers) + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_root, ctx); + while (wm_item) { + DATA_WORKMARKERS(workmarkers, wm_item); + if (CURRENT(&(workmarkers->expirydate)) && + WMPROCESSED(workmarkers->status)) { + ms--; + if (ms <= 0) + break; + } + wm_item = prev_in_ktree(ctx); + } + if (!wm_item) + mark_start = 0; + else { + mark_start = workmarkers->markerid; + tv_to_buf(&(workmarkers->createdate), + cd_buf, sizeof(cd_buf)); + cd = cd_buf; + what = "shifts"; + } + K_RUNLOCK(workmarkers_free); + break; + case 'M': // markerid = mark_start + break; + default: + /* Not possible unless ckdb.c is different + * in which case it will just use mark_start */ + break; + } + } + // TODO: limit how far back sel = "declare ws cursor for select " "markerid,userid,workername,diffacc,diffsta,diffdup,diffhi," @@ -7774,14 +7843,16 @@ bool markersummary_fill(PGconn *conn) "lastshareacc,lastdiffacc" MODIFYDATECONTROL " from markersummary where markerid>=$1"; + par = 0; - if (mark_start) - params[par++] = mark_start; - else - params[par++] = "0"; + params[par++] = bigint_to_buf(mark_start, NULL, 0); PARCHK(par, params); LOGWARNING("%s(): loading from markerid>=%s", __func__, params[0]); + if (cd) { + LOGWARNING(" ... %s = %s >= %"PRId64" %s", + params[0], cd, amt, what); + } printf(TICK_PREFIX"ms 0\r"); fflush(stdout); @@ -8016,6 +8087,10 @@ flail: res = PQexec(conn, "Commit", CKPQ_READ); PQclear(res); + for (i = 0; i < par; i++) + free(params[i]); + par = 0; + if (ok) { LOGDEBUG("%s(): built", __func__); LOGWARNING("%s(): fetched %d markersummary records", __func__, n); From 069760266eac38f8c0dbc454a98e37a22fcae248 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 21 Mar 2016 22:55:15 +1100 Subject: [PATCH 05/15] ckdb - add the ovents list to cmd_events --- src/ckdb.h | 2 +- src/ckdb_cmd.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 9e2f71eb..c3ababa0 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.988" +#define CKDB_VERSION DB_VERSION"-1.989" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 6b41ae28..45120afe 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -7612,8 +7612,9 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *cd, K_TREE *trf_root) { K_ITEM *i_action, *i_cmd, *i_list, *i_ip, *i_eventname, *i_lifetime; - K_ITEM *i_des, *i_item, *next_item; + K_ITEM *i_des, *i_item, *next_item, *o_item; K_TREE_CTX ctx[1]; + OVENTS *ovents; IPS *ips; char *action, *alert_cmd, *list, *ip, *eventname, *des; char reply[1024] = ""; @@ -7621,7 +7622,7 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, char tmp[1024] = ""; char *buf = NULL; size_t len, off; - int i, rows, oldlife, lifetime; + int i, rows, oldlife, lifetime, vid, min; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); @@ -8034,6 +8035,50 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, APPEND_REALLOC_INIT(buf, off, len); snprintf(tmp, sizeof(tmp), "ok.expired %d", rows); APPEND_REALLOC(buf, off, len, tmp); + } else if (strcasecmp(action, "ovents") == 0) { + /* List the ovent tree contents + * Output can be large - check web Admin->ckp for tree sizes */ + bool got; + APPEND_REALLOC_INIT(buf, off, len); + APPEND_REALLOC(buf, off, len, "ok."); + rows = 0; + K_RLOCK(ovents_free); + o_item = first_in_ktree(ovents_root, ctx); + while (o_item) { + DATA_OVENTS(ovents, o_item); + for (vid = 0; o_limits[vid].name; vid++) { + got = false; + for (min = 0; min < 60; min++) { + if (ovents->count[IDMIN(vid, min)]) { + if (!got) { + snprintf(reply, siz, "key:%d=%s%c", + rows, ovents->key, FLDSEP); + APPEND_REALLOC(buf, off, len, reply); + snprintf(reply, siz, "id:%d=%d%c", + rows, vid, FLDSEP); + APPEND_REALLOC(buf, off, len, reply); + snprintf(reply, siz, "idname:%d=%s%c", + rows, o_limits[vid].name, FLDSEP); + APPEND_REALLOC(buf, off, len, reply); + snprintf(reply, siz, "hour:%d=%d%c", + rows, ovents->hour, FLDSEP); + APPEND_REALLOC(buf, off, len, reply); + got = true; + } + snprintf(reply, siz, "min%02d:%d=%d%c", + min, rows, ovents->count[IDMIN(vid, min)], + FLDSEP); + APPEND_REALLOC(buf, off, len, reply); + } + } + if (got) + rows++; + } + o_item = next_in_ktree(ctx); + } + K_RUNLOCK(ovents_free); + snprintf(tmp, sizeof(tmp), "rows=%d", rows); + APPEND_REALLOC(buf, off, len, tmp); } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s() %s.%s", __func__, id, reply); From 060e4c15ef17c6bba230ee5ce6b2ef7f6370ade7 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 21 Mar 2016 23:13:18 +1100 Subject: [PATCH 06/15] php - add the ovents list to page_events --- pool/page_events.php | 53 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/pool/page_events.php b/pool/page_events.php index ead6d036..8f15bc18 100644 --- a/pool/page_events.php +++ b/pool/page_events.php @@ -142,6 +142,59 @@ What: } } + if ($wh == 'ovents') + { + $ans = eventCmd($user, array('action' => 'ovents')); + + $pg .= "

\n"; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= "\n"; + + if ($ans['STATUS'] == 'ok') + { + $pg .= ''; + $count = $ans['rows']; + for ($i = 0; $i < $count; $i++) + { + if (($i % 2) == 0) + $row = 'even'; + else + $row = 'odd'; + + $j = $i+1; + $pg .= ""; + $pg .= ""; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $pg .= ''; + $co = ''; + for ($k = 0; $k < 60; $k++) + { + if ($k < 10) + $min = '0' . $k; + else + $min = $k; + if (isset($ans["min$min:$i"])) + { + if ($co != '') + $co .= ' '; + $co .= "$min=".$ans["min$min:$i"]; + } + } + $pg .= ""; + $pg .= "\n"; + } + $pg .= ''; + } + } + return $pg; } # From 68fe4fd301d2a7181da01ebc685ed11c2862bf1c Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 22 Mar 2016 08:01:46 +1100 Subject: [PATCH 07/15] ckdb/ckp - add the idname to the web events list --- pool/page_events.php | 10 ++++++---- src/ckdb.h | 2 +- src/ckdb_cmd.c | 5 +++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pool/page_events.php b/pool/page_events.php index 8f15bc18..b03a2b42 100644 --- a/pool/page_events.php +++ b/pool/page_events.php @@ -108,7 +108,8 @@ What: $pg .= ''; $pg .= ''; $pg .= ''; - $pg .= ''; + $pg .= ''; + $pg .= ''; $pg .= ''; $pg .= ''; $pg .= ''; @@ -131,7 +132,8 @@ What: $pg .= ""; $pg .= ''; $pg .= ''; - $pg .= ''; + $pg .= ''; + $pg .= ''; $pg .= ''; $pg .= ''; $pg .= ''; @@ -151,7 +153,7 @@ What: $pg .= ''; $pg .= ''; $pg .= ''; - $pg .= ''; + $pg .= ''; $pg .= ''; $pg .= ''; $pg .= "\n"; @@ -172,7 +174,7 @@ What: $pg .= ""; $pg .= ''; $pg .= ''; - $pg .= ''; + $pg .= ''; $pg .= ''; $co = ''; for ($k = 0; $k < 60; $k++) diff --git a/src/ckdb.h b/src/ckdb.h index c3ababa0..9f1579a4 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.989" +#define CKDB_VERSION DB_VERSION"-1.990" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 45120afe..3c451657 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -7570,11 +7570,12 @@ static void event_tree(K_TREE *the_tree, char *list, char *reply, size_t siz, snprintf(reply, siz, "list:%d=%s%c", *rows, list, FLDSEP); APPEND_REALLOC(*buf, *off, *len, reply); - snprintf(reply, siz, "id:%d=%d%c", *rows, e->id, FLDSEP); APPEND_REALLOC(*buf, *off, *len, reply); - + snprintf(reply, siz, "idname:%d=%s%c", + *rows, e_limits[e->id].name, FLDSEP); + APPEND_REALLOC(*buf, *off, *len, reply); snprintf(reply, siz, "user:%d=%s%c", *rows, e->createby, FLDSEP); APPEND_REALLOC(*buf, *off, *len, reply); From 774fd95a1869937aa3916a7bfae20f886c3c186c Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 22 Mar 2016 08:49:00 +1100 Subject: [PATCH 08/15] php - add 4 more workers to the shift graph --- pool/page_usperf.php | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pool/page_usperf.php b/pool/page_usperf.php index d99591b0..813808c7 100644 --- a/pool/page_usperf.php +++ b/pool/page_usperf.php @@ -30,7 +30,8 @@ function dousperf($data, $user) // This also defines how many worker fields there are $cols = array('#0000c0', '#00dd00', '#e06020', '#b020e0'); - $nc = count($cols); + $cols2 = array('#2090e0', '#e0c040', '#ff6090', '#90e040'); + $nc = count($cols)+count($cols2); $workers = 'all'; if (isset($_COOKIE['workers'])) @@ -93,9 +94,24 @@ function dousperf($data, $user) $datacols .= ','; $datacols .= $col; } - $oncl = "wch();location.href=\"".makeURL('usperf')."\""; $pg .= "
"; + + # the rest of the workers/colours go below the graph + $pg2 = '
'; + foreach ($cols2 as $col) + { + $i++; + $pg2 .= " Worker$i"; + $pg2 .= ":"; + $pg2 .= " "; + + if ($i > 1) + $datacols .= ','; + $datacols .= $col; + } + $pg2 .= "\n"; + foreach ($cbx as $nam => $txt) { $pg .= ' '; @@ -107,6 +123,7 @@ function dousperf($data, $user) $pg .= '
'; $pg .= 'A graph will show here if your browser supports html5/canvas'; $pg .= "
\n"; + $pg .= $pg2; $data = str_replace(array("\\","'"), array("\\\\","\\'"), $ans['DATA']); $data .= $fld_sep . 'cols' . $val_sep . $datacols; $pg .= "\n"; $pg .= "Show Details for Invalids:
"; - $pg .= "
#KeyIDIDNameHour UTCCount
$j'.$ans['key:'.$i].''.$ans['id:'.$i].''.$ans['idname:'.$i].''.gmdate('j/M H:i:s',$ans['hour:'.$i]*3600).'$co
#ListIDUserIDNameUserIPIPcHash$j'.$ans['list:'.$i].''.$ans['id:'.$i].''.$ans['user:'.$i].''.$ans['idname:'.$i].''.$ans['user:'.$i].''.isans($ans, 'ip:'.$i).''.isans($ans, 'ipc:'.$i).''.isans($ans, 'hash:'.$i).'#KeyIDIDNameIDNameHour UTCCount
$j'.$ans['key:'.$i].''.$ans['id:'.$i].''.$ans['idname:'.$i].''.$ans['idname:'.$i].''.gmdate('j/M H:i:s',$ans['hour:'.$i]*3600).'
\n"; + $pg .= "
\n"; return $pg; } # diff --git a/pool/page_workmgt.php b/pool/page_workmgt.php index b716e20b..87f60c63 100644 --- a/pool/page_workmgt.php +++ b/pool/page_workmgt.php @@ -23,7 +23,7 @@ function workmgtuser($data, $user, $err) } $pg .= makeForm('workmgt'); - $pg .= "
\n"; + $pg .= "
\n"; $pg .= ''; $pg .= ''; $pg .= ''; From 48b60b91d3a336f9b3304f799b8bcd777dfcba52 Mon Sep 17 00:00:00 2001 From: Alex Ordonez Date: Wed, 23 Mar 2016 00:20:26 -0400 Subject: [PATCH 10/15] Fix count check for page name $count -> count --- pool/page.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool/page.php b/pool/page.php index 32c3e7a0..d7560284 100644 --- a/pool/page.php +++ b/pool/page.php @@ -25,7 +25,7 @@ function getPage() return ''; $vals = explode('&', trim($names[1])); - if ($count($vals) < 1) + if (count($vals) < 1) return ''; return trim($vals[0]); From 57aaad447f4c00f024d943b8abb9d3a0f7528752 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 28 Mar 2016 00:48:46 +1100 Subject: [PATCH 11/15] ckdb - thread breakdown, limit it's ram usage, close/open db connections during db load, report new stats during reload every 15s --- src/ckdb.c | 1128 +++++++++++++++++++++++++++++++---------------- src/ckdb.h | 61 ++- src/ckdb_cmd.c | 162 ++++--- src/ckdb_dbio.c | 5 +- 4 files changed, 911 insertions(+), 445 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 5ed5b1d5..f2f58c18 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -28,9 +28,9 @@ * with an ok.queued reply to ckpool, to be processed after the reload * completes and just process authorise messages immediately while the * reload runs - * We start the ckpool message queue after loading - * the users, idcontrol and workers DB tables, before loading the - * much larger DB tables so that ckdb is effectively ready for messages + * However, we start the ckpool message queue after loading + * the optioncontrol, users, workers and useratts DB tables, before loading + * the much larger DB tables, so that ckdb is effectively ready for messages * almost immediately * The first ckpool message allows us to know where ckpool is up to * in the CCLs - see reload_from() for how this is handled @@ -47,8 +47,10 @@ * complete='a' (or 'y') and were deleted from RAM * If there are none with complete='n' but are others in the DB, * then the newest firstshare is used + * DB shares: no current processing done with the shares_hi tree inside + * CKDB. DB load gets the past 1 day to resolve duplicates * RAM shareerrors: as above - * DB+RAM sharesummary: created from shares, so as above + * RAM sharesummary: created from shares, so as above * Some shares after this may have been summarised to other * sharesummary complete='n', but for any such sharesummary * we reset it back to the first share found and it will @@ -112,6 +114,15 @@ static bool logger_using_data; static bool plistener_using_data; static bool clistener_using_data; static bool blistener_using_data; +static bool breakdown_using_data; + +// -B to override calculated value +static int breakdown_threads = -1; +static int reload_breakdown_count = 0; +static int cmd_breakdown_count = 0; +/* Lock for access to *breakdown_count + * Any change to/from 0 will update breakdown_using_data */ +static cklock_t breakdown_lock; char *EMPTY = ""; const char *nullstr = "(null)"; @@ -332,6 +343,17 @@ K_STORE *logqueue_store; K_LIST *msgline_free; K_STORE *msgline_store; +// BREAKQUEUE +K_LIST *breakqueue_free; +K_STORE *reload_breakqueue_store; +K_STORE *reload_done_breakqueue_store; +K_STORE *cmd_breakqueue_store; +K_STORE *cmd_done_breakqueue_store; +// Locked access with breakqueue_free +static int reload_processing; +static int sockd_count; +int max_sockd_count; + // WORKQUEUE K_LIST *workqueue_free; K_STORE *pool_workqueue_store; @@ -916,8 +938,12 @@ static bool getdata3() if (!(ok = miningpayouts_fill(conn)) || everyone_die) goto sukamudai; } + PQfinish(conn); + conn = dbconnect(); if (!(ok = workinfo_fill(conn)) || everyone_die) goto sukamudai; + PQfinish(conn); + conn = dbconnect(); if (!(ok = marks_fill(conn)) || everyone_die) goto sukamudai; /* must be after workinfo */ @@ -928,8 +954,12 @@ static bool getdata3() if (!(ok = payouts_fill(conn)) || everyone_die) goto sukamudai; } + PQfinish(conn); + conn = dbconnect(); if (!(ok = markersummary_fill(conn)) || everyone_die) goto sukamudai; + PQfinish(conn); + conn = dbconnect(); if (!(ok = shares_fill(conn)) || everyone_die) goto sukamudai; if (!confirm_sharesummary && !everyone_die) @@ -1691,10 +1721,17 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); + // TODO: msgline FREE_STORE(pool_workqueue); FREE_STORE(cmd_workqueue); FREE_STORE(btc_workqueue); FREE_LIST(workqueue); + // TODO: sockets/buf/msgline + FREE_STORE(cmd_done_breakqueue); + FREE_STORE(cmd_breakqueue); + FREE_STORE(reload_done_breakqueue); + FREE_STORE(reload_breakqueue); + FREE_LIST(breakqueue); FREE_LISTS(msgline); if (free_mode != FREE_MODE_ALL) @@ -3160,6 +3197,137 @@ nogood: return CMD_REPLY; } +static void *breaker(void *arg) +{ + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + char buf[128]; + int thr, zeros; + bool reload, was_zero, msg = false; + int queue_sleep, queue_limit, count; + + pthread_detach(pthread_self()); + + // Is this a reload thread or a cmd thread? + reload = *(bool *)(arg); + if (reload) { + queue_limit = RELOAD_QUEUE_LIMIT; + queue_sleep = RELOAD_QUEUE_SLEEP; + } else { + queue_limit = CMD_QUEUE_LIMIT; + queue_sleep = CMD_QUEUE_SLEEP; + } + + ck_wlock(&breakdown_lock); + if (reload) + thr = ++reload_breakdown_count; + else + thr = ++cmd_breakdown_count; + breakdown_using_data = true; + ck_wunlock(&breakdown_lock); + + if (breakdown_threads < 10) + zeros = 1; + else + zeros = (int)log10(breakdown_threads) + 1; + + snprintf(buf, sizeof(buf), "db_%c%0*d%s", + reload ? 'r' : 'c', zeros, thr, __func__); + LOCK_INIT(buf); + rename_proc(buf); + + if (reload) { + /* reload has to wait for the reload to start, however, also + * check for startup_complete in case we missed the reload */ + while (!everyone_die && !reloading && !startup_complete) + cksleep_ms(queue_sleep); + } + + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = NULL; + was_zero = false; + if (reload) + count = reload_done_breakqueue_store->count; + else + count = cmd_done_breakqueue_store->count; + + // Don't unlink if we are above the limit + if (count <= queue_limit) { + if (reload) + bq_item = k_unlink_head(reload_breakqueue_store); + else + bq_item = k_unlink_head(cmd_breakqueue_store); + if (!bq_item) + was_zero = true; + } + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + // Is the queue empty and the reload completed? + if (was_zero && reload && !reloading) + break; + + cksleep_ms(queue_sleep); + continue; + } + + DATA_BREAKQUEUE(bq, bq_item); + + if (reload) { + bool matched = false; + ck_wlock(&fpm_lock); + if (first_pool_message && + strcmp(first_pool_message, bq->buf) == 0) { + matched = true; + FREENULL(first_pool_message); + } + ck_wunlock(&fpm_lock); + if (matched) { + LOGERR("%s() reload ckpool queue match at line %"PRIu64, + __func__, bq->count); + } + } + + bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags); + K_WLOCK(breakqueue_free); + if (reload) + k_add_tail(reload_done_breakqueue_store, bq_item); + else + k_add_tail(cmd_done_breakqueue_store, bq_item); + + if (breakqueue_free->count == breakqueue_free->total && + breakqueue_free->total >= ALLOC_BREAKQUEUE * CULL_BREAKQUEUE) + k_cull_list(breakqueue_free); + K_WUNLOCK(breakqueue_free); + } + + // Get it now while the lock still exists, in case we need it + K_RLOCK(breakqueue_free); + // Not 100% exact since it could still increase, but close enough + count = max_sockd_count; + K_RUNLOCK(breakqueue_free); + + ck_wlock(&breakdown_lock); + if (reload) + reload_breakdown_count--; + else + cmd_breakdown_count--; + + if ((reload_breakdown_count + cmd_breakdown_count) < 1) { + breakdown_using_data = false; + msg = true; + } + ck_wunlock(&breakdown_lock); + + if (msg) { + LOGWARNING("%s() threads shut down - max_sockd_count=%d", + __func__, count); + } + + return NULL; +} + static void check_blocks() { K_TREE_CTX ctx[1]; @@ -4160,7 +4328,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) workqueue->code, workqueue->inet, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, false); siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); snprintf(rep, siz, "%s.%ld.%s", @@ -4168,6 +4336,9 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) msgline->now.tv_sec, ans); send_unix_msg(msgline->sockd, rep); close(msgline->sockd); + K_WLOCK(breakqueue_free); + sockd_count--; + K_WUNLOCK(breakqueue_free); FREENULL(ans); FREENULL(rep); @@ -4182,6 +4353,8 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE) k_cull_list(workqueue_free); K_WUNLOCK(workqueue_free); + + tick(); } static void *clistener(__maybe_unused void *arg) @@ -4190,6 +4363,8 @@ static void *clistener(__maybe_unused void *arg) K_ITEM *wq_item; time_t now; + pthread_detach(pthread_self()); + LOCK_INIT("db_clistener"); rename_proc("db_clistener"); @@ -4210,10 +4385,9 @@ static void *clistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) { + if (wq_item) process_sockd(conn, wq_item); - tick(); - } else + else cksleep_ms(42); } @@ -4231,12 +4405,13 @@ static void *blistener(__maybe_unused void *arg) K_ITEM *wq_item; time_t now; + pthread_detach(pthread_self()); + LOCK_INIT("db_blistener"); rename_proc("db_blistener"); blistener_using_data = true; - conn = dbconnect(); now = time(NULL); while (!everyone_die) { @@ -4251,10 +4426,9 @@ static void *blistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) { + if (wq_item) process_sockd(conn, wq_item); - tick(); - } else + else cksleep_ms(142); } @@ -4266,22 +4440,350 @@ static void *blistener(__maybe_unused void *arg) return NULL; } -static void *socketer(__maybe_unused void *arg) +static void *process_socket(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - pthread_t clis_pt, blis_pt; - unixsock_t *us = &pi->us; - char *end, *ans = NULL, *rep = NULL, *buf = NULL, *tmp; - enum cmd_values cmdnum; - int sockd; - K_ITEM *wq_item = NULL, *ml_item = NULL; - WORKQUEUE *workqueue; - MSGLINE *msgline; + K_ITEM *bq_item = NULL, *wq_item = NULL; + WORKQUEUE *workqueue = NULL; + BREAKQUEUE *bq = NULL; + MSGLINE *msgline = NULL; + bool want_first, replied, btc, dec_sockd; + int loglevel, oldloglevel; char reply[1024+1]; + char *ans = NULL, *rep = NULL, *tmp; size_t siz; + + pthread_detach(pthread_self()); + + LOCK_INIT("db_procsock"); + rename_proc("db_procsock"); + + want_first = true; + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(cmd_done_breakqueue_store); + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + cksleep_ms(24); + continue; + } + + DATA_BREAKQUEUE(bq, bq_item); + DATA_MSGLINE(msgline, bq->ml_item); + replied = btc = false; + switch (bq->cmdnum) { + case CMD_REPLY: + snprintf(reply, sizeof(reply), + "%s.%ld.?.", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + case CMD_ALERTEVENT: + case CMD_ALERTOVENT: + snprintf(reply, sizeof(reply), + "%s.%ld.failed.ERR", + msgline->id, + bq->now.tv_sec); + if (bq->cmdnum == CMD_ALERTEVENT) + tmp = reply_event(EVENTID_NONE, reply); + else + tmp = reply_ovent(OVENTID_NONE, reply); + send_unix_msg(bq->sockd, tmp); + FREENULL(tmp); + break; + case CMD_TERMINATE: + LOGWARNING("Listener received" + " terminate message," + " terminating ckdb"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.exiting", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + everyone_die = true; + break; + case CMD_PING: + LOGDEBUG("Listener received ping" + " request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.pong", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + case CMD_VERSION: + LOGDEBUG("Listener received" + " version request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.CKDB V%s", + msgline->id, + bq->now.tv_sec, + CKDB_VERSION); + send_unix_msg(bq->sockd, reply); + break; + case CMD_LOGLEVEL: + if (!*(msgline->id)) { + LOGDEBUG("Listener received" + " loglevel, currently %d", + pi->ckp->loglevel); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.loglevel" + " currently %d", + msgline->id, + bq->now.tv_sec, + pi->ckp->loglevel); + } else { + oldloglevel = pi->ckp->loglevel; + loglevel = atoi(msgline->id); + LOGDEBUG("Listener received loglevel" + " %d currently %d A", + loglevel, oldloglevel); + if (loglevel < LOG_EMERG || + loglevel > LOG_DEBUG) { + snprintf(reply, sizeof(reply), + "%s.%ld.ERR.invalid" + " loglevel %d" + " - currently %d", + msgline->id, + bq->now.tv_sec, + loglevel, + oldloglevel); + } else { + pi->ckp->loglevel = loglevel; + snprintf(reply, sizeof(reply), + "%s.%ld.ok.loglevel" + " now %d - was %d", + msgline->id, + bq->now.tv_sec, + pi->ckp->loglevel, + oldloglevel); + } + // Do this twice since the loglevel may have changed + LOGDEBUG("Listener received loglevel" + " %d currently %d B", + loglevel, oldloglevel); + } + send_unix_msg(bq->sockd, reply); + break; + case CMD_FLUSH: + LOGDEBUG("Listener received" + " flush request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.splash", + msgline->id, bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + fflush(stdout); + fflush(stderr); + if (global_ckp && global_ckp->logfd) + fflush(global_ckp->logfp); + break; + case CMD_USERSET: + case CMD_BTCSET: + btc = true; + case CMD_CHKPASS: + case CMD_2FA: + case CMD_ADDUSER: + case CMD_NEWPASS: + case CMD_WORKERSET: + case CMD_GETATTS: + case CMD_SETATTS: + case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: + case CMD_BLOCKLIST: + case CMD_NEWID: + case CMD_STATS: + case CMD_USERSTATUS: + case CMD_SHSTA: + case CMD_USERINFO: + case CMD_LOCKS: + case CMD_EVENTS: + case CMD_HIGH: + msgline->sockd = bq->sockd; + bq->sockd = -1; + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + break; + // Process, but reject (loading) until startup_complete + case CMD_HOMEPAGE: + case CMD_ALLUSERS: + case CMD_WORKERS: + case CMD_PAYMENTS: + case CMD_PPLNS: + case CMD_PPLNS2: + case CMD_PAYOUTS: + case CMD_MPAYOUTS: + case CMD_SHIFTS: + case CMD_PSHIFT: + case CMD_DSP: + case CMD_BLOCKSTATUS: + case CMD_MARKS: + case CMD_QUERY: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", + msgline->id, + bq->now.tv_sec, + msgline->cmd); + send_unix_msg(bq->sockd, reply); + } else { + msgline->sockd = bq->sockd; + bq->sockd = -1; + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + } + break; + // Always process immediately: + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(bq->buf); + ck_wunlock(&fpm_lock); + } + DATA_MSGLINE(msgline, bq->ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root, false); + siz = strlen(ans) + strlen(msgline->id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + bq->now.tv_sec, ans); + send_unix_msg(bq->sockd, rep); + FREENULL(ans); + FREENULL(rep); + replied = true; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + if (!replied) { + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(bq->buf); + ck_wunlock(&fpm_lock); + } + snprintf(reply, sizeof(reply), + "%s.%ld.ok.queued", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + } + + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + k_add_tail(pool_workqueue_store, wq_item); + /* Stop the reload queue from growing too big + * Use a size that 'should be big enough' */ + while (reloading && pool_workqueue_store->count > 250000) { + K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); + K_WUNLOCK(workqueue_free); + WORKQUEUE *wq; + DATA_WORKQUEUE(wq, wq2_item); + K_ITEM *ml_item = wq->msgline_item; + free_msgline_data(ml_item, true, false); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq2_item); + } + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + mutex_lock(&wq_waitlock); + pthread_cond_signal(&wq_waitcond); + mutex_unlock(&wq_waitlock); + break; + // Code error + default: + LOGEMERG("%s() CODE ERROR unhandled" + " message %d %.32s...", + __func__, bq->cmdnum, bq->buf); + snprintf(reply, sizeof(reply), + "%s.%ld.failed.code", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + } + if (bq->sockd >= 0) { + close(bq->sockd); + dec_sockd = true; + } else + dec_sockd = false; + + if (bq->ml_item) { + free_msgline_data(bq->ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, bq->ml_item); + K_WUNLOCK(msgline_free); + bq->ml_item = NULL; + } + free(bq->buf); + + K_WLOCK(breakqueue_free); + if (dec_sockd) + sockd_count--; + k_add_head(breakqueue_free, bq_item); + K_WUNLOCK(breakqueue_free); + } + + return NULL; +} + +static void *socketer(void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + pthread_t clis_pt, blis_pt, proc_pt; + unixsock_t *us = &pi->us; + char *end, *buf = NULL; + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + int sockd; tv_t now; - bool want_first, replied, btc; - int loglevel, oldloglevel; pthread_detach(pthread_self()); @@ -4298,20 +4800,17 @@ static void *socketer(__maybe_unused void *arg) create_pthread(&clis_pt, clistener, NULL); create_pthread(&blis_pt, blistener, NULL); + + create_pthread(&proc_pt, process_socket, arg); } - want_first = true; while (!everyone_die) { - if (buf) - dealloc(buf); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGERR("%s() Failed to accept on socket", __func__); break; } - cmdnum = CMD_UNSET; - buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2); // Once we've read the message setnow(&now); @@ -4325,358 +4824,85 @@ static void *socketer(__maybe_unused void *arg) // An empty message wont get a reply if (!buf) LOGWARNING("%s() Failed to get message", __func__); - else + else { LOGWARNING("%s() Empty message", __func__); + free(buf); + } } else { int seqentryflags = SE_SOCKET; if (!reload_queue_complete) seqentryflags = SE_EARLYSOCK; - cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); - DATA_MSGLINE(msgline, ml_item); - replied = btc = false; - switch (cmdnum) { - case CMD_REPLY: - snprintf(reply, sizeof(reply), - "%s.%ld.?.", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_ALERTEVENT: - case CMD_ALERTOVENT: - snprintf(reply, sizeof(reply), - "%s.%ld.failed.ERR", - msgline->id, - now.tv_sec); - if (cmdnum == CMD_ALERTEVENT) - tmp = reply_event(EVENTID_NONE, reply); - else - tmp = reply_ovent(OVENTID_NONE, reply); - send_unix_msg(sockd, tmp); - FREENULL(tmp); - break; - case CMD_TERMINATE: - LOGWARNING("Listener received" - " terminate message," - " terminating ckdb"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.exiting", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - everyone_die = true; - break; - case CMD_PING: - LOGDEBUG("Listener received ping" - " request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.pong", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_VERSION: - LOGDEBUG("Listener received" - " version request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.CKDB V%s", - msgline->id, - now.tv_sec, - CKDB_VERSION); - send_unix_msg(sockd, reply); - break; - case CMD_LOGLEVEL: - if (!*(msgline->id)) { - LOGDEBUG("Listener received" - " loglevel, currently %d", - pi->ckp->loglevel); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " currently %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel); - } else { - oldloglevel = pi->ckp->loglevel; - loglevel = atoi(msgline->id); - LOGDEBUG("Listener received loglevel" - " %d currently %d A", - loglevel, oldloglevel); - if (loglevel < LOG_EMERG || - loglevel > LOG_DEBUG) { - snprintf(reply, sizeof(reply), - "%s.%ld.ERR.invalid" - " loglevel %d" - " - currently %d", - msgline->id, - now.tv_sec, - loglevel, - oldloglevel); - } else { - pi->ckp->loglevel = loglevel; - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " now %d - was %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel, - oldloglevel); - } - // Do this twice since the loglevel may have changed - LOGDEBUG("Listener received loglevel" - " %d currently %d B", - loglevel, oldloglevel); - } - send_unix_msg(sockd, reply); - break; - case CMD_FLUSH: - LOGDEBUG("Listener received" - " flush request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.splash", - msgline->id, now.tv_sec); - send_unix_msg(sockd, reply); - fflush(stdout); - fflush(stderr); - if (global_ckp && global_ckp->logfd) - fflush(global_ckp->logfp); - break; - case CMD_USERSET: - case CMD_BTCSET: - btc = true; - case CMD_CHKPASS: - case CMD_2FA: - case CMD_ADDUSER: - case CMD_NEWPASS: - case CMD_WORKERSET: - case CMD_GETATTS: - case CMD_SETATTS: - case CMD_EXPATTS: - case CMD_GETOPTS: - case CMD_SETOPTS: - case CMD_BLOCKLIST: - case CMD_NEWID: - case CMD_STATS: - case CMD_USERSTATUS: - case CMD_SHSTA: - case CMD_USERINFO: - case CMD_LOCKS: - case CMD_EVENTS: - case CMD_HIGH: - msgline->sockd = sockd; - sockd = -1; - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - if (btc) - k_add_tail(btc_workqueue_store, wq_item); - else - k_add_tail(cmd_workqueue_store, wq_item); - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - break; - // Process, but reject (loading) until startup_complete - case CMD_HOMEPAGE: - case CMD_ALLUSERS: - case CMD_WORKERS: - case CMD_PAYMENTS: - case CMD_PPLNS: - case CMD_PPLNS2: - case CMD_PAYOUTS: - case CMD_MPAYOUTS: - case CMD_SHIFTS: - case CMD_PSHIFT: - case CMD_DSP: - case CMD_BLOCKSTATUS: - case CMD_MARKS: - case CMD_QUERY: - if (!startup_complete) { - snprintf(reply, sizeof(reply), - "%s.%ld.loading.%s", - msgline->id, - now.tv_sec, - msgline->cmd); - send_unix_msg(sockd, reply); - } else { - msgline->sockd = sockd; - sockd = -1; - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - if (btc) - k_add_tail(btc_workqueue_store, wq_item); - else - k_add_tail(cmd_workqueue_store, wq_item); - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - } - break; - // Always process immediately: - case CMD_AUTH: - case CMD_ADDRAUTH: - case CMD_HEARTBEAT: - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } - DATA_MSGLINE(msgline, ml_item); - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", - msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - replied = true; - // Always queue (ok.queued) - case CMD_SHARELOG: - case CMD_POOLSTAT: - case CMD_USERSTAT: - case CMD_WORKERSTAT: - case CMD_BLOCK: - if (!replied) { - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } - snprintf(reply, sizeof(reply), - "%s.%ld.ok.queued", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - } - - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - k_add_tail(pool_workqueue_store, wq_item); - /* Stop the reload queue from growing too big - * Use a size that should be big enough */ - if (reloading && pool_workqueue_store->count > 250000) { - K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); - K_WUNLOCK(workqueue_free); - WORKQUEUE *wq; - DATA_WORKQUEUE(wq, wq2_item); - K_ITEM *ml_item = wq->msgline_item; - free_msgline_data(ml_item, true, false); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - K_WLOCK(workqueue_free); - k_add_head(workqueue_free, wq2_item); - } - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - mutex_lock(&wq_waitlock); - pthread_cond_signal(&wq_waitcond); - mutex_unlock(&wq_waitlock); - break; - // Code error - default: - LOGEMERG("%s() CODE ERROR unhandled" - " message %d %.32s...", - __func__, cmdnum, buf); - snprintf(reply, sizeof(reply), - "%s.%ld.failed.code", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - } - } - if (sockd >= 0) - close(sockd); - if (ml_item) { - free_msgline_data(ml_item, true, true); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - ml_item = NULL; + // Don't limit the speed filling up cmd_breakqueue_store + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(breakqueue_free); + // keep the lock since none of these should be slow + DATA_BREAKQUEUE(bq, bq_item); + bq->buf = buf; + copy_tv(&(bq->now), &now); + bq->seqentryflags = seqentryflags; + bq->sockd = sockd; + if (max_sockd_count < ++sockd_count) + max_sockd_count = sockd_count; + k_add_tail(cmd_breakqueue_store, bq_item); + K_WUNLOCK(breakqueue_free); } - - tick(); } socketer_using_data = false; - if (buf) - dealloc(buf); close_unix_socket(us->sockd, us->path); + // Since the socket is dead ... + everyone_die = true; + return NULL; } -static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) +static void *process_reload(__maybe_unused void *arg) { + PGconn *conn = NULL; + MSGLINE *msgline = NULL; + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; enum cmd_values cmdnum; - char *end, *ans, *st = NULL; - MSGLINE *msgline; - K_ITEM *ml_item; - tv_t now; - bool matched; + char *ans, *st = NULL; + time_t now; - // Once we've read the message - setnow(&now); - if (buf) { - end = buf + strlen(buf) - 1; - // strip trailing \n and \r - while (end >= buf && (*end == '\n' || *end == '\r')) - *(end--) = '\0'; - } - if (!buf || !*buf) { - if (!buf) { - LOGERR("%s() NULL message line %"PRIu64, - __func__, count); - } else { - LOGERR("%s() Empty message line %"PRIu64, - __func__, count); - } - } else { - matched = false; - ck_wlock(&fpm_lock); - if (first_pool_message && - strcmp(first_pool_message, buf) == 0) { - matched = true; - FREENULL(first_pool_message); + pthread_detach(pthread_self()); + + LOCK_INIT("db_procreload"); + rename_proc("db_procreload"); + + conn = dbconnect(); + now = time(NULL); + + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(reload_done_breakqueue_store); + if (bq_item) + reload_processing++; + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + // Finished reloading? + if (!reloading) + break; + + cksleep_ms(24); + continue; } - ck_wunlock(&fpm_lock); - if (matched) { - LOGERR("%s() reload ckpool queue match at line %"PRIu64, - __func__, count); + + // Don't keep a connection for more than ~10s ... of processing + if ((time(NULL) - now) > 10) { + PQfinish(conn); + conn = dbconnect(); + now = time(NULL); } - // ml_item is set for all but CMD_REPLY - cmdnum = breakdown(&ml_item, buf, &now, SE_RELOAD); - DATA_MSGLINE(msgline, ml_item); - switch (cmdnum) { + DATA_BREAKQUEUE(bq, bq_item); + DATA_MSGLINE(msgline, bq->ml_item); + switch (bq->cmdnum) { // Ignore case CMD_REPLY: case CMD_ALERTEVENT: @@ -4726,7 +4952,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_HIGH: LOGERR("%s() INVALID message line %"PRIu64 " ignored '%.42s...", - __func__, count, + __func__, bq->count, st = safe_text(msgline->msg)); FREENULL(st); break; @@ -4751,7 +4977,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) (char *)__func__, inet_default, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, true); FREENULL(ans); } break; @@ -4759,23 +4985,85 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) // Force this switch to be updated if new cmds are added quithere(1, "%s line %"PRIu64" '%s' - not " "handled by reload", - filename, count, + bq->filename, bq->count, st = safe_text_nonull(msgline->cmd)); // Won't get here ... FREENULL(st); break; } - if (ml_item) { - free_msgline_data(ml_item, true, true); + if (bq->ml_item) { + free_msgline_data(bq->ml_item, true, true); K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); + k_add_head(msgline_free, bq->ml_item); K_WUNLOCK(msgline_free); - ml_item = NULL; + bq->ml_item = NULL; } + free(bq->buf); + + K_WLOCK(breakqueue_free); + reload_processing--; + k_add_head(breakqueue_free, bq_item); + K_WUNLOCK(breakqueue_free); + + tick(); } - tick(); + PQfinish(conn); + + return NULL; +} + +static void reload_line(char *filename, char *buf, uint64_t count) +{ + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + int qcount; + char *end; + tv_t now; + + // Once we've read the message + setnow(&now); + if (buf) { + end = buf + strlen(buf) - 1; + // strip trailing \n and \r + while (end >= buf && (*end == '\n' || *end == '\r')) + *(end--) = '\0'; + } + if (!buf || !*buf) { + if (!buf) { + LOGERR("%s() NULL message line %"PRIu64, + __func__, count); + } else { + LOGERR("%s() Empty message line %"PRIu64, + __func__, count); + } + } else { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(breakqueue_free); + K_WUNLOCK(breakqueue_free); + + // release the lock since strdup could be slow, but rarely + DATA_BREAKQUEUE(bq, bq_item); + bq->buf = strdup(buf); + copy_tv(&(bq->now), &now); + bq->seqentryflags = SE_RELOAD; + bq->sockd = -1; + bq->count = count; + bq->filename = filename; + + K_WLOCK(breakqueue_free); + k_add_tail(reload_breakqueue_store, bq_item); + qcount = reload_breakqueue_store->count; + K_WUNLOCK(breakqueue_free); + + while (qcount > RELOAD_QUEUE_LIMIT) { + cksleep_ms(RELOAD_QUEUE_SLEEP); + K_RLOCK(breakqueue_free); + qcount = reload_breakqueue_store->count; + K_RUNLOCK(breakqueue_free); + } + } } // 10Mb for now - transactiontree can be large @@ -4840,7 +5128,8 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) errn, buf); } else { *apipe = true; - free(*filename); + /* Don't free the old filename since + * process_reload() could still access it */ *filename = name; return true; } @@ -4862,12 +5151,12 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) * if ckdb aborts at the beginning of the reload, then start again */ static bool reload_from(tv_t *start) { - PGconn *conn = NULL; + // proc_pt could exit after this returns + static pthread_t proc_pt; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; size_t rflen = strlen(restorefrom); char *missingfirst = NULL, *missinglast = NULL, *st = NULL; - int missing_count; - int processing; + int missing_count, processing, counter; bool finished = false, ret = true, ok, apipe = false; char *filename = NULL; uint64_t count, total; @@ -4875,6 +5164,7 @@ static bool reload_from(tv_t *start) double diff; FILE *fp = NULL; int file_N_limit; + time_t tick_time, tmp_time; reload_buf = malloc(MAX_READ); if (!reload_buf) @@ -4904,10 +5194,11 @@ static bool reload_from(tv_t *start) LOGQUE(reload_buf, true); LOGQUE(reload_buf, false); - conn = dbconnect(); + create_pthread(&proc_pt, process_reload, NULL); total = 0; processing = 0; + tick_time = time(NULL); while (!everyone_die && !finished) { LOGWARNING("%s(): processing %s", __func__, filename); processing++; @@ -4920,7 +5211,30 @@ static bool reload_from(tv_t *start) * order messages in the log file */ while (!everyone_die && logline(reload_buf, MAX_READ, fp, filename)) { - reload_line(conn, filename, ++count, reload_buf); + reload_line(filename, reload_buf, ++count); + + tmp_time = time(NULL); + // Report stats every 15s + if ((tmp_time - tick_time) > 14) { + int relq, relqd, cmdq, cmdqd, mx, poolq; + K_RLOCK(breakqueue_free); + relq = reload_breakqueue_store->count + + reload_processing; + relqd = reload_done_breakqueue_store->count; + cmdq = cmd_breakqueue_store->count; + cmdqd = cmd_done_breakqueue_store->count; + mx = max_sockd_count; + K_RUNLOCK(breakqueue_free); + K_RLOCK(workqueue_free); + poolq = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); + printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" + " ckp %d/%d/%d (%d) \r", + total+count, relq, relqd, + cmdq, cmdqd, poolq, mx); + fflush(stdout); + tick_time = tmp_time; + } } LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", @@ -4938,7 +5252,8 @@ static bool reload_from(tv_t *start) } } else fclose(fp); - free(filename); + /* Don't free the old filename since + * process_reload() could access use it */ if (everyone_die) break; reload_timestamp.tv_sec += ROLL_S; @@ -4995,7 +5310,15 @@ static bool reload_from(tv_t *start) } } - PQfinish(conn); + while (!everyone_die) { + K_RLOCK(breakqueue_free); + counter = reload_done_breakqueue_store->count + + reload_breakqueue_store->count + reload_processing; + K_RUNLOCK(breakqueue_free); + if (counter == 0) + break; + cksleep_ms(142); + } setnow(&now); diff = tvdiff(&now, &begin); @@ -5060,7 +5383,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item) workqueue->code, workqueue->inet, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, false); FREENULL(ans); break; } @@ -5099,6 +5422,7 @@ static void *listener(void *arg) pthread_t sock_pt; pthread_t summ_pt; pthread_t mark_pt; + pthread_t break_pt; K_ITEM *wq_item; time_t now; int wqcount, wqgot; @@ -5109,7 +5433,10 @@ static void *listener(void *arg) SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; - int i; + int cpus, i; + bool reloader, cmder; + + pthread_detach(pthread_self()); LOCK_INIT("db_plistener"); rename_proc("db_plistener"); @@ -5118,9 +5445,29 @@ static void *listener(void *arg) ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); logqueue_store = k_new_store(logqueue_free); + breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), + ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); + reload_breakqueue_store = k_new_store(breakqueue_free); + reload_done_breakqueue_store = k_new_store(breakqueue_free); + cmd_breakqueue_store = k_new_store(breakqueue_free); + cmd_done_breakqueue_store = k_new_store(breakqueue_free); + #if LOCK_CHECK DLPRIO(logqueue, 94); + DLPRIO(breakqueue, PRIO_TERMINAL); #endif + if (breakdown_threads <= 0) { + cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; + breakdown_threads = (int)(cpus / 3) ? : 1; + } + LOGWARNING("%s(): creating %d*2 breaker threads ...", + __func__, breakdown_threads); + reloader = true; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &reloader); + cmder = false; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &cmder); create_pthread(&log_pt, logger, NULL); @@ -5192,7 +5539,6 @@ static void *listener(void *arg) if (wq_item) { wqgot++; process_queued(conn, wq_item); - tick(); } if (left == 0 && wq_stt.tv_sec != 0L) { @@ -5838,8 +6184,10 @@ static void check_restore_dir(char *name) static struct option long_options[] = { // script to call when alerts happen { "alert", required_argument, 0, 'a' }, - // workinfo to start shares_fill() default is 1 day + // workinfoid to start shares_fill() default is 1 day { "shares-begin", required_argument, 0, 'b' }, + // override calculated value + { "breakdown-threads", required_argument, 0, 'B' }, { "config", required_argument, 0, 'c' }, { "dbname", required_argument, 0, 'd' }, { "minsdiff", required_argument, 0, 'D' }, @@ -5901,7 +6249,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'a': len = strlen(optarg); @@ -5921,6 +6269,19 @@ int main(int argc, char **argv) } shares_begin = beg; } + break; + case 'B': + { + int bt = atoi(optarg); + if (bt < 1) { + quit(1, "Invalid breakdown " + "thread count %d " + "- must be > 0", + bt); + } + breakdown_threads = bt; + } + break; case 'c': ckp.config = strdup(optarg); break; @@ -6172,6 +6533,7 @@ int main(int argc, char **argv) ckp.main.ckp = &ckp; ckp.main.processname = strdup("main"); + cklock_init(&breakdown_lock); cklock_init(&last_lock); cklock_init(&btc_lock); cklock_init(&poolinstance_lock); @@ -6219,11 +6581,12 @@ int main(int argc, char **argv) time_t start, trigger, curr; char *msg = NULL; + everyone_die = true; trigger = start = time(NULL); while (socketer_using_data || summariser_using_data || logger_using_data || plistener_using_data || clistener_using_data || blistener_using_data || - marker_using_data) { + marker_using_data || breakdown_using_data) { msg = NULL; curr = time(NULL); if (curr - start > 4) { @@ -6235,7 +6598,7 @@ int main(int argc, char **argv) } if (msg) { trigger = curr; - printf("%s %ds due to%s%s%s%s%s%s%s\n", + printf("%s %ds due to%s%s%s%s%s%s%s%s\n", msg, (int)(curr - start), socketer_using_data ? " socketer" : EMPTY, summariser_using_data ? " summariser" : EMPTY, @@ -6243,7 +6606,8 @@ int main(int argc, char **argv) plistener_using_data ? " plistener" : EMPTY, clistener_using_data ? " clistener" : EMPTY, blistener_using_data ? " blistener" : EMPTY, - marker_using_data ? " marker" : EMPTY); + marker_using_data ? " marker" : EMPTY, + breakdown_using_data ? " breakdown" : EMPTY); fflush(stdout); } sleep(1); diff --git a/src/ckdb.h b/src/ckdb.h index 9f1579a4..9487deb8 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.990" +#define CKDB_VERSION DB_VERSION"-2.000" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1037,7 +1037,7 @@ typedef struct msgline { #define ALLOC_MSGLINE 8192 #define LIMIT_MSGLINE 0 -#define CULL_MSGLINE 16 +#define CULL_MSGLINE 8 #define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline) #define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true) #define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false) @@ -1045,6 +1045,57 @@ typedef struct msgline { extern K_LIST *msgline_free; extern K_STORE *msgline_store; +// BREAKQUEUE +typedef struct breakqueue { + char *buf; + tv_t now; + int seqentryflags; + int sockd; + enum cmd_values cmdnum; + K_ITEM *ml_item; + uint64_t count; + char *filename; +} BREAKQUEUE; + +#define ALLOC_BREAKQUEUE 16384 +#define LIMIT_BREAKQUEUE 0 +#define CULL_BREAKQUEUE 4 +#define INIT_BREAKQUEUE(_item) INIT_GENERIC(_item, breakqueue) +#define DATA_BREAKQUEUE(_var, _item) DATA_GENERIC(_var, _item, breakqueue, true) + +/* If a breaker() thread's done break queue count hits the LIMIT, or is empty, + * it will sleep for SLEEP ms + * So this means that with a single breaker() thread, + * it can process at most LIMIT records per SLEEP ms + * or: 1000 * LIMIT / SLEEP records per second + * For N breaker() threads, that would mean between 1 and N times that value + * dependent upon the random time spacing of the N thread sleeps + * However, also note that LIMIT defines how much RAM can be used by + * the break queues, so a limit is required + * A breakqueue item can get quite large since it includes both buf + * and ml_item (which has the transfer data) in the 'done' queue + * Of course the processing speed of the ml_items will also decide how big the + * break queue count can get + * Note that if the CMD queues get too large they will be too slow responding + * to the sockets that sent the message, however the CMD ml_item processing + * responds immediately before processing the ml_item for all but ADDRAUTH, + * AUTHORISE and HEARTBEAT + * The reload also uses this limit when filling the reload break queue + * thus limiting the line processing of reload files + */ +// 16300,42 equated to single thread limitation of ~388k per second +#define RELOAD_QUEUE_LIMIT 16300 +#define RELOAD_QUEUE_SLEEP 42 +#define CMD_QUEUE_LIMIT 16300 +#define CMD_QUEUE_SLEEP 42 + +extern K_LIST *breakqueue_free; +extern K_STORE *reload_breakqueue_store; +extern K_STORE *reload_done_breakqueue_store; +extern K_STORE *cmd_breakqueue_store; +extern K_STORE *cmd_done_breakqueue_store; +extern int max_sockd_count; + // WORKQUEUE typedef struct workqueue { K_ITEM *msgline_item; @@ -1093,7 +1144,7 @@ typedef struct transfer { // Suggest malloc use MMAP - 1913 = largest under 2MB #define ALLOC_TRANSFER 1913 #define LIMIT_TRANSFER 0 -#define CULL_TRANSFER 64 +#define CULL_TRANSFER 32 #define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer) #define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true) @@ -3124,7 +3175,7 @@ extern bool auths_add(PGconn *conn, char *poolinstance, char *username, char *useragent, char *preauth, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, bool addressuser, USERS **users, WORKERS **workers, - int *event); + int *event, bool reload_data); extern bool poolstats_add(PGconn *conn, bool store, char *poolinstance, char *elapsed, char *users, char *workers, char *hashrate, char *hashrate5m, @@ -3192,7 +3243,7 @@ struct CMDS { bool noid; // doesn't require an id bool createdate; // requires a createdate char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, - char *, tv_t *, K_TREE *); + char *, tv_t *, K_TREE *, bool); enum seq_num seq; int access; }; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 3c451657..62b7ac0b 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -34,7 +34,7 @@ static K_ITEM *adminuser(K_TREE *trf_root, char *reply, size_t siz) static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *notcd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -87,7 +87,8 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_oldhash, *i_newhash, *i_2fa, *u_item; char reply[1024] = ""; @@ -166,7 +167,8 @@ static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_passwordhash, *i_2fa, *u_item; char reply[1024] = ""; @@ -222,7 +224,8 @@ static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_2fa(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_action, *i_entropy, *i_value, *u_item, *u_new; char reply[1024] = ""; @@ -464,7 +467,8 @@ dame: static char *cmd_userset(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_passwordhash, *i_2fa, *i_rows, *i_address; K_ITEM *i_ratio, *i_payname, *i_email, *u_item, *pa_item, *old_pa_item; @@ -781,7 +785,7 @@ struckout: static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { K_ITEM *i_username, *i_workername, *i_diffdef, *i_oldworkers; K_ITEM *u_item, *ua_item, *w_item; @@ -1081,7 +1085,7 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { bool igndup = false; @@ -1096,7 +1100,8 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root) + char *inet, tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1180,7 +1185,8 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_workerstats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root) + char *inet, tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1263,7 +1269,8 @@ static char *cmd_blocklist(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { int ovent = OVENT_OK; K_TREE_CTX ctx[1]; @@ -1525,7 +1532,8 @@ redo: static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_height, *i_blockhash, *i_action, *i_info; char reply[1024] = ""; @@ -1682,7 +1690,7 @@ static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now, static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1718,7 +1726,8 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *p_item, *p2_item, *po_item; K_TREE_CTX ctx[1]; @@ -2056,7 +2065,8 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users) static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item; K_ITEM *ua_item, *us_item, *ws_item; @@ -2382,7 +2392,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_STORE *usu_store = k_new_store(userstats_free); K_ITEM *us_item, *usu_item, *u_item; @@ -2494,7 +2505,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -2946,7 +2958,7 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, int32_t height, char *id, static char *cmd_blocks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -2969,12 +2981,13 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id, igndup = true; } - return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup, trf_root); + return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup, + trf_root); } static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { K_ITEM tmp_poolinstance_item; TRANSFER tmp_poolinstance; @@ -3046,7 +3059,8 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - event = events_add(EVENTID_AUTOACC, trf_root); + if (!reload_data) + event = events_add(EVENTID_AUTOACC, trf_root); if (event == EVENT_OK) { DATA_OPTIONCONTROL(optioncontrol, oc_item); u_item = users_add(conn, username, EMPTY, @@ -3067,7 +3081,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, transfer_data(i_useragent), transfer_data(i_preauth), by, code, inet, cd, trf_root, false, - &users, &workers, &event); + &users, &workers, &event, reload_data); } if (!ok) { @@ -3123,14 +3137,15 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_auth(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { - return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root); + return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root, + reload_data); } static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { K_ITEM tmp_poolinstance_item; TRANSFER tmp_poolinstance; @@ -3201,7 +3216,7 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, transfer_data(i_useragent), transfer_data(i_preauth), by, code, inet, cd, trf_root, true, - &users, &workers, &event); + &users, &workers, &event, reload_data); if (!ok) { LOGDEBUG("%s() %s.failed.DBE", __func__, id); @@ -3256,16 +3271,18 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_addrauth(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { - return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root); + return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root, + reload_data); } static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *cd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { HEARTBEATQUEUE *heartbeatqueue; K_STORE *hq_store; @@ -3331,7 +3348,8 @@ pulse: static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look; K_ITEM *ua_item, *pa_item; @@ -3506,12 +3524,16 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, int psync = pool_workqueue_store->count; int csync = cmd_workqueue_store->count; int bsync = btc_workqueue_store->count; + int qsync = breakqueue_free->total - breakqueue_free->count; snprintf(tmp, sizeof(tmp), "psync=%d%c", psync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "csync=%d%c", csync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "bsync=%d%c", bsync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); + snprintf(tmp, sizeof(tmp), "qsync=%d%c", qsync, FLDSEP); + APPEND_REALLOC(buf, off, len, tmp); + // qsync isn't part of 'sync' snprintf(tmp, sizeof(tmp), "sync=%d%c", psync + csync + bsync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); @@ -3622,7 +3644,8 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_getatts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_attlist, *u_item, *ua_item; char reply[1024] = ""; @@ -3798,7 +3821,8 @@ static void att_to_date(tv_t *date, char *data, tv_t *now) * */ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { ExecStatusType rescode; PGresult *res; @@ -3967,7 +3991,8 @@ bats: static char *cmd_expatts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_attlist, *u_item, *ua_item; char reply[1024] = ""; @@ -4050,7 +4075,8 @@ rats: static char *cmd_getopts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_optlist, *oc_item; char reply[1024] = ""; @@ -4128,7 +4154,8 @@ ruts: * See opt_set_date() above */ static char *cmd_setopts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { ExecStatusType rescode; PGresult *res; @@ -4276,9 +4303,10 @@ rollback: * and the breakdown for percent address users, * the totals per user and per payout should still be the same */ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, - __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024], tmp[1024], *buf; char *block_extra, *share_status = EMPTY, *marks_status = EMPTY; @@ -4753,9 +4781,10 @@ shazbot: // Generated from the payouts, miningpayouts and payments data static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id, - __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024], tmp[1024], *buf; char *block_extra, *marks_status = EMPTY; @@ -5022,7 +5051,8 @@ shazbot: static char *cmd_payouts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5274,7 +5304,8 @@ static char *cmd_mpayouts(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *mp_item, *po_item; K_TREE_CTX ctx[1]; @@ -5466,7 +5497,8 @@ static int select_list(WM *wm, char *select) static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_select; K_ITEM *u_item, *p_item, *m_item, ms_look, *wm_item, *ms_item, *wi_item; @@ -5815,7 +5847,8 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { __maybe_unused K_ITEM *i_file; __maybe_unused char reply[1024] = ""; @@ -5861,7 +5894,9 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { char tmp[1024], *buf; const char *name; @@ -5949,7 +5984,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, // TODO: add to heartbeat to disable the miner if active and status != "" static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6016,7 +6051,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char * static char *cmd_marks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6535,7 +6570,8 @@ dame: static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username; K_ITEM *u_item, *p_item, *m_item, *wm_item, *ms_item, *wi_item; @@ -6740,18 +6776,26 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, } /* Show a share status report on the console - * Currently: sequence status and OoO info */ + * Currently: sequence status, OoO info and max_sockd_count */ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { char ooo_buf[256]; char buf[256]; + int count; LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); + K_RLOCK(breakqueue_free); + count = max_sockd_count; + K_RUNLOCK(breakqueue_free); + LOGWARNING(" max_sockd_count=%d", count); + snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf); return strdup(buf); @@ -6761,7 +6805,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *ui_item; USERINFO *userinfo; @@ -6850,7 +6895,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_btcserver, *i_userpass; char *btcserver = NULL, *userpass = NULL, *tmp; @@ -6902,7 +6948,8 @@ static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_TREE_CTX ctx[1]; char cd_buf[DATE_BUFSIZ]; @@ -7498,7 +7545,8 @@ static char *cmd_locks(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *cd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { bool code_locks = false, code_deadlocks = false; bool was_locks = false, was_deadlocks = false; @@ -7610,7 +7658,8 @@ static void event_tree(K_TREE *the_tree, char *list, char *reply, size_t siz, static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_action, *i_cmd, *i_list, *i_ip, *i_eventname, *i_lifetime; K_ITEM *i_des, *i_item, *next_item, *o_item; @@ -8093,7 +8142,8 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_high(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { bool conned = false; K_TREE_CTX ctx[1]; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index f4c33b50..e42252de 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -7100,7 +7100,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, char *useragent, char *preauth, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, bool addressuser, USERS **users, WORKERS **workers, - int *event) + int *event, bool reload_data) { K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item, *w_item; @@ -7131,7 +7131,8 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, __func__, st = safe_text_nonull(username)); FREENULL(st); - *event = events_add(EVENTID_INVAUTH, trf_root); + if (!reload_data) + *event = events_add(EVENTID_INVAUTH, trf_root); } if (!u_item) goto unitem; From 7dee1e28b5e9f3e9df5a2b4eaad177450586493d Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 28 Mar 2016 01:19:24 +1100 Subject: [PATCH 12/15] ckdb - report the msg sent on a unix socket send failure --- src/ckdb.c | 35 +++++++++++++++++++++++------------ src/ckdb.h | 4 +++- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index f2f58c18..84e0ce47 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -837,6 +837,17 @@ static void check_createdate_ccl(char *cmd, tv_t *cd) STRNCPY(last_cmd, cmd); } +void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS) +{ + bool ok; + + ok = _send_unix_msg(sockd, msg, UNIX_WRITE_TIMEOUT, WHERE_FFL_PASS); + if (!ok) { + LOGWARNING(" msg was %.42s%s", msg ? : nullstr, + msg ? "..." : EMPTY); + } +} + static uint64_t ticks; static time_t last_tick; @@ -4334,7 +4345,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) snprintf(rep, siz, "%s.%ld.%s", msgline->id, msgline->now.tv_sec, ans); - send_unix_msg(msgline->sockd, rep); + ckdb_unix_msg(msgline->sockd, rep); close(msgline->sockd); K_WLOCK(breakqueue_free); sockd_count--; @@ -4478,7 +4489,7 @@ static void *process_socket(void *arg) "%s.%ld.?.", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); break; case CMD_ALERTEVENT: case CMD_ALERTOVENT: @@ -4490,7 +4501,7 @@ static void *process_socket(void *arg) tmp = reply_event(EVENTID_NONE, reply); else tmp = reply_ovent(OVENTID_NONE, reply); - send_unix_msg(bq->sockd, tmp); + ckdb_unix_msg(bq->sockd, tmp); FREENULL(tmp); break; case CMD_TERMINATE: @@ -4501,7 +4512,7 @@ static void *process_socket(void *arg) "%s.%ld.ok.exiting", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); everyone_die = true; break; case CMD_PING: @@ -4511,7 +4522,7 @@ static void *process_socket(void *arg) "%s.%ld.ok.pong", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); break; case CMD_VERSION: LOGDEBUG("Listener received" @@ -4521,7 +4532,7 @@ static void *process_socket(void *arg) msgline->id, bq->now.tv_sec, CKDB_VERSION); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); break; case CMD_LOGLEVEL: if (!*(msgline->id)) { @@ -4565,7 +4576,7 @@ static void *process_socket(void *arg) " %d currently %d B", loglevel, oldloglevel); } - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); break; case CMD_FLUSH: LOGDEBUG("Listener received" @@ -4573,7 +4584,7 @@ static void *process_socket(void *arg) snprintf(reply, sizeof(reply), "%s.%ld.ok.splash", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); fflush(stdout); fflush(stderr); if (global_ckp && global_ckp->logfd) @@ -4638,7 +4649,7 @@ static void *process_socket(void *arg) msgline->id, bq->now.tv_sec, msgline->cmd); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); } else { msgline->sockd = bq->sockd; bq->sockd = -1; @@ -4683,7 +4694,7 @@ static void *process_socket(void *arg) snprintf(rep, siz, "%s.%ld.%s", msgline->id, bq->now.tv_sec, ans); - send_unix_msg(bq->sockd, rep); + ckdb_unix_msg(bq->sockd, rep); FREENULL(ans); FREENULL(rep); replied = true; @@ -4705,7 +4716,7 @@ static void *process_socket(void *arg) "%s.%ld.ok.queued", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); } K_WLOCK(workqueue_free); @@ -4746,7 +4757,7 @@ static void *process_socket(void *arg) "%s.%ld.failed.code", msgline->id, bq->now.tv_sec); - send_unix_msg(bq->sockd, reply); + ckdb_unix_msg(bq->sockd, reply); break; } if (bq->sockd >= 0) { diff --git a/src/ckdb.h b/src/ckdb.h index 9487deb8..beacd968 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.000" +#define CKDB_VERSION DB_VERSION"-2.001" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2649,6 +2649,8 @@ extern K_STORE *userinfo_store; extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); +extern void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS); +#define ckdb_unix_msg(_sockd, _msg) _ckdb_unix_msg(_sockd, _msg, WHERE_FFL_HERE) extern void tick(); extern PGconn *dbconnect(); extern void sequence_report(bool lock); From 521c0cbfde7f771a20289fcacbf921a4f5dd7f3b Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 28 Mar 2016 01:32:05 +1100 Subject: [PATCH 13/15] ckdb - put back the pool msg tick --- src/ckdb.c | 1 + src/ckdb.h | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ckdb.c b/src/ckdb.c index 84e0ce47..169b2a38 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -5550,6 +5550,7 @@ static void *listener(void *arg) if (wq_item) { wqgot++; process_queued(conn, wq_item); + tick(); } if (left == 0 && wq_stt.tv_sec != 0L) { diff --git a/src/ckdb.h b/src/ckdb.h index beacd968..2bbaeaca 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.001" +#define CKDB_VERSION DB_VERSION"-2.002" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From e318dead0369f78ad193461ab5e83d07c60f3b49 Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 29 Mar 2016 06:30:20 +1100 Subject: [PATCH 14/15] php - add a payout address limitation message --- pool/page_settings.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pool/page_settings.php b/pool/page_settings.php index 5f6901aa..3366da1a 100644 --- a/pool/page_settings.php +++ b/pool/page_settings.php @@ -50,7 +50,8 @@ function settings($data, $user, $email, $addr, $err) $pg .= makeForm('settings'); $pg .= '
Worker NameMinimum Diff
'; $pg .= ''; $pg .= '
'; - $pg .= 'To change your payout address, enter a new address and your password'; + $pg .= 'To change your payout address, enter a new address and your password.
'; + $pg .= 'A payout address can only ever be set to one account'; $pg .= '
'; $pg .= 'BTC Address:'; From fc9f809e01297dfb91c471cdc3d59597a640fe13 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 31 Mar 2016 00:38:41 +1100 Subject: [PATCH 15/15] ckdb - ensure all earlysock data is processed first --- src/ckdb.c | 107 +++++++++++++++++++++++++++++++++++++---------------- src/ckdb.h | 4 +- 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 169b2a38..08b3855f 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -351,16 +351,23 @@ K_STORE *cmd_breakqueue_store; K_STORE *cmd_done_breakqueue_store; // Locked access with breakqueue_free static int reload_processing; +static int cmd_processing; static int sockd_count; int max_sockd_count; // WORKQUEUE K_LIST *workqueue_free; +// pool0 is all pool data during the reload +K_STORE *pool0_workqueue_store; K_STORE *pool_workqueue_store; K_STORE *cmd_workqueue_store; K_STORE *btc_workqueue_store; mutex_t wq_waitlock; pthread_cond_t wq_waitcond; +// this counter ensures we don't switch early from pool0 to pool +static int pool0_left; +static int pool0_tot; +static int pool0_discarded; // HEARTBEATQUEUE K_LIST *heartbeatqueue_free; @@ -1166,6 +1173,7 @@ static void alloc_storage() workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); + pool0_workqueue_store = k_new_store(workqueue_free); pool_workqueue_store = k_new_store(workqueue_free); cmd_workqueue_store = k_new_store(workqueue_free); btc_workqueue_store = k_new_store(workqueue_free); @@ -1733,6 +1741,7 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); // TODO: msgline + FREE_STORE(pool0_workqueue); FREE_STORE(pool_workqueue); FREE_STORE(cmd_workqueue); FREE_STORE(btc_workqueue); @@ -4473,6 +4482,8 @@ static void *process_socket(void *arg) while (!everyone_die) { K_WLOCK(breakqueue_free); bq_item = k_unlink_head(cmd_done_breakqueue_store); + if (bq_item) + cmd_processing++; K_WUNLOCK(breakqueue_free); if (!bq_item) { @@ -4726,21 +4737,27 @@ static void *process_socket(void *arg) workqueue->by = by_default; workqueue->code = (char *)__func__; workqueue->inet = inet_default; - k_add_tail(pool_workqueue_store, wq_item); - /* Stop the reload queue from growing too big - * Use a size that 'should be big enough' */ - while (reloading && pool_workqueue_store->count > 250000) { - K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); - K_WUNLOCK(workqueue_free); - WORKQUEUE *wq; - DATA_WORKQUEUE(wq, wq2_item); - K_ITEM *ml_item = wq->msgline_item; - free_msgline_data(ml_item, true, false); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - K_WLOCK(workqueue_free); - k_add_head(workqueue_free, wq2_item); + if (bq->seqentryflags == SE_SOCKET) + k_add_tail(pool_workqueue_store, wq_item); + else { + k_add_tail(pool0_workqueue_store, wq_item); + /* Stop the reload queue from growing too big + * Use a size that 'should be big enough' */ + if (reloading && pool0_workqueue_store->count > 250000) { + K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store); + pool0_discarded++; + pool0_left--; + K_WUNLOCK(workqueue_free); + WORKQUEUE *wq; + DATA_WORKQUEUE(wq, wq2_item); + K_ITEM *ml_item = wq->msgline_item; + free_msgline_data(ml_item, true, false); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq2_item); + } } K_WUNLOCK(workqueue_free); wq_item = bq->ml_item = NULL; @@ -4778,6 +4795,7 @@ static void *process_socket(void *arg) K_WLOCK(breakqueue_free); if (dec_sockd) sockd_count--; + cmd_processing--; k_add_head(breakqueue_free, bq_item); K_WUNLOCK(breakqueue_free); } @@ -4841,8 +4859,13 @@ static void *socketer(void *arg) } } else { int seqentryflags = SE_SOCKET; - if (!reload_queue_complete) + if (!reload_queue_complete) { seqentryflags = SE_EARLYSOCK; + K_WLOCK(workqueue_free); + pool0_tot++; + pool0_left++; + K_WUNLOCK(workqueue_free); + } // Don't limit the speed filling up cmd_breakqueue_store K_WLOCK(breakqueue_free); @@ -5227,22 +5250,24 @@ static bool reload_from(tv_t *start) tmp_time = time(NULL); // Report stats every 15s if ((tmp_time - tick_time) > 14) { - int relq, relqd, cmdq, cmdqd, mx, poolq; + int relq, relqd, cmdq, cmdqd, mx, pool0q; K_RLOCK(breakqueue_free); relq = reload_breakqueue_store->count + reload_processing; relqd = reload_done_breakqueue_store->count; - cmdq = cmd_breakqueue_store->count; + cmdq = cmd_breakqueue_store->count + + cmd_processing; cmdqd = cmd_done_breakqueue_store->count; mx = max_sockd_count; K_RUNLOCK(breakqueue_free); K_RLOCK(workqueue_free); - poolq = pool_workqueue_store->count; + pool0q = pool0_workqueue_store->count; + // pool_workqueue_store should be zero K_RUNLOCK(workqueue_free); printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" " ckp %d/%d/%d (%d) \r", total+count, relq, relqd, - cmdq, cmdqd, poolq, mx); + cmdq, cmdqd, pool0q, mx); fflush(stdout); tick_time = tmp_time; } @@ -5436,16 +5461,15 @@ static void *listener(void *arg) pthread_t break_pt; K_ITEM *wq_item; time_t now; - int wqcount, wqgot; + int bq, bqp, bqd, wq0count, wqcount, wqgot; char ooo_buf[256]; tv_t wq_stt, wq_fin; double min, sec; - int left; SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; int cpus, i; - bool reloader, cmder; + bool reloader, cmder, pool0; pthread_detach(pthread_self()); @@ -5500,13 +5524,22 @@ static void *listener(void *arg) if (!everyone_die) { K_RLOCK(workqueue_free); + wq0count = pool0_workqueue_store->count; wqcount = pool_workqueue_store->count; + K_RLOCK(breakqueue_free); + bq = cmd_breakqueue_store->count; + bqp = cmd_processing; + bqd = cmd_done_breakqueue_store->count; + K_RUNLOCK(breakqueue_free); K_RUNLOCK(workqueue_free); - LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + LOGWARNING("reload shares OoO %s", + ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); - LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); + LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)", + __func__, bq+bqp+bqd+wq0count+wqcount, + bq, bqp, bqd, wq0count, wqcount); /* Until startup_complete, the values should be ignored * Setting them to 'now' means that they won't time out @@ -5528,14 +5561,26 @@ static void *listener(void *arg) wqgot = 0; } - // Process queued work + /* Process queued work - ensure pool0 is emptied first, + * even if there is pending pool0 data being processed by breaker() */ + pool0 = true; while (!everyone_die) { + wq_item = NULL; K_WLOCK(workqueue_free); - wq_item = k_unlink_head(pool_workqueue_store); - left = pool_workqueue_store->count; + if (pool0) { + if (pool0_left == 0) + pool0 = false; + else { + wq_item = k_unlink_head(pool0_workqueue_store); + if (wq_item) + pool0_left--; + } + } + if (!pool0) + wq_item = k_unlink_head(pool_workqueue_store); K_WUNLOCK(workqueue_free); - if (left == 0 && wq_stt.tv_sec != 0L) + if (!pool0 && wq_stt.tv_sec != 0L) setnow(&wq_fin); /* Don't keep a connection for more than ~10s or ~10000 items @@ -5553,11 +5598,11 @@ static void *listener(void *arg) tick(); } - if (left == 0 && wq_stt.tv_sec != 0L) { + if (!pool0 && wq_stt.tv_sec != 0L) { sec = tvdiff(&wq_fin, &wq_stt); min = floor(sec / 60.0); sec -= min * 60.0; - LOGWARNING("reload queue completed %.0fm %.3fs", min, sec); + LOGWARNING("pool queue completed %.0fm %.3fs", min, sec); // Used as the flag to display the message once wq_stt.tv_sec = 0L; reload_queue_complete = true; diff --git a/src/ckdb.h b/src/ckdb.h index 2bbaeaca..f099ed04 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.002" +#define CKDB_VERSION DB_VERSION"-2.003" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1111,6 +1111,8 @@ typedef struct workqueue { #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) extern K_LIST *workqueue_free; +// pool0 is all pool data during the reload +extern K_STORE *pool0_workqueue_store; extern K_STORE *pool_workqueue_store; extern K_STORE *cmd_workqueue_store; extern K_STORE *btc_workqueue_store;