|
|
@ -102,6 +102,8 @@ struct connector_data { |
|
|
|
|
|
|
|
|
|
|
|
int64_t sends_generated; |
|
|
|
int64_t sends_generated; |
|
|
|
int64_t sends_delayed; |
|
|
|
int64_t sends_delayed; |
|
|
|
|
|
|
|
int64_t sends_queued; |
|
|
|
|
|
|
|
int64_t sends_size; |
|
|
|
|
|
|
|
|
|
|
|
/* For protecting the pending sends list */ |
|
|
|
/* For protecting the pending sends list */ |
|
|
|
mutex_t sender_lock; |
|
|
|
mutex_t sender_lock; |
|
|
@ -590,6 +592,7 @@ static void *sender(void *arg) |
|
|
|
rename_proc("csender"); |
|
|
|
rename_proc("csender"); |
|
|
|
|
|
|
|
|
|
|
|
while (42) { |
|
|
|
while (42) { |
|
|
|
|
|
|
|
int64_t sends_queued = 0, sends_size = 0; |
|
|
|
sender_send_t *sending, *tmp; |
|
|
|
sender_send_t *sending, *tmp; |
|
|
|
|
|
|
|
|
|
|
|
/* Check all sends to see if they can be written out */ |
|
|
|
/* Check all sends to see if they can be written out */ |
|
|
@ -597,11 +600,16 @@ static void *sender(void *arg) |
|
|
|
if (send_sender_send(ckp, cdata, sending)) { |
|
|
|
if (send_sender_send(ckp, cdata, sending)) { |
|
|
|
DL_DELETE(sends, sending); |
|
|
|
DL_DELETE(sends, sending); |
|
|
|
clear_sender_send(sending, cdata); |
|
|
|
clear_sender_send(sending, cdata); |
|
|
|
} else |
|
|
|
} else { |
|
|
|
cdata->sends_delayed++; |
|
|
|
sends_queued++; |
|
|
|
|
|
|
|
sends_size += sizeof(sender_send_t) + sending->len + 1; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
|
|
|
cdata->sends_delayed += sends_queued; |
|
|
|
|
|
|
|
cdata->sends_queued = sends_queued; |
|
|
|
|
|
|
|
cdata->sends_size = sends_size; |
|
|
|
/* Poll every 10ms if there are no new sends. */ |
|
|
|
/* Poll every 10ms if there are no new sends. */ |
|
|
|
if (!cdata->sender_sends) { |
|
|
|
if (!cdata->sender_sends) { |
|
|
|
const ts_t polltime = {0, 10000000}; |
|
|
|
const ts_t polltime = {0, 10000000}; |
|
|
@ -751,21 +759,16 @@ static char *connector_stats(cdata_t *cdata) |
|
|
|
memsize = 0; |
|
|
|
memsize = 0; |
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
generated = cdata->sends_generated; |
|
|
|
|
|
|
|
DL_FOREACH(cdata->sender_sends, send) { |
|
|
|
DL_FOREACH(cdata->sender_sends, send) { |
|
|
|
objects++; |
|
|
|
objects++; |
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); |
|
|
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "sends", subval); |
|
|
|
json_set_object(val, "sends", subval); |
|
|
|
|
|
|
|
|
|
|
|
objects = 0; |
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); |
|
|
|
memsize = 0; |
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
|
|
generated = cdata->sends_delayed; |
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si}", "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "delays", subval); |
|
|
|
json_set_object(val, "delays", subval); |
|
|
|
|
|
|
|
|
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|