Skip to content

Commit

Permalink
update.
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwebrtc committed Aug 15, 2024
1 parent faf5c68 commit a1d7da0
Show file tree
Hide file tree
Showing 10 changed files with 987 additions and 133 deletions.
136 changes: 89 additions & 47 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/disposable.dart';
import '../support/region_url_provider.dart';
import '../support/websocket.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
Expand Down Expand Up @@ -130,6 +131,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

bool attemptingReconnect = false;

RegionUrlProvider? _regionUrlProvider;

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
Expand Down Expand Up @@ -171,6 +174,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
ConnectOptions? connectOptions,
RoomOptions? roomOptions,
FastConnectOptions? fastConnectOptions,
RegionUrlProvider? regionUrlProvider,
}) async {
this.url = url;
this.token = token;
Expand All @@ -179,6 +183,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
this.roomOptions = roomOptions ?? this.roomOptions;
this.fastConnectOptions = fastConnectOptions;

if (regionUrlProvider != null) {
_regionUrlProvider = regionUrlProvider;
}

try {
// wait for socket to connect rtc server
await signalClient.connect(
Expand Down Expand Up @@ -663,6 +671,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
));

clearReconnectTimeout();
if (token != null && _regionUrlProvider != null) {
// token may have been refreshed, we do not want to recreate the regionUrlProvider
// since the current engine may have inherited a regional url
_regionUrlProvider!.updateToken(token!);
}
logger.fine(
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
Expand Down Expand Up @@ -789,53 +802,65 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
Future<void> restartConnection([bool signalEvents = false]) async {
Future<void> restartConnection({String? regionUrl}) async {
if (_isClosed) {
return;
}

events.emit(const EngineFullRestartingEvent());
try {
events.emit(const EngineFullRestartingEvent());

if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}
if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}

await publisher?.dispose();
publisher = null;
await publisher?.dispose();
publisher = null;

await subscriber?.dispose();
subscriber = null;
await subscriber?.dispose();
subscriber = null;

_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;
_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;

await _signalListener.cancelAll();
await _signalListener.cancelAll();

_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();
_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();

await connect(
url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);

if (_hasPublished) {
await negotiate();
logger.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
await connect(
regionUrl ?? url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);
}

fullReconnectOnNext = false;

events.emit(const EngineRestartedEvent());
if (_hasPublished) {
await negotiate();
logger
.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
);
}
fullReconnectOnNext = false;
_regionUrlProvider?.resetAttempts();
events.emit(const EngineRestartedEvent());
} catch (error) {
final nextRegionUrl = await _regionUrlProvider?.getNextBestRegionUrl();
if (nextRegionUrl != null) {
await restartConnection(regionUrl: nextRegionUrl);
return;
} else {
// no more regions to try (or we're not on cloud)
_regionUrlProvider?.resetAttempts();
rethrow;
}
}
}

@internal
Expand Down Expand Up @@ -992,19 +1017,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
token = event.token;
})
..on<SignalLeaveEvent>((event) async {
if (event.canReconnect) {
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
} else {
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
if (event.regions != null && _regionUrlProvider != null) {
logger.fine('updating regions');
_regionUrlProvider?.setServerReportedRegions(event.regions!);
}
switch (event.action) {
case lk_rtc.LeaveRequest_Action.DISCONNECT:
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events
.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
break;
case lk_rtc.LeaveRequest_Action.RECONNECT:
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
break;
case lk_rtc.LeaveRequest_Action.RESUME:
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
default:
break;
}
});

Expand All @@ -1016,6 +1054,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
}
}

void setRegionUrlProvider(RegionUrlProvider provider) {
_regionUrlProvider = provider;
}
}

extension EnginePrivateMethods on Engine {
Expand Down
1 change: 1 addition & 0 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
connectOptions: connectOptions,
roomOptions: roomOptions,
fastConnectOptions: fastConnectOptions,
regionUrlProvider: _regionUrlProvider,
);
}

Expand Down
3 changes: 1 addition & 2 deletions lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
));
break;
case lk_rtc.SignalResponse_Message.leave:
events.emit(SignalLeaveEvent(
canReconnect: msg.leave.canReconnect, reason: msg.leave.reason));
events.emit(SignalLeaveEvent(request: msg.leave));
break;
case lk_rtc.SignalResponse_Message.mute:
events.emit(SignalRemoteMuteTrackEvent(
Expand Down
11 changes: 7 additions & 4 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,14 @@ class EngineActiveSpeakersUpdateEvent with EngineEvent, InternalEvent {

@internal
class SignalLeaveEvent with SignalEvent, InternalEvent {
final bool canReconnect;
final lk_models.DisconnectReason reason;
bool get canReconnect => request.canReconnect;
lk_rtc.LeaveRequest_Action get action => request.action;
lk_models.DisconnectReason get reason => request.reason;
lk_rtc.RegionSettings? get regions =>
request.hasReason() ? request.regions : null;
final lk_rtc.LeaveRequest request;
const SignalLeaveEvent({
required this.canReconnect,
required this.reason,
required this.request,
});
}

Expand Down
Loading

0 comments on commit a1d7da0

Please sign in to comment.