Skip to content

Commit

Permalink
Merge pull request #67 from AlexV525/fix/channel-stream-subscription
Browse files Browse the repository at this point in the history
🐛 Implement peer subscription disposing mechanism
  • Loading branch information
quetool authored Jan 27, 2025
2 parents 046033c + d1f6bcf commit 0752389
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 38 deletions.
40 changes: 30 additions & 10 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import 'utils.dart';
/// [sendNotification] if no response is expected.
class Client {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The next request id.
var _id = 0;
Expand Down Expand Up @@ -49,9 +51,11 @@ class Client {
///
/// Note that the client won't begin listening to [responses] until
/// [Client.listen] is called.
Client(StreamChannel<String> channel)
: this.withoutJson(
jsonDocument.bind(channel).transformStream(ignoreFormatExceptions));
factory Client(StreamChannel<String> channel) {
return Client.withoutJson(
jsonDocument.bind(channel).transformStream(ignoreFormatExceptions),
);
}

/// Creates a [Client] that communicates using decoded messages over
/// [channel].
Expand Down Expand Up @@ -80,13 +84,22 @@ class Client {
///
/// [listen] may only be called once.
Future listen() {
_channel.stream.listen(_handleResponse, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
close();
});
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen(
_handleResponse,
onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
},
onDone: () {
if (!_done.isCompleted) {
_done.complete();
}
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
},
);
return done;
}

Expand All @@ -97,6 +110,13 @@ class Client {
Future close() {
_channel.sink.close();
if (!_done.isCompleted) _done.complete();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}

Expand Down
56 changes: 42 additions & 14 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import 'utils.dart';
/// communication channel and expects to connect to a peer that does the same.
class Peer implements Client, Server {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The underlying client that handles request-sending and response-receiving
/// logic.
Expand Down Expand Up @@ -59,12 +61,17 @@ class Peer implements Client, Server {
/// some requests which are not conformant with the JSON-RPC 2.0
/// specification. In particular, requests missing the `jsonrpc` parameter
/// will be accepted.
Peer(StreamChannel<String> channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true})
: this.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);
factory Peer(
StreamChannel<String> channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
return Peer.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
}

/// Creates a [Peer] that communicates using decoded messages over [channel].
///
Expand All @@ -81,14 +88,22 @@ class Peer implements Client, Server {
/// some requests which are not conformant with the JSON-RPC 2.0
/// specification. In particular, requests missing the `jsonrpc` parameter
/// will be accepted.
Peer.withoutJson(this._channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) {
Peer.withoutJson(
this._channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
_server = Server.withoutJson(
StreamChannel(_serverIncomingForwarder.stream, _channel.sink),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);
StreamChannel(
_serverIncomingForwarder.stream,
_channel.sink,
),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
_client = Client.withoutJson(
StreamChannel(_clientIncomingForwarder.stream, _channel.sink));
StreamChannel(_clientIncomingForwarder.stream, _channel.sink),
);
}

// Client methods.
Expand Down Expand Up @@ -120,7 +135,8 @@ class Peer implements Client, Server {
Future listen() {
_client.listen();
_server.listen();
_channel.stream.listen((message) {
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen((message) {
if (message is Map) {
if (message.containsKey('result') || message.containsKey('error')) {
_clientIncomingForwarder.add(message);
Expand All @@ -143,14 +159,26 @@ class Peer implements Client, Server {
}
}, onError: (error, stackTrace) {
_serverIncomingForwarder.addError(error, stackTrace);
}, onDone: close);
}, onDone: () {
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
});
_channelSubscriptions.add(subscription);
return done;
}

@override
Future close() {
_client.close();
_server.close();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}
}
56 changes: 42 additions & 14 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace);
/// time, or even for a single method to be invoked multiple times at once.
class Server {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The methods registered for this server.
final _methods = <String, Function>{};
Expand Down Expand Up @@ -81,12 +83,17 @@ class Server {
/// If [strictProtocolChecks] is false, this [Server] will accept some
/// requests which are not conformant with the JSON-RPC 2.0 specification. In
/// particular, requests missing the `jsonrpc` parameter will be accepted.
Server(StreamChannel<String> channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true})
: this.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);
factory Server(
StreamChannel<String> channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
return Server.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
}

/// Creates a [Server] that communicates using decoded messages over
/// [channel].
Expand All @@ -103,8 +110,11 @@ class Server {
/// If [strictProtocolChecks] is false, this [Server] will accept some
/// requests which are not conformant with the JSON-RPC 2.0 specification. In
/// particular, requests missing the `jsonrpc` parameter will be accepted.
Server.withoutJson(this._channel,
{this.onUnhandledError, this.strictProtocolChecks = true});
Server.withoutJson(
this._channel, {
this.onUnhandledError,
this.strictProtocolChecks = true,
});

/// Starts listening to the underlying stream.
///
Expand All @@ -113,12 +123,23 @@ class Server {
///
/// [listen] may only be called once.
Future listen() {
_channel.stream.listen(_handleRequest, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
});
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen(
_handleRequest,
onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
},
onDone: () {
if (!_done.isCompleted) {
_done.complete();
}
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
},
);
_channelSubscriptions.add(subscription);
return done;
}

Expand All @@ -129,6 +150,13 @@ class Server {
Future close() {
_channel.sink.close();
if (!_done.isCompleted) _done.complete();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}

Expand Down
10 changes: 10 additions & 0 deletions packages/reown_core/test/relay_client_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// ignore: library_annotations
@Timeout(Duration(seconds: 45))
library;

import 'dart:async';

Expand Down Expand Up @@ -309,6 +310,15 @@ void main() {
// expect(counterA, 1);
// expect(counterB, 1);
// });

test('Does not throws when calling listen() multiple times', () async {
await Future.wait([
coreA.relayClient.init(),
coreA.relayClient.init(),
coreB.relayClient.init(),
coreB.relayClient.init(),
]);
});
});
});
}

0 comments on commit 0752389

Please sign in to comment.