Browse Source

ckdb - pqproc() thread management

master
kanoi 8 years ago
parent
commit
4b302af97f
  1. 200
      src/ckdb.c
  2. 5
      src/ckdb.h
  3. 10
      src/ckdb_cmd.c

200
src/ckdb.c

@ -120,11 +120,13 @@ static bool replier_using_data;
/* To notify thread changes /* To notify thread changes
* Set/checked under the function's main loop's first lock * Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */ * This is always a 'delta' value meaning add or subtract that many */
int queue_threads_delta = 0; int reload_queue_threads_delta = 0;
/* Use -Q to set it higher int proc_queue_threads_delta = 0;
* Setting it higher can degrade performance if the CPUs can't /* Use -Q to set them higher
* Setting them higher can degrade performance if the CPUs can't
* handle the extra locking or the threads are swapping */ * handle the extra locking or the threads are swapping */
static int queue_threads = 1; static int reload_queue_threads = 1;
static int proc_queue_threads = 1;
// -B to override calculated value // -B to override calculated value
static int reload_breakdown_threads = -1; static int reload_breakdown_threads = -1;
// This is normally the same as above, but key_update only requires 1 // This is normally the same as above, but key_update only requires 1
@ -6231,7 +6233,7 @@ static void *process_reload(__maybe_unused void *arg)
running[0] = true; running[0] = true;
// Set to create the rest of the threads // Set to create the rest of the threads
queue_threads_delta = queue_threads - 1; reload_queue_threads_delta = reload_queue_threads - 1;
LOGNOTICE("%s() starting", __func__); LOGNOTICE("%s() starting", __func__);
} }
@ -6251,9 +6253,9 @@ static void *process_reload(__maybe_unused void *arg)
break; break;
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
if (mythread == 0 && queue_threads_delta != 0) { if (mythread == 0 && reload_queue_threads_delta != 0) {
threads_delta = queue_threads_delta; threads_delta = reload_queue_threads_delta;
queue_threads_delta = 0; reload_queue_threads_delta = 0;
} else { } else {
bq_item = k_unlink_head(reload_done_breakqueue_store); bq_item = k_unlink_head(reload_done_breakqueue_store);
if (bq_item) { if (bq_item) {
@ -6362,7 +6364,8 @@ static void *process_reload(__maybe_unused void *arg)
tick(); tick();
} }
PQfinish(conn); if (conn)
PQfinish(conn);
if (mythread == 0) { if (mythread == 0) {
for (i = 1; i < THREAD_LIMIT; i++) { for (i = 1; i < THREAD_LIMIT; i++) {
@ -6831,12 +6834,15 @@ static void free_lost(SEQDATA *seqdata)
static void *pqproc(void *arg) static void *pqproc(void *arg)
{ {
static pthread_t pqproc_pt[THREAD_LIMIT];
static int n[THREAD_LIMIT];
static bool running[THREAD_LIMIT];
/* Process queued work - ensure pool0 is emptied first, /* Process queued work - ensure pool0 is emptied first,
* even if there is pending pool0 data being processed by breaker() */ * even if there is pending pool0 data being processed by breaker() */
static bool pool0 = true; static bool pool0 = true;
static tv_t wq_stt, wq_fin; static tv_t wq_stt, wq_fin;
pthread_t *queue_pt;
PGconn *conn = NULL; PGconn *conn = NULL;
K_ITEM *wq_item; K_ITEM *wq_item;
time_t now = 0; time_t now = 0;
@ -6847,28 +6853,26 @@ static void *pqproc(void *arg)
SEQSET *seqset = NULL; SEQSET *seqset = NULL;
SEQDATA *seqdata; SEQDATA *seqdata;
K_ITEM *ss_item; K_ITEM *ss_item;
int i, *n, zeros;
ts_t when, when_add; ts_t when, when_add;
int ret; int i, mythread, threads_delta = 0, done, tot, ret;
if (!arg) { if (!arg) {
setnow(&wq_stt); setnow(&wq_stt);
n = malloc(queue_threads * sizeof(int)); for (i = 0; i < THREAD_LIMIT; i++) {
queue_pt = malloc(queue_threads * sizeof(*queue_pt));
for (i = 1; i < queue_threads; i++) {
n[i] = i; n[i] = i;
create_pthread(&(queue_pt[i]), pqproc, &(n[i])); running[i] = false;
} }
} else {
i = *(int *)(arg);
if (queue_threads < 10) mythread = 0;
zeros = 1; running[0] = true;
else
zeros = (int)log10(queue_threads) + 1;
snprintf(buf, sizeof(buf), "db_p%0*dqproc", zeros, i); // Set to create the rest of the threads
proc_queue_threads_delta = proc_queue_threads - 1;
} else {
mythread = *(int *)(arg);
snprintf(buf, sizeof(buf), "db_p%02dqproc", mythread);
LOCK_INIT(buf); LOCK_INIT(buf);
rename_proc(buf); rename_proc(buf);
} }
@ -6883,34 +6887,104 @@ static void *pqproc(void *arg)
// Override checking until pool0 is complete // Override checking until pool0 is complete
wqcount = -1; wqcount = -1;
while (!everyone_die) { while (!everyone_die) {
if (mythread && !running[mythread])
break;
wq_item = NULL; wq_item = NULL;
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
if (pool0) { if (mythread == 0 && proc_queue_threads_delta != 0) {
if (earlysock_left == 0) { threads_delta = proc_queue_threads_delta;
pool0 = false; proc_queue_threads_delta = 0;
switch_msg = true; } else {
} else { if (pool0) {
wq_item = k_unlink_head(pool0_workqueue_store); if (earlysock_left == 0) {
if (wq_item) pool0 = false;
earlysock_left--; switch_msg = true;
} else {
wq_item = k_unlink_head(pool0_workqueue_store);
if (wq_item)
earlysock_left--;
}
}
if (!pool0) {
wq_item = k_unlink_head(pool_workqueue_store);
wqcount = pool_workqueue_store->count;
} }
}
if (!pool0) {
wq_item = k_unlink_head(pool_workqueue_store);
wqcount = pool_workqueue_store->count;
}
if (wqcount == 0 && wq_stt.tv_sec != 0L) if (wqcount == 0 && wq_stt.tv_sec != 0L)
setnow(&wq_fin); setnow(&wq_fin);
if (wq_item) { if (wq_item) {
if (pool0) if (pool0)
workqueue_proc0++; workqueue_proc0++;
else else
workqueue_proc1++; workqueue_proc1++;
}
} }
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
// TODO: deal with thread creation/shutdown failure
if (threads_delta != 0) {
if (threads_delta > 0) {
// Add threads
tot = 1;
done = 0;
for (i = 1; i < THREAD_LIMIT; i++) {
if (!running[i]) {
if (threads_delta > 0) {
threads_delta--;
running[i] = true;
create_pthread(&(pqproc_pt[i]),
pqproc,
&(n[i]));
done++;
tot++;
}
} else
tot++;
}
LOGWARNING("%s() created %d thread%s total=%d"
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__, done,
(done == 1) ? EMPTY : "s", tot
#if LOCK_CHECK
, next_thread_id
#endif
);
} else {
// Notify and wait for each to exit
tot = 1;
done = 0;
for (i = THREAD_LIMIT - 1; i > 0; i--) {
if (running[i]) {
if (threads_delta < 0) {
threads_delta++;
LOGNOTICE("%s() stopping %d",
__func__, i);
running[i] = false;
join_pthread(pqproc_pt[i]);
done++;
} else
tot++;
}
}
LOGWARNING("%s() stopped %d thread%s total=%d "
#if LOCK_CHECK
" next_thread_id=%d"
#endif
, __func__, done,
(done == 1) ? EMPTY : "s", tot
#if LOCK_CHECK
, next_thread_id
#endif
);
}
threads_delta = 0;
continue;
}
if (switch_msg) { if (switch_msg) {
switch_msg = false; switch_msg = false;
LOGNOTICE("%s() pool0 complete, processed %"PRIu64, LOGNOTICE("%s() pool0 complete, processed %"PRIu64,
@ -6995,9 +7069,14 @@ static void *pqproc(void *arg)
if (conn) if (conn)
PQfinish(conn); PQfinish(conn);
if (!arg) { if (mythread == 0) {
for (i = 1; i < queue_threads; i++) for (i = 1; i < THREAD_LIMIT; i++) {
join_pthread(queue_pt[i]); if (running[i]) {
running[i] = false;
LOGNOTICE("%s() waiting for %d", __func__, i);
join_pthread(pqproc_pt[i]);
}
}
} }
return NULL; return NULL;
@ -7019,7 +7098,7 @@ static void *listener(void *arg)
pthread_detach(pthread_self()); pthread_detach(pthread_self());
snprintf(buf, sizeof(buf), "db_p0qproc"); snprintf(buf, sizeof(buf), "db_p00qproc");
LOCK_INIT(buf); LOCK_INIT(buf);
rename_proc(buf); rename_proc(buf);
@ -8354,14 +8433,33 @@ int main(int argc, char **argv)
break; break;
case 'Q': case 'Q':
{ {
int qt = atoi(optarg); // N for both or N:M
char *q = strdup(optarg);
char *colon = strchr(q, ':');
int qt;
if (colon)
*(colon++) = '\0';
qt = atoi(q);
if (qt < 1 || qt > THREAD_LIMIT) { if (qt < 1 || qt > THREAD_LIMIT) {
quit(1, "Invalid queue " quit(1, "Invalid queue "
"thread count %d " "thread count '%s' "
"- must be >0 and <=%d", "- must be >0 and <=%d",
qt, THREAD_LIMIT); colon ? q : optarg,
THREAD_LIMIT);
}
reload_queue_threads = qt;
if (!colon)
proc_queue_threads = qt;
else {
qt = atoi(colon);
if (qt < 1 || qt > THREAD_LIMIT) {
quit(1, "Invalid 2nd queue "
"thread count '%s' "
"- must be >0 and <=%d",
colon, THREAD_LIMIT);
}
proc_queue_threads = qt;
} }
queue_threads = qt;
} }
break; break;
case 'r': case 'r':

5
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.7" #define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.404" #define CKDB_VERSION DB_VERSION"-2.405"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -123,7 +123,8 @@ extern enum free_modes free_mode;
/* To notify thread changes /* To notify thread changes
* Set/checked under the function's main loop's first lock * Set/checked under the function's main loop's first lock
* This is always a 'delta' value meaning add or subtract that many */ * This is always a 'delta' value meaning add or subtract that many */
extern int queue_threads_delta; extern int reload_queue_threads_delta;
extern int proc_queue_threads_delta;
// To notify thread changes // To notify thread changes
extern int reload_breakdown_threads_delta; extern int reload_breakdown_threads_delta;
extern int cmd_breakdown_threads_delta; extern int cmd_breakdown_threads_delta;

10
src/ckdb_cmd.c

@ -8398,10 +8398,18 @@ static char *cmd_threads(__maybe_unused PGconn *conn, char *cmd, char *id,
strcasecmp(name, "process_reload") == 0) { strcasecmp(name, "process_reload") == 0) {
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);
// Just overwrite whatever's there // Just overwrite whatever's there
queue_threads_delta = delta_value; reload_queue_threads_delta = delta_value;
K_WUNLOCK(breakqueue_free); K_WUNLOCK(breakqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value); snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply); return strdup(reply);
} else if (strcasecmp(name, "pq") == 0 ||
strcasecmp(name, "pqproc") == 0) {
K_WLOCK(workqueue_free);
// Just overwrite whatever's there
proc_queue_threads_delta = delta_value;
K_WUNLOCK(workqueue_free);
snprintf(reply, siz, "ok.delta %d request sent", delta_value);
return strdup(reply);
} else if (strcasecmp(name, "rb") == 0 || } else if (strcasecmp(name, "rb") == 0 ||
strcasecmp(name, "reload_breaker") == 0) { strcasecmp(name, "reload_breaker") == 0) {
K_WLOCK(breakqueue_free); K_WLOCK(breakqueue_free);

Loading…
Cancel
Save