|
|
|
@ -643,8 +643,9 @@ static void *sender(void *arg)
|
|
|
|
|
|
|
|
|
|
/* Send a client by id a heap allocated buffer, allowing this function to
|
|
|
|
|
* free the ram. */ |
|
|
|
|
static void send_client(cdata_t *cdata, int64_t id, char *buf) |
|
|
|
|
static void send_client(cdata_t *cdata, const int64_t id, char *buf) |
|
|
|
|
{ |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
sender_send_t *sender_send; |
|
|
|
|
client_instance_t *client; |
|
|
|
|
int len; |
|
|
|
@ -661,16 +662,36 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Grab a reference to this client until the sender_send has
|
|
|
|
|
* completed processing. */ |
|
|
|
|
* completed processing. Is this a passthrough subclient ? */ |
|
|
|
|
if (id > 0xffffffffll) { |
|
|
|
|
int64_t client_id, pass_id; |
|
|
|
|
|
|
|
|
|
client_id = id & 0xffffffffll; |
|
|
|
|
pass_id = id >> 32; |
|
|
|
|
/* Make sure the passthrough exists for passthrough subclients */ |
|
|
|
|
client = ref_client_by_id(cdata, pass_id); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", |
|
|
|
|
pass_id, client_id); |
|
|
|
|
/* Now see if the subclient exists */ |
|
|
|
|
client = ref_client_by_id(cdata, client_id); |
|
|
|
|
if (client) { |
|
|
|
|
invalidate_client(ckp, cdata, client); |
|
|
|
|
dec_instance_ref(cdata, client); |
|
|
|
|
} else |
|
|
|
|
stratifier_drop_id(ckp, id); |
|
|
|
|
free(buf); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
client = ref_client_by_id(cdata, id); |
|
|
|
|
if (unlikely(!client)) { |
|
|
|
|
ckpool_t *ckp = cdata->ckp; |
|
|
|
|
|
|
|
|
|
LOGINFO("Connector failed to find client id %"PRId64" to send to", id); |
|
|
|
|
stratifier_drop_id(ckp, id); |
|
|
|
|
free(buf); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sender_send = ckzalloc(sizeof(sender_send_t)); |
|
|
|
|
sender_send->client = client; |
|
|
|
@ -696,7 +717,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
|
|
|
|
|
|
|
|
|
|
static void process_client_msg(cdata_t *cdata, const char *buf) |
|
|
|
|
{ |
|
|
|
|
int64_t client_id64, client_id; |
|
|
|
|
int64_t client_id; |
|
|
|
|
json_t *json_msg; |
|
|
|
|
char *msg; |
|
|
|
|
|
|
|
|
@ -707,16 +728,12 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Extract the client id from the json message and remove its entry */ |
|
|
|
|
client_id64 = json_integer_value(json_object_get(json_msg, "client_id")); |
|
|
|
|
client_id = json_integer_value(json_object_get(json_msg, "client_id")); |
|
|
|
|
json_object_del(json_msg, "client_id"); |
|
|
|
|
if (client_id64 > 0xffffffffll) { |
|
|
|
|
int64_t passthrough_id; |
|
|
|
|
|
|
|
|
|
passthrough_id = client_id64 & 0xffffffffll; |
|
|
|
|
client_id = client_id64 >> 32; |
|
|
|
|
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); |
|
|
|
|
} else |
|
|
|
|
client_id = client_id64; |
|
|
|
|
/* Put client_id back in for a passthrough subclient, passing its
|
|
|
|
|
* upstream client_id instead of the passthrough's. */ |
|
|
|
|
if (client_id > 0xffffffffll) |
|
|
|
|
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); |
|
|
|
|
msg = json_dumps(json_msg, JSON_EOL); |
|
|
|
|
send_client(cdata, client_id, msg); |
|
|
|
|
json_decref(json_msg); |
|
|
|
|