Browse Source

ckdb - remove a wakeup semaphore and replace with a short sleep

master
kanoi 10 years ago
parent
commit
f36b2763bc
  1. 17
      src/ckdb.c

17
src/ckdb.c

@ -727,7 +727,6 @@ typedef struct workqueue {
static K_LIST *workqueue_free; static K_LIST *workqueue_free;
static K_STORE *workqueue_store; static K_STORE *workqueue_store;
static sem_t workqueue_sem;
// TRANSFER // TRANSFER
#define NAME_SIZE 63 #define NAME_SIZE 63
@ -6067,7 +6066,6 @@ static bool setup_data()
WORKINFO wi; WORKINFO wi;
cklock_init(&fpm_lock); cklock_init(&fpm_lock);
cksem_init(&workqueue_sem);
cksem_init(&socketer_sem); cksem_init(&socketer_sem);
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
@ -8381,8 +8379,6 @@ static void *socketer(__maybe_unused void *arg)
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
k_add_tail(workqueue_store, item); k_add_tail(workqueue_store, item);
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
cksem_post(&workqueue_sem);
break; break;
// Code error // Code error
default: default:
@ -8670,7 +8666,7 @@ static bool reload_from(tv_t *start)
return ret; return ret;
} }
static void process_queued(K_ITEM *wq_item) static void process_queued(PGconn *conn, K_ITEM *wq_item)
{ {
static char *last_buf = NULL; static char *last_buf = NULL;
WORKQUEUE *workqueue; WORKQUEUE *workqueue;
@ -8681,7 +8677,7 @@ static void process_queued(K_ITEM *wq_item)
// Simply ignore the (very rare) duplicates // Simply ignore the (very rare) duplicates
if (!last_buf || strcmp(workqueue->buf, last_buf)) { if (!last_buf || strcmp(workqueue->buf, last_buf)) {
ans = cmds[workqueue->which_cmds].func(NULL, workqueue->cmd, workqueue->id, ans = cmds[workqueue->which_cmds].func(conn, workqueue->cmd, workqueue->id,
&(workqueue->now), workqueue->by, &(workqueue->now), workqueue->by,
workqueue->code, workqueue->inet, workqueue->code, workqueue->inet,
&(workqueue->cd), workqueue->trf_root); &(workqueue->cd), workqueue->trf_root);
@ -8712,6 +8708,7 @@ static void process_queued(K_ITEM *wq_item)
// TODO: equivalent of api_allow // TODO: equivalent of api_allow
static void *listener(void *arg) static void *listener(void *arg)
{ {
PGconn *conn = NULL;
pthread_t log_pt; pthread_t log_pt;
pthread_t sock_pt; pthread_t sock_pt;
pthread_t summ_pt; pthread_t summ_pt;
@ -8748,18 +8745,22 @@ static void *listener(void *arg)
startup_complete = true; startup_complete = true;
} }
conn = dbconnect();
// Process queued work // Process queued work
while (!everyone_die) { while (!everyone_die) {
K_WLOCK(workqueue_store); K_WLOCK(workqueue_store);
wq_item = k_unlink_head(workqueue_store); wq_item = k_unlink_head(workqueue_store);
K_WUNLOCK(workqueue_store); K_WUNLOCK(workqueue_store);
if (wq_item) { if (wq_item) {
process_queued(wq_item); process_queued(conn, wq_item);
tick(); tick();
} else } else
cksem_mswait(&workqueue_sem, 420); cksleep_ms(4);
} }
PQfinish(conn);
return NULL; return NULL;
} }

Loading…
Cancel
Save