From 8708b36b8da77a3465d8a8a96d94fb3e9275a6ee Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:40:02 +1100 Subject: [PATCH] Use async send proc in the connector --- src/connector.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index d79d441a..6c0bf43b 100644 --- a/src/connector.c +++ b/src/connector.c @@ -97,6 +97,8 @@ struct connector_data { /* For protecting the pending sends list */ mutex_t sender_lock; pthread_cond_t sender_cond; + + ckwq_t *ckwqs; }; typedef struct connector_data cdata_t; @@ -242,7 +244,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) char buf[256]; sprintf(buf, "dropclient=%"PRId64, id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } /* Invalidate this instance. Remove them from the hashtables we look up @@ -360,9 +362,9 @@ reparse: * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) - send_proc(ckp->generator, s); + async_send_proc(ckp, ckp->generator, s); else - send_proc(ckp->stratifier, s); + async_send_proc(ckp, ckp->stratifier, s); } free(s); @@ -846,6 +848,8 @@ int connector(proc_instance_t *pi) LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; cdata->ckp = ckp; + /* Connector only requires one work queue */ + ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); if (!ckp->serverurls) cdata->serverfd = ckalloc(sizeof(int *));