diff --git a/src/ckdb.c b/src/ckdb.c index 3c17a671..23ae4664 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -727,7 +727,6 @@ typedef struct workqueue { static K_LIST *workqueue_free; static K_STORE *workqueue_store; -static sem_t workqueue_sem; // TRANSFER #define NAME_SIZE 63 @@ -6067,7 +6066,6 @@ static bool setup_data() WORKINFO wi; cklock_init(&fpm_lock); - cksem_init(&workqueue_sem); cksem_init(&socketer_sem); workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), @@ -8381,8 +8379,6 @@ static void *socketer(__maybe_unused void *arg) K_WLOCK(workqueue_free); k_add_tail(workqueue_store, item); K_WUNLOCK(workqueue_free); - - cksem_post(&workqueue_sem); break; // Code error default: @@ -8670,7 +8666,7 @@ static bool reload_from(tv_t *start) return ret; } -static void process_queued(K_ITEM *wq_item) +static void process_queued(PGconn *conn, K_ITEM *wq_item) { static char *last_buf = NULL; WORKQUEUE *workqueue; @@ -8681,7 +8677,7 @@ static void process_queued(K_ITEM *wq_item) // Simply ignore the (very rare) duplicates if (!last_buf || strcmp(workqueue->buf, last_buf)) { - ans = cmds[workqueue->which_cmds].func(NULL, workqueue->cmd, workqueue->id, + ans = cmds[workqueue->which_cmds].func(conn, workqueue->cmd, workqueue->id, &(workqueue->now), workqueue->by, workqueue->code, workqueue->inet, &(workqueue->cd), workqueue->trf_root); @@ -8712,6 +8708,7 @@ static void process_queued(K_ITEM *wq_item) // TODO: equivalent of api_allow static void *listener(void *arg) { + PGconn *conn = NULL; pthread_t log_pt; pthread_t sock_pt; pthread_t summ_pt; @@ -8748,18 +8745,22 @@ static void *listener(void *arg) startup_complete = true; } + conn = dbconnect(); + // Process queued work while (!everyone_die) { K_WLOCK(workqueue_store); wq_item = k_unlink_head(workqueue_store); K_WUNLOCK(workqueue_store); if (wq_item) { - process_queued(wq_item); + process_queued(conn, wq_item); tick(); } else - cksem_mswait(&workqueue_sem, 420); + cksleep_ms(4); } + PQfinish(conn); + return NULL; }