You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
466 lines
16 KiB
466 lines
16 KiB
part of '../../neon_spreed.dart'; |
|
|
|
/// Events that can be triggered on a [SpreedCallBloc].
abstract class SpreedCallBlocEvents {
  /// Leaves the call of the current room.
  Future<void> leaveCall();

  /// Sets whether the local audio is enabled.
  // ignore: avoid_positional_boolean_parameters
  void changeAudio(final bool enabled);

  /// Sets whether the local video is enabled.
  // ignore: avoid_positional_boolean_parameters
  void changeVideo(final bool enabled);

  /// Sets whether screen sharing is enabled.
  // ignore: avoid_positional_boolean_parameters
  void changeScreen(final bool enabled);
}
|
|
|
/// Observable state exposed by a [SpreedCallBloc].
abstract class SpreedCallBlocStates {
  /// The remote participants that are currently tracked for the call.
  BehaviorSubject<List<SpreedRemoteCallParticipant>> get remoteParticipants;

  /// Whether the local audio is enabled.
  BehaviorSubject<bool> get audioEnabled;

  /// Whether the local video is enabled.
  BehaviorSubject<bool> get videoEnabled;

  /// Whether screen sharing is enabled.
  BehaviorSubject<bool> get screenEnabled;
}
|
|
|
class SpreedCallBloc extends InteractiveBloc implements SpreedCallBlocEvents, SpreedCallBlocStates { |
|
/// Creates the bloc and immediately starts setting up the local participant.
///
/// Once the local media is available the call is joined through [refresh].
/// The setup runs in the background; any failure is logged and reported via
/// [addError] instead of surfacing as an unhandled asynchronous error.
SpreedCallBloc(
  this._settings,
  this._client,
  this._roomToken,
  this._sessionID,
) {
  unawaited(
    _setupLocalParticipant().then((final _) => refresh()).catchError(
      (final Object error, final StackTrace stackTrace) {
        debugPrint(error.toString());
        debugPrint(stackTrace.toString());
        // The media layer may throw objects that are not Exceptions, so they
        // are wrapped to keep the reported error type uniform.
        addError(error is Exception ? error : Exception(error.toString()));
      },
    ),
  );
}
|
|
|
/// Signaling settings; provides the own user ID and the STUN/TURN servers.
final spreed.SignalingSettings _settings;

/// Client used for all call and signaling requests.
final NextcloudClient _client;

/// Token of the room the call belongs to.
final String _roomToken;

/// Session ID of the local participant.
final String _sessionID;

/// Whether the signaling pull loop should keep running.
bool _listeningSignalingMessages = false;

/// The local participant, assigned by [_setupLocalParticipant].
late SpreedLocalCallParticipant localParticipant;
|
|
|
/// Stops the signaling loop, disposes all known remote participants and
/// closes every state subject before disposing the base bloc.
@override
void dispose() {
  // Makes the loop in _pullSignalingMessages exit on its next check.
  _listeningSignalingMessages = false;

  final knownParticipants = remoteParticipants.valueOrNull;
  if (knownParticipants != null) {
    for (final participant in knownParticipants) {
      participant.dispose();
    }
  }

  final subjects = <BehaviorSubject<Object>>[
    remoteParticipants,
    audioEnabled,
    videoEnabled,
    screenEnabled,
  ];
  for (final subject in subjects) {
    unawaited(subject.close());
  }

  super.dispose();
}
|
|
|
/// Starts without a value; the first list is emitted once a sessions
/// signaling message has been processed.
@override
BehaviorSubject<List<SpreedRemoteCallParticipant>> remoteParticipants =
    BehaviorSubject<List<SpreedRemoteCallParticipant>>();

/// Seeded with `false`: audio starts disabled.
@override
BehaviorSubject<bool> audioEnabled = BehaviorSubject<bool>.seeded(false);

/// Seeded with `false`: video starts disabled.
@override
BehaviorSubject<bool> videoEnabled = BehaviorSubject<bool>.seeded(false);

/// Seeded with `false`: screen sharing starts disabled.
@override
BehaviorSubject<bool> screenEnabled = BehaviorSubject<bool>.seeded(false);
|
|
|
/// Joins the call of the room and starts listening for signaling messages.
///
/// Exceptions are logged and forwarded to [addError].
@override
Future<void> refresh() async {
  try {
    await _client.spreed.call.joinCall(token: _roomToken);
    _listenForSignalingMessages();
  } on Exception catch (error, stackTrace) {
    debugPrint('$error');
    debugPrint('$stackTrace');
    addError(error);
  }
}
|
|
|
/// Leaves the call of the room.
///
/// Exceptions are logged and forwarded to [addError].
@override
Future<void> leaveCall() async {
  try {
    await _client.spreed.call.leaveCall(token: _roomToken);
  } on Exception catch (error, stackTrace) {
    debugPrint('$error');
    debugPrint('$stackTrace');
    addError(error);
  }
}
|
|
|
/// Publishes the new audio state and then applies it to the local tracks and
/// notifies the remote participants in the background.
@override
void changeAudio(final bool enabled) {
  audioEnabled.add(enabled);
  unawaited(_updateLocalParticipant());
}
|
|
|
/// Publishes the new video state and then applies it to the local tracks and
/// notifies the remote participants in the background.
@override
void changeVideo(final bool enabled) {
  videoEnabled.add(enabled);
  unawaited(_updateLocalParticipant());
}
|
|
|
/// Publishes the new screen sharing state.
///
/// Only the [screenEnabled] subject is updated here.
@override
void changeScreen(final bool enabled) => screenEnabled.add(enabled);
|
|
|
/// Captures local audio and video and creates [localParticipant] from it.
///
/// All captured tracks start disabled; they are switched on later through
/// [changeAudio] and [changeVideo].
Future<void> _setupLocalParticipant() async {
  final constraints = <String, dynamic>{
    'audio': true,
    'video': true,
  };
  final mediaStream = await navigator.mediaDevices.getUserMedia(constraints);
  for (final mediaTrack in mediaStream.getTracks()) {
    mediaTrack.enabled = false;
  }

  final videoRenderer = await _getInitializedRenderer();
  videoRenderer.srcObject = mediaStream;

  localParticipant = SpreedLocalCallParticipant(
    _settings.userId!,
    _sessionID,
    videoRenderer,
    mediaStream,
  );
}
|
|
|
/// Sends [messages] to the signaling endpoint, one request per message.
///
/// A failing request is logged and forwarded to [addError]; the remaining
/// messages are still sent.
Future<void> _sendSignalingMessages(final List<spreed.SignalingMessage> messages) async {
  // TODO: Send all messages at once, needs to send it over the body and not the URL, because that gets too long
  for (final signalingMessage in messages) {
    try {
      await _client.spreed.signaling.sendMessages(
        token: _roomToken,
        messages: ContentString(
          (final listBuilder) => listBuilder
            ..content = BuiltList([
              spreed.SignalingSendMessagesMessages(
                (final entryBuilder) => entryBuilder
                  ..fn = ContentString<spreed.SignalingMessage>(
                    (final contentBuilder) => contentBuilder..content = signalingMessage,
                  ).toBuilder()
                  ..sessionId = _sessionID,
              ),
            ]),
        ),
      );
    } on Exception catch (error, stackTrace) {
      debugPrint('$error');
      debugPrint('$stackTrace');
      addError(error);
    }
  }
}
|
|
|
/// Returns the remote participant with the given [sessionID].
///
/// Returns `null` when no participant or more than one participant matches,
/// and also when no participant list has been emitted yet.
SpreedRemoteCallParticipant? _getRemoteParticipant(final String sessionID) {
  // remoteParticipants is not seeded, so reading `.value` before the first
  // emission would throw; `valueOrNull` turns that case into "not found".
  final knownParticipants = remoteParticipants.valueOrNull ?? const <SpreedRemoteCallParticipant>[];
  final matches = knownParticipants.where((final participant) => participant.sessionID == sessionID).toList();
  return matches.length == 1 ? matches.single : null;
}
|
|
|
/// Applies [call] to every remote participant whose session ID equals
/// [sessionID] and emits the resulting list on [remoteParticipants].
///
/// Participants with a different session ID are carried over unchanged.
/// Does nothing when no participant list has been emitted yet.
Future<void> _updateRemoteParticipant(
  final String sessionID,
  final Future<SpreedRemoteCallParticipant> Function(SpreedRemoteCallParticipant) call,
) async {
  // remoteParticipants is not seeded, so `.value` would throw when a
  // signaling message arrives before the first participant list.
  final currentParticipants = remoteParticipants.valueOrNull;
  if (currentParticipants == null) {
    return;
  }

  final updatedRemoteParticipants = <SpreedRemoteCallParticipant>[];
  for (final remoteParticipant in currentParticipants) {
    updatedRemoteParticipants.add(
      remoteParticipant.sessionID == sessionID ? await call(remoteParticipant) : remoteParticipant,
    );
  }
  remoteParticipants.add(updatedRemoteParticipants);
}
|
|
|
/// Repeatedly pulls signaling messages for the room and yields each batch.
///
/// The loop runs for as long as [_listeningSignalingMessages] is `true`.
/// API errors with a status code of 500 or above are skipped silently; every
/// other exception is logged and forwarded to [addError]. In both cases the
/// loop continues with the next pull.
Stream<List<spreed.SignalingData>> _pullSignalingMessages() async* {
  // NOTE(review): a failed pull is retried immediately without any delay;
  // consider a back-off if the server can fail fast.
  while (_listeningSignalingMessages) {
    try {
      final response = await _client.spreed.signaling.pullMessages(token: _roomToken);
      yield response.body.ocs.data.toList();
    } on Exception catch (error, stackTrace) {
      final isServerError = error is DynamiteApiException && error.statusCode >= 500;
      if (!isServerError) {
        debugPrint('$error');
        debugPrint('$stackTrace');
        addError(error);
      }
    }
  }
}
|
|
|
/// Applies the current [audioEnabled] and [videoEnabled] values to the local
/// tracks and sends the matching mute/unmute messages to all known remote
/// participants.
Future<void> _updateLocalParticipant() async {
  final localStream = localParticipant.stream;
  if (localStream != null) {
    for (final track in localStream.getTracks()) {
      switch (track.kind) {
        case 'video':
          track.enabled = videoEnabled.value;
        case 'audio':
          track.enabled = audioEnabled.value;
        default:
          debugPrint('Unknown track kind ${track.kind}');
      }
    }
  }

  // remoteParticipants is not seeded: toggling audio/video before the first
  // participant list arrived must not throw, there is simply nobody to notify.
  final knownParticipants = remoteParticipants.valueOrNull ?? const <SpreedRemoteCallParticipant>[];
  await _sendSignalingMessages(_generateMuteMessages(knownParticipants));
}
|
|
|
/// Builds the mute/unmute messages describing the local media state.
///
/// For every entry of [participants] two messages are produced, first for
/// audio and then for video. The message type is `unmute` when the respective
/// medium is enabled and `mute` otherwise.
List<spreed.SignalingMessage> _generateMuteMessages(final List<SpreedRemoteCallParticipant> participants) {
  final muteMessages = <spreed.SignalingMessage>[];
  for (final recipient in participants) {
    final mediaStates = [
      (spreed.SignalingMuteMessage_Payload_Name.audio, audioEnabled.value),
      (spreed.SignalingMuteMessage_Payload_Name.video, videoEnabled.value),
    ];
    for (final (mediaName, isEnabled) in mediaStates) {
      final messageType = isEnabled ? spreed.SignalingMessageType.unmute : spreed.SignalingMessageType.mute;
      muteMessages.add(
        spreed.SignalingMessage(
          (final messageBuilder) => messageBuilder
            ..signalingMuteMessage = spreed.SignalingMuteMessage(
              (final muteBuilder) => muteBuilder
                ..from = _sessionID
                ..to = recipient.sessionID
                ..type = messageType
                ..payload = spreed.SignalingMuteMessage_Payload(
                  (final payloadBuilder) => payloadBuilder.name = mediaName,
                ).toBuilder(),
            ).toBuilder(),
        ),
      );
    }
  }
  return muteMessages;
}
|
|
|
/// Whether the own session ID sorts after the session ID of
/// [remoteParticipant].
///
/// The caller uses this to decide which side sends the offer.
bool _isWeakerParticipant(final SpreedRemoteCallParticipant remoteParticipant) {
  final comparison = _sessionID.compareTo(remoteParticipant.sessionID);
  return comparison > 0;
}
|
|
|
/// Sets up a peer connection to [remoteParticipant], creates an SDP offer and
/// sends it together with the current mute state.
Future<void> _sendOffer(final SpreedRemoteCallParticipant remoteParticipant) async {
  debugPrint('Sending offer to ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
  // TODO: For now this is disabled, because sending long or many signaling messages is broken.
  //return;
  final peerConnection = await _setupConnection(remoteParticipant);
  final offer = await peerConnection.createOffer();
  await peerConnection.setLocalDescription(offer);

  final offerMessage = spreed.SignalingMessage(
    (final messageBuilder) => messageBuilder
      ..signalingSessionDescriptionMessage = spreed.SignalingSessionDescriptionMessage(
        (final descriptionBuilder) => descriptionBuilder
          ..from = _sessionID
          ..to = remoteParticipant.sessionID
          ..type = spreed.SignalingMessageType.offer
          ..payload = spreed.SignalingSessionDescriptionMessage_Payload(
            (final payloadBuilder) => payloadBuilder
              ..type = spreed.SignalingSessionDescriptionMessage_Payload_Type.offer
              ..sdp = offer.sdp
              ..nick = '',
          ).toBuilder(),
      ).toBuilder(),
  );

  await _sendSignalingMessages([
    offerMessage,
    ..._generateMuteMessages([remoteParticipant]),
  ]);
}
|
|
|
/// Creates a peer connection for [remoteParticipant] and wires up its
/// callbacks.
///
/// The connection uses the STUN and TURN servers from the signaling settings.
/// Incoming video tracks are attached to a fresh renderer and stored on the
/// participant; local ICE candidates are forwarded through signaling. The
/// connection and the local stream are handed to [remoteParticipant] before
/// the connection is returned.
Future<RTCPeerConnection> _setupConnection(final SpreedRemoteCallParticipant remoteParticipant) async {
  final connection = await createPeerConnection(
    {
      'sdpSemantics': 'unified-plan',
      'iceServers': [
        ..._settings.stunservers.map((final s) => s.toJson()),
        ..._settings.turnservers.map((final s) => s.toJson()),
      ],
    },
  );
  connection
    ..onTrack = (final event) async {
      // A track can arrive without an associated stream; `streams.first`
      // would throw in that case, so such events are ignored.
      if (event.track.kind != 'video' || event.streams.isEmpty) {
        return;
      }
      final stream = event.streams.first;
      final renderer = await _getInitializedRenderer();
      renderer.srcObject = stream;
      await _updateRemoteParticipant(
        remoteParticipant.sessionID,
        (final participant) async => participant
          ..renderer = renderer
          ..stream = stream,
      );
    }
    ..onIceCandidate = (final candidate) async {
      await _sendSignalingMessages([
        spreed.SignalingMessage(
          (final b) => b
            ..signalingICECandidateMessage = spreed.SignalingICECandidateMessage(
              (final b) => b
                ..from = _sessionID
                ..to = remoteParticipant.sessionID
                // NOTE(review): the message carries an ICE candidate but is
                // typed as `answer`; confirm whether a dedicated candidate
                // type should be used here.
                ..type = spreed.SignalingMessageType.answer
                ..payload = spreed.SignalingICECandidateMessage_Payload(
                  (final b) => b
                    ..candidate = spreed.SignalingICECandidateMessage_Payload_Candidate(
                      (final b) => b
                        ..candidate = candidate.candidate
                        ..sdpMid = candidate.sdpMid
                        ..sdpMLineIndex = candidate.sdpMLineIndex,
                    ).toBuilder(),
                ).toBuilder(),
            ).toBuilder(),
        ),
      ]);
    }
    // State changes are only logged; debugPrint keeps this in line with the
    // logging used everywhere else in this bloc.
    ..onIceGatheringState = (final state) {
      debugPrint('$state');
    }
    ..onIceConnectionState = (final state) {
      debugPrint('$state');
    }
    ..onConnectionState = (final state) {
      debugPrint('$state');
    };
  await remoteParticipant.acceptNewConnection(connection);
  await remoteParticipant.acceptNewLocalStream(localParticipant.stream);

  return connection;
}
|
|
|
/// Starts consuming [_pullSignalingMessages] and dispatches every received
/// signaling message to its handler.
///
/// Does nothing when the listener is already running. Processing of a batch
/// stops as soon as [_listeningSignalingMessages] becomes `false`.
void _listenForSignalingMessages() {
  if (_listeningSignalingMessages) {
    return;
  }
  _listeningSignalingMessages = true;
  _pullSignalingMessages().listen((final messages) async {
    for (final message in messages) {
      if (!_listeningSignalingMessages) {
        return;
      }
      if (!await _handleSignalingData(message)) {
        debugPrint('Unknown signaling message ${message.toJson()}');
      }
    }
  });
}

/// Dispatches a single signaling [message] and returns whether it was of a
/// known kind.
Future<bool> _handleSignalingData(final spreed.SignalingData message) async {
  if (message.signalingSessions != null) {
    await _handleSessionsMessage(message);
    return true;
  }

  final wrapper = message.signalingMessageWrapper;
  if (wrapper == null) {
    return false;
  }
  final signalingMessage = wrapper.data.content;

  final sessionDescription = signalingMessage.signalingSessionDescriptionMessage;
  if (sessionDescription != null) {
    await _handleSessionDescriptionMessage(sessionDescription);
    return true;
  }

  final iceCandidate = signalingMessage.signalingICECandidateMessage;
  if (iceCandidate != null) {
    await _handleIceCandidateMessage(iceCandidate);
    return true;
  }

  final mute = signalingMessage.signalingMuteMessage;
  if (mute != null) {
    await _handleMuteMessage(mute);
    return true;
  }

  return false;
}

/// Reconciles [remoteParticipants] with the sessions listed in [message].
///
/// Participants are identified by their session ID, the same key every other
/// lookup in this bloc uses. Participants whose session is no longer in the
/// call are disposed; sessions that are not known yet (and are not the own
/// session) become new participants, and an offer is sent to them when
/// [_isWeakerParticipant] says so.
Future<void> _handleSessionsMessage(final spreed.SignalingData message) async {
  final sessions = message.signalingSessions;
  if (sessions == null) {
    return;
  }

  final inCallSessions = sessions.data
      .where(
        (final session) => spreed.ParticipantInCallFlag.values
            .byBinary(session.inCall)
            .contains(spreed.ParticipantInCallFlag.inCall),
      )
      .toList();
  final inCallSessionIDs = {for (final session in inCallSessions) session.sessionId};

  final currentParticipants = remoteParticipants.valueOrNull ?? [];
  final knownSessionIDs = {for (final participant in currentParticipants) participant.sessionID};
  final updatedParticipants = <SpreedRemoteCallParticipant>[];

  for (final currentParticipant in currentParticipants) {
    if (inCallSessionIDs.contains(currentParticipant.sessionID)) {
      updatedParticipants.add(currentParticipant);
    } else {
      currentParticipant.dispose();
    }
  }

  for (final session in inCallSessions) {
    if (session.sessionId == _sessionID) {
      continue;
    }
    // `add` returns false for sessions that are already tracked, which also
    // guards against the same session being listed twice in one message.
    if (!knownSessionIDs.add(session.sessionId)) {
      continue;
    }
    final remoteParticipant = SpreedRemoteCallParticipant(
      session.userId,
      session.sessionId,
      null,
      null,
      null,
      null,
    );
    if (_isWeakerParticipant(remoteParticipant)) {
      await _sendOffer(remoteParticipant);
    }
    updatedParticipants.add(remoteParticipant);
  }

  remoteParticipants.add(updatedParticipants);
}

/// Handles an SDP offer or answer from a remote participant.
///
/// For an offer a connection is set up, the offer is applied as remote
/// description and an answer is sent back together with the mute state.
Future<void> _handleSessionDescriptionMessage(final spreed.SignalingSessionDescriptionMessage remoteSDP) async {
  await _updateRemoteParticipant(remoteSDP.from, (final remoteParticipant) async {
    switch (remoteSDP.payload.type) {
      case spreed.SignalingSessionDescriptionMessage_Payload_Type.offer:
        debugPrint('Received offer from ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
        final connection = await _setupConnection(remoteParticipant);
        await connection.setRemoteDescription(
          RTCSessionDescription(
            remoteSDP.payload.sdp,
            'offer',
          ),
        );
        final localSDP = await connection.createAnswer();
        await connection.setLocalDescription(localSDP);
        await _sendSignalingMessages([
          spreed.SignalingMessage(
            (final b) => b
              ..signalingSessionDescriptionMessage = spreed.SignalingSessionDescriptionMessage(
                (final b) => b
                  ..from = _sessionID
                  ..to = remoteParticipant.sessionID
                  ..type = spreed.SignalingMessageType.answer
                  ..payload = spreed.SignalingSessionDescriptionMessage_Payload(
                    (final b) => b
                      ..type = spreed.SignalingSessionDescriptionMessage_Payload_Type.answer
                      ..sdp = localSDP.sdp
                      ..nick = '',
                  ).toBuilder(),
              ).toBuilder(),
          ),
          ..._generateMuteMessages([remoteParticipant]),
        ]);
      case spreed.SignalingSessionDescriptionMessage_Payload_Type.answer:
        // TODO(review): the answer is only logged; it is never applied as the
        // remote description of the connection created in _sendOffer.
        debugPrint('Received answer from ${remoteParticipant.userID} ${remoteParticipant.sessionID}');
    }

    return remoteParticipant;
  });
}

/// Adds a received ICE candidate to the participant it came from.
///
/// Messages from unknown participants and empty candidates are ignored.
Future<void> _handleIceCandidateMessage(final spreed.SignalingICECandidateMessage iceCandidateMessage) async {
  final remoteParticipant = _getRemoteParticipant(iceCandidateMessage.from);
  if (remoteParticipant == null) {
    return;
  }

  final candidate = iceCandidateMessage.payload.candidate;
  if (candidate.candidate.isEmpty) {
    // TODO: Handle end-of-candidates properly
    return;
  }

  await remoteParticipant.addCandidate(
    RTCIceCandidate(
      candidate.candidate,
      candidate.sdpMid,
      candidate.sdpMLineIndex,
    ),
  );
}

/// Applies a received mute/unmute message to the participant it came from.
Future<void> _handleMuteMessage(final spreed.SignalingMuteMessage muteMessage) async {
  await _updateRemoteParticipant(muteMessage.from, (final remoteParticipant) async {
    final isUnmute = muteMessage.type == spreed.SignalingMessageType.unmute;
    switch (muteMessage.payload.name) {
      case spreed.SignalingMuteMessage_Payload_Name.audio:
        remoteParticipant.audioEnabled = isUnmute;
      case spreed.SignalingMuteMessage_Payload_Name.video:
        remoteParticipant.videoEnabled = isUnmute;
    }
    return remoteParticipant;
  });
}
|
} |
|
|
|
/// Creates a new [RTCVideoRenderer] and returns it once it has been
/// initialized.
Future<RTCVideoRenderer> _getInitializedRenderer() async {
  final videoRenderer = RTCVideoRenderer();
  await videoRenderer.initialize();
  return videoRenderer;
}
|
|
|