';
$pg .= 'BTC Address:';
diff --git a/pool/page_shifts.php b/pool/page_shifts.php
index 66d33c4d..5de1b39c 100644
--- a/pool/page_shifts.php
+++ b/pool/page_shifts.php
@@ -5,7 +5,7 @@ function doshifts($data, $user)
$ans = getShifts($user);
$pg = "Click here to jump to the start of the last payout ";
- $pg .= "\n";
+ $pg .= "\n";
$pg .= '';
$pg .= 'Shift ';
$pg .= 'Start UTC ';
diff --git a/pool/page_stats.php b/pool/page_stats.php
index bd525790..d528f630 100644
--- a/pool/page_stats.php
+++ b/pool/page_stats.php
@@ -65,7 +65,7 @@ function dostats($data, $user)
$ans = getAllUsers($user);
- $pg .= "\n";
+ $pg .= "\n";
$pg .= '';
$pg .= 'Username ';
$pg .= 'Hash Rate 5m ';
diff --git a/pool/page_userinfo.php b/pool/page_userinfo.php
index fc235610..436cbad3 100644
--- a/pool/page_userinfo.php
+++ b/pool/page_userinfo.php
@@ -20,7 +20,7 @@ function douserinfo($data, $user)
$ans = getUserInfo($user);
$pg = 'Block Acclaim ';
- $pg .= "\n";
+ $pg .= "\n";
$pg .= '';
$pg .= 'User ';
$pg .= 'Blocks ';
diff --git a/pool/page_usperf.php b/pool/page_usperf.php
index d99591b0..813808c7 100644
--- a/pool/page_usperf.php
+++ b/pool/page_usperf.php
@@ -30,7 +30,8 @@ function dousperf($data, $user)
// This also defines how many worker fields there are
$cols = array('#0000c0', '#00dd00', '#e06020', '#b020e0');
- $nc = count($cols);
+ $cols2 = array('#2090e0', '#e0c040', '#ff6090', '#90e040');
+ $nc = count($cols)+count($cols2);
$workers = 'all';
if (isset($_COOKIE['workers']))
@@ -93,9 +94,24 @@ function dousperf($data, $user)
$datacols .= ',';
$datacols .= $col;
}
-
$oncl = "wch();location.href=\"".makeURL('usperf')."\"";
$pg .= "Update ";
+
+ # the rest of the workers/colours go below the graph
+ $pg2 = '
\n";
+
foreach ($cbx as $nam => $txt)
{
$pg .= '
';
@@ -107,6 +123,7 @@ function dousperf($data, $user)
$pg .= '';
$pg .= 'A graph will show here if your browser supports html5/canvas';
$pg .= "
\n";
+ $pg .= $pg2;
$data = str_replace(array("\\","'"), array("\\\\","\\'"), $ans['DATA']);
$data .= $fld_sep . 'cols' . $val_sep . $datacols;
$pg .= "\n";
$pg .= "Show Details for Invalids: ";
- $pg .= "\n";
+ $pg .= "\n";
return $pg;
}
#
diff --git a/pool/page_workmgt.php b/pool/page_workmgt.php
index b716e20b..87f60c63 100644
--- a/pool/page_workmgt.php
+++ b/pool/page_workmgt.php
@@ -23,7 +23,7 @@ function workmgtuser($data, $user, $err)
}
$pg .= makeForm('workmgt');
- $pg .= "\n";
+ $pg .= "\n";
$pg .= '';
$pg .= 'Worker Name ';
$pg .= 'Minimum Diff ';
diff --git a/src/ckdb.c b/src/ckdb.c
index 30141ca4..08b3855f 100644
--- a/src/ckdb.c
+++ b/src/ckdb.c
@@ -28,9 +28,9 @@
* with an ok.queued reply to ckpool, to be processed after the reload
* completes and just process authorise messages immediately while the
* reload runs
- * We start the ckpool message queue after loading
- * the users, idcontrol and workers DB tables, before loading the
- * much larger DB tables so that ckdb is effectively ready for messages
+ * However, we start the ckpool message queue after loading
+ * the optioncontrol, users, workers and useratts DB tables, before loading
+ * the much larger DB tables, so that ckdb is effectively ready for messages
* almost immediately
* The first ckpool message allows us to know where ckpool is up to
* in the CCLs - see reload_from() for how this is handled
@@ -47,8 +47,10 @@
* complete='a' (or 'y') and were deleted from RAM
* If there are none with complete='n' but are others in the DB,
* then the newest firstshare is used
+ * DB shares: no current processing done with the shares_hi tree inside
+ * CKDB. DB load gets the past 1 day to resolve duplicates
* RAM shareerrors: as above
- * DB+RAM sharesummary: created from shares, so as above
+ * RAM sharesummary: created from shares, so as above
* Some shares after this may have been summarised to other
* sharesummary complete='n', but for any such sharesummary
* we reset it back to the first share found and it will
@@ -112,6 +114,15 @@ static bool logger_using_data;
static bool plistener_using_data;
static bool clistener_using_data;
static bool blistener_using_data;
+static bool breakdown_using_data;
+
+// -B to override calculated value
+static int breakdown_threads = -1;
+static int reload_breakdown_count = 0;
+static int cmd_breakdown_count = 0;
+/* Lock for access to *breakdown_count
+ * Any change to/from 0 will update breakdown_using_data */
+static cklock_t breakdown_lock;
char *EMPTY = "";
const char *nullstr = "(null)";
@@ -128,6 +139,7 @@ static char *status_chars = "|/-\\";
static char *restorefrom;
+static bool ignore_seq = false;
bool genpayout_auto;
bool markersummary_auto;
@@ -331,13 +343,31 @@ K_STORE *logqueue_store;
K_LIST *msgline_free;
K_STORE *msgline_store;
+// BREAKQUEUE
+K_LIST *breakqueue_free;
+K_STORE *reload_breakqueue_store;
+K_STORE *reload_done_breakqueue_store;
+K_STORE *cmd_breakqueue_store;
+K_STORE *cmd_done_breakqueue_store;
+// Locked access with breakqueue_free
+static int reload_processing;
+static int cmd_processing;
+static int sockd_count;
+int max_sockd_count;
+
// WORKQUEUE
K_LIST *workqueue_free;
+// pool0 is all pool data during the reload
+K_STORE *pool0_workqueue_store;
K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store;
mutex_t wq_waitlock;
pthread_cond_t wq_waitcond;
+// this counter ensures we don't switch early from pool0 to pool
+static int pool0_left;
+static int pool0_tot;
+static int pool0_discarded;
// HEARTBEATQUEUE
K_LIST *heartbeatqueue_free;
@@ -435,6 +465,7 @@ K_STORE *shares_hi_store;
double diff_percent = DIFF_VAL(DIFF_PERCENT_DEFAULT);
double share_min_sdiff = 0;
+int64_t shares_begin = -1;
// SHAREERRORS shareerrors.id.json={...}
K_TREE *shareerrors_root;
@@ -593,7 +624,8 @@ K_TREE *markersummary_pool_root;
K_STORE *markersummary_pool_store;
// The markerid load start for markersummary
-char *mark_start = NULL;
+char mark_start_type = '\0';
+int64_t mark_start = -1;
// WORKMARKERS
K_TREE *workmarkers_root;
@@ -812,6 +844,17 @@ static void check_createdate_ccl(char *cmd, tv_t *cd)
STRNCPY(last_cmd, cmd);
}
+void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS)
+{
+ bool ok;
+
+ ok = _send_unix_msg(sockd, msg, UNIX_WRITE_TIMEOUT, WHERE_FFL_PASS);
+ if (!ok) {
+ LOGWARNING(" msg was %.42s%s", msg ? : nullstr,
+ msg ? "..." : EMPTY);
+ }
+}
+
static uint64_t ticks;
static time_t last_tick;
@@ -913,8 +956,12 @@ static bool getdata3()
if (!(ok = miningpayouts_fill(conn)) || everyone_die)
goto sukamudai;
}
+ PQfinish(conn);
+ conn = dbconnect();
if (!(ok = workinfo_fill(conn)) || everyone_die)
goto sukamudai;
+ PQfinish(conn);
+ conn = dbconnect();
if (!(ok = marks_fill(conn)) || everyone_die)
goto sukamudai;
/* must be after workinfo */
@@ -925,8 +972,12 @@ static bool getdata3()
if (!(ok = payouts_fill(conn)) || everyone_die)
goto sukamudai;
}
+ PQfinish(conn);
+ conn = dbconnect();
if (!(ok = markersummary_fill(conn)) || everyone_die)
goto sukamudai;
+ PQfinish(conn);
+ conn = dbconnect();
if (!(ok = shares_fill(conn)) || everyone_die)
goto sukamudai;
if (!confirm_sharesummary && !everyone_die)
@@ -1122,6 +1173,7 @@ static void alloc_storage()
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
+ pool0_workqueue_store = k_new_store(workqueue_free);
pool_workqueue_store = k_new_store(workqueue_free);
cmd_workqueue_store = k_new_store(workqueue_free);
btc_workqueue_store = k_new_store(workqueue_free);
@@ -1204,8 +1256,8 @@ static void alloc_storage()
shares_root = new_ktree(NULL, cmp_shares, shares_free);
shares_early_root = new_ktree("SharesEarly", cmp_shares, shares_free);
shares_hi_store = k_new_store(shares_free);
- shares_hi_root = new_ktree("SharesHi", cmp_shares, shares_free);
- shares_db_root = new_ktree("SharesDB", cmp_shares, shares_free);
+ shares_hi_root = new_ktree("SharesHi", cmp_shares_db, shares_free);
+ shares_db_root = new_ktree("SharesDB", cmp_shares_db, shares_free);
shareerrors_free = k_new_list("ShareErrors", sizeof(SHAREERRORS),
ALLOC_SHAREERRORS, LIMIT_SHAREERRORS,
@@ -1688,10 +1740,18 @@ static void dealloc_storage()
FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue);
+ // TODO: msgline
+ FREE_STORE(pool0_workqueue);
FREE_STORE(pool_workqueue);
FREE_STORE(cmd_workqueue);
FREE_STORE(btc_workqueue);
FREE_LIST(workqueue);
+ // TODO: sockets/buf/msgline
+ FREE_STORE(cmd_done_breakqueue);
+ FREE_STORE(cmd_breakqueue);
+ FREE_STORE(reload_done_breakqueue);
+ FREE_STORE(reload_breakqueue);
+ FREE_LIST(breakqueue);
FREE_LISTS(msgline);
if (free_mode != FREE_MODE_ALL)
@@ -1727,6 +1787,9 @@ static bool setup_data()
mutex_init(&wq_waitlock);
cond_init(&wq_waitcond);
+ LOGWARNING("%sSequence processing is %s",
+ ignore_seq ? "ALERT: " : "",
+ ignore_seq ? "Off" : "On");
LOGWARNING("%sStartup payout generation state is %s",
genpayout_auto ? "" : "WARNING: ",
genpayout_auto ? "On" : "Off");
@@ -2680,6 +2743,15 @@ static enum cmd_values process_seq(MSGLINE *msgline)
bool dupall, dupcmd;
char *st = NULL;
+ if (ignore_seq)
+ return ckdb_cmds[msgline->which_cmds].cmd_val;
+
+ /* If non-seqall data was in a CCL reload file,
+ * it can't be processed by update_seq(), so don't */
+ if (msgline->n_seqall == 0 && msgline->n_seqstt == 0 &&
+ msgline->n_seqpid == 0)
+ return ckdb_cmds[msgline->which_cmds].cmd_val;
+
dupall = update_seq(SEQ_ALL, msgline->n_seqall, msgline->n_seqstt,
msgline->n_seqpid, SEQALL, &(msgline->now),
&(msgline->cd), msgline->code,
@@ -3095,7 +3167,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
if (confirm_check_createdate)
check_createdate_ccl(msgline->cmd, &(msgline->cd));
if (seqall) {
- setup_seq(seqall, msgline);
+ if (!ignore_seq)
+ setup_seq(seqall, msgline);
free(cmdptr);
return ckdb_cmds[msgline->which_cmds].cmd_val;
} else {
@@ -3144,6 +3217,137 @@ nogood:
return CMD_REPLY;
}
+static void *breaker(void *arg)
+{
+ K_ITEM *bq_item = NULL;
+ BREAKQUEUE *bq = NULL;
+ char buf[128];
+ int thr, zeros;
+ bool reload, was_zero, msg = false;
+ int queue_sleep, queue_limit, count;
+
+ pthread_detach(pthread_self());
+
+ // Is this a reload thread or a cmd thread?
+ reload = *(bool *)(arg);
+ if (reload) {
+ queue_limit = RELOAD_QUEUE_LIMIT;
+ queue_sleep = RELOAD_QUEUE_SLEEP;
+ } else {
+ queue_limit = CMD_QUEUE_LIMIT;
+ queue_sleep = CMD_QUEUE_SLEEP;
+ }
+
+ ck_wlock(&breakdown_lock);
+ if (reload)
+ thr = ++reload_breakdown_count;
+ else
+ thr = ++cmd_breakdown_count;
+ breakdown_using_data = true;
+ ck_wunlock(&breakdown_lock);
+
+ if (breakdown_threads < 10)
+ zeros = 1;
+ else
+ zeros = (int)log10(breakdown_threads) + 1;
+
+ snprintf(buf, sizeof(buf), "db_%c%0*d%s",
+ reload ? 'r' : 'c', zeros, thr, __func__);
+ LOCK_INIT(buf);
+ rename_proc(buf);
+
+ if (reload) {
+ /* reload has to wait for the reload to start, however, also
+ * check for startup_complete in case we missed the reload */
+ while (!everyone_die && !reloading && !startup_complete)
+ cksleep_ms(queue_sleep);
+ }
+
+ while (!everyone_die) {
+ K_WLOCK(breakqueue_free);
+ bq_item = NULL;
+ was_zero = false;
+ if (reload)
+ count = reload_done_breakqueue_store->count;
+ else
+ count = cmd_done_breakqueue_store->count;
+
+ // Don't unlink if we are above the limit
+ if (count <= queue_limit) {
+ if (reload)
+ bq_item = k_unlink_head(reload_breakqueue_store);
+ else
+ bq_item = k_unlink_head(cmd_breakqueue_store);
+ if (!bq_item)
+ was_zero = true;
+ }
+ K_WUNLOCK(breakqueue_free);
+
+ if (!bq_item) {
+ // Is the queue empty and the reload completed?
+ if (was_zero && reload && !reloading)
+ break;
+
+ cksleep_ms(queue_sleep);
+ continue;
+ }
+
+ DATA_BREAKQUEUE(bq, bq_item);
+
+ if (reload) {
+ bool matched = false;
+ ck_wlock(&fpm_lock);
+ if (first_pool_message &&
+ strcmp(first_pool_message, bq->buf) == 0) {
+ matched = true;
+ FREENULL(first_pool_message);
+ }
+ ck_wunlock(&fpm_lock);
+ if (matched) {
+ LOGERR("%s() reload ckpool queue match at line %"PRIu64,
+ __func__, bq->count);
+ }
+ }
+
+ bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags);
+ K_WLOCK(breakqueue_free);
+ if (reload)
+ k_add_tail(reload_done_breakqueue_store, bq_item);
+ else
+ k_add_tail(cmd_done_breakqueue_store, bq_item);
+
+ if (breakqueue_free->count == breakqueue_free->total &&
+ breakqueue_free->total >= ALLOC_BREAKQUEUE * CULL_BREAKQUEUE)
+ k_cull_list(breakqueue_free);
+ K_WUNLOCK(breakqueue_free);
+ }
+
+ // Get it now while the lock still exists, in case we need it
+ K_RLOCK(breakqueue_free);
+ // Not 100% exact since it could still increase, but close enough
+ count = max_sockd_count;
+ K_RUNLOCK(breakqueue_free);
+
+ ck_wlock(&breakdown_lock);
+ if (reload)
+ reload_breakdown_count--;
+ else
+ cmd_breakdown_count--;
+
+ if ((reload_breakdown_count + cmd_breakdown_count) < 1) {
+ breakdown_using_data = false;
+ msg = true;
+ }
+ ck_wunlock(&breakdown_lock);
+
+ if (msg) {
+ LOGWARNING("%s() threads shut down - max_sockd_count=%d",
+ __func__, count);
+ }
+
+ return NULL;
+}
+
static void check_blocks()
{
K_TREE_CTX ctx[1];
@@ -4144,14 +4348,17 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item)
workqueue->code,
workqueue->inet,
&(msgline->cd),
- msgline->trf_root);
+ msgline->trf_root, false);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
msgline->now.tv_sec, ans);
- send_unix_msg(msgline->sockd, rep);
+ ckdb_unix_msg(msgline->sockd, rep);
close(msgline->sockd);
+ K_WLOCK(breakqueue_free);
+ sockd_count--;
+ K_WUNLOCK(breakqueue_free);
FREENULL(ans);
FREENULL(rep);
@@ -4166,6 +4373,8 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item)
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE)
k_cull_list(workqueue_free);
K_WUNLOCK(workqueue_free);
+
+ tick();
}
static void *clistener(__maybe_unused void *arg)
@@ -4174,6 +4383,8 @@ static void *clistener(__maybe_unused void *arg)
K_ITEM *wq_item;
time_t now;
+ pthread_detach(pthread_self());
+
LOCK_INIT("db_clistener");
rename_proc("db_clistener");
@@ -4194,10 +4405,9 @@ static void *clistener(__maybe_unused void *arg)
now = time(NULL);
}
- if (wq_item) {
+ if (wq_item)
process_sockd(conn, wq_item);
- tick();
- } else
+ else
cksleep_ms(42);
}
@@ -4215,12 +4425,13 @@ static void *blistener(__maybe_unused void *arg)
K_ITEM *wq_item;
time_t now;
+ pthread_detach(pthread_self());
+
LOCK_INIT("db_blistener");
rename_proc("db_blistener");
blistener_using_data = true;
- conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
@@ -4235,10 +4446,9 @@ static void *blistener(__maybe_unused void *arg)
now = time(NULL);
}
- if (wq_item) {
+ if (wq_item)
process_sockd(conn, wq_item);
- tick();
- } else
+ else
cksleep_ms(142);
}
@@ -4250,209 +4460,214 @@ static void *blistener(__maybe_unused void *arg)
return NULL;
}
-static void *socketer(__maybe_unused void *arg)
+static void *process_socket(void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
- pthread_t clis_pt, blis_pt;
- unixsock_t *us = &pi->us;
- char *end, *ans = NULL, *rep = NULL, *buf = NULL, *tmp;
- enum cmd_values cmdnum;
- int sockd;
- K_ITEM *wq_item = NULL, *ml_item = NULL;
- WORKQUEUE *workqueue;
- MSGLINE *msgline;
+ K_ITEM *bq_item = NULL, *wq_item = NULL;
+ WORKQUEUE *workqueue = NULL;
+ BREAKQUEUE *bq = NULL;
+ MSGLINE *msgline = NULL;
+ bool want_first, replied, btc, dec_sockd;
+ int loglevel, oldloglevel;
char reply[1024+1];
+ char *ans = NULL, *rep = NULL, *tmp;
size_t siz;
- tv_t now;
- bool want_first, replied, btc;
- int loglevel, oldloglevel;
pthread_detach(pthread_self());
- LOCK_INIT("db_socketer");
- rename_proc("db_socketer");
-
- while (!everyone_die && !db_users_complete)
- cksem_mswait(&socketer_sem, 420);
-
- if (!everyone_die) {
- LOGWARNING("%s() Start processing...", __func__);
- socketer_using_data = true;
-
- create_pthread(&clis_pt, clistener, NULL);
-
- create_pthread(&blis_pt, blistener, NULL);
- }
+ LOCK_INIT("db_procsock");
+ rename_proc("db_procsock");
want_first = true;
while (!everyone_die) {
- if (buf)
- dealloc(buf);
- sockd = accept(us->sockd, NULL, NULL);
- if (sockd < 0) {
- LOGERR("%s() Failed to accept on socket", __func__);
- break;
+ K_WLOCK(breakqueue_free);
+ bq_item = k_unlink_head(cmd_done_breakqueue_store);
+ if (bq_item)
+ cmd_processing++;
+ K_WUNLOCK(breakqueue_free);
+
+ if (!bq_item) {
+ cksleep_ms(24);
+ continue;
}
- cmdnum = CMD_UNSET;
-
- buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2);
- // Once we've read the message
- setnow(&now);
- if (buf) {
- end = buf + strlen(buf) - 1;
- // strip trailing \n and \r
- while (end >= buf && (*end == '\n' || *end == '\r'))
- *(end--) = '\0';
- }
- if (!buf || !*buf) {
- // An empty message wont get a reply
- if (!buf)
- LOGWARNING("%s() Failed to get message", __func__);
- else
- LOGWARNING("%s() Empty message", __func__);
- } else {
- int seqentryflags = SE_SOCKET;
- if (!reload_queue_complete)
- seqentryflags = SE_EARLYSOCK;
- cmdnum = breakdown(&ml_item, buf, &now, seqentryflags);
- DATA_MSGLINE(msgline, ml_item);
- replied = btc = false;
- switch (cmdnum) {
- case CMD_REPLY:
- snprintf(reply, sizeof(reply),
- "%s.%ld.?.",
- msgline->id,
- now.tv_sec);
- send_unix_msg(sockd, reply);
- break;
- case CMD_ALERTEVENT:
- case CMD_ALERTOVENT:
- snprintf(reply, sizeof(reply),
- "%s.%ld.failed.ERR",
- msgline->id,
- now.tv_sec);
- if (cmdnum == CMD_ALERTEVENT)
- tmp = reply_event(EVENTID_NONE, reply);
- else
- tmp = reply_ovent(OVENTID_NONE, reply);
- send_unix_msg(sockd, tmp);
- FREENULL(tmp);
- break;
- case CMD_TERMINATE:
- LOGWARNING("Listener received"
- " terminate message,"
- " terminating ckdb");
- snprintf(reply, sizeof(reply),
- "%s.%ld.ok.exiting",
- msgline->id,
- now.tv_sec);
- send_unix_msg(sockd, reply);
- everyone_die = true;
- break;
- case CMD_PING:
- LOGDEBUG("Listener received ping"
- " request");
- snprintf(reply, sizeof(reply),
- "%s.%ld.ok.pong",
- msgline->id,
- now.tv_sec);
- send_unix_msg(sockd, reply);
- break;
- case CMD_VERSION:
+ DATA_BREAKQUEUE(bq, bq_item);
+ DATA_MSGLINE(msgline, bq->ml_item);
+ replied = btc = false;
+ switch (bq->cmdnum) {
+ case CMD_REPLY:
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.?.",
+ msgline->id,
+ bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ break;
+ case CMD_ALERTEVENT:
+ case CMD_ALERTOVENT:
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.failed.ERR",
+ msgline->id,
+ bq->now.tv_sec);
+ if (bq->cmdnum == CMD_ALERTEVENT)
+ tmp = reply_event(EVENTID_NONE, reply);
+ else
+ tmp = reply_ovent(OVENTID_NONE, reply);
+ ckdb_unix_msg(bq->sockd, tmp);
+ FREENULL(tmp);
+ break;
+ case CMD_TERMINATE:
+ LOGWARNING("Listener received"
+ " terminate message,"
+ " terminating ckdb");
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.exiting",
+ msgline->id,
+ bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ everyone_die = true;
+ break;
+ case CMD_PING:
+ LOGDEBUG("Listener received ping"
+ " request");
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.pong",
+ msgline->id,
+ bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ break;
+ case CMD_VERSION:
+ LOGDEBUG("Listener received"
+ " version request");
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.CKDB V%s",
+ msgline->id,
+ bq->now.tv_sec,
+ CKDB_VERSION);
+ ckdb_unix_msg(bq->sockd, reply);
+ break;
+ case CMD_LOGLEVEL:
+ if (!*(msgline->id)) {
LOGDEBUG("Listener received"
- " version request");
+ " loglevel, currently %d",
+ pi->ckp->loglevel);
snprintf(reply, sizeof(reply),
- "%s.%ld.ok.CKDB V%s",
+ "%s.%ld.ok.loglevel"
+ " currently %d",
msgline->id,
- now.tv_sec,
- CKDB_VERSION);
- send_unix_msg(sockd, reply);
- break;
- case CMD_LOGLEVEL:
- if (!*(msgline->id)) {
- LOGDEBUG("Listener received"
- " loglevel, currently %d",
- pi->ckp->loglevel);
+ bq->now.tv_sec,
+ pi->ckp->loglevel);
+ } else {
+ oldloglevel = pi->ckp->loglevel;
+ loglevel = atoi(msgline->id);
+ LOGDEBUG("Listener received loglevel"
+ " %d currently %d A",
+ loglevel, oldloglevel);
+ if (loglevel < LOG_EMERG ||
+ loglevel > LOG_DEBUG) {
snprintf(reply, sizeof(reply),
- "%s.%ld.ok.loglevel"
- " currently %d",
+ "%s.%ld.ERR.invalid"
+ " loglevel %d"
+ " - currently %d",
msgline->id,
- now.tv_sec,
- pi->ckp->loglevel);
+ bq->now.tv_sec,
+ loglevel,
+ oldloglevel);
} else {
- oldloglevel = pi->ckp->loglevel;
- loglevel = atoi(msgline->id);
- LOGDEBUG("Listener received loglevel"
- " %d currently %d A",
- loglevel, oldloglevel);
- if (loglevel < LOG_EMERG ||
- loglevel > LOG_DEBUG) {
- snprintf(reply, sizeof(reply),
- "%s.%ld.ERR.invalid"
- " loglevel %d"
- " - currently %d",
- msgline->id,
- now.tv_sec,
- loglevel,
- oldloglevel);
- } else {
- pi->ckp->loglevel = loglevel;
- snprintf(reply, sizeof(reply),
- "%s.%ld.ok.loglevel"
- " now %d - was %d",
- msgline->id,
- now.tv_sec,
- pi->ckp->loglevel,
- oldloglevel);
- }
- // Do this twice since the loglevel may have changed
- LOGDEBUG("Listener received loglevel"
- " %d currently %d B",
- loglevel, oldloglevel);
+ pi->ckp->loglevel = loglevel;
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.loglevel"
+ " now %d - was %d",
+ msgline->id,
+ bq->now.tv_sec,
+ pi->ckp->loglevel,
+ oldloglevel);
}
- send_unix_msg(sockd, reply);
- break;
- case CMD_FLUSH:
- LOGDEBUG("Listener received"
- " flush request");
+ // Do this twice since the loglevel may have changed
+ LOGDEBUG("Listener received loglevel"
+ " %d currently %d B",
+ loglevel, oldloglevel);
+ }
+ ckdb_unix_msg(bq->sockd, reply);
+ break;
+ case CMD_FLUSH:
+ LOGDEBUG("Listener received"
+ " flush request");
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.splash",
+ msgline->id, bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ fflush(stdout);
+ fflush(stderr);
+ if (global_ckp && global_ckp->logfd)
+ fflush(global_ckp->logfp);
+ break;
+ case CMD_USERSET:
+ case CMD_BTCSET:
+ btc = true;
+ case CMD_CHKPASS:
+ case CMD_2FA:
+ case CMD_ADDUSER:
+ case CMD_NEWPASS:
+ case CMD_WORKERSET:
+ case CMD_GETATTS:
+ case CMD_SETATTS:
+ case CMD_EXPATTS:
+ case CMD_GETOPTS:
+ case CMD_SETOPTS:
+ case CMD_BLOCKLIST:
+ case CMD_NEWID:
+ case CMD_STATS:
+ case CMD_USERSTATUS:
+ case CMD_SHSTA:
+ case CMD_USERINFO:
+ case CMD_LOCKS:
+ case CMD_EVENTS:
+ case CMD_HIGH:
+ msgline->sockd = bq->sockd;
+ bq->sockd = -1;
+ K_WLOCK(workqueue_free);
+ wq_item = k_unlink_head(workqueue_free);
+ DATA_WORKQUEUE(workqueue, wq_item);
+ workqueue->msgline_item = bq->ml_item;
+ workqueue->by = by_default;
+ workqueue->code = (char *)__func__;
+ workqueue->inet = inet_default;
+ if (btc)
+ k_add_tail(btc_workqueue_store, wq_item);
+ else
+ k_add_tail(cmd_workqueue_store, wq_item);
+ K_WUNLOCK(workqueue_free);
+ wq_item = bq->ml_item = NULL;
+ break;
+ // Process, but reject (loading) until startup_complete
+ case CMD_HOMEPAGE:
+ case CMD_ALLUSERS:
+ case CMD_WORKERS:
+ case CMD_PAYMENTS:
+ case CMD_PPLNS:
+ case CMD_PPLNS2:
+ case CMD_PAYOUTS:
+ case CMD_MPAYOUTS:
+ case CMD_SHIFTS:
+ case CMD_PSHIFT:
+ case CMD_DSP:
+ case CMD_BLOCKSTATUS:
+ case CMD_MARKS:
+ case CMD_QUERY:
+ if (!startup_complete) {
snprintf(reply, sizeof(reply),
- "%s.%ld.ok.splash",
- msgline->id, now.tv_sec);
- send_unix_msg(sockd, reply);
- fflush(stdout);
- fflush(stderr);
- if (global_ckp && global_ckp->logfd)
- fflush(global_ckp->logfp);
- break;
- case CMD_USERSET:
- case CMD_BTCSET:
- btc = true;
- case CMD_CHKPASS:
- case CMD_2FA:
- case CMD_ADDUSER:
- case CMD_NEWPASS:
- case CMD_WORKERSET:
- case CMD_GETATTS:
- case CMD_SETATTS:
- case CMD_EXPATTS:
- case CMD_GETOPTS:
- case CMD_SETOPTS:
- case CMD_BLOCKLIST:
- case CMD_NEWID:
- case CMD_STATS:
- case CMD_USERSTATUS:
- case CMD_SHSTA:
- case CMD_USERINFO:
- case CMD_LOCKS:
- case CMD_EVENTS:
- case CMD_HIGH:
- msgline->sockd = sockd;
- sockd = -1;
+ "%s.%ld.loading.%s",
+ msgline->id,
+ bq->now.tv_sec,
+ msgline->cmd);
+ ckdb_unix_msg(bq->sockd, reply);
+ } else {
+ msgline->sockd = bq->sockd;
+ bq->sockd = -1;
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_free);
DATA_WORKQUEUE(workqueue, wq_item);
- workqueue->msgline_item = ml_item;
+ workqueue->msgline_item = bq->ml_item;
workqueue->by = by_default;
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
@@ -4461,110 +4676,77 @@ static void *socketer(__maybe_unused void *arg)
else
k_add_tail(cmd_workqueue_store, wq_item);
K_WUNLOCK(workqueue_free);
- wq_item = ml_item = NULL;
- break;
- // Process, but reject (loading) until startup_complete
- case CMD_HOMEPAGE:
- case CMD_ALLUSERS:
- case CMD_WORKERS:
- case CMD_PAYMENTS:
- case CMD_PPLNS:
- case CMD_PPLNS2:
- case CMD_PAYOUTS:
- case CMD_MPAYOUTS:
- case CMD_SHIFTS:
- case CMD_PSHIFT:
- case CMD_DSP:
- case CMD_BLOCKSTATUS:
- case CMD_MARKS:
- case CMD_QUERY:
- if (!startup_complete) {
- snprintf(reply, sizeof(reply),
- "%s.%ld.loading.%s",
- msgline->id,
- now.tv_sec,
- msgline->cmd);
- send_unix_msg(sockd, reply);
- } else {
- msgline->sockd = sockd;
- sockd = -1;
- K_WLOCK(workqueue_free);
- wq_item = k_unlink_head(workqueue_free);
- DATA_WORKQUEUE(workqueue, wq_item);
- workqueue->msgline_item = ml_item;
- workqueue->by = by_default;
- workqueue->code = (char *)__func__;
- workqueue->inet = inet_default;
- if (btc)
- k_add_tail(btc_workqueue_store, wq_item);
- else
- k_add_tail(cmd_workqueue_store, wq_item);
- K_WUNLOCK(workqueue_free);
- wq_item = ml_item = NULL;
- }
- break;
- // Always process immediately:
- case CMD_AUTH:
- case CMD_ADDRAUTH:
- case CMD_HEARTBEAT:
+ wq_item = bq->ml_item = NULL;
+ }
+ break;
+ // Always process immediately:
+ case CMD_AUTH:
+ case CMD_ADDRAUTH:
+ case CMD_HEARTBEAT:
+ // First message from the pool
+ if (want_first) {
+ want_first = false;
+ ck_wlock(&fpm_lock);
+ first_pool_message = strdup(bq->buf);
+ ck_wunlock(&fpm_lock);
+ }
+ DATA_MSGLINE(msgline, bq->ml_item);
+ ans = ckdb_cmds[msgline->which_cmds].func(NULL,
+ msgline->cmd,
+ msgline->id,
+ &(msgline->now),
+ by_default,
+ (char *)__func__,
+ inet_default,
+ &(msgline->cd),
+ msgline->trf_root, false);
+ siz = strlen(ans) + strlen(msgline->id) + 32;
+ rep = malloc(siz);
+ snprintf(rep, siz, "%s.%ld.%s",
+ msgline->id,
+ bq->now.tv_sec, ans);
+ ckdb_unix_msg(bq->sockd, rep);
+ FREENULL(ans);
+ FREENULL(rep);
+ replied = true;
+ // Always queue (ok.queued)
+ case CMD_SHARELOG:
+ case CMD_POOLSTAT:
+ case CMD_USERSTAT:
+ case CMD_WORKERSTAT:
+ case CMD_BLOCK:
+ if (!replied) {
// First message from the pool
if (want_first) {
want_first = false;
ck_wlock(&fpm_lock);
- first_pool_message = strdup(buf);
+ first_pool_message = strdup(bq->buf);
ck_wunlock(&fpm_lock);
}
- DATA_MSGLINE(msgline, ml_item);
- ans = ckdb_cmds[msgline->which_cmds].func(NULL,
- msgline->cmd,
- msgline->id,
- &(msgline->now),
- by_default,
- (char *)__func__,
- inet_default,
- &(msgline->cd),
- msgline->trf_root);
- siz = strlen(ans) + strlen(msgline->id) + 32;
- rep = malloc(siz);
- snprintf(rep, siz, "%s.%ld.%s",
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.ok.queued",
msgline->id,
- now.tv_sec, ans);
- send_unix_msg(sockd, rep);
- FREENULL(ans);
- replied = true;
- // Always queue (ok.queued)
- case CMD_SHARELOG:
- case CMD_POOLSTAT:
- case CMD_USERSTAT:
- case CMD_WORKERSTAT:
- case CMD_BLOCK:
- if (!replied) {
- // First message from the pool
- if (want_first) {
- want_first = false;
- ck_wlock(&fpm_lock);
- first_pool_message = strdup(buf);
- ck_wunlock(&fpm_lock);
- }
- snprintf(reply, sizeof(reply),
- "%s.%ld.ok.queued",
- msgline->id,
- now.tv_sec);
- send_unix_msg(sockd, reply);
- }
+ bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ }
- K_WLOCK(workqueue_free);
- wq_item = k_unlink_head(workqueue_free);
- DATA_WORKQUEUE(workqueue, wq_item);
- workqueue->msgline_item = ml_item;
- workqueue->by = by_default;
- workqueue->code = (char *)__func__;
- workqueue->inet = inet_default;
+ K_WLOCK(workqueue_free);
+ wq_item = k_unlink_head(workqueue_free);
+ DATA_WORKQUEUE(workqueue, wq_item);
+ workqueue->msgline_item = bq->ml_item;
+ workqueue->by = by_default;
+ workqueue->code = (char *)__func__;
+ workqueue->inet = inet_default;
+ if (bq->seqentryflags == SE_SOCKET)
k_add_tail(pool_workqueue_store, wq_item);
+ else {
+ k_add_tail(pool0_workqueue_store, wq_item);
/* Stop the reload queue from growing too big
- * Use a size that should be big enough */
- if (reloading && pool_workqueue_store->count > 250000) {
- K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store);
+ * Use a size that 'should be big enough' */
+ if (reloading && pool0_workqueue_store->count > 250000) {
+ K_ITEM *wq2_item = k_unlink_head(pool0_workqueue_store);
+ pool0_discarded++;
+ pool0_left--;
K_WUNLOCK(workqueue_free);
WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item);
@@ -4576,91 +4758,185 @@ static void *socketer(__maybe_unused void *arg)
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq2_item);
}
- K_WUNLOCK(workqueue_free);
- wq_item = ml_item = NULL;
- mutex_lock(&wq_waitlock);
- pthread_cond_signal(&wq_waitcond);
- mutex_unlock(&wq_waitlock);
- break;
- // Code error
- default:
- LOGEMERG("%s() CODE ERROR unhandled"
- " message %d %.32s...",
- __func__, cmdnum, buf);
- snprintf(reply, sizeof(reply),
- "%s.%ld.failed.code",
- msgline->id,
- now.tv_sec);
- send_unix_msg(sockd, reply);
- break;
- }
+ }
+ K_WUNLOCK(workqueue_free);
+ wq_item = bq->ml_item = NULL;
+ mutex_lock(&wq_waitlock);
+ pthread_cond_signal(&wq_waitcond);
+ mutex_unlock(&wq_waitlock);
+ break;
+ // Code error
+ default:
+ LOGEMERG("%s() CODE ERROR unhandled"
+ " message %d %.32s...",
+ __func__, bq->cmdnum, bq->buf);
+ snprintf(reply, sizeof(reply),
+ "%s.%ld.failed.code",
+ msgline->id,
+ bq->now.tv_sec);
+ ckdb_unix_msg(bq->sockd, reply);
+ break;
}
- if (sockd >= 0)
- close(sockd);
+ if (bq->sockd >= 0) {
+ close(bq->sockd);
+ dec_sockd = true;
+ } else
+ dec_sockd = false;
- if (ml_item) {
- free_msgline_data(ml_item, true, true);
+ if (bq->ml_item) {
+ free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
- k_add_head(msgline_free, ml_item);
+ k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
- ml_item = NULL;
+ bq->ml_item = NULL;
}
+ free(bq->buf);
+
+ K_WLOCK(breakqueue_free);
+ if (dec_sockd)
+ sockd_count--;
+ cmd_processing--;
+ k_add_head(breakqueue_free, bq_item);
+ K_WUNLOCK(breakqueue_free);
+ }
- tick();
+ return NULL;
+}
+
+static void *socketer(void *arg)
+{
+ proc_instance_t *pi = (proc_instance_t *)arg;
+ pthread_t clis_pt, blis_pt, proc_pt;
+ unixsock_t *us = &pi->us;
+ char *end, *buf = NULL;
+ K_ITEM *bq_item = NULL;
+ BREAKQUEUE *bq = NULL;
+ int sockd;
+ tv_t now;
+
+ pthread_detach(pthread_self());
+
+ LOCK_INIT("db_socketer");
+ rename_proc("db_socketer");
+
+ while (!everyone_die && !db_users_complete)
+ cksem_mswait(&socketer_sem, 420);
+
+ if (!everyone_die) {
+ LOGWARNING("%s() Start processing...", __func__);
+ socketer_using_data = true;
+
+ create_pthread(&clis_pt, clistener, NULL);
+
+ create_pthread(&blis_pt, blistener, NULL);
+
+ create_pthread(&proc_pt, process_socket, arg);
+ }
+
+ while (!everyone_die) {
+ sockd = accept(us->sockd, NULL, NULL);
+ if (sockd < 0) {
+ LOGERR("%s() Failed to accept on socket", __func__);
+ break;
+ }
+
+ buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2);
+ // Once we've read the message
+ setnow(&now);
+ if (buf) {
+ end = buf + strlen(buf) - 1;
+ // strip trailing \n and \r
+ while (end >= buf && (*end == '\n' || *end == '\r'))
+ *(end--) = '\0';
+ }
+ if (!buf || !*buf) {
+ // An empty message wont get a reply
+ if (!buf)
+ LOGWARNING("%s() Failed to get message", __func__);
+ else {
+ LOGWARNING("%s() Empty message", __func__);
+ free(buf);
+ }
+ } else {
+ int seqentryflags = SE_SOCKET;
+ if (!reload_queue_complete) {
+ seqentryflags = SE_EARLYSOCK;
+ K_WLOCK(workqueue_free);
+ pool0_tot++;
+ pool0_left++;
+ K_WUNLOCK(workqueue_free);
+ }
+
+ // Don't limit the speed filling up cmd_breakqueue_store
+ K_WLOCK(breakqueue_free);
+ bq_item = k_unlink_head(breakqueue_free);
+ // keep the lock since none of these should be slow
+ DATA_BREAKQUEUE(bq, bq_item);
+ bq->buf = buf;
+ copy_tv(&(bq->now), &now);
+ bq->seqentryflags = seqentryflags;
+ bq->sockd = sockd;
+ if (max_sockd_count < ++sockd_count)
+ max_sockd_count = sockd_count;
+ k_add_tail(cmd_breakqueue_store, bq_item);
+ K_WUNLOCK(breakqueue_free);
+ }
}
socketer_using_data = false;
- if (buf)
- dealloc(buf);
close_unix_socket(us->sockd, us->path);
+ // Since the socket is dead ...
+ everyone_die = true;
+
return NULL;
}
-static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
+static void *process_reload(__maybe_unused void *arg)
{
+ PGconn *conn = NULL;
+ MSGLINE *msgline = NULL;
+ K_ITEM *bq_item = NULL;
+ BREAKQUEUE *bq = NULL;
enum cmd_values cmdnum;
- char *end, *ans, *st = NULL;
- MSGLINE *msgline;
- K_ITEM *ml_item;
- tv_t now;
- bool matched;
+ char *ans, *st = NULL;
+ time_t now;
- // Once we've read the message
- setnow(&now);
- if (buf) {
- end = buf + strlen(buf) - 1;
- // strip trailing \n and \r
- while (end >= buf && (*end == '\n' || *end == '\r'))
- *(end--) = '\0';
- }
- if (!buf || !*buf) {
- if (!buf) {
- LOGERR("%s() NULL message line %"PRIu64,
- __func__, count);
- } else {
- LOGERR("%s() Empty message line %"PRIu64,
- __func__, count);
- }
- } else {
- matched = false;
- ck_wlock(&fpm_lock);
- if (first_pool_message &&
- strcmp(first_pool_message, buf) == 0) {
- matched = true;
- FREENULL(first_pool_message);
+ pthread_detach(pthread_self());
+
+ LOCK_INIT("db_procreload");
+ rename_proc("db_procreload");
+
+ conn = dbconnect();
+ now = time(NULL);
+
+ while (!everyone_die) {
+ K_WLOCK(breakqueue_free);
+ bq_item = k_unlink_head(reload_done_breakqueue_store);
+ if (bq_item)
+ reload_processing++;
+ K_WUNLOCK(breakqueue_free);
+
+ if (!bq_item) {
+ // Finished reloading?
+ if (!reloading)
+ break;
+
+ cksleep_ms(24);
+ continue;
}
- ck_wunlock(&fpm_lock);
- if (matched) {
- LOGERR("%s() reload ckpool queue match at line %"PRIu64,
- __func__, count);
+
+ // Don't keep a connection for more than ~10s ... of processing
+ if ((time(NULL) - now) > 10) {
+ PQfinish(conn);
+ conn = dbconnect();
+ now = time(NULL);
}
- // ml_item is set for all but CMD_REPLY
- cmdnum = breakdown(&ml_item, buf, &now, SE_RELOAD);
- DATA_MSGLINE(msgline, ml_item);
- switch (cmdnum) {
+ DATA_BREAKQUEUE(bq, bq_item);
+ DATA_MSGLINE(msgline, bq->ml_item);
+ switch (bq->cmdnum) {
// Ignore
case CMD_REPLY:
case CMD_ALERTEVENT:
@@ -4710,7 +4986,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
case CMD_HIGH:
LOGERR("%s() INVALID message line %"PRIu64
" ignored '%.42s...",
- __func__, count,
+ __func__, bq->count,
st = safe_text(msgline->msg));
FREENULL(st);
break;
@@ -4735,7 +5011,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
(char *)__func__,
inet_default,
&(msgline->cd),
- msgline->trf_root);
+ msgline->trf_root, true);
FREENULL(ans);
}
break;
@@ -4743,23 +5019,85 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
// Force this switch to be updated if new cmds are added
quithere(1, "%s line %"PRIu64" '%s' - not "
"handled by reload",
- filename, count,
+ bq->filename, bq->count,
st = safe_text_nonull(msgline->cmd));
// Won't get here ...
FREENULL(st);
break;
}
- if (ml_item) {
- free_msgline_data(ml_item, true, true);
+ if (bq->ml_item) {
+ free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
- k_add_head(msgline_free, ml_item);
+ k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
- ml_item = NULL;
+ bq->ml_item = NULL;
}
+ free(bq->buf);
+
+ K_WLOCK(breakqueue_free);
+ reload_processing--;
+ k_add_head(breakqueue_free, bq_item);
+ K_WUNLOCK(breakqueue_free);
+
+ tick();
}
- tick();
+ PQfinish(conn);
+
+ return NULL;
+}
+
+static void reload_line(char *filename, char *buf, uint64_t count)
+{
+ K_ITEM *bq_item = NULL;
+ BREAKQUEUE *bq = NULL;
+ int qcount;
+ char *end;
+ tv_t now;
+
+ // Once we've read the message
+ setnow(&now);
+ if (buf) {
+ end = buf + strlen(buf) - 1;
+ // strip trailing \n and \r
+ while (end >= buf && (*end == '\n' || *end == '\r'))
+ *(end--) = '\0';
+ }
+ if (!buf || !*buf) {
+ if (!buf) {
+ LOGERR("%s() NULL message line %"PRIu64,
+ __func__, count);
+ } else {
+ LOGERR("%s() Empty message line %"PRIu64,
+ __func__, count);
+ }
+ } else {
+ K_WLOCK(breakqueue_free);
+ bq_item = k_unlink_head(breakqueue_free);
+ K_WUNLOCK(breakqueue_free);
+
+ // release the lock since strdup could be slow, but rarely
+ DATA_BREAKQUEUE(bq, bq_item);
+ bq->buf = strdup(buf);
+ copy_tv(&(bq->now), &now);
+ bq->seqentryflags = SE_RELOAD;
+ bq->sockd = -1;
+ bq->count = count;
+ bq->filename = filename;
+
+ K_WLOCK(breakqueue_free);
+ k_add_tail(reload_breakqueue_store, bq_item);
+ qcount = reload_breakqueue_store->count;
+ K_WUNLOCK(breakqueue_free);
+
+ while (qcount > RELOAD_QUEUE_LIMIT) {
+ cksleep_ms(RELOAD_QUEUE_SLEEP);
+ K_RLOCK(breakqueue_free);
+ qcount = reload_breakqueue_store->count;
+ K_RUNLOCK(breakqueue_free);
+ }
+ }
}
// 10Mb for now - transactiontree can be large
@@ -4824,7 +5162,8 @@ static bool logopen(char **filename, FILE **fp, bool *apipe)
errn, buf);
} else {
*apipe = true;
- free(*filename);
+ /* Don't free the old filename since
+ * process_reload() could still access it */
*filename = name;
return true;
}
@@ -4846,12 +5185,12 @@ static bool logopen(char **filename, FILE **fp, bool *apipe)
* if ckdb aborts at the beginning of the reload, then start again */
static bool reload_from(tv_t *start)
{
- PGconn *conn = NULL;
+ // proc_pt could exit after this returns
+ static pthread_t proc_pt;
char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1];
size_t rflen = strlen(restorefrom);
char *missingfirst = NULL, *missinglast = NULL, *st = NULL;
- int missing_count;
- int processing;
+ int missing_count, processing, counter;
bool finished = false, ret = true, ok, apipe = false;
char *filename = NULL;
uint64_t count, total;
@@ -4859,6 +5198,7 @@ static bool reload_from(tv_t *start)
double diff;
FILE *fp = NULL;
int file_N_limit;
+ time_t tick_time, tmp_time;
reload_buf = malloc(MAX_READ);
if (!reload_buf)
@@ -4888,10 +5228,11 @@ static bool reload_from(tv_t *start)
LOGQUE(reload_buf, true);
LOGQUE(reload_buf, false);
- conn = dbconnect();
+ create_pthread(&proc_pt, process_reload, NULL);
total = 0;
processing = 0;
+ tick_time = time(NULL);
while (!everyone_die && !finished) {
LOGWARNING("%s(): processing %s", __func__, filename);
processing++;
@@ -4904,7 +5245,32 @@ static bool reload_from(tv_t *start)
* order messages in the log file */
while (!everyone_die &&
logline(reload_buf, MAX_READ, fp, filename)) {
- reload_line(conn, filename, ++count, reload_buf);
+ reload_line(filename, reload_buf, ++count);
+
+ tmp_time = time(NULL);
+ // Report stats every 15s
+ if ((tmp_time - tick_time) > 14) {
+ int relq, relqd, cmdq, cmdqd, mx, pool0q;
+ K_RLOCK(breakqueue_free);
+ relq = reload_breakqueue_store->count +
+ reload_processing;
+ relqd = reload_done_breakqueue_store->count;
+ cmdq = cmd_breakqueue_store->count +
+ cmd_processing;
+ cmdqd = cmd_done_breakqueue_store->count;
+ mx = max_sockd_count;
+ K_RUNLOCK(breakqueue_free);
+ K_RLOCK(workqueue_free);
+ pool0q = pool0_workqueue_store->count;
+ // pool_workqueue_store should be zero
+ K_RUNLOCK(workqueue_free);
+ printf(TICK_PREFIX"reload %"PRIu64"/%d/%d"
+ " ckp %d/%d/%d (%d) \r",
+ total+count, relq, relqd,
+ cmdq, cmdqd, pool0q, mx);
+ fflush(stdout);
+ tick_time = tmp_time;
+ }
}
LOGWARNING("%s(): %sread %"PRIu64" line%s from %s",
@@ -4922,7 +5288,8 @@ static bool reload_from(tv_t *start)
}
} else
fclose(fp);
- free(filename);
+ /* Don't free the old filename since
+ * process_reload() could access use it */
if (everyone_die)
break;
reload_timestamp.tv_sec += ROLL_S;
@@ -4979,7 +5346,15 @@ static bool reload_from(tv_t *start)
}
}
- PQfinish(conn);
+ while (!everyone_die) {
+ K_RLOCK(breakqueue_free);
+ counter = reload_done_breakqueue_store->count +
+ reload_breakqueue_store->count + reload_processing;
+ K_RUNLOCK(breakqueue_free);
+ if (counter == 0)
+ break;
+ cksleep_ms(142);
+ }
setnow(&now);
diff = tvdiff(&now, &begin);
@@ -5044,7 +5419,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item)
workqueue->code,
workqueue->inet,
&(msgline->cd),
- msgline->trf_root);
+ msgline->trf_root, false);
FREENULL(ans);
break;
}
@@ -5083,17 +5458,20 @@ static void *listener(void *arg)
pthread_t sock_pt;
pthread_t summ_pt;
pthread_t mark_pt;
+ pthread_t break_pt;
K_ITEM *wq_item;
time_t now;
- int wqcount, wqgot;
+ int bq, bqp, bqd, wq0count, wqcount, wqgot;
char ooo_buf[256];
tv_t wq_stt, wq_fin;
double min, sec;
- int left;
SEQSET *seqset = NULL;
SEQDATA *seqdata;
K_ITEM *ss_item;
- int i;
+ int cpus, i;
+ bool reloader, cmder, pool0;
+
+ pthread_detach(pthread_self());
LOCK_INIT("db_plistener");
rename_proc("db_plistener");
@@ -5102,9 +5480,29 @@ static void *listener(void *arg)
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
+ breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
+ ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
+ reload_breakqueue_store = k_new_store(breakqueue_free);
+ reload_done_breakqueue_store = k_new_store(breakqueue_free);
+ cmd_breakqueue_store = k_new_store(breakqueue_free);
+ cmd_done_breakqueue_store = k_new_store(breakqueue_free);
+
#if LOCK_CHECK
DLPRIO(logqueue, 94);
+ DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
+ if (breakdown_threads <= 0) {
+ cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1;
+ breakdown_threads = (int)(cpus / 3) ? : 1;
+ }
+ LOGWARNING("%s(): creating %d*2 breaker threads ...",
+ __func__, breakdown_threads);
+ reloader = true;
+ for (i = 0; i < breakdown_threads; i++)
+ create_pthread(&break_pt, breaker, &reloader);
+ cmder = false;
+ for (i = 0; i < breakdown_threads; i++)
+ create_pthread(&break_pt, breaker, &cmder);
create_pthread(&log_pt, logger, NULL);
@@ -5126,13 +5524,22 @@ static void *listener(void *arg)
if (!everyone_die) {
K_RLOCK(workqueue_free);
+ wq0count = pool0_workqueue_store->count;
wqcount = pool_workqueue_store->count;
+ K_RLOCK(breakqueue_free);
+ bq = cmd_breakqueue_store->count;
+ bqp = cmd_processing;
+ bqd = cmd_done_breakqueue_store->count;
+ K_RUNLOCK(breakqueue_free);
K_RUNLOCK(workqueue_free);
- LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
+ LOGWARNING("reload shares OoO %s",
+ ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
- LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount);
+ LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)",
+ __func__, bq+bqp+bqd+wq0count+wqcount,
+ bq, bqp, bqd, wq0count, wqcount);
/* Until startup_complete, the values should be ignored
* Setting them to 'now' means that they won't time out
@@ -5154,14 +5561,26 @@ static void *listener(void *arg)
wqgot = 0;
}
- // Process queued work
+ /* Process queued work - ensure pool0 is emptied first,
+ * even if there is pending pool0 data being processed by breaker() */
+ pool0 = true;
while (!everyone_die) {
+ wq_item = NULL;
K_WLOCK(workqueue_free);
- wq_item = k_unlink_head(pool_workqueue_store);
- left = pool_workqueue_store->count;
+ if (pool0) {
+ if (pool0_left == 0)
+ pool0 = false;
+ else {
+ wq_item = k_unlink_head(pool0_workqueue_store);
+ if (wq_item)
+ pool0_left--;
+ }
+ }
+ if (!pool0)
+ wq_item = k_unlink_head(pool_workqueue_store);
K_WUNLOCK(workqueue_free);
- if (left == 0 && wq_stt.tv_sec != 0L)
+ if (!pool0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin);
/* Don't keep a connection for more than ~10s or ~10000 items
@@ -5179,11 +5598,11 @@ static void *listener(void *arg)
tick();
}
- if (left == 0 && wq_stt.tv_sec != 0L) {
+ if (!pool0 && wq_stt.tv_sec != 0L) {
sec = tvdiff(&wq_fin, &wq_stt);
min = floor(sec / 60.0);
sec -= min * 60.0;
- LOGWARNING("reload queue completed %.0fm %.3fs", min, sec);
+ LOGWARNING("pool queue completed %.0fm %.3fs", min, sec);
// Used as the flag to display the message once
wq_stt.tv_sec = 0L;
reload_queue_complete = true;
@@ -5821,7 +6240,11 @@ static void check_restore_dir(char *name)
static struct option long_options[] = {
// script to call when alerts happen
- { "alert", required_argument, 0, 'c' },
+ { "alert", required_argument, 0, 'a' },
+ // workinfoid to start shares_fill() default is 1 day
+ { "shares-begin", required_argument, 0, 'b' },
+ // override calculated value
+ { "breakdown-threads", required_argument, 0, 'B' },
{ "config", required_argument, 0, 'c' },
{ "dbname", required_argument, 0, 'd' },
{ "minsdiff", required_argument, 0, 'D' },
@@ -5830,6 +6253,9 @@ static struct option long_options[] = {
{ "generate", no_argument, 0, 'g' },
{ "help", no_argument, 0, 'h' },
{ "pool-instance", required_argument, 0, 'i' },
+ // only use 'I' for reloading lots of known valid data via CKDB,
+ // DON'T use when connected to ckpool
+ { "ignore-seq", required_argument, 0, 'I' },
{ "killold", no_argument, 0, 'k' },
{ "loglevel", required_argument, 0, 'l' },
// marker = enable mark/workmarker/markersummary auto generation
@@ -5880,7 +6306,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
- while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:kl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
+ while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
switch(c) {
case 'a':
len = strlen(optarg);
@@ -5890,6 +6316,29 @@ int main(int argc, char **argv)
(int)len, MAX_ALERT_CMD);
ckdb_alert_cmd = strdup(optarg);
break;
+ case 'b':
+ {
+ int64_t beg = atoll(optarg);
+ if (beg < 0) {
+ quit(1, "Invalid shares begin "
+ "%"PRId64" - must be >= 0",
+ beg);
+ }
+ shares_begin = beg;
+ }
+ break;
+ case 'B':
+ {
+ int bt = atoi(optarg);
+ if (bt < 1) {
+ quit(1, "Invalid breakdown "
+ "thread count %d "
+ "- must be > 0",
+ bt);
+ }
+ breakdown_threads = bt;
+ }
+ break;
case 'c':
ckp.config = strdup(optarg);
break;
@@ -5921,14 +6370,6 @@ int main(int argc, char **argv)
optarg);
}
break;
- /* WARNING - enabling -i will require a DB data update
- * if you've used ckdb before 1.920
- * All (old) marks and workmarkers in the DB will need
- * to have poolinstance set to the given -i value
- * since they will be blank */
- case 'i':
- poolinstance = (const char *)strdup(optarg);
- break;
case 'g':
genpayout_auto = true;
break;
@@ -5949,6 +6390,17 @@ int main(int argc, char **argv)
printf("-%c | --%s\n", jopt->val, jopt->name);
}
exit(0);
+ /* WARNING - enabling -i will require a DB data update
+ * if you've used ckdb before 1.920
+ * All (old) marks and workmarkers in the DB will need
+ * to have poolinstance set to the given -i value
+ * since they will be blank */
+ case 'i':
+ poolinstance = (const char *)strdup(optarg);
+ break;
+ case 'I':
+ ignore_seq = true;
+ break;
case 'k':
ckp.killold = true;
break;
@@ -5963,7 +6415,29 @@ int main(int argc, char **argv)
markersummary_auto = true;
break;
case 'M':
- mark_start = strdup(optarg);
+ {
+ bool ok = true;
+ switch (optarg[0]) {
+ case 'D': // Days * mark_start
+ mark_start_type = 'D';
+ mark_start = atoll(optarg+1);
+ break;
+ case 'S': // Shifts * mark_start
+ mark_start_type = 'S';
+ mark_start = atoll(optarg+1);
+ break;
+ case 'M': // Markerid = mark_start
+ mark_start_type = 'M';
+ mark_start = atoll(optarg+1);
+ break;
+ default:
+ ok = false;
+ break;
+ }
+ if (!ok || mark_start <= 0)
+ quit(1, "Invalid -M must be D, S or"
+ " M followed by a number>0");
+ }
break;
case 'n':
ckp.name = strdup(optarg);
@@ -6116,6 +6590,7 @@ int main(int argc, char **argv)
ckp.main.ckp = &ckp;
ckp.main.processname = strdup("main");
+ cklock_init(&breakdown_lock);
cklock_init(&last_lock);
cklock_init(&btc_lock);
cklock_init(&poolinstance_lock);
@@ -6163,11 +6638,12 @@ int main(int argc, char **argv)
time_t start, trigger, curr;
char *msg = NULL;
+ everyone_die = true;
trigger = start = time(NULL);
while (socketer_using_data || summariser_using_data ||
logger_using_data || plistener_using_data ||
clistener_using_data || blistener_using_data ||
- marker_using_data) {
+ marker_using_data || breakdown_using_data) {
msg = NULL;
curr = time(NULL);
if (curr - start > 4) {
@@ -6179,7 +6655,7 @@ int main(int argc, char **argv)
}
if (msg) {
trigger = curr;
- printf("%s %ds due to%s%s%s%s%s%s%s\n",
+ printf("%s %ds due to%s%s%s%s%s%s%s%s\n",
msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY,
summariser_using_data ? " summariser" : EMPTY,
@@ -6187,7 +6663,8 @@ int main(int argc, char **argv)
plistener_using_data ? " plistener" : EMPTY,
clistener_using_data ? " clistener" : EMPTY,
blistener_using_data ? " blistener" : EMPTY,
- marker_using_data ? " marker" : EMPTY);
+ marker_using_data ? " marker" : EMPTY,
+ breakdown_using_data ? " breakdown" : EMPTY);
fflush(stdout);
}
sleep(1);
diff --git a/src/ckdb.h b/src/ckdb.h
index f19b174a..f099ed04 100644
--- a/src/ckdb.h
+++ b/src/ckdb.h
@@ -51,7 +51,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.5"
-#define CKDB_VERSION DB_VERSION"-1.984"
+#define CKDB_VERSION DB_VERSION"-2.003"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@@ -1037,7 +1037,7 @@ typedef struct msgline {
#define ALLOC_MSGLINE 8192
#define LIMIT_MSGLINE 0
-#define CULL_MSGLINE 16
+#define CULL_MSGLINE 8
#define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline)
#define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true)
#define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false)
@@ -1045,6 +1045,57 @@ typedef struct msgline {
extern K_LIST *msgline_free;
extern K_STORE *msgline_store;
+// BREAKQUEUE
+typedef struct breakqueue {
+ char *buf;
+ tv_t now;
+ int seqentryflags;
+ int sockd;
+ enum cmd_values cmdnum;
+ K_ITEM *ml_item;
+ uint64_t count;
+ char *filename;
+} BREAKQUEUE;
+
+#define ALLOC_BREAKQUEUE 16384
+#define LIMIT_BREAKQUEUE 0
+#define CULL_BREAKQUEUE 4
+#define INIT_BREAKQUEUE(_item) INIT_GENERIC(_item, breakqueue)
+#define DATA_BREAKQUEUE(_var, _item) DATA_GENERIC(_var, _item, breakqueue, true)
+
+/* If a breaker() thread's done break queue count hits the LIMIT, or is empty,
+ * it will sleep for SLEEP ms
+ * So this means that with a single breaker() thread,
+ * it can process at most LIMIT records per SLEEP ms
+ * or: 1000 * LIMIT / SLEEP records per second
+ * For N breaker() threads, that would mean between 1 and N times that value
+ * dependent upon the random time spacing of the N thread sleeps
+ * However, also note that LIMIT defines how much RAM can be used by
+ * the break queues, so a limit is required
+ * A breakqueue item can get quite large since it includes both buf
+ * and ml_item (which has the transfer data) in the 'done' queue
+ * Of course the processing speed of the ml_items will also decide how big the
+ * break queue count can get
+ * Note that if the CMD queues get too large they will be too slow responding
+ * to the sockets that sent the message, however the CMD ml_item processing
+ * responds immediately before processing the ml_item for all but ADDRAUTH,
+ * AUTHORISE and HEARTBEAT
+ * The reload also uses this limit when filling the reload break queue
+ * thus limiting the line processing of reload files
+ */
+// 16300,42 equated to single thread limitation of ~388k per second
+#define RELOAD_QUEUE_LIMIT 16300
+#define RELOAD_QUEUE_SLEEP 42
+#define CMD_QUEUE_LIMIT 16300
+#define CMD_QUEUE_SLEEP 42
+
+extern K_LIST *breakqueue_free;
+extern K_STORE *reload_breakqueue_store;
+extern K_STORE *reload_done_breakqueue_store;
+extern K_STORE *cmd_breakqueue_store;
+extern K_STORE *cmd_done_breakqueue_store;
+extern int max_sockd_count;
+
// WORKQUEUE
typedef struct workqueue {
K_ITEM *msgline_item;
@@ -1060,6 +1111,8 @@ typedef struct workqueue {
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
extern K_LIST *workqueue_free;
+// pool0 is all pool data during the reload
+extern K_STORE *pool0_workqueue_store;
extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store;
@@ -1093,7 +1146,7 @@ typedef struct transfer {
// Suggest malloc use MMAP - 1913 = largest under 2MB
#define ALLOC_TRANSFER 1913
#define LIMIT_TRANSFER 0
-#define CULL_TRANSFER 64
+#define CULL_TRANSFER 32
#define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer)
#define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true)
@@ -1723,6 +1776,9 @@ extern double diff_percent;
* This is set only via the runtime parameter -D or --minsdiff */
extern double share_min_sdiff;
+// workinfoid to start loading shares, unset = shares_fill() decides
+extern int64_t shares_begin;
+
// SHAREERRORS shareerrors.id.json={...}
typedef struct shareerrors {
int64_t workinfoid;
@@ -2425,7 +2481,8 @@ extern K_TREE *markersummary_pool_root;
extern K_STORE *markersummary_pool_store;
// The markerid load start for markersummary
-extern char *mark_start;
+extern char mark_start_type;
+extern int64_t mark_start;
// WORKMARKERS
typedef struct workmarkers {
@@ -2594,6 +2651,8 @@ extern K_STORE *userinfo_store;
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now);
+extern void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS);
+#define ckdb_unix_msg(_sockd, _msg) _ckdb_unix_msg(_sockd, _msg, WHERE_FFL_HERE)
extern void tick();
extern PGconn *dbconnect();
extern void sequence_report(bool lock);
@@ -2814,6 +2873,7 @@ extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by,
extern double coinbase_reward(int32_t height);
extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid);
extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b);
+extern cmp_t cmp_shares_db(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b);
extern void dsp_sharesummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b);
@@ -3119,7 +3179,7 @@ extern bool auths_add(PGconn *conn, char *poolinstance, char *username,
char *useragent, char *preauth, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root,
bool addressuser, USERS **users, WORKERS **workers,
- int *event);
+ int *event, bool reload_data);
extern bool poolstats_add(PGconn *conn, bool store, char *poolinstance,
char *elapsed, char *users, char *workers,
char *hashrate, char *hashrate5m,
@@ -3187,7 +3247,7 @@ struct CMDS {
bool noid; // doesn't require an id
bool createdate; // requires a createdate
char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *,
- char *, tv_t *, K_TREE *);
+ char *, tv_t *, K_TREE *, bool);
enum seq_num seq;
int access;
};
diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c
index 6b41ae28..62b7ac0b 100644
--- a/src/ckdb_cmd.c
+++ b/src/ckdb_cmd.c
@@ -34,7 +34,7 @@ static K_ITEM *adminuser(K_TREE *trf_root, char *reply, size_t siz)
static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
char *code, char *inet, __maybe_unused tv_t *notcd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -87,7 +87,8 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, char *by, char *code, char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_oldhash, *i_newhash, *i_2fa, *u_item;
char reply[1024] = "";
@@ -166,7 +167,8 @@ static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_passwordhash, *i_2fa, *u_item;
char reply[1024] = "";
@@ -222,7 +224,8 @@ static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_2fa(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, char *by, char *code, char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_action, *i_entropy, *i_value, *u_item, *u_new;
char reply[1024] = "";
@@ -464,7 +467,8 @@ dame:
static char *cmd_userset(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_passwordhash, *i_2fa, *i_rows, *i_address;
K_ITEM *i_ratio, *i_payname, *i_email, *u_item, *pa_item, *old_pa_item;
@@ -781,7 +785,7 @@ struckout:
static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now,
char *by, char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_workername, *i_diffdef, *i_oldworkers;
K_ITEM *u_item, *ua_item, *w_item;
@@ -1081,7 +1085,7 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by,
static char *cmd_poolstats(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notnow, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
bool igndup = false;
@@ -1096,7 +1100,8 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id,
static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notnow, char *by, char *code,
- char *inet, tv_t *cd, K_TREE *trf_root)
+ char *inet, tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -1180,7 +1185,8 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_workerstats(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notnow, char *by, char *code,
- char *inet, tv_t *cd, K_TREE *trf_root)
+ char *inet, tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -1263,7 +1269,8 @@ static char *cmd_blocklist(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
int ovent = OVENT_OK;
K_TREE_CTX ctx[1];
@@ -1525,7 +1532,8 @@ redo:
static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now,
char *by, char *code, char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_height, *i_blockhash, *i_action, *i_info;
char reply[1024] = "";
@@ -1682,7 +1690,7 @@ static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now,
static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
char *code, char *inet, __maybe_unused tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -1718,7 +1726,8 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *u_item, *p_item, *p2_item, *po_item;
K_TREE_CTX ctx[1];
@@ -2056,7 +2065,8 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users)
static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item;
K_ITEM *ua_item, *us_item, *ws_item;
@@ -2382,7 +2392,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_STORE *usu_store = k_new_store(userstats_free);
K_ITEM *us_item, *usu_item, *u_item;
@@ -2494,7 +2505,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_sharelog(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notnow, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -2946,7 +2958,7 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, int32_t height, char *id,
static char *cmd_blocks(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *notnow, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -2969,12 +2981,13 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id,
igndup = true;
}
- return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup, trf_root);
+ return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup,
+ trf_root);
}
static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, bool reload_data)
{
K_ITEM tmp_poolinstance_item;
TRANSFER tmp_poolinstance;
@@ -3046,7 +3059,8 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
u_item = find_users(username);
K_RUNLOCK(users_free);
if (!u_item) {
- event = events_add(EVENTID_AUTOACC, trf_root);
+ if (!reload_data)
+ event = events_add(EVENTID_AUTOACC, trf_root);
if (event == EVENT_OK) {
DATA_OPTIONCONTROL(optioncontrol, oc_item);
u_item = users_add(conn, username, EMPTY,
@@ -3067,7 +3081,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
transfer_data(i_useragent),
transfer_data(i_preauth),
by, code, inet, cd, trf_root, false,
- &users, &workers, &event);
+ &users, &workers, &event, reload_data);
}
if (!ok) {
@@ -3123,14 +3137,15 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by,
static char *cmd_auth(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, bool reload_data)
{
- return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root);
+ return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root,
+ reload_data);
}
static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, bool reload_data)
{
K_ITEM tmp_poolinstance_item;
TRANSFER tmp_poolinstance;
@@ -3201,7 +3216,7 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by,
transfer_data(i_useragent),
transfer_data(i_preauth),
by, code, inet, cd, trf_root, true,
- &users, &workers, &event);
+ &users, &workers, &event, reload_data);
if (!ok) {
LOGDEBUG("%s() %s.failed.DBE", __func__, id);
@@ -3256,16 +3271,18 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by,
static char *cmd_addrauth(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, bool reload_data)
{
- return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root);
+ return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root,
+ reload_data);
}
static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
HEARTBEATQUEUE *heartbeatqueue;
K_STORE *hq_store;
@@ -3331,7 +3348,8 @@ pulse:
static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look;
K_ITEM *ua_item, *pa_item;
@@ -3506,12 +3524,16 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
int psync = pool_workqueue_store->count;
int csync = cmd_workqueue_store->count;
int bsync = btc_workqueue_store->count;
+ int qsync = breakqueue_free->total - breakqueue_free->count;
snprintf(tmp, sizeof(tmp), "psync=%d%c", psync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "csync=%d%c", csync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "bsync=%d%c", bsync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
+ snprintf(tmp, sizeof(tmp), "qsync=%d%c", qsync, FLDSEP);
+ APPEND_REALLOC(buf, off, len, tmp);
+ // qsync isn't part of 'sync'
snprintf(tmp, sizeof(tmp), "sync=%d%c", psync + csync + bsync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
@@ -3622,7 +3644,8 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_getatts(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_attlist, *u_item, *ua_item;
char reply[1024] = "";
@@ -3798,7 +3821,8 @@ static void att_to_date(tv_t *date, char *data, tv_t *now)
* */
static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
tv_t *now, char *by, char *code, char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
ExecStatusType rescode;
PGresult *res;
@@ -3967,7 +3991,8 @@ bats:
static char *cmd_expatts(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_attlist, *u_item, *ua_item;
char reply[1024] = "";
@@ -4050,7 +4075,8 @@ rats:
static char *cmd_getopts(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_optlist, *oc_item;
char reply[1024] = "";
@@ -4128,7 +4154,8 @@ ruts:
* See opt_set_date() above */
static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
tv_t *now, char *by, char *code, char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
ExecStatusType rescode;
PGresult *res;
@@ -4276,9 +4303,10 @@ rollback:
* and the breakdown for percent address users,
* the totals per user and per payout should still be the same */
static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
- __maybe_unused tv_t *now, __maybe_unused char *by,
- __maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *now, __maybe_unused char *by,
+ __maybe_unused char *code, __maybe_unused char *inet,
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024], tmp[1024], *buf;
char *block_extra, *share_status = EMPTY, *marks_status = EMPTY;
@@ -4753,9 +4781,10 @@ shazbot:
// Generated from the payouts, miningpayouts and payments data
static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id,
- __maybe_unused tv_t *now, __maybe_unused char *by,
- __maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *now, __maybe_unused char *by,
+ __maybe_unused char *code, __maybe_unused char *inet,
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024], tmp[1024], *buf;
char *block_extra, *marks_status = EMPTY;
@@ -5022,7 +5051,8 @@ shazbot:
static char *cmd_payouts(PGconn *conn, char *cmd, char *id, tv_t *now,
char *by, char *code, char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -5274,7 +5304,8 @@ static char *cmd_mpayouts(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *u_item, *mp_item, *po_item;
K_TREE_CTX ctx[1];
@@ -5466,7 +5497,8 @@ static int select_list(WM *wm, char *select)
static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username, *i_select;
K_ITEM *u_item, *p_item, *m_item, ms_look, *wm_item, *ms_item, *wi_item;
@@ -5815,7 +5847,8 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd,
char *id, __maybe_unused tv_t *now,
__maybe_unused char *by, __maybe_unused char *code,
__maybe_unused char *inet, __maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
__maybe_unused K_ITEM *i_file;
__maybe_unused char reply[1024] = "";
@@ -5861,7 +5894,9 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd,
static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root)
+ __maybe_unused tv_t *notcd,
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char tmp[1024], *buf;
const char *name;
@@ -5949,7 +5984,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
// TODO: add to heartbeat to disable the miner if active and status != ""
static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by,
char *code, char *inet, __maybe_unused tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -6016,7 +6051,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *
static char *cmd_marks(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, char *by,
char *code, char *inet, tv_t *cd,
- K_TREE *trf_root)
+ K_TREE *trf_root, __maybe_unused bool reload_data)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@@ -6535,7 +6570,8 @@ dame:
static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_username;
K_ITEM *u_item, *p_item, *m_item, *wm_item, *ms_item, *wi_item;
@@ -6740,18 +6776,26 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
}
/* Show a share status report on the console
- * Currently: sequence status and OoO info */
+ * Currently: sequence status, OoO info and max_sockd_count */
static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root)
+ __maybe_unused tv_t *notcd,
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
char ooo_buf[256];
char buf[256];
+ int count;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
+ K_RLOCK(breakqueue_free);
+ count = max_sockd_count;
+ K_RUNLOCK(breakqueue_free);
+ LOGWARNING(" max_sockd_count=%d", count);
+
snprintf(buf, sizeof(buf), "ok.%s", cmd);
LOGDEBUG("%s.%s", id, buf);
return strdup(buf);
@@ -6761,7 +6805,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *notcd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *ui_item;
USERINFO *userinfo;
@@ -6850,7 +6895,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *notcd, K_TREE *trf_root)
+ __maybe_unused tv_t *notcd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_btcserver, *i_userpass;
char *btcserver = NULL, *userpass = NULL, *tmp;
@@ -6902,7 +6948,8 @@ static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_TREE_CTX ctx[1];
char cd_buf[DATE_BUFSIZ];
@@ -7498,7 +7545,8 @@ static char *cmd_locks(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd,
- __maybe_unused K_TREE *trf_root)
+ __maybe_unused K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
bool code_locks = false, code_deadlocks = false;
bool was_locks = false, was_deadlocks = false;
@@ -7570,11 +7618,12 @@ static void event_tree(K_TREE *the_tree, char *list, char *reply, size_t siz,
snprintf(reply, siz, "list:%d=%s%c",
*rows, list, FLDSEP);
APPEND_REALLOC(*buf, *off, *len, reply);
-
snprintf(reply, siz, "id:%d=%d%c",
*rows, e->id, FLDSEP);
APPEND_REALLOC(*buf, *off, *len, reply);
-
+ snprintf(reply, siz, "idname:%d=%s%c",
+ *rows, e_limits[e->id].name, FLDSEP);
+ APPEND_REALLOC(*buf, *off, *len, reply);
snprintf(reply, siz, "user:%d=%s%c",
*rows, e->createby, FLDSEP);
APPEND_REALLOC(*buf, *off, *len, reply);
@@ -7609,11 +7658,13 @@ static void event_tree(K_TREE *the_tree, char *list, char *reply, size_t siz,
static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
K_ITEM *i_action, *i_cmd, *i_list, *i_ip, *i_eventname, *i_lifetime;
- K_ITEM *i_des, *i_item, *next_item;
+ K_ITEM *i_des, *i_item, *next_item, *o_item;
K_TREE_CTX ctx[1];
+ OVENTS *ovents;
IPS *ips;
char *action, *alert_cmd, *list, *ip, *eventname, *des;
char reply[1024] = "";
@@ -7621,7 +7672,7 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id,
char tmp[1024] = "";
char *buf = NULL;
size_t len, off;
- int i, rows, oldlife, lifetime;
+ int i, rows, oldlife, lifetime, vid, min;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@@ -8034,6 +8085,50 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC_INIT(buf, off, len);
snprintf(tmp, sizeof(tmp), "ok.expired %d", rows);
APPEND_REALLOC(buf, off, len, tmp);
+ } else if (strcasecmp(action, "ovents") == 0) {
+ /* List the ovent tree contents
+ * Output can be large - check web Admin->ckp for tree sizes */
+ bool got;
+ APPEND_REALLOC_INIT(buf, off, len);
+ APPEND_REALLOC(buf, off, len, "ok.");
+ rows = 0;
+ K_RLOCK(ovents_free);
+ o_item = first_in_ktree(ovents_root, ctx);
+ while (o_item) {
+ DATA_OVENTS(ovents, o_item);
+ for (vid = 0; o_limits[vid].name; vid++) {
+ got = false;
+ for (min = 0; min < 60; min++) {
+ if (ovents->count[IDMIN(vid, min)]) {
+ if (!got) {
+ snprintf(reply, siz, "key:%d=%s%c",
+ rows, ovents->key, FLDSEP);
+ APPEND_REALLOC(buf, off, len, reply);
+ snprintf(reply, siz, "id:%d=%d%c",
+ rows, vid, FLDSEP);
+ APPEND_REALLOC(buf, off, len, reply);
+ snprintf(reply, siz, "idname:%d=%s%c",
+ rows, o_limits[vid].name, FLDSEP);
+ APPEND_REALLOC(buf, off, len, reply);
+ snprintf(reply, siz, "hour:%d=%d%c",
+ rows, ovents->hour, FLDSEP);
+ APPEND_REALLOC(buf, off, len, reply);
+ got = true;
+ }
+ snprintf(reply, siz, "min%02d:%d=%d%c",
+ min, rows, ovents->count[IDMIN(vid, min)],
+ FLDSEP);
+ APPEND_REALLOC(buf, off, len, reply);
+ }
+ }
+ if (got)
+ rows++;
+ }
+ o_item = next_in_ktree(ctx);
+ }
+ K_RUNLOCK(ovents_free);
+ snprintf(tmp, sizeof(tmp), "rows=%d", rows);
+ APPEND_REALLOC(buf, off, len, tmp);
} else {
snprintf(reply, siz, "unknown action '%s'", action);
LOGERR("%s() %s.%s", __func__, id, reply);
@@ -8047,7 +8142,8 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id,
static char *cmd_high(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
- __maybe_unused tv_t *cd, K_TREE *trf_root)
+ __maybe_unused tv_t *cd, K_TREE *trf_root,
+ __maybe_unused bool reload_data)
{
bool conned = false;
K_TREE_CTX ctx[1];
diff --git a/src/ckdb_data.c b/src/ckdb_data.c
index c8e8b3c9..77cdca55 100644
--- a/src/ckdb_data.c
+++ b/src/ckdb_data.c
@@ -2333,6 +2333,38 @@ cmp_t cmp_shares(K_ITEM *a, K_ITEM *b)
return c;
}
+/* order by workinfoid asc,userid asc,workername asc,enonce1 asc,nonce2 asc,
+ * nonce asc,expirydate desc
+ * i.e. match the DB table index so duplicates are ignored and all new shares_db
+ * can always go in the DB */
+cmp_t cmp_shares_db(K_ITEM *a, K_ITEM *b)
+{
+ SHARES *sa, *sb;
+ DATA_SHARES(sa, a);
+ DATA_SHARES(sb, b);
+ cmp_t c = CMP_BIGINT(sa->workinfoid, sb->workinfoid);
+ if (c == 0) {
+ c = CMP_BIGINT(sa->userid, sb->userid);
+ if (c == 0) {
+ c = CMP_STR(sa->workername, sb->workername);
+ if (c == 0) {
+ c = CMP_STR(sa->enonce1, sb->enonce1);
+ if (c == 0) {
+ c = CMP_STR(sa->nonce2, sb->nonce2);
+ if (c == 0) {
+ c = CMP_STR(sa->nonce, sb->nonce);
+ if (c == 0) {
+ c = CMP_TV(sb->expirydate,
+ sa->expirydate);
+ }
+ }
+ }
+ }
+ }
+ }
+ return c;
+}
+
// order by workinfoid asc,userid asc,createdate asc,nonce asc,expirydate desc
cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b)
{
diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c
index 8d707403..e42252de 100644
--- a/src/ckdb_dbio.c
+++ b/src/ckdb_dbio.c
@@ -3625,7 +3625,7 @@ discard:
static void shareerrors_process_early(PGconn *conn, int64_t good_wid,
tv_t *good_cd, K_TREE *trf_root);
-// DB Shares are stored by by the summariser to ensure the reload is correct
+// DB Shares are stored by the summariser to ensure the reload is correct
bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername,
char *clientid, char *errn, char *enonce1, char *nonce2,
char *nonce, char *diff, char *sdiff, char *secondaryuserid,
@@ -3640,7 +3640,6 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
USERS *users;
bool ok = false;
char *st = NULL;
- int errn_int;
LOGDEBUG("%s(): %s/%s/%s/%s/%ld,%ld",
__func__,
@@ -3649,13 +3648,10 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
FREENULL(st);
TXT_TO_DOUBLE("sdiff", sdiff, sdiff_amt);
- TXT_TO_INT("errn", errn, errn_int);
K_WLOCK(shares_free);
s_item = k_unlink_head(shares_free);
- // Don't store duplicates since they will already exist
- if (errn_int != SE_DUPE && share_min_sdiff > 0 &&
- sdiff_amt >= share_min_sdiff)
+ if (share_min_sdiff > 0 && sdiff_amt >= share_min_sdiff)
s2_item = k_unlink_head(shares_free);
K_WUNLOCK(shares_free);
@@ -3731,7 +3727,9 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
add_to_ktree(shares_early_root, s_item);
k_add_head(shares_early_store, s_item);
if (s2_item) {
- // Just ignore duplicates
+ /* Just ignore duplicates - this matches the DB index
+ N.B. a duplicate share doesn't have to be SE_DUPE,
+ two shares can be SE_NONE and SE_STALE */
tmp_item = find_in_ktree(shares_db_root, s2_item, ctx);
if (tmp_item == NULL) {
// Store them in advance - always
@@ -3860,16 +3858,51 @@ bool shares_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
- K_ITEM *item = NULL;
+ K_TREE_CTX ctx[1];
+ K_ITEM *item = NULL, *wi_item;
+ WORKINFO *workinfo = NULL;
SHARES *row;
int n, t, i;
char *field;
char *sel = NULL;
- int fields = 14;
+ char *params[1];
+ int fields = 14, par = 0;
bool ok = false;
+ int64_t workinfoid;
+ tv_t old;
LOGDEBUG("%s(): select", __func__);
+ if (shares_begin >= 0)
+ workinfoid = shares_begin;
+ else {
+ /* Workinfo is already loaded
+ * CKDB doesn't currently use shares_db in processing,
+ * but make sure we have enough to avoid loading duplicates
+ * 1 day should be more than enough for normal running,
+ * however, if more than 1 day is needed,
+ * use -b to set the shares_begin workinfoid */
+ setnow(&old);
+ old.tv_sec -= 60 * 60 * 24; // 1 day
+ K_RLOCK(workinfo_free);
+ wi_item = last_in_ktree(workinfo_root, ctx);
+ while (wi_item) {
+ DATA_WORKINFO(workinfo, wi_item);
+ if (!tv_newer(&old, &(workinfo->createdate)))
+ break;
+ wi_item = prev_in_ktree(ctx);
+ }
+ if (wi_item)
+ workinfoid = workinfo->workinfoid;
+ else {
+ // none old enough, so just load from them all
+ workinfoid = 0;
+ }
+ K_RUNLOCK(workinfo_free);
+ }
+
+ LOGWARNING("%s(): loading from workinfoid>=%"PRId64, __func__, workinfoid);
+
printf(TICK_PREFIX"sh 0\r");
fflush(stdout);
@@ -3877,7 +3910,10 @@ bool shares_fill(PGconn *conn)
"workinfoid,userid,workername,clientid,enonce1,nonce2,nonce,"
"diff,sdiff,errn,error,secondaryuserid,ntime,minsdiff"
HISTORYDATECONTROL
- " from shares";
+ " from shares where workinfoid>=$1";
+ par = 0;
+ params[par++] = bigint_to_buf(workinfoid, NULL, 0);
+ PARCHK(par, params);
res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res);
@@ -3895,7 +3931,7 @@ bool shares_fill(PGconn *conn)
goto flail;
}
- res = PQexec(conn, sel, CKPQ_READ);
+ res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
@@ -7064,7 +7100,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
char *useragent, char *preauth, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root,
bool addressuser, USERS **users, WORKERS **workers,
- int *event)
+ int *event, bool reload_data)
{
K_TREE_CTX ctx[1];
K_ITEM *a_item, *u_item, *w_item;
@@ -7095,7 +7131,8 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username,
__func__,
st = safe_text_nonull(username));
FREENULL(st);
- *event = events_add(EVENTID_INVAUTH, trf_root);
+ if (!reload_data)
+ *event = events_add(EVENTID_INVAUTH, trf_root);
}
if (!u_item)
goto unitem;
@@ -7719,17 +7756,86 @@ bool markersummary_fill(PGconn *conn)
{
ExecStatusType rescode;
PGresult *res;
- K_ITEM *item = NULL, *p_item;
+ K_ITEM *item = NULL, *p_item, *wm_item = NULL;
+ K_TREE_CTX ctx[1];
+ char cd_buf[DATE_BUFSIZ];
+ char *cd = NULL, *what = NULL;
int n, t, i, p_n;
MARKERSUMMARY *row, *p_row;
+ WORKMARKERS *workmarkers;
char *params[1];
char *field;
char *sel;
int fields = 20, par = 0;
+ int64_t ms = 0, amt = 0;
bool ok = false;
+ tv_t old;
LOGDEBUG("%s(): select", __func__);
+ if (mark_start < 0)
+ mark_start = 0;
+ else {
+ amt = ms = mark_start;
+ switch (mark_start_type) {
+ case 'D': // mark_start days
+ setnow(&old);
+ old.tv_sec -= 60 * 60 * 24 * ms;
+ K_RLOCK(workmarkers_free);
+ wm_item = last_in_ktree(workmarkers_root, ctx);
+ while (wm_item) {
+ // Newest processed workmarker <= old
+ DATA_WORKMARKERS(workmarkers, wm_item);
+ if (CURRENT(&(workmarkers->expirydate)) &&
+ WMPROCESSED(workmarkers->status) &&
+ !tv_newer(&old, &(workmarkers->createdate)))
+ break;
+ wm_item = prev_in_ktree(ctx);
+ }
+ if (!wm_item)
+ mark_start = 0;
+ else {
+ mark_start = workmarkers->markerid;
+ tv_to_buf(&(workmarkers->createdate),
+ cd_buf, sizeof(cd_buf));
+ cd = cd_buf;
+ what = "days";
+ }
+ K_RUNLOCK(workmarkers_free);
+ break;
+ case 'S': // mark_start shifts (workmarkers)
+ K_RLOCK(workmarkers_free);
+ wm_item = last_in_ktree(workmarkers_root, ctx);
+ while (wm_item) {
+ DATA_WORKMARKERS(workmarkers, wm_item);
+ if (CURRENT(&(workmarkers->expirydate)) &&
+ WMPROCESSED(workmarkers->status)) {
+ ms--;
+ if (ms <= 0)
+ break;
+ }
+ wm_item = prev_in_ktree(ctx);
+ }
+ if (!wm_item)
+ mark_start = 0;
+ else {
+ mark_start = workmarkers->markerid;
+ tv_to_buf(&(workmarkers->createdate),
+ cd_buf, sizeof(cd_buf));
+ cd = cd_buf;
+ what = "shifts";
+ }
+ K_RUNLOCK(workmarkers_free);
+ break;
+ case 'M': // markerid = mark_start
+ break;
+ default:
+ /* Not possible unless ckdb.c is different
+ * in which case it will just use mark_start */
+ break;
+ }
+ }
+
// TODO: limit how far back
sel = "declare ws cursor for select "
"markerid,userid,workername,diffacc,diffsta,diffdup,diffhi,"
@@ -7738,14 +7844,16 @@ bool markersummary_fill(PGconn *conn)
"lastshareacc,lastdiffacc"
MODIFYDATECONTROL
" from markersummary where markerid>=$1";
+
par = 0;
- if (mark_start)
- params[par++] = mark_start;
- else
- params[par++] = "0";
+ params[par++] = bigint_to_buf(mark_start, NULL, 0);
PARCHK(par, params);
LOGWARNING("%s(): loading from markerid>=%s", __func__, params[0]);
+ if (cd) {
+ LOGWARNING(" ... %s = %s >= %"PRId64" %s",
+ params[0], cd, amt, what);
+ }
printf(TICK_PREFIX"ms 0\r");
fflush(stdout);
@@ -7980,6 +8088,10 @@ flail:
res = PQexec(conn, "Commit", CKPQ_READ);
PQclear(res);
+ for (i = 0; i < par; i++)
+ free(params[i]);
+ par = 0;
+
if (ok) {
LOGDEBUG("%s(): built", __func__);
LOGWARNING("%s(): fetched %d markersummary records", __func__, n);