diff --git a/src/ckdb.c b/src/ckdb.c index 8e783804..1ca385ed 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -115,6 +115,7 @@ static bool plistener_using_data; static bool clistener_using_data; static bool blistener_using_data; static bool breakdown_using_data; +static bool replier_using_data; // -B to override calculated value static int breakdown_threads = -1; @@ -124,6 +125,9 @@ static int cmd_breakdown_count = 0; * Any change to/from 0 will update breakdown_using_data */ static cklock_t breakdown_lock; +static int replier_count = 0; +static cklock_t replier_lock; + char *EMPTY = ""; const char *nullstr = "(null)"; @@ -294,6 +298,9 @@ tv_t last_auth; cklock_t last_lock; // Running stats +// replier() +double reply_full_us; +uint64_t reply_sent, reply_cant, reply_discarded, reply_fails; // socketer() tv_t sock_stt; double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; @@ -373,6 +380,26 @@ int cmd_processing; int sockd_count; int max_sockd_count; +// Trigger breaker() processing +mutex_t bq_reload_waitlock; +mutex_t bq_cmd_waitlock; +pthread_cond_t bq_reload_waitcond; +pthread_cond_t bq_cmd_waitcond; + +uint64_t bq_reload_signals, bq_cmd_signals; +uint64_t bq_reload_wakes, bq_cmd_wakes; +uint64_t bq_reload_timeouts, bq_cmd_timeouts; + +// Trigger reload/socket *_done_* processing +mutex_t process_reload_waitlock; +mutex_t process_socket_waitlock; +pthread_cond_t process_reload_waitcond; +pthread_cond_t process_socket_waitcond; + +uint64_t process_reload_signals, process_socket_signals; +uint64_t process_reload_wakes, process_socket_wakes; +uint64_t process_reload_timeouts, process_socket_timeouts; + // WORKQUEUE K_LIST *workqueue_free; // pool0 is all pool data during the reload @@ -385,6 +412,40 @@ int64_t earlysock_left; int64_t pool0_tot; int64_t pool0_discarded; +// Trigger workqueue threads +mutex_t wq_pool_waitlock; +mutex_t wq_cmd_waitlock; +mutex_t wq_btc_waitlock; +pthread_cond_t wq_pool_waitcond; +pthread_cond_t wq_cmd_waitcond; +pthread_cond_t wq_btc_waitcond; + +uint64_t wq_pool_signals, wq_cmd_signals, wq_btc_signals; +uint64_t wq_pool_wakes, wq_cmd_wakes, wq_btc_wakes; +uint64_t wq_pool_timeouts, wq_cmd_timeouts, wq_btc_timeouts; + +// REPLIES +K_LIST *replies_free; +K_STORE *replies_store; +K_TREE *replies_pool_root; +K_TREE *replies_cmd_root; +K_TREE *replies_btc_root; + +int epollfd_pool; +int epollfd_cmd; +int epollfd_btc; + +int rep_tot_sockd; +int rep_failed_sockd; +int rep_max_sockd; +// maximum counts and fd values +int rep_max_pool_sockd; +int rep_max_cmd_sockd; +int rep_max_btc_sockd; +int rep_max_pool_sockd_fd; +int rep_max_cmd_sockd_fd; +int rep_max_btc_sockd_fd; + // HEARTBEATQUEUE K_LIST *heartbeatqueue_free; K_STORE *heartbeatqueue_store; @@ -807,12 +868,17 @@ void logmsg(int loglevel, const char *fmt, ...) free(buf); } +void setnowts(ts_t *now) +{ + now->tv_sec = 0; + now->tv_nsec = 0; + clock_gettime(CLOCK_REALTIME, now); +} + void setnow(tv_t *now) { ts_t spec; - spec.tv_sec = 0; - spec.tv_nsec = 0; - clock_gettime(CLOCK_REALTIME, &spec); + setnowts(&spec); now->tv_sec = spec.tv_sec; now->tv_usec = spec.tv_nsec / 1000; } @@ -864,14 +930,170 @@ static void check_createdate_ccl(char *cmd, tv_t *cd) STRNCPY(last_cmd, cmd); } -void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS) +static int _ckdb_unix_write(int sockd, const char *msg, int len, WHERE_FFL_ARGS) { - bool ok; + int ret, ofs = 0; + + while (len) { + ret = write(sockd, msg + ofs, len); + if (ret < 0) { + int e = errno; + LOGERR("%s() Failed to write %d bytes (%d:%s)" WHERE_FFL, + __func__, len, e, strerror(e), WHERE_FFL_PASS); + return -1; + } + ofs += ret; + len -= ret; + } + return ofs; +} + +static void _ckdb_unix_send(int sockd, const char *msg, WHERE_FFL_ARGS) +{ + bool warn = true; + uint32_t msglen; + size_t len; + int ret; - ok = _send_unix_msg(sockd, msg, UNIX_WRITE_TIMEOUT, WHERE_FFL_PASS); - if (!ok) { - LOGWARNING(" msg was %.42s%s", msg ? : nullstr, - msg ? "..." : EMPTY); + if (sockd < 0) { + LOGWARNING("%s() invalid socket %d" WHERE_FFL, + __func__, sockd, WHERE_FFL_PASS); + goto tamanee; + } + + if (!msg) { + LOGWARNING("%s() null msg on socket %d" WHERE_FFL, + __func__, sockd, WHERE_FFL_PASS); + warn = false; + goto tamanee; + } + + len = strlen(msg); + if (!len) { + LOGWARNING("%s() zero length msg on socket %d" WHERE_FFL, + __func__, sockd, WHERE_FFL_PASS); + warn = false; + goto tamanee; + } + + msglen = htole32(len); + ret = _ckdb_unix_write(sockd, (const char *)(&msglen), 4, + WHERE_FFL_PASS); + if (ret < 4) { + LOGERR("%s() failed to write four bytes on socket %d" WHERE_FFL, + __func__, sockd, WHERE_FFL_PASS); + goto tamanee; + } + + ret = _ckdb_unix_write(sockd, msg, len, WHERE_FFL_PASS); + if (ret < (int)len) { + LOGERR("%s() failed2 to write %d bytes on socket %d" WHERE_FFL, + __func__, (int)len, sockd, WHERE_FFL_PASS); + goto tamanee; + } + + warn = false; +tamanee: + if (warn) { + LOGWARNING(" msg was %d %.42s%s", + sockd, msg ? : nullstr, msg ? "..." : EMPTY); + } +} + +#define ckdb_unix_msg(_typ, _sockd, _msg, _ml, _dup) \ + _ckdb_unix_msg(_typ, _sockd, _msg, _ml, _dup, WHERE_FFL_HERE) + +static void _ckdb_unix_msg(enum reply_type reply_typ, int sockd, char *msg, + MSGLINE *ml, bool dup, WHERE_FFL_ARGS) +{ + K_TREE *reply_root = NULL; + REPLIES *replies = NULL; + K_ITEM *r_item = NULL; + int epollfd, ret, *rep_max, *rep_max_fd; + char *ptr; + tv_t now; + + switch(reply_typ) { + case REPLIER_POOL: + reply_root = replies_pool_root; + epollfd = epollfd_pool; + rep_max = &rep_max_pool_sockd; + rep_max_fd = &rep_max_pool_sockd_fd; + break; + case REPLIER_CMD: + reply_root = replies_cmd_root; + epollfd = epollfd_cmd; + rep_max = &rep_max_cmd_sockd; + rep_max_fd = &rep_max_cmd_sockd_fd; + break; + case REPLIER_BTC: + // Let the btc thread handle unknowns + default: + reply_root = replies_btc_root; + epollfd = epollfd_btc; + rep_max = &rep_max_btc_sockd; + rep_max_fd = &rep_max_btc_sockd_fd; + if (reply_typ != REPLIER_BTC) { + char *st = NULL; + LOGEMERG("%s() CODE ERROR unknown reply_type %d" + " msg %.32s...", + __func__, reply_typ, + st = safe_text_nonull(msg)); + FREENULL(st); + } + break; + } + + if (!dup) + ptr = msg; + else { + ptr = strdup(msg); + if (!ptr) + quithere(1, "strdup (%d) OOM", (int)strlen(msg)); + } + + K_WLOCK(replies_free); + // ensure now is unique + setnow(&now); + r_item = k_unlink_head(replies_free); + K_WUNLOCK(replies_free); + DATA_REPLIES(replies, r_item); + copy_tv(&(replies->now), &now); + copy_tv(&(replies->createdate), &(ml->now)); + copy_tv(&(replies->accepted), &(ml->accepted)); + copy_tv(&(replies->broken), &(ml->broken)); + copy_tv(&(replies->processed), &(ml->processed)); + replies->event.events = EPOLLOUT | EPOLLHUP; + replies->event.data.ptr = r_item; + replies->sockd = sockd; + replies->reply = ptr; + replies->file = file; + replies->func = func; + replies->line = line; + K_WLOCK(replies_free); + ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, replies->sockd, &(replies->event)); + if (ret == 0) { + k_add_head(replies_store, r_item); + add_to_ktree(reply_root, r_item); + rep_tot_sockd++; + if (rep_max_sockd < replies_store->count) + rep_max_sockd = replies_store->count; + if (*rep_max < reply_root->node_store->count) + *rep_max = reply_root->node_store->count; + if (*rep_max_fd < sockd) + *rep_max_fd = sockd; + } else { + k_add_head(replies_free, r_item); + rep_failed_sockd++; + } + K_WUNLOCK(replies_free); + if (ret != 0) { + char *st = NULL; + int e = errno; + LOGEMERG("%s() failed to epoll add reply (%d:%s) msg=%.32s...", + __func__, e, strerror(e), st = safe_text(msg)); + FREENULL(st); + free(ptr); } } @@ -1198,6 +1420,13 @@ static void alloc_storage() cmd_workqueue_store = k_new_store(workqueue_free); btc_workqueue_store = k_new_store(workqueue_free); + replies_free = k_new_list("Replies", sizeof(REPLIES), + ALLOC_REPLIES, LIMIT_REPLIES, true); + replies_store = k_new_store(replies_free); + replies_pool_root = new_ktree("RepliesPool", cmp_replies, replies_free); + replies_cmd_root = new_ktree("RepliesCmd", cmp_replies, replies_free); + replies_btc_root = new_ktree("RepliesBTC", cmp_replies, replies_free); + heartbeatqueue_free = k_new_list("HeartBeatQueue", sizeof(HEARTBEATQUEUE), ALLOC_HEARTBEATQUEUE, @@ -1459,6 +1688,7 @@ static void alloc_storage() DLPRIO(idcontrol, PRIO_TERMINAL); DLPRIO(paymentaddresses, PRIO_TERMINAL); DLPRIO(ips, PRIO_TERMINAL); + DLPRIO(replies, PRIO_TERMINAL); DLPCHECK(); @@ -1760,6 +1990,11 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); + FREE_TREE(replies_pool); + FREE_TREE(replies_cmd); + FREE_TREE(replies_btc); + FREE_STORE(replies); + FREE_LIST(replies); // TODO: msgline FREE_STORE(pool0_workqueue); FREE_STORE(pool_workqueue); @@ -2991,6 +3226,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, msgline->which_cmds = CMD_UNSET; copy_tv(&(msgline->now), now); copy_tv(&(msgline->cd), now); // default cd to 'now' + DATE_ZERO(&(msgline->accepted)); + DATE_ZERO(&(msgline->broken)); + DATE_ZERO(&(msgline->processed)); msgline->msg = strdup(buf); msgline->seqentryflags = seqentryflags; @@ -3322,11 +3560,14 @@ static void *breaker(void *arg) { K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; + MSGLINE *msgline = NULL; char buf[128]; int thr, zeros; bool reload, was_null, msg = false; int queue_sleep, queue_limit, count; uint64_t processed = 0; + ts_t when, when_add; + int ret; pthread_detach(pthread_self()); @@ -3334,10 +3575,14 @@ static void *breaker(void *arg) reload = *(bool *)(arg); if (reload) { queue_limit = RELOAD_QUEUE_LIMIT; - queue_sleep = RELOAD_QUEUE_SLEEP; + queue_sleep = RELOAD_QUEUE_SLEEP_MS; + when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; } else { queue_limit = CMD_QUEUE_LIMIT; - queue_sleep = CMD_QUEUE_SLEEP; + queue_sleep = CMD_QUEUE_SLEEP_MS; + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; } ck_wlock(&breakdown_lock); @@ -3412,7 +3657,28 @@ static void *breaker(void *arg) if (was_null && reload && !reloading) break; - cksleep_ms(queue_sleep); + setnowts(&when); + timeraddspec(&when, &when_add); + + if (reload) { + mutex_lock(&bq_reload_waitlock); + ret = cond_timedwait(&bq_reload_waitcond, + &bq_reload_waitlock, &when); + if (ret == 0) + bq_reload_wakes++; + else if (errno == ETIMEDOUT) + bq_reload_timeouts++; + mutex_unlock(&bq_reload_waitlock); + } else { + mutex_lock(&bq_cmd_waitlock); + ret = cond_timedwait(&bq_cmd_waitcond, + &bq_cmd_waitlock, &when); + if (ret == 0) + bq_cmd_wakes++; + else if (errno == ETIMEDOUT) + bq_cmd_timeouts++; + mutex_unlock(&bq_cmd_waitlock); + } continue; } @@ -3436,8 +3702,10 @@ static void *breaker(void *arg) } bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags); + DATA_MSGLINE(msgline, bq->ml_item); + setnow(&(msgline->broken)); + copy_tv(&(msgline->accepted), &(bq->accepted)); if (SEQALL_LOG) { - MSGLINE *msgline; K_ITEM *seqall; if (bq->ml_item) { DATA_MSGLINE(msgline, bq->ml_item); @@ -3453,10 +3721,19 @@ static void *breaker(void *arg) } } K_WLOCK(breakqueue_free); - if (reload) + if (reload) { k_add_tail(reload_done_breakqueue_store, bq_item); - else + mutex_lock(&process_reload_waitlock); + process_reload_signals++; + pthread_cond_signal(&process_reload_waitcond); + mutex_unlock(&process_reload_waitlock); + } else { k_add_tail(cmd_done_breakqueue_store, bq_item); + mutex_lock(&process_socket_waitlock); + process_socket_signals++; + pthread_cond_signal(&process_socket_waitcond); + mutex_unlock(&process_socket_waitlock); + } if (breakqueue_free->count == breakqueue_free->total && breakqueue_free->total >= ALLOC_BREAKQUEUE * CULL_BREAKQUEUE) @@ -4475,7 +4752,173 @@ static void *logger(__maybe_unused void *arg) return NULL; } -static void process_sockd(PGconn *conn, K_ITEM *wq_item) +static void *replier(void *arg) +{ + K_TREE *reply_root = NULL; + REPLIES *replies = NULL; + K_ITEM *r_item = NULL, *tmp_item; + K_TREE_CTX ctx[1]; + enum reply_type reply_typ; + struct epoll_event ready; + int epollfd, ret, fails = 0, fails_tot = 0; + char buf[128], *ptr, *st; + bool msg = false; + char typ = '?'; + int discarded, count; + double full_us; + tv_t now; + + pthread_detach(pthread_self()); + + reply_typ = *(enum reply_type *)(arg); + switch (reply_typ) { + case REPLIER_POOL: + typ = 'p'; + reply_root = replies_pool_root; + epollfd = epollfd_pool; + break; + case REPLIER_CMD: + typ = 'c'; + reply_root = replies_cmd_root; + epollfd = epollfd_cmd; + break; + case REPLIER_BTC: + typ = 'b'; + reply_root = replies_btc_root; + epollfd = epollfd_btc; + break; + default: + quithere(1, "%s() started with unknown reply_type %d", + __func__, reply_typ); + break; + } + + snprintf(buf, sizeof(buf), "db_%c%s", typ, __func__); + LOCK_INIT(buf); + rename_proc(buf); + + ck_wlock(&replier_lock); + replier_count++; + replier_using_data = true; + ck_wunlock(&replier_lock); + + LOGNOTICE("%s() %s processing", __func__, buf); + + // _ckdb_unix_msg() deals with adding events + while (!everyone_die) { + ret = epoll_wait(epollfd, &ready, 1, 142); + if (ret == 0) + fails = 0; + else if (ret < 0) { + int e = errno; + fails++; + fails_tot++; + K_WLOCK(replies_free); + reply_fails++; + K_WUNLOCK(replies_free); + LOGEMERG("%s() %c epoll_wait (%d/%d) failed (%d:%s)", + __func__, typ, fails, fails_tot, + e, strerror(e)); + if (fails < 10 && fails_tot < 100) + cksleep_ms(1000); + else { + // Abort on 10 consecutive fails or 100 total + quithere(1, "%c aborting ckdb: epoll_wait " + "(%d/%d) failed (%d:%s)", + typ, fails, fails_tot, + e, strerror(e)); + } + } else { + fails = 0; + // This is OK if there is one thread per reply_root + r_item = (K_ITEM *)(ready.data.ptr); + DATA_REPLIES(replies, r_item); + if (ready.events & EPOLLOUT) { + K_WLOCK(replies_free); + remove_from_ktree(reply_root, r_item); + k_unlink_item(replies_store, r_item); + K_WUNLOCK(replies_free); + _ckdb_unix_send(replies->sockd, replies->reply, + replies->file, replies->func, + replies->line); + FREENULL(replies->reply); + setnow(&now); + full_us = us_tvdiff(&now, &(replies->accepted)); + K_WLOCK(replies_free); + ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, + replies->sockd, &(replies->event)); + close(replies->sockd); + k_add_head(replies_free, r_item); + reply_sent++; + reply_full_us += full_us; + K_WUNLOCK(replies_free); + } else { + K_WLOCK(replies_free); + remove_from_ktree(reply_root, r_item); + k_unlink_item(replies_store, r_item); + ptr = replies->reply; + ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, + replies->sockd, &(replies->event)); + close(replies->sockd); + k_add_head(replies_free, r_item); + reply_cant++; + K_WUNLOCK(replies_free); + LOGWARNING("%s() %c discarding (%"PRIu32") " + "%.42s...", + __func__, typ, ready.events, + st = safe_text(ptr)); + FREENULL(st); + FREENULL(ptr); + } + } + setnow(&now); + discarded = 0; + K_WLOCK(replies_free); + r_item = first_in_ktree(reply_root, ctx); + while (r_item) { + DATA_REPLIES(replies, r_item); + // If the oldest hasn't reached the limit + if (ms_tvdiff(&now, &(replies->now)) < REPLIES_LIMIT_MS) + break; + tmp_item = r_item; + r_item = next_in_ktree(ctx); + discarded++; + remove_from_ktree(reply_root, tmp_item); + k_unlink_item(replies_store, tmp_item); + free(replies->reply); + ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, + replies->sockd, &(replies->event)); + close(replies->sockd); + k_add_head(replies_free, tmp_item); + reply_discarded++; + } + K_WUNLOCK(replies_free); + if (discarded) { + LOGWARNING("%s() %c closed %d old (>=%dms)", + __func__, typ, discarded, REPLIES_LIMIT_MS); + } + } + + K_RLOCK(replies_free); + count = reply_root->node_store->count; + K_RUNLOCK(replies_free); + + LOGNOTICE("%s() %s exiting with tree count %d", __func__, buf, count); + + ck_wlock(&replier_lock); + if (--replier_count < 1) { + msg = true; + replier_using_data = false; + } + ck_wunlock(&replier_lock); + + if (msg) + LOGWARNING("%s() threads shut down", __func__); + + return NULL; +} + +static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_typ) { WORKQUEUE *workqueue; MSGLINE *msgline; @@ -4501,13 +4944,12 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item) snprintf(rep, siz, "%s.%ld.%s", msgline->id, msgline->now.tv_sec, ans); - ckdb_unix_msg(msgline->sockd, rep); - close(msgline->sockd); + setnow(&(msgline->processed)); + ckdb_unix_msg(reply_typ, msgline->sockd, rep, msgline, false); K_WLOCK(breakqueue_free); sockd_count--; K_WUNLOCK(breakqueue_free); FREENULL(ans); - FREENULL(rep); free_msgline_data(ml_item, true, true); K_WLOCK(msgline_free); @@ -4530,9 +4972,14 @@ static void *clistener(__maybe_unused void *arg) K_ITEM *wq_item; tv_t now1, now2; time_t now; + ts_t when, when_add; + int ret; pthread_detach(pthread_self()); + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; + LOCK_INIT("db_clistener"); rename_proc("db_clistener"); @@ -4557,12 +5004,23 @@ static void *clistener(__maybe_unused void *arg) if (wq_item) { setnow(&now1); - process_sockd(conn, wq_item); + process_sockd(conn, wq_item, REPLIER_CMD); setnow(&now2); clis_us += us_tvdiff(&now2, &now1); clis_processed++; - } else - cksleep_ms(4); + } else { + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&wq_cmd_waitlock); + ret = cond_timedwait(&wq_cmd_waitcond, + &wq_cmd_waitlock, &when); + if (ret == 0) + wq_cmd_wakes++; + else if (errno == ETIMEDOUT) + wq_cmd_timeouts++; + mutex_unlock(&wq_cmd_waitlock); + } } LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed); @@ -4581,9 +5039,14 @@ static void *blistener(__maybe_unused void *arg) K_ITEM *wq_item; tv_t now1, now2; time_t now; + ts_t when, when_add; + int ret; pthread_detach(pthread_self()); + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; + LOCK_INIT("db_blistener"); rename_proc("db_blistener"); @@ -4607,12 +5070,23 @@ static void *blistener(__maybe_unused void *arg) if (wq_item) { setnow(&now1); - process_sockd(conn, wq_item); + process_sockd(conn, wq_item, REPLIER_BTC); setnow(&now2); blis_us += us_tvdiff(&now2, &now1); blis_processed++; - } else - cksleep_ms(42); + } else { + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&wq_btc_waitlock); + ret = cond_timedwait(&wq_btc_waitcond, + &wq_btc_waitlock, &when); + if (ret == 0) + wq_btc_wakes++; + else if (errno == ETIMEDOUT) + wq_btc_timeouts++; + mutex_unlock(&wq_btc_waitlock); + } } LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed); @@ -4633,13 +5107,17 @@ static void *process_socket(void *arg) BREAKQUEUE *bq = NULL; MSGLINE *msgline = NULL; bool want_first, replied, btc, dec_sockd; - int loglevel, oldloglevel; + int loglevel, oldloglevel, ret; char reply[1024+1]; - char *ans = NULL, *rep = NULL, *tmp; + char *ans = NULL, *rep = NULL, *tmp, *st; size_t siz; + ts_t when, when_add; pthread_detach(pthread_self()); + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; + LOCK_INIT("db_procsock"); rename_proc("db_procsock"); @@ -4652,7 +5130,18 @@ static void *process_socket(void *arg) K_WUNLOCK(breakqueue_free); if (!bq_item) { - cksleep_ms(CMD_QUEUE_SLEEP); + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&process_socket_waitlock); + ret = cond_timedwait(&process_socket_waitcond, + &process_socket_waitlock, &when); + if (ret == 0) + process_socket_wakes++; + else if (errno == ETIMEDOUT) + process_socket_timeouts++; + mutex_unlock(&process_socket_waitlock); + continue; } @@ -4695,7 +5184,10 @@ static void *process_socket(void *arg) "%s.%ld.?.", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + // Just use REPLIER_CMD ... + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); break; case CMD_ALERTEVENT: case CMD_ALERTOVENT: @@ -4707,8 +5199,10 @@ static void *process_socket(void *arg) tmp = reply_event(EVENTID_NONE, reply); else tmp = reply_ovent(OVENTID_NONE, reply); - ckdb_unix_msg(bq->sockd, tmp); - FREENULL(tmp); + setnow(&(msgline->processed)); + // Just use REPLIER_CMD ... + ckdb_unix_msg(REPLIER_CMD, bq->sockd, tmp, + msgline, false); break; case CMD_TERMINATE: LOGWARNING("Listener received" @@ -4718,7 +5212,9 @@ static void *process_socket(void *arg) "%s.%ld.ok.exiting", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); everyone_die = true; break; case CMD_PING: @@ -4728,7 +5224,9 @@ static void *process_socket(void *arg) "%s.%ld.ok.pong", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); break; case CMD_VERSION: LOGDEBUG("Listener received" @@ -4738,7 +5236,9 @@ static void *process_socket(void *arg) msgline->id, bq->now.tv_sec, CKDB_VERSION); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); break; case CMD_LOGLEVEL: if (!*(msgline->id)) { @@ -4782,7 +5282,9 @@ static void *process_socket(void *arg) " %d currently %d B", loglevel, oldloglevel); } - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); break; case CMD_FLUSH: LOGDEBUG("Listener received" @@ -4790,11 +5292,13 @@ static void *process_socket(void *arg) snprintf(reply, sizeof(reply), "%s.%ld.ok.splash", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); fflush(stdout); fflush(stderr); if (global_ckp && global_ckp->logfd) fflush(global_ckp->logfp); + setnow(&(msgline->processed)); break; case CMD_USERSET: case CMD_BTCSET: @@ -4832,6 +5336,17 @@ static void *process_socket(void *arg) else k_add_tail(cmd_workqueue_store, wq_item); K_WUNLOCK(workqueue_free); + if (btc) { + mutex_lock(&wq_btc_waitlock); + wq_btc_signals++; + pthread_cond_signal(&wq_btc_waitcond); + mutex_unlock(&wq_btc_waitlock); + } else { + mutex_lock(&wq_cmd_waitlock); + wq_cmd_signals++; + pthread_cond_signal(&wq_cmd_waitcond); + mutex_unlock(&wq_cmd_waitlock); + } wq_item = bq->ml_item = NULL; break; // Process, but reject (loading) until startup_complete @@ -4855,7 +5370,9 @@ static void *process_socket(void *arg) msgline->id, bq->now.tv_sec, msgline->cmd); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, + reply, msgline, true); } else { msgline->sockd = bq->sockd; bq->sockd = -1; @@ -4872,6 +5389,17 @@ static void *process_socket(void *arg) k_add_tail(cmd_workqueue_store, wq_item); K_WUNLOCK(workqueue_free); wq_item = bq->ml_item = NULL; + if (btc) { + mutex_lock(&wq_btc_waitlock); + wq_btc_signals++; + pthread_cond_signal(&wq_btc_waitcond); + mutex_unlock(&wq_btc_waitlock); + } else { + mutex_lock(&wq_cmd_waitlock); + wq_cmd_signals++; + pthread_cond_signal(&wq_cmd_waitcond); + mutex_unlock(&wq_cmd_waitlock); + } } break; // Always process immediately: @@ -4895,14 +5423,16 @@ static void *process_socket(void *arg) inet_default, &(msgline->cd), msgline->trf_root, false); + setnow(&(msgline->processed)); siz = strlen(ans) + strlen(msgline->id) + 32; rep = malloc(siz); snprintf(rep, siz, "%s.%ld.%s", msgline->id, bq->now.tv_sec, ans); - ckdb_unix_msg(bq->sockd, rep); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_POOL, bq->sockd, rep, + msgline, false); FREENULL(ans); - FREENULL(rep); replied = true; // Always queue (ok.queued) case CMD_SHARELOG: @@ -4922,7 +5452,8 @@ static void *process_socket(void *arg) "%s.%ld.ok.queued", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + ckdb_unix_msg(REPLIER_POOL, bq->sockd, + reply, msgline, true); } K_WLOCK(workqueue_free); @@ -4957,23 +5488,30 @@ static void *process_socket(void *arg) } K_WUNLOCK(workqueue_free); wq_item = bq->ml_item = NULL; + mutex_lock(&wq_pool_waitlock); + wq_pool_signals++; + pthread_cond_signal(&wq_pool_waitcond); + mutex_unlock(&wq_pool_waitlock); break; // Code error default: LOGEMERG("%s() CODE ERROR unhandled" " message %d %.32s...", - __func__, bq->cmdnum, bq->buf); + __func__, bq->cmdnum, + st = safe_text(bq->buf)); + FREENULL(st); snprintf(reply, sizeof(reply), "%s.%ld.failed.code", msgline->id, bq->now.tv_sec); - ckdb_unix_msg(bq->sockd, reply); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, reply, + msgline, true); break; } - if (bq->sockd >= 0) { - close(bq->sockd); + if (bq->sockd >= 0) dec_sockd = true; - } else + else dec_sockd = false; if (bq->ml_item) { @@ -4999,13 +5537,14 @@ static void *process_socket(void *arg) static void *socketer(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - pthread_t clis_pt, blis_pt, proc_pt; + pthread_t clis_pt, blis_pt, proc_pt, prep_pt, crep_pt, brep_pt; + enum reply_type p_typ, c_typ, b_typ; unixsock_t *us = &pi->us; char *end, *buf = NULL; K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; int sockd; - tv_t now, now1, now2; + tv_t now, nowacc, now1, now2; pthread_detach(pthread_self()); @@ -5016,6 +5555,16 @@ static void *socketer(void *arg) cksem_mswait(&socketer_sem, 420); if (!everyone_die) { + epollfd_pool = epoll_create1(EPOLL_CLOEXEC); + epollfd_cmd = epoll_create1(EPOLL_CLOEXEC); + epollfd_btc = epoll_create1(EPOLL_CLOEXEC); + p_typ = REPLIER_POOL; + c_typ = REPLIER_CMD; + b_typ = REPLIER_BTC; + create_pthread(&prep_pt, replier, &p_typ); + create_pthread(&crep_pt, replier, &c_typ); + create_pthread(&brep_pt, replier, &b_typ); + LOGWARNING("%s() Start processing...", __func__); socketer_using_data = true; @@ -5029,15 +5578,15 @@ static void *socketer(void *arg) setnow(&sock_stt); while (!everyone_die) { setnow(&now1); - sockd = accept(us->sockd, NULL, NULL); + sockd = accept(us->sockd, NULL, SOCK_CLOEXEC); if (sockd < 0) { int e = errno; LOGERR("%s() Failed to accept on socket (%d:%s)", __func__, e, strerror(e)); break; } - setnow(&now2); - sock_us += us_tvdiff(&now2, &now1); + setnow(&nowacc); + sock_us += us_tvdiff(&nowacc, &now1); sock_acc++; setnow(&now1); @@ -5103,6 +5652,7 @@ static void *socketer(void *arg) K_WUNLOCK(breakqueue_free); DATA_BREAKQUEUE(bq, bq_item); bq->buf = buf; + copy_tv(&(bq->accepted), &nowacc); copy_tv(&(bq->now), &now); bq->seqentryflags = seqentryflags; bq->sockd = sockd; @@ -5113,11 +5663,17 @@ static void *socketer(void *arg) K_WUNLOCK(breakqueue_free); setnow(&now2); sock_lock_br_us += us_tvdiff(&now2, &now1); + + mutex_lock(&bq_cmd_waitlock); + bq_cmd_signals++; + pthread_cond_signal(&bq_cmd_waitcond); + mutex_unlock(&bq_cmd_waitlock); } } - LOGNOTICE("%s() exiting, early %"PRIu64" after %"PRIu64" (%"PRIu64")", - __func__, sock_proc_early, sock_processed - sock_proc_early, sock_processed); + LOGWARNING("%s() exiting: early=%"PRIu64" after=%"PRIu64" (%"PRIu64")", + __func__, sock_proc_early, sock_processed - sock_proc_early, + sock_processed); socketer_using_data = false; @@ -5139,9 +5695,14 @@ static void *process_reload(__maybe_unused void *arg) char *ans, *st = NULL; time_t now; uint64_t processed = 0; + ts_t when, when_add; + int ret; pthread_detach(pthread_self()); + when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; + LOCK_INIT("db_procreload"); rename_proc("db_procreload"); @@ -5162,7 +5723,18 @@ static void *process_reload(__maybe_unused void *arg) if (!reloading) break; - cksleep_ms(24); + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&process_reload_waitlock); + ret = cond_timedwait(&process_reload_waitcond, + &process_reload_waitlock, &when); + if (ret == 0) + process_reload_wakes++; + else if (errno == ETIMEDOUT) + process_reload_timeouts++; + mutex_unlock(&process_reload_waitlock); + continue; } @@ -5266,6 +5838,7 @@ static void *process_reload(__maybe_unused void *arg) msgline->trf_root, true); FREENULL(ans); } + // TODO: time stats from each msgline tv_t break; default: // Force this switch to be updated if new cmds are added @@ -5352,6 +5925,7 @@ static void reload_line(char *filename, char *buf, uint64_t count) // release the lock since strdup could be slow, but rarely DATA_BREAKQUEUE(bq, bq_item); bq->buf = strdup(buf); + copy_tv(&(bq->accepted), &now); copy_tv(&(bq->now), &now); bq->seqentryflags = SE_RELOAD; bq->sockd = -1; @@ -5363,8 +5937,13 @@ static void reload_line(char *filename, char *buf, uint64_t count) qcount = reload_breakqueue_store->count; K_WUNLOCK(breakqueue_free); + mutex_lock(&bq_reload_waitlock); + bq_reload_signals++; + pthread_cond_signal(&bq_reload_waitcond); + mutex_unlock(&bq_reload_waitlock); + while (qcount > RELOAD_QUEUE_LIMIT) { - cksleep_ms(RELOAD_QUEUE_SLEEP); + cksleep_ms(RELOAD_QUEUE_SLEEP_MS); K_RLOCK(breakqueue_free); qcount = reload_breakqueue_store->count; K_RUNLOCK(breakqueue_free); @@ -5758,9 +6337,14 @@ static void *listener(void *arg) int cpus, i; bool reloader, cmder, pool0, switch_msg = false; uint64_t proc0 = 0, proc1 = 0; + ts_t when, when_add; + int ret; pthread_detach(pthread_self()); + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; + LOCK_INIT("db_plistener"); rename_proc("db_plistener"); @@ -5941,7 +6525,17 @@ static void *listener(void *arg) if (!wq_item) { POOLINSTANCE_DATA_MSG(); - cksleep_ms(CMD_QUEUE_SLEEP); + setnowts(&when); + timeraddspec(&when, &when_add); + + mutex_lock(&wq_pool_waitlock); + ret = cond_timedwait(&wq_pool_waitcond, + &wq_pool_waitlock, &when); + if (ret == 0) + wq_pool_wakes++; + else if (errno == ETIMEDOUT) + wq_pool_timeouts++; + mutex_unlock(&wq_pool_waitlock); } } @@ -6886,10 +7480,28 @@ int main(int argc, char **argv) ckp.main.processname = strdup("main"); cklock_init(&breakdown_lock); + cklock_init(&replier_lock); cklock_init(&last_lock); cklock_init(&btc_lock); cklock_init(&poolinstance_lock); + mutex_init(&bq_reload_waitlock); + mutex_init(&bq_cmd_waitlock); + cond_init(&bq_reload_waitcond); + cond_init(&bq_cmd_waitcond); + + mutex_init(&process_reload_waitlock); + mutex_init(&process_socket_waitlock); + cond_init(&process_reload_waitcond); + cond_init(&process_socket_waitcond); + + mutex_init(&wq_pool_waitlock); + mutex_init(&wq_cmd_waitlock); + mutex_init(&wq_btc_waitlock); + cond_init(&wq_pool_waitcond); + cond_init(&wq_cmd_waitcond); + cond_init(&wq_btc_waitcond); + // Emulate a list for lock checking process_pplns_free = k_lock_only_list("ProcessPPLNS"); workers_db_free = k_lock_only_list("WorkersDB"); @@ -6917,6 +7529,7 @@ int main(int argc, char **argv) ckp.main.sockname = strdup("listener"); write_namepid(&ckp.main); create_process_unixsock(&ckp.main); + fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC); create_pthread(&ckp.pth_listener, listener, &ckp.main); @@ -6950,7 +7563,7 @@ int main(int argc, char **argv) } if (msg) { trigger = curr; - printf("%s %ds due to%s%s%s%s%s%s%s%s\n", + printf("%s %ds due to%s%s%s%s%s%s%s%s%s\n", msg, (int)(curr - start), socketer_using_data ? " socketer" : EMPTY, summariser_using_data ? " summariser" : EMPTY, @@ -6959,7 +7572,8 @@ int main(int argc, char **argv) clistener_using_data ? " clistener" : EMPTY, blistener_using_data ? " blistener" : EMPTY, marker_using_data ? " marker" : EMPTY, - breakdown_using_data ? " breakdown" : EMPTY); + breakdown_using_data ? " breakdown" : EMPTY, + replier_using_data ? " replier" : EMPTY); fflush(stdout); } sleep(1); diff --git a/src/ckdb.h b/src/ckdb.h index f7385c79..acbf681d 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -51,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.5" -#define CKDB_VERSION DB_VERSION"-2.015" +#define CKDB_VERSION DB_VERSION"-2.100" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -344,6 +345,9 @@ extern tv_t last_auth; extern cklock_t last_lock; // Running stats +// replier() +extern double reply_full_us; +extern uint64_t reply_sent, reply_cant, reply_discarded, reply_fails; // socketer() extern tv_t sock_stt; extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; @@ -1036,6 +1040,10 @@ typedef struct msgline { int which_cmds; tv_t now; tv_t cd; + tv_t accepted; // copied from breakqueue + tv_t broken; // breakdown done + tv_t processed; // not all are processed + tv_t replied; char id[ID_SIZ+1]; char cmd[CMD_SIZ+1]; char *msg; @@ -1065,7 +1073,8 @@ extern K_STORE *msgline_store; // BREAKQUEUE typedef struct breakqueue { char *buf; - tv_t now; + tv_t accepted; // socket accepted or line read + tv_t now; // msg read or line read int seqentryflags; int sockd; enum cmd_values cmdnum; @@ -1082,13 +1091,8 @@ typedef struct breakqueue { /* If a breaker() thread's done break queue count hits the LIMIT, or is empty, * it will sleep for SLEEP ms - * So this means that with a single breaker() thread, - * it can process at most LIMIT records per SLEEP ms - * or: 1000 * LIMIT / SLEEP records per second - * For N breaker() threads, that would mean between 1 and N times that value - * dependent upon the random time spacing of the N thread sleeps - * However, also note that LIMIT defines how much RAM can be used by - * the break queues, so a limit is required + * Also note that LIMIT defines how much RAM can be used by the break queues, + * so a limit is required * A breakqueue item can get quite large since it includes both buf * and ml_item (which has the transfer data) in the 'done' queue * Of course the processing speed of the ml_items will also decide how big the @@ -1101,10 +1105,10 @@ typedef struct breakqueue { * thus limiting the line processing of reload files */ #define RELOAD_QUEUE_LIMIT 16300 -#define RELOAD_QUEUE_SLEEP 42 +#define RELOAD_QUEUE_SLEEP_MS 42 // Don't really limit the cmd queue #define CMD_QUEUE_LIMIT 1048500 -#define CMD_QUEUE_SLEEP 1 +#define CMD_QUEUE_SLEEP_MS 42 extern K_LIST *breakqueue_free; extern K_STORE *reload_breakqueue_store; @@ -1118,6 +1122,26 @@ extern int cmd_processing; extern int sockd_count; extern int max_sockd_count; +// Trigger breaker() processing +extern mutex_t bq_reload_waitlock; +extern mutex_t bq_cmd_waitlock; +extern pthread_cond_t bq_reload_waitcond; +extern pthread_cond_t bq_cmd_waitcond; + +extern uint64_t bq_reload_signals, bq_cmd_signals; +extern uint64_t bq_reload_wakes, bq_cmd_wakes; +extern uint64_t bq_reload_timeouts, bq_cmd_timeouts; + +// Trigger reload/socket *_done_* processing +extern mutex_t process_reload_waitlock; +extern mutex_t process_socket_waitlock; +extern pthread_cond_t process_reload_waitcond; +extern pthread_cond_t process_socket_waitcond; + +extern uint64_t process_reload_signals, process_socket_signals; +extern uint64_t process_reload_wakes, process_socket_wakes; +extern uint64_t process_reload_timeouts, process_socket_timeouts; + // WORKQUEUE typedef struct workqueue { K_ITEM *msgline_item; @@ -1143,6 +1167,62 @@ extern int64_t earlysock_left; extern int64_t pool0_tot; extern int64_t pool0_discarded; +// Trigger workqueue threads +extern mutex_t wq_pool_waitlock; +extern mutex_t wq_cmd_waitlock; +extern mutex_t wq_btc_waitlock; +extern pthread_cond_t wq_pool_waitcond; +extern pthread_cond_t wq_cmd_waitcond; +extern pthread_cond_t wq_btc_waitcond; + +extern uint64_t wq_pool_signals, wq_cmd_signals, wq_btc_signals; +extern uint64_t wq_pool_wakes, wq_cmd_wakes, wq_btc_wakes; +extern uint64_t wq_pool_timeouts, wq_cmd_timeouts, wq_btc_timeouts; + +// REPLIES +typedef struct replies { + tv_t now; + tv_t createdate; + tv_t accepted; + tv_t broken; + tv_t processed; + int sockd; + char *reply; + struct epoll_event event; + const char *file; + const char *func; + int line; +} REPLIES; + +#define ALLOC_REPLIES 65536 +#define LIMIT_REPLIES 0 +#define INIT_REPLIES(_item) INIT_GENERIC(_item, replies) +#define DATA_REPLIES(_var, _item) DATA_GENERIC(_var, _item, replies, true) + +extern K_LIST *replies_free; +extern K_STORE *replies_store; +extern K_TREE *replies_pool_root; +extern K_TREE *replies_cmd_root; +extern K_TREE *replies_btc_root; + +// Close the socket and discard the reply, X ms after it gets in a list +#define REPLIES_LIMIT_MS 10000 + +extern int epollfd_pool; +extern int epollfd_cmd; +extern int epollfd_btc; + +extern int rep_tot_sockd; +extern int rep_failed_sockd; +extern int rep_max_sockd; +// maximum counts and fd values +extern int rep_max_pool_sockd; +extern int rep_max_cmd_sockd; +extern int rep_max_btc_sockd; +extern int rep_max_pool_sockd_fd; +extern int rep_max_cmd_sockd_fd; +extern int rep_max_btc_sockd_fd; + // HEARTBEATQUEUE typedef struct heartbeatqueue { char workername[TXT_BIG+1]; @@ -2676,10 +2756,14 @@ extern K_TREE *userinfo_root; extern K_LIST *userinfo_free; extern K_STORE *userinfo_store; +enum reply_type { + REPLIER_POOL, + REPLIER_CMD, + REPLIER_BTC +}; + extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); -extern void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS); -#define ckdb_unix_msg(_sockd, _msg) _ckdb_unix_msg(_sockd, _msg, WHERE_FFL_HERE) extern void tick(); extern PGconn *dbconnect(); extern void sequence_report(bool lock); @@ -2823,6 +2907,7 @@ extern void workerstatus_ready(); _workerstatus_update(_auths, _shares, _userstats, WHERE_FFL_HERE) extern void _workerstatus_update(AUTHS *auths, SHARES *shares, USERSTATS *userstats, WHERE_FFL_ARGS); +extern cmp_t cmp_replies(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_users(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userid(K_ITEM *a, K_ITEM *b); extern K_ITEM *find_users(char *username); diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index ec840ebf..438f90e7 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -1289,8 +1289,8 @@ static char *cmd_blocklist(__maybe_unused PGconn *conn, char *cmd, char *id, ovent = ovents_add(OVENTID_BLOCKS, trf_root); if (ovent != OVENT_OK) { - snprintf(tmp, sizeof(tmp), "ERR"); - return reply_ovent(ovent, tmp); + snprintf(reply, sizeof(reply), "ERR"); + return reply_ovent(ovent, reply); } maxrows = sys_setting(BLOCKS_SETTING_NAME, BLOCKS_DEFAULT, now); @@ -6788,7 +6788,7 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, char buf[256]; int relq_count, _reload_processing, relqd_count; int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; - int pool0_count, poolq_count; + int pool0_count, poolq_count, rep_max_fd; int64_t _earlysock_left, _pool0_discarded, _pool0_tot; uint64_t count1, count2, count3, count4; double tot1, tot2; @@ -6814,8 +6814,8 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, poolq_count = pool_workqueue_store->count; K_RUNLOCK(workqueue_free); - LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%"PRId64 - " pool0=%d/%"PRId64"/%"PRId64" poolq=%d max_sockd=%d", + LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64 + " pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d", relq_count, _reload_processing, relqd_count, cmdq_count, _cmd_processing, cmdqd_count, _earlysock_left, @@ -6826,8 +6826,10 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, count2 = sock_recv ? : 1; count3 = sock_proc_early ? : 1; count4 = sock_processed ? : 1; - LOGWARNING(" sock: tot %f sock %f/%"PRIu64"/%f recv %f/%"PRIu64"/%f " - "lckw %f/%"PRIu64"/%f lckb %f/%"PRIu64"/%f", + LOGWARNING(" sock: t%fs sock t%fs/t%"PRIu64"/av%fs" + " recv t%fs/t%"PRIu64"/av%fs" + " lckw t%fs/t%"PRIu64"/av%fs" + " lckb t%fs/t%"PRIu64"/av%fs", us_tvdiff(now, &sock_stt)/1000000, sock_us/1000000, sock_acc, (sock_us/count1)/1000000, sock_recv_us/1000000, sock_recv, @@ -6851,16 +6853,54 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, tot2 = us_tvdiff(now, &break_cmd_stt); count1 = break_reload_processed ? : 1; count2 = break_cmd_processed ? : 1; - LOGWARNING(" break reload: %f/%"PRIu64"/%f cmd: %f/%"PRIu64"/%f", + LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs " + "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: t%fs/t%"PRIu64"/av%fs " + "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t", tot1/1000000, break_reload_processed, (tot1/count1)/1000000, - tot2/1000000, break_cmd_processed, (tot2/count2)/1000000); + bq_reload_signals, bq_reload_wakes, bq_reload_timeouts, + tot2/1000000, break_cmd_processed, (tot2/count2)/1000000, + bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts); + + LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", + process_reload_signals, process_reload_wakes, + process_reload_timeouts, + process_socket_signals, process_socket_wakes, + process_socket_timeouts); + + LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", + wq_pool_signals, wq_pool_wakes, wq_pool_timeouts, + wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts, + wq_btc_signals, wq_btc_wakes, wq_btc_timeouts); count1 = clis_processed ? : 1; count2 = blis_processed ? : 1; - LOGWARNING(" clistener %f/%"PRIu64"/%f blistener: %f/%"PRIu64"/%f", + LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs" + " blistener: t%fs/t%"PRIu64"/av%fs", clis_us/1000000, clis_processed, (clis_us/count1)/1000000, blis_us/1000000, blis_processed, (blis_us/count2)/1000000); + rep_max_fd = rep_max_pool_sockd_fd; + if (rep_max_fd < rep_max_cmd_sockd_fd) + rep_max_fd = rep_max_cmd_sockd_fd; + if (rep_max_fd < rep_max_btc_sockd_fd) + rep_max_fd = rep_max_btc_sockd_fd; + LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd" + " btc ^%d/^%dfd", + rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd, + rep_max_pool_sockd, rep_max_pool_sockd_fd, + rep_max_cmd_sockd, rep_max_cmd_sockd_fd, + rep_max_btc_sockd, rep_max_btc_sockd_fd); + + count1 = reply_sent ? : 1; + LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64 + "/t%fs/av%fs", + reply_sent, reply_cant, reply_discarded, reply_fails, + reply_full_us/1000000, (reply_full_us/count1)/1000000); + snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf); return strdup(buf); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 0743748c..7928938b 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -1146,6 +1146,16 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares, } } +/* default tree order by now asc + * now is guaranteed unique since it's acquired under exclusive lock */ +cmp_t cmp_replies(K_ITEM *a, K_ITEM *b) +{ + REPLIES *ra, *rb; + DATA_REPLIES(ra, a); + DATA_REPLIES(rb, b); + return CMP_TV(ra->now, rb->now); +} + // default tree order by username asc,expirydate desc cmp_t cmp_users(K_ITEM *a, K_ITEM *b) { @@ -4991,6 +5001,10 @@ static size_t tmfsiz = sizeof(tmf); // includes null static char tma[] = "Too many accesses, come back later"; static size_t tmasiz = sizeof(tma); // includes null +/* This always returns a reply that needs to be freed + * fre says if buf was malloced + * i.e. fre means buf needs to be freed if it is not returned + * and !fre means we need to strdup buf, if we need to return it */ char *_reply_event(bool is_event, int event, char *buf, bool fre) { size_t len;