From d0093e7c2dc7c95084196eac9f0cea99d826a63c Mon Sep 17 00:00:00 2001 From: Alex Li Date: Tue, 31 Dec 2024 17:54:43 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=90=9B=20Implement=20multi-subscripti?= =?UTF-8?q?on=20and=20cancellation=20mechanism=20for=20peers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../relay_client/json_rpc_2/src/client.dart | 40 +++++++++---- .../lib/relay_client/json_rpc_2/src/peer.dart | 56 ++++++++++++++----- .../relay_client/json_rpc_2/src/server.dart | 56 ++++++++++++++----- 3 files changed, 114 insertions(+), 38 deletions(-) diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart index 70bde912..fd273775 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart @@ -17,6 +17,8 @@ import 'utils.dart'; /// [sendNotification] if no response is expected. class Client { final StreamChannel _channel; + late final _stream = _channel.stream.asBroadcastStream(); + final _channelSubscriptions = >{}; /// The next request id. var _id = 0; @@ -49,9 +51,11 @@ class Client { /// /// Note that the client won't begin listening to [responses] until /// [Client.listen] is called. - Client(StreamChannel channel) - : this.withoutJson( - jsonDocument.bind(channel).transformStream(ignoreFormatExceptions)); + factory Client(StreamChannel channel) { + return Client.withoutJson( + jsonDocument.bind(channel).transformStream(ignoreFormatExceptions), + ); + } /// Creates a [Client] that communicates using decoded messages over /// [channel]. @@ -80,13 +84,22 @@ class Client { /// /// [listen] may only be called once. Future listen() { - _channel.stream.listen(_handleResponse, onError: (error, stackTrace) { - _done.completeError(error, stackTrace); - _channel.sink.close(); - }, onDone: () { - if (!_done.isCompleted) _done.complete(); - close(); - }); + late final StreamSubscription subscription; + subscription = _stream.listen( + _handleResponse, + onError: (error, stackTrace) { + _done.completeError(error, stackTrace); + _channel.sink.close(); + }, + onDone: () { + if (!_done.isCompleted) { + _done.complete(); + } + subscription.cancel(); + _channelSubscriptions.remove(subscription); + close(); + }, + ); return done; } @@ -97,6 +110,13 @@ class Client { Future close() { _channel.sink.close(); if (!_done.isCompleted) _done.complete(); + Future.forEach( + _channelSubscriptions.toSet(), + (subscription) async { + _channelSubscriptions.remove(subscription); + await subscription.cancel(); + }, + ); return done; } diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart index b9fada4c..f7c98c88 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart @@ -18,6 +18,8 @@ import 'utils.dart'; /// communication channel and expects to connect to a peer that does the same. class Peer implements Client, Server { final StreamChannel _channel; + late final _stream = _channel.stream.asBroadcastStream(); + final _channelSubscriptions = >{}; /// The underlying client that handles request-sending and response-receiving /// logic. @@ -59,12 +61,17 @@ class Peer implements Client, Server { /// some requests which are not conformant with the JSON-RPC 2.0 /// specification. In particular, requests missing the `jsonrpc` parameter /// will be accepted. - Peer(StreamChannel channel, - {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) - : this.withoutJson( - jsonDocument.bind(channel).transform(respondToFormatExceptions), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks); + factory Peer( + StreamChannel channel, { + ErrorCallback? onUnhandledError, + bool strictProtocolChecks = true, + }) { + return Peer.withoutJson( + jsonDocument.bind(channel).transform(respondToFormatExceptions), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks, + ); + } /// Creates a [Peer] that communicates using decoded messages over [channel]. /// @@ -81,14 +88,22 @@ class Peer implements Client, Server { /// some requests which are not conformant with the JSON-RPC 2.0 /// specification. In particular, requests missing the `jsonrpc` parameter /// will be accepted. - Peer.withoutJson(this._channel, - {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) { + Peer.withoutJson( + this._channel, { + ErrorCallback? onUnhandledError, + bool strictProtocolChecks = true, + }) { _server = Server.withoutJson( - StreamChannel(_serverIncomingForwarder.stream, _channel.sink), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks); + StreamChannel( + _serverIncomingForwarder.stream, + _channel.sink, + ), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks, + ); _client = Client.withoutJson( - StreamChannel(_clientIncomingForwarder.stream, _channel.sink)); + StreamChannel(_clientIncomingForwarder.stream, _channel.sink), + ); } // Client methods. @@ -120,7 +135,8 @@ class Peer implements Client, Server { Future listen() { _client.listen(); _server.listen(); - _channel.stream.listen((message) { + late final StreamSubscription subscription; + subscription = _stream.listen((message) { if (message is Map) { if (message.containsKey('result') || message.containsKey('error')) { _clientIncomingForwarder.add(message); @@ -143,7 +159,12 @@ class Peer implements Client, Server { } }, onError: (error, stackTrace) { _serverIncomingForwarder.addError(error, stackTrace); - }, onDone: close); + }, onDone: () { + subscription.cancel(); + _channelSubscriptions.remove(subscription); + close(); + }); + _channelSubscriptions.add(subscription); return done; } @@ -151,6 +172,13 @@ class Peer implements Client, Server { Future close() { _client.close(); _server.close(); + Future.forEach( + _channelSubscriptions.toSet(), + (subscription) async { + _channelSubscriptions.remove(subscription); + await subscription.cancel(); + }, + ); return done; } } diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart index 7d8202e1..caa1ca7c 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart @@ -29,6 +29,8 @@ typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace); /// time, or even for a single method to be invoked multiple times at once. class Server { final StreamChannel _channel; + late final _stream = _channel.stream.asBroadcastStream(); + final _channelSubscriptions = >{}; /// The methods registered for this server. final _methods = {}; @@ -81,12 +83,17 @@ class Server { /// If [strictProtocolChecks] is false, this [Server] will accept some /// requests which are not conformant with the JSON-RPC 2.0 specification. In /// particular, requests missing the `jsonrpc` parameter will be accepted. - Server(StreamChannel channel, - {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) - : this.withoutJson( - jsonDocument.bind(channel).transform(respondToFormatExceptions), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks); + factory Server( + StreamChannel channel, { + ErrorCallback? onUnhandledError, + bool strictProtocolChecks = true, + }) { + return Server.withoutJson( + jsonDocument.bind(channel).transform(respondToFormatExceptions), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks, + ); + } /// Creates a [Server] that communicates using decoded messages over /// [channel]. @@ -103,8 +110,11 @@ class Server { /// If [strictProtocolChecks] is false, this [Server] will accept some /// requests which are not conformant with the JSON-RPC 2.0 specification. In /// particular, requests missing the `jsonrpc` parameter will be accepted. - Server.withoutJson(this._channel, - {this.onUnhandledError, this.strictProtocolChecks = true}); + Server.withoutJson( + this._channel, { + this.onUnhandledError, + this.strictProtocolChecks = true, + }); /// Starts listening to the underlying stream. /// @@ -113,12 +123,23 @@ class Server { /// /// [listen] may only be called once. Future listen() { - _channel.stream.listen(_handleRequest, onError: (error, stackTrace) { - _done.completeError(error, stackTrace); - _channel.sink.close(); - }, onDone: () { - if (!_done.isCompleted) _done.complete(); - }); + late final StreamSubscription subscription; + subscription = _stream.listen( + _handleRequest, + onError: (error, stackTrace) { + _done.completeError(error, stackTrace); + _channel.sink.close(); + }, + onDone: () { + if (!_done.isCompleted) { + _done.complete(); + } + subscription.cancel(); + _channelSubscriptions.remove(subscription); + close(); + }, + ); + _channelSubscriptions.add(subscription); return done; } @@ -129,6 +150,13 @@ class Server { Future close() { _channel.sink.close(); if (!_done.isCompleted) _done.complete(); + Future.forEach( + _channelSubscriptions.toSet(), + (subscription) async { + _channelSubscriptions.remove(subscription); + await subscription.cancel(); + }, + ); return done; } From d1f6bcf282572b59cef81d605fd502bf5dcb82bd Mon Sep 17 00:00:00 2001 From: Alex Li Date: Tue, 31 Dec 2024 17:54:53 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=85=20Add=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/reown_core/test/relay_client_test.dart | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/reown_core/test/relay_client_test.dart b/packages/reown_core/test/relay_client_test.dart index 77ad4e63..a839ab28 100644 --- a/packages/reown_core/test/relay_client_test.dart +++ b/packages/reown_core/test/relay_client_test.dart @@ -1,5 +1,6 @@ // ignore: library_annotations @Timeout(Duration(seconds: 45)) +library; import 'dart:async'; @@ -309,6 +310,15 @@ void main() { // expect(counterA, 1); // expect(counterB, 1); // }); + + test('Does not throws when calling listen() multiple times', () async { + await Future.wait([ + coreA.relayClient.init(), + coreA.relayClient.init(), + coreB.relayClient.init(), + coreB.relayClient.init(), + ]); + }); }); }); }