|
|
|
@ -97,6 +97,8 @@ struct connector_data {
|
|
|
|
|
/* For protecting the pending sends list */ |
|
|
|
|
mutex_t sender_lock; |
|
|
|
|
pthread_cond_t sender_cond; |
|
|
|
|
|
|
|
|
|
ckwq_t *ckwqs; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
typedef struct connector_data cdata_t; |
|
|
|
@ -242,7 +244,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
|
|
|
|
|
char buf[256]; |
|
|
|
|
|
|
|
|
|
sprintf(buf, "dropclient=%"PRId64, id); |
|
|
|
|
send_proc(ckp->stratifier, buf); |
|
|
|
|
async_send_proc(ckp, ckp->stratifier, buf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Invalidate this instance. Remove them from the hashtables we look up
|
|
|
|
@ -360,9 +362,9 @@ reparse:
|
|
|
|
|
* filtered by the stratifier. */ |
|
|
|
|
if (likely(client->fd != -1)) { |
|
|
|
|
if (ckp->passthrough) |
|
|
|
|
send_proc(ckp->generator, s); |
|
|
|
|
async_send_proc(ckp, ckp->generator, s); |
|
|
|
|
else |
|
|
|
|
send_proc(ckp->stratifier, s); |
|
|
|
|
async_send_proc(ckp, ckp->stratifier, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
free(s); |
|
|
|
@ -846,6 +848,8 @@ int connector(proc_instance_t *pi)
|
|
|
|
|
LOGWARNING("%s connector starting", ckp->name); |
|
|
|
|
ckp->data = cdata; |
|
|
|
|
cdata->ckp = ckp; |
|
|
|
|
/* Connector only requires one work queue */ |
|
|
|
|
ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); |
|
|
|
|
|
|
|
|
|
if (!ckp->serverurls) |
|
|
|
|
cdata->serverfd = ckalloc(sizeof(int *)); |
|
|
|
|