Browse Source

ckdb - add thread control for b/clistener and comment about ckdb thread setup

master
kanoi 8 years ago
parent
commit
3beb8cc049
  1. 351
      src/ckdb.c
  2. 7
      src/ckdb.h
  3. 16
      src/ckdb_cmd.c

351
src/ckdb.c

@ -14,6 +14,48 @@
* Consider adding row level locking (a per kitem usage count) if needed
*/
/* Thread layout
* -------------
* Any thread that manages a thread count will have 00 in it's name
* and name each subsequent thread it creates, with the same name
* but with 01, 02 etc, and then wait on them all before exiting
* The 2 digit 00 relates to THREAD_LIMIT which is 99
* there's a limit of THREAD_LIMIT active threads per thread manager
* WARNING - however the total number of threads created is limited by the
* LOCK_CHECK code which allows only MAX_THREADS total ckdb thread to
* be created irrelevant of any being deleted
*
* The threads that can be managed have a command option to set them when
* running ckdb and can also be changed via the cmd_threads socket command
*
* The main() 'ckdb' thread starts:
* iomsgs() for filelog '_fiomsgs' and console '_ciomsgs'
* listener() '_p00qproc'
* which manages it's thread count in pqproc()
*
* listener() starts:
* breakdown() for reload '_r00breaker' and cmd '_c00breaker'
* each of which manage their thead counts
* logger() '_logger'
* socksetup() '_socksetup'
* summariser() '_summarise'
* marker() '_marker'
* then calls setup_data() which calls reload()
* then calls pqproc()
* which manages the thread count
*
* socksetup() starts:
* replier() for pool '_preplier' cmd '_creplier' and btc '_breplier'
* listener_all() for cmd '_c00listen' and btc '_b00listen'
* each of which manage their thead counts
* process_socket() '_procsock'
* sockrun() for ckpool '_psockrun' web '_wsockrun' and cmd '_csockrun'
*
* reload() starts:
* process_reload() '_p00rload'
* which manages it's thread count
*/
/* Startup
* -------
* During startup we load the DB and track where it is up to with
@ -157,6 +199,11 @@ static int cmd_breakdown_threads = -1;
int reload_breakdown_threads_delta = 0;
int cmd_breakdown_threads_delta = 0;
int cmd_listener_threads = 2;
int btc_listener_threads = 2;
int cmd_listener_threads_delta = 0;
int btc_listener_threads_delta = 0;
// Lock used to determine when the last breakdown thread exits
static cklock_t breakdown_lock;
@ -381,10 +428,10 @@ static uint64_t sock_acc[MAXSOCK], sock_recv[MAXSOCK];
// breaker() summarised
static tv_t break_reload_stt, break_cmd_stt, break_reload_fin;
static uint64_t break_reload_processed, break_cmd_processed;
// clistener()
// listener_all()
static cklock_t listener_all_lock;
static double clis_us;
static uint64_t clis_processed;
// blistener()
static double blis_us;
static uint64_t blis_processed;
@ -958,7 +1005,7 @@ static void ioprocess(IOQUEUE *io)
flock(logfd, LOCK_EX);
if (io->errn) {
fprintf(LOGFP, "%s%s with errno %d: %s\n",
stamp, io->msg,
stamp, io->msg,
io->errn, strerror(io->errn));
} else
fprintf(LOGFP, "%s%s\n", stamp, io->msg);
@ -3202,7 +3249,7 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd,
goto gotseqset;
}
}
}
}
// Need to setup a new seqset
newseq = true;
@ -3464,7 +3511,7 @@ gotseqset:
}
}
seqdata->seqbase++;
}
}
seqdata->maxseq++;
}
// store n_seqcmd
@ -4525,7 +4572,7 @@ static void *breaker(void *arg)
}
K_WUNLOCK(breakqueue_free);
while (!everyone_die) {
if (mythread && !breaker_running[typ][mythread])
if (mythread && !breaker_running[typ][mythread])
break;
K_WLOCK(breakqueue_free);
@ -6051,37 +6098,163 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t
tick();
}
static void *clistener(__maybe_unused void *arg)
struct listener_setup {
int bc;
int thread;
};
#define BC_B 0
#define BC_C 1
static pthread_t listener_pt[2][THREAD_LIMIT];
static struct listener_setup listener_setup[2][THREAD_LIMIT];
static void *listener_all(void *arg)
{
static bool running[2][THREAD_LIMIT];
struct listener_setup *setup;
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
char buf[128];
time_t now;
ts_t when, when_add;
int ret;
int i, typ, mythread, done, tot, ret;
int listener_delta = 0;
pthread_detach(pthread_self());
setup = (struct listener_setup *)(arg);
typ = setup->bc;
mythread = setup->thread;
snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__);
snprintf(buf, sizeof(buf), "db%s_%c%02d%s",
dbcode, (typ == BC_B) ? 'b' : 'c', mythread, "listen");
LOCK_INIT(buf);
rename_proc(buf);
LOGNOTICE("%s() processing", __func__);
if (mythread == 0) {
pthread_detach(pthread_self());
for (i = 1; i < THREAD_LIMIT; i++) {
listener_setup[typ][i].thread = i;
listener_setup[typ][i].bc = typ;
running[typ][i] = false;
}
running[typ][0] = true;
if (typ == BC_B)
listener_delta = btc_listener_threads - 1;
else
listener_delta = cmd_listener_threads - 1;
LOGNOTICE("%s() %s initialised - delta %d",
__func__, (typ == BC_B) ? "btc" : "cmd", listener_delta);
}
LOGNOTICE("%s() %s processing", __func__, buf);
when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
clistener_using_data = true;
if (typ == BC_B)
blistener_using_data = true;
else
clistener_using_data = true;
conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
if (mythread && !running[typ][mythread])
break;
wq_item = NULL;
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(cmd_workqueue_store);
if (mythread == 0 && typ == BC_B &&
btc_listener_threads_delta != 0) {
listener_delta = btc_listener_threads_delta;
btc_listener_threads_delta = 0;
} else if (mythread == 0 && typ == BC_C &&
cmd_listener_threads_delta != 0) {
listener_delta = cmd_listener_threads_delta;
cmd_listener_threads_delta = 0;
} else {
if (typ == BC_B)
wq_item = k_unlink_head(btc_workqueue_store);
else
wq_item = k_unlink_head(cmd_workqueue_store);
}
K_WUNLOCK(workqueue_free);
// TODO: deal with thread creation/shutdown failure
if (listener_delta != 0) {
if (listener_delta > 0) {
// Add threads
tot = 1;
done = 0;
for (i = 1; i < THREAD_LIMIT; i++) {
if (!running[typ][i]) {
if (listener_delta > 0) {
listener_delta--;
running[typ][i] = true;
create_pthread(&(listener_pt[typ][i]),
listener_all,
&(listener_setup[typ][i]));
done++;
tot++;
}
} else
tot++;
}
LOGWARNING("%s() created %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
(typ == BC_B) ? "btc" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
} else {
// Notify and wait for each to exit
tot = 1;
done = 0;
for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (running[typ][i]) {
if (listener_delta < 0) {
listener_delta++;
LOGNOTICE("%s() %s stopping %d",
__func__,
(typ == BC_B) ? "btc" : "cmd",
i);
running[typ][i] = false;
join_pthread(listener_pt[typ][i]);
done++;
} else
tot++;
}
}
LOGWARNING("%s() stopped %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
(typ == BC_B) ? "btc" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
}
listener_delta = 0;
continue;
}
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
@ -6091,100 +6264,68 @@ static void *clistener(__maybe_unused void *arg)
if (wq_item) {
setnow(&now1);
process_sockd(conn, wq_item, REPLIER_CMD);
process_sockd(conn, wq_item,
(typ == BC_B) ? REPLIER_BTC : REPLIER_CMD);
setnow(&now2);
clis_us += us_tvdiff(&now2, &now1);
clis_processed++;
ck_wlock(&listener_all_lock);
if (typ == BC_B) {
blis_us += us_tvdiff(&now2, &now1);
blis_processed++;
} else {
clis_us += us_tvdiff(&now2, &now1);
clis_processed++;
}
ck_wunlock(&listener_all_lock);
} else {
setnowts(&when);
timeraddspec(&when, &when_add);
mutex_lock(&wq_cmd_waitlock);
ret = cond_timedwait(&wq_cmd_waitcond,
&wq_cmd_waitlock, &when);
if (ret == 0)
wq_cmd_wakes++;
else if (errno == ETIMEDOUT)
wq_cmd_timeouts++;
mutex_unlock(&wq_cmd_waitlock);
if (typ == BC_B) {
mutex_lock(&wq_btc_waitlock);
ret = cond_timedwait(&wq_btc_waitcond,
&wq_btc_waitlock, &when);
if (ret == 0)
wq_btc_wakes++;
else if (errno == ETIMEDOUT)
wq_btc_timeouts++;
mutex_unlock(&wq_btc_waitlock);
} else {
mutex_lock(&wq_cmd_waitlock);
ret = cond_timedwait(&wq_cmd_waitcond,
&wq_cmd_waitlock, &when);
if (ret == 0)
wq_cmd_wakes++;
else if (errno == ETIMEDOUT)
wq_cmd_timeouts++;
mutex_unlock(&wq_cmd_waitlock);
}
}
}
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, clis_processed);
clistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
static void *blistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
tv_t now1, now2;
char buf[128];
time_t now;
ts_t when, when_add;
int ret;
pthread_detach(pthread_self());
snprintf(buf, sizeof(buf), "db%s_%s", dbcode, __func__);
LOCK_INIT(buf);
rename_proc(buf);
LOGNOTICE("%s() processing", __func__);
when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000;
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
blistener_using_data = true;
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(btc_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
if (mythread != 0)
LOGNOTICE("%s() %s exiting", __func__, buf);
else {
for (i = 1; i < THREAD_LIMIT; i++) {
if (running[typ][i]) {
running[typ][i] = false;
LOGNOTICE("%s() %s waiting for %d",
__func__, buf, i);
join_pthread(listener_pt[typ][i]);
}
}
if (wq_item) {
setnow(&now1);
process_sockd(conn, wq_item, REPLIER_BTC);
setnow(&now2);
blis_us += us_tvdiff(&now2, &now1);
blis_processed++;
if (typ == BC_B) {
LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, blis_processed);
blistener_using_data = false;
} else {
setnowts(&when);
timeraddspec(&when, &when_add);
mutex_lock(&wq_btc_waitlock);
ret = cond_timedwait(&wq_btc_waitcond,
&wq_btc_waitlock, &when);
if (ret == 0)
wq_btc_wakes++;
else if (errno == ETIMEDOUT)
wq_btc_timeouts++;
mutex_unlock(&wq_btc_waitlock);
LOGNOTICE("%s() %s exiting, processed %"PRIu64, __func__, buf, clis_processed);
clistener_using_data = false;
}
}
LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, blis_processed);
blistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
@ -6841,7 +6982,7 @@ static void *socksetup(__maybe_unused void *arg)
{
pthread_t prep_pt, crep_pt, brep_pt;
enum reply_type p_typ, c_typ, b_typ;
pthread_t clis_pt, blis_pt, proc_pt;
pthread_t proc_pt;
pthread_t psock_pt, wsock_pt, csock_pt;
char nbuf[64];
@ -6868,9 +7009,15 @@ static void *socksetup(__maybe_unused void *arg)
LOGWARNING("%s() Start processing...", __func__);
socksetup_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
listener_setup[BC_B][0].bc = BC_B;
listener_setup[BC_B][0].thread = 0;
create_pthread(&listener_pt[BC_B][0], listener_all,
&(listener_setup[BC_B][0]));
create_pthread(&blis_pt, blistener, NULL);
listener_setup[BC_C][0].bc = BC_C;
listener_setup[BC_C][0].thread = 0;
create_pthread(&listener_pt[BC_C][0], listener_all,
&(listener_setup[BC_C][0]));
create_pthread(&proc_pt, process_socket, NULL);
@ -7429,7 +7576,7 @@ static bool reload_from(tv_t *start, const tv_t *finish)
* Also since ckpool messages are not in order, we could be
* aborting early and not get the few slightly later out of
* order messages in the log file */
while (!everyone_die &&
while (!everyone_die &&
logline(reload_buf, MAX_READ, fp, filename)) {
reload_line(filename, reload_buf, ++count);
@ -8995,6 +9142,7 @@ static struct option long_options[] = {
// override calculated value
{ "breakdown-threads", required_argument, 0, 'B' },
{ "config", required_argument, 0, 'c' },
{ "cmd-listener-threads", required_argument, 0, 'C' },
{ "dbname", required_argument, 0, 'd' },
{ "minsdiff", required_argument, 0, 'D' },
{ "free", required_argument, 0, 'f' },
@ -9077,7 +9225,7 @@ int main(int argc, char **argv)
memset(&ckpcmd, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:Ab:B:c:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "a:Ab:B:c:C:d:D:f:ghi:IkK:l:L:mM:n:N:o:p:P:q:Q:r:R:s:S:t:Tu:U:vw:xXyY:", long_options, &i)) != -1) {
switch(c) {
case '?':
case ':':
@ -9120,6 +9268,18 @@ int main(int argc, char **argv)
case 'c':
ckp.config = strdup(optarg);
break;
case 'C':
{
int cl = atoi(optarg);
if (cl < 1 || cl > THREAD_LIMIT) {
quit(1, "Invalid listener "
"thread count %d "
"- must be >0 and <=%d",
cl, THREAD_LIMIT);
}
cmd_listener_threads = cl;
}
break;
case 'd':
db_name = strdup(optarg);
kill = optarg;
@ -9467,6 +9627,7 @@ int main(int argc, char **argv)
cklock_init(&breakdown_lock);
cklock_init(&replier_lock);
cklock_init(&listener_all_lock);
cklock_init(&last_lock);
cklock_init(&btc_lock);
cklock_init(&poolinstance_lock);

7
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.508"
#define CKDB_VERSION DB_VERSION"-2.509"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -129,6 +129,11 @@ extern int proc_queue_threads_delta;
extern int reload_breakdown_threads_delta;
extern int cmd_breakdown_threads_delta;
extern int cmd_listener_threads;
extern int btc_listener_threads;
extern int cmd_listener_threads_delta;
extern int btc_listener_threads_delta;
#define BLANK " "
extern char *EMPTY;
extern const char *nullstr;

16
src/ckdb_cmd.c

@ -8463,6 +8463,22 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "cl") == 0 ||
strcasecmp(name, "cmd_listener") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
cmd_listener_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "bl") == 0 ||
strcasecmp(name, "btc_listener") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
btc_listener_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else {
snprintf(reply, siz, "unknown name '%s'", name);
LOGERR("%s() %s.%s", __func__, id, reply);

Loading…
Cancel
Save