|
|
@ -74,6 +74,8 @@ struct connector_data { |
|
|
|
cklock_t lock; |
|
|
|
cklock_t lock; |
|
|
|
proc_instance_t *pi; |
|
|
|
proc_instance_t *pi; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time_t start_time; |
|
|
|
|
|
|
|
|
|
|
|
/* Array of server fds */ |
|
|
|
/* Array of server fds */ |
|
|
|
int *serverfd; |
|
|
|
int *serverfd; |
|
|
|
/* All time count of clients connected */ |
|
|
|
/* All time count of clients connected */ |
|
|
@ -728,7 +730,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf) |
|
|
|
json_decref(json_msg); |
|
|
|
json_decref(json_msg); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static char *connector_stats(cdata_t *cdata) |
|
|
|
static char *connector_stats(cdata_t *cdata, const int runtime) |
|
|
|
{ |
|
|
|
{ |
|
|
|
json_t *val = json_object(), *subval; |
|
|
|
json_t *val = json_object(), *subval; |
|
|
|
client_instance_t *client; |
|
|
|
client_instance_t *client; |
|
|
@ -737,6 +739,10 @@ static char *connector_stats(cdata_t *cdata) |
|
|
|
int64_t memsize; |
|
|
|
int64_t memsize; |
|
|
|
char *buf; |
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* If called in passthrough mode we log stats instead of the stratifier */ |
|
|
|
|
|
|
|
if (runtime) |
|
|
|
|
|
|
|
json_set_int(val, "runtime", runtime); |
|
|
|
|
|
|
|
|
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
objects = HASH_COUNT(cdata->clients); |
|
|
|
objects = HASH_COUNT(cdata->clients); |
|
|
|
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; |
|
|
|
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; |
|
|
@ -773,6 +779,9 @@ static char *connector_stats(cdata_t *cdata) |
|
|
|
|
|
|
|
|
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|
json_decref(val); |
|
|
|
json_decref(val); |
|
|
|
|
|
|
|
if (runtime) |
|
|
|
|
|
|
|
LOGNOTICE("Passthrough:%s", buf); |
|
|
|
|
|
|
|
else |
|
|
|
LOGNOTICE("Connector stats: %s", buf); |
|
|
|
LOGNOTICE("Connector stats: %s", buf); |
|
|
|
return buf; |
|
|
|
return buf; |
|
|
|
} |
|
|
|
} |
|
|
@ -781,13 +790,26 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) |
|
|
|
{ |
|
|
|
{ |
|
|
|
unix_msg_t *umsg = NULL; |
|
|
|
unix_msg_t *umsg = NULL; |
|
|
|
ckpool_t *ckp = pi->ckp; |
|
|
|
ckpool_t *ckp = pi->ckp; |
|
|
|
|
|
|
|
time_t last_stats; |
|
|
|
int64_t client_id; |
|
|
|
int64_t client_id; |
|
|
|
char *buf; |
|
|
|
|
|
|
|
int ret = 0; |
|
|
|
int ret = 0; |
|
|
|
|
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
|
|
LOGWARNING("%s connector ready", ckp->name); |
|
|
|
LOGWARNING("%s connector ready", ckp->name); |
|
|
|
|
|
|
|
last_stats = cdata->start_time; |
|
|
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
retry: |
|
|
|
|
|
|
|
if (ckp->passthrough) { |
|
|
|
|
|
|
|
time_t diff = time(NULL); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (diff - last_stats >= 60) { |
|
|
|
|
|
|
|
last_stats = diff; |
|
|
|
|
|
|
|
diff -= cdata->start_time; |
|
|
|
|
|
|
|
buf = connector_stats(cdata, diff); |
|
|
|
|
|
|
|
dealloc(buf); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (umsg) { |
|
|
|
if (umsg) { |
|
|
|
Close(umsg->sockd); |
|
|
|
Close(umsg->sockd); |
|
|
|
free(umsg->buf); |
|
|
|
free(umsg->buf); |
|
|
@ -837,7 +859,7 @@ retry: |
|
|
|
char *msg; |
|
|
|
char *msg; |
|
|
|
|
|
|
|
|
|
|
|
LOGDEBUG("Connector received stats request"); |
|
|
|
LOGDEBUG("Connector received stats request"); |
|
|
|
msg = connector_stats(cdata); |
|
|
|
msg = connector_stats(cdata, 0); |
|
|
|
send_unix_msg(umsg->sockd, msg); |
|
|
|
send_unix_msg(umsg->sockd, msg); |
|
|
|
} else if (cmdmatch(buf, "loglevel")) { |
|
|
|
} else if (cmdmatch(buf, "loglevel")) { |
|
|
|
sscanf(buf, "loglevel=%d", &ckp->loglevel); |
|
|
|
sscanf(buf, "loglevel=%d", &ckp->loglevel); |
|
|
@ -982,6 +1004,7 @@ int connector(proc_instance_t *pi) |
|
|
|
cond_init(&cdata->sender_cond); |
|
|
|
cond_init(&cdata->sender_cond); |
|
|
|
create_pthread(&cdata->pth_sender, sender, cdata); |
|
|
|
create_pthread(&cdata->pth_sender, sender, cdata); |
|
|
|
create_pthread(&cdata->pth_receiver, receiver, cdata); |
|
|
|
create_pthread(&cdata->pth_receiver, receiver, cdata); |
|
|
|
|
|
|
|
cdata->start_time = time(NULL); |
|
|
|
|
|
|
|
|
|
|
|
create_unix_receiver(pi); |
|
|
|
create_unix_receiver(pi); |
|
|
|
|
|
|
|
|
|
|
|