Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Prepare connection/region pinning. #574

Merged
merged 13 commits into from
Aug 23, 2024
13 changes: 9 additions & 4 deletions example/lib/pages/prejoin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ class _PreJoinPageState extends State<PreJoinPage> {

try {
//create new room
var cameraEncoding = VideoEncoding(
var cameraEncoding = const VideoEncoding(
maxBitrate: 5 * 1000 * 1000,
maxFramerate: 30,
);

var screenEncoding = VideoEncoding(
var screenEncoding = const VideoEncoding(
maxBitrate: 3 * 1000 * 1000,
maxFramerate: 15,
);
Expand All @@ -189,10 +189,10 @@ class _PreJoinPageState extends State<PreJoinPage> {
defaultAudioPublishOptions: const AudioPublishOptions(
name: 'custom_audio_track_name',
),
defaultCameraCaptureOptions: CameraCaptureOptions(
defaultCameraCaptureOptions: const CameraCaptureOptions(
maxFrameRate: 30,
params: VideoParameters(
dimensions: const VideoDimensions(1280, 720),
dimensions: VideoDimensions(1280, 720),
)),
defaultScreenShareCaptureOptions: const ScreenShareCaptureOptions(
useiOSBroadcastExtension: true,
Expand All @@ -214,6 +214,11 @@ class _PreJoinPageState extends State<PreJoinPage> {
// Create a Listener before connecting
final listener = room.createListener();

if (args.url.contains('.livekit.cloud') ||
args.url.contains('.livekit.run')) {
await room.prepareConnection(args.url, args.token);
}

// Try to connect to the room
// This will throw an Exception if it fails for any reason.
await room.connect(
Expand Down
2 changes: 1 addition & 1 deletion example/lib/widgets/controls.dart
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class _ControlsWidgetState extends State<ControlsWidget> {
const androidConfig = FlutterBackgroundAndroidConfig(
notificationTitle: 'Screen Sharing',
notificationText: 'LiveKit Example is sharing the screen.',
notificationImportance: AndroidNotificationImportance.Default,
notificationImportance: AndroidNotificationImportance.normal,
notificationIcon: AndroidResource(
name: 'livekit_ic_launcher', defType: 'mipmap'),
);
Expand Down
145 changes: 95 additions & 50 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/disposable.dart';
import '../support/region_url_provider.dart';
import '../support/websocket.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
Expand Down Expand Up @@ -130,6 +131,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

bool attemptingReconnect = false;

RegionUrlProvider? _regionUrlProvider;

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
Expand Down Expand Up @@ -171,6 +174,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
ConnectOptions? connectOptions,
RoomOptions? roomOptions,
FastConnectOptions? fastConnectOptions,
RegionUrlProvider? regionUrlProvider,
}) async {
this.url = url;
this.token = token;
Expand All @@ -179,6 +183,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
this.roomOptions = roomOptions ?? this.roomOptions;
this.fastConnectOptions = fastConnectOptions;

if (regionUrlProvider != null) {
_regionUrlProvider = regionUrlProvider;
}

try {
// wait for socket to connect rtc server
await signalClient.connect(
Expand All @@ -192,7 +200,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await _signalListener.waitFor<SignalJoinResponseEvent>(
duration: this.connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'Timed out waiting for SignalJoinResponseEvent'),
'Timed out waiting for SignalJoinResponseEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('Waiting for engine to connect...');
Expand Down Expand Up @@ -663,6 +672,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
));

clearReconnectTimeout();
if (token != null && _regionUrlProvider != null) {
// token may have been refreshed, we do not want to recreate the regionUrlProvider
// since the current engine may have inherited a regional url
_regionUrlProvider!.updateToken(token!);
}
logger.fine(
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
Expand Down Expand Up @@ -700,7 +714,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
duration: connectOptions.timeouts.connection * 10,
filter: (event) => !event.state.contains(ConnectivityResult.none),
onTimeout: () => throw ConnectException(
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent'),
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent',
reason: ConnectionErrorReason.Timeout),
);
}

Expand Down Expand Up @@ -756,7 +771,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await events.waitFor<SignalReconnectedEvent>(
duration: connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'resumeConnection: Timed out waiting for SignalReconnectedEvent'),
'resumeConnection: Timed out waiting for SignalReconnectedEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('resumeConnection: reason: ${reason.name}');
Expand Down Expand Up @@ -789,53 +805,65 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
Future<void> restartConnection([bool signalEvents = false]) async {
Future<void> restartConnection({String? regionUrl}) async {
if (_isClosed) {
return;
}

events.emit(const EngineFullRestartingEvent());
try {
events.emit(const EngineFullRestartingEvent());

if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}
if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}

await publisher?.dispose();
publisher = null;
await publisher?.dispose();
publisher = null;

await subscriber?.dispose();
subscriber = null;
await subscriber?.dispose();
subscriber = null;

_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;
_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;

await _signalListener.cancelAll();
await _signalListener.cancelAll();

_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();
_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();

await connect(
url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);

if (_hasPublished) {
await negotiate();
logger.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
await connect(
regionUrl ?? url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);
}

fullReconnectOnNext = false;

events.emit(const EngineRestartedEvent());
if (_hasPublished) {
await negotiate();
logger
.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
);
}
fullReconnectOnNext = false;
_regionUrlProvider?.resetAttempts();
events.emit(const EngineRestartedEvent());
} catch (error) {
final nextRegionUrl = await _regionUrlProvider?.getNextBestRegionUrl();
if (nextRegionUrl != null) {
await restartConnection(regionUrl: nextRegionUrl);
return;
} else {
// no more regions to try (or we're not on cloud)
_regionUrlProvider?.resetAttempts();
rethrow;
}
}
}

@internal
Expand Down Expand Up @@ -992,19 +1020,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
token = event.token;
})
..on<SignalLeaveEvent>((event) async {
if (event.canReconnect) {
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
} else {
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
if (event.regions != null && _regionUrlProvider != null) {
logger.fine('updating regions');
_regionUrlProvider?.setServerReportedRegions(event.regions!);
}
switch (event.action) {
case lk_rtc.LeaveRequest_Action.DISCONNECT:
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events
.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
break;
case lk_rtc.LeaveRequest_Action.RECONNECT:
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
break;
case lk_rtc.LeaveRequest_Action.RESUME:
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
default:
break;
}
});

Expand All @@ -1016,6 +1057,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
}
}

void setRegionUrlProvider(RegionUrlProvider provider) {
_regionUrlProvider = provider;
}
}

extension EnginePrivateMethods on Engine {
Expand Down
Loading
Loading