diff --git a/src/ckdb.c b/src/ckdb.c index 5ed5b1d5..f2f58c18 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -28,9 +28,9 @@ * with an ok.queued reply to ckpool, to be processed after the reload * completes and just process authorise messages immediately while the * reload runs - * We start the ckpool message queue after loading - * the users, idcontrol and workers DB tables, before loading the - * much larger DB tables so that ckdb is effectively ready for messages + * However, we start the ckpool message queue after loading + * the optioncontrol, users, workers and useratts DB tables, before loading + * the much larger DB tables, so that ckdb is effectively ready for messages * almost immediately * The first ckpool message allows us to know where ckpool is up to * in the CCLs - see reload_from() for how this is handled @@ -47,8 +47,10 @@ * complete='a' (or 'y') and were deleted from RAM * If there are none with complete='n' but are others in the DB, * then the newest firstshare is used + * DB shares: no current processing done with the shares_hi tree inside + * CKDB. DB load gets the past 1 day to resolve duplicates * RAM shareerrors: as above - * DB+RAM sharesummary: created from shares, so as above + * RAM sharesummary: created from shares, so as above * Some shares after this may have been summarised to other * sharesummary complete='n', but for any such sharesummary * we reset it back to the first share found and it will @@ -112,6 +114,15 @@ static bool logger_using_data; static bool plistener_using_data; static bool clistener_using_data; static bool blistener_using_data; +static bool breakdown_using_data; + +// -B to override calculated value +static int breakdown_threads = -1; +static int reload_breakdown_count = 0; +static int cmd_breakdown_count = 0; +/* Lock for access to *breakdown_count + * Any change to/from 0 will update breakdown_using_data */ +static cklock_t breakdown_lock; char *EMPTY = ""; const char *nullstr = "(null)"; @@ -332,6 +343,17 @@ K_STORE *logqueue_store; K_LIST *msgline_free; K_STORE *msgline_store; +// BREAKQUEUE +K_LIST *breakqueue_free; +K_STORE *reload_breakqueue_store; +K_STORE *reload_done_breakqueue_store; +K_STORE *cmd_breakqueue_store; +K_STORE *cmd_done_breakqueue_store; +// Locked access with breakqueue_free +static int reload_processing; +static int sockd_count; +int max_sockd_count; + // WORKQUEUE K_LIST *workqueue_free; K_STORE *pool_workqueue_store; @@ -916,8 +938,12 @@ static bool getdata3() if (!(ok = miningpayouts_fill(conn)) || everyone_die) goto sukamudai; } + PQfinish(conn); + conn = dbconnect(); if (!(ok = workinfo_fill(conn)) || everyone_die) goto sukamudai; + PQfinish(conn); + conn = dbconnect(); if (!(ok = marks_fill(conn)) || everyone_die) goto sukamudai; /* must be after workinfo */ @@ -928,8 +954,12 @@ static bool getdata3() if (!(ok = payouts_fill(conn)) || everyone_die) goto sukamudai; } + PQfinish(conn); + conn = dbconnect(); if (!(ok = markersummary_fill(conn)) || everyone_die) goto sukamudai; + PQfinish(conn); + conn = dbconnect(); if (!(ok = shares_fill(conn)) || everyone_die) goto sukamudai; if (!confirm_sharesummary && !everyone_die) @@ -1691,10 +1721,17 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); + // TODO: msgline FREE_STORE(pool_workqueue); FREE_STORE(cmd_workqueue); FREE_STORE(btc_workqueue); FREE_LIST(workqueue); + // TODO: sockets/buf/msgline + FREE_STORE(cmd_done_breakqueue); + FREE_STORE(cmd_breakqueue); + FREE_STORE(reload_done_breakqueue); + FREE_STORE(reload_breakqueue); + FREE_LIST(breakqueue); FREE_LISTS(msgline); if (free_mode != FREE_MODE_ALL) @@ -3160,6 +3197,137 @@ nogood: return CMD_REPLY; } +static void *breaker(void *arg) +{ + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + char buf[128]; + int thr, zeros; + bool reload, was_zero, msg = false; + int queue_sleep, queue_limit, count; + + pthread_detach(pthread_self()); + + // Is this a reload thread or a cmd thread? + reload = *(bool *)(arg); + if (reload) { + queue_limit = RELOAD_QUEUE_LIMIT; + queue_sleep = RELOAD_QUEUE_SLEEP; + } else { + queue_limit = CMD_QUEUE_LIMIT; + queue_sleep = CMD_QUEUE_SLEEP; + } + + ck_wlock(&breakdown_lock); + if (reload) + thr = ++reload_breakdown_count; + else + thr = ++cmd_breakdown_count; + breakdown_using_data = true; + ck_wunlock(&breakdown_lock); + + if (breakdown_threads < 10) + zeros = 1; + else + zeros = (int)log10(breakdown_threads) + 1; + + snprintf(buf, sizeof(buf), "db_%c%0*d%s", + reload ? 'r' : 'c', zeros, thr, __func__); + LOCK_INIT(buf); + rename_proc(buf); + + if (reload) { + /* reload has to wait for the reload to start, however, also + * check for startup_complete in case we missed the reload */ + while (!everyone_die && !reloading && !startup_complete) + cksleep_ms(queue_sleep); + } + + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = NULL; + was_zero = false; + if (reload) + count = reload_done_breakqueue_store->count; + else + count = cmd_done_breakqueue_store->count; + + // Don't unlink if we are above the limit + if (count <= queue_limit) { + if (reload) + bq_item = k_unlink_head(reload_breakqueue_store); + else + bq_item = k_unlink_head(cmd_breakqueue_store); + if (!bq_item) + was_zero = true; + } + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + // Is the queue empty and the reload completed? + if (was_zero && reload && !reloading) + break; + + cksleep_ms(queue_sleep); + continue; + } + + DATA_BREAKQUEUE(bq, bq_item); + + if (reload) { + bool matched = false; + ck_wlock(&fpm_lock); + if (first_pool_message && + strcmp(first_pool_message, bq->buf) == 0) { + matched = true; + FREENULL(first_pool_message); + } + ck_wunlock(&fpm_lock); + if (matched) { + LOGERR("%s() reload ckpool queue match at line %"PRIu64, + __func__, bq->count); + } + } + + bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags); + K_WLOCK(breakqueue_free); + if (reload) + k_add_tail(reload_done_breakqueue_store, bq_item); + else + k_add_tail(cmd_done_breakqueue_store, bq_item); + + if (breakqueue_free->count == breakqueue_free->total && + breakqueue_free->total >= ALLOC_BREAKQUEUE * CULL_BREAKQUEUE) + k_cull_list(breakqueue_free); + K_WUNLOCK(breakqueue_free); + } + + // Get it now while the lock still exists, in case we need it + K_RLOCK(breakqueue_free); + // Not 100% exact since it could still increase, but close enough + count = max_sockd_count; + K_RUNLOCK(breakqueue_free); + + ck_wlock(&breakdown_lock); + if (reload) + reload_breakdown_count--; + else + cmd_breakdown_count--; + + if ((reload_breakdown_count + cmd_breakdown_count) < 1) { + breakdown_using_data = false; + msg = true; + } + ck_wunlock(&breakdown_lock); + + if (msg) { + LOGWARNING("%s() threads shut down - max_sockd_count=%d", + __func__, count); + } + + return NULL; +} + static void check_blocks() { K_TREE_CTX ctx[1]; @@ -4160,7 +4328,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) workqueue->code, workqueue->inet, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, false); siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); snprintf(rep, siz, "%s.%ld.%s", @@ -4168,6 +4336,9 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) msgline->now.tv_sec, ans); send_unix_msg(msgline->sockd, rep); close(msgline->sockd); + K_WLOCK(breakqueue_free); + sockd_count--; + K_WUNLOCK(breakqueue_free); FREENULL(ans); FREENULL(rep); @@ -4182,6 +4353,8 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE) k_cull_list(workqueue_free); K_WUNLOCK(workqueue_free); + + tick(); } static void *clistener(__maybe_unused void *arg) @@ -4190,6 +4363,8 @@ static void *clistener(__maybe_unused void *arg) K_ITEM *wq_item; time_t now; + pthread_detach(pthread_self()); + LOCK_INIT("db_clistener"); rename_proc("db_clistener"); @@ -4210,10 +4385,9 @@ static void *clistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) { + if (wq_item) process_sockd(conn, wq_item); - tick(); - } else + else cksleep_ms(42); } @@ -4231,12 +4405,13 @@ static void *blistener(__maybe_unused void *arg) K_ITEM *wq_item; time_t now; + pthread_detach(pthread_self()); + LOCK_INIT("db_blistener"); rename_proc("db_blistener"); blistener_using_data = true; - conn = dbconnect(); now = time(NULL); while (!everyone_die) { @@ -4251,10 +4426,9 @@ static void *blistener(__maybe_unused void *arg) now = time(NULL); } - if (wq_item) { + if (wq_item) process_sockd(conn, wq_item); - tick(); - } else + else cksleep_ms(142); } @@ -4266,22 +4440,350 @@ static void *blistener(__maybe_unused void *arg) return NULL; } -static void *socketer(__maybe_unused void *arg) +static void *process_socket(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - pthread_t clis_pt, blis_pt; - unixsock_t *us = &pi->us; - char *end, *ans = NULL, *rep = NULL, *buf = NULL, *tmp; - enum cmd_values cmdnum; - int sockd; - K_ITEM *wq_item = NULL, *ml_item = NULL; - WORKQUEUE *workqueue; - MSGLINE *msgline; + K_ITEM *bq_item = NULL, *wq_item = NULL; + WORKQUEUE *workqueue = NULL; + BREAKQUEUE *bq = NULL; + MSGLINE *msgline = NULL; + bool want_first, replied, btc, dec_sockd; + int loglevel, oldloglevel; char reply[1024+1]; + char *ans = NULL, *rep = NULL, *tmp; size_t siz; + + pthread_detach(pthread_self()); + + LOCK_INIT("db_procsock"); + rename_proc("db_procsock"); + + want_first = true; + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(cmd_done_breakqueue_store); + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + cksleep_ms(24); + continue; + } + + DATA_BREAKQUEUE(bq, bq_item); + DATA_MSGLINE(msgline, bq->ml_item); + replied = btc = false; + switch (bq->cmdnum) { + case CMD_REPLY: + snprintf(reply, sizeof(reply), + "%s.%ld.?.", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + case CMD_ALERTEVENT: + case CMD_ALERTOVENT: + snprintf(reply, sizeof(reply), + "%s.%ld.failed.ERR", + msgline->id, + bq->now.tv_sec); + if (bq->cmdnum == CMD_ALERTEVENT) + tmp = reply_event(EVENTID_NONE, reply); + else + tmp = reply_ovent(OVENTID_NONE, reply); + send_unix_msg(bq->sockd, tmp); + FREENULL(tmp); + break; + case CMD_TERMINATE: + LOGWARNING("Listener received" + " terminate message," + " terminating ckdb"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.exiting", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + everyone_die = true; + break; + case CMD_PING: + LOGDEBUG("Listener received ping" + " request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.pong", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + case CMD_VERSION: + LOGDEBUG("Listener received" + " version request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.CKDB V%s", + msgline->id, + bq->now.tv_sec, + CKDB_VERSION); + send_unix_msg(bq->sockd, reply); + break; + case CMD_LOGLEVEL: + if (!*(msgline->id)) { + LOGDEBUG("Listener received" + " loglevel, currently %d", + pi->ckp->loglevel); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.loglevel" + " currently %d", + msgline->id, + bq->now.tv_sec, + pi->ckp->loglevel); + } else { + oldloglevel = pi->ckp->loglevel; + loglevel = atoi(msgline->id); + LOGDEBUG("Listener received loglevel" + " %d currently %d A", + loglevel, oldloglevel); + if (loglevel < LOG_EMERG || + loglevel > LOG_DEBUG) { + snprintf(reply, sizeof(reply), + "%s.%ld.ERR.invalid" + " loglevel %d" + " - currently %d", + msgline->id, + bq->now.tv_sec, + loglevel, + oldloglevel); + } else { + pi->ckp->loglevel = loglevel; + snprintf(reply, sizeof(reply), + "%s.%ld.ok.loglevel" + " now %d - was %d", + msgline->id, + bq->now.tv_sec, + pi->ckp->loglevel, + oldloglevel); + } + // Do this twice since the loglevel may have changed + LOGDEBUG("Listener received loglevel" + " %d currently %d B", + loglevel, oldloglevel); + } + send_unix_msg(bq->sockd, reply); + break; + case CMD_FLUSH: + LOGDEBUG("Listener received" + " flush request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.splash", + msgline->id, bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + fflush(stdout); + fflush(stderr); + if (global_ckp && global_ckp->logfd) + fflush(global_ckp->logfp); + break; + case CMD_USERSET: + case CMD_BTCSET: + btc = true; + case CMD_CHKPASS: + case CMD_2FA: + case CMD_ADDUSER: + case CMD_NEWPASS: + case CMD_WORKERSET: + case CMD_GETATTS: + case CMD_SETATTS: + case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: + case CMD_BLOCKLIST: + case CMD_NEWID: + case CMD_STATS: + case CMD_USERSTATUS: + case CMD_SHSTA: + case CMD_USERINFO: + case CMD_LOCKS: + case CMD_EVENTS: + case CMD_HIGH: + msgline->sockd = bq->sockd; + bq->sockd = -1; + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + break; + // Process, but reject (loading) until startup_complete + case CMD_HOMEPAGE: + case CMD_ALLUSERS: + case CMD_WORKERS: + case CMD_PAYMENTS: + case CMD_PPLNS: + case CMD_PPLNS2: + case CMD_PAYOUTS: + case CMD_MPAYOUTS: + case CMD_SHIFTS: + case CMD_PSHIFT: + case CMD_DSP: + case CMD_BLOCKSTATUS: + case CMD_MARKS: + case CMD_QUERY: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", + msgline->id, + bq->now.tv_sec, + msgline->cmd); + send_unix_msg(bq->sockd, reply); + } else { + msgline->sockd = bq->sockd; + bq->sockd = -1; + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + } + break; + // Always process immediately: + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(bq->buf); + ck_wunlock(&fpm_lock); + } + DATA_MSGLINE(msgline, bq->ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root, false); + siz = strlen(ans) + strlen(msgline->id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + bq->now.tv_sec, ans); + send_unix_msg(bq->sockd, rep); + FREENULL(ans); + FREENULL(rep); + replied = true; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + if (!replied) { + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(bq->buf); + ck_wunlock(&fpm_lock); + } + snprintf(reply, sizeof(reply), + "%s.%ld.ok.queued", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + } + + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = bq->ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + k_add_tail(pool_workqueue_store, wq_item); + /* Stop the reload queue from growing too big + * Use a size that 'should be big enough' */ + while (reloading && pool_workqueue_store->count > 250000) { + K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); + K_WUNLOCK(workqueue_free); + WORKQUEUE *wq; + DATA_WORKQUEUE(wq, wq2_item); + K_ITEM *ml_item = wq->msgline_item; + free_msgline_data(ml_item, true, false); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq2_item); + } + K_WUNLOCK(workqueue_free); + wq_item = bq->ml_item = NULL; + mutex_lock(&wq_waitlock); + pthread_cond_signal(&wq_waitcond); + mutex_unlock(&wq_waitlock); + break; + // Code error + default: + LOGEMERG("%s() CODE ERROR unhandled" + " message %d %.32s...", + __func__, bq->cmdnum, bq->buf); + snprintf(reply, sizeof(reply), + "%s.%ld.failed.code", + msgline->id, + bq->now.tv_sec); + send_unix_msg(bq->sockd, reply); + break; + } + if (bq->sockd >= 0) { + close(bq->sockd); + dec_sockd = true; + } else + dec_sockd = false; + + if (bq->ml_item) { + free_msgline_data(bq->ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, bq->ml_item); + K_WUNLOCK(msgline_free); + bq->ml_item = NULL; + } + free(bq->buf); + + K_WLOCK(breakqueue_free); + if (dec_sockd) + sockd_count--; + k_add_head(breakqueue_free, bq_item); + K_WUNLOCK(breakqueue_free); + } + + return NULL; +} + +static void *socketer(void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + pthread_t clis_pt, blis_pt, proc_pt; + unixsock_t *us = &pi->us; + char *end, *buf = NULL; + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + int sockd; tv_t now; - bool want_first, replied, btc; - int loglevel, oldloglevel; pthread_detach(pthread_self()); @@ -4298,20 +4800,17 @@ static void *socketer(__maybe_unused void *arg) create_pthread(&clis_pt, clistener, NULL); create_pthread(&blis_pt, blistener, NULL); + + create_pthread(&proc_pt, process_socket, arg); } - want_first = true; while (!everyone_die) { - if (buf) - dealloc(buf); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGERR("%s() Failed to accept on socket", __func__); break; } - cmdnum = CMD_UNSET; - buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2); // Once we've read the message setnow(&now); @@ -4325,358 +4824,85 @@ static void *socketer(__maybe_unused void *arg) // An empty message wont get a reply if (!buf) LOGWARNING("%s() Failed to get message", __func__); - else + else { LOGWARNING("%s() Empty message", __func__); + free(buf); + } } else { int seqentryflags = SE_SOCKET; if (!reload_queue_complete) seqentryflags = SE_EARLYSOCK; - cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); - DATA_MSGLINE(msgline, ml_item); - replied = btc = false; - switch (cmdnum) { - case CMD_REPLY: - snprintf(reply, sizeof(reply), - "%s.%ld.?.", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_ALERTEVENT: - case CMD_ALERTOVENT: - snprintf(reply, sizeof(reply), - "%s.%ld.failed.ERR", - msgline->id, - now.tv_sec); - if (cmdnum == CMD_ALERTEVENT) - tmp = reply_event(EVENTID_NONE, reply); - else - tmp = reply_ovent(OVENTID_NONE, reply); - send_unix_msg(sockd, tmp); - FREENULL(tmp); - break; - case CMD_TERMINATE: - LOGWARNING("Listener received" - " terminate message," - " terminating ckdb"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.exiting", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - everyone_die = true; - break; - case CMD_PING: - LOGDEBUG("Listener received ping" - " request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.pong", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_VERSION: - LOGDEBUG("Listener received" - " version request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.CKDB V%s", - msgline->id, - now.tv_sec, - CKDB_VERSION); - send_unix_msg(sockd, reply); - break; - case CMD_LOGLEVEL: - if (!*(msgline->id)) { - LOGDEBUG("Listener received" - " loglevel, currently %d", - pi->ckp->loglevel); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " currently %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel); - } else { - oldloglevel = pi->ckp->loglevel; - loglevel = atoi(msgline->id); - LOGDEBUG("Listener received loglevel" - " %d currently %d A", - loglevel, oldloglevel); - if (loglevel < LOG_EMERG || - loglevel > LOG_DEBUG) { - snprintf(reply, sizeof(reply), - "%s.%ld.ERR.invalid" - " loglevel %d" - " - currently %d", - msgline->id, - now.tv_sec, - loglevel, - oldloglevel); - } else { - pi->ckp->loglevel = loglevel; - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " now %d - was %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel, - oldloglevel); - } - // Do this twice since the loglevel may have changed - LOGDEBUG("Listener received loglevel" - " %d currently %d B", - loglevel, oldloglevel); - } - send_unix_msg(sockd, reply); - break; - case CMD_FLUSH: - LOGDEBUG("Listener received" - " flush request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.splash", - msgline->id, now.tv_sec); - send_unix_msg(sockd, reply); - fflush(stdout); - fflush(stderr); - if (global_ckp && global_ckp->logfd) - fflush(global_ckp->logfp); - break; - case CMD_USERSET: - case CMD_BTCSET: - btc = true; - case CMD_CHKPASS: - case CMD_2FA: - case CMD_ADDUSER: - case CMD_NEWPASS: - case CMD_WORKERSET: - case CMD_GETATTS: - case CMD_SETATTS: - case CMD_EXPATTS: - case CMD_GETOPTS: - case CMD_SETOPTS: - case CMD_BLOCKLIST: - case CMD_NEWID: - case CMD_STATS: - case CMD_USERSTATUS: - case CMD_SHSTA: - case CMD_USERINFO: - case CMD_LOCKS: - case CMD_EVENTS: - case CMD_HIGH: - msgline->sockd = sockd; - sockd = -1; - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - if (btc) - k_add_tail(btc_workqueue_store, wq_item); - else - k_add_tail(cmd_workqueue_store, wq_item); - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - break; - // Process, but reject (loading) until startup_complete - case CMD_HOMEPAGE: - case CMD_ALLUSERS: - case CMD_WORKERS: - case CMD_PAYMENTS: - case CMD_PPLNS: - case CMD_PPLNS2: - case CMD_PAYOUTS: - case CMD_MPAYOUTS: - case CMD_SHIFTS: - case CMD_PSHIFT: - case CMD_DSP: - case CMD_BLOCKSTATUS: - case CMD_MARKS: - case CMD_QUERY: - if (!startup_complete) { - snprintf(reply, sizeof(reply), - "%s.%ld.loading.%s", - msgline->id, - now.tv_sec, - msgline->cmd); - send_unix_msg(sockd, reply); - } else { - msgline->sockd = sockd; - sockd = -1; - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - if (btc) - k_add_tail(btc_workqueue_store, wq_item); - else - k_add_tail(cmd_workqueue_store, wq_item); - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - } - break; - // Always process immediately: - case CMD_AUTH: - case CMD_ADDRAUTH: - case CMD_HEARTBEAT: - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } - DATA_MSGLINE(msgline, ml_item); - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", - msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - replied = true; - // Always queue (ok.queued) - case CMD_SHARELOG: - case CMD_POOLSTAT: - case CMD_USERSTAT: - case CMD_WORKERSTAT: - case CMD_BLOCK: - if (!replied) { - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } - snprintf(reply, sizeof(reply), - "%s.%ld.ok.queued", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - } - - K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_free); - DATA_WORKQUEUE(workqueue, wq_item); - workqueue->msgline_item = ml_item; - workqueue->by = by_default; - workqueue->code = (char *)__func__; - workqueue->inet = inet_default; - k_add_tail(pool_workqueue_store, wq_item); - /* Stop the reload queue from growing too big - * Use a size that should be big enough */ - if (reloading && pool_workqueue_store->count > 250000) { - K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); - K_WUNLOCK(workqueue_free); - WORKQUEUE *wq; - DATA_WORKQUEUE(wq, wq2_item); - K_ITEM *ml_item = wq->msgline_item; - free_msgline_data(ml_item, true, false); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - K_WLOCK(workqueue_free); - k_add_head(workqueue_free, wq2_item); - } - K_WUNLOCK(workqueue_free); - wq_item = ml_item = NULL; - mutex_lock(&wq_waitlock); - pthread_cond_signal(&wq_waitcond); - mutex_unlock(&wq_waitlock); - break; - // Code error - default: - LOGEMERG("%s() CODE ERROR unhandled" - " message %d %.32s...", - __func__, cmdnum, buf); - snprintf(reply, sizeof(reply), - "%s.%ld.failed.code", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - } - } - if (sockd >= 0) - close(sockd); - if (ml_item) { - free_msgline_data(ml_item, true, true); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - ml_item = NULL; + // Don't limit the speed filling up cmd_breakqueue_store + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(breakqueue_free); + // keep the lock since none of these should be slow + DATA_BREAKQUEUE(bq, bq_item); + bq->buf = buf; + copy_tv(&(bq->now), &now); + bq->seqentryflags = seqentryflags; + bq->sockd = sockd; + if (max_sockd_count < ++sockd_count) + max_sockd_count = sockd_count; + k_add_tail(cmd_breakqueue_store, bq_item); + K_WUNLOCK(breakqueue_free); } - - tick(); } socketer_using_data = false; - if (buf) - dealloc(buf); close_unix_socket(us->sockd, us->path); + // Since the socket is dead ... + everyone_die = true; + return NULL; } -static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) +static void *process_reload(__maybe_unused void *arg) { + PGconn *conn = NULL; + MSGLINE *msgline = NULL; + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; enum cmd_values cmdnum; - char *end, *ans, *st = NULL; - MSGLINE *msgline; - K_ITEM *ml_item; - tv_t now; - bool matched; + char *ans, *st = NULL; + time_t now; - // Once we've read the message - setnow(&now); - if (buf) { - end = buf + strlen(buf) - 1; - // strip trailing \n and \r - while (end >= buf && (*end == '\n' || *end == '\r')) - *(end--) = '\0'; - } - if (!buf || !*buf) { - if (!buf) { - LOGERR("%s() NULL message line %"PRIu64, - __func__, count); - } else { - LOGERR("%s() Empty message line %"PRIu64, - __func__, count); - } - } else { - matched = false; - ck_wlock(&fpm_lock); - if (first_pool_message && - strcmp(first_pool_message, buf) == 0) { - matched = true; - FREENULL(first_pool_message); + pthread_detach(pthread_self()); + + LOCK_INIT("db_procreload"); + rename_proc("db_procreload"); + + conn = dbconnect(); + now = time(NULL); + + while (!everyone_die) { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(reload_done_breakqueue_store); + if (bq_item) + reload_processing++; + K_WUNLOCK(breakqueue_free); + + if (!bq_item) { + // Finished reloading? + if (!reloading) + break; + + cksleep_ms(24); + continue; } - ck_wunlock(&fpm_lock); - if (matched) { - LOGERR("%s() reload ckpool queue match at line %"PRIu64, - __func__, count); + + // Don't keep a connection for more than ~10s ... of processing + if ((time(NULL) - now) > 10) { + PQfinish(conn); + conn = dbconnect(); + now = time(NULL); } - // ml_item is set for all but CMD_REPLY - cmdnum = breakdown(&ml_item, buf, &now, SE_RELOAD); - DATA_MSGLINE(msgline, ml_item); - switch (cmdnum) { + DATA_BREAKQUEUE(bq, bq_item); + DATA_MSGLINE(msgline, bq->ml_item); + switch (bq->cmdnum) { // Ignore case CMD_REPLY: case CMD_ALERTEVENT: @@ -4726,7 +4952,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_HIGH: LOGERR("%s() INVALID message line %"PRIu64 " ignored '%.42s...", - __func__, count, + __func__, bq->count, st = safe_text(msgline->msg)); FREENULL(st); break; @@ -4751,7 +4977,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) (char *)__func__, inet_default, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, true); FREENULL(ans); } break; @@ -4759,23 +4985,85 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) // Force this switch to be updated if new cmds are added quithere(1, "%s line %"PRIu64" '%s' - not " "handled by reload", - filename, count, + bq->filename, bq->count, st = safe_text_nonull(msgline->cmd)); // Won't get here ... FREENULL(st); break; } - if (ml_item) { - free_msgline_data(ml_item, true, true); + if (bq->ml_item) { + free_msgline_data(bq->ml_item, true, true); K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); + k_add_head(msgline_free, bq->ml_item); K_WUNLOCK(msgline_free); - ml_item = NULL; + bq->ml_item = NULL; } + free(bq->buf); + + K_WLOCK(breakqueue_free); + reload_processing--; + k_add_head(breakqueue_free, bq_item); + K_WUNLOCK(breakqueue_free); + + tick(); } - tick(); + PQfinish(conn); + + return NULL; +} + +static void reload_line(char *filename, char *buf, uint64_t count) +{ + K_ITEM *bq_item = NULL; + BREAKQUEUE *bq = NULL; + int qcount; + char *end; + tv_t now; + + // Once we've read the message + setnow(&now); + if (buf) { + end = buf + strlen(buf) - 1; + // strip trailing \n and \r + while (end >= buf && (*end == '\n' || *end == '\r')) + *(end--) = '\0'; + } + if (!buf || !*buf) { + if (!buf) { + LOGERR("%s() NULL message line %"PRIu64, + __func__, count); + } else { + LOGERR("%s() Empty message line %"PRIu64, + __func__, count); + } + } else { + K_WLOCK(breakqueue_free); + bq_item = k_unlink_head(breakqueue_free); + K_WUNLOCK(breakqueue_free); + + // release the lock since strdup could be slow, but rarely + DATA_BREAKQUEUE(bq, bq_item); + bq->buf = strdup(buf); + copy_tv(&(bq->now), &now); + bq->seqentryflags = SE_RELOAD; + bq->sockd = -1; + bq->count = count; + bq->filename = filename; + + K_WLOCK(breakqueue_free); + k_add_tail(reload_breakqueue_store, bq_item); + qcount = reload_breakqueue_store->count; + K_WUNLOCK(breakqueue_free); + + while (qcount > RELOAD_QUEUE_LIMIT) { + cksleep_ms(RELOAD_QUEUE_SLEEP); + K_RLOCK(breakqueue_free); + qcount = reload_breakqueue_store->count; + K_RUNLOCK(breakqueue_free); + } + } } // 10Mb for now - transactiontree can be large @@ -4840,7 +5128,8 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) errn, buf); } else { *apipe = true; - free(*filename); + /* Don't free the old filename since + * process_reload() could still access it */ *filename = name; return true; } @@ -4862,12 +5151,12 @@ static bool logopen(char **filename, FILE **fp, bool *apipe) * if ckdb aborts at the beginning of the reload, then start again */ static bool reload_from(tv_t *start) { - PGconn *conn = NULL; + // proc_pt could exit after this returns + static pthread_t proc_pt; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; size_t rflen = strlen(restorefrom); char *missingfirst = NULL, *missinglast = NULL, *st = NULL; - int missing_count; - int processing; + int missing_count, processing, counter; bool finished = false, ret = true, ok, apipe = false; char *filename = NULL; uint64_t count, total; @@ -4875,6 +5164,7 @@ static bool reload_from(tv_t *start) double diff; FILE *fp = NULL; int file_N_limit; + time_t tick_time, tmp_time; reload_buf = malloc(MAX_READ); if (!reload_buf) @@ -4904,10 +5194,11 @@ static bool reload_from(tv_t *start) LOGQUE(reload_buf, true); LOGQUE(reload_buf, false); - conn = dbconnect(); + create_pthread(&proc_pt, process_reload, NULL); total = 0; processing = 0; + tick_time = time(NULL); while (!everyone_die && !finished) { LOGWARNING("%s(): processing %s", __func__, filename); processing++; @@ -4920,7 +5211,30 @@ static bool reload_from(tv_t *start) * order messages in the log file */ while (!everyone_die && logline(reload_buf, MAX_READ, fp, filename)) { - reload_line(conn, filename, ++count, reload_buf); + reload_line(filename, reload_buf, ++count); + + tmp_time = time(NULL); + // Report stats every 15s + if ((tmp_time - tick_time) > 14) { + int relq, relqd, cmdq, cmdqd, mx, poolq; + K_RLOCK(breakqueue_free); + relq = reload_breakqueue_store->count + + reload_processing; + relqd = reload_done_breakqueue_store->count; + cmdq = cmd_breakqueue_store->count; + cmdqd = cmd_done_breakqueue_store->count; + mx = max_sockd_count; + K_RUNLOCK(breakqueue_free); + K_RLOCK(workqueue_free); + poolq = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); + printf(TICK_PREFIX"reload %"PRIu64"/%d/%d" + " ckp %d/%d/%d (%d) \r", + total+count, relq, relqd, + cmdq, cmdqd, poolq, mx); + fflush(stdout); + tick_time = tmp_time; + } } LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", @@ -4938,7 +5252,8 @@ static bool reload_from(tv_t *start) } } else fclose(fp); - free(filename); + /* Don't free the old filename since + * process_reload() could access use it */ if (everyone_die) break; reload_timestamp.tv_sec += ROLL_S; @@ -4995,7 +5310,15 @@ static bool reload_from(tv_t *start) } } - PQfinish(conn); + while (!everyone_die) { + K_RLOCK(breakqueue_free); + counter = reload_done_breakqueue_store->count + + reload_breakqueue_store->count + reload_processing; + K_RUNLOCK(breakqueue_free); + if (counter == 0) + break; + cksleep_ms(142); + } setnow(&now); diff = tvdiff(&now, &begin); @@ -5060,7 +5383,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item) workqueue->code, workqueue->inet, &(msgline->cd), - msgline->trf_root); + msgline->trf_root, false); FREENULL(ans); break; } @@ -5099,6 +5422,7 @@ static void *listener(void *arg) pthread_t sock_pt; pthread_t summ_pt; pthread_t mark_pt; + pthread_t break_pt; K_ITEM *wq_item; time_t now; int wqcount, wqgot; @@ -5109,7 +5433,10 @@ static void *listener(void *arg) SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; - int i; + int cpus, i; + bool reloader, cmder; + + pthread_detach(pthread_self()); LOCK_INIT("db_plistener"); rename_proc("db_plistener"); @@ -5118,9 +5445,29 @@ static void *listener(void *arg) ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); logqueue_store = k_new_store(logqueue_free); + breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), + ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); + reload_breakqueue_store = k_new_store(breakqueue_free); + reload_done_breakqueue_store = k_new_store(breakqueue_free); + cmd_breakqueue_store = k_new_store(breakqueue_free); + cmd_done_breakqueue_store = k_new_store(breakqueue_free); + #if LOCK_CHECK DLPRIO(logqueue, 94); + DLPRIO(breakqueue, PRIO_TERMINAL); #endif + if (breakdown_threads <= 0) { + cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; + breakdown_threads = (int)(cpus / 3) ? : 1; + } + LOGWARNING("%s(): creating %d*2 breaker threads ...", + __func__, breakdown_threads); + reloader = true; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &reloader); + cmder = false; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &cmder); create_pthread(&log_pt, logger, NULL); @@ -5192,7 +5539,6 @@ static void *listener(void *arg) if (wq_item) { wqgot++; process_queued(conn, wq_item); - tick(); } if (left == 0 && wq_stt.tv_sec != 0L) { @@ -5838,8 +6184,10 @@ static void check_restore_dir(char *name) static struct option long_options[] = { // script to call when alerts happen { "alert", required_argument, 0, 'a' }, - // workinfo to start shares_fill() default is 1 day + // workinfoid to start shares_fill() default is 1 day { "shares-begin", required_argument, 0, 'b' }, + // override calculated value + { "breakdown-threads", required_argument, 0, 'B' }, { "config", required_argument, 0, 'c' }, { "dbname", required_argument, 0, 'd' }, { "minsdiff", required_argument, 0, 'D' }, @@ -5901,7 +6249,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:Ikl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'a': len = strlen(optarg); @@ -5921,6 +6269,19 @@ int main(int argc, char **argv) } shares_begin = beg; } + break; + case 'B': + { + int bt = atoi(optarg); + if (bt < 1) { + quit(1, "Invalid breakdown " + "thread count %d " + "- must be > 0", + bt); + } + breakdown_threads = bt; + } + break; case 'c': ckp.config = strdup(optarg); break; @@ -6172,6 +6533,7 @@ int main(int argc, char **argv) ckp.main.ckp = &ckp; ckp.main.processname = strdup("main"); + cklock_init(&breakdown_lock); cklock_init(&last_lock); cklock_init(&btc_lock); cklock_init(&poolinstance_lock); @@ -6219,11 +6581,12 @@ int main(int argc, char **argv) time_t start, trigger, curr; char *msg = NULL; + everyone_die = true; trigger = start = time(NULL); while (socketer_using_data || summariser_using_data || logger_using_data || plistener_using_data || clistener_using_data || blistener_using_data || - marker_using_data) { + marker_using_data || breakdown_using_data) { msg = NULL; curr = time(NULL); if (curr - start > 4) { @@ -6235,7 +6598,7 @@ int main(int argc, char **argv) } if (msg) { trigger = curr; - printf("%s %ds due to%s%s%s%s%s%s%s\n", + printf("%s %ds due to%s%s%s%s%s%s%s%s\n", msg, (int)(curr - start), socketer_using_data ? " socketer" : EMPTY, summariser_using_data ? " summariser" : EMPTY, @@ -6243,7 +6606,8 @@ int main(int argc, char **argv) plistener_using_data ? " plistener" : EMPTY, clistener_using_data ? " clistener" : EMPTY, blistener_using_data ? " blistener" : EMPTY, - marker_using_data ? " marker" : EMPTY); + marker_using_data ? " marker" : EMPTY, + breakdown_using_data ? " breakdown" : EMPTY); fflush(stdout); } sleep(1); diff --git a/src/ckdb.h b/src/ckdb.h index 9f1579a4..9487deb8 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,7 +51,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-1.990" +#define CKDB_VERSION DB_VERSION"-2.000" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1037,7 +1037,7 @@ typedef struct msgline { #define ALLOC_MSGLINE 8192 #define LIMIT_MSGLINE 0 -#define CULL_MSGLINE 16 +#define CULL_MSGLINE 8 #define INIT_MSGLINE(_item) INIT_GENERIC(_item, msgline) #define DATA_MSGLINE(_var, _item) DATA_GENERIC(_var, _item, msgline, true) #define DATA_MSGLINE_NULL(_var, _item) DATA_GENERIC(_var, _item, msgline, false) @@ -1045,6 +1045,57 @@ typedef struct msgline { extern K_LIST *msgline_free; extern K_STORE *msgline_store; +// BREAKQUEUE +typedef struct breakqueue { + char *buf; + tv_t now; + int seqentryflags; + int sockd; + enum cmd_values cmdnum; + K_ITEM *ml_item; + uint64_t count; + char *filename; +} BREAKQUEUE; + +#define ALLOC_BREAKQUEUE 16384 +#define LIMIT_BREAKQUEUE 0 +#define CULL_BREAKQUEUE 4 +#define INIT_BREAKQUEUE(_item) INIT_GENERIC(_item, breakqueue) +#define DATA_BREAKQUEUE(_var, _item) DATA_GENERIC(_var, _item, breakqueue, true) + +/* If a breaker() thread's done break queue count hits the LIMIT, or is empty, + * it will sleep for SLEEP ms + * So this means that with a single breaker() thread, + * it can process at most LIMIT records per SLEEP ms + * or: 1000 * LIMIT / SLEEP records per second + * For N breaker() threads, that would mean between 1 and N times that value + * dependent upon the random time spacing of the N thread sleeps + * However, also note that LIMIT defines how much RAM can be used by + * the break queues, so a limit is required + * A breakqueue item can get quite large since it includes both buf + * and ml_item (which has the transfer data) in the 'done' queue + * Of course the processing speed of the ml_items will also decide how big the + * break queue count can get + * Note that if the CMD queues get too large they will be too slow responding + * to the sockets that sent the message, however the CMD ml_item processing + * responds immediately before processing the ml_item for all but ADDRAUTH, + * AUTHORISE and HEARTBEAT + * The reload also uses this limit when filling the reload break queue + * thus limiting the line processing of reload files + */ +// 16300,42 equated to single thread limitation of ~388k per second +#define RELOAD_QUEUE_LIMIT 16300 +#define RELOAD_QUEUE_SLEEP 42 +#define CMD_QUEUE_LIMIT 16300 +#define CMD_QUEUE_SLEEP 42 + +extern K_LIST *breakqueue_free; +extern K_STORE *reload_breakqueue_store; +extern K_STORE *reload_done_breakqueue_store; +extern K_STORE *cmd_breakqueue_store; +extern K_STORE *cmd_done_breakqueue_store; +extern int max_sockd_count; + // WORKQUEUE typedef struct workqueue { K_ITEM *msgline_item; @@ -1093,7 +1144,7 @@ typedef struct transfer { // Suggest malloc use MMAP - 1913 = largest under 2MB #define ALLOC_TRANSFER 1913 #define LIMIT_TRANSFER 0 -#define CULL_TRANSFER 64 +#define CULL_TRANSFER 32 #define INIT_TRANSFER(_item) INIT_GENERIC(_item, transfer) #define DATA_TRANSFER(_var, _item) DATA_GENERIC(_var, _item, transfer, true) @@ -3124,7 +3175,7 @@ extern bool auths_add(PGconn *conn, char *poolinstance, char *username, char *useragent, char *preauth, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, bool addressuser, USERS **users, WORKERS **workers, - int *event); + int *event, bool reload_data); extern bool poolstats_add(PGconn *conn, bool store, char *poolinstance, char *elapsed, char *users, char *workers, char *hashrate, char *hashrate5m, @@ -3192,7 +3243,7 @@ struct CMDS { bool noid; // doesn't require an id bool createdate; // requires a createdate char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, - char *, tv_t *, K_TREE *); + char *, tv_t *, K_TREE *, bool); enum seq_num seq; int access; }; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 3c451657..62b7ac0b 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -34,7 +34,7 @@ static K_ITEM *adminuser(K_TREE *trf_root, char *reply, size_t siz) static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *notcd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -87,7 +87,8 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_oldhash, *i_newhash, *i_2fa, *u_item; char reply[1024] = ""; @@ -166,7 +167,8 @@ static char *cmd_newpass(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_passwordhash, *i_2fa, *u_item; char reply[1024] = ""; @@ -222,7 +224,8 @@ static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_2fa(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_action, *i_entropy, *i_value, *u_item, *u_new; char reply[1024] = ""; @@ -464,7 +467,8 @@ dame: static char *cmd_userset(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_passwordhash, *i_2fa, *i_rows, *i_address; K_ITEM *i_ratio, *i_payname, *i_email, *u_item, *pa_item, *old_pa_item; @@ -781,7 +785,7 @@ struckout: static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { K_ITEM *i_username, *i_workername, *i_diffdef, *i_oldworkers; K_ITEM *u_item, *ua_item, *w_item; @@ -1081,7 +1085,7 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { bool igndup = false; @@ -1096,7 +1100,8 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root) + char *inet, tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1180,7 +1185,8 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_workerstats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root) + char *inet, tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1263,7 +1269,8 @@ static char *cmd_blocklist(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { int ovent = OVENT_OK; K_TREE_CTX ctx[1]; @@ -1525,7 +1532,8 @@ redo: static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_height, *i_blockhash, *i_action, *i_info; char reply[1024] = ""; @@ -1682,7 +1690,7 @@ static char *cmd_blockstatus(PGconn *conn, char *cmd, char *id, tv_t *now, static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -1718,7 +1726,8 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *p_item, *p2_item, *po_item; K_TREE_CTX ctx[1]; @@ -2056,7 +2065,8 @@ static char *cmd_percent(char *cmd, char *id, tv_t *now, USERS *users) static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_stats, *i_percent, w_look, *u_item, *w_item; K_ITEM *ua_item, *us_item, *ws_item; @@ -2382,7 +2392,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_STORE *usu_store = k_new_store(userstats_free); K_ITEM *us_item, *usu_item, *u_item; @@ -2494,7 +2505,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -2946,7 +2958,7 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, int32_t height, char *id, static char *cmd_blocks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -2969,12 +2981,13 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id, igndup = true; } - return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup, trf_root); + return cmd_blocks_do(conn, cmd, height, id, by, code, inet, cd, igndup, + trf_root); } static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { K_ITEM tmp_poolinstance_item; TRANSFER tmp_poolinstance; @@ -3046,7 +3059,8 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, u_item = find_users(username); K_RUNLOCK(users_free); if (!u_item) { - event = events_add(EVENTID_AUTOACC, trf_root); + if (!reload_data) + event = events_add(EVENTID_AUTOACC, trf_root); if (event == EVENT_OK) { DATA_OPTIONCONTROL(optioncontrol, oc_item); u_item = users_add(conn, username, EMPTY, @@ -3067,7 +3081,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, transfer_data(i_useragent), transfer_data(i_preauth), by, code, inet, cd, trf_root, false, - &users, &workers, &event); + &users, &workers, &event, reload_data); } if (!ok) { @@ -3123,14 +3137,15 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_auth(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { - return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root); + return cmd_auth_do(conn, cmd, id, by, code, inet, cd, trf_root, + reload_data); } static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { K_ITEM tmp_poolinstance_item; TRANSFER tmp_poolinstance; @@ -3201,7 +3216,7 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, transfer_data(i_useragent), transfer_data(i_preauth), by, code, inet, cd, trf_root, true, - &users, &workers, &event); + &users, &workers, &event, reload_data); if (!ok) { LOGDEBUG("%s() %s.failed.DBE", __func__, id); @@ -3256,16 +3271,18 @@ static char *cmd_addrauth_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_addrauth(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, bool reload_data) { - return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root); + return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, trf_root, + reload_data); } static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *cd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { HEARTBEATQUEUE *heartbeatqueue; K_STORE *hq_store; @@ -3331,7 +3348,8 @@ pulse: static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look; K_ITEM *ua_item, *pa_item; @@ -3506,12 +3524,16 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, int psync = pool_workqueue_store->count; int csync = cmd_workqueue_store->count; int bsync = btc_workqueue_store->count; + int qsync = breakqueue_free->total - breakqueue_free->count; snprintf(tmp, sizeof(tmp), "psync=%d%c", psync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "csync=%d%c", csync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "bsync=%d%c", bsync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); + snprintf(tmp, sizeof(tmp), "qsync=%d%c", qsync, FLDSEP); + APPEND_REALLOC(buf, off, len, tmp); + // qsync isn't part of 'sync' snprintf(tmp, sizeof(tmp), "sync=%d%c", psync + csync + bsync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); @@ -3622,7 +3644,8 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_getatts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_attlist, *u_item, *ua_item; char reply[1024] = ""; @@ -3798,7 +3821,8 @@ static void att_to_date(tv_t *date, char *data, tv_t *now) * */ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { ExecStatusType rescode; PGresult *res; @@ -3967,7 +3991,8 @@ bats: static char *cmd_expatts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_attlist, *u_item, *ua_item; char reply[1024] = ""; @@ -4050,7 +4075,8 @@ rats: static char *cmd_getopts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_optlist, *oc_item; char reply[1024] = ""; @@ -4128,7 +4154,8 @@ ruts: * See opt_set_date() above */ static char *cmd_setopts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { ExecStatusType rescode; PGresult *res; @@ -4276,9 +4303,10 @@ rollback: * and the breakdown for percent address users, * the totals per user and per payout should still be the same */ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, - __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024], tmp[1024], *buf; char *block_extra, *share_status = EMPTY, *marks_status = EMPTY; @@ -4753,9 +4781,10 @@ shazbot: // Generated from the payouts, miningpayouts and payments data static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id, - __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024], tmp[1024], *buf; char *block_extra, *marks_status = EMPTY; @@ -5022,7 +5051,8 @@ shazbot: static char *cmd_payouts(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5274,7 +5304,8 @@ static char *cmd_mpayouts(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *u_item, *mp_item, *po_item; K_TREE_CTX ctx[1]; @@ -5466,7 +5497,8 @@ static int select_list(WM *wm, char *select) static char *cmd_shifts(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username, *i_select; K_ITEM *u_item, *p_item, *m_item, ms_look, *wm_item, *ms_item, *wi_item; @@ -5815,7 +5847,8 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { __maybe_unused K_ITEM *i_file; __maybe_unused char reply[1024] = ""; @@ -5861,7 +5894,9 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { char tmp[1024], *buf; const char *name; @@ -5949,7 +5984,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, // TODO: add to heartbeat to disable the miner if active and status != "" static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, __maybe_unused tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6016,7 +6051,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char * static char *cmd_marks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, char *code, char *inet, tv_t *cd, - K_TREE *trf_root) + K_TREE *trf_root, __maybe_unused bool reload_data) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6535,7 +6570,8 @@ dame: static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_username; K_ITEM *u_item, *p_item, *m_item, *wm_item, *ms_item, *wi_item; @@ -6740,18 +6776,26 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, } /* Show a share status report on the console - * Currently: sequence status and OoO info */ + * Currently: sequence status, OoO info and max_sockd_count */ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { char ooo_buf[256]; char buf[256]; + int count; LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); sequence_report(true); + K_RLOCK(breakqueue_free); + count = max_sockd_count; + K_RUNLOCK(breakqueue_free); + LOGWARNING(" max_sockd_count=%d", count); + snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf); return strdup(buf); @@ -6761,7 +6805,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *notcd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *ui_item; USERINFO *userinfo; @@ -6850,7 +6895,8 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd, K_TREE *trf_root) + __maybe_unused tv_t *notcd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_btcserver, *i_userpass; char *btcserver = NULL, *userpass = NULL, *tmp; @@ -6902,7 +6948,8 @@ static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_query(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_TREE_CTX ctx[1]; char cd_buf[DATE_BUFSIZ]; @@ -7498,7 +7545,8 @@ static char *cmd_locks(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *cd, - __maybe_unused K_TREE *trf_root) + __maybe_unused K_TREE *trf_root, + __maybe_unused bool reload_data) { bool code_locks = false, code_deadlocks = false; bool was_locks = false, was_deadlocks = false; @@ -7610,7 +7658,8 @@ static void event_tree(K_TREE *the_tree, char *list, char *reply, size_t siz, static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { K_ITEM *i_action, *i_cmd, *i_list, *i_ip, *i_eventname, *i_lifetime; K_ITEM *i_des, *i_item, *next_item, *o_item; @@ -8093,7 +8142,8 @@ static char *cmd_events(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_high(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *cd, K_TREE *trf_root) + __maybe_unused tv_t *cd, K_TREE *trf_root, + __maybe_unused bool reload_data) { bool conned = false; K_TREE_CTX ctx[1]; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index f4c33b50..e42252de 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -7100,7 +7100,7 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, char *useragent, char *preauth, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, bool addressuser, USERS **users, WORKERS **workers, - int *event) + int *event, bool reload_data) { K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item, *w_item; @@ -7131,7 +7131,8 @@ bool auths_add(PGconn *conn, char *poolinstance, char *username, __func__, st = safe_text_nonull(username)); FREENULL(st); - *event = events_add(EVENTID_INVAUTH, trf_root); + if (!reload_data) + *event = events_add(EVENTID_INVAUTH, trf_root); } if (!u_item) goto unitem;