|
|
@ -83,12 +83,18 @@ struct connector_data { |
|
|
|
/* Linked list of dead clients no longer in use but may still have references */ |
|
|
|
/* Linked list of dead clients no longer in use but may still have references */ |
|
|
|
client_instance_t *dead_clients; |
|
|
|
client_instance_t *dead_clients; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int clients_generated; |
|
|
|
|
|
|
|
int dead_generated; |
|
|
|
|
|
|
|
|
|
|
|
int64_t client_id; |
|
|
|
int64_t client_id; |
|
|
|
|
|
|
|
|
|
|
|
/* For the linked list of pending sends */ |
|
|
|
/* For the linked list of pending sends */ |
|
|
|
sender_send_t *sender_sends; |
|
|
|
sender_send_t *sender_sends; |
|
|
|
sender_send_t *delayed_sends; |
|
|
|
sender_send_t *delayed_sends; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t sends_generated; |
|
|
|
|
|
|
|
int64_t sends_delayed; |
|
|
|
|
|
|
|
|
|
|
|
/* For protecting the pending sends list */ |
|
|
|
/* For protecting the pending sends list */ |
|
|
|
pthread_mutex_t sender_lock; |
|
|
|
pthread_mutex_t sender_lock; |
|
|
|
pthread_cond_t sender_cond; |
|
|
|
pthread_cond_t sender_cond; |
|
|
@ -194,6 +200,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) |
|
|
|
__inc_instance_ref(client); |
|
|
|
__inc_instance_ref(client); |
|
|
|
|
|
|
|
|
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
ck_wlock(&cdata->lock); |
|
|
|
|
|
|
|
cdata->clients_generated++; |
|
|
|
client->id = cdata->client_id++; |
|
|
|
client->id = cdata->client_id++; |
|
|
|
HASH_ADD_I64(cdata->clients, id, client); |
|
|
|
HASH_ADD_I64(cdata->clients, id, client); |
|
|
|
cdata->nfds++; |
|
|
|
cdata->nfds++; |
|
|
@ -216,6 +223,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) |
|
|
|
/* This is the reference to this client's presence in the
|
|
|
|
/* This is the reference to this client's presence in the
|
|
|
|
* epoll list. */ |
|
|
|
* epoll list. */ |
|
|
|
__dec_instance_ref(client); |
|
|
|
__dec_instance_ref(client); |
|
|
|
|
|
|
|
cdata->dead_generated++; |
|
|
|
} |
|
|
|
} |
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
ck_wunlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
|
@ -495,6 +503,7 @@ void *sender(void *arg) |
|
|
|
* This is the only function that alters it so no |
|
|
|
* This is the only function that alters it so no |
|
|
|
* locking is required. Keep the client ref. */ |
|
|
|
* locking is required. Keep the client ref. */ |
|
|
|
DL_APPEND(cdata->delayed_sends, sender_send); |
|
|
|
DL_APPEND(cdata->delayed_sends, sender_send); |
|
|
|
|
|
|
|
cdata->sends_delayed++; |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
sent = true; |
|
|
|
sent = true; |
|
|
@ -568,6 +577,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) |
|
|
|
sender_send->len = len; |
|
|
|
sender_send->len = len; |
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
|
|
|
cdata->sends_generated++; |
|
|
|
DL_APPEND(cdata->sender_sends, sender_send); |
|
|
|
DL_APPEND(cdata->sender_sends, sender_send); |
|
|
|
pthread_cond_signal(&cdata->sender_cond); |
|
|
|
pthread_cond_signal(&cdata->sender_cond); |
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
@ -627,6 +637,67 @@ static void process_client_msg(cdata_t *cdata, const char *buf) |
|
|
|
json_decref(json_msg); |
|
|
|
json_decref(json_msg); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static char *connector_stats(cdata_t *cdata) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
json_t *val = json_object(), *subval; |
|
|
|
|
|
|
|
client_instance_t *client; |
|
|
|
|
|
|
|
int objects, generated; |
|
|
|
|
|
|
|
sender_send_t *send; |
|
|
|
|
|
|
|
int64_t memsize; |
|
|
|
|
|
|
|
char *buf; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
|
|
|
|
objects = HASH_COUNT(cdata->clients); |
|
|
|
|
|
|
|
memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; |
|
|
|
|
|
|
|
generated = cdata->clients_generated; |
|
|
|
|
|
|
|
ck_runlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "clients", subval); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ck_rlock(&cdata->lock); |
|
|
|
|
|
|
|
DL_COUNT(cdata->dead_clients, client, objects); |
|
|
|
|
|
|
|
generated = cdata->dead_generated; |
|
|
|
|
|
|
|
ck_runlock(&cdata->lock); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
memsize = objects * sizeof(client_instance_t); |
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "dead", subval); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
objects = 0; |
|
|
|
|
|
|
|
memsize = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
|
|
|
generated = cdata->sends_generated; |
|
|
|
|
|
|
|
DL_FOREACH(cdata->sender_sends, send) { |
|
|
|
|
|
|
|
objects++; |
|
|
|
|
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "sends", subval); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
objects = 0; |
|
|
|
|
|
|
|
memsize = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mutex_lock(&cdata->sender_lock); |
|
|
|
|
|
|
|
generated = cdata->sends_delayed; |
|
|
|
|
|
|
|
DL_FOREACH(cdata->delayed_sends, send) { |
|
|
|
|
|
|
|
objects++; |
|
|
|
|
|
|
|
memsize += sizeof(sender_send_t) + send->len + 1; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
mutex_unlock(&cdata->sender_lock); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); |
|
|
|
|
|
|
|
json_set_object(val, "delays", subval); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); |
|
|
|
|
|
|
|
json_decref(val); |
|
|
|
|
|
|
|
LOGNOTICE("Connector stats: %s", buf); |
|
|
|
|
|
|
|
return buf; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) |
|
|
|
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) |
|
|
|
{ |
|
|
|
{ |
|
|
|
int sockd = -1, ret = 0, selret; |
|
|
|
int sockd = -1, ret = 0, selret; |
|
|
@ -709,6 +780,12 @@ retry: |
|
|
|
} else if (cmdmatch(buf, "reject")) { |
|
|
|
} else if (cmdmatch(buf, "reject")) { |
|
|
|
LOGDEBUG("Connector received reject signal"); |
|
|
|
LOGDEBUG("Connector received reject signal"); |
|
|
|
cdata->accept = false; |
|
|
|
cdata->accept = false; |
|
|
|
|
|
|
|
} else if (cmdmatch(buf, "stats")) { |
|
|
|
|
|
|
|
char *msg; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOGDEBUG("Connector received stats request"); |
|
|
|
|
|
|
|
msg = connector_stats(cdata); |
|
|
|
|
|
|
|
send_unix_msg(sockd, msg); |
|
|
|
} else if (cmdmatch(buf, "loglevel")) { |
|
|
|
} else if (cmdmatch(buf, "loglevel")) { |
|
|
|
sscanf(buf, "loglevel=%d", &ckp->loglevel); |
|
|
|
sscanf(buf, "loglevel=%d", &ckp->loglevel); |
|
|
|
} else if (cmdmatch(buf, "shutdown")) { |
|
|
|
} else if (cmdmatch(buf, "shutdown")) { |
|
|
|