Browse Source

ckdb - have separate sockets for data and make one main allocation point

master
kanoi 8 years ago
parent
commit
8fae6546c5
  1. 2
      pool/socket.php
  2. 517
      src/ckdb.c
  3. 30
      src/ckdb.h
  4. 120
      src/ckdb_cmd.c
  5. 4
      src/ckdb_data.c

2
pool/socket.php

@ -82,7 +82,7 @@ function _getsock($fun, $port, $tmo, $unix=true)
#
function getsock($fun, $tmo)
{
return _getsock($fun, '/opt/ckdb/listener', $tmo);
return _getsock($fun, '/opt/ckdb/listenerweb', $tmo);
}
#
function readsockline($fun, $socket)

517
src/ckdb.c

@ -107,7 +107,27 @@
* message
*/
static bool socketer_using_data;
// sockets
static ckpool_t ckp, ckpweb, ckpcmd;
static int accesspool, accessweb, accesscmd;
static const char *ispool = "pool";
static const char *isweb = "web";
static const char *iscmd = "cmd";
#define SOCKISPOOL(_name) (_name == ispool)
#define SOCKISWEB(_name) (_name == isweb)
#define SOCKISCMD(_name) (_name == iscmd)
#define POOLSOCK 0
#define WEBSOCK 1
#define CMDSOCK 2
#define MAXSOCK 3
#define SOCKNAME(_n) (((_n) == POOLSOCK) ? ispool : \
(((_n) == WEBSOCK) ? isweb : \
(((_n) == CMDSOCK) ? iscmd : "?")))
#define SOCKNUM(_name) (SOCKISPOOL(_name) ? POOLSOCK : \
(SOCKISWEB(_name) ? WEBSOCK : \
(SOCKISCMD(_name) ? CMDSOCK : MAXSOCK)))
static bool socksetup_using_data;
static bool summariser_using_data;
static bool marker_using_data;
static bool logger_using_data;
@ -349,25 +369,27 @@ cklock_t last_lock;
// Running stats
// replier()
double reply_full_us;
uint64_t reply_sent, reply_cant, reply_discarded, reply_fails;
// socketer()
tv_t sock_stt;
double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us;
uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv;
static double reply_full_us;
static uint64_t reply_sent, reply_cant, reply_discarded, reply_fails;
// sockrun()
static tv_t sock_stt[MAXSOCK];
static double sock_us[MAXSOCK], sock_recv_us[MAXSOCK];
static double sock_lock_wq_us[MAXSOCK], sock_lock_br_us[MAXSOCK];
static uint64_t sock_proc_early[MAXSOCK], sock_processed[MAXSOCK];
static uint64_t sock_acc[MAXSOCK], sock_recv[MAXSOCK];
// breaker() summarised
tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
uint64_t break_reload_processed, break_cmd_processed;
static tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
static uint64_t break_reload_processed, break_cmd_processed;
// clistener()
double clis_us;
uint64_t clis_processed;
static double clis_us;
static uint64_t clis_processed;
// blistener()
double blis_us;
uint64_t blis_processed;
static double blis_us;
static uint64_t blis_processed;
static cklock_t fpm_lock;
static char *first_pool_message;
static sem_t socketer_sem;
static sem_t socksetup_sem;
// command called for any ckdb alerts
char *ckdb_alert_cmd = NULL;
@ -1211,6 +1233,129 @@ void setnow(tv_t *now)
now->tv_usec = spec.tv_nsec / 1000;
}
void status_report(tv_t *now)
{
char ooo_buf[256];
int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int pool0_count, poolq_count, rep_max_fd, i;
int64_t _earlysock_left, _pool0_discarded, _pool0_tot;
uint64_t count1, count2, count3, count4;
double tot1, tot2;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
K_RLOCK(breakqueue_free);
relq_count = reload_breakqueue_store->count;
_reload_processing = reload_processing;
relqd_count = reload_done_breakqueue_store->count;
cmdq_count = cmd_breakqueue_store->count;
_cmd_processing = cmd_processing;
cmdqd_count = cmd_done_breakqueue_store->count;
_max_sockd_count = max_sockd_count;
K_RUNLOCK(breakqueue_free);
K_RLOCK(workqueue_free);
_earlysock_left = earlysock_left;
pool0_count = pool0_workqueue_store->count;
_pool0_discarded = pool0_discarded;
_pool0_tot = pool0_tot;
poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64
" pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d",
relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count,
_earlysock_left,
pool0_count, _pool0_discarded, _pool0_tot,
poolq_count, _max_sockd_count);
for (i = 0; i < MAXSOCK; i++) {
count1 = sock_acc[i] ? : 1;
count2 = sock_recv[i] ? : 1;
count3 = sock_proc_early[i] ? : 1;
count4 = sock_processed[i] ? : 1;
LOGWARNING(" %s sock: t%fs sock t%fs/t%"PRIu64"/av%fs"
" recv t%fs/t%"PRIu64"/av%fs"
" lckw t%fs/t%"PRIu64"/av%fs"
" lckb t%fs/t%"PRIu64"/av%fs",
SOCKNAME(i), tvdiff(now, &(sock_stt[i])),
sock_us[i]/1000000, sock_acc[i],
(sock_us[i]/count1)/1000000,
sock_recv_us[i]/1000000, sock_recv[i],
(sock_recv_us[i]/count2)/1000000,
sock_lock_wq_us[i]/1000000, sock_proc_early[i],
(sock_lock_wq_us[i]/count3)/1000000,
sock_lock_br_us[i]/1000000, sock_processed[i],
(sock_lock_br_us[i]/count4)/1000000);
}
if (!break_reload_stt.tv_sec)
tot1 = 0;
else {
if (!break_reload_fin.tv_sec)
tot1 = tvdiff(now, &break_reload_stt);
else
tot1 = tvdiff(&break_reload_fin, &break_reload_stt);
}
if (!break_cmd_stt.tv_sec)
tot2 = 0;
else
tot2 = tvdiff(now, &break_cmd_stt);
count1 = break_reload_processed ? : 1;
count2 = break_cmd_processed ? : 1;
LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
tot1, break_reload_processed, tot1/count1,
bq_reload_signals, bq_reload_wakes, bq_reload_timeouts,
tot2, break_cmd_processed, tot2/count2,
bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts);
LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
process_reload_signals, process_reload_wakes,
process_reload_timeouts,
process_socket_signals, process_socket_wakes,
process_socket_timeouts);
LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
wq_pool_signals, wq_pool_wakes, wq_pool_timeouts,
wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts,
wq_btc_signals, wq_btc_wakes, wq_btc_timeouts);
count1 = clis_processed ? : 1;
count2 = blis_processed ? : 1;
LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs"
" blistener: t%fs/t%"PRIu64"/av%fs",
clis_us/1000000, clis_processed, (clis_us/count1)/1000000,
blis_us/1000000, blis_processed, (blis_us/count2)/1000000);
rep_max_fd = rep_max_pool_sockd_fd;
if (rep_max_fd < rep_max_cmd_sockd_fd)
rep_max_fd = rep_max_cmd_sockd_fd;
if (rep_max_fd < rep_max_btc_sockd_fd)
rep_max_fd = rep_max_btc_sockd_fd;
LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd"
" btc ^%d/^%dfd",
rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd,
rep_max_pool_sockd, rep_max_pool_sockd_fd,
rep_max_cmd_sockd, rep_max_cmd_sockd_fd,
rep_max_btc_sockd, rep_max_btc_sockd_fd);
count1 = reply_sent ? : 1;
LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64
"/t%fs/av%fs",
reply_sent, reply_cant, reply_discarded, reply_fails,
reply_full_us/1000000, (reply_full_us/count1)/1000000);
}
/* Limits are all +/-1s since on the live machine all were well within that
* TODO: not thread safe */
static void check_createdate_ccl(char *cmd, tv_t *cd)
@ -1702,11 +1847,41 @@ static void clean_up(ckpool_t *ckp)
fclose(ckp->logfp);
}
// Allocate all but ioqueue here
static void alloc_storage()
{
size_t len;
int seq;
// Emulate a list for lock checking
process_pplns_free = k_lock_only_list("ProcessPPLNS");
workers_db_free = k_lock_only_list("WorkersDB");
users_db_free = k_lock_only_list("UsersDB");
event_limits_free = k_lock_only_list("EventLimits");
#if LOCK_CHECK
DLPRIO(process_pplns, 99);
DLPRIO(workers_db, 98);
DLPRIO(users_db, 97);
DLPRIO(event_limits, 46); // events-2
#endif
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(logqueue, 94);
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
seqset_free = k_new_list("SeqSet", sizeof(SEQSET),
ALLOC_SEQSET, LIMIT_SEQSET, true);
seqset_store = k_new_store(seqset_free);
@ -2403,9 +2578,6 @@ static bool setup_data()
tv_t db_stt, db_fin, rel_stt, rel_fin;
double min, sec;
cklock_init(&fpm_lock);
cksem_init(&socketer_sem);
LOGWARNING("%sSequence processing is %s",
ignore_seq ? "ALERT: " : "",
ignore_seq ? "Off" : "On");
@ -2418,15 +2590,13 @@ static bool setup_data()
LOGWARNING("Workinfo transaction storage is %s",
txn_tree_store ? "On" : "Off");
alloc_storage();
setnow(&db_stt);
if (!getdata1() || everyone_die)
return false;
db_users_complete = true;
cksem_post(&socketer_sem);
cksem_post(&socksetup_sem);
if (!getdata2() || everyone_die)
return false;
@ -3722,7 +3892,7 @@ static void setup_seq(K_ITEM *seqall, MSGLINE *msgline)
}
static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
int seqentryflags)
int seqentryflags, char *source, int access)
{
char reply[1024] = "";
TRANSFER *transfer;
@ -3776,11 +3946,26 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
goto nogood;
}
/* If you want to manually replay a log file with ckpmsg,
* you can ignore the access failed items by skipping items
* that start with a capital, since all (currently) are lower case
* however, command checks are case insensitive, so replaying
* the file will allow these commands, if they are present */
if ((ckdb_cmds[msgline->which_cmds].access & access) == 0)
buf[0] = toupper(buf[0]);
if (ckdb_cmds[msgline->which_cmds].access & ACCESS_POOL)
LOGQUE(buf, true);
else
LOGQUE(buf, false);
if ((ckdb_cmds[msgline->which_cmds].access & access) == 0) {
LOGERR("Command disallowed for %s: '%.42s...",
source, st2 = safe_text(buf));
FREENULL(st2);
goto nogood;
}
if (noid) {
if (ckdb_cmds[msgline->which_cmds].noid) {
free(cmdptr);
@ -4323,7 +4508,8 @@ static void *breaker(void *arg)
}
}
bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now), bq->seqentryflags);
bq->cmdnum = breakdown(&(bq->ml_item), bq->buf, &(bq->now),
bq->seqentryflags, bq->source, bq->access);
DATA_MSGLINE(msgline, bq->ml_item);
setnow(&(msgline->broken));
copy_tv(&(msgline->accepted), &(bq->accepted));
@ -5824,9 +6010,8 @@ static void *blistener(__maybe_unused void *arg)
return NULL;
}
static void *process_socket(void *arg)
static void *process_socket(__maybe_unused void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
K_ITEM *bq_item = NULL, *wq_item = NULL;
WORKQUEUE *workqueue = NULL;
BREAKQUEUE *bq = NULL;
@ -6002,15 +6187,15 @@ static void *process_socket(void *arg)
if (!*(msgline->id)) {
LOGDEBUG("Listener received"
" loglevel, currently %d",
pi->ckp->loglevel);
ckp.loglevel);
snprintf(reply, sizeof(reply),
"%s.%ld.ok.loglevel"
" currently %d",
msgline->id,
bq->now.tv_sec,
pi->ckp->loglevel);
ckp.loglevel);
} else {
oldloglevel = pi->ckp->loglevel;
oldloglevel = ckp.loglevel;
loglevel = atoi(msgline->id);
LOGDEBUG("Listener received loglevel"
" %d currently %d A",
@ -6026,13 +6211,13 @@ static void *process_socket(void *arg)
loglevel,
oldloglevel);
} else {
pi->ckp->loglevel = loglevel;
ckp.loglevel = loglevel;
snprintf(reply, sizeof(reply),
"%s.%ld.ok.loglevel"
" now %d - was %d",
msgline->id,
bq->now.tv_sec,
pi->ckp->loglevel,
ckp.loglevel,
oldloglevel);
}
// Do this twice since the loglevel may have changed
@ -6058,7 +6243,7 @@ static void *process_socket(void *arg)
fflush(global_ckp->logfp);
if (*(msgline->id)) {
// If you set the flush id to 2
if(atoi(msgline->id) == 2)
if (atoi(msgline->id) == 2)
ioqueue_flush = true;
}
setnow(&(msgline->processed));
@ -6301,52 +6486,31 @@ skippy:
return NULL;
}
static void *socketer(void *arg)
static void *sockrun(void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t clis_pt, blis_pt, proc_pt, prep_pt, crep_pt, brep_pt;
enum reply_type p_typ, c_typ, b_typ;
unixsock_t *us = &pi->us;
ckpool_t *this = (ckpool_t *)arg;
unixsock_t *us = &(this->main.us);
char *end, *buf = NULL;
K_ITEM *bq_item = NULL;
BREAKQUEUE *bq = NULL;
int ret, sockd;
int ret, sockd, thissock;
fd_set rfds;
char nbuf[128];
char *name = (char *)(this->gdata);
char nbuf[64];
tv_t now, nowacc, now1, now2, tmo;
pthread_detach(pthread_self());
thissock = SOCKNUM(name);
if (thissock == MAXSOCK) {
quithere(1, "thread started with invalid ckpool_t %p %p",
this, name);
}
snprintf(nbuf, sizeof(nbuf), "db%s_%s", dbcode, __func__);
snprintf(nbuf, sizeof(nbuf), "db%s_%c%s", dbcode, name[0], __func__);
LOCK_INIT(nbuf);
rename_proc(nbuf);
while (!everyone_die && !db_users_complete)
cksem_mswait(&socketer_sem, 420);
if (!everyone_die) {
epollfd_pool = epoll_create1(EPOLL_CLOEXEC);
epollfd_cmd = epoll_create1(EPOLL_CLOEXEC);
epollfd_btc = epoll_create1(EPOLL_CLOEXEC);
p_typ = REPLIER_POOL;
c_typ = REPLIER_CMD;
b_typ = REPLIER_BTC;
create_pthread(&prep_pt, replier, &p_typ);
create_pthread(&crep_pt, replier, &c_typ);
create_pthread(&brep_pt, replier, &b_typ);
LOGWARNING("%s() Start processing...", __func__);
socketer_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
create_pthread(&blis_pt, blistener, NULL);
create_pthread(&proc_pt, process_socket, arg);
}
ret = 0;
setnow(&sock_stt);
setnow(&sock_stt[thissock]);
while (!everyone_die) {
setnow(&now1);
while (!everyone_die) {
@ -6359,32 +6523,36 @@ static void *socketer(void *arg)
break;
if (ret < 0) {
int e = errno;
LOGERR("%s() Failed to select on socket (%d:%s)",
__func__, e, strerror(e));
LOGERR("%s() Failed to select on %s socket "
"(%d:%s)",
__func__, name, e, strerror(e));
break;
}
}
// Timeout exit on no input (or error)
if (everyone_die || ret < 0)
// If one fails, stop everything
if (ret < 0)
everyone_die = true;
if (everyone_die)
break;
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
int e = errno;
LOGERR("%s() Failed to accept on socket (%d:%s)",
__func__, e, strerror(e));
LOGERR("%s() Failed to accept on %s socket (%d:%s)",
__func__, name, e, strerror(e));
break;
}
setnow(&nowacc);
sock_us += us_tvdiff(&nowacc, &now1);
sock_acc++;
sock_us[thissock] += us_tvdiff(&nowacc, &now1);
sock_acc[thissock]++;
setnow(&now1);
buf = recv_unix_msg_tmo2(sockd, RECV_UNIX_TIMEOUT1, RECV_UNIX_TIMEOUT2);
// Once we've read the message
setnow(&now);
sock_recv_us += us_tvdiff(&now, &now1);
sock_recv++;
sock_recv_us[thissock] += us_tvdiff(&now, &now1);
sock_recv[thissock]++;
if (buf) {
end = buf + strlen(buf) - 1;
// strip trailing \n and \r
@ -6393,10 +6561,12 @@ static void *socketer(void *arg)
}
if (!buf || !*buf) {
// An empty message wont get a reply
if (!buf)
LOGWARNING("%s() Failed to get message", __func__);
else {
LOGWARNING("%s() Empty message", __func__);
if (!buf) {
LOGWARNING("%s() Failed to get %s message",
__func__, name);
} else {
LOGWARNING("%s() Empty %s message",
__func__, name);
free(buf);
}
} else {
@ -6408,9 +6578,9 @@ static void *socketer(void *arg)
K_WLOCK(workqueue_free);
earlysock_left++;
K_WUNLOCK(workqueue_free);
sock_proc_early++;
setnow(&now2);
sock_lock_wq_us += us_tvdiff(&now2, &now1);
sock_proc_early[thissock]++;
sock_lock_wq_us[thissock] += us_tvdiff(&now2, &now1);
}
if (SEQALL_LOG) {
@ -6434,7 +6604,7 @@ static void *socketer(void *arg)
}
}
sock_processed++;
sock_processed[thissock]++;
// Don't limit the speed filling up cmd_breakqueue_store
setnow(&now1);
K_WLOCK(breakqueue_free);
@ -6442,6 +6612,8 @@ static void *socketer(void *arg)
K_WUNLOCK(breakqueue_free);
DATA_BREAKQUEUE(bq, bq_item);
bq->buf = buf;
bq->source = (char *)(this->gdata);
bq->access = *(int *)(this->cdata);
copy_tv(&(bq->accepted), &nowacc);
copy_tv(&(bq->now), &now);
bq->seqentryflags = seqentryflags;
@ -6452,7 +6624,7 @@ static void *socketer(void *arg)
k_add_tail(cmd_breakqueue_store, bq_item);
K_WUNLOCK(breakqueue_free);
setnow(&now2);
sock_lock_br_us += us_tvdiff(&now2, &now1);
sock_lock_br_us[thissock] += us_tvdiff(&now2, &now1);
mutex_lock(&bq_cmd_waitlock);
bq_cmd_signals++;
@ -6461,15 +6633,66 @@ static void *socketer(void *arg)
}
}
LOGWARNING("%s() exiting: early=%"PRIu64" after=%"PRIu64" (%"PRIu64")",
__func__, sock_proc_early, sock_processed - sock_proc_early,
sock_processed);
close_unix_socket(us->sockd, us->path);
socketer_using_data = false;
LOGWARNING("%s() %s exiting: early=%"PRIu64" after=%"PRIu64
" (%"PRIu64")",
__func__, name, sock_proc_early[thissock],
sock_processed[thissock] - sock_proc_early[thissock],
sock_processed[thissock]);
close_unix_socket(us->sockd, us->path);
return NULL;
}
static void *socksetup(__maybe_unused void *arg)
{
pthread_t prep_pt, crep_pt, brep_pt;
enum reply_type p_typ, c_typ, b_typ;
pthread_t clis_pt, blis_pt, proc_pt;
pthread_t psock_pt, wsock_pt, csock_pt;
char nbuf[64];
pthread_detach(pthread_self());
snprintf(nbuf, sizeof(nbuf), "db%s_%s", dbcode, __func__);
LOCK_INIT(nbuf);
rename_proc(nbuf);
while (!everyone_die && !db_users_complete)
cksem_mswait(&socksetup_sem, 420);
if (!everyone_die) {
epollfd_pool = epoll_create1(EPOLL_CLOEXEC);
epollfd_cmd = epoll_create1(EPOLL_CLOEXEC);
epollfd_btc = epoll_create1(EPOLL_CLOEXEC);
p_typ = REPLIER_POOL;
c_typ = REPLIER_CMD;
b_typ = REPLIER_BTC;
create_pthread(&prep_pt, replier, &p_typ);
create_pthread(&crep_pt, replier, &c_typ);
create_pthread(&brep_pt, replier, &b_typ);
LOGWARNING("%s() Start processing...", __func__);
socksetup_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
create_pthread(&blis_pt, blistener, NULL);
create_pthread(&proc_pt, process_socket, NULL);
// Since the socket is dead ...
create_pthread(&psock_pt, sockrun, &ckp);
create_pthread(&wsock_pt, sockrun, &ckpweb);
create_pthread(&csock_pt, sockrun, &ckpcmd);
join_pthread(psock_pt);
join_pthread(wsock_pt);
join_pthread(csock_pt);
}
socksetup_using_data = false;
// Since the sockets are dead ...
everyone_die = true;
return NULL;
@ -6828,6 +7051,9 @@ static void reload_line(char *filename, char *buf, uint64_t count)
// release the lock since strdup could be slow, but rarely
DATA_BREAKQUEUE(bq, bq_item);
bq->buf = strdup(buf);
// reloads are all pool data
bq->source = (char *)ispool;
bq->access = ACCESS_POOL;
copy_tv(&(bq->accepted), &now);
copy_tv(&(bq->now), &now);
bq->seqentryflags = SE_RELOAD;
@ -6965,7 +7191,6 @@ static bool reload_from(tv_t *start, const tv_t *finish)
reloading = true;
copy_tv(&reload_timestamp, start);
// Go back further - one reload file
reload_timestamp.tv_sec -= reload_timestamp.tv_sec % ROLL_S;
tv_to_buf(start, buf, sizeof(buf));
@ -7497,7 +7722,7 @@ static void *pqproc(void *arg)
return NULL;
}
static void *listener(void *arg)
static void *listener(__maybe_unused void *arg)
{
pthread_t log_pt;
pthread_t sock_pt;
@ -7517,21 +7742,6 @@ static void *listener(void *arg)
LOCK_INIT(buf);
rename_proc(buf);
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(logqueue, 94);
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
if (reload_breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (cpus < 1)
@ -7556,7 +7766,7 @@ static void *listener(void *arg)
create_pthread(&log_pt, logger, NULL);
if (!confirm_sharesummary)
create_pthread(&sock_pt, socketer, arg);
create_pthread(&sock_pt, socksetup, NULL);
create_pthread(&summ_pt, summariser, NULL);
@ -7897,7 +8107,7 @@ static void update_check(int64_t markerid_stt, int64_t markerid_fin)
LOGWARNING("update complete %.0fm %.3fs", min, sec);
}
static void update_keysummary(ckpool_t *ckp)
static void update_keysummary()
{
int64_t markerid_stt, markerid_fin;
char *tmp, *minus;
@ -7957,17 +8167,6 @@ static void update_keysummary(ckpool_t *ckp)
LOCK_INIT("dbk_updater");
rename_proc("dbk_updater");
breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE),
ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true);
reload_breakqueue_store = k_new_store(breakqueue_free);
reload_done_breakqueue_store = k_new_store(breakqueue_free);
// Must exist (but will be empty)
cmd_breakqueue_store = k_new_store(breakqueue_free);
cmd_done_breakqueue_store = k_new_store(breakqueue_free);
#if LOCK_CHECK
DLPRIO(breakqueue, PRIO_TERMINAL);
#endif
if (reload_breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (cpus < 1)
@ -7989,13 +8188,11 @@ static void update_keysummary(ckpool_t *ckp)
cmder.thread = 0;
create_pthread(&cmd_break_pt, breaker, &cmder);
alloc_storage();
workmarkers_key_store = k_new_store(workmarkers_free);
setnow(&db_stt);
create_pthread(&sock_pt, socketer, &(ckp->main));
create_pthread(&sock_pt, socksetup, NULL);
if (!getdata1() || everyone_die)
return;
@ -8540,21 +8737,11 @@ static void confirm_summaries()
}
}
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
logqueue_store = k_new_store(logqueue_free);
#if LOCK_CHECK
DLPRIO(logqueue, 94);
#endif
create_pthread(&log_pt, logger, NULL);
LOCK_INIT("dby_confirmer");
rename_proc("dby_confirmer");
alloc_storage();
if (!getdata1()) {
LOGEMERG("%s() ABORTING from getdata1()", __func__);
return;
@ -8659,8 +8846,7 @@ int main(int argc, char **argv)
char *btc_user = "user";
char *btc_pass = "p";
pthread_t f_iomsgs_pt, c_iomsgs_pt;
char buf[512];
ckpool_t ckp;
char buf[512], lbuf[64];
int c, ret, i = 0, j;
size_t len;
char *kill;
@ -8677,6 +8863,8 @@ int main(int argc, char **argv)
global_ckp = &ckp;
memset(&ckp, 0, sizeof(ckp));
memset(&ckpweb, 0, sizeof(ckp));
memset(&ckpcmd, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) {
@ -9034,6 +9222,29 @@ int main(int argc, char **argv)
ckp.main.ckp = &ckp;
ckp.main.processname = strdup(process_name);
snprintf(lbuf, sizeof(lbuf), "%s%s", dbcode, listener_base);
ckp.main.sockname = strdup(lbuf);
accesspool = ACCESS_POOL;
ckp.cdata = &accesspool;
ckp.gdata = (void *)ispool;
ckpweb.main.ckp = &ckpweb;
ckpweb.name = strdup(ckp.name);
ckpweb.socket_dir = strdup(ckp.socket_dir);
snprintf(lbuf, sizeof(lbuf), "%s%sweb", dbcode, listener_base);
ckpweb.main.sockname = strdup(lbuf);
accessweb = ACCESS_WEB;
ckpweb.cdata = &accessweb;
ckpweb.gdata = (void *)isweb;
ckpcmd.main.ckp = &ckpcmd;
ckpcmd.name = strdup(ckp.name);
ckpcmd.socket_dir = strdup(ckp.socket_dir);
snprintf(lbuf, sizeof(lbuf), "%s%scmd", dbcode, listener_base);
ckpcmd.main.sockname = strdup(lbuf);
accesscmd = ACCESS_ALL;
ckpcmd.cdata = &accesscmd;
ckpcmd.gdata = (void *)iscmd;
cklock_init(&breakdown_lock);
cklock_init(&replier_lock);
@ -9062,6 +9273,9 @@ int main(int argc, char **argv)
mutex_init(&f_ioqueue_waitlock);
cond_init(&f_ioqueue_waitcond);
cklock_init(&fpm_lock);
cksem_init(&socksetup_sem);
// Initialise IOQUEUE before anything needs it
ioqueue_free = k_new_list("IOQueue", sizeof(IOQUEUE),
ALLOC_IOQUEUE, LIMIT_IOQUEUE, true);
@ -9076,18 +9290,7 @@ int main(int argc, char **argv)
bool consol = true;
create_pthread(&c_iomsgs_pt, iomsgs, &consol);
// Emulate a list for lock checking
process_pplns_free = k_lock_only_list("ProcessPPLNS");
workers_db_free = k_lock_only_list("WorkersDB");
users_db_free = k_lock_only_list("UsersDB");
event_limits_free = k_lock_only_list("EventLimits");
#if LOCK_CHECK
DLPRIO(process_pplns, 99);
DLPRIO(workers_db, 98);
DLPRIO(users_db, 97);
DLPRIO(event_limits, 46); // events-2
#endif
alloc_storage();
// set initial value
o_limits_max_lifetime = -1;
@ -9098,26 +9301,34 @@ int main(int argc, char **argv)
}
if (key_update) {
char buf[64];
snprintf(buf, sizeof(buf), "k%s", listener_base);
ckp.main.sockname = strdup(buf);
write_namepid(&ckp.main);
create_process_unixsock(&ckp.main);
fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC);
update_keysummary(&ckp);
create_process_unixsock(&ckpweb.main);
fcntl(ckpweb.main.us.sockd, F_SETFD, FD_CLOEXEC);
create_process_unixsock(&ckpcmd.main);
fcntl(ckpcmd.main.us.sockd, F_SETFD, FD_CLOEXEC);
update_keysummary();
everyone_die = true;
} else if (confirm_sharesummary) {
// TODO: add a system lock to stop running 2 at once?
confirm_summaries();
everyone_die = true;
} else {
ckp.main.sockname = strdup(listener_base);
write_namepid(&ckp.main);
create_process_unixsock(&ckp.main);
fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC);
create_pthread(&ckp.pth_listener, listener, &ckp.main);
create_process_unixsock(&ckpweb.main);
fcntl(ckpweb.main.us.sockd, F_SETFD, FD_CLOEXEC);
create_process_unixsock(&ckpcmd.main);
fcntl(ckpcmd.main.us.sockd, F_SETFD, FD_CLOEXEC);
create_pthread(&ckp.pth_listener, listener, NULL);
handler.sa_handler = sighandler;
handler.sa_flags = 0;
@ -9134,7 +9345,7 @@ int main(int argc, char **argv)
everyone_die = true;
trigger = start = time(NULL);
while (socketer_using_data || summariser_using_data ||
while (socksetup_using_data || summariser_using_data ||
logger_using_data || plistener_using_data ||
clistener_using_data || blistener_using_data ||
marker_using_data || breakdown_using_data) {
@ -9152,7 +9363,7 @@ int main(int argc, char **argv)
snprintf(buf, sizeof(buf),
"%s %ds due to%s%s%s%s%s%s%s%s%s\n",
msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY,
socksetup_using_data ? " socksetup" : EMPTY,
summariser_using_data ? " summariser" : EMPTY,
logger_using_data ? " logger" : EMPTY,
plistener_using_data ? " plistener" : EMPTY,

30
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.420"
#define CKDB_VERSION DB_VERSION"-2.430"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -371,24 +371,6 @@ extern tv_t last_share_inv;
extern tv_t last_auth;
extern cklock_t last_lock;
// Running stats
// replier()
extern double reply_full_us;
extern uint64_t reply_sent, reply_cant, reply_discarded, reply_fails;
// socketer()
extern tv_t sock_stt;
extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us;
extern uint64_t sock_proc_early, sock_processed, sock_acc, sock_recv;
// breaker() summarised
extern tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
extern uint64_t break_reload_processed, break_cmd_processed;
// clistener()
extern double clis_us;
extern uint64_t clis_processed;
// blistener()
extern double blis_us;
extern uint64_t blis_processed;
#define JSON_TRANSFER "json="
#define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1)
#define JSON_BEGIN '{'
@ -1184,6 +1166,8 @@ extern K_STORE *msgline_store;
// BREAKQUEUE
typedef struct breakqueue {
char *buf;
char *source;
int access;
tv_t accepted; // socket accepted or line read
tv_t now; // msg read or line read
int seqentryflags;
@ -2964,6 +2948,7 @@ enum reply_type {
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnowts(ts_t *now);
extern void setnow(tv_t *now);
extern void status_report(tv_t *now);
extern void tick();
extern PGconn *dbconnect();
extern void sequence_report(bool lock);
@ -3089,7 +3074,9 @@ extern char *_ms_to_buf(time_t *data, char *buf, size_t siz, WHERE_FFL_ARGS);
extern char *_transfer_data(K_ITEM *item, WHERE_FFL_ARGS);
extern void dsp_transfer(K_ITEM *item, FILE *stream);
extern cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_transfer(K_TREE *trf_root, char *name);
#define find_transfer(_trf_root, _name) \
_find_transfer(_trf_root, _name, WHERE_FFL_HERE)
extern K_ITEM *_find_transfer(K_TREE *trf_root, char *name, WHERE_FFL_ARGS);
#define optional_name(_root, _name, _len, _patt, _reply, _siz) \
_optional_name(_root, _name, _len, _patt, _reply, _siz, \
WHERE_FFL_HERE)
@ -3555,12 +3542,13 @@ extern bool check_db_version(PGconn *conn);
// *** ckdb_cmd.c
// ***
// TODO: limit access by having seperate sockets for each
#define ACCESS_POOL (1 << 0)
#define ACCESS_SYSTEM (1 << 1)
#define ACCESS_WEB (1 << 2)
#define ACCESS_PROXY (1 << 3)
#define ACCESS_CKDB (1 << 4)
#define ACCESS_ALL (ACCESS_POOL | ACCESS_SYSTEM | ACCESS_WEB | ACCESS_PROXY | \
ACCESS_CKDB)
struct CMDS {
enum cmd_values cmd_val;

120
src/ckdb_cmd.c

@ -6821,8 +6821,7 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id,
return(buf);
}
/* Show a share status report on the console
* Currently: sequence status, OoO info and max_sockd_count */
// Show a status report on the console
static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet,
@ -6830,122 +6829,9 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused K_TREE *trf_root,
__maybe_unused bool reload_data)
{
char ooo_buf[256];
char buf[256];
int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int pool0_count, poolq_count, rep_max_fd;
int64_t _earlysock_left, _pool0_discarded, _pool0_tot;
uint64_t count1, count2, count3, count4;
double tot1, tot2;
LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
sequence_report(true);
K_RLOCK(breakqueue_free);
relq_count = reload_breakqueue_store->count;
_reload_processing = reload_processing;
relqd_count = reload_done_breakqueue_store->count;
cmdq_count = cmd_breakqueue_store->count;
_cmd_processing = cmd_processing;
cmdqd_count = cmd_done_breakqueue_store->count;
_max_sockd_count = max_sockd_count;
K_RUNLOCK(breakqueue_free);
K_RLOCK(workqueue_free);
_earlysock_left = earlysock_left;
pool0_count = pool0_workqueue_store->count;
_pool0_discarded = pool0_discarded;
_pool0_tot = pool0_tot;
poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64
" pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d",
relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count,
_earlysock_left,
pool0_count, _pool0_discarded, _pool0_tot,
poolq_count, _max_sockd_count);
count1 = sock_acc ? : 1;
count2 = sock_recv ? : 1;
count3 = sock_proc_early ? : 1;
count4 = sock_processed ? : 1;
LOGWARNING(" sock: t%fs sock t%fs/t%"PRIu64"/av%fs"
" recv t%fs/t%"PRIu64"/av%fs"
" lckw t%fs/t%"PRIu64"/av%fs"
" lckb t%fs/t%"PRIu64"/av%fs",
tvdiff(now, &sock_stt),
sock_us/1000000, sock_acc, (sock_us/count1)/1000000,
sock_recv_us/1000000, sock_recv,
(sock_recv_us/count2)/1000000,
sock_lock_wq_us/1000000, sock_proc_early,
(sock_lock_wq_us/count3)/1000000,
sock_lock_br_us/1000000, sock_processed,
(sock_lock_br_us/count4)/1000000);
if (!break_reload_stt.tv_sec)
tot1 = 0;
else {
if (!break_reload_fin.tv_sec)
tot1 = tvdiff(now, &break_reload_stt);
else
tot1 = tvdiff(&break_reload_fin, &break_reload_stt);
}
if (!break_cmd_stt.tv_sec)
tot2 = 0;
else
tot2 = tvdiff(now, &break_cmd_stt);
count1 = break_reload_processed ? : 1;
count2 = break_cmd_processed ? : 1;
LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
tot1, break_reload_processed, tot1/count1,
bq_reload_signals, bq_reload_wakes, bq_reload_timeouts,
tot2, break_cmd_processed, tot2/count2,
bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts);
LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
process_reload_signals, process_reload_wakes,
process_reload_timeouts,
process_socket_signals, process_socket_wakes,
process_socket_timeouts);
LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
wq_pool_signals, wq_pool_wakes, wq_pool_timeouts,
wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts,
wq_btc_signals, wq_btc_wakes, wq_btc_timeouts);
count1 = clis_processed ? : 1;
count2 = blis_processed ? : 1;
LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs"
" blistener: t%fs/t%"PRIu64"/av%fs",
clis_us/1000000, clis_processed, (clis_us/count1)/1000000,
blis_us/1000000, blis_processed, (blis_us/count2)/1000000);
rep_max_fd = rep_max_pool_sockd_fd;
if (rep_max_fd < rep_max_cmd_sockd_fd)
rep_max_fd = rep_max_cmd_sockd_fd;
if (rep_max_fd < rep_max_btc_sockd_fd)
rep_max_fd = rep_max_btc_sockd_fd;
LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd"
" btc ^%d/^%dfd",
rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd,
rep_max_pool_sockd, rep_max_pool_sockd_fd,
rep_max_cmd_sockd, rep_max_cmd_sockd_fd,
rep_max_btc_sockd, rep_max_btc_sockd_fd);
count1 = reply_sent ? : 1;
LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64
"/t%fs/av%fs",
reply_sent, reply_cant, reply_discarded, reply_fails,
reply_full_us/1000000, (reply_full_us/count1)/1000000);
status_report(now);
snprintf(buf, sizeof(buf), "ok.%s", cmd);
LOGDEBUG("%s.%s", id, buf);

4
src/ckdb_data.c

@ -756,7 +756,7 @@ cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b)
return CMP_STR(ta->name, tb->name);
}
K_ITEM *find_transfer(K_TREE *trf_root, char *name)
K_ITEM *_find_transfer(K_TREE *trf_root, char *name, WHERE_FFL_ARGS)
{
TRANSFER transfer;
K_TREE_CTX ctx[1];
@ -766,7 +766,7 @@ K_ITEM *find_transfer(K_TREE *trf_root, char *name)
INIT_TRANSFER(&look);
look.data = (void *)(&transfer);
// trf_root stores aren't shared
return find_in_ktree_nolock(trf_root, &look, ctx);
return _find_in_ktree(trf_root, &look, ctx, false, WHERE_FFL_PASS);
}
K_ITEM *_optional_name(K_TREE *trf_root, char *name, int len, char *patt,

Loading…
Cancel
Save