From 8fae6546c558ac4ac14839f6634805b219d4fe68 Mon Sep 17 00:00:00 2001 From: kanoi Date: Mon, 29 Aug 2016 17:32:02 +1000 Subject: [PATCH] ckdb - have separate sockets for data and make one main allocation point --- pool/socket.php | 2 +- src/ckdb.c | 517 ++++++++++++++++++++++++++++++++++-------------- src/ckdb.h | 30 +-- src/ckdb_cmd.c | 120 +---------- src/ckdb_data.c | 4 +- 5 files changed, 379 insertions(+), 294 deletions(-) diff --git a/pool/socket.php b/pool/socket.php index a8107494..01c2c122 100644 --- a/pool/socket.php +++ b/pool/socket.php @@ -82,7 +82,7 @@ function _getsock($fun, $port, $tmo, $unix=true) # function getsock($fun, $tmo) { - return _getsock($fun, '/opt/ckdb/listener', $tmo); + return _getsock($fun, '/opt/ckdb/listenerweb', $tmo); } # function readsockline($fun, $socket) diff --git a/src/ckdb.c b/src/ckdb.c index 52994d67..96d7afbe 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -107,7 +107,27 @@ * message */ -static bool socketer_using_data; +// sockets +static ckpool_t ckp, ckpweb, ckpcmd; +static int accesspool, accessweb, accesscmd; +static const char *ispool = "pool"; +static const char *isweb = "web"; +static const char *iscmd = "cmd"; +#define SOCKISPOOL(_name) (_name == ispool) +#define SOCKISWEB(_name) (_name == isweb) +#define SOCKISCMD(_name) (_name == iscmd) +#define POOLSOCK 0 +#define WEBSOCK 1 +#define CMDSOCK 2 +#define MAXSOCK 3 +#define SOCKNAME(_n) (((_n) == POOLSOCK) ? ispool : \ + (((_n) == WEBSOCK) ? isweb : \ + (((_n) == CMDSOCK) ? iscmd : "?"))) +#define SOCKNUM(_name) (SOCKISPOOL(_name) ? POOLSOCK : \ + (SOCKISWEB(_name) ? WEBSOCK : \ + (SOCKISCMD(_name) ? CMDSOCK : MAXSOCK))) + +static bool socksetup_using_data; static bool summariser_using_data; static bool marker_using_data; static bool logger_using_data; @@ -349,25 +369,27 @@ cklock_t last_lock; // Running stats // replier() -double reply_full_us; -uint64_t reply_sent, reply_cant, reply_discarded, reply_fails; -// socketer() -tv_t sock_stt; -double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; -uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv; +static double reply_full_us; +static uint64_t reply_sent, reply_cant, reply_discarded, reply_fails; +// sockrun() +static tv_t sock_stt[MAXSOCK]; +static double sock_us[MAXSOCK], sock_recv_us[MAXSOCK]; +static double sock_lock_wq_us[MAXSOCK], sock_lock_br_us[MAXSOCK]; +static uint64_t sock_proc_early[MAXSOCK], sock_processed[MAXSOCK]; +static uint64_t sock_acc[MAXSOCK], sock_recv[MAXSOCK]; // breaker() summarised -tv_t break_reload_stt, break_cmd_stt, break_reload_fin; -uint64_t break_reload_processed, break_cmd_processed; +static tv_t break_reload_stt, break_cmd_stt, break_reload_fin; +static uint64_t break_reload_processed, break_cmd_processed; // clistener() -double clis_us; -uint64_t clis_processed; +static double clis_us; +static uint64_t clis_processed; // blistener() -double blis_us; -uint64_t blis_processed; +static double blis_us; +static uint64_t blis_processed; static cklock_t fpm_lock; static char *first_pool_message; -static sem_t socketer_sem; +static sem_t socksetup_sem; // command called for any ckdb alerts char *ckdb_alert_cmd = NULL; @@ -1211,6 +1233,129 @@ void setnow(tv_t *now) now->tv_usec = spec.tv_nsec / 1000; } +void status_report(tv_t *now) +{ + char ooo_buf[256]; + int relq_count, _reload_processing, relqd_count; + int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; + int pool0_count, poolq_count, rep_max_fd, i; + int64_t _earlysock_left, _pool0_discarded, _pool0_tot; + uint64_t count1, count2, count3, count4; + double tot1, tot2; + + LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); + + K_RLOCK(breakqueue_free); + relq_count = reload_breakqueue_store->count; + _reload_processing = reload_processing; + relqd_count = reload_done_breakqueue_store->count; + cmdq_count = cmd_breakqueue_store->count; + _cmd_processing = cmd_processing; + cmdqd_count = cmd_done_breakqueue_store->count; + _max_sockd_count = max_sockd_count; + K_RUNLOCK(breakqueue_free); + + K_RLOCK(workqueue_free); + _earlysock_left = earlysock_left; + pool0_count = pool0_workqueue_store->count; + _pool0_discarded = pool0_discarded; + _pool0_tot = pool0_tot; + poolq_count = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); + + LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64 + " pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d", + relq_count, _reload_processing, relqd_count, + cmdq_count, _cmd_processing, cmdqd_count, + _earlysock_left, + pool0_count, _pool0_discarded, _pool0_tot, + poolq_count, _max_sockd_count); + + for (i = 0; i < MAXSOCK; i++) { + count1 = sock_acc[i] ? : 1; + count2 = sock_recv[i] ? : 1; + count3 = sock_proc_early[i] ? : 1; + count4 = sock_processed[i] ? : 1; + LOGWARNING(" %s sock: t%fs sock t%fs/t%"PRIu64"/av%fs" + " recv t%fs/t%"PRIu64"/av%fs" + " lckw t%fs/t%"PRIu64"/av%fs" + " lckb t%fs/t%"PRIu64"/av%fs", + SOCKNAME(i), tvdiff(now, &(sock_stt[i])), + sock_us[i]/1000000, sock_acc[i], + (sock_us[i]/count1)/1000000, + sock_recv_us[i]/1000000, sock_recv[i], + (sock_recv_us[i]/count2)/1000000, + sock_lock_wq_us[i]/1000000, sock_proc_early[i], + (sock_lock_wq_us[i]/count3)/1000000, + sock_lock_br_us[i]/1000000, sock_processed[i], + (sock_lock_br_us[i]/count4)/1000000); + } + + if (!break_reload_stt.tv_sec) + tot1 = 0; + else { + if (!break_reload_fin.tv_sec) + tot1 = tvdiff(now, &break_reload_stt); + else + tot1 = tvdiff(&break_reload_fin, &break_reload_stt); + } + if (!break_cmd_stt.tv_sec) + tot2 = 0; + else + tot2 = tvdiff(now, &break_cmd_stt); + count1 = break_reload_processed ? : 1; + count2 = break_cmd_processed ? : 1; + LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs " + "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: t%fs/t%"PRIu64"/av%fs " + "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t", + tot1, break_reload_processed, tot1/count1, + bq_reload_signals, bq_reload_wakes, bq_reload_timeouts, + tot2, break_cmd_processed, tot2/count2, + bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts); + + LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", + process_reload_signals, process_reload_wakes, + process_reload_timeouts, + process_socket_signals, process_socket_wakes, + process_socket_timeouts); + + LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" + " btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", + wq_pool_signals, wq_pool_wakes, wq_pool_timeouts, + wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts, + wq_btc_signals, wq_btc_wakes, wq_btc_timeouts); + + count1 = clis_processed ? : 1; + count2 = blis_processed ? : 1; + LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs" + " blistener: t%fs/t%"PRIu64"/av%fs", + clis_us/1000000, clis_processed, (clis_us/count1)/1000000, + blis_us/1000000, blis_processed, (blis_us/count2)/1000000); + + rep_max_fd = rep_max_pool_sockd_fd; + if (rep_max_fd < rep_max_cmd_sockd_fd) + rep_max_fd = rep_max_cmd_sockd_fd; + if (rep_max_fd < rep_max_btc_sockd_fd) + rep_max_fd = rep_max_btc_sockd_fd; + LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd" + " btc ^%d/^%dfd", + rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd, + rep_max_pool_sockd, rep_max_pool_sockd_fd, + rep_max_cmd_sockd, rep_max_cmd_sockd_fd, + rep_max_btc_sockd, rep_max_btc_sockd_fd); + + count1 = reply_sent ? : 1; + LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64 + "/t%fs/av%fs", + reply_sent, reply_cant, reply_discarded, reply_fails, + reply_full_us/1000000, (reply_full_us/count1)/1000000); + +} + /* Limits are all +/-1s since on the live machine all were well within that * TODO: not thread safe */ static void check_createdate_ccl(char *cmd, tv_t *cd) @@ -1702,11 +1847,41 @@ static void clean_up(ckpool_t *ckp) fclose(ckp->logfp); } +// Allocate all but ioqueue here static void alloc_storage() { size_t len; int seq; + // Emulate a list for lock checking + process_pplns_free = k_lock_only_list("ProcessPPLNS"); + workers_db_free = k_lock_only_list("WorkersDB"); + users_db_free = k_lock_only_list("UsersDB"); + event_limits_free = k_lock_only_list("EventLimits"); + +#if LOCK_CHECK + DLPRIO(process_pplns, 99); + DLPRIO(workers_db, 98); + DLPRIO(users_db, 97); + DLPRIO(event_limits, 46); // events-2 +#endif + + logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), + ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); + logqueue_store = k_new_store(logqueue_free); + + breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), + ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); + reload_breakqueue_store = k_new_store(breakqueue_free); + reload_done_breakqueue_store = k_new_store(breakqueue_free); + cmd_breakqueue_store = k_new_store(breakqueue_free); + cmd_done_breakqueue_store = k_new_store(breakqueue_free); + +#if LOCK_CHECK + DLPRIO(logqueue, 94); + DLPRIO(breakqueue, PRIO_TERMINAL); +#endif + seqset_free = k_new_list("SeqSet", sizeof(SEQSET), ALLOC_SEQSET, LIMIT_SEQSET, true); seqset_store = k_new_store(seqset_free); @@ -2403,9 +2578,6 @@ static bool setup_data() tv_t db_stt, db_fin, rel_stt, rel_fin; double min, sec; - cklock_init(&fpm_lock); - cksem_init(&socketer_sem); - LOGWARNING("%sSequence processing is %s", ignore_seq ? "ALERT: " : "", ignore_seq ? "Off" : "On"); @@ -2418,15 +2590,13 @@ static bool setup_data() LOGWARNING("Workinfo transaction storage is %s", txn_tree_store ? "On" : "Off"); - alloc_storage(); - setnow(&db_stt); if (!getdata1() || everyone_die) return false; db_users_complete = true; - cksem_post(&socketer_sem); + cksem_post(&socksetup_sem); if (!getdata2() || everyone_die) return false; @@ -3722,7 +3892,7 @@ static void setup_seq(K_ITEM *seqall, MSGLINE *msgline) } static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, - int seqentryflags) + int seqentryflags, char *source, int access) { char reply[1024] = ""; TRANSFER *transfer; @@ -3776,11 +3946,26 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, goto nogood; } + /* If you want to manually replay a log file with ckpmsg, + * you can ignore the access failed items by skipping items + * that start with a capital, since all (currently) are lower case + * however, command checks are case insensitive, so replaying + * the file will allow these commands, if they are present */ + if ((ckdb_cmds[msgline->which_cmds].access & access) == 0) + buf[0] = toupper(buf[0]); + if (ckdb_cmds[msgline->which_cmds].access & ACCESS_POOL) LOGQUE(buf, true); else LOGQUE(buf, false); + if ((ckdb_cmds[msgline->which_cmds].access & access) == 0) { + LOGERR("Command disallowed for %s: '%.42s...", + source, st2 = safe_text(buf)); + FREENULL(st2); + goto nogood; + } + if (noid) { if (ckdb_cmds[msgline->which_cmds].noid) { free(cmdptr); @@ -4323,7 +4508,8 @@ static void *breaker(void *arg) } } - bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags); + bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), + bq->seqentryflags, bq->source, bq->access); DATA_MSGLINE(msgline, bq->ml_item); setnow(&(msgline->broken)); copy_tv(&(msgline->accepted), &(bq->accepted)); @@ -5824,9 +6010,8 @@ static void *blistener(__maybe_unused void *arg) return NULL; } -static void *process_socket(void *arg) +static void *process_socket(__maybe_unused void *arg) { - proc_instance_t *pi = (proc_instance_t *)arg; K_ITEM *bq_item = NULL, *wq_item = NULL; WORKQUEUE *workqueue = NULL; BREAKQUEUE *bq = NULL; @@ -6002,15 +6187,15 @@ static void *process_socket(void *arg) if (!*(msgline->id)) { LOGDEBUG("Listener received" " loglevel, currently %d", - pi->ckp->loglevel); + ckp.loglevel); snprintf(reply, sizeof(reply), "%s.%ld.ok.loglevel" " currently %d", msgline->id, bq->now.tv_sec, - pi->ckp->loglevel); + ckp.loglevel); } else { - oldloglevel = pi->ckp->loglevel; + oldloglevel = ckp.loglevel; loglevel = atoi(msgline->id); LOGDEBUG("Listener received loglevel" " %d currently %d A", @@ -6026,13 +6211,13 @@ static void *process_socket(void *arg) loglevel, oldloglevel); } else { - pi->ckp->loglevel = loglevel; + ckp.loglevel = loglevel; snprintf(reply, sizeof(reply), "%s.%ld.ok.loglevel" " now %d - was %d", msgline->id, bq->now.tv_sec, - pi->ckp->loglevel, + ckp.loglevel, oldloglevel); } // Do this twice since the loglevel may have changed @@ -6058,7 +6243,7 @@ static void *process_socket(void *arg) fflush(global_ckp->logfp); if (*(msgline->id)) { // If you set the flush id to 2 - if(atoi(msgline->id) == 2) + if (atoi(msgline->id) == 2) ioqueue_flush = true; } setnow(&(msgline->processed)); @@ -6301,52 +6486,31 @@ skippy: return NULL; } -static void *socketer(void *arg) +static void *sockrun(void *arg) { - proc_instance_t *pi = (proc_instance_t *)arg; - pthread_t clis_pt, blis_pt, proc_pt, prep_pt, crep_pt, brep_pt; - enum reply_type p_typ, c_typ, b_typ; - unixsock_t *us = &pi->us; + ckpool_t *this = (ckpool_t *)arg; + unixsock_t *us = &(this->main.us); char *end, *buf = NULL; K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; - int ret, sockd; + int ret, sockd, thissock; fd_set rfds; - char nbuf[128]; + char *name = (char *)(this->gdata); + char nbuf[64]; tv_t now, nowacc, now1, now2, tmo; - pthread_detach(pthread_self()); + thissock = SOCKNUM(name); + if (thissock == MAXSOCK) { + quithere(1, "thread started with invalid ckpool_t %p %p", + this, name); + } - snprintf(nbuf, sizeof(nbuf), "db%s_%s", dbcode, __func__); + snprintf(nbuf, sizeof(nbuf), "db%s_%c%s", dbcode, name[0], __func__); LOCK_INIT(nbuf); rename_proc(nbuf); - while (!everyone_die && !db_users_complete) - cksem_mswait(&socketer_sem, 420); - - if (!everyone_die) { - epollfd_pool = epoll_create1(EPOLL_CLOEXEC); - epollfd_cmd = epoll_create1(EPOLL_CLOEXEC); - epollfd_btc = epoll_create1(EPOLL_CLOEXEC); - p_typ = REPLIER_POOL; - c_typ = REPLIER_CMD; - b_typ = REPLIER_BTC; - create_pthread(&prep_pt, replier, &p_typ); - create_pthread(&crep_pt, replier, &c_typ); - create_pthread(&brep_pt, replier, &b_typ); - - LOGWARNING("%s() Start processing...", __func__); - socketer_using_data = true; - - create_pthread(&clis_pt, clistener, NULL); - - create_pthread(&blis_pt, blistener, NULL); - - create_pthread(&proc_pt, process_socket, arg); - } - ret = 0; - setnow(&sock_stt); + setnow(&sock_stt[thissock]); while (!everyone_die) { setnow(&now1); while (!everyone_die) { @@ -6359,32 +6523,36 @@ static void *socketer(void *arg) break; if (ret < 0) { int e = errno; - LOGERR("%s() Failed to select on socket (%d:%s)", - __func__, e, strerror(e)); + LOGERR("%s() Failed to select on %s socket " + "(%d:%s)", + __func__, name, e, strerror(e)); break; } } - // Timeout exit on no input (or error) - if (everyone_die || ret < 0) + // If one fails, stop everything + if (ret < 0) + everyone_die = true; + + if (everyone_die) break; sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { int e = errno; - LOGERR("%s() Failed to accept on socket (%d:%s)", - __func__, e, strerror(e)); + LOGERR("%s() Failed to accept on %s socket (%d:%s)", + __func__, name, e, strerror(e)); break; } setnow(&nowacc); - sock_us += us_tvdiff(&nowacc, &now1); - sock_acc++; + sock_us[thissock] += us_tvdiff(&nowacc, &now1); + sock_acc[thissock]++; setnow(&now1); buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2); // Once we've read the message setnow(&now); - sock_recv_us += us_tvdiff(&now, &now1); - sock_recv++; + sock_recv_us[thissock] += us_tvdiff(&now, &now1); + sock_recv[thissock]++; if (buf) { end = buf + strlen(buf) - 1; // strip trailing \n and \r @@ -6393,10 +6561,12 @@ static void *socketer(void *arg) } if (!buf || !*buf) { // An empty message wont get a reply - if (!buf) - LOGWARNING("%s() Failed to get message", __func__); - else { - LOGWARNING("%s() Empty message", __func__); + if (!buf) { + LOGWARNING("%s() Failed to get %s message", + __func__, name); + } else { + LOGWARNING("%s() Empty %s message", + __func__, name); free(buf); } } else { @@ -6408,9 +6578,9 @@ static void *socketer(void *arg) K_WLOCK(workqueue_free); earlysock_left++; K_WUNLOCK(workqueue_free); - sock_proc_early++; setnow(&now2); - sock_lock_wq_us += us_tvdiff(&now2, &now1); + sock_proc_early[thissock]++; + sock_lock_wq_us[thissock] += us_tvdiff(&now2, &now1); } if (SEQALL_LOG) { @@ -6434,7 +6604,7 @@ static void *socketer(void *arg) } } - sock_processed++; + sock_processed[thissock]++; // Don't limit the speed filling up cmd_breakqueue_store setnow(&now1); K_WLOCK(breakqueue_free); @@ -6442,6 +6612,8 @@ static void *socketer(void *arg) K_WUNLOCK(breakqueue_free); DATA_BREAKQUEUE(bq, bq_item); bq->buf = buf; + bq->source = (char *)(this->gdata); + bq->access = *(int *)(this->cdata); copy_tv(&(bq->accepted), &nowacc); copy_tv(&(bq->now), &now); bq->seqentryflags = seqentryflags; @@ -6452,7 +6624,7 @@ static void *socketer(void *arg) k_add_tail(cmd_breakqueue_store, bq_item); K_WUNLOCK(breakqueue_free); setnow(&now2); - sock_lock_br_us += us_tvdiff(&now2, &now1); + sock_lock_br_us[thissock] += us_tvdiff(&now2, &now1); mutex_lock(&bq_cmd_waitlock); bq_cmd_signals++; @@ -6461,15 +6633,66 @@ static void *socketer(void *arg) } } - LOGWARNING("%s() exiting: early=%"PRIu64" after=%"PRIu64" (%"PRIu64")", - __func__, sock_proc_early, sock_processed - sock_proc_early, - sock_processed); + close_unix_socket(us->sockd, us->path); - socketer_using_data = false; + LOGWARNING("%s() %s exiting: early=%"PRIu64" after=%"PRIu64 + " (%"PRIu64")", + __func__, name, sock_proc_early[thissock], + sock_processed[thissock] - sock_proc_early[thissock], + sock_processed[thissock]); - close_unix_socket(us->sockd, us->path); + return NULL; +} + +static void *socksetup(__maybe_unused void *arg) +{ + pthread_t prep_pt, crep_pt, brep_pt; + enum reply_type p_typ, c_typ, b_typ; + pthread_t clis_pt, blis_pt, proc_pt; + pthread_t psock_pt, wsock_pt, csock_pt; + char nbuf[64]; + + pthread_detach(pthread_self()); + + snprintf(nbuf, sizeof(nbuf), "db%s_%s", dbcode, __func__); + LOCK_INIT(nbuf); + rename_proc(nbuf); + + while (!everyone_die && !db_users_complete) + cksem_mswait(&socksetup_sem, 420); + + if (!everyone_die) { + epollfd_pool = epoll_create1(EPOLL_CLOEXEC); + epollfd_cmd = epoll_create1(EPOLL_CLOEXEC); + epollfd_btc = epoll_create1(EPOLL_CLOEXEC); + p_typ = REPLIER_POOL; + c_typ = REPLIER_CMD; + b_typ = REPLIER_BTC; + create_pthread(&prep_pt, replier, &p_typ); + create_pthread(&crep_pt, replier, &c_typ); + create_pthread(&brep_pt, replier, &b_typ); + + LOGWARNING("%s() Start processing...", __func__); + socksetup_using_data = true; + + create_pthread(&clis_pt, clistener, NULL); + + create_pthread(&blis_pt, blistener, NULL); + + create_pthread(&proc_pt, process_socket, NULL); - // Since the socket is dead ... + create_pthread(&psock_pt, sockrun, &ckp); + create_pthread(&wsock_pt, sockrun, &ckpweb); + create_pthread(&csock_pt, sockrun, &ckpcmd); + + join_pthread(psock_pt); + join_pthread(wsock_pt); + join_pthread(csock_pt); + } + + socksetup_using_data = false; + + // Since the sockets are dead ... everyone_die = true; return NULL; @@ -6828,6 +7051,9 @@ static void reload_line(char *filename, char *buf, uint64_t count) // release the lock since strdup could be slow, but rarely DATA_BREAKQUEUE(bq, bq_item); bq->buf = strdup(buf); + // reloads are all pool data + bq->source = (char *)ispool; + bq->access = ACCESS_POOL; copy_tv(&(bq->accepted), &now); copy_tv(&(bq->now), &now); bq->seqentryflags = SE_RELOAD; @@ -6965,7 +7191,6 @@ static bool reload_from(tv_t *start, const tv_t *finish) reloading = true; copy_tv(&reload_timestamp, start); - // Go back further - one reload file reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S; tv_to_buf(start, buf, sizeof(buf)); @@ -7497,7 +7722,7 @@ static void *pqproc(void *arg) return NULL; } -static void *listener(void *arg) +static void *listener(__maybe_unused void *arg) { pthread_t log_pt; pthread_t sock_pt; @@ -7517,21 +7742,6 @@ static void *listener(void *arg) LOCK_INIT(buf); rename_proc(buf); - logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), - ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); - logqueue_store = k_new_store(logqueue_free); - - breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), - ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); - reload_breakqueue_store = k_new_store(breakqueue_free); - reload_done_breakqueue_store = k_new_store(breakqueue_free); - cmd_breakqueue_store = k_new_store(breakqueue_free); - cmd_done_breakqueue_store = k_new_store(breakqueue_free); - -#if LOCK_CHECK - DLPRIO(logqueue, 94); - DLPRIO(breakqueue, PRIO_TERMINAL); -#endif if (reload_breakdown_threads <= 0) { cpus = sysconf(_SC_NPROCESSORS_ONLN); if (cpus < 1) @@ -7556,7 +7766,7 @@ static void *listener(void *arg) create_pthread(&log_pt, logger, NULL); if (!confirm_sharesummary) - create_pthread(&sock_pt, socketer, arg); + create_pthread(&sock_pt, socksetup, NULL); create_pthread(&summ_pt, summariser, NULL); @@ -7897,7 +8107,7 @@ static void update_check(int64_t markerid_stt, int64_t markerid_fin) LOGWARNING("update complete %.0fm %.3fs", min, sec); } -static void update_keysummary(ckpool_t *ckp) +static void update_keysummary() { int64_t markerid_stt, markerid_fin; char *tmp, *minus; @@ -7957,17 +8167,6 @@ static void update_keysummary(ckpool_t *ckp) LOCK_INIT("dbk_updater"); rename_proc("dbk_updater"); - breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), - ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); - reload_breakqueue_store = k_new_store(breakqueue_free); - reload_done_breakqueue_store = k_new_store(breakqueue_free); - // Must exist (but will be empty) - cmd_breakqueue_store = k_new_store(breakqueue_free); - cmd_done_breakqueue_store = k_new_store(breakqueue_free); - -#if LOCK_CHECK - DLPRIO(breakqueue, PRIO_TERMINAL); -#endif if (reload_breakdown_threads <= 0) { cpus = sysconf(_SC_NPROCESSORS_ONLN); if (cpus < 1) @@ -7989,13 +8188,11 @@ static void update_keysummary(ckpool_t *ckp) cmder.thread = 0; create_pthread(&cmd_break_pt, breaker, &cmder); - alloc_storage(); - workmarkers_key_store = k_new_store(workmarkers_free); setnow(&db_stt); - create_pthread(&sock_pt, socketer, &(ckp->main)); + create_pthread(&sock_pt, socksetup, NULL); if (!getdata1() || everyone_die) return; @@ -8540,21 +8737,11 @@ static void confirm_summaries() } } - logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), - ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); - logqueue_store = k_new_store(logqueue_free); - -#if LOCK_CHECK - DLPRIO(logqueue, 94); -#endif - create_pthread(&log_pt, logger, NULL); LOCK_INIT("dby_confirmer"); rename_proc("dby_confirmer"); - alloc_storage(); - if (!getdata1()) { LOGEMERG("%s() ABORTING from getdata1()", __func__); return; @@ -8659,8 +8846,7 @@ int main(int argc, char **argv) char *btc_user = "user"; char *btc_pass = "p"; pthread_t f_iomsgs_pt, c_iomsgs_pt; - char buf[512]; - ckpool_t ckp; + char buf[512], lbuf[64]; int c, ret, i = 0, j; size_t len; char *kill; @@ -8677,6 +8863,8 @@ int main(int argc, char **argv) global_ckp = &ckp; memset(&ckp, 0, sizeof(ckp)); + memset(&ckpweb, 0, sizeof(ckp)); + memset(&ckpcmd, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { @@ -9034,6 +9222,29 @@ int main(int argc, char **argv) ckp.main.ckp = &ckp; ckp.main.processname = strdup(process_name); + snprintf(lbuf, sizeof(lbuf), "%s%s", dbcode, listener_base); + ckp.main.sockname = strdup(lbuf); + accesspool = ACCESS_POOL; + ckp.cdata = &accesspool; + ckp.gdata = (void *)ispool; + + ckpweb.main.ckp = &ckpweb; + ckpweb.name = strdup(ckp.name); + ckpweb.socket_dir = strdup(ckp.socket_dir); + snprintf(lbuf, sizeof(lbuf), "%s%sweb", dbcode, listener_base); + ckpweb.main.sockname = strdup(lbuf); + accessweb = ACCESS_WEB; + ckpweb.cdata = &accessweb; + ckpweb.gdata = (void *)isweb; + + ckpcmd.main.ckp = &ckpcmd; + ckpcmd.name = strdup(ckp.name); + ckpcmd.socket_dir = strdup(ckp.socket_dir); + snprintf(lbuf, sizeof(lbuf), "%s%scmd", dbcode, listener_base); + ckpcmd.main.sockname = strdup(lbuf); + accesscmd = ACCESS_ALL; + ckpcmd.cdata = &accesscmd; + ckpcmd.gdata = (void *)iscmd; cklock_init(&breakdown_lock); cklock_init(&replier_lock); @@ -9062,6 +9273,9 @@ int main(int argc, char **argv) mutex_init(&f_ioqueue_waitlock); cond_init(&f_ioqueue_waitcond); + cklock_init(&fpm_lock); + cksem_init(&socksetup_sem); + // Initialise IOQUEUE before anything needs it ioqueue_free = k_new_list("IOQueue", sizeof(IOQUEUE), ALLOC_IOQUEUE, LIMIT_IOQUEUE, true); @@ -9076,18 +9290,7 @@ int main(int argc, char **argv) bool consol = true; create_pthread(&c_iomsgs_pt, iomsgs, &consol); - // Emulate a list for lock checking - process_pplns_free = k_lock_only_list("ProcessPPLNS"); - workers_db_free = k_lock_only_list("WorkersDB"); - users_db_free = k_lock_only_list("UsersDB"); - event_limits_free = k_lock_only_list("EventLimits"); - -#if LOCK_CHECK - DLPRIO(process_pplns, 99); - DLPRIO(workers_db, 98); - DLPRIO(users_db, 97); - DLPRIO(event_limits, 46); // events-2 -#endif + alloc_storage(); // set initial value o_limits_max_lifetime = -1; @@ -9098,26 +9301,34 @@ int main(int argc, char **argv) } if (key_update) { - char buf[64]; - snprintf(buf, sizeof(buf), "k%s", listener_base); - ckp.main.sockname = strdup(buf); write_namepid(&ckp.main); create_process_unixsock(&ckp.main); fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC); - update_keysummary(&ckp); + create_process_unixsock(&ckpweb.main); + fcntl(ckpweb.main.us.sockd, F_SETFD, FD_CLOEXEC); + + create_process_unixsock(&ckpcmd.main); + fcntl(ckpcmd.main.us.sockd, F_SETFD, FD_CLOEXEC); + + update_keysummary(); everyone_die = true; } else if (confirm_sharesummary) { // TODO: add a system lock to stop running 2 at once? confirm_summaries(); everyone_die = true; } else { - ckp.main.sockname = strdup(listener_base); write_namepid(&ckp.main); create_process_unixsock(&ckp.main); fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC); - create_pthread(&ckp.pth_listener, listener, &ckp.main); + create_process_unixsock(&ckpweb.main); + fcntl(ckpweb.main.us.sockd, F_SETFD, FD_CLOEXEC); + + create_process_unixsock(&ckpcmd.main); + fcntl(ckpcmd.main.us.sockd, F_SETFD, FD_CLOEXEC); + + create_pthread(&ckp.pth_listener, listener, NULL); handler.sa_handler = sighandler; handler.sa_flags = 0; @@ -9134,7 +9345,7 @@ int main(int argc, char **argv) everyone_die = true; trigger = start = time(NULL); - while (socketer_using_data || summariser_using_data || + while (socksetup_using_data || summariser_using_data || logger_using_data || plistener_using_data || clistener_using_data || blistener_using_data || marker_using_data || breakdown_using_data) { @@ -9152,7 +9363,7 @@ int main(int argc, char **argv) snprintf(buf, sizeof(buf), "%s %ds due to%s%s%s%s%s%s%s%s%s\n", msg, (int)(curr - start), - socketer_using_data ? " socketer" : EMPTY, + socksetup_using_data ? " socksetup" : EMPTY, summariser_using_data ? " summariser" : EMPTY, logger_using_data ? " logger" : EMPTY, plistener_using_data ? " plistener" : EMPTY, diff --git a/src/ckdb.h b/src/ckdb.h index bf623a9c..0e81d0c4 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.420" +#define CKDB_VERSION DB_VERSION"-2.430" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -371,24 +371,6 @@ extern tv_t last_share_inv; extern tv_t last_auth; extern cklock_t last_lock; -// Running stats -// replier() -extern double reply_full_us; -extern uint64_t reply_sent, reply_cant, reply_discarded, reply_fails; -// socketer() -extern tv_t sock_stt; -extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; -extern uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv; -// breaker() summarised -extern tv_t break_reload_stt, break_cmd_stt, break_reload_fin; -extern uint64_t break_reload_processed, break_cmd_processed; -// clistener() -extern double clis_us; -extern uint64_t clis_processed; -// blistener() -extern double blis_us; -extern uint64_t blis_processed; - #define JSON_TRANSFER "json=" #define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1) #define JSON_BEGIN '{' @@ -1184,6 +1166,8 @@ extern K_STORE *msgline_store; // BREAKQUEUE typedef struct breakqueue { char *buf; + char *source; + int access; tv_t accepted; // socket accepted or line read tv_t now; // msg read or line read int seqentryflags; @@ -2964,6 +2948,7 @@ enum reply_type { extern void logmsg(int loglevel, const char *fmt, ...); extern void setnowts(ts_t *now); extern void setnow(tv_t *now); +extern void status_report(tv_t *now); extern void tick(); extern PGconn *dbconnect(); extern void sequence_report(bool lock); @@ -3089,7 +3074,9 @@ extern char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS); extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS); extern void dsp_transfer(K_ITEM *item, FILE *stream); extern cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b); -extern K_ITEM *find_transfer(K_TREE *trf_root, char *name); +#define find_transfer(_trf_root, _name) \ + _find_transfer(_trf_root, _name, WHERE_FFL_HERE) +extern K_ITEM *_find_transfer(K_TREE *trf_root, char *name, WHERE_FFL_ARGS); #define optional_name(_root, _name, _len, _patt, _reply, _siz) \ _optional_name(_root, _name, _len, _patt, _reply, _siz, \ WHERE_FFL_HERE) @@ -3555,12 +3542,13 @@ extern bool check_db_version(PGconn *conn); // *** ckdb_cmd.c // *** -// TODO: limit access by having seperate sockets for each #define ACCESS_POOL (1 << 0) #define ACCESS_SYSTEM (1 << 1) #define ACCESS_WEB (1 << 2) #define ACCESS_PROXY (1 << 3) #define ACCESS_CKDB (1 << 4) +#define ACCESS_ALL (ACCESS_POOL | ACCESS_SYSTEM | ACCESS_WEB | ACCESS_PROXY | \ + ACCESS_CKDB) struct CMDS { enum cmd_values cmd_val; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index fed39d11..f76beb36 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -6821,8 +6821,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, return(buf); } -/* Show a share status report on the console - * Currently: sequence status, OoO info and max_sockd_count */ +// Show a status report on the console static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, @@ -6830,122 +6829,9 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused K_TREE *trf_root, __maybe_unused bool reload_data) { - char ooo_buf[256]; char buf[256]; - int relq_count, _reload_processing, relqd_count; - int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; - int pool0_count, poolq_count, rep_max_fd; - int64_t _earlysock_left, _pool0_discarded, _pool0_tot; - uint64_t count1, count2, count3, count4; - double tot1, tot2; - - LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); - sequence_report(true); - - K_RLOCK(breakqueue_free); - relq_count = reload_breakqueue_store->count; - _reload_processing = reload_processing; - relqd_count = reload_done_breakqueue_store->count; - cmdq_count = cmd_breakqueue_store->count; - _cmd_processing = cmd_processing; - cmdqd_count = cmd_done_breakqueue_store->count; - _max_sockd_count = max_sockd_count; - K_RUNLOCK(breakqueue_free); - - K_RLOCK(workqueue_free); - _earlysock_left = earlysock_left; - pool0_count = pool0_workqueue_store->count; - _pool0_discarded = pool0_discarded; - _pool0_tot = pool0_tot; - poolq_count = pool_workqueue_store->count; - K_RUNLOCK(workqueue_free); - - LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64 - " pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d", - relq_count, _reload_processing, relqd_count, - cmdq_count, _cmd_processing, cmdqd_count, - _earlysock_left, - pool0_count, _pool0_discarded, _pool0_tot, - poolq_count, _max_sockd_count); - - count1 = sock_acc ? : 1; - count2 = sock_recv ? : 1; - count3 = sock_proc_early ? : 1; - count4 = sock_processed ? : 1; - LOGWARNING(" sock: t%fs sock t%fs/t%"PRIu64"/av%fs" - " recv t%fs/t%"PRIu64"/av%fs" - " lckw t%fs/t%"PRIu64"/av%fs" - " lckb t%fs/t%"PRIu64"/av%fs", - tvdiff(now, &sock_stt), - sock_us/1000000, sock_acc, (sock_us/count1)/1000000, - sock_recv_us/1000000, sock_recv, - (sock_recv_us/count2)/1000000, - sock_lock_wq_us/1000000, sock_proc_early, - (sock_lock_wq_us/count3)/1000000, - sock_lock_br_us/1000000, sock_processed, - (sock_lock_br_us/count4)/1000000); - - if (!break_reload_stt.tv_sec) - tot1 = 0; - else { - if (!break_reload_fin.tv_sec) - tot1 = tvdiff(now, &break_reload_stt); - else - tot1 = tvdiff(&break_reload_fin, &break_reload_stt); - } - if (!break_cmd_stt.tv_sec) - tot2 = 0; - else - tot2 = tvdiff(now, &break_cmd_stt); - count1 = break_reload_processed ? : 1; - count2 = break_cmd_processed ? : 1; - LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs " - "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t" - " cmd: t%fs/t%"PRIu64"/av%fs " - "%"PRIu64"s/%"PRIu64"w/%"PRIu64"t", - tot1, break_reload_processed, tot1/count1, - bq_reload_signals, bq_reload_wakes, bq_reload_timeouts, - tot2, break_cmd_processed, tot2/count2, - bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts); - - LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" - " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", - process_reload_signals, process_reload_wakes, - process_reload_timeouts, - process_socket_signals, process_socket_wakes, - process_socket_timeouts); - - LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" - " cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t" - " btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t", - wq_pool_signals, wq_pool_wakes, wq_pool_timeouts, - wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts, - wq_btc_signals, wq_btc_wakes, wq_btc_timeouts); - - count1 = clis_processed ? : 1; - count2 = blis_processed ? : 1; - LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs" - " blistener: t%fs/t%"PRIu64"/av%fs", - clis_us/1000000, clis_processed, (clis_us/count1)/1000000, - blis_us/1000000, blis_processed, (blis_us/count2)/1000000); - - rep_max_fd = rep_max_pool_sockd_fd; - if (rep_max_fd < rep_max_cmd_sockd_fd) - rep_max_fd = rep_max_cmd_sockd_fd; - if (rep_max_fd < rep_max_btc_sockd_fd) - rep_max_fd = rep_max_btc_sockd_fd; - LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd" - " btc ^%d/^%dfd", - rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd, - rep_max_pool_sockd, rep_max_pool_sockd_fd, - rep_max_cmd_sockd, rep_max_cmd_sockd_fd, - rep_max_btc_sockd, rep_max_btc_sockd_fd); - - count1 = reply_sent ? : 1; - LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64 - "/t%fs/av%fs", - reply_sent, reply_cant, reply_discarded, reply_fails, - reply_full_us/1000000, (reply_full_us/count1)/1000000); + + status_report(now); snprintf(buf, sizeof(buf), "ok.%s", cmd); LOGDEBUG("%s.%s", id, buf); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 84cb34f2..bc95a1c1 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -756,7 +756,7 @@ cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b) return CMP_STR(ta->name, tb->name); } -K_ITEM *find_transfer(K_TREE *trf_root, char *name) +K_ITEM *_find_transfer(K_TREE *trf_root, char *name, WHERE_FFL_ARGS) { TRANSFER transfer; K_TREE_CTX ctx[1]; @@ -766,7 +766,7 @@ K_ITEM *find_transfer(K_TREE *trf_root, char *name) INIT_TRANSFER(&look); look.data = (void *)(&transfer); // trf_root stores aren't shared - return find_in_ktree_nolock(trf_root, &look, ctx); + return _find_in_ktree(trf_root, &look, ctx, false, WHERE_FFL_PASS); } K_ITEM *_optional_name(K_TREE *trf_root, char *name, int len, char *patt,