Browse Source

ckdb - breaker() thread management

master
kanoi 8 years ago
parent
commit
de85c3fb5a
  1. 341
      src/ckdb.c
  2. 5
      src/ckdb.h
  3. 16
      src/ckdb_cmd.c

341
src/ckdb.c

@ -126,13 +126,16 @@ int queue_threads_delta = 0;
* handle the extra locking or the threads are swapping */ * handle the extra locking or the threads are swapping */
static int queue_threads = 1; static int queue_threads = 1;
// -B to override calculated value // -B to override calculated value
static int breakdown_threads = -1; static int reload_breakdown_threads = -1;
// This is normally the same as above, but key_update only requires 1
static int cmd_breakdown_threads = -1;
// cpu count to breakdown thread ratio // cpu count to breakdown thread ratio
#define BREAKDOWN_RATIO 3 #define BREAKDOWN_RATIO 3
static int reload_breakdown_count = 0; // Flags to notify thread changes
static int cmd_breakdown_count = 0; int reload_breakdown_threads_delta = 0;
/* Lock for access to *breakdown_count int cmd_breakdown_threads_delta = 0;
* Any change to/from 0 will update breakdown_using_data */
// Lock used to determine when the last breakdown thread exits
static cklock_t breakdown_lock; static cklock_t breakdown_lock;
static int replier_count = 0; static int replier_count = 0;
@ -3719,23 +3722,63 @@ nogood:
return CMD_REPLY; return CMD_REPLY;
} }
struct breaker_setup {
bool reload;
int thread;
};
#define ISRELOAD 0
#define ISCMD 1
static void *breaker(void *arg) static void *breaker(void *arg)
{ {
static pthread_t breaker_pt[2][THREAD_LIMIT];
static struct breaker_setup breaker_setup[2][THREAD_LIMIT];
static bool breaker_running[2][THREAD_LIMIT];
static bool reload0 = false;
static bool cmd0 = false;
struct breaker_setup *setup;
K_ITEM *bq_item = NULL; K_ITEM *bq_item = NULL;
BREAKQUEUE *bq = NULL; BREAKQUEUE *bq = NULL;
MSGLINE *msgline = NULL; MSGLINE *msgline = NULL;
char buf[128]; char buf[128];
int thr, zeros; bool reload, was_null, msg;
bool reload, was_null, msg = false;
int queue_sleep, queue_limit, count; int queue_sleep, queue_limit, count;
uint64_t processed = 0; uint64_t processed = 0;
ts_t when, when_add; ts_t when, when_add;
int ret; int i, typ, mythread, done, tot, ret;
int breaker_delta = 0;
pthread_detach(pthread_self()); setup = (struct breaker_setup *)(arg);
mythread = setup->thread;
if ((reload = setup->reload))
typ = ISRELOAD;
else
typ = ISCMD;
if (mythread == 0) {
pthread_detach(pthread_self());
for (i = 0; i < THREAD_LIMIT; i++) {
breaker_setup[typ][i].thread = i;
breaker_setup[typ][i].reload = reload;
breaker_running[typ][i] = false;
}
breaker_running[typ][0] = true;
if (reload) {
reload0 = true;
breaker_delta = reload_breakdown_threads - 1;
} else {
cmd0 = true;
breaker_delta = cmd_breakdown_threads - 1;
}
LOGNOTICE("%s() %s initialised - delta %d",
__func__, reload ? "reload" : "cmd", breaker_delta);
}
// Is this a reload thread or a cmd thread?
reload = *(bool *)(arg);
if (reload) { if (reload) {
queue_limit = reload_queue_limit; queue_limit = reload_queue_limit;
queue_sleep = RELOAD_QUEUE_SLEEP_MS; queue_sleep = RELOAD_QUEUE_SLEEP_MS;
@ -3748,37 +3791,22 @@ static void *breaker(void *arg)
when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000;
} }
ck_wlock(&breakdown_lock); snprintf(buf, sizeof(buf), "db_%c%02d%s",
if (reload) reload ? 'r' : 'c', mythread, __func__);
thr = ++reload_breakdown_count;
else
thr = ++cmd_breakdown_count;
breakdown_using_data = true;
ck_wunlock(&breakdown_lock);
if (breakdown_threads < 10)
zeros = 1;
else
zeros = (int)log10(breakdown_threads) + 1;
snprintf(buf, sizeof(buf), "db_%c%0*d%s",
reload ? 'r' : 'c', zeros, thr, __func__);
LOCK_INIT(buf); LOCK_INIT(buf);
rename_proc(buf); rename_proc(buf);
LOGNOTICE("%s() %s %s starting", LOGNOTICE("%s() %s starting", __func__, buf);
__func__, buf, reload ? "reload" : "cmd");
if (reload) { if (reload) {
/* reload has to wait for the reload to start, however, also /* reload has to wait for the reload to start, however, also
* check for startup_complete in case we miss the reload */ * check for startup_complete in case we miss the reload */
while (!everyone_die && !reloading && !startup_complete) while (!everyone_die && !reloading && !startup_complete)
cksleep_ms(queue_sleep); cksleep_ms(queue_sleep);
LOGNOTICE("%s() %s reload processing", __func__, buf);
} }
LOGNOTICE("%s() %s processing", __func__, buf);
// The first one to start // The first one to start
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
if (reload) { if (reload) {
@ -3790,31 +3818,112 @@ static void *breaker(void *arg)
} }
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
while (!everyone_die) { while (!everyone_die) {
K_WLOCK(breakqueue_free); if (mythread && !breaker_running[typ][mythread])
bq_item = NULL; break;
was_null = false;
if (reload)
count = reload_done_breakqueue_store->count;
else
count = cmd_done_breakqueue_store->count;
// Don't unlink if we are above the limit K_WLOCK(breakqueue_free);
if (count <= queue_limit) { if (mythread == 0 && reload && reload_breakdown_threads_delta != 0) {
if (reload) breaker_delta = reload_breakdown_threads_delta;
bq_item = k_unlink_head(reload_breakqueue_store); reload_breakdown_threads_delta = 0;
else } else if (mythread == 0 && !reload && cmd_breakdown_threads_delta != 0) {
bq_item = k_unlink_head(cmd_breakqueue_store); breaker_delta = cmd_breakdown_threads_delta;
if (!bq_item) cmd_breakdown_threads_delta = 0;
was_null = true; } else {
} bq_item = NULL;
if (bq_item) { was_null = false;
if (reload) if (reload)
break_reload_processed++; count = reload_done_breakqueue_store->count;
else else
break_cmd_processed++; count = cmd_done_breakqueue_store->count;
// Don't unlink if we are above the limit
if (count <= queue_limit) {
if (reload)
bq_item = k_unlink_head(reload_breakqueue_store);
else
bq_item = k_unlink_head(cmd_breakqueue_store);
if (!bq_item)
was_null = true;
}
if (bq_item) {
if (reload)
break_reload_processed++;
else
break_cmd_processed++;
}
} }
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
// TODO: deal with thread creation/shutdown failure
if (breaker_delta != 0) {
if (breaker_delta > 0) {
// Add threads
tot = 1;
done = 0;
for (i = 1; i < THREAD_LIMIT; i++) {
if (!breaker_running[typ][i]) {
if (breaker_delta > 0) {
breaker_delta--;
breaker_running[typ][i] = true;
create_pthread(&(breaker_pt[typ][i]),
breaker,
&(breaker_setup[typ][i]));
done++;
tot++;
}
} else
tot++;
}
LOGWARNING("%s() created %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
reload ? "reload" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
} else {
// Notify and wait for each to exit
tot = 1;
done = 0;
for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (breaker_running[typ][i]) {
if (breaker_delta < 0) {
breaker_delta++;
LOGNOTICE("%s() %s stopping %d",
__func__,
reload ? "reload" : "cmd",
i);
breaker_running[typ][i] = false;
join_pthread(breaker_pt[typ][i]);
done++;
} else
tot++;
}
}
LOGWARNING("%s() stopped %d %s thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__,
done,
reload ? "reload" : "cmd",
(done == 1) ? EMPTY : "s",
tot
#if LOCK_CHECK
, next_thread_id
#endif
);
}
breaker_delta = 0;
continue;
}
if (!bq_item) { if (!bq_item) {
// Is the queue empty and the reload completed? // Is the queue empty and the reload completed?
if (was_null && reload && !reloading) if (was_null && reload && !reloading)
@ -3913,23 +4022,37 @@ static void *breaker(void *arg)
count = max_sockd_count; count = max_sockd_count;
K_RUNLOCK(breakqueue_free); K_RUNLOCK(breakqueue_free);
ck_wlock(&breakdown_lock); if (mythread == 0) {
if (reload) { for (i = 1; i < THREAD_LIMIT; i++) {
reload_breakdown_count--; if (breaker_running[typ][i]) {
// The last one to finish - updated each exit breaker_running[typ][i] = false;
setnow(&break_reload_fin); LOGNOTICE("%s() %s waiting for %d",
} else __func__, buf, i);
cmd_breakdown_count--; join_pthread(breaker_pt[typ][i]);
}
}
if ((reload_breakdown_count + cmd_breakdown_count) < 1) { if (reload)
breakdown_using_data = false; setnow(&break_reload_fin);
msg = true;
} msg = false;
ck_wunlock(&breakdown_lock); ck_wlock(&breakdown_lock);
if (reload)
reload0 = false;
else
cmd0 = false;
if (msg) { if (reload0 == false && cmd0 == false) {
LOGWARNING("%s() threads shut down - max_sockd_count=%d", breakdown_using_data = false;
__func__, count); msg = true;
}
ck_wunlock(&breakdown_lock);
if (msg) {
LOGWARNING("%s() threads shut down - "
"max_sockd_count=%d",
__func__, count);
}
} }
return NULL; return NULL;
@ -6124,6 +6247,9 @@ static void *process_reload(__maybe_unused void *arg)
now = time(NULL); now = time(NULL);
while (!everyone_die) { while (!everyone_die) {
if (mythread && !running[mythread])
break;
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
if (mythread == 0 && queue_threads_delta != 0) { if (mythread == 0 && queue_threads_delta != 0) {
threads_delta = queue_threads_delta; threads_delta = queue_threads_delta;
@ -6137,9 +6263,6 @@ static void *process_reload(__maybe_unused void *arg)
} }
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
if (!running[mythread])
break;
// TODO: deal with thread creation/shutdown failure // TODO: deal with thread creation/shutdown failure
if (threads_delta != 0) { if (threads_delta != 0) {
if (threads_delta > 0) { if (threads_delta > 0) {
@ -6174,11 +6297,12 @@ static void *process_reload(__maybe_unused void *arg)
// Notify and wait for each to exit // Notify and wait for each to exit
tot = 1; tot = 1;
done = 0; done = 0;
i = THREAD_LIMIT - 1;
for (i = THREAD_LIMIT - 1; i > 0; i--) { for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (running[i]) { if (running[i]) {
if (threads_delta < 0) { if (threads_delta < 0) {
threads_delta++; threads_delta++;
LOGNOTICE("%s() stopping %d",
__func__, i);
running[i] = false; running[i] = false;
join_pthread(procrel_pt[i]); join_pthread(procrel_pt[i]);
done++; done++;
@ -6240,11 +6364,13 @@ static void *process_reload(__maybe_unused void *arg)
PQfinish(conn); PQfinish(conn);
// Only when everyone_die is true
if (mythread == 0) { if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) { for (i = 1; i < THREAD_LIMIT; i++) {
if (running[i]) if (running[i]) {
running[i] = false;
LOGNOTICE("%s() waiting for %d", __func__, i);
join_pthread(procrel_pt[i]); join_pthread(procrel_pt[i]);
}
} }
LOGNOTICE("%s() exiting, processed %"PRIu64, LOGNOTICE("%s() exiting, processed %"PRIu64,
@ -6883,12 +7009,13 @@ static void *listener(void *arg)
pthread_t sock_pt; pthread_t sock_pt;
pthread_t summ_pt; pthread_t summ_pt;
pthread_t mark_pt; pthread_t mark_pt;
pthread_t break_pt; pthread_t reload_break_pt;
pthread_t cmd_break_pt;
int bq, bqp, bqd, wq0count, wqcount; int bq, bqp, bqd, wq0count, wqcount;
char ooo_buf[256]; char ooo_buf[256];
char buf[128]; char buf[128];
int cpus, i; int cpus;
bool reloader, cmder; struct breaker_setup reloader, cmder;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -6911,20 +7038,25 @@ static void *listener(void *arg)
DLPRIO(logqueue, 94); DLPRIO(logqueue, 94);
DLPRIO(breakqueue, PRIO_TERMINAL); DLPRIO(breakqueue, PRIO_TERMINAL);
#endif #endif
if (breakdown_threads <= 0) { if (reload_breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN); cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (cpus < 1) if (cpus < 1)
cpus = 1; cpus = 1;
breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; reload_breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1;
} }
cmd_breakdown_threads = reload_breakdown_threads;
LOGWARNING("%s(): creating %d*2 breaker threads ...", LOGWARNING("%s(): creating %d*2 breaker threads ...",
__func__, breakdown_threads); __func__, reload_breakdown_threads);
reloader = true;
for (i = 0; i < breakdown_threads; i++) breakdown_using_data = true;
create_pthread(&break_pt, breaker, &reloader);
cmder = false; reloader.reload = true;
for (i = 0; i < breakdown_threads; i++) reloader.thread = 0;
create_pthread(&break_pt, breaker, &cmder); create_pthread(&reload_break_pt, breaker, &reloader);
cmder.reload = false;
cmder.thread = 0;
create_pthread(&cmd_break_pt, breaker, &cmder);
if (no_data_log == false) if (no_data_log == false)
create_pthread(&log_pt, logger, NULL); create_pthread(&log_pt, logger, NULL);
@ -7274,10 +7406,12 @@ static void update_keysummary(ckpool_t *ckp)
int64_t markerid_stt, markerid_fin; int64_t markerid_stt, markerid_fin;
char *tmp, *minus; char *tmp, *minus;
tv_t db_stt, db_fin; tv_t db_stt, db_fin;
pthread_t break_pt, sock_pt; pthread_t reload_break_pt;
pthread_t cmd_break_pt;
pthread_t sock_pt;
double min, sec; double min, sec;
bool reloader, cmder; int cpus;
int cpus, i; struct breaker_setup reloader, cmder;
// Simple value check to abort early // Simple value check to abort early
if (!key_range || !(*key_range)) { if (!key_range || !(*key_range)) {
@ -7338,21 +7472,26 @@ static void update_keysummary(ckpool_t *ckp)
#if LOCK_CHECK #if LOCK_CHECK
DLPRIO(breakqueue, PRIO_TERMINAL); DLPRIO(breakqueue, PRIO_TERMINAL);
#endif #endif
if (breakdown_threads <= 0) { if (reload_breakdown_threads <= 0) {
cpus = sysconf(_SC_NPROCESSORS_ONLN); cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (cpus < 1) if (cpus < 1)
cpus = 1; cpus = 1;
breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; reload_breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1;
} }
LOGWARNING("%s(): creating %d+1 breaker threads ...",
__func__, breakdown_threads);
reloader = true;
for (i = 0; i < breakdown_threads; i++)
create_pthread(&break_pt, breaker, &reloader);
cmder = false;
// Only needs one // Only needs one
for (i = 0; i < 1; i++) cmd_breakdown_threads = 1;
create_pthread(&break_pt, breaker, &cmder); LOGWARNING("%s(): creating %d+1 breaker threads ...",
__func__, reload_breakdown_threads);
breakdown_using_data = true;
reloader.reload = true;
reloader.thread = 0;
create_pthread(&reload_break_pt, breaker, &reloader);
cmder.reload = false;
cmder.thread = 0;
create_pthread(&cmd_break_pt, breaker, &cmder);
alloc_storage(); alloc_storage();
@ -8039,7 +8178,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:f:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) {
switch(c) { switch(c) {
case '?': case '?':
case ':': case ':':
@ -8073,7 +8212,7 @@ int main(int argc, char **argv)
"- must be >0 and <=%d", "- must be >0 and <=%d",
bt, THREAD_LIMIT); bt, THREAD_LIMIT);
} }
breakdown_threads = bt; reload_breakdown_threads = bt;
} }
break; break;
case 'c': case 'c':
@ -8237,12 +8376,12 @@ int main(int argc, char **argv)
case 'S': case 'S':
btc_server = strdup(optarg); btc_server = strdup(optarg);
break; break;
case 'T':
txn_tree_store = false;
break;
case 't': case 't':
btc_timeout = atoi(optarg); btc_timeout = atoi(optarg);
break; break;
case 'T':
txn_tree_store = false;
break;
case 'u': case 'u':
db_user = strdup(optarg); db_user = strdup(optarg);
kill = optarg; kill = optarg;

5
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.402" #define CKDB_VERSION DB_VERSION"-2.403"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -124,6 +124,9 @@ extern enum free_modes free_mode;
* Set/checked under the function's main loop's first lock * Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */ * This is always a 'delta' value meaning add or subtract that many */
extern int queue_threads_delta; extern int queue_threads_delta;
/* Flags to notify thread changes */
extern int reload_breakdown_threads_delta;
extern int cmd_breakdown_threads_delta;
#define BLANK " " #define BLANK " "
extern char *EMPTY; extern char *EMPTY;

16
src/ckdb_cmd.c

@ -8402,6 +8402,22 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value); snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply); return strdup(reply);
} else if (strcasecmp(name, "rb") == 0 ||
strcasecmp(name, "reload_breaker") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
reload_breakdown_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "cb") == 0 ||
strcasecmp(name, "cmd_breaker") == 0) {
K_WLOCK(breakqueue_free);
// Just overwrite whatever's there
cmd_breakdown_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else { } else {
snprintf(reply, siz, "unknown name '%s'", name); snprintf(reply, siz, "unknown name '%s'", name);
LOGERR("%s() %s.%s", __func__, id, reply); LOGERR("%s() %s.%s", __func__, id, reply);

Loading…
Cancel
Save