A framework for building convergent cross-platform Nextcloud clients using Flutter.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

467 lines
16 KiB

part of '../../neon_spreed.dart';
/// Events that can be dispatched to a [SpreedCallBloc].
abstract class SpreedCallBlocEvents {
/// Leaves the call of the current room.
Future<void> leaveCall();
/// Enables or disables the local audio.
// ignore: avoid_positional_boolean_parameters
void changeAudio(final bool enabled);
/// Enables or disables the local video.
// ignore: avoid_positional_boolean_parameters
void changeVideo(final bool enabled);
/// Enables or disables the screen sharing state.
// ignore: avoid_positional_boolean_parameters
void changeScreen(final bool enabled);
}
/// States exposed by a [SpreedCallBloc].
abstract class SpreedCallBlocStates {
/// The remote participants of the call.
BehaviorSubject<List<SpreedRemoteCallParticipant>> get remoteParticipants;
/// Whether the local audio is enabled.
BehaviorSubject<bool> get audioEnabled;
/// Whether the local video is enabled.
BehaviorSubject<bool> get videoEnabled;
/// Whether screen sharing is enabled.
BehaviorSubject<bool> get screenEnabled;
}
class SpreedCallBloc extends InteractiveBloc implements SpreedCallBlocEvents, SpreedCallBlocStates {
/// Creates the bloc and starts setting up the call in the background.
///
/// The local media is set up first; the call is only joined (via [refresh]) once that succeeded.
SpreedCallBloc(
  this._settings,
  this._client,
  this._roomToken,
  this._sessionID,
) {
  unawaited(_setupAndJoin());
}

/// Sets up the local participant and joins the call afterwards.
///
/// An exception thrown while setting up the local media is logged and reported through [addError],
/// like every other failure in this bloc, instead of surfacing as an unhandled asynchronous error.
/// The call is not joined in that case.
Future<void> _setupAndJoin() async {
  try {
    await _setupLocalParticipant();
  } on Exception catch (e, s) {
    debugPrint(e.toString());
    debugPrint(s.toString());
    addError(e);
    return;
  }
  // refresh() handles its own exceptions.
  await refresh();
}
/// Signaling settings; they provide the local user ID and the STUN/TURN servers for peer connections.
final spreed.SignalingSettings _settings;
/// Client used for the call and signaling requests.
final NextcloudClient _client;
/// Token of the room the call takes place in.
final String _roomToken;
/// Signaling session ID of the local participant.
final String _sessionID;
/// Whether the signaling pull loop and its message handler should keep running.
var _listeningSignalingMessages = false;
/// The local participant.
///
/// Assigned asynchronously by [_setupLocalParticipant]; because the field is `late`, reading it
/// before that completed throws.
late SpreedLocalCallParticipant localParticipant;
/// Stops the signaling loop, disposes the known remote participants and closes all state subjects.
@override
void dispose() {
  // The pull loop and the message handler both check this flag before continuing.
  _listeningSignalingMessages = false;

  final knownParticipants = remoteParticipants.valueOrNull;
  if (knownParticipants != null) {
    for (final participant in knownParticipants) {
      participant.dispose();
    }
  }

  final subjects = [remoteParticipants, audioEnabled, videoEnabled, screenEnabled];
  for (final subject in subjects) {
    unawaited(subject.close());
  }

  super.dispose();
}
/// The remote participants of the call.
///
/// Not seeded: until the first list has been added, `value` throws and `valueOrNull` is `null`.
@override
BehaviorSubject<List<SpreedRemoteCallParticipant>> remoteParticipants = BehaviorSubject();
/// Whether the local audio is enabled; starts out as `false`.
@override
BehaviorSubject<bool> audioEnabled = BehaviorSubject.seeded(false);
/// Whether the local video is enabled; starts out as `false`.
@override
BehaviorSubject<bool> videoEnabled = BehaviorSubject.seeded(false);
/// Whether screen sharing is enabled; starts out as `false`.
@override
BehaviorSubject<bool> screenEnabled = BehaviorSubject.seeded(false);
/// Joins the call of the current room and starts listening for signaling messages.
///
/// Exceptions are logged together with their stack trace and forwarded to [addError].
@override
Future<void> refresh() async {
  try {
    await _client.spreed.call.joinCall(token: _roomToken);
    _listenForSignalingMessages();
  } on Exception catch (error, stackTrace) {
    debugPrint('$error');
    debugPrint('$stackTrace');
    addError(error);
  }
}
/// Leaves the call of the current room.
///
/// Exceptions are logged together with their stack trace and forwarded to [addError].
@override
Future<void> leaveCall() async {
  try {
    await _client.spreed.call.leaveCall(token: _roomToken);
  } on Exception catch (error, stackTrace) {
    debugPrint('$error');
    debugPrint('$stackTrace');
    addError(error);
  }
}
/// Publishes the new audio state and applies it to the local participant.
@override
// ignore: avoid_void_async
void changeAudio(final bool enabled) async {
audioEnabled.add(enabled);
// Toggles the local tracks and sends mute messages to the remote participants.
await _updateLocalParticipant();
}
/// Publishes the new video state and applies it to the local participant.
@override
// ignore: avoid_void_async
void changeVideo(final bool enabled) async {
videoEnabled.add(enabled);
// Toggles the local tracks and sends mute messages to the remote participants.
await _updateLocalParticipant();
}
/// Publishes the new screen sharing state.
///
/// Only [screenEnabled] is updated here; no tracks are touched and no signaling messages are sent.
@override
void changeScreen(final bool enabled) {
screenEnabled.add(enabled);
}
/// Requests the local audio and video media and stores the resulting [localParticipant].
///
/// All tracks start out disabled, matching the initial `false` of [audioEnabled] and [videoEnabled].
Future<void> _setupLocalParticipant() async {
  final mediaConstraints = <String, dynamic>{
    'audio': true,
    'video': true,
  };
  final localStream = await navigator.mediaDevices.getUserMedia(mediaConstraints);
  for (final mediaTrack in localStream.getTracks()) {
    mediaTrack.enabled = false;
  }

  final localRenderer = await _getInitializedRenderer();
  localRenderer.srcObject = localStream;

  localParticipant = SpreedLocalCallParticipant(
    _settings.userId!,
    _sessionID,
    localRenderer,
    localStream,
  );
}
/// Sends the given signaling [messages], using one request per message.
///
/// An exception while sending a message is logged and forwarded to [addError]; the loop then
/// continues with the next message.
Future<void> _sendSignalingMessages(final List<spreed.SignalingMessage> messages) async {
for (final message in messages) {
// TODO: Send all messages at once, needs to send it over the body and not the URL, because that gets too long
try {
// Every request carries a one-element list that wraps the message together with the local session ID.
await _client.spreed.signaling.sendMessages(
token: _roomToken,
messages: ContentString(
(final b) => b
..content = BuiltList([
spreed.SignalingSendMessagesMessages(
(final b) => b
..fn = ContentString<spreed.SignalingMessage>(
(final b) => b..content = message,
).toBuilder()
..sessionId = _sessionID,
),
]),
),
);
} on Exception catch (e, s) {
debugPrint(e.toString());
debugPrint(s.toString());
addError(e);
}
}
}
/// Returns the remote participant with the given [sessionID].
///
/// Returns `null` when no participant list has been published yet, when nobody matches, or when
/// more than one participant matches.
SpreedRemoteCallParticipant? _getRemoteParticipant(final String sessionID) {
  // `value` throws on a subject that never emitted, so fall back to an empty list instead.
  final currentParticipants = remoteParticipants.valueOrNull ?? const <SpreedRemoteCallParticipant>[];
  // Materialized once so the matches are not re-computed for the length check and the access.
  final matches = currentParticipants.where((final participant) => participant.sessionID == sessionID).toList();
  return matches.length == 1 ? matches.single : null;
}
/// Replaces the remote participant with the given [sessionID] by the result of [call] and
/// publishes the updated participant list.
///
/// Participants with a different session ID are carried over unchanged. Nothing happens when no
/// participant list has been published yet, and nothing is published when the subject was closed
/// while [call] was running.
Future<void> _updateRemoteParticipant(
  final String sessionID,
  final Future<SpreedRemoteCallParticipant> Function(SpreedRemoteCallParticipant) call,
) async {
  // `value` throws on a subject that never emitted; without a list there is nobody to update.
  final currentParticipants = remoteParticipants.valueOrNull;
  if (currentParticipants == null) {
    return;
  }

  final updatedRemoteParticipants = <SpreedRemoteCallParticipant>[];
  for (final remoteParticipant in currentParticipants) {
    if (remoteParticipant.sessionID == sessionID) {
      updatedRemoteParticipants.add(await call(remoteParticipant));
    } else {
      updatedRemoteParticipants.add(remoteParticipant);
    }
  }

  // dispose() may have closed the subject during the awaits above; adding to it would throw.
  if (remoteParticipants.isClosed) {
    return;
  }
  remoteParticipants.add(updatedRemoteParticipants);
}
/// Pulls signaling messages for the room for as long as [_listeningSignalingMessages] is set and
/// emits every pulled batch.
///
/// A [DynamiteApiException] with a status code of 500 or above is retried silently; every other
/// exception is logged and forwarded to [addError]. After any failure the loop waits briefly before
/// the next attempt, so that a failing server or a missing network connection does not cause a
/// tight request loop.
Stream<List<spreed.SignalingData>> _pullSignalingMessages() async* {
  const retryDelay = Duration(seconds: 1);

  while (_listeningSignalingMessages) {
    try {
      yield (await _client.spreed.signaling.pullMessages(token: _roomToken)).body.ocs.data.toList();
      continue;
    } on Exception catch (e, s) {
      if (!_listeningSignalingMessages) {
        // Listening stopped while the request was pending; there is nobody left to report to.
        break;
      }
      final isServerError = e is DynamiteApiException && e.statusCode >= 500;
      if (!isServerError) {
        debugPrint(e.toString());
        debugPrint(s.toString());
        addError(e);
      }
    }
    // Only reached after a failed attempt.
    await Future<void>.delayed(retryDelay);
  }
}
/// Applies [audioEnabled] and [videoEnabled] to the local tracks and sends the matching mute
/// messages to all known remote participants.
///
/// When no participant list has been published yet, the tracks are still updated and no messages
/// are sent.
Future<void> _updateLocalParticipant() async {
  final stream = localParticipant.stream;
  if (stream != null) {
    for (final track in stream.getTracks()) {
      switch (track.kind) {
        case 'video':
          track.enabled = videoEnabled.value;
        case 'audio':
          track.enabled = audioEnabled.value;
        default:
          debugPrint('Unknown track kind ${track.kind}');
      }
    }
  }

  // `value` throws on a subject that never emitted, which is the case until the first sessions
  // message arrived.
  final participants = remoteParticipants.valueOrNull ?? const <SpreedRemoteCallParticipant>[];
  await _sendSignalingMessages(_generateMuteMessages(participants));
}
/// Builds the mute/unmute messages that describe the current local audio and video state.
///
/// For every participant two messages are created, first the audio one and then the video one.
/// An enabled medium results in an `unmute` message, a disabled one in a `mute` message.
List<spreed.SignalingMessage> _generateMuteMessages(final List<SpreedRemoteCallParticipant> participants) {
  final messages = <spreed.SignalingMessage>[];

  for (final participant in participants) {
    final mediaStates = {
      spreed.SignalingMuteMessage_Payload_Name.audio: audioEnabled.value,
      spreed.SignalingMuteMessage_Payload_Name.video: videoEnabled.value,
    };

    for (final mediaState in mediaStates.entries) {
      final messageType = mediaState.value ? spreed.SignalingMessageType.unmute : spreed.SignalingMessageType.mute;
      final muteMessage = spreed.SignalingMuteMessage(
        (final b) => b
          ..from = _sessionID
          ..to = participant.sessionID
          ..type = messageType
          ..payload = spreed.SignalingMuteMessage_Payload(
            (final b) => b.name = mediaState.key,
          ).toBuilder(),
      );
      messages.add(
        spreed.SignalingMessage(
          (final b) => b..signalingMuteMessage = muteMessage.toBuilder(),
        ),
      );
    }
  }

  return messages;
}
/// Whether the local session ID sorts after the session ID of [remoteParticipant].
///
/// [_listenForSignalingMessages] uses the result to decide whether the local side sends the offer
/// to a newly seen participant.
bool _isWeakerParticipant(final SpreedRemoteCallParticipant remoteParticipant) =>
_sessionID.compareTo(remoteParticipant.sessionID) > 0;
/// Sets up a connection to [remoteParticipant] and sends it an SDP offer.
///
/// The offer is followed by the mute messages describing the current local audio and video state.
Future<void> _sendOffer(final SpreedRemoteCallParticipant remoteParticipant) async {
debugPrint('Sending offer to ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
// TODO: For now this is disabled, because sending long or many signaling messages is broken.
//return;
final connection = await _setupConnection(remoteParticipant);
// The created offer becomes the local description before it is sent to the remote side.
final localSDP = await connection.createOffer();
await connection.setLocalDescription(localSDP);
await _sendSignalingMessages([
spreed.SignalingMessage(
(final b) => b
..signalingSessionDescriptionMessage = spreed.SignalingSessionDescriptionMessage(
(final b) => b
..from = _sessionID
..to = remoteParticipant.sessionID
..type = spreed.SignalingMessageType.offer
..payload = spreed.SignalingSessionDescriptionMessage_Payload(
(final b) => b
..type = spreed.SignalingSessionDescriptionMessage_Payload_Type.offer
..sdp = localSDP.sdp
..nick = '',
).toBuilder(),
).toBuilder(),
),
..._generateMuteMessages([remoteParticipant]),
]);
}
/// Creates a peer connection for [remoteParticipant] and wires up its callbacks.
///
/// The connection uses the STUN and TURN servers from the signaling settings. Incoming video tracks
/// are attached to a new renderer and stored on the participant, local ICE candidates are sent to
/// the participant, and state changes are logged. The connection and the local stream are handed to
/// [remoteParticipant] before the connection is returned.
Future<RTCPeerConnection> _setupConnection(final SpreedRemoteCallParticipant remoteParticipant) async {
  final connection = await createPeerConnection(
    {
      'sdpSemantics': 'unified-plan',
      'iceServers': [
        ..._settings.stunservers.map((final s) => s.toJson()),
        ..._settings.turnservers.map((final s) => s.toJson()),
      ],
    },
  );

  connection
    ..onTrack = (final event) async {
      if (event.track.kind != 'video') {
        return;
      }
      // A track event without any stream has nothing to render; `first` would throw on it.
      if (event.streams.isEmpty) {
        debugPrint('Received video track without stream from ${remoteParticipant.sessionID}');
        return;
      }
      final stream = event.streams.first;
      final renderer = await _getInitializedRenderer();
      renderer.srcObject = stream;
      await _updateRemoteParticipant(
        remoteParticipant.sessionID,
        (final remoteParticipant) async => remoteParticipant
          ..renderer = renderer
          ..stream = stream,
      );
    }
    ..onIceCandidate = (final candidate) async {
      await _sendSignalingMessages([
        spreed.SignalingMessage(
          (final b) => b
            ..signalingICECandidateMessage = spreed.SignalingICECandidateMessage(
              (final b) => b
                ..from = _sessionID
                ..to = remoteParticipant.sessionID
                // NOTE(review): an ICE candidate is sent with the message type `answer`; this looks
                // like it should be a dedicated candidate type - confirm against the signaling API.
                ..type = spreed.SignalingMessageType.answer
                ..payload = spreed.SignalingICECandidateMessage_Payload(
                  (final b) => b
                    ..candidate = spreed.SignalingICECandidateMessage_Payload_Candidate(
                      (final b) => b
                        ..candidate = candidate.candidate
                        ..sdpMid = candidate.sdpMid
                        ..sdpMLineIndex = candidate.sdpMLineIndex,
                    ).toBuilder(),
                ).toBuilder(),
            ).toBuilder(),
        ),
      ]);
    }
    // State changes are logged through debugPrint like everything else in this bloc.
    ..onIceGatheringState = (final state) {
      debugPrint(state.toString());
    }
    ..onIceConnectionState = (final state) {
      debugPrint(state.toString());
    }
    ..onConnectionState = (final state) {
      debugPrint(state.toString());
    };

  await remoteParticipant.acceptNewConnection(connection);
  await remoteParticipant.acceptNewLocalStream(localParticipant.stream);
  return connection;
}
/// Starts pulling signaling messages and handles every received message.
///
/// Does nothing when listening was already started. Handled messages are: session lists, session
/// descriptions (offer/answer), ICE candidates and mute/unmute messages. Everything else is logged
/// as unknown.
void _listenForSignalingMessages() {
if (_listeningSignalingMessages) {
return;
}
_listeningSignalingMessages = true;
_pullSignalingMessages().listen((final messages) async {
for (final message in messages) {
// Stop handling the rest of the batch once listening was turned off (see dispose()).
if (!_listeningSignalingMessages) {
return;
}
// Session list: synchronizes the remote participants with the users that are in the call.
if (message.signalingSessions != null) {
final users = message.signalingSessions!.data.where(
(final user) =>
spreed.ParticipantInCallFlag.values.byBinary(user.inCall).contains(spreed.ParticipantInCallFlag.inCall),
);
final currentParticipants = remoteParticipants.valueOrNull ?? [];
final updatedParticipants = <SpreedRemoteCallParticipant>[];
// Keep the participants that are still in the call and dispose the ones that are gone.
// NOTE(review): participants are matched by user ID here, while the rest of this class
// identifies them by session ID - confirm this is intended.
for (final currentParticipant in currentParticipants) {
if (users.where((final user) => user.userId == currentParticipant.userID).isNotEmpty) {
updatedParticipants.add(currentParticipant);
} else {
currentParticipant.dispose();
}
}
// Add the users that are not known yet, skipping the local session.
for (final user in users) {
if (currentParticipants
.where((final currentParticipant) => user.userId == currentParticipant.userID)
.isEmpty &&
user.sessionId != _sessionID) {
final remoteParticipant = SpreedRemoteCallParticipant(
user.userId,
user.sessionId,
null,
null,
null,
null,
);
// Only one side sends the offer; the session ID comparison decides which one.
if (_isWeakerParticipant(remoteParticipant)) {
await _sendOffer(remoteParticipant);
}
updatedParticipants.add(remoteParticipant);
}
}
remoteParticipants.add(updatedParticipants);
continue;
}
if (message.signalingMessageWrapper != null) {
final signalingMessage = message.signalingMessageWrapper!.data.content;
// Session description: answers a received offer.
if (signalingMessage.signalingSessionDescriptionMessage != null) {
final remoteSDP = signalingMessage.signalingSessionDescriptionMessage!;
await _updateRemoteParticipant(remoteSDP.from, (final remoteParticipant) async {
switch (remoteSDP.payload.type) {
case spreed.SignalingSessionDescriptionMessage_Payload_Type.offer:
debugPrint('Received offer from ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
final connection = await _setupConnection(remoteParticipant);
await connection.setRemoteDescription(
RTCSessionDescription(
remoteSDP.payload.sdp,
'offer',
),
);
final localSDP = await connection.createAnswer();
await connection.setLocalDescription(localSDP);
// The answer is followed by the current local mute state.
await _sendSignalingMessages([
spreed.SignalingMessage(
(final b) => b
..signalingSessionDescriptionMessage = spreed.SignalingSessionDescriptionMessage(
(final b) => b
..from = _sessionID
..to = remoteParticipant.sessionID
..type = spreed.SignalingMessageType.answer
..payload = spreed.SignalingSessionDescriptionMessage_Payload(
(final b) => b
..type = spreed.SignalingSessionDescriptionMessage_Payload_Type.answer
..sdp = localSDP.sdp
..nick = '',
).toBuilder(),
).toBuilder(),
),
..._generateMuteMessages([remoteParticipant]),
]);
case spreed.SignalingSessionDescriptionMessage_Payload_Type.answer:
// NOTE(review): a received answer is only logged here; its SDP is not applied to any
// connection in this method - confirm whether that is handled elsewhere.
debugPrint('Received answer from ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
}
return remoteParticipant;
});
continue;
}
// ICE candidate: passed on to the participant it came from.
if (signalingMessage.signalingICECandidateMessage != null) {
final iceCandidateMessage = signalingMessage.signalingICECandidateMessage!;
final remoteParticipant = _getRemoteParticipant(iceCandidateMessage.from);
// Candidates from senders that are not (uniquely) known are dropped.
if (remoteParticipant == null) {
continue;
}
if (iceCandidateMessage.payload.candidate.candidate.isEmpty) {
// TODO: Handle end-of-candidates properly
continue;
}
await remoteParticipant.addCandidate(
RTCIceCandidate(
iceCandidateMessage.payload.candidate.candidate,
iceCandidateMessage.payload.candidate.sdpMid,
iceCandidateMessage.payload.candidate.sdpMLineIndex,
),
);
continue;
}
// Mute/unmute: updates the audio or video flag of the sender.
if (signalingMessage.signalingMuteMessage != null) {
final muteMessage = signalingMessage.signalingMuteMessage!;
await _updateRemoteParticipant(muteMessage.from, (final remoteParticipant) async {
final isUnmute = muteMessage.type == spreed.SignalingMessageType.unmute;
switch (muteMessage.payload.name) {
case spreed.SignalingMuteMessage_Payload_Name.audio:
remoteParticipant.audioEnabled = isUnmute;
case spreed.SignalingMuteMessage_Payload_Name.video:
remoteParticipant.videoEnabled = isUnmute;
}
return remoteParticipant;
});
continue;
}
}
// Reached for messages of neither kind and for wrapped messages that matched none of the checks above.
debugPrint('Unknown signaling message ${message.toJson()}');
}
});
}
}
/// Creates a new [RTCVideoRenderer] and returns it once its initialization completed.
Future<RTCVideoRenderer> _getInitializedRenderer() async {
  final videoRenderer = RTCVideoRenderer();
  await videoRenderer.initialize();
  return videoRenderer;
}